Note
Go to the end to download the full example code.
ASM3: confounding on the inflection points of the O2 signal#
# Copyright (C) 2024 Juan Pablo Carbajal
# Copyright (C) 2024 Mariane Yvonne Schneider
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Author: Juan Pablo Carbajal <ajuanpi@gmail.com>
# Author: Mariane Yvonne Schneider <myschneider@meiru.ch>
import matplotlib.pyplot as plt
import pandas as pd
from sympy import *
from functools import partial
import numpy as np
import numpy.ma as ma
from scipy.integrate import solve_ivp
from scipy.special import expit
try:
import dsafeatures
except ModuleNotFoundError:
import sys
import os
sys.path.insert(0, os.path.abspath("../.."))
import dsafeatures.odemodel as m
import dsafeatures.symtools as st
import dsafeatures.signal_processing as sp
import dsafeatures.analysis as an
import dsafeatures.interactive as gui
from dsafeatures.printing import *
init_printing()
Load the model#
asm = m.ODEModel.from_csv(name="ASM3")
# States
X = asm.states
# Process rates
r_sym, r = asm.rates, asm.rates_expr
Jr = r.jacobian(X)
# Matrix
M = asm.coef_matrix
# State derivative
dotX = asm.derivative()
O2, idxO2 = asm.state_by_name("S_O2")
Aeration#
Define aeration
maxO2 = 9.0
def oxygen_PC(o2state, *, K=10.0, setpoint=maxO2, max_inflow=50.0, symbolic=True):
if symbolic:
sigmoid = lambda x: 1 / (1 + exp(-x))
else:
sigmoid = expit
O2_error = setpoint - o2state
# saturate at max_inflow
inflow = max_inflow * sigmoid(K*O2_error)
return inflow
# add areation
oxyflow = 1000.0 # oxygen intake flow
dotX[idxO2] += oxygen_PC(O2, max_inflow=oxyflow)
ddotX = st.ddXdt(M, Jr, dotX)
Inflection point curve#
ipc = ddotX[idxO2]
Parameter values#
param_values = dict(asm.parameters(and_values=True).set_index("sympy").value)
param_values = {k: v.subs(param_values) for k, v in param_values.items()}
replace parameter symbols with values
ipc_pv = ipc.subs(zip(r_sym, r)).subs(param_values).doit()
ipc_pv
Find on which states the IPC depends on
ipc_states = sorted(tuple(x for x in X if ipc.has(x)), key=str)
ipc_states
[S_\text{NHx}, S_\text{NOx}, S_\text{O2}, S_\text{alk}, S_\text{b}, X_\text{a}, X_\text{cb}, X_\text{h}, X_text{h,s}]
same for ODE system
dotX_pv = dotX.subs(zip(r_sym, r)).subs(param_values).doit()
dO2dt_states = sorted(tuple(x for x in X if dotX_pv[idxO2].has(x)), key=str)
dO2dt_states
[S_\text{NHx}, S_\text{O2}, S_\text{alk}, S_\text{b}, X_\text{a}, X_\text{h}, X_text{h,s}]
states_to_set = sorted(list(set(ipc_states) | set(dO2dt_states)), key=str)
Numerical check#
State values initial values
states_iv = asm.component["states"][["sympy", "initial_value"]].set_index("sympy").loc[list(X), "initial_value"]
Integrate numerically
Spline representation
fig, axs = plt.subplots(nrows=2)
ax = axs[0]
ax.plot(t, dx_val, label="symbolic")
ax.plot(t, dx_s(t), ':', label="numerical")
ax.set_ylabel(latex(O2, mode="inline")+" 1st derivative")
ax = axs[1]
ax.plot(t, ipc_val, label="symbolic")
ax.plot(t, ipc_s(t), ':', label="numerical")
ax.set_ylabel(latex(O2, mode="inline")+" 2nd derivative")
ax.set_xlabel("time")

Analysis#
%% Set limits for O2
Set value for NHx
NHx, idxNHx = asm.state_by_name("S_NHx")
Find zeros of ipc for all states with random values
# minimum value for derivative
dotO2_min = 40.0
# initialize random generator
rng = np.random.default_rng(list(map(ord, "abracadabra")))
# initial values of states to set
states_iv = asm.component["states"][["sympy", "initial_value"]].set_index("sympy").loc[states_to_set, "initial_value"]
nsamples = 200
samples = pd.DataFrame(columns=states_to_set, dtype=float)
for Y in states_to_set:
if Y == NHx:
samples.loc[:, Y] = rng.uniform(low=2.0, high=200.0, size=nsamples)
else:
samples.loc[:, Y] = rng.uniform(low=1e-3, high=min(max(states_iv[Y] * 10, 10), 5e3), size=nsamples)
if "hits" not in locals():
hits = dict()
ngrid = 60
# normalized grid
g1, g2 = np.meshgrid(*(np.linspace(0, 1, ngrid), )*2)
# grid for oxygen
xx = g1 * (O2_limits[2] - O2_limits[1]) + O2_limits[1]
# oxygen derivative
dotO2 = dotX_pv[idxO2]
for Y in states_to_set:
if Y == O2:
continue
# grid for Y state, default estimated maximum
ymax = max(20, 3 * states_iv[Y])
yy = g2 * ymax
other_states = tuple(s for s in states_to_set if s not in (O2, Y))
q, vv = an.masked_contours2d(ipc_pv, vars=(O2, Y), params=other_states,
vars_grid=(xx, yy), params_samples=samples.loc[:, list(other_states)].to_numpy(),
maskfunc=[(dotO2, dotO2_min)])
if vv:
ql = np.vstack([l_ for q_ in q for l_ in q_])
yrng = (ql[:, 1].min(), ql[:,1].max())
hits[Y] = {"states": other_states, "samples": np.asarray(vv), "contours": q, "range": yrng}
fig, ax = plt.subplots()
for q_ in q:
for l_ in q_:
ax.plot(l_[:,0], l_[:,1], 'k', alpha=0.5)
ax.set_xlabel(latex(O2, mode='inline'))
ax.set_ylabel(latex(Y, mode='inline'))
<lambdifygenerated-44>:22: RuntimeWarning: divide by zero encountered in reciprocal
x18 = _Dummy_544**(-1.0)
<lambdifygenerated-44>:63: RuntimeWarning: invalid value encountered in divide
return -1.17460317460317*_Dummy_537**2*x41*x53/_Dummy_544**2 - 18.047619047619*_Dummy_541*x10*x11 - _Dummy_545*x24*x54 - 0.12*x10 - 0.75*x22*x41 - 0.16*x41 - x50*(0.2*_Dummy_537*x13 - _Dummy_537*x43) - 0.15*x50*(5.0*_Dummy_540*_Dummy_544*x13*x14 - _Dummy_540*_Dummy_544*x20*x42) - 18.047619047619*x50*(1.0*_Dummy_539*_Dummy_541*_Dummy_543*x1*x7*x9 - 1.0*_Dummy_541*x11*x51) - 0.587301587301587*x50*(2.0*_Dummy_537*_Dummy_541*_Dummy_543*x13*x28*x30*x32 - 2.0*_Dummy_537*x34*x42) - x52*(0.15*_Dummy_539*x1 - 0.15*x51) - x52*(0.2*_Dummy_544*x13 - _Dummy_544*x43) - 0.587301587301587*x54*(-x26*x36*x53 + x37) - 18.047619047619*x56*(1.0*_Dummy_539*_Dummy_541*_Dummy_545*x1*x7*x9 - 1.0*_Dummy_543*x47*x9/x6**2) - 0.587301587301587*x56*(2.0*_Dummy_537*_Dummy_541*_Dummy_545*x13*x28*x30*x32 - x30*x58/x31**2) - 18.047619047619*x57*(1.0*_Dummy_539*_Dummy_543*_Dummy_545*x1*x7*x9 - 1.0*_Dummy_543*x47*x7/x8**2) - 0.587301587301587*x57*(2.0*_Dummy_537*_Dummy_543*_Dummy_545*x13*x28*x30*x32 - x32*x58/x29**2) - 0.15*(5.0*_Dummy_544*_Dummy_545*x13*x14 - 1.25*x17/(0.5*_Dummy_540 + 1)**2)*(3.0*_Dummy_538*x19 - _Dummy_540*x16*x20 - 0.6*x23)
<lambdifygenerated-44>:63: RuntimeWarning: invalid value encountered in multiply
return -1.17460317460317*_Dummy_537**2*x41*x53/_Dummy_544**2 - 18.047619047619*_Dummy_541*x10*x11 - _Dummy_545*x24*x54 - 0.12*x10 - 0.75*x22*x41 - 0.16*x41 - x50*(0.2*_Dummy_537*x13 - _Dummy_537*x43) - 0.15*x50*(5.0*_Dummy_540*_Dummy_544*x13*x14 - _Dummy_540*_Dummy_544*x20*x42) - 18.047619047619*x50*(1.0*_Dummy_539*_Dummy_541*_Dummy_543*x1*x7*x9 - 1.0*_Dummy_541*x11*x51) - 0.587301587301587*x50*(2.0*_Dummy_537*_Dummy_541*_Dummy_543*x13*x28*x30*x32 - 2.0*_Dummy_537*x34*x42) - x52*(0.15*_Dummy_539*x1 - 0.15*x51) - x52*(0.2*_Dummy_544*x13 - _Dummy_544*x43) - 0.587301587301587*x54*(-x26*x36*x53 + x37) - 18.047619047619*x56*(1.0*_Dummy_539*_Dummy_541*_Dummy_545*x1*x7*x9 - 1.0*_Dummy_543*x47*x9/x6**2) - 0.587301587301587*x56*(2.0*_Dummy_537*_Dummy_541*_Dummy_545*x13*x28*x30*x32 - x30*x58/x31**2) - 18.047619047619*x57*(1.0*_Dummy_539*_Dummy_543*_Dummy_545*x1*x7*x9 - 1.0*_Dummy_543*x47*x7/x8**2) - 0.587301587301587*x57*(2.0*_Dummy_537*_Dummy_543*_Dummy_545*x13*x28*x30*x32 - x32*x58/x29**2) - 0.15*(5.0*_Dummy_544*_Dummy_545*x13*x14 - 1.25*x17/(0.5*_Dummy_540 + 1)**2)*(3.0*_Dummy_538*x19 - _Dummy_540*x16*x20 - 0.6*x23)
<lambdifygenerated-45>:9: RuntimeWarning: divide by zero encountered in divide
return -0.75*_Dummy_551*x2/(_Dummy_551 + 2.0) - 18.047619047619*x0*x4/((_Dummy_552 + 0.5)*(_Dummy_554 + 1.0)) - 0.12*x0 - 0.16*x2 - 1.17460317460317*x3*x4/((_Dummy_552 + 0.1)*(_Dummy_554 + 0.01)*(_Dummy_548/_Dummy_555 + 1.0)) - 0.2*x3 + 1000.0/(8.19401262399051e-40*exp(10.0*_Dummy_556) + 1)
The numer of random samples of the states values that generated an ipc
{S_\text{NHx}: 3, S_\text{NOx}: 1, S_\text{alk}: 7, S_\text{b}: 1, X_\text{a}: 13, X_\text{cb}: 1, X_\text{h}: 2, X_text{h,s}: 3}
Trajectory of point in ipc#
ODE definition
Select plane y_state = sorted(hits, key=lambda k: max([y.allsegs[0][0][:,0].max()-y.allsegs[0][0][:,0].min() for y in hits[k][“contours”]]))[0] y_state = NHx y_state = context[“states”][“X_ANO”]
y_state, idxy = asm.state_by_name("X_OHO")
# initial values
iv_def = asm.component["states"][["sympy", "initial_value"]].set_index("sympy")["initial_value"]
iv_def = np.maximum(iv_def, 1.0) # avoid 0 initial values
f, axs = plt.subplots(nrows=2, tight_layout=True)
axs[0].set_xlabel(latex(O2, mode='inline'))
axs[0].set_ylabel(latex(y_state, mode='inline'))
axs[1].set_xlabel('t')
axs[1].set_ylabel(latex(O2, mode='inline'))
lgd = []
for q, idx_sample in zip(hits[y_state]["contours"], hits[y_state]["samples"]):
# get point on ipc
qmid = (2, 1200)
dmin = np.inf
pxy = []
cidx = -1
for ci, c in enumerate(q):
d_ = ((c - qmid)**2).sum(axis=1)
idx = np.argmin(d_)
if d_[idx] < dmin:
dmin = d_[idx]
pxy = c[idx]
cidx = ci
ptx, pty = pxy
# integrate ODE
# Generate initial value
iv = iv_def.copy()
iv_ = samples.loc[idx_sample].copy()
iv.loc[iv_.index] = iv_
iv[O2] = ptx
iv[y_state] = pty
iv_ls = iv[list(X)].to_list()
sol = solve_ivp(**args_, y0=iv_ls)
sol_r = solve_ivp(**args_, y0=iv_ls, args=(-1.0,))
ax = axs[0]
for ci, c in enumerate(q):
if ci == cidx:
ax.plot(c[:, 0], c[:, 1])
clc = ax.get_lines()[-1].get_color()
else:
ax.plot(c[:, 0], c[:, 1], color="k", alpha=0.5)
ax.plot(ptx, pty, 'xk')
ax = axs[1]
ax.plot(sol.t, sol.y[idxO2], color=clc)
ax.plot(-sol_r.t, sol_r.y[idxO2], color=clc)
ax.plot(0, ptx, 'xk')

Interactive#
After all the plots a new interactive plot appears This si not interactive in the example
# find sample with contour with max length
lmax = -np.inf
imax = 0
for i, q in zip(hits[y_state]["samples"], hits[y_state]["contours"]):
for c in q:
# approx. length
lc = np.sum(np.sqrt(np.sum(np.diff(c, axis=0)**2, axis=1)))
if lc > lmax:
lmax = lc
imax = i
qmax = hits[y_state]["samples"].tolist().index(imax)
# Generate initial value
iv = iv_def.copy()
iv_ = samples.loc[imax].copy()
iv.loc[iv_.index] = iv_
iv = iv[list(X)].to_list()
# plot these contours
f, axs = plt.subplots(nrows=3, tight_layout=True)
axs[0].set_xlabel(latex(O2, mode='inline'))
axs[0].set_ylabel(latex(y_state, mode='inline'))
axs[1].set_xlabel('t')
axs[1].set_ylabel(latex(O2, mode='inline'))
axs[2].set_xlabel('t')
axs[2].set_ylabel(latex(NHx, mode='inline'))
ax = axs[0]
for c in hits[y_state]["contours"][qmax]:
ax.plot(c[:, 0], c[:, 1])
# start interactive plot
Xlist = list(X)
gui.interactive_ipc_ode(O2_limits, (y_state, *hits[y_state]["range"]), Xlist, dotX_pv, iv,
also_show=[NHx],
fig=f)
plt.show()

Total running time of the script: (0 minutes 14.147 seconds)
Estimated memory usage: 88 MB