{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n# ASM1: confounding on the inflection points of the S_O\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Copyright (C) 2024 Juan Pablo Carbajal\n# Copyright (C) 2024 Mariane Yvonne Schneider\n#\n# This program is free software; you can redistribute it and/or modify\n# it under the terms of the GNU General Public License as published by\n# the Free Software Foundation; either version 3 of the License, or\n# (at your option) any later version.\n#\n# This program is distributed in the hope that it will be useful,\n# but WITHOUT ANY WARRANTY; without even the implied warranty of\n# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the\n# GNU General Public License for more details.\n#\n# You should have received a copy of the GNU General Public License\n# along with this program. If not, see .\n\n# Author: Juan Pablo Carbajal \n# Author: Mariane Yvonne Schneider \nimport matplotlib.pyplot as plt\nimport numpy as np\nimport pandas as pd\nfrom scipy.integrate import solve_ivp\nfrom sympy import *\n\ntry:\n import dsafeatures\nexcept ModuleNotFoundError:\n import sys\n import os\n\n sys.path.insert(0, os.path.abspath(\"../..\"))\n\nimport dsafeatures.odemodel as m\nimport dsafeatures.symtools as st\nimport dsafeatures.signal_processing as sp\nimport dsafeatures.analysis as an\nimport dsafeatures.interactive as gui\n\nfrom dsafeatures.printing import *\ninit_printing()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load the model\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "asm = m.ODEModel.from_csv(name=\"ASM1\")\n# States\nX = asm.states\n# Process rates\nr_sym, r = asm.rates, asm.rates_expr\nJr = r.jacobian(X)\n# Matrix\nM = asm.coef_matrix\n# State derivative\ndotX = asm.derivative()\n\nO2, idxO2 = asm.state_by_name(\"S_O2\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Aeration\nDefine aeration\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "oxyflow = 1000.0 # oxygen intake flow\naer = m.oxygen_PC(O2, max_inflow=oxyflow)\ndotX[idxO2] += aer" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Inflection point curve\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "ipc = st.ddXdt(M[idxO2, :], Jr, dotX)[0] + aer.diff(O2) * dotX[idxO2]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parameter values\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "param_values = dict(asm.parameters(and_values=True).set_index(\"sympy\").value)\nparam_values = {k: v.subs(param_values) for k, v in param_values.items()}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "replace parameter symbols with values\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "ipc_pv = ipc.subs(zip(r_sym, r)).subs(param_values).doit()\nipc_pv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Find on which states the IPC depends on\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "ipc_states = sorted(tuple(x for x in X if ipc.has(x)), key=str)\nipc_states" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "same for ODE system\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "dotX_pv = dotX.subs(zip(r_sym, r)).subs(param_values).doit()\ndO2dt_states = sorted(tuple(x for x in X if dotX_pv[idxO2].has(x)), key=str)\ndO2dt_states" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "states_to_set = sorted(list(set(ipc_states) | set(dO2dt_states)), key=str)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Numerical check\n\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "State values initial values\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "states_iv = asm.component[\"states\"][[\"sympy\", \"initial_value\"]].set_index(\"sympy\").loc[list(X), \"initial_value\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Integrate numerically\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "Tend = 0.125\nsol, dX_np, J_np = sp.integrate_numer(dotX_pv, X, X_iv=states_iv, t_span=(0, Tend), max_step=1e-3)\nipc_np = lambdify([tuple(X)], ipc_pv, \"numpy\", cse=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Spline representation\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "t = np.arange(0, Tend, Tend/1000)\ny = sol.sol(t)\ndx_val = np.zeros_like(t)\nipc_val = np.zeros_like(t)\nfor k, x_ in enumerate(y.T):\n dx_val[k] = dX_np(x_).flatten()[idxO2]\n ipc_val[k] = ipc_np(x_)\ndx_s = sp.make_interp_spline(t, dx_val, k=5)\nipc_s = dx_s.derivative()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "fig, axs = plt.subplots(nrows=3, sharex=True)\nax = axs[0]\nax.plot(t, y[idxO2], ':', label=\"numerical\")\nax.set_ylabel(latex(O2, mode=\"inline\"))\n\nax = axs[1]\nax.plot(t, dx_val, label=\"symbolic\")\nax.plot(t, dx_s(t), ':', label=\"numerical\")\nax.set_ylabel(latex(O2, mode=\"inline\")+\" 1st derivative\")\n\nax = axs[2]\nax.plot(t, ipc_val, label=\"symbolic\")\nax.plot(t, ipc_s(t), ':', label=\"numerical\")\nax.set_ylabel(latex(O2, mode=\"inline\")+\" 2nd derivative\")\nax.set_xlabel(\"time\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Analysis\n%%\nSet limits for O2\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "maxO2 = 9.0\nO2_limits = (O2, 1e-3, maxO2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Set value for NHx\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "NHx, idxNHx = asm.state_by_name(\"S_NHx\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Find zeros of ipc for all states with random values\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# minimum value for derivative\ndotO2_min = 4e-2\n\n# initialize random generator\nrng = np.random.default_rng(list(map(ord, \"abracadabra\")))\n\n# initial values of states to set\nstates_iv = asm.component[\"states\"][[\"sympy\", \"initial_value\"]].set_index(\"sympy\").loc[states_to_set, \"initial_value\"]\n\nnsamples = 100\nsamples = pd.DataFrame(columns=states_to_set, dtype=float)\nfor Y in states_to_set:\n if Y == NHx:\n samples.loc[:, Y] = rng.uniform(low=1.0, high=200.0, size=nsamples)\n else:\n samples.loc[:, Y] = rng.uniform(low=1e-3, high=min(max(states_iv[Y] * 10, 10), 5e3), size=nsamples)\n\nif \"hits\" not in locals():\n hits = dict()\n ngrid = 60\n # normalized grid\n g1, g2 = np.meshgrid(*(np.linspace(0, 1, ngrid), )*2)\n # grid for oxygen\n xx = g1 * (O2_limits[2] - O2_limits[1]) + O2_limits[1]\n # oxygen derivative\n dotO2 = dotX_pv[idxO2]\n\n for Y in states_to_set:\n if Y == O2:\n continue\n # grid for Y state, default estimated maximum\n ymax = max(20, 3 * states_iv[Y])\n yy = g2 * ymax\n\n other_states = tuple(s for s in states_to_set if s not in (O2, Y))\n q, vv = an.masked_contours2d(ipc_pv, vars=(O2, Y), params=other_states,\n vars_grid=(xx, yy), params_samples=samples.loc[:, list(other_states)].to_numpy(),\n maskfunc=[(dotO2, dotO2_min)])\n if vv:\n ql = np.vstack([l_ for q_ in q for l_ in q_])\n yrng = (ql[:, 1].min(), ql[:,1].max())\n hits[Y] = {\"states\": other_states, \"samples\": np.asarray(vv), \"contours\": q, \"range\": yrng}\n\n fig, ax = plt.subplots()\n for q_ in q:\n for l_ in q_:\n ax.plot(l_[:,0], l_[:,1], 'k', alpha=0.5)\n ax.set_xlabel(latex(O2, mode='inline')+' [g$\\mathrm{m}^{-3}$]')\n ax.set_ylabel(latex(Y, mode='inline')+' [g$\\mathrm{m}^{-3}$]')\n if Y == NHx:\n plt.axhline(y=1.0, color='r', linestyle='-', label='threshold (T)')\n plt.legend()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The numer of random samples of the states values that generated an ipc\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "hits_sz = {k: v[\"samples\"].size for k, v in hits.items()}\nhits_sz" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Trajectory of point in ipc\n\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "ODE definition\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "args_ = an.numeric_ode(dotX_pv, list(X), ub={O2: maxO2})\nTmax = 30.0 / 1440\n\ndef ev(t, y, dir=1.0):\n return ipc_np(y)\nev.terminal = 0\nev.direction = -1\n\nargs_ = args_ | dict(t_span=(0, Tmax),\n first_step=1e-8,\n max_step=1e-3,\n t_eval=np.linspace(0, Tmax, 50),\n method=\"Radau\",\n events=[ev])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Select plane\ny_state = sorted(hits, key=lambda k: max([y.allsegs[0][0][:,0].max()-y.allsegs[0][0][:,0].min() for y in hits[k][\"contours\"]]))[0]\ny_state = NHx\ny_state = context[\"states\"][\"X_ANO\"]\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "y_state, idxy = asm.state_by_name(\"X_OHO\")\n\n# initial values\niv_def = asm.component[\"states\"][[\"sympy\", \"initial_value\"]].set_index(\"sympy\")[\"initial_value\"]\niv_def = np.maximum(iv_def, 1.0) # avoid 0 initial values\n\nf, axs = plt.subplots(nrows=2, tight_layout=True)\naxs[0].set_xlabel(latex(O2, mode='inline'))\naxs[0].set_ylabel(latex(y_state, mode='inline'))\naxs[1].set_xlabel('t')\naxs[1].set_ylabel(latex(O2, mode='inline'))\nlgd = []\nfor q, idx_sample in zip(hits[y_state][\"contours\"], hits[y_state][\"samples\"]):\n # get point on ipc\n qmid = (2, 1200)\n dmin = np.inf\n pxy = []\n cidx = -1\n for ci, c in enumerate(q):\n d_ = ((c - qmid)**2).sum(axis=1)\n idx = np.argmin(d_)\n if d_[idx] < dmin:\n dmin = d_[idx]\n pxy = c[idx]\n cidx = ci\n ptx, pty = pxy\n\n # integrate ODE\n\n # Generate initial value\n iv = iv_def.copy()\n iv_ = samples.loc[idx_sample].copy()\n iv.loc[iv_.index] = iv_\n iv[O2] = ptx\n iv[y_state] = pty\n iv_ls = iv[list(X)].to_list()\n\n sol = solve_ivp(**args_, y0=iv_ls)\n sol_r = solve_ivp(**args_, y0=iv_ls, args=(-1.0,))\n\n ax = axs[0]\n for ci, c in enumerate(q):\n if ci == cidx:\n ax.plot(c[:, 0], c[:, 1])\n clc = ax.get_lines()[-1].get_color()\n else:\n ax.plot(c[:, 0], c[:, 1], color=\"k\", alpha=0.5)\n ax.plot(ptx, pty, 'xk')\n\n ax = axs[1]\n ax.plot(sol.t, sol.y[idxO2], color=clc)\n ax.plot(-sol_r.t, sol_r.y[idxO2], color=clc)\n ax.plot(0, ptx, 'xk')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Interactive\nAfter all the plots a new interactive plot appears\nThis si not interactive in the example\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# find sample with contour with max length\nlmax = -np.inf\nimax = 0\nfor i, q in zip(hits[y_state][\"samples\"], hits[y_state][\"contours\"]):\n for c in q:\n # approx. length\n lc = np.sum(np.sqrt(np.sum(np.diff(c, axis=0)**2, axis=1)))\n if lc > lmax:\n lmax = lc\n imax = i\nqmax = hits[y_state][\"samples\"].tolist().index(imax)\n\n# Generate initial value\niv = iv_def.copy()\niv_ = samples.loc[imax].copy()\niv.loc[iv_.index] = iv_\niv = iv[list(X)].to_list()\n\n# plot these contours\nf, axs = plt.subplots(nrows=3, tight_layout=True)\naxs[0].set_xlabel(latex(O2, mode='inline'))\naxs[0].set_ylabel(latex(y_state, mode='inline'))\naxs[1].set_xlabel('t')\naxs[1].set_ylabel(latex(O2, mode='inline'))\naxs[2].set_xlabel('t')\naxs[2].set_ylabel(latex(NHx, mode='inline'))\n\nax = axs[0]\nfor c in hits[y_state][\"contours\"][qmax]:\n ax.plot(c[:, 0], c[:, 1])\n# start interactive plot\nXlist = list(X)\ngui.interactive_ipc_ode(O2_limits, (y_state, *hits[y_state][\"range\"]), Xlist, dotX_pv, iv,\n also_show=[NHx],\n fig=f)\n\nplt.show()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.2" } }, "nbformat": 4, "nbformat_minor": 0 }