""" ASM1: dependency graph ============================================================================ """ # Copyright (C) 2024 Juan Pablo Carbajal # Copyright (C) 2024 Mariane Yvonne Schneider # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # Author: Juan Pablo Carbajal import matplotlib.pyplot as plt import numpy as np from sympy import * from pint import UnitRegistry try: import dsafeatures except ModuleNotFoundError: import sys import os sys.path.insert(0, os.path.abspath("../..")) import dsafeatures.odemodel as m import dsafeatures.symtools as st from dsafeatures.printing import * init_printing() ureg = UnitRegistry() # units registry plt.style.use('figures.mplstyle') # %% # Load the model # -------------- asm = m.ODEModel.from_csv(name="ASM1") # States X = asm.states # Process rates r_sym, r = asm.rates, asm.rates_expr # Matrix M = asm.coef_matrix # State derivatives dotX, ddotX = asm.derivative(2) # %% # Graph # ------- g = asm.graph # node lists rate_n, st_n, iso_n, misc_n, in_n, meas_n = [], [], [], [], [], [] for n, d in g.nodes(data=True): match d["layer"]: case "rate": rate_n.append(n) case "state": st_n.append(n) case "isolated": iso_n.append(n) case "other": misc_n.append(n) case "input": in_n.append(n) case "measurement": meas_n.append(n) # nodes properties for visualization vis_attr = {"isolated": dict(color="grey", size=1600), "rate": dict(color="gold", size=600), "measurement": dict(color="pink", size=1600), "state": dict(color="red", size=1600), "input": dict(color="violet", size=1600), "other": dict(color="black", size=800), } # set initial positions pos = st.nx.multipartite_layout(g, "layer") # organize core core = rate_n + st_n + meas_n pos_core = st.nx.shell_layout(g.subgraph(core).to_undirected(), [meas_n, rate_n, st_n], center=[0, 0]) pos.update(pos_core) ho, aN, hoN, gaO2, da, ghAn, dh, ghO2 = rate_n XcbN, _ = asm.state_by_symbol_name("X_cbN") Xcb, _ = asm.state_by_symbol_name("X_cb") NHx, _ = asm.state_by_symbol_name("S_NHx") Xa, _ = asm.state_by_symbol_name("X_a") Xh, _ = asm.state_by_symbol_name("X_h") Sb, _ = asm.state_by_symbol_name("S_b") SbN, _ = asm.state_by_symbol_name("S_bN") O2, _ = asm.state_by_symbol_name("S_O2") N2, _ = asm.state_by_symbol_name("S_N2") NOx, _ = asm.state_by_symbol_name("S_NOx") Alk, _ = asm.state_by_symbol_name("S_alk") Xnbe, _ = asm.state_by_symbol_name("X_nb,e") pos[hoN] = [0.17, 0.64] pos[NOx] = [0.17, 0.29] pos[XcbN] = [-0.17, 0.8] pos[Xh] = [0, 0] pos[O2] = [0, 0.3] pos[N2] = [0.3, 0.31] pos[Xcb] = [-0.25, 0.2] pos[Sb] = [0.07, -0.2] pos[Xa] = [-0.4, -0.15] pos[Xnbe] = [-0.3, 0.33] pos[SbN] = [0.4, 0.6] pos[ghO2] = [-0.2, -0.3] pos[ghAn] = [0.28, -0.06] pos[NHx] = [0.07, -0.4] pos[ho] = [-0.3, -0.032] pos[aN] = [0.3, -0.4] pos[Alk] = [0.2, -0.7] pos[dh] = [-0.15, 0.45] pos[da] = [-0.5, 0.17] for n in iso_n: pos[n][1] -= 0.4 # Randon positions using graphviz # pos = st.nx.nx_agraph.graphviz_layout(g.to_undirected()) st.nx.draw(g, pos=pos, node_color=[vis_attr[d["layer"]]["color"] for _, d in g.nodes(data=True)], node_size=[vis_attr[d["layer"]]["size"] for _, d in g.nodes(data=True)], with_labels=True, labels={x: latex(x, mode="inline") for x in g.nodes}, arrowsize=20, ) plt.show() # %% # Graph without processes # ------------------------ dotX_s = dotX.subs(zip(r_sym, r)) gr = st.dependency_graph(dotX_s, vars=list(X), eqs_names=list(X)) attr = {} for x in gr.nodes: if gr.degree[x] == 0: # isolated variable lyr = "isolated" elif gr.in_degree[x] == 0: # input variable lyr = "input" elif gr.out_degree[x] == 0: # measurement lyr = "measurement" elif x in r_n: # rate lyr = "rate" elif x in X: lyr = "state" else: lyr = "other" attr[x] = dict(layer=lyr) st.nx.set_node_attributes(gr, attr) grs = gr.copy() grs.remove_nodes_from(n for n, d in gr.nodes(data=True) if d["layer"] != "state") # %% plt.figure() sz = [vis_attr[d["layer"]]["size"] for _, d in grs.nodes(data=True)] pos_ = st.nx.nx_agraph.graphviz_layout(grs.to_undirected()) st.nx.draw_networkx_nodes(grs, pos_, node_size=sz, node_color=[vis_attr[d["layer"]]["color"] for _, d in grs.nodes(data=True)]) st.nx.draw_networkx_labels(grs, pos=pos_, labels={x: latex(x, mode="inline") for x in grs.nodes}, ) st.nx.draw_networkx_edges(grs, pos_, connectionstyle="arc3,rad=0.2", node_size=sz, arrowsize=20, ) # st.nx.draw(grs, pos=pos, # node_color=[vis_attr[d["layer"]]["color"] for _, d in grs.nodes(data=True)], # node_size=[vis_attr[d["layer"]]["size"] for _, d in grs.nodes(data=True)], # with_labels=True, # labels={x: latex(x, mode="inline") for x in grs.nodes}, # arrowsize=20, # ) # %% # Model components # ------------------ # Symbols and process rates name # ******************************* def to_latex(x): if "_" in x: p = x.split("_", 1) x = fr"{p[0]}_\text{{{p[1]}}}" return fr"${x}$" for c in ("states", "processrates"): info_ = asm.component[c].rename(columns={"description": "Description"}) info_["This work"] = info_.sympy.apply(lambda x: latex(x, mode='inline')) info_["Other name"] = info_.name.apply(to_latex) print(info_[["This work", "Other name", "Description"]].to_latex(index=False)) # %% # Description of active states info_ = asm.component["states"].set_index("sympy") info_ = [fr"\item[{latex(x, mode='inline')}] {info_.loc[x].description}" for x in X if x in core] info_ = [r"\begin{itemize}"] + info_ + [r"\end{itemize}"] print("\n".join(info_)) # %% # Parameters # ************ info_ = asm.component["parameters"][["sympy", "value", "name", "unit", "description"]].copy() info_.rename(columns={"description": "Description", "value": "Value", "unit": "Units"}, inplace=True) info_["This work"] = info_.sympy.apply(lambda x: latex(x, mode='inline')) info_["Other name"] = info_.name.apply(to_latex) info_["Value"] = info_.Value.apply(lambda x: latex(x, mode='inline')) def units_to_latex(x): try: y = ureg.parse_units(x) if y.dimensionless: # handle unit kinds, WIP in pint https://github.com/hgrecco/pint/pull/1967 return x y = f"{y:Lx}" return y.replace("[", "").replace("]", "") except AssertionError: return "$-$" info_["Units"] = info_.Units.apply(units_to_latex) info_["Description"] = info_.Description.str.replace(r"\(([\\\w]+)[_]([\\\w]+)\)", r"($\1_\\text{\2}$)", regex=True) info_.sort_values(by="This work", inplace=True) print(info_[["This work", "Other name", "Value", "Units", "Description"]].to_latex(index=False)) # %% # States vector # ************** print(latex(X)) # %% # Matrix # ******* M_entries = [Symbol(fr"m_{{{i}\,{j.name}}}") for i in X for j in r_sym] Ms = Matrix(np.asarray(M_entries).reshape(*M.shape)) for i in range(M.shape[0]): for j in range(M.shape[1]): if isinstance(M[i, j], Number): Ms[i, j] = M[i, j] print(latex(Ms)) # %% info_ = [fr"{latex(ms)} &\coloneqq {latex(m)} \\" for ms, m in zip(Ms, M) if not isinstance(m, Number)] info_ = [r"\begin{align}"] + info_ + [r"\end{align}"] print("\n".join(info_)) # %% # Rates vector # ************ print(latex(r_sym)) # %% info_ = [fr"{latex(r_)} &\coloneqq {latex(f_)} \\" for r_, f_ in zip(r_sym, r)] info_ = [r"\begin{align}"] + info_ + [r"\end{align}"] print("\n".join(info_)) # %% # ODE # **** def symb_to_dot(x): if "_" in x.name: p = x.name.split("_", 1) x = fr"\dot{{{p[0]}}}_{p[1]}" else: x = fr"\dot{{{x.name}}}" return x dotX_sym = Matrix([Symbol(symb_to_dot(x)) for x in X]) dotX_expr = Ms * Matrix([Symbol(x_.name) for x_ in r_sym]) sub_order = ["state", "measurement", "isolated", "other"] for layer in sub_order: info_ = [] for x_, dx_, f_ in zip(X, dotX_sym, dotX_expr): if g.nodes[x_]["layer"] == layer: info_.append(fr"{latex(dx_)} &\coloneqq {latex(f_)} \\") if info_: info_ = [r"\begin{align}"] + info_ + [fr"\label{{eq:asm1-ode-{layer}}}\end{{align}}"] print("\n".join(info_)) # info_ = [] # order = [0] * len(X) # sub_order = ["state", "measurement", "isolated", "other"] # for idx, x_dx_f_ in enumerate(zip(X, dotX_sym, dotX_expr)): # x_, dx_, f_ = x_dx_f_ # order[idx] = sub_order.index(g.nodes[x_]["layer"]) # info_.append(fr"{latex(dx_)} &\coloneqq {latex(f_)} \\") # info_ = sorted(info_, key=lambda x_: order[info_.index(x_)]) # info_ = [r"\begin{align}"]+ info_ + [r"\end{align}"] # print("\n".join(info_)) # %% plt.show()