ASM1: dependency graph#

# Copyright (C) 2024 Juan Pablo Carbajal
# Copyright (C) 2024 Mariane Yvonne Schneider
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Author: Juan Pablo Carbajal <ajuanpi@gmail.com>
import matplotlib.pyplot as plt
import numpy as np
from sympy import *
from pint import UnitRegistry

try:
    import dsafeatures
except ModuleNotFoundError:
    import sys
    import os

    sys.path.insert(0, os.path.abspath("../.."))

import dsafeatures.odemodel as m
import dsafeatures.symtools as st

from dsafeatures.printing import *

init_printing()

ureg = UnitRegistry()  # units registry

plt.style.use('figures.mplstyle')

Load the model#

asm = m.ODEModel.from_csv(name="ASM1")
# States
X = asm.states
# Process rates
r_sym, r = asm.rates, asm.rates_expr
# Matrix
M = asm.coef_matrix
# State derivatives
dotX, ddotX = asm.derivative(2)

Graph#

g = asm.graph

# node lists
rate_n, st_n, iso_n, misc_n, in_n, meas_n = [], [], [], [], [], []
for n, d in g.nodes(data=True):
    match d["layer"]:
        case "rate":
            rate_n.append(n)
        case "state":
            st_n.append(n)
        case "isolated":
            iso_n.append(n)
        case "other":
            misc_n.append(n)
        case "input":
            in_n.append(n)
        case "measurement":
            meas_n.append(n)

# nodes properties for visualization
vis_attr = {"isolated": dict(color="grey", size=1600),
            "rate": dict(color="gold", size=600),
            "measurement": dict(color="pink", size=1600),
            "state": dict(color="red", size=1600),
            "input": dict(color="violet", size=1600),
            "other": dict(color="black", size=800),
            }

# set initial positions
pos = st.nx.multipartite_layout(g, "layer")
# organize core
core = rate_n + st_n + meas_n
pos_core = st.nx.shell_layout(g.subgraph(core).to_undirected(), [meas_n, rate_n, st_n], center=[0, 0])
pos.update(pos_core)
ho, aN, hoN, gaO2, da, ghAn, dh, ghO2 = rate_n
XcbN, _ = asm.state_by_symbol_name("X_cbN")
Xcb, _ = asm.state_by_symbol_name("X_cb")
NHx, _ = asm.state_by_symbol_name("S_NHx")
Xa, _ = asm.state_by_symbol_name("X_a")
Xh, _ = asm.state_by_symbol_name("X_h")
Sb, _ = asm.state_by_symbol_name("S_b")
SbN, _ = asm.state_by_symbol_name("S_bN")
O2, _ = asm.state_by_symbol_name("S_O2")
N2, _ = asm.state_by_symbol_name("S_N2")
NOx, _ = asm.state_by_symbol_name("S_NOx")
Alk, _ = asm.state_by_symbol_name("S_alk")
Xnbe, _ = asm.state_by_symbol_name("X_nb,e")

pos[hoN] = [0.17, 0.64]
pos[NOx] = [0.17, 0.29]
pos[XcbN] = [-0.17, 0.8]
pos[Xh] = [0, 0]
pos[O2] = [0, 0.3]
pos[N2] = [0.3, 0.31]
pos[Xcb] = [-0.25, 0.2]
pos[Sb] = [0.07, -0.2]
pos[Xa] = [-0.4, -0.15]
pos[Xnbe] = [-0.3, 0.33]
pos[SbN] = [0.4, 0.6]
pos[ghO2] = [-0.2, -0.3]
pos[ghAn] = [0.28, -0.06]
pos[NHx] = [0.07, -0.4]
pos[ho] = [-0.3, -0.032]
pos[aN] = [0.3, -0.4]
pos[Alk] = [0.2, -0.7]
pos[dh] = [-0.15, 0.45]
pos[da] = [-0.5, 0.17]

for n in iso_n:
    pos[n][1] -= 0.4

# Randon positions using graphviz
# pos = st.nx.nx_agraph.graphviz_layout(g.to_undirected())
st.nx.draw(g, pos=pos,
           node_color=[vis_attr[d["layer"]]["color"] for _, d in g.nodes(data=True)],
           node_size=[vis_attr[d["layer"]]["size"] for _, d in g.nodes(data=True)],
           with_labels=True,
           labels={x: latex(x, mode="inline") for x in g.nodes},
           arrowsize=20,
           )
plt.show()
example asm1 graph

Graph without processes#

dotX_s = dotX.subs(zip(r_sym, r))
gr = st.dependency_graph(dotX_s, vars=list(X), eqs_names=list(X))
attr = {}
for x in gr.nodes:
    if gr.degree[x] == 0:
        # isolated variable
        lyr = "isolated"
    elif gr.in_degree[x] == 0:
        # input variable
        lyr = "input"
    elif gr.out_degree[x] == 0:
        # measurement
        lyr = "measurement"
    elif x in r_n:
        # rate
        lyr = "rate"
    elif x in X:
        lyr = "state"
    else:
        lyr = "other"
    attr[x] = dict(layer=lyr)
st.nx.set_node_attributes(gr, attr)

grs = gr.copy()
grs.remove_nodes_from(n for n, d in gr.nodes(data=True) if d["layer"] != "state")
Traceback (most recent call last):
  File "/builds/sbrml/dsa-signal-features/doc/examples/example_asm1_graph.py", line 161, in <module>
    elif x in r_n:
              ^^^
NameError: name 'r_n' is not defined
plt.figure()
sz = [vis_attr[d["layer"]]["size"] for _, d in grs.nodes(data=True)]
pos_ = st.nx.nx_agraph.graphviz_layout(grs.to_undirected())
st.nx.draw_networkx_nodes(grs, pos_, node_size=sz,
                          node_color=[vis_attr[d["layer"]]["color"] for _, d in grs.nodes(data=True)])
st.nx.draw_networkx_labels(grs, pos=pos_,
                          labels={x: latex(x, mode="inline") for x in grs.nodes},
                          )
st.nx.draw_networkx_edges(grs, pos_, connectionstyle="arc3,rad=0.2", node_size=sz,
                          arrowsize=20,
                          )


# st.nx.draw(grs, pos=pos,
#            node_color=[vis_attr[d["layer"]]["color"] for _, d in grs.nodes(data=True)],
#            node_size=[vis_attr[d["layer"]]["size"] for _, d in grs.nodes(data=True)],
#            with_labels=True,
#            labels={x: latex(x, mode="inline") for x in grs.nodes},
#            arrowsize=20,
#            )

Model components#

Symbols and process rates name#

def to_latex(x):
    if "_" in x:
        p = x.split("_", 1)
        x = fr"{p[0]}_\text{{{p[1]}}}"
    return fr"${x}$"


for c in ("states", "processrates"):
    info_ = asm.component[c].rename(columns={"description": "Description"})
    info_["This work"] = info_.sympy.apply(lambda x: latex(x, mode='inline'))
    info_["Other name"] = info_.name.apply(to_latex)
    print(info_[["This work", "Other name", "Description"]].to_latex(index=False))

Description of active states

info_ = asm.component["states"].set_index("sympy")
info_ = [fr"\item[{latex(x, mode='inline')}] {info_.loc[x].description}" for x in X if x in core]
info_ = [r"\begin{itemize}"] + info_ + [r"\end{itemize}"]
print("\n".join(info_))

Parameters#

info_ = asm.component["parameters"][["sympy", "value", "name", "unit", "description"]].copy()
info_.rename(columns={"description": "Description",
                      "value": "Value",
                      "unit": "Units"}, inplace=True)
info_["This work"] = info_.sympy.apply(lambda x: latex(x, mode='inline'))
info_["Other name"] = info_.name.apply(to_latex)
info_["Value"] = info_.Value.apply(lambda x: latex(x, mode='inline'))


def units_to_latex(x):
    try:
        y = ureg.parse_units(x)
        if y.dimensionless:
            # handle unit kinds, WIP in pint https://github.com/hgrecco/pint/pull/1967
            return x
        y = f"{y:Lx}"
        return y.replace("[", "").replace("]", "")
    except AssertionError:
        return "$-$"


info_["Units"] = info_.Units.apply(units_to_latex)
info_["Description"] = info_.Description.str.replace(r"\(([\\\w]+)[_]([\\\w]+)\)", r"($\1_\\text{\2}$)", regex=True)
info_.sort_values(by="This work", inplace=True)
print(info_[["This work", "Other name", "Value", "Units", "Description"]].to_latex(index=False))

States vector#

print(latex(X))

Matrix#

M_entries = [Symbol(fr"m_{{{i}\,{j.name}}}") for i in X for j in r_sym]
Ms = Matrix(np.asarray(M_entries).reshape(*M.shape))
for i in range(M.shape[0]):
    for j in range(M.shape[1]):
        if isinstance(M[i, j], Number):
            Ms[i, j] = M[i, j]
print(latex(Ms))
info_ = [fr"{latex(ms)} &\coloneqq {latex(m)} \\" for ms, m in zip(Ms, M) if not isinstance(m, Number)]
info_ = [r"\begin{align}"] + info_ + [r"\end{align}"]
print("\n".join(info_))

Rates vector#

print(latex(r_sym))
info_ = [fr"{latex(r_)} &\coloneqq {latex(f_)} \\" for r_, f_ in zip(r_sym, r)]
info_ = [r"\begin{align}"] + info_ + [r"\end{align}"]
print("\n".join(info_))

ODE#

def symb_to_dot(x):
    if "_" in x.name:
        p = x.name.split("_", 1)
        x = fr"\dot{{{p[0]}}}_{p[1]}"
    else:
        x = fr"\dot{{{x.name}}}"
    return x


dotX_sym = Matrix([Symbol(symb_to_dot(x)) for x in X])
dotX_expr = Ms * Matrix([Symbol(x_.name) for x_ in r_sym])
sub_order = ["state", "measurement", "isolated", "other"]
for layer in sub_order:
    info_ = []
    for x_, dx_, f_ in zip(X, dotX_sym, dotX_expr):
        if g.nodes[x_]["layer"] == layer:
            info_.append(fr"{latex(dx_)} &\coloneqq {latex(f_)} \\")
    if info_:
        info_ = [r"\begin{align}"] + info_ + [fr"\label{{eq:asm1-ode-{layer}}}\end{{align}}"]
    print("\n".join(info_))

# info_ = []
# order = [0] * len(X)
# sub_order = ["state", "measurement", "isolated", "other"]
# for idx, x_dx_f_ in enumerate(zip(X, dotX_sym, dotX_expr)):
#     x_, dx_, f_ = x_dx_f_
#     order[idx] = sub_order.index(g.nodes[x_]["layer"])
#     info_.append(fr"{latex(dx_)} &\coloneqq {latex(f_)} \\")
# info_ = sorted(info_, key=lambda x_: order[info_.index(x_)])
# info_ = [r"\begin{align}"]+ info_ + [r"\end{align}"]
# print("\n".join(info_))

Total running time of the script: (0 minutes 1.216 seconds)

Estimated memory usage: 463 MB

Gallery generated by Sphinx-Gallery