{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n# ASM3: dependency graph\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Copyright (C) 2024 Juan Pablo Carbajal\n# Copyright (C) 2024 Mariane Yvonne Schneider\n#\n# This program is free software; you can redistribute it and/or modify\n# it under the terms of the GNU General Public License as published by\n# the Free Software Foundation; either version 3 of the License, or\n# (at your option) any later version.\n#\n# This program is distributed in the hope that it will be useful,\n# but WITHOUT ANY WARRANTY; without even the implied warranty of\n# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the\n# GNU General Public License for more details.\n#\n# You should have received a copy of the GNU General Public License\n# along with this program. If not, see .\n\n# Author: Juan Pablo Carbajal \nimport matplotlib.pyplot as plt\nimport numpy as np\nfrom sympy import *\nfrom pint import UnitRegistry\n\ntry:\n import dsafeatures\nexcept ModuleNotFoundError:\n import sys\n import os\n\n sys.path.insert(0, os.path.abspath(\"../..\"))\n\nimport dsafeatures.odemodel as m\nimport dsafeatures.symtools as st\n\nfrom dsafeatures.printing import *\ninit_printing()\n\nureg = UnitRegistry() # units registry" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load the model\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "asm = m.ODEModel.from_csv(name=\"ASM3\")\n# States\nX = asm.states\n# Process rates\nr_sym, r = asm.rates, asm.rates_expr\n# Matrix\nM = asm.coef_matrix\n# State derivatives\ndotX = asm.derivative(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Graph\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "g = asm.graph\n\n# nodes properties for visualization\nvis_attr = {\"isolated\": dict(color=\"grey\", size=1600),\n \"rate\": dict(color=\"gold\", size=600),\n \"measurement\": dict(color=\"pink\", size=1600),\n \"state\": dict(color=\"red\", size=1600),\n \"input\": dict(color=\"violet\", size=1600),\n \"other\": dict(color=\"black\", size=800),\n }\n\n# set initial positions\npos = st.nx.multipartite_layout(g, \"layer\")\np_ = np.asarray(list(pos.values()))\ncnt = np.asarray([p_.max(axis=0)[0], p_.mean(axis=0)[1]])\n# organize core\ncore = [n for n, d in g.nodes(data=True) if d[\"layer\"] in [\"rate\", \"state\"]]\npos_core = st.nx.spring_layout(g.subgraph(core).to_undirected(), k=0.4, iterations=5000, center=cnt, seed=1234321)\npos.update(pos_core)\n# orbit measurements and inputs\niso = [n for n, d in g.nodes(data=True) if d[\"layer\"] in [\"isolated\", \"other\"]]\npos_meas = st.nx.spring_layout(g.to_undirected(), pos=pos, fixed=core+iso, k=0.5, iterations=1000)\npos.update(pos_meas)\n# move isolated and other\nmx = min(c[0] for c in pos.values())\nfor n in iso:\n pos[n][0] = mx\n\n#N2, _ = asm.state_by_name(\"S_N2\")\n#pos[N2] -= pos[N2] * 0.4\n#NE, _ = asm.state_by_name(\"X_UE\")\n#pos[NE] -= pos[NE] * 0.2\n#Xh, _ = asm.state_by_name(\"X_OHO\")\n#pos[Xh] += [0.2, -0.15]\nr_n = [n for n, d in g.nodes(data=True) if d[\"layer\"]==\"rate\"]\npos[r_n[1]][1] += 0.25\nst.nx.draw(g, pos=pos,\n node_color=[vis_attr[d[\"layer\"]][\"color\"] for _, d in g.nodes(data=True)],\n node_size=[vis_attr[d[\"layer\"]][\"size\"] for _, d in g.nodes(data=True)],\n with_labels=True,\n labels={x: latex(x, mode=\"inline\") for x in g.nodes},\n arrowsize=20,\n )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model components\nSymbols and process rates name\n*******************************\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def to_latex(x):\n if \"_\" in x:\n p = x.split(\"_\", 1)\n x = fr\"{p[0]}_\\text{{{p[1]}}}\"\n return fr\"${x}$\"\n\nfor c in (\"states\", \"processrates\"):\n info_ = asm.component[c].rename(columns={\"description\": \"Description\"})\n info_[\"This work\"] = info_.sympy.apply(lambda x: latex(x, mode='inline'))\n info_[\"Other name\"] = info_.name.apply(to_latex)\n print(info_[[\"This work\", \"Other name\", \"Description\"]].to_latex(index=False))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Description of active states\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "info_ = asm.component[\"states\"].set_index(\"sympy\")\ninfo_ = [f\"\\item[{latex(x, mode='inline')}] {info_.loc[x].description}\" for x in X if x in core]\ninfo_ = [r\"\\begin{itemize}\"] + info_ + [r\"\\end{itemize}\"]\nprint(\"\\n\".join(info_))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parameters\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "info_ = asm.component[\"parameters\"][[\"sympy\", \"value\", \"name\", \"unit\", \"description\"]].copy()\ninfo_.rename(columns={\"description\": \"Description\",\n \"value\": \"Value\",\n \"unit\": \"Units\"}, inplace=True)\ninfo_[\"This work\"] = info_.sympy.apply(lambda x: latex(x, mode='inline'))\ninfo_[\"Other name\"] = info_.name.apply(to_latex)\ninfo_[\"Value\"] = info_.Value.apply(lambda x: latex(x, mode='inline'))\n\n\ndef units_to_latex(x):\n try:\n y = ureg.parse_units(x)\n if y.dimensionless:\n # handle unit kinds, WIP in pint https://github.com/hgrecco/pint/pull/1967\n return x\n y = f\"{y:Lx}\"\n return y.replace(\"[\",\"\").replace(\"]\",\"\")\n except AssertionError:\n return \"$-$\"\n\ninfo_[\"Units\"] = info_.Units.apply(units_to_latex)\ninfo_[\"Description\"] = info_.Description.str.replace(r\"\\(([\\\\\\w]+)[_]([\\\\\\w]+)\\)\", r\"($\\1_\\\\text{\\2}$)\", regex=True)\ninfo_.sort_values(by=\"This work\", inplace=True)\nprint(info_[[\"This work\", \"Other name\", \"Value\", \"Units\", \"Description\"]].to_latex(index=False))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### States vector\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print(latex(X))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Matrix\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "M_entries = [Symbol(fr\"m_{{{i}\\,{j.name}}}\") for i in X for j in r_sym]\nMs = Matrix(np.asarray(M_entries).reshape(*M.shape))\nfor i in range(M.shape[0]):\n for j in range(M.shape[1]):\n if isinstance(M[i, j], Number):\n Ms[i, j] = M[i, j]\nprint(latex(Ms))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "info_ = [fr\"{latex(ms)} &\\coloneqq {latex(m)} \\\\\" for ms, m in zip(Ms, M) if not isinstance(m, Number)]\ninfo_ = [r\"\\begin{align}\"]+info_ + [r\"\\end{align}\"]\nprint(\"\\n\".join(info_))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Rates vector\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "print(latex(r_sym))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "info_ = [fr\"{latex(r_)} &\\coloneqq {latex(f_)} \\\\\" for r_, f_ in zip(r_sym, r)]\ninfo_ = [r\"\\begin{align}\"]+info_ + [r\"\\end{align}\"]\nprint(\"\\n\".join(info_))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### ODE\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def symb_to_dot(x):\n if \"_\" in x.name:\n p = x.name.split(\"_\", 1)\n x = fr\"\\dot{{{p[0]}}}_{p[1]}\"\n else:\n x = fr\"\\dot{{{x.name}}}\"\n return x\ndotX_sym = Matrix([Symbol(symb_to_dot(x)) for x in X])\ndotX_expr = Ms * Matrix([Symbol(x_.name) for x_ in r_sym])\nsub_order = [\"state\", \"measurement\", \"isolated\", \"other\"]\nfor layer in sub_order:\n info_ = []\n for x_, dx_, f_ in zip(X, dotX_sym, dotX_expr):\n if g.nodes[x_][\"layer\"] == layer:\n info_.append(fr\"{latex(dx_)} &\\coloneqq {latex(f_)} \\\\\")\n if info_:\n info_ = [r\"\\begin{align}\"]+ info_ + [fr\"\\label{{eq:asm1-ode-{layer}}}\\end{{align}}\"]\n print(\"\\n\".join(info_))\n\n# info_ = []\n# order = [0] * len(X)\n# sub_order = [\"state\", \"measurement\", \"isolated\", \"other\"]\n# for idx, x_dx_f_ in enumerate(zip(X, dotX_sym, dotX_expr)):\n# x_, dx_, f_ = x_dx_f_\n# order[idx] = sub_order.index(g.nodes[x_][\"layer\"])\n# info_.append(fr\"{latex(dx_)} &\\coloneqq {latex(f_)} \\\\\")\n# info_ = sorted(info_, key=lambda x_: order[info_.index(x_)])\n# info_ = [r\"\\begin{align}\"]+ info_ + [r\"\\end{align}\"]\n# print(\"\\n\".join(info_))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "plt.show()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.2" } }, "nbformat": 4, "nbformat_minor": 0 }