{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# ASM3: dependency graph\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Copyright (C) 2024 Juan Pablo Carbajal\n# Copyright (C) 2024 Mariane Yvonne Schneider\n#\n# This program is free software; you can redistribute it and/or modify\n# it under the terms of the GNU General Public License as published by\n# the Free Software Foundation; either version 3 of the License, or\n# (at your option) any later version.\n#\n# This program is distributed in the hope that it will be useful,\n# but WITHOUT ANY WARRANTY; without even the implied warranty of\n# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the\n# GNU General Public License for more details.\n#\n# You should have received a copy of the GNU General Public License\n# along with this program. If not, see .\n\n# Author: Juan Pablo Carbajal \nimport matplotlib.pyplot as plt\nimport numpy as np\nfrom sympy import *\nfrom pint import UnitRegistry\n\ntry:\n import dsafeatures\nexcept ModuleNotFoundError:\n import sys\n import os\n\n sys.path.insert(0, os.path.abspath(\"../..\"))\n\nimport dsafeatures.odemodel as m\nimport dsafeatures.symtools as st\n\nfrom dsafeatures.printing import *\ninit_printing()\n\nureg = UnitRegistry() # units registry"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load the model\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"asm = m.ODEModel.from_csv(name=\"ASM3\")\n# States\nX = asm.states\n# Process rates\nr_sym, r = asm.rates, asm.rates_expr\n# Matrix\nM = asm.coef_matrix\n# State derivatives\ndotX = asm.derivative(1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Graph\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"g = asm.graph\n\n# nodes properties for visualization\nvis_attr = {\"isolated\": dict(color=\"grey\", size=1600),\n \"rate\": dict(color=\"gold\", size=600),\n \"measurement\": dict(color=\"pink\", size=1600),\n \"state\": dict(color=\"red\", size=1600),\n \"input\": dict(color=\"violet\", size=1600),\n \"other\": dict(color=\"black\", size=800),\n }\n\n# set initial positions\npos = st.nx.multipartite_layout(g, \"layer\")\np_ = np.asarray(list(pos.values()))\ncnt = np.asarray([p_.max(axis=0)[0], p_.mean(axis=0)[1]])\n# organize core\ncore = [n for n, d in g.nodes(data=True) if d[\"layer\"] in [\"rate\", \"state\"]]\npos_core = st.nx.spring_layout(g.subgraph(core).to_undirected(), k=0.4, iterations=5000, center=cnt, seed=1234321)\npos.update(pos_core)\n# orbit measurements and inputs\niso = [n for n, d in g.nodes(data=True) if d[\"layer\"] in [\"isolated\", \"other\"]]\npos_meas = st.nx.spring_layout(g.to_undirected(), pos=pos, fixed=core+iso, k=0.5, iterations=1000)\npos.update(pos_meas)\n# move isolated and other\nmx = min(c[0] for c in pos.values())\nfor n in iso:\n pos[n][0] = mx\n\n#N2, _ = asm.state_by_name(\"S_N2\")\n#pos[N2] -= pos[N2] * 0.4\n#NE, _ = asm.state_by_name(\"X_UE\")\n#pos[NE] -= pos[NE] * 0.2\n#Xh, _ = asm.state_by_name(\"X_OHO\")\n#pos[Xh] += [0.2, -0.15]\nr_n = [n for n, d in g.nodes(data=True) if d[\"layer\"]==\"rate\"]\npos[r_n[1]][1] += 0.25\nst.nx.draw(g, pos=pos,\n node_color=[vis_attr[d[\"layer\"]][\"color\"] for _, d in g.nodes(data=True)],\n node_size=[vis_attr[d[\"layer\"]][\"size\"] for _, d in g.nodes(data=True)],\n with_labels=True,\n labels={x: latex(x, mode=\"inline\") for x in g.nodes},\n arrowsize=20,\n )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Model components\nSymbols and process rates name\n*******************************\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def to_latex(x):\n if \"_\" in x:\n p = x.split(\"_\", 1)\n x = fr\"{p[0]}_\\text{{{p[1]}}}\"\n return fr\"${x}$\"\n\nfor c in (\"states\", \"processrates\"):\n info_ = asm.component[c].rename(columns={\"description\": \"Description\"})\n info_[\"This work\"] = info_.sympy.apply(lambda x: latex(x, mode='inline'))\n info_[\"Other name\"] = info_.name.apply(to_latex)\n print(info_[[\"This work\", \"Other name\", \"Description\"]].to_latex(index=False))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Description of active states\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"info_ = asm.component[\"states\"].set_index(\"sympy\")\ninfo_ = [f\"\\item[{latex(x, mode='inline')}] {info_.loc[x].description}\" for x in X if x in core]\ninfo_ = [r\"\\begin{itemize}\"] + info_ + [r\"\\end{itemize}\"]\nprint(\"\\n\".join(info_))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Parameters\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"info_ = asm.component[\"parameters\"][[\"sympy\", \"value\", \"name\", \"unit\", \"description\"]].copy()\ninfo_.rename(columns={\"description\": \"Description\",\n \"value\": \"Value\",\n \"unit\": \"Units\"}, inplace=True)\ninfo_[\"This work\"] = info_.sympy.apply(lambda x: latex(x, mode='inline'))\ninfo_[\"Other name\"] = info_.name.apply(to_latex)\ninfo_[\"Value\"] = info_.Value.apply(lambda x: latex(x, mode='inline'))\n\n\ndef units_to_latex(x):\n try:\n y = ureg.parse_units(x)\n if y.dimensionless:\n # handle unit kinds, WIP in pint https://github.com/hgrecco/pint/pull/1967\n return x\n y = f\"{y:Lx}\"\n return y.replace(\"[\",\"\").replace(\"]\",\"\")\n except AssertionError:\n return \"$-$\"\n\ninfo_[\"Units\"] = info_.Units.apply(units_to_latex)\ninfo_[\"Description\"] = info_.Description.str.replace(r\"\\(([\\\\\\w]+)[_]([\\\\\\w]+)\\)\", r\"($\\1_\\\\text{\\2}$)\", regex=True)\ninfo_.sort_values(by=\"This work\", inplace=True)\nprint(info_[[\"This work\", \"Other name\", \"Value\", \"Units\", \"Description\"]].to_latex(index=False))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### States vector\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print(latex(X))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Matrix\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"M_entries = [Symbol(fr\"m_{{{i}\\,{j.name}}}\") for i in X for j in r_sym]\nMs = Matrix(np.asarray(M_entries).reshape(*M.shape))\nfor i in range(M.shape[0]):\n for j in range(M.shape[1]):\n if isinstance(M[i, j], Number):\n Ms[i, j] = M[i, j]\nprint(latex(Ms))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"info_ = [fr\"{latex(ms)} &\\coloneqq {latex(m)} \\\\\" for ms, m in zip(Ms, M) if not isinstance(m, Number)]\ninfo_ = [r\"\\begin{align}\"]+info_ + [r\"\\end{align}\"]\nprint(\"\\n\".join(info_))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Rates vector\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print(latex(r_sym))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"info_ = [fr\"{latex(r_)} &\\coloneqq {latex(f_)} \\\\\" for r_, f_ in zip(r_sym, r)]\ninfo_ = [r\"\\begin{align}\"]+info_ + [r\"\\end{align}\"]\nprint(\"\\n\".join(info_))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### ODE\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def symb_to_dot(x):\n if \"_\" in x.name:\n p = x.name.split(\"_\", 1)\n x = fr\"\\dot{{{p[0]}}}_{p[1]}\"\n else:\n x = fr\"\\dot{{{x.name}}}\"\n return x\ndotX_sym = Matrix([Symbol(symb_to_dot(x)) for x in X])\ndotX_expr = Ms * Matrix([Symbol(x_.name) for x_ in r_sym])\nsub_order = [\"state\", \"measurement\", \"isolated\", \"other\"]\nfor layer in sub_order:\n info_ = []\n for x_, dx_, f_ in zip(X, dotX_sym, dotX_expr):\n if g.nodes[x_][\"layer\"] == layer:\n info_.append(fr\"{latex(dx_)} &\\coloneqq {latex(f_)} \\\\\")\n if info_:\n info_ = [r\"\\begin{align}\"]+ info_ + [fr\"\\label{{eq:asm1-ode-{layer}}}\\end{{align}}\"]\n print(\"\\n\".join(info_))\n\n# info_ = []\n# order = [0] * len(X)\n# sub_order = [\"state\", \"measurement\", \"isolated\", \"other\"]\n# for idx, x_dx_f_ in enumerate(zip(X, dotX_sym, dotX_expr)):\n# x_, dx_, f_ = x_dx_f_\n# order[idx] = sub_order.index(g.nodes[x_][\"layer\"])\n# info_.append(fr\"{latex(dx_)} &\\coloneqq {latex(f_)} \\\\\")\n# info_ = sorted(info_, key=lambda x_: order[info_.index(x_)])\n# info_ = [r\"\\begin{align}\"]+ info_ + [r\"\\end{align}\"]\n# print(\"\\n\".join(info_))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"plt.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.2"
}
},
"nbformat": 4,
"nbformat_minor": 0
}