Commit ee51c48d authored by Chris Jewell's avatar Chris Jewell
Browse files

Initial commit.

parents
#!/usr/bin/python -O
# SIS simulation in discrete time
from optparse import OptionParser
import pickle
import numpy as np
import matplotlib.pyplot as plt
def hhNetwork(numHouseHolds, numPerHouse):
nTotal = numHouseHolds * numPerHouse
net = np.zeros(shape=[nTotal,nTotal])
for i in range(numHouseHolds):
start = i*numPerHouse
stop = start + numPerHouse
net[start:stop,start:stop] = 1.
return net
class K:
def __init__(self,net):
self.net = net
def __call__(self, beta0, beta1, idx):
return beta0 + beta1*self.net[idx] # Background plus pairwise infection on network
@property
def shape(self):
return self.net.shape
class Simulation:
def __init__(self, simmatrix, T, par, kernel):
self.history = simmatrix
self.T = T
self.par = par
self.kernel = kernel
def getHHMat(self):
v = np.dot(self.history, self.kernel.T)
hhMat = v[:, np.arange(0,self.kernel.shape[1],
np.int32(self.kernel[0,:].sum()))]
hhMat = hhMat>0
return hhMat
def plotTimeSeries(self, ax=None):
if ax == None:
ax = plt.figure().gca()
nCls = self.history.sum(axis=1)
ax.plot(self.T, nCls, 'r-')
ax.plot(self.T, self.history.shape[1]-nCls, 'b-')
plt.show()
def plotMatrix(self, ax=None):
if ax == None:
ax = plt.figure().gca()
ax.imshow(self.history.T, aspect='auto')
ax.set_xlabel('Time/days')
ax.set_ylabel('Person id')
def plotMatrixHH(self, ax=None):
if ax == None:
ax = plt.figure().gca()
hhMat = self.getHHMat()
ax.imshow(hhMat.T,aspect='auto')
ax.set_xlabel('Time/days')
ax.set_ylabel('Household id')
class Simulator:
def __init__(self, kernel, timestep=1):
self.kernel = kernel
self.timestep = timestep
self.n = kernel.shape[0]
self.par = {'alpha': 0., 'beta': 0., 'gamma': 0.}
def calcKernel(self,par, idx): # Memoisation -- only recalculate if parameters change
if (par['alpha'] != self.par['alpha']) | (par['beta'] != self.par['beta']):
self.k = self.kernel(par['alpha'], par['beta'], idx)
return self.k
def propagate(self, par, state0, nTimeSteps=1):
newState = state0.copy()
for i in np.arange(nTimeSteps):
S = np.where(state0 == 0)[0]
I = np.where(state0 == 1)[0]
# S->I
k = self.calcKernel(par, np.ix_(I,S)) # pull relevant rows/cols from kernel
lambdaj = k.sum(axis=0) # sum across rows
StoI = None
if S.size > 0: # draw who gets infected
pSI = 1-np.exp(-lambdaj*self.timestep) # Rate -> probability
StoI = np.where(np.random.binomial(1,pSI))[0]
# I->S
pIS = 1. - np.exp(-par['gamma']*self.timestep) # Rate -> probability
ItoS = np.where(np.random.binomial(1,pIS,size=I.size))[0]
# Update state
newState[S[StoI]] = 1
newState[I[ItoS]] = 0
return newState
def simulate(self, par, tmax, state0=None):
if state0 is None:
state0 = np.random.binomial(1,0.5,size=kernel.shape[0])
assert state0.size == self.kernel.shape[0]
T = np.arange(0., tmax, self.timestep)
history = np.zeros([T.size, state0.size])
history[0,:] = state0
for t in range(1,T.size):
state0 = self.propagate(par,state0)
history[t,:] = state0
return Simulation(history,T,par,self.kernel.net)
if __name__ == '__main__':
parser = OptionParser(usage='usage: %prog [options]',
version="%prog 1.0")
parser.add_option("-a", "--alpha",
dest="alpha",
default=6e-6,
type='float',
help="community acquisition rate")
parser.add_option("-b", "--beta",
dest="beta",
default=0.12,
type='float',
help="household acquisition rate")
parser.add_option("-g", "--gamma",
dest="gamma",
default=0.14,
type='float',
help="de-acquisition rate")
parser.add_option("-m","--maxtime",
dest="maxtime",
default=100.,
type='float',
help="max time")
parser.add_option("-s","--timestep",
dest="timestep",
default=1.,
type='float',
help="timestep")
parser.add_option("-o", "--output",
dest="outfile",
type='string',
help="Output filename")
parser.add_option("-p","--plot",
action="store_true",
dest="doPlot",
default=False,
help="Plot the epidemic")
options,args = parser.parse_args()
net = hhNetwork(250,5)
kernel = K(net)
state0 = np.random.binomial(1,0.5,size=kernel.shape[0])
par = {'alpha': options.alpha,
'beta': options.beta,
'gamma': options.gamma}
simulator = Simulator(kernel, options.timestep)
sim = simulator.simulate(par, options.maxtime, state0)
print('Population prevalence at tmax: ', sim.history[-1,:].sum()/kernel.shape[0])
print('Household prevalence at tmax: ', sim.getHHMat()[-1,:].sum()/250.)
if options.doPlot:
f,ax = plt.subplots(2,1)
sim.plotMatrix(ax[0])
sim.plotMatrixHH(ax[1])
plt.show()
if options.outfile:
f = open(options.outfile,'wb')
pickle.dump(sim,f)
f.close()
# Homogeneous SIR model
import pickle as pkl
import arviz
from tqdm import tqdm
import numpy as np
import scipy.stats as st
import matplotlib.pyplot as plt
class SIR:
def __init__(self, pop_size=1000):
self.N = pop_size
def sample(self, beta=0.003, gamma=0.3):
state = [np.array([0, self.N - 1, 1, 0])]
while state[-1][2] > 0:
lambda_si = beta * np.prod(state[-1][1:3])
lambda_ir = gamma * state[-1][2]
ttne = np.random.exponential(1./(lambda_si+lambda_ir))
is_infection = np.random.uniform(0., lambda_si+lambda_ir) < lambda_si
if is_infection:
state.append(state[-1] + np.array([ttne, -1, 1, 0]))
else:
state.append(state[-1] + np.array([ttne, 0, -1, 1]))
return np.array(state)
def plot_sim(sim):
plt.plot(sim[:, 0], sim[:, 2])
plt.xlabel('t')
plt.ylabel('I')
plt.show()
def obs_sample(t, n, N, sim):
i_obs = []
for tt in t:
n_i = sim[sim[:, 0] < tt, 2][-1]
if n == N:
i_obs.append(n_i)
else:
i_obs.append(np.random.binomial(n, n_i/N, 1))
return np.array(i_obs)
def obs_ll(t, num_infec, num_sampled, N, sim):
"""Random binomial sampling observation model"""
part_ll = np.zeros_like(t)
for i, tt in enumerate(t):
n_i_true = sim[sim[:, 0] < tt, 2][-1]
prev = n_i_true/N
part_ll[i] = np.log(prev)*num_infec[i] + np.log(1. - prev)*(num_sampled[i]-num_infec[i])
part_ll[np.isnan(part_ll)] = -np.inf
return np.sum(part_ll)
def estim_ll(m, y, N, beta, gamma):
ll = np.zeros([m])
sir = SIR(N)
for i in range(m):
sim = sir.sample(beta, gamma)
ll[i] = obs_ll(y['t'], y['nI'], y['N'], N, sim)
return np.log(np.mean(np.exp(ll)))
def update_mc(pt, logpi, logp, tune):
"""Log random walk MH update"""
# Propose
ptcan = pt * np.exp(np.random.normal(loc=0., scale=tune))
# Estimate new likelihood
logpican = logp(ptcan) + np.sum(np.log(ptcan)) - np.sum(np.log(pt))
# Accept/reject
alpha = logpican - logpi
if np.log(np.random.uniform()) < alpha:
return True, ptcan, logpican
else:
return False, pt, logpi
def mcmc(samples, y, N, sim):
"""Runs MCMC to calculate posterior of beta and gamma"""
pt = [np.array([0.0006, 0.1])]
accept = 0.
pr_beta = st.distributions.gamma(a=6, scale=1./1000)
pr_gamma = st.distributions.gamma(a=3, scale=1./10)
def llfunc(pt):
return estim_ll(50, y, N, *pt) + pr_beta.logpdf(pt[0]) + pr_gamma.logpdf(pt[1])
ll = llfunc(pt[-1])
for i in tqdm(range(samples-1), mininterval=1):
a, x, ll = update_mc(pt[-1], ll, llfunc, [.4, .5])
pt.append(x)
accept += a
return np.array(pt), accept/samples
def density(x, adj=1.):
xx = np.linspace(x.min(), x.max(), 200)
z = st.gaussian_kde(x, bw_method=lambda d: st.gaussian_kde.silverman_factor(d) * adj)
return xx, z(xx)
if __name__ == '__main__':
N = 100
n = 10
with open('testsim.pkl','rb') as f:
sim = pkl.load(f)
tmax = sim[-1, 0]
t = np.linspace(0., tmax, 12)[1:11]
y = obs_sample(t, n, N, sim)
print("Observed data: ", y)
ll = obs_ll(t, y, np.full_like(y, n), N, sim)
print("Log likelihood:", ll)
obs_data = {'t': t, 'nI': y, 'N': np.full_like(t, n)}
ll_hat = estim_ll(50, obs_data, N, 0.0006, 0.3)
print("Marginal likelihood hat:", ll_hat)
post, accept = mcmc(2000, obs_data, N, sim)
with open('testpost.pkl', 'wb') as f:
pkl.dump(post, f)
print("Acceptance: ", accept)
fig, ax = plt.subplots(1,2)
ax[0].plot(post[:, 0])
ax[1].plot(post[:, 1])
plt.show()
\ No newline at end of file
# Runs pseudomarginal algorithm for household model
import numpy as np
import matplotlib.pyplot as plt
from beccamr.simSIS import Simulator, K, hhNetwork
def ad_sample(sim, hhids, n_hh, n_per_house):
t0 = np.random.randint(0, 30, size=n_hh*hhids.shape[0])
times = t0 + np.array([0, 30, 90, 180]).reshape([4, 1])
pids = np.array([np.random.choice(range(n_per_house), n_hh, replace=False)+hhid*n_per_house
for hhid in hhids]).flatten()
pids.sort()
sampled = np.array([sim.history[times[:, i], pids[i]] for i in np.arange(times.shape[1])])
return pids, times, sampled.T
def loglikelihood(sim, y):
pass
if __name__=='__main__':
net = hhNetwork(250, 4)
kernel = K(net)
state0 = np.random.binomial(1, 0.2, size=kernel.shape[0])
par = {'alpha': 6e-6, 'beta': 0.12, 'gamma': 0.14}
sis = Simulator(kernel, 1.)
sim = sis.simulate(par, 400., state0)
y = ad_sample(sim=sim, hhids=np.random.choice(np.arange(249), 110, replace=False),
n_hh=2, n_per_house=4)
print(y[0])
print(y[1])
print(y[2])
#plt.imshow(y)
#plt.show()
File added
File added
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment