{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# ASM3: confounding on the inflection points of the O2 signal\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Copyright (C) 2024 Juan Pablo Carbajal\n# Copyright (C) 2024 Mariane Yvonne Schneider\n#\n# This program is free software; you can redistribute it and/or modify\n# it under the terms of the GNU General Public License as published by\n# the Free Software Foundation; either version 3 of the License, or\n# (at your option) any later version.\n#\n# This program is distributed in the hope that it will be useful,\n# but WITHOUT ANY WARRANTY; without even the implied warranty of\n# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the\n# GNU General Public License for more details.\n#\n# You should have received a copy of the GNU General Public License\n# along with this program. If not, see .\n\n# Author: Juan Pablo Carbajal \n# Author: Mariane Yvonne Schneider \nimport matplotlib.pyplot as plt\nimport pandas as pd\n\nfrom sympy import *\n\nfrom functools import partial\nimport numpy as np\nimport numpy.ma as ma\n\nfrom scipy.integrate import solve_ivp\nfrom scipy.special import expit\n\ntry:\n import dsafeatures\nexcept ModuleNotFoundError:\n import sys\n import os\n\n sys.path.insert(0, os.path.abspath(\"../..\"))\n\nimport dsafeatures.odemodel as m\nimport dsafeatures.symtools as st\nimport dsafeatures.signal_processing as sp\nimport dsafeatures.analysis as an\nimport dsafeatures.interactive as gui\n\nfrom dsafeatures.printing import *\ninit_printing()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load the model\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"asm = m.ODEModel.from_csv(name=\"ASM3\")\n# States\nX = asm.states\n# Process rates\nr_sym, r = asm.rates, asm.rates_expr\nJr = r.jacobian(X)\n# Matrix\nM = asm.coef_matrix\n# State derivative\ndotX = asm.derivative()\n\nO2, idxO2 = asm.state_by_name(\"S_O2\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Aeration\nDefine aeration\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"maxO2 = 9.0\ndef oxygen_PC(o2state, *, K=10.0, setpoint=maxO2, max_inflow=50.0, symbolic=True):\n if symbolic:\n sigmoid = lambda x: 1 / (1 + exp(-x))\n else:\n sigmoid = expit\n O2_error = setpoint - o2state\n # saturate at max_inflow\n inflow = max_inflow * sigmoid(K*O2_error)\n return inflow\n\n\n# add areation\noxyflow = 1000.0 # oxygen intake flow\n\ndotX[idxO2] += oxygen_PC(O2, max_inflow=oxyflow)\nddotX = st.ddXdt(M, Jr, dotX)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Inflection point curve\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"ipc = ddotX[idxO2]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parameter values\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"param_values = dict(asm.parameters(and_values=True).set_index(\"sympy\").value)\nparam_values = {k: v.subs(param_values) for k, v in param_values.items()}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"replace parameter symbols with values\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"ipc_pv = ipc.subs(zip(r_sym, r)).subs(param_values).doit()\nipc_pv"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Find on which states the IPC depends on\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"ipc_states = sorted(tuple(x for x in X if ipc.has(x)), key=str)\nipc_states"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"same for ODE system\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"dotX_pv = dotX.subs(zip(r_sym, r)).subs(param_values).doit()\ndO2dt_states = sorted(tuple(x for x in X if dotX_pv[idxO2].has(x)), key=str)\ndO2dt_states"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"states_to_set = sorted(list(set(ipc_states) | set(dO2dt_states)), key=str)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Numerical check\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"State values initial values\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"states_iv = asm.component[\"states\"][[\"sympy\", \"initial_value\"]].set_index(\"sympy\").loc[list(X), \"initial_value\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Integrate numerically\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"Tend = 0.01\nsol, dX_np, J_np = sp.integrate_numer(dotX_pv, X, X_iv=states_iv, t_span=(0, Tend))\nipc_np = lambdify([tuple(X)], ipc_pv, \"numpy\", cse=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Spline representation\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"t = np.arange(0, Tend, Tend/100)\ny = sol.sol(t)\ndx_val = np.zeros_like(t)\nipc_val = np.zeros_like(t)\nfor k, x_ in enumerate(y.T):\n dx_val[k] = dX_np(x_).flatten()[idxO2]\n ipc_val[k] = ipc_np(x_)\ndx_s = sp.make_interp_spline(t, dx_val, k=5)\nipc_s = dx_s.derivative()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"fig, axs = plt.subplots(nrows=2)\nax = axs[0]\nax.plot(t, dx_val, label=\"symbolic\")\nax.plot(t, dx_s(t), ':', label=\"numerical\")\nax.set_ylabel(latex(O2, mode=\"inline\")+\" 1st derivative\")\n\nax = axs[1]\nax.plot(t, ipc_val, label=\"symbolic\")\nax.plot(t, ipc_s(t), ':', label=\"numerical\")\nax.set_ylabel(latex(O2, mode=\"inline\")+\" 2nd derivative\")\nax.set_xlabel(\"time\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Analysis\n%%\nSet limits for O2\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"O2_limits = (O2, 1e-3, maxO2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Set value for NHx\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"NHx, idxNHx = asm.state_by_name(\"S_NHx\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Find zeros of ipc for all states with random values\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# minimum value for derivative\ndotO2_min = 40.0\n\n# initialize random generator\nrng = np.random.default_rng(list(map(ord, \"abracadabra\")))\n\n# initial values of states to set\nstates_iv = asm.component[\"states\"][[\"sympy\", \"initial_value\"]].set_index(\"sympy\").loc[states_to_set, \"initial_value\"]\n\nnsamples = 200\nsamples = pd.DataFrame(columns=states_to_set, dtype=float)\nfor Y in states_to_set:\n if Y == NHx:\n samples.loc[:, Y] = rng.uniform(low=2.0, high=200.0, size=nsamples)\n else:\n samples.loc[:, Y] = rng.uniform(low=1e-3, high=min(max(states_iv[Y] * 10, 10), 5e3), size=nsamples)\n\nif \"hits\" not in locals():\n hits = dict()\n ngrid = 60\n # normalized grid\n g1, g2 = np.meshgrid(*(np.linspace(0, 1, ngrid), )*2)\n # grid for oxygen\n xx = g1 * (O2_limits[2] - O2_limits[1]) + O2_limits[1]\n # oxygen derivative\n dotO2 = dotX_pv[idxO2]\n\n for Y in states_to_set:\n if Y == O2:\n continue\n # grid for Y state, default estimated maximum\n ymax = max(20, 3 * states_iv[Y])\n yy = g2 * ymax\n\n other_states = tuple(s for s in states_to_set if s not in (O2, Y))\n q, vv = an.masked_contours2d(ipc_pv, vars=(O2, Y), params=other_states,\n vars_grid=(xx, yy), params_samples=samples.loc[:, list(other_states)].to_numpy(),\n maskfunc=[(dotO2, dotO2_min)])\n if vv:\n ql = np.vstack([l_ for q_ in q for l_ in q_])\n yrng = (ql[:, 1].min(), ql[:,1].max())\n hits[Y] = {\"states\": other_states, \"samples\": np.asarray(vv), \"contours\": q, \"range\": yrng}\n\n fig, ax = plt.subplots()\n for q_ in q:\n for l_ in q_:\n ax.plot(l_[:,0], l_[:,1], 'k', alpha=0.5)\n ax.set_xlabel(latex(O2, mode='inline'))\n ax.set_ylabel(latex(Y, mode='inline'))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The numer of random samples of the states values that generated an ipc\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"hits_sz = {k: v[\"samples\"].size for k, v in hits.items()}\nhits_sz"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Trajectory of point in ipc\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"ODE definition\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"args_ = an.numeric_ode(dotX_pv, list(X), ub={O2: maxO2})\nTmax = 30.0 / 1440\nargs_ = args_ | dict(t_span=(0, Tmax),\n first_step=1e-8,\n t_eval=np.linspace(0, Tmax, 50),\n method=\"Radau\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Select plane\ny_state = sorted(hits, key=lambda k: max([y.allsegs[0][0][:,0].max()-y.allsegs[0][0][:,0].min() for y in hits[k][\"contours\"]]))[0]\ny_state = NHx\ny_state = context[\"states\"][\"X_ANO\"]\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"y_state, idxy = asm.state_by_name(\"X_OHO\")\n\n# initial values\niv_def = asm.component[\"states\"][[\"sympy\", \"initial_value\"]].set_index(\"sympy\")[\"initial_value\"]\niv_def = np.maximum(iv_def, 1.0) # avoid 0 initial values\n\nf, axs = plt.subplots(nrows=2, tight_layout=True)\naxs[0].set_xlabel(latex(O2, mode='inline'))\naxs[0].set_ylabel(latex(y_state, mode='inline'))\naxs[1].set_xlabel('t')\naxs[1].set_ylabel(latex(O2, mode='inline'))\nlgd = []\nfor q, idx_sample in zip(hits[y_state][\"contours\"], hits[y_state][\"samples\"]):\n # get point on ipc\n qmid = (2, 1200)\n dmin = np.inf\n pxy = []\n cidx = -1\n for ci, c in enumerate(q):\n d_ = ((c - qmid)**2).sum(axis=1)\n idx = np.argmin(d_)\n if d_[idx] < dmin:\n dmin = d_[idx]\n pxy = c[idx]\n cidx = ci\n ptx, pty = pxy\n\n # integrate ODE\n\n # Generate initial value\n iv = iv_def.copy()\n iv_ = samples.loc[idx_sample].copy()\n iv.loc[iv_.index] = iv_\n iv[O2] = ptx\n iv[y_state] = pty\n iv_ls = iv[list(X)].to_list()\n\n sol = solve_ivp(**args_, y0=iv_ls)\n sol_r = solve_ivp(**args_, y0=iv_ls, args=(-1.0,))\n\n ax = axs[0]\n for ci, c in enumerate(q):\n if ci == cidx:\n ax.plot(c[:, 0], c[:, 1])\n clc = ax.get_lines()[-1].get_color()\n else:\n ax.plot(c[:, 0], c[:, 1], color=\"k\", alpha=0.5)\n ax.plot(ptx, pty, 'xk')\n\n ax = axs[1]\n ax.plot(sol.t, sol.y[idxO2], color=clc)\n ax.plot(-sol_r.t, sol_r.y[idxO2], color=clc)\n ax.plot(0, ptx, 'xk')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Interactive\nAfter all the plots a new interactive plot appears\nThis si not interactive in the example\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# find sample with contour with max length\nlmax = -np.inf\nimax = 0\nfor i, q in zip(hits[y_state][\"samples\"], hits[y_state][\"contours\"]):\n for c in q:\n # approx. length\n lc = np.sum(np.sqrt(np.sum(np.diff(c, axis=0)**2, axis=1)))\n if lc > lmax:\n lmax = lc\n imax = i\nqmax = hits[y_state][\"samples\"].tolist().index(imax)\n\n# Generate initial value\niv = iv_def.copy()\niv_ = samples.loc[imax].copy()\niv.loc[iv_.index] = iv_\niv = iv[list(X)].to_list()\n\n# plot these contours\nf, axs = plt.subplots(nrows=3, tight_layout=True)\naxs[0].set_xlabel(latex(O2, mode='inline'))\naxs[0].set_ylabel(latex(y_state, mode='inline'))\naxs[1].set_xlabel('t')\naxs[1].set_ylabel(latex(O2, mode='inline'))\naxs[2].set_xlabel('t')\naxs[2].set_ylabel(latex(NHx, mode='inline'))\n\nax = axs[0]\nfor c in hits[y_state][\"contours\"][qmax]:\n ax.plot(c[:, 0], c[:, 1])\n# start interactive plot\nXlist = list(X)\ngui.interactive_ipc_ode(O2_limits, (y_state, *hits[y_state][\"range\"]), Xlist, dotX_pv, iv,\n also_show=[NHx],\n fig=f)\n\nplt.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.2"
}
},
"nbformat": 4,
"nbformat_minor": 0
}