Commit 119e0dbd authored by Chris Jewell

Merge branch 'add-aws' into mod-hmc-gibbs

parents 3462054d 16bad2b7
@@ -134,7 +134,7 @@ def CovidUK(covariates, initial_state, initial_step, num_steps):
     def alpha_t(alpha_0):
         return BrownianMotion(
-            tf.range(num_steps, dtype=DTYPE), x0=alpha_0, scale=0.01
+            tf.range(num_steps, dtype=DTYPE), x0=alpha_0, scale=0.005
         )

     def gamma0():
...
"""A Ruffus-ised pipeline for COVID-19 analysis""" """A Ruffus-ised pipeline for COVID-19 analysis"""
import os
from os.path import expandvars from os.path import expandvars
import warnings
import yaml import yaml
import datetime import datetime
import s3fs
import ruffus as rf import ruffus as rf
from covid.ruffus_pipeline import run_pipeline from covid.ruffus_pipeline import run_pipeline
...@@ -61,6 +64,9 @@ if __name__ == "__main__": ...@@ -61,6 +64,9 @@ if __name__ == "__main__":
data_args.add_argument( data_args.add_argument(
"--pillar", type=str, help="Pillar", choices=["both", "1", "2"] "--pillar", type=str, help="Pillar", choices=["both", "1", "2"]
) )
data_args.add_argument(
"--aws", action='store_true', help="Push to AWS"
)
cli_options = argparser.parse_args() cli_options = argparser.parse_args()
global_config = _import_global_config(cli_options.config) global_config = _import_global_config(cli_options.config)
......
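Reviewer note: because `--aws` is declared with `action='store_true'`, the flag takes no argument; its presence sets `cli_options.aws = True` and its absence leaves it `False`, which is what the upload task added below keys on. A quick standalone check:

```python
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--aws", action="store_true", help="Push to AWS")

print(parser.parse_args([]).aws)           # False: flag omitted
print(parser.parse_args(["--aws"]).aws)    # True: flag present
```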
"""Represents the analytic pipeline as a ruffus chain""" """Represents the analytic pipeline as a ruffus chain"""
import os import os
import warnings
import yaml import yaml
from datetime import datetime from datetime import datetime
from uuid import uuid1 from uuid import uuid1
import json import json
import netCDF4 as nc import netCDF4 as nc
import s3fs
import pandas as pd import pandas as pd
import ruffus as rf import ruffus as rf
...@@ -14,7 +16,7 @@ from covid.tasks import ( ...@@ -14,7 +16,7 @@ from covid.tasks import (
assemble_data, assemble_data,
mcmc, mcmc,
thin_posterior, thin_posterior,
next_generation_matrix, reproduction_number,
overall_rt, overall_rt,
predict, predict,
summarize, summarize,
...@@ -91,11 +93,11 @@ def run_pipeline(global_config, results_directory, cli_options): ...@@ -91,11 +93,11 @@ def run_pipeline(global_config, results_directory, cli_options):
     rf.transform(
         input=[[process_data, thin_samples]],
         filter=rf.formatter(),
-        output=wd("ngm.nc"),
-    )(next_generation_matrix)
+        output=wd("reproduction_number.nc"),
+    )(reproduction_number)

     rf.transform(
-        input=next_generation_matrix,
+        input=reproduction_number,
         filter=rf.formatter(),
         output=wd("national_rt.xlsx"),
     )(overall_rt)
@@ -146,7 +148,7 @@ def run_pipeline(global_config, results_directory, cli_options):

     # Summarisation
     rf.transform(
-        input=next_generation_matrix,
+        input=reproduction_number,
         filter=rf.formatter(),
         output=wd("rt_summary.csv"),
     )(summarize.rt)
@@ -184,15 +186,15 @@ def run_pipeline(global_config, results_directory, cli_options):
         df.to_csv(output_file)

     # Plot in-sample
-    @rf.transform(
-        input=[insample7, insample14],
-        filter=rf.formatter(".+/insample(?P<LAG>\d+).nc"),
-        add_inputs=rf.add_inputs(process_data),
-        output="{path[0]}/insample_plots{LAG[0]}",
-        extras=["{LAG[0]}"],
-    )
-    def plot_insample_predictive_timeseries(input_files, output_dir, lag):
-        insample_predictive_timeseries(input_files, output_dir, lag)
+    # @rf.transform(
+    #     input=[insample7, insample14],
+    #     filter=rf.formatter(".+/insample(?P<LAG>\d+).nc"),
+    #     add_inputs=rf.add_inputs(process_data),
+    #     output="{path[0]}/insample_plots{LAG[0]}",
+    #     extras=["{LAG[0]}"],
+    # )
+    # def plot_insample_predictive_timeseries(input_files, output_dir, lag):
+    #     insample_predictive_timeseries(input_files, output_dir, lag)

     # Geopackage
     rf.transform(
@@ -221,11 +223,34 @@ def run_pipeline(global_config, results_directory, cli_options):
             insample7,
             insample14,
             medium_term,
-            next_generation_matrix,
+            reproduction_number,
             ]
         ],
         rf.formatter(),
         wd("summary_longformat.xlsx"),
     )(summary_longformat)

+    @rf.active_if(cli_options.aws)
+    @rf.transform(
+        input=[
+            process_data,
+            run_mcmc,
+            insample7,
+            insample14,
+            medium_term,
+            reproduction_number,
+        ],
+        filter=rf.formatter(),
+        output="{subdir[0][0]}/{basename[0]}{ext[0]}",
+        extras=[global_config["AWSS3"]],
+    )
+    def upload_to_aws(input_file, output_file, config):
+        obj_path = f"{config['bucket']}/{output_file}"
+        s3 = s3fs.S3FileSystem(profile=config["profile"])
+        if not s3.exists(obj_path):
+            print(f"Copy {input_file} to {obj_path}", flush=True)
+            s3.put(input_file, obj_path)
+        else:
+            warnings.warn(f"Path '{obj_path}' already exists, not uploading.")
+
     rf.cmdline.run(cli_options)
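Reviewer note: the new `upload_to_aws` task is gated by `@rf.active_if(cli_options.aws)`, so it only joins the Ruffus task graph when `--aws` is passed, and it reads its S3 settings from `global_config["AWSS3"]`. A minimal sketch of the shape that configuration plausibly takes; the bucket and profile values below are invented examples, and only the `"bucket"` and `"profile"` keys are actually read by the task:

```python
import s3fs

# Assumed example of global_config["AWSS3"]; only these two keys are used.
aws_config = {"bucket": "covid-pipeline-results", "profile": "default"}

# Credentials are resolved via the named AWS profile; the existence check
# means re-running the pipeline skips objects already uploaded.
s3 = s3fs.S3FileSystem(profile=aws_config["profile"])
obj_path = f"{aws_config['bucket']}/2020-01-01/summary_longformat.xlsx"
print(s3.exists(obj_path))
```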
@@ -3,7 +3,7 @@
 from covid.tasks.assemble_data import assemble_data
 from covid.tasks.inference import mcmc
 from covid.tasks.thin_posterior import thin_posterior
-from covid.tasks.next_generation_matrix import next_generation_matrix
+from covid.tasks.next_generation_matrix import reproduction_number
 from covid.tasks.overall_rt import overall_rt
 from covid.tasks.predict import predict
 import covid.tasks.summarize as summarize
@@ -18,7 +18,7 @@ __all__ = [
     "assemble_data",
     "mcmc",
     "thin_posterior",
-    "next_generation_matrix",
+    "reproduction_number",
     "overall_rt",
     "predict",
     "summarize",
...
@@ -5,13 +5,12 @@ import numpy as np
 import xarray
 import tensorflow as tf

 from covid import model_spec
 from covid.util import copy_nc_attrs
 from gemlib.util import compute_state


-def calc_posterior_ngm(samples, covar_data):
+def calc_posterior_rit(samples, initial_state, times, covar_data):
     """Calculates effective reproduction number for batches of metapopulations
     :param theta: a tensor of batched theta parameters [B] + theta.shape
     :param xi: a tensor of batched xi parameters [B] + xi.shape
@@ -20,63 +19,75 @@ def calc_posterior_rit(samples, initial_state, times, covar_data):
     :param covar_data: the covariate data
     :return a batched vector of R_it estimates
     """
+    times = tf.convert_to_tensor(times)
+
     def r_fn(args):
-        beta1_, beta2_, beta3_, sigma_, xi_, gamma0_, events_ = args
-        t = events_.shape[-2] - 1
+        par = tf.nest.pack_sequence_as(samples, args)
         state = compute_state(
-            samples["init_state"], events_, model_spec.STOICHIOMETRY
+            initial_state, par["seir"], model_spec.STOICHIOMETRY
         )
-        state = tf.gather(state, t, axis=-2)  # State on final inference day
-        par = dict(
-            beta1=beta1_,
-            beta2=beta2_,
-            beta3=beta3_,
-            sigma=sigma_,
-            gamma0=gamma0_,
-            xi=xi_,
-        )
-        ngm_fn = model_spec.next_generation_matrix_fn(covar_data, par)
-        ngm = ngm_fn(t, state)
-        return ngm
+        del par["seir"]
+
+        def fn(t):
+            state_ = tf.gather(state, t, axis=-2)  # State on inference day t
+            ngm_fn = model_spec.next_generation_matrix_fn(covar_data, par)
+            ngm = ngm_fn(t, state_)
+            return ngm
+
+        ngm = tf.vectorized_map(fn, elems=times)
+        return tf.reduce_sum(ngm, axis=-2)  # sum over destinations

     return tf.vectorized_map(
         r_fn,
-        elems=(
-            samples["beta1"],
-            samples["beta2"],
-            samples["beta3"],
-            samples["sigma"],
-            samples["xi"],
-            samples["gamma0"],
-            samples["seir"],
-        ),
+        elems=tf.nest.flatten(samples),
     )
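Reviewer note: the refactor replaces hand-written tuple unpacking with `tf.nest.flatten` on the way into the map and `tf.nest.pack_sequence_as` inside it, so adding or removing a model parameter no longer requires touching this code. A self-contained sketch of the round trip, with toy names rather than the pipeline's:

```python
import tensorflow as tf

# Toy batched "samples" dict: two parameters, batch (iteration) size 3.
samples = {
    "a": tf.constant([1.0, 2.0, 3.0], dtype=tf.float64),
    "b": tf.constant([10.0, 20.0, 30.0], dtype=tf.float64),
}

def r_fn(args):
    # Rebuild one batch element's dict from the flattened slices.
    par = tf.nest.pack_sequence_as(samples, args)
    return par["a"] + par["b"]

# vectorized_map slices each flattened tensor along axis 0.
print(tf.vectorized_map(r_fn, elems=tf.nest.flatten(samples)))
# tf.Tensor([11. 22. 33.], shape=(3,), dtype=float64)
```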
-def next_generation_matrix(input_files, output_file):
+CHUNKSIZE = 50
+
+
+def reproduction_number(input_files, output_file):

     covar_data = xarray.open_dataset(input_files[0], group="constant_data")
     with open(input_files[1], "rb") as f:
         samples = pkl.load(f)
+    num_samples = samples["seir"].shape[0]
+    initial_state = samples["initial_state"]
+    del samples["initial_state"]
+
+    times = np.arange(covar_data.coords["time"].shape[0])

-    # Compute ngm posterior
-    ngm = calc_posterior_ngm(samples, covar_data).numpy()
-    ngm = xarray.DataArray(
-        ngm,
+    # Compute R_it posterior in chunks to avoid running out of memory
+    r_its = []
+    for i in range(0, num_samples, CHUNKSIZE):
+        start = i
+        end = np.minimum(i + CHUNKSIZE, num_samples)
+        print(f"Chunk {start}:{end}", flush=True)
+        subsamples = {k: v[start:end] for k, v in samples.items()}
+        r_it = calc_posterior_rit(subsamples, initial_state, times, covar_data)
+        r_its.append(r_it)
+
+    r_it = xarray.DataArray(
+        tf.concat(r_its, axis=0),
         coords=[
-            np.arange(ngm.shape[0]),
-            covar_data.coords["location"],
+            np.arange(num_samples),
+            covar_data.coords["time"][times],
             covar_data.coords["location"],
         ],
-        dims=["iteration", "dest", "src"],
+        dims=["iteration", "time", "location"],
     )
-    ngm = xarray.Dataset({"ngm": ngm})
+    weight = covar_data["N"] / covar_data["N"].sum()
+    r_t = (r_it * weight).sum(dim="location")
+    ds = xarray.Dataset({"R_it": r_it, "R_t": r_t})

     # Output
-    ngm.to_netcdf(output_file, group="posterior_predictive")
+    ds.to_netcdf(output_file, group="posterior_predictive")
     copy_nc_attrs(input_files[0], output_file)
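Reviewer note: the chunked loop bounds peak memory, since each `calc_posterior_rit` call materialises next-generation matrices for at most `CHUNKSIZE` posterior draws, and the per-chunk results are concatenated afterwards; `R_t` is then the population-weighted mean of `R_it` over locations, with weight `N_i / sum(N)`. A generic sketch of the chunking idiom (the function and names below are stand-ins, not the pipeline's):

```python
import numpy as np

CHUNKSIZE = 50

def map_in_chunks(fn, samples, num_samples):
    """Apply fn to CHUNKSIZE-sized slices of every array in `samples`."""
    results = []
    for start in range(0, num_samples, CHUNKSIZE):
        end = min(start + CHUNKSIZE, num_samples)
        subsamples = {k: v[start:end] for k, v in samples.items()}
        results.append(fn(subsamples))  # peak memory ~ one chunk
    return np.concatenate(results, axis=0)
```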
@@ -86,22 +97,20 @@ if __name__ == "__main__":

     parser = ArgumentParser()
     parser.add_argument(
-        "-s",
-        "--samples",
+        "samples",
         type=str,
-        description="A pickle file with MCMC samples",
-        required=True,
+        help="A pickle file with MCMC samples",
     )
     parser.add_argument(
         "-d",
         "--data",
         type=str,
-        decription="A data glob pickle file",
-        require=True,
+        help="A data glob pickle file",
+        required=True,
     )
     parser.add_argument(
-        "-o", "--output", type=str, description="The output file", require=True
+        "-o", "--output", type=str, help="The output file", required=True
     )
     args = parser.parse_args()

-    next_generation_matrix([args.data, args.samples], args.output)
+    reproduction_number([args.data, args.samples], args.output)
@@ -10,17 +10,18 @@ from covid.summary import (
 )


-def overall_rt(next_generation_matrix, output_file):
+def overall_rt(inference_data, output_file):

-    ngms = xarray.open_dataset(
-        next_generation_matrix, group="posterior_predictive"
-    )["ngm"]
-    b, _ = power_iteration(ngms)
-    rt = rayleigh_quotient(ngms, b)
+    r_t = xarray.open_dataset(inference_data, group="posterior_predictive")[
+        "R_t"
+    ]
     q = np.arange(0.05, 1.0, 0.05)
-    rt_quantiles = pd.DataFrame(
-        {"Rt": np.quantile(rt, q, axis=-1)}, index=q
-    ).T.to_excel(output_file)
+    quantiles = r_t.isel(time=-1).quantile(q=q)
+    quantiles.to_dataframe().T.to_excel(output_file)
+    # pd.DataFrame({"Rt": np.quantile(r_t, q, axis=-1)}, index=q).T.to_excel(
+    #     output_file
+    # )
 if __name__ == "__main__":
@@ -30,10 +31,11 @@ if __name__ == "__main__":

     parser = ArgumentParser()
     parser.add_argument(
         "input_file",
-        description="The input .pkl file containing the next generation matrix",
+        type=str,
+        help="The input .pkl file containing the next generation matrix",
     )
     parser.add_argument(
-        "output_file", description="The name of the output .xlsx file"
+        "output_file", type=str, help="The name of the output .xlsx file"
     )

     args = parser.parse_args()
...
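Reviewer note: `overall_rt` no longer runs a power iteration over next-generation matrices; it just summarises the pre-computed `R_t` series at the final inference day. A toy sketch of the xarray idiom, with invented data and the dimension names taken from the diff:

```python
import numpy as np
import xarray

# Invented R_t: 100 posterior draws over 5 time points.
r_t = xarray.DataArray(
    np.random.gamma(2.0, 0.5, size=(100, 5)),
    dims=["iteration", "time"],
    name="R_t",
)
q = np.arange(0.05, 1.0, 0.05)
# Quantiles across iterations at the last time point.
quantiles = r_t.isel(time=-1).quantile(q=q)
print(quantiles.to_dataframe().T)
```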
@@ -10,7 +10,9 @@ from covid.util import copy_nc_attrs
 from gemlib.util import compute_state


-def predicted_incidence(posterior_samples, covar_data, init_step, num_steps):
+def predicted_incidence(
+    posterior_samples, init_state, covar_data, init_step, num_steps
+):
     """Runs the simulation forward in time from `init_state` at time `init_time`
     for `num_steps`.

     :param param: a dictionary of model parameters
@@ -21,48 +23,37 @@ def predicted_incidence(
     transitions
     """
-    @tf.function
-    def sim_fn(args):
-        beta1_, beta2_, sigma_, xi_, gamma0_, gamma1_, init_ = args
-
-        par = dict(
-            beta1=beta1_,
-            beta2=beta2_,
-            sigma=sigma_,
-            xi=xi_,
-            gamma0=gamma0_,
-            gamma1=gamma1_,
-        )
-
-        model = model_spec.CovidUK(
-            covar_data,
-            initial_state=init_,
-            initial_step=init_step,
-            num_steps=num_steps,
-        )
-        sim = model.sample(**par)
-        return sim["seir"]
-
     posterior_state = compute_state(
-        posterior_samples["init_state"],
+        init_state,
         posterior_samples["seir"],
         model_spec.STOICHIOMETRY,
     )
-    init_state = posterior_state[..., init_step, :]
+    posterior_samples["new_init_state"] = posterior_state[..., init_step, :]
+    del posterior_samples["seir"]

-    events = tf.map_fn(
-        sim_fn,
-        elems=(
-            posterior_samples["beta1"],
-            posterior_samples["beta2"],
-            posterior_samples["sigma"],
-            posterior_samples["xi"],
-            posterior_samples["gamma0"],
-            posterior_samples["gamma1"],
-            init_state,
-        ),
-        fn_output_signature=(tf.float64),
-    )
-    return init_state, events
+    @tf.function
+    def do_sim():
+        def sim_fn(args):
+            par = tf.nest.pack_sequence_as(posterior_samples, args)
+            init_ = par["new_init_state"]
+            del par["new_init_state"]
+
+            model = model_spec.CovidUK(
+                covar_data,
+                initial_state=init_,
+                initial_step=init_step,
+                num_steps=num_steps,
+            )
+            sim = model.sample(**par)
+            return sim["seir"]
+
+        return tf.map_fn(
+            sim_fn,
+            elems=tf.nest.flatten(posterior_samples),
+            fn_output_signature=(tf.float64),
+        )
+
+    return posterior_samples["new_init_state"], do_sim()
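Reviewer note: the simulation now runs inside a no-argument `@tf.function`, so the model is traced once while `posterior_samples` (including the injected `new_init_state`) is captured from the enclosing scope rather than threaded through `elems` by hand. A toy sketch of that closure-plus-`map_fn` pattern, with stand-in names:

```python
import tensorflow as tf

# Stand-in for posterior_samples: one batched parameter.
params = {"rate": tf.constant([0.1, 0.2, 0.3], dtype=tf.float64)}

@tf.function
def do_sim():
    def sim_fn(args):
        par = tf.nest.pack_sequence_as(params, args)  # dict captured above
        return par["rate"] * 2.0  # stand-in for model.sample(**par)

    return tf.map_fn(
        sim_fn,
        elems=tf.nest.flatten(params),
        fn_output_signature=tf.float64,
    )

print(do_sim())  # tf.Tensor([0.2 0.4 0.6], shape=(3,), dtype=float64)
```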
 def read_pkl(filename):
@@ -74,7 +65,10 @@ def predict(data, posterior_samples, output_file, initial_step, num_steps):

     covar_data = xarray.open_dataset(data, group="constant_data")
     cases = xarray.open_dataset(data, group="observations")
     samples = read_pkl(posterior_samples)
+    initial_state = samples["initial_state"]
+    del samples["initial_state"]

     if initial_step < 0:
         initial_step = samples["seir"].shape[-2] + initial_step
@@ -87,11 +81,11 @@ def predict(data, posterior_samples, output_file, initial_step, num_steps):
     )

     estimated_init_state, predicted_events = predicted_incidence(
-        samples, covar_data, initial_step, num_steps
+        samples, initial_state, covar_data, initial_step, num_steps
     )

     prediction = xarray.DataArray(
-        predicted_events,
+        predicted_events.numpy(),
         coords=[
             np.arange(predicted_events.shape[0]),
             covar_data.coords["location"],
@@ -101,7 +95,7 @@ def predict(data, posterior_samples, output_file, initial_step, num_steps):
         dims=("iteration", "location", "time", "event"),
     )
     estimated_init_state = xarray.DataArray(
-        estimated_init_state,
+        estimated_init_state.numpy(),
         coords=[
             np.arange(estimated_init_state.shape[0]),
             covar_data.coords["location"],
@@ -123,23 +117,21 @@ if __name__ == "__main__":

     parser = ArgumentParser()
     parser.add_argument(
-        "-i", "--initial-step", type=int, default=0, description="Initial step"
+        "-i", "--initial-step", type=int, default=0, help="Initial step"
     )
     parser.add_argument(