### Wrote chainbinom_simulate.py function. Halfway through re-writing the ODE...

`Wrote chainbinom_simulate.py function.  Halfway through re-writing the ODE class (which should now not be an ODE!).`
parent 97b545d4
 """Functions for chain binomial simulation.""" import numpy as np import tensorflow as tf import tensorflow_probability as tfp tfd = tfp.distributions def update_state(update, state, stoichiometry): update = tf.expand_dims(update, 1) # Rx1xN update *= tf.expand_dims(stoichiometry, -1) # RxSx1 update = tf.reduce_sum(update, axis=0) # SxN return state + update def chain_binomial_propagate(h, time_step): """Propagates the state of a population according to discrete time dynamics. def chain_binomial_propagate(h, time_step, stoichiometry): :param h: a hazard rate function returning the non-row-normalised Markov transition rate matrix :param time_step: the time step :returns : a function that propagate `state[t]` -> `state[t+time_step]` """ def propagate_fn(state): state_idx, rates = h(state) probs = 1 - tf.exp(-rates*time_step) # RxN state_mult = tf.scatter_nd(state_idx[:, None], state, shape=[state_idx.shape, state.shape, state.shape]) update = tfd.Binomial(state_mult, probs=probs).sample() # RxN update = tf.expand_dims(update, 1) # Rx1xN upd_shape = tf.concat([stoichiometry.shape, tf.fill([tf.rank(state)-1], 1)], axis=0) update *= tf.reshape(stoichiometry, upd_shape) # RxSx1 update = tf.reduce_sum(update, axis=0) state = state + update return state # State is assumed to be of shape [s, n] where s is the number of states # and n is the number of population strata. # TODO: having state as [s, n] means we have to do some funky transposition. It may be better # to have state.shape = [n, s] which avoids transposition below, but may lead to slower # rate calculations. rate_matrix = h(state) rate_matrix = tf.transpose(rate_matrix, perm=[2, 0, 1]) # Set diagonal to be the negative of the sum of other elements in each row rate_matrix = tf.linalg.set_diag(rate_matrix, -tf.reduce_sum(rate_matrix, axis=2)) # Calculate Markov transition probability matrix markov_transition = tf.linalg.expm(rate_matrix*time_step) # Sample new state new_state = tfd.Multinomial(total_count=tf.transpose(state), probs=markov_transition).sample() new_state = tf.reduce_sum(new_state, axis=1) return tf.transpose(new_state) return propagate_fn def chain_binomial_simulate(hazard_fn, state, start, end, time_step, stoichiometry): def chain_binomial_simulate(hazard_fn, state, start, end, time_step): propagate = chain_binomial_propagate(hazard_fn, time_step, stoichiometry) propagate = chain_binomial_propagate(hazard_fn, time_step) times = tf.range(start, end, time_step) output = tf.TensorArray(tf.float64, size=times.shape) ... ... @@ -40,6 +44,7 @@ def chain_binomial_simulate(hazard_fn, state, start, end, time_step, stoichiomet state = propagate(state) output = output.write(i, state) with tf.device("/CPU:0"): sim = output.gather(tf.range(times.shape)) sim = output.gather(tf.range(times.shape)) return times, sim
 ... ... @@ -102,7 +102,7 @@ class CovidUKODE: # TODO: add background case importation rate to the UK, e.g. def make_h(self, param): def h_fn(t, state): state = tf.unstack(state, axis=0) S, E, I, R = state # Integrator may produce time values outside the range desired, so # we clip, implicitly assuming the outside dates have the same ... ... @@ -114,15 +114,14 @@ class CovidUKODE: # TODO: add background case importation rate to the UK, e.g. infec_rate = param['beta1'] * ( tf.gather(self.M.matvec(I), m_switch) + param['beta2'] * self.Kbar * commute_volume * self.C.matvec(I / self.N_sum)) infec_rate = S / self.N * infec_rate infec_rate = infec_rate / self.N dS = -infec_rate dE = infec_rate - param['nu'] * E dI = param['nu'] * E - param['gamma'] * I dR = param['gamma'] * I SE = infec_rate EI = param['nu'] IR = param['gamma'] df = tf.stack([dS, dE, dI, dR]) return df p = 1 - tf.exp([SE, EI, IR]) return p return h_fn ... ...
 date,percent 2020-01-11,1.05 2020-01-12,1.07 2020-01-13,1.05 2020-01-14,1.05 2020-01-15,1.05 2020-01-16,1.06 2020-01-17,1.05 2020-01-18,1.06 2020-01-19,1.05 2020-01-20,1.06 2020-01-21,1.06 2020-01-22,1.07 2020-01-23,1.07 2020-01-24,1.07 2020-01-25,1.06 2020-01-26,1.06 2020-01-27,1.06 2020-01-28,1.06 2020-01-29,1.05 2020-01-30,1.03 2020-01-31,1.04 2020-02-01,1.05 2020-02-02,1.04 2020-02-03,1.02 2020-02-04,1.01 2020-02-05,1.02 2020-02-06,1.04 2020-02-07,1.03 2020-02-08,1.02 2020-02-09,1.01 2020-02-10,1.02 2020-02-11,1.02 2020-02-12,1.02 2020-02-13,1.01 2020-02-14,1.03 2020-02-15,1.01 2020-02-16,1.02 2020-02-17,1.02 2020-02-18,1.01 2020-02-19,1.02 2020-02-20,1.00 2020-02-21,0.98 2020-02-22,0.99 2020-02-23,1.00 2020-02-24,1.01 2020-02-25,1.01 2020-02-26,0.99 2020-02-27,0.98 2020-02-28,0.97 2020-02-29,0.96 2020-03-01,0.96 2020-03-02,0.94 2020-03-03,0.93 2020-03-04,0.93 2020-03-05,0.94 2020-03-06,0.95 2020-03-07,0.95 2020-03-08,0.94 2020-03-09,0.94 2020-03-10,0.91 2020-03-11,0.89 2020-03-12,0.85 2020-03-13,0.80 2020-03-14,0.77 2020-03-15,0.73 2020-03-16,0.64 2020-03-17,0.51 2020-03-18,0.40 2020-03-19,0.30 2020-03-20,0.20
 import unittest import tensorflow as tf import matplotlib.pyplot as plt import time from covid.impl.chainbinom_simulate import chain_binomial_simulate class TestSimulator(unittest.TestCase): def test_homogenousSIR(self): state = tf.constant([, , ], dtype=tf.float32) stoichiometry = tf.constant([[-1, 1, 0], [0, -1, 1]], dtype=tf.float32) def h(state): return tf.stack([[0.4 * state[1, 0]/tf.reduce_sum(state)], [0.14]]) sim = chain_binomial_simulate(h, state, 0., 365., 1, stoichiometry) print(sim) state = tf.constant([[999, 1500], [1, 10], [0, 0]], dtype=tf.float64) def h(param): def f(state): rates = [param * state/tf.reduce_sum(state, axis=0), tf.broadcast_to([param], shape=[state.shape])] indices = [[0, 1], [1, 2]] rate_matrix = tf.scatter_nd(updates=rates, indices=indices, shape=[state.shape, state.shape, state.shape]) return rate_matrix return f @tf.function def simulate_one(param): """One realisation of the epidemic process :param param: 1d tensor of parameters beta and gamma""" t, sim = chain_binomial_simulate(h(param), state, 0., 365., 1.) return sim @tf.function def simulate(param): """Simulate a bunch of epidemics at once""" sim = tf.map_fn(simulate_one, param, parallel_iterations=10) return sim start = time.perf_counter() param = tf.broadcast_to(tf.constant([0.4, 0.14], dtype=tf.float64), shape=[100, 2]) sim = simulate(param) end = time.perf_counter() print(f"Complete in {end-start} seconds") plt.plot(sim[0, :, 1, :]) plt.show() if __name__ == '__main__': ... ...
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!