{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n# ASM1: dependency graph\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Copyright (C) 2024 Juan Pablo Carbajal\n# Copyright (C) 2024 Mariane Yvonne Schneider\n#\n# This program is free software; you can redistribute it and/or modify\n# it under the terms of the GNU General Public License as published by\n# the Free Software Foundation; either version 3 of the License, or\n# (at your option) any later version.\n#\n# This program is distributed in the hope that it will be useful,\n# but WITHOUT ANY WARRANTY; without even the implied warranty of\n# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the\n# GNU General Public License for more details.\n#\n# You should have received a copy of the GNU General Public License\n# along with this program. If not, see .\n\n# Author: Juan Pablo Carbajal \nimport matplotlib.pyplot as plt\nimport numpy as np\nfrom sympy import *\nfrom pint import UnitRegistry\n\ntry:\n import dsafeatures\nexcept ModuleNotFoundError:\n import sys\n import os\n\n sys.path.insert(0, os.path.abspath(\"../..\"))\n\nimport dsafeatures.odemodel as m\nimport dsafeatures.symtools as st\n\nfrom dsafeatures.printing import *\n\ninit_printing()\n\nureg = UnitRegistry() # units registry\n\nplt.style.use('figures.mplstyle')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load the model\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "asm = m.ODEModel.from_csv(name=\"ASM1\")\n# States\nX = asm.states\n# Process rates\nr_sym, r = asm.rates, asm.rates_expr\n# Matrix\nM = asm.coef_matrix\n# State derivatives\ndotX, ddotX = asm.derivative(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Graph\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "g = asm.graph\n\n# node lists\nrate_n, st_n, iso_n, misc_n, in_n, meas_n = [], [], [], [], [], []\nfor n, d in g.nodes(data=True):\n match d[\"layer\"]:\n case \"rate\":\n rate_n.append(n)\n case \"state\":\n st_n.append(n)\n case \"isolated\":\n iso_n.append(n)\n case \"other\":\n misc_n.append(n)\n case \"input\":\n in_n.append(n)\n case \"measurement\":\n meas_n.append(n)\n\n# nodes properties for visualization\nvis_attr = {\"isolated\": dict(color=\"grey\", size=1600),\n \"rate\": dict(color=\"gold\", size=600),\n \"measurement\": dict(color=\"pink\", size=1600),\n \"state\": dict(color=\"red\", size=1600),\n \"input\": dict(color=\"violet\", size=1600),\n \"other\": dict(color=\"black\", size=800),\n }\n\n# set initial positions\npos = st.nx.multipartite_layout(g, \"layer\")\n# organize core\ncore = rate_n + st_n + meas_n\npos_core = st.nx.shell_layout(g.subgraph(core).to_undirected(), [meas_n, rate_n, st_n], center=[0, 0])\npos.update(pos_core)\nho, aN, hoN, gaO2, da, ghAn, dh, ghO2 = rate_n\nXcbN, _ = asm.state_by_symbol_name(\"X_cbN\")\nXcb, _ = asm.state_by_symbol_name(\"X_cb\")\nNHx, _ = asm.state_by_symbol_name(\"S_NHx\")\nXa, _ = asm.state_by_symbol_name(\"X_a\")\nXh, _ = asm.state_by_symbol_name(\"X_h\")\nSb, _ = asm.state_by_symbol_name(\"S_b\")\nSbN, _ = asm.state_by_symbol_name(\"S_bN\")\nO2, _ = asm.state_by_symbol_name(\"S_O2\")\nN2, _ = asm.state_by_symbol_name(\"S_N2\")\nNOx, _ = asm.state_by_symbol_name(\"S_NOx\")\nAlk, _ = asm.state_by_symbol_name(\"S_alk\")\nXnbe, _ = asm.state_by_symbol_name(\"X_nb,e\")\n\npos[hoN] = [0.17, 0.64]\npos[NOx] = [0.17, 0.29]\npos[XcbN] = [-0.17, 0.8]\npos[Xh] = [0, 0]\npos[O2] = [0, 0.3]\npos[N2] = [0.3, 0.31]\npos[Xcb] = [-0.25, 0.2]\npos[Sb] = [0.07, -0.2]\npos[Xa] = [-0.4, -0.15]\npos[Xnbe] = [-0.3, 0.33]\npos[SbN] = [0.4, 0.6]\npos[ghO2] = [-0.2, -0.3]\npos[ghAn] = [0.28, -0.06]\npos[NHx] = [0.07, -0.4]\npos[ho] = [-0.3, -0.032]\npos[aN] = [0.3, -0.4]\npos[Alk] = [0.2, -0.7]\npos[dh] = [-0.15, 0.45]\npos[da] = [-0.5, 0.17]\n\nfor n in iso_n:\n pos[n][1] -= 0.4\n\n# Randon positions using graphviz\n# pos = st.nx.nx_agraph.graphviz_layout(g.to_undirected())\nst.nx.draw(g, pos=pos,\n node_color=[vis_attr[d[\"layer\"]][\"color\"] for _, d in g.nodes(data=True)],\n node_size=[vis_attr[d[\"layer\"]][\"size\"] for _, d in g.nodes(data=True)],\n with_labels=True,\n labels={x: latex(x, mode=\"inline\") for x in g.nodes},\n arrowsize=20,\n )\nplt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Graph without processes\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "dotX_s = dotX.subs(zip(r_sym, r))\ngr = st.dependency_graph(dotX_s, vars=list(X), eqs_names=list(X))\nattr = {}\nfor x in gr.nodes:\n if gr.degree[x] == 0:\n # isolated variable\n lyr = \"isolated\"\n elif gr.in_degree[x] == 0:\n # input variable\n lyr = \"input\"\n elif gr.out_degree[x] == 0:\n # measurement\n lyr = \"measurement\"\n elif x in r_n:\n # rate\n lyr = \"rate\"\n elif x in X:\n lyr = \"state\"\n else:\n lyr = \"other\"\n attr[x] = dict(layer=lyr)\nst.nx.set_node_attributes(gr, attr)\n\ngrs = gr.copy()\ngrs.remove_nodes_from(n for n, d in gr.nodes(data=True) if d[\"layer\"] != \"state\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "plt.figure()\nsz = [vis_attr[d[\"layer\"]][\"size\"] for _, d in grs.nodes(data=True)]\npos_ = st.nx.nx_agraph.graphviz_layout(grs.to_undirected())\nst.nx.draw_networkx_nodes(grs, pos_, node_size=sz,\n node_color=[vis_attr[d[\"layer\"]][\"color\"] for _, d in grs.nodes(data=True)])\nst.nx.draw_networkx_labels(grs, pos=pos_,\n labels={x: latex(x, mode=\"inline\") for x in grs.nodes},\n )\nst.nx.draw_networkx_edges(grs, pos_, connectionstyle=\"arc3,rad=0.2\", node_size=sz,\n arrowsize=20,\n )\n\n\n# st.nx.draw(grs, pos=pos,\n# node_color=[vis_attr[d[\"layer\"]][\"color\"] for _, d in grs.nodes(data=True)],\n# node_size=[vis_attr[d[\"layer\"]][\"size\"] for _, d in grs.nodes(data=True)],\n# with_labels=True,\n# labels={x: latex(x, mode=\"inline\") for x in grs.nodes},\n# arrowsize=20,\n# )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model components\nSymbols and process rates name\n*******************************\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def to_latex(x):\n if \"_\" in x:\n p = x.split(\"_\", 1)\n x = fr\"{p[0]}_\\text{{{p[1]}}}\"\n return fr\"${x}$\"\n\n\nfor c in (\"states\", \"processrates\"):\n info_ = asm.component[c].rename(columns={\"description\": \"Description\"})\n info_[\"This work\"] = info_.sympy.apply(lambda x: latex(x, mode='inline'))\n info_[\"Other name\"] = info_.name.apply(to_latex)\n print(info_[[\"This work\", \"Other name\", \"Description\"]].to_latex(index=False))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Description of active states\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "info_ = asm.component[\"states\"].set_index(\"sympy\")\ninfo_ = [fr\"\\item[{latex(x, mode='inline')}] {info_.loc[x].description}\" for x in X if x in core]\ninfo_ = [r\"\\begin{itemize}\"] + info_ + [r\"\\end{itemize}\"]\nprint(\"\\n\".join(info_))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parameters\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "info_ = asm.component[\"parameters\"][[\"sympy\", \"value\", \"name\", \"unit\", \"description\"]].copy()\ninfo_.rename(columns={\"description\": \"Description\",\n \"value\": \"Value\",\n \"unit\": \"Units\"}, inplace=True)\ninfo_[\"This work\"] = info_.sympy.apply(lambda x: latex(x, mode='inline'))\ninfo_[\"Other name\"] = info_.name.apply(to_latex)\ninfo_[\"Value\"] = info_.Value.apply(lambda x: latex(x, mode='inline'))\n\n\ndef units_to_latex(x):\n try:\n y = ureg.parse_units(x)\n if y.dimensionless:\n # handle unit kinds, WIP in pint https://github.com/hgrecco/pint/pull/1967\n return x\n y = f\"{y:Lx}\"\n return y.replace(\"[\", \"\").replace(\"]\", \"\")\n except AssertionError:\n return \"$-$\"\n\n\ninfo_[\"Units\"] = info_.Units.apply(units_to_latex)\ninfo_[\"Description\"] = info_.Description.str.replace(r\"\\(([\\\\\\w]+)[_]([\\\\\\w]+)\\)\", r\"($\\1_\\\\text{\\2}$)\", regex=True)\ninfo_.sort_values(by=\"This work\", inplace=True)\nprint(info_[[\"This work\", \"Other name\", \"Value\", \"Units\", \"Description\"]].to_latex(index=False))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### States vector\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print(latex(X))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Matrix\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "M_entries = [Symbol(fr\"m_{{{i}\\,{j.name}}}\") for i in X for j in r_sym]\nMs = Matrix(np.asarray(M_entries).reshape(*M.shape))\nfor i in range(M.shape[0]):\n for j in range(M.shape[1]):\n if isinstance(M[i, j], Number):\n Ms[i, j] = M[i, j]\nprint(latex(Ms))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "info_ = [fr\"{latex(ms)} &\\coloneqq {latex(m)} \\\\\" for ms, m in zip(Ms, M) if not isinstance(m, Number)]\ninfo_ = [r\"\\begin{align}\"] + info_ + [r\"\\end{align}\"]\nprint(\"\\n\".join(info_))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Rates vector\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print(latex(r_sym))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "info_ = [fr\"{latex(r_)} &\\coloneqq {latex(f_)} \\\\\" for r_, f_ in zip(r_sym, r)]\ninfo_ = [r\"\\begin{align}\"] + info_ + [r\"\\end{align}\"]\nprint(\"\\n\".join(info_))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### ODE\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def symb_to_dot(x):\n if \"_\" in x.name:\n p = x.name.split(\"_\", 1)\n x = fr\"\\dot{{{p[0]}}}_{p[1]}\"\n else:\n x = fr\"\\dot{{{x.name}}}\"\n return x\n\n\ndotX_sym = Matrix([Symbol(symb_to_dot(x)) for x in X])\ndotX_expr = Ms * Matrix([Symbol(x_.name) for x_ in r_sym])\nsub_order = [\"state\", \"measurement\", \"isolated\", \"other\"]\nfor layer in sub_order:\n info_ = []\n for x_, dx_, f_ in zip(X, dotX_sym, dotX_expr):\n if g.nodes[x_][\"layer\"] == layer:\n info_.append(fr\"{latex(dx_)} &\\coloneqq {latex(f_)} \\\\\")\n if info_:\n info_ = [r\"\\begin{align}\"] + info_ + [fr\"\\label{{eq:asm1-ode-{layer}}}\\end{{align}}\"]\n print(\"\\n\".join(info_))\n\n# info_ = []\n# order = [0] * len(X)\n# sub_order = [\"state\", \"measurement\", \"isolated\", \"other\"]\n# for idx, x_dx_f_ in enumerate(zip(X, dotX_sym, dotX_expr)):\n# x_, dx_, f_ = x_dx_f_\n# order[idx] = sub_order.index(g.nodes[x_][\"layer\"])\n# info_.append(fr\"{latex(dx_)} &\\coloneqq {latex(f_)} \\\\\")\n# info_ = sorted(info_, key=lambda x_: order[info_.index(x_)])\n# info_ = [r\"\\begin{align}\"]+ info_ + [r\"\\end{align}\"]\n# print(\"\\n\".join(info_))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "plt.show()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.2" } }, "nbformat": 4, "nbformat_minor": 0 }