Commit 15177200 authored by Chris Jewell's avatar Chris Jewell
Browse files

Introduced Ruffus-ified pipeline steps

parent b62ba678
"""Function responsible for assembling all data required
to instantiate the COVID19 model"""
def assemble_data(output_file, config):
covar_data = {}
"""Summarises nowcasting and projection data into Geopackage"""
import numpy as np
import geopandas as gp
def geopackage(input_files, output_file, config_dict):
pass
"""Calculates and saves a next generation matrix"""
import argparse
import yaml
import pickle as pkl
import tensorflow as tf
from covid import model_spec
from gemlib.util import compute_state
def calc_posterior_ngm(param, events, init_state, covar_data):
"""Calculates effective reproduction number for batches of metapopulations
:param theta: a tensor of batched theta parameters [B] + theta.shape
:param xi: a tensor of batched xi parameters [B] + xi.shape
:param events: a [B, M, T, X] batched events tensor
:param init_state: the initial state of the epidemic at earliest inference date
:param covar_data: the covariate data
:return a batched vector of R_it estimates
"""
def r_fn(args):
beta1_, beta2_, beta3_, sigma_, xi_, gamma0_, events_ = args
t = events_.shape[-2] - 1
state = compute_state(init_state, events_, model_spec.STOICHIOMETRY)
state = tf.gather(state, t, axis=-2) # State on final inference day
model = model_spec.CovidUK(
covariates=covar_data,
initial_state=init_state,
initial_step=0,
num_steps=events_.shape[-2],
)
par = dict(
beta1=beta1_,
beta2=beta2_,
beta3=beta3_,
sigma=sigma_,
gamma0=gamma0_,
xi=xi_,
)
ngm_fn = model_spec.next_generation_matrix_fn(covar_data, par)
ngm = ngm_fn(t, state)
return ngm
return tf.vectorized_map(
r_fn,
elems=(
param["beta1"],
param["beta2"],
param["beta3"],
param["sigma"],
param["xi"],
param["gamma0"],
events,
),
)
def next_generation_matrix(input_files, output_file):
with open(input_files[0], "rb") as f:
covar_data = pkl.load(f)
with open(input_files[1], "rb") as f:
param = pkl.load(f)
# Compute ngm posterior
ngm = calc_posterior_ngm(
param, param["events"], param["init_state"], covar_data
)
# Output
with open(output_file, "wb") as f:
pkl.dump(ngm, f)
if __name__ == "__main__":
next_generation_matrix(input_files, output_file)
"""Calculates overall Rt given a posterior next generation matix"""
import numpy as np
import pickle as pkl
import pandas as pd
from covid.summary import (
rayleigh_quotient,
power_iteration,
)
def overall_rt(next_generation_matrix, output_file):
with open(next_generation_matrix, "rb") as f:
ngms = pkl.load(f)
b, _ = power_iteration(ngms)
rt = rayleigh_quotient(ngms, b)
q = np.arange(0.05, 1.0, 0.05)
rt_quantiles = pd.DataFrame(
{"Rt": np.quantile(rt, q, axis=-1)}, index=q
).T.to_excel(output_file)
if __name__ == "__main__":
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument(
"input_file",
description="The input .pkl file containing the next generation matrix",
)
parser.add_argument(
"output_file", description="The name of the output .xlsx file"
)
args = parser.parse_args()
overall_rt(args.input_file, args.output_file)
"""Run predictions for COVID-19 model"""
import pickle as pkl
import tensorflow as tf
from covid import model_spec
from gemlib.util import compute_state
@tf.function
def predicted_incidence(posterior_samples, covar_data, init_step, num_steps):
"""Runs the simulation forward in time from `init_state` at time `init_time`
for `num_steps`.
:param param: a dictionary of model parameters
:covar_data: a dictionary of model covariate data
:param init_step: the initial time step
:param num_steps: the number of steps to simulate
:returns: a tensor of srt_quhape [B, M, num_steps, X] where X is the number of state
transitions
"""
def sim_fn(args):
beta1_, beta2_, beta3_, sigma_, gamma0_, gamma1_, init_ = args
par = dict(
beta1=beta1_,
beta2=beta2_,
beta3=beta3_,
xi=xi_,
gamma0=gamma0_,
gamma1=gamma1_,
)
model = model_spec.CovidUK(
covar_data,
initial_state=init_,
initial_step=init_step,
num_steps=num_steps,
)
sim = model.sample(**par)
return sim["seir"]
posterior_state = compute_state(
covar_data["init_state"],
posterior_samples["seir"],
model_spec.STOICHIOMETRY,
)
init_state = posterior_state[..., init_step, :]
events = tf.map_fn(
sim_fn,
elems=(
posterior_samples["beta1"],
posterior_samples["beta2"],
posterior_samples["beta3"],
posterior_samples["sigma"],
posterior_samples["xi"],
posterior_samples["gamma0"],
posterior_samples["gamma1"],
init_state,
),
fn_output_signature=(tf.float64),
)
return events
def read_pkl(filename):
with open(filename, "rb") as f:
return pkl.load(f)
def predict(data, posterior_samples, output_file, initial_step, num_steps):
covar_data = read_pkl(data)
samples = read_pkl(posterior_samples)
if initial_step < 0:
initial_step = samples["events"].shape[-2] + initial_step
prediction = predicted_incidence(
samples, covar_data, initial_step, num_steps
)
with open(output_file, "wb") as f:
pkl.dump(prediction.numpy(), f)
if __name__ == "__main__":
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument(
"-i", "--initial-step", type=int, default=0, description="Initial step"
)
parser.add_argument(
"-n", "--num-steps", type=int, default=1, description="Number of steps"
)
parser.add_argument(
"data_pkl", type=str, description="Covariate data pickle"
)
parser.add_argument(
"posterior_samples_pkl",
type=str,
description="Posterior samples pickle",
)
parser.add_argument(
"output_file",
type=str,
description="Output pkl file",
)
args = parser.parse_args()
predict(
args.data_pkl,
args.posterior_samples_pkl,
args.output_file,
args.initial_step,
args.num_steps,
)
"""Thin posterior"""
import h5py
import pickle as pkl
def thin_posterior(input_file, output_file, thin_idx):
f = h5py.File(input_file, "r", rdcc_nbytes=1024 ** 3, rdcc_nslots=1e6)
output_dict = dict(
beta1=f["samples/beta1"][thin_idx],
beta2=f["samples/beta2"][thin_idx],
beta3=f["samples/beta3"][
thin_idx,
],
sigma=f["samples/sigma"][
thin_idx,
],
xi=f["samples/xi"][thin_idx],
gamma0=f["samples/gamma0"][thin_idx],
gamma1=f["samples/gamma1"][thin_idx],
seir=f["samples/events"][thin_idx],
init_state=f["initial_state"][:],
)
f.close()
with open(output_file, "wb") as f:
pkl.dump(
output_dict,
f,
)
if __name__ == "__main__":
import yaml
import os
from covid.cli_arg_parse import cli_args
args = cli_args()
with open(args.config, "r") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
def join_and_expand(prefix, filename):
return os.path.expandvars(os.path.join(prefix, filename))
input_file = join_and_expand(
config["output"]["results_dir"], config["output"]["posterior"]
)
output_file = join_and_expand(
config["output"]["results_dir"], config["ThinPosterior"]["output_file"]
)
thin_idx = range(
config["ThinPosterior"]["thin_start"],
config["ThinPosterior"]["thin_end"],
config["ThinPosterior"]["thin_by"],
)
thin_posterior(input_file, output_file, thin_idx)
......@@ -59,3 +59,7 @@ TierData:
input: api
address: None
format: api
NextGenerationMatrix:
output_file: next_generation_matrix.pkl
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment