Commit 5630746b authored by Chris Jewell

Merge branch 'master' into mod-hmc-gibbs

parents 9286ee61 6274bc57
@@ -86,6 +86,7 @@ class CasesData:
check_date_bounds(df, date_low, date_high)
check_date_format(df)
check_lad19cd_format(df)
df = df.rename(columns={"date": "time"})
return True
def adapt(df, config):
@@ -140,7 +141,7 @@ class CasesData:
dates = pd.date_range(date_low, date_high, closed="left")
multi_index = pd.MultiIndex.from_product([areacodes, dates])
ser = df["cases"].reindex(multi_index, fill_value=0.0)
ser.index.names = ["location", "date"]
ser.index.names = ["location", "time"]
ser.name = "cases"
return ser
@@ -182,13 +183,13 @@ class CasesData:
df = df.groupby(["lad19cd", "lab_report_date"]).count()
df = df.rename(columns={"specimen_date": "cases"})
df.index.names = ["lad19cd", "date"]
df.index.names = ["lad19cd", "time"]
df = df.sort_index()
# Fill in all dates, and add 0s for empty counts
dates = pd.date_range(date_low, date_high, closed="left")
multi_indexes = pd.MultiIndex.from_product(
[areacodes, dates], names=["location", "date"]
[areacodes, dates], names=["location", "time"]
)
results = df["cases"].reindex(multi_indexes, fill_value=0.0)
return results.sort_index()
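The reindexing idiom above is what makes the case series dense: `MultiIndex.from_product` enumerates every (location, time) pair, and `reindex(..., fill_value=0.0)` inserts explicit zeros wherever no cases were reported. A standalone sketch, with invented area codes and dates:

```python
import pandas as pd

# Invented area codes and a three-day window
areacodes = ["E06000001", "E06000002"]
dates = pd.date_range("2020-10-01", "2020-10-04", closed="left")

# Sparse counts: most (location, time) pairs have no row at all
counts = pd.Series(
    [5.0, 2.0],
    index=pd.MultiIndex.from_tuples(
        [("E06000001", dates[0]), ("E06000002", dates[2])]
    ),
)

# Dense panel: every pair present, zero-filled where nothing was reported
full_index = pd.MultiIndex.from_product(
    [areacodes, dates], names=["location", "time"]
)
dense = counts.reindex(full_index, fill_value=0.0)
print(dense)  # 6 rows: 2 locations x 3 dates
```

Note that `closed="left"` matches the pandas API used throughout this codebase; newer pandas spells it `inclusive="left"`.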
......
@@ -3,6 +3,7 @@ well-known formats"""
from warnings import warn
import numpy as np
import xarray
import pandas as pd
__all__ = [
@@ -13,7 +14,7 @@ __all__ = [
]
def read_mobility(path, locations):
def read_mobility(path, locations=None):
"""Reads in CSV with mobility matrix.
CSV format: <To>,<id>,<id>,....
@@ -24,29 +25,36 @@ def read_mobility(path, locations):
:returns: an xarray.DataArray of flows sorted by <id> on both rows and cols.
"""
mobility = pd.read_csv(path)
print("Locations: ", locations)
mobility = mobility[
mobility["From"].isin(locations) & mobility["To"].isin(locations)
]
mobility = mobility.sort_values(["From", "To"])
mobility = mobility.groupby(["From", "To"]).agg({"Flow": sum}).reset_index()
mob_matrix = mobility.pivot(index="To", columns="From", values="Flow")
mobility = mobility.rename(columns={"From": "src", "To": "dest"})
if locations is not None:
mobility = mobility[
mobility["src"].isin(locations) & mobility["dest"].isin(locations)
]
mobility = mobility.sort_values(["src", "dest"])
mobility = (
mobility.groupby(["src", "dest"]).agg({"Flow": sum}).reset_index()
)
mob_matrix = mobility.pivot(index="dest", columns="src", values="Flow")
mob_matrix[mob_matrix.isna()] = 0.0
return mob_matrix
return xarray.DataArray(
mob_matrix, dims=["location_dest", "location_src"], name="mobility"
)
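Pivoting the long-format flows and wrapping the result in `xarray.DataArray` keeps row/column identity explicit: the DataFrame's index and columns become coordinates on the named `location_dest`/`location_src` dimensions, so downstream lookups are by label rather than position. A sketch with toy flows and hypothetical location codes:

```python
import pandas as pd
import xarray

# Toy long-format flow table
flows = pd.DataFrame(
    {"src": ["A", "A", "B"], "dest": ["A", "B", "A"], "Flow": [10.0, 3.0, 7.0]}
)

mob_matrix = flows.pivot(index="dest", columns="src", values="Flow")
mob_matrix[mob_matrix.isna()] = 0.0  # absent pairs mean zero flow

mobility = xarray.DataArray(
    mob_matrix, dims=["location_dest", "location_src"], name="mobility"
)
# The pandas index/columns survive as coordinates on the renamed dims
print(mobility.sel(location_dest="B", location_src="A").item())  # 3.0
```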
def read_population(path, locations):
def read_population(path, locations=None):
"""Reads population CSV
:param path: CSV file
:param locations: locations to use
:returns: an xarray.DataArray of population counts indexed by location
"""
pop = pd.read_csv(path, index_col="lad19cd")
pop = pop[pop.index.isin(locations)]
if locations is not None:
pop = pop[pop.index.isin(locations)]
pop = pop.sum(axis=1)
pop = pop.sort_index()
pop.name = "n"
return pop
pop.name = "population"
pop.index.name = "location"
return xarray.DataArray(pop)
def read_traffic_flow(
@@ -58,10 +66,13 @@ def read_traffic_flow(
"""
if path is None:
dates = np.arange(date_low, date_high, np.timedelta64(1, "D"))
return pd.DataFrame(np.ones(dates.shape[0], np.float64),
index=dates,
columns=["percent"])
return xarray.DataArray(
np.ones(dates.shape[0], np.float64),
name="flow",
dims=["date"],
coords=[dates],
)
commute_raw = pd.read_excel(
path, index_col="Date", skiprows=5, usecols=["Date", "Cars"]
)
@@ -76,8 +87,8 @@ def read_traffic_flow(
commute[commute.index < commute_raw.index[0]] = commute_raw.iloc[0, 0]
commute[commute.index > commute_raw.index[-1]] = commute_raw.iloc[-1, 0]
commute["Cars"] = commute["Cars"] / 100.0
commute.columns = ["percent"]
return commute
commute.columns = ["flow"]
return xarray.DataArray(commute)
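The window handling above guarantees a complete flow series over the analysis period: dates before the first observation take the first observed value, and dates after the last take the last. A toy illustration of the same padding pattern (dates and values invented):

```python
import pandas as pd
import xarray

# Observed traffic series, shorter than the requested analysis window
observed = pd.DataFrame(
    {"Cars": [80.0, 90.0]},
    index=pd.to_datetime(["2020-10-02", "2020-10-03"]),
)

window = pd.date_range("2020-10-01", "2020-10-05")
commute = observed.reindex(window)

# Pad the edges with the first/last observed values
commute[commute.index < observed.index[0]] = observed.iloc[0, 0]
commute[commute.index > observed.index[-1]] = observed.iloc[-1, 0]

commute["Cars"] = commute["Cars"] / 100.0  # percentages -> proportions
commute.columns = ["flow"]
print(xarray.DataArray(commute))
```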
def _merge_ltla(series):
......
@@ -2,6 +2,7 @@
import pandas as pd
import numpy as np
import xarray
import tensorflow as tf
import tensorflow_probability as tfp
@@ -43,20 +44,30 @@ def gather_data(config):
)
# tier_restriction = data.TierData.process(config)[:, :, [0, 2, 3, 4]]
date_range = [date_low, date_high]
weekday = (
pd.date_range(date_low, date_high - np.timedelta64(1, "D")).weekday < 5
dates = pd.date_range(*config["date_range"], closed="left")
weekday = xarray.DataArray(
dates.weekday < 5,
name="weekday",
dims=["time"],
coords=[dates.to_numpy()],
)
cases = data.CasesData.process(config).to_xarray()
return dict(
C=mobility.to_numpy().astype(DTYPE),
W=commute_volume.to_numpy().astype(DTYPE),
N=popsize.to_numpy().astype(DTYPE),
weekday=weekday.astype(DTYPE),
date_range=date_range,
locations=locations,
cases=cases,
return (
xarray.Dataset(
dict(
C=mobility.astype(DTYPE),
W=commute_volume.astype(DTYPE),
N=popsize.astype(DTYPE),
weekday=weekday.astype(DTYPE),
locations=xarray.DataArray(
locations["name"],
dims=["location"],
coords=[locations["lad19cd"]],
),
)
),
xarray.Dataset(dict(cases=cases)),
)
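`gather_data` now returns a pair of `xarray.Dataset`s instead of a flat dict of numpy arrays, so covariates and observations travel with named dimensions and coordinates. A sketch of equivalent construction and consumption, reusing the variable names from the diff but with invented values:

```python
import numpy as np
import pandas as pd
import xarray

# Invented stand-ins for the pipeline's covariates and case counts
locations = ["A", "B"]
dates = pd.date_range("2020-10-01", periods=3)

constant_data = xarray.Dataset(
    dict(
        N=xarray.DataArray(
            [1000.0, 2000.0], dims=["location"], coords=[locations]
        ),
        weekday=xarray.DataArray(
            dates.weekday < 5, dims=["time"], coords=[dates]
        ),
    )
)
observations = xarray.Dataset(
    dict(
        cases=xarray.DataArray(
            np.zeros((2, 3)),
            dims=["location", "time"],
            coords=[locations, dates],
        )
    )
)

# Callers unpack the pair, e.g.:
#   constant_data, observations = gather_data(config)
print(constant_data["N"].sel(location="B").item())  # 2000.0
```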
......
......@@ -2,6 +2,10 @@
import os
import yaml
from datetime import datetime
from uuid import uuid1
import json
import netCDF4 as nc
import pandas as pd
import ruffus as rf
@@ -28,10 +32,29 @@ def _make_append_work_dir(work_dir):
return lambda filename: os.path.expandvars(os.path.join(work_dir, filename))
def _create_metadata(config):
return dict(
pipeline_id=uuid1().hex,
created_at=str(datetime.now()),
inference_library="GEM",
inference_library_version="0.1.alpha0",
pipeline_config=json.dumps(config, default=str),
)
def _create_nc_file(output_file, meta_data_dict):
nc_file = nc.Dataset(output_file, "w", format="NETCDF4")
for k, v in meta_data_dict.items():
setattr(nc_file, k, v)
nc_file.close()
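`_create_nc_file` stamps the pipeline's provenance onto the output as NetCDF global attributes before any data groups are appended, so every results file records how it was produced. A minimal round-trip sketch (the filename and config contents are illustrative):

```python
import json
from datetime import datetime
from uuid import uuid1

import netCDF4 as nc

meta = dict(
    pipeline_id=uuid1().hex,
    created_at=str(datetime.now()),
    inference_library="GEM",
    inference_library_version="0.1.alpha0",
    pipeline_config=json.dumps({"date_range": None}, default=str),
)

# Global attributes are plain Python attributes on the Dataset handle
ds = nc.Dataset("example.nc", "w", format="NETCDF4")
for key, value in meta.items():
    setattr(ds, key, value)
ds.close()

# Reading them back
ds = nc.Dataset("example.nc", "r")
print(ds.pipeline_id, ds.inference_library)
ds.close()
```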
def run_pipeline(global_config, results_directory, cli_options):
wd = _make_append_work_dir(results_directory)
pipeline_meta = _create_metadata(global_config)
# Pipeline starts here
@rf.mkdir(results_directory)
@rf.originate(wd("config.yaml"), global_config)
@@ -39,13 +62,11 @@ def run_pipeline(global_config, results_directory, cli_options):
with open(output_file, "w") as f:
yaml.dump(config, f)
@rf.transform(
save_config,
rf.formatter(),
wd("pipeline_data.pkl"),
global_config,
)
def process_data(input_file, output_file, config):
@rf.follows(save_config)
@rf.originate(wd("inferencedata.nc"), global_config)
def process_data(output_file, config):
_create_nc_file(output_file, pipeline_meta)
assemble_data(output_file, config["ProcessData"])
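The task switches from `@rf.transform` (an output derived from an input file) to `@rf.originate` (a task that creates its output from nothing), with `@rf.follows` retained purely for ordering now that the config file is no longer an input. A minimal sketch of that ruffus pattern, with illustrative filenames and a stubbed body:

```python
import ruffus as rf

@rf.originate("config.yaml")
def save_config(output_file):
    with open(output_file, "w") as f:
        f.write("ProcessData: {}\n")

# @rf.follows orders tasks without wiring files between them;
# an @rf.originate task receives only its output path (plus extras)
@rf.follows(save_config)
@rf.originate("inferencedata.nc")
def process_data(output_file):
    with open(output_file, "wb") as f:  # stand-in for assemble_data(...)
        f.write(b"")

rf.pipeline_run([process_data])
```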
@rf.transform(
@@ -194,7 +215,15 @@ def run_pipeline(global_config, results_directory, cli_options):
# DSTL Summary
rf.transform(
[[process_data, insample14, medium_term, next_generation_matrix]],
[
[
process_data,
insample7,
insample14,
medium_term,
next_generation_matrix,
]
],
rf.formatter(),
wd("summary_longformat.xlsx"),
)(summary_longformat)
......
"""Function responsible for assembling all data required
to instantiate the COVID19 model"""
import pickle as pkl
import os
from covid.model_spec import gather_data
def assemble_data(output_file, config):
def assemble_data(filename, config):
all_data = gather_data(config)
with open(output_file, "wb") as f:
pkl.dump(all_data, f)
constant_data, observations = gather_data(config)
if os.path.exists(filename):
mode = "a"
else:
mode = "w"
constant_data.to_netcdf(filename, group="constant_data", mode=mode)
observations.to_netcdf(filename, group="observations", mode="a")
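Writing the two datasets to separate groups of one NetCDF file keeps model inputs and observations cleanly separated; downstream tasks then open only the group they need. A write/read round trip mirroring `assemble_data`, with toy data:

```python
import numpy as np
import xarray

constant_data = xarray.Dataset({"N": ("location", np.array([100.0, 200.0]))})
observations = xarray.Dataset({"cases": ("location", np.array([1.0, 4.0]))})

# The first write creates the file; the second appends a new group to it
constant_data.to_netcdf("inferencedata.nc", group="constant_data", mode="w")
observations.to_netcdf("inferencedata.nc", group="observations", mode="a")

# Consumers address a single group at a time
obs = xarray.open_dataset("inferencedata.nc", group="observations")
print(obs["cases"].values)  # [1. 4.]
```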
if __name__ == "__main__":
@@ -21,7 +23,7 @@ if __name__ == "__main__":
parser = ArgumentParser(description="Bundle data into a netCDF file")
parser.add_argument("config_file", help="Global config file")
parser.add_argument("output_file", help="Data bundle pkl file")
parser.add_argument("output_file", help="InferenceData file")
args = parser.parse_args()
with open(args.config_file, "r") as f:
......
@@ -14,15 +14,14 @@ def case_exceedance(input_files, lag):
"""
data_file, prediction_file = input_files
with open(data_file, "rb") as f:
data = pkl.load(f)
data = xarray.open_dataset(data_file, group="observations")
prediction = xarray.open_dataset(prediction_file)["events"]
prediction = xarray.open_dataset(prediction_file, group="predictions")[
"events"
]
modelled_cases = np.sum(prediction[..., :lag, -1], axis=-1)
observed_cases = np.sum(data["cases"][:, -lag:], axis=-1)
if observed_cases.dims[0] == "lad19cd":
observed_cases = observed_cases.rename({"lad19cd": "location"})
exceedance = np.mean(modelled_cases < observed_cases, axis=0)
return exceedance
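Exceedance here is the posterior predictive probability that the model under-predicts the observation; values near 0 or 1 flag locations where the fit is poorly calibrated. The core computation in isolation, with toy numbers:

```python
import numpy as np

# Posterior predictive draws, shape (samples, locations), and observations
modelled = np.array([[10.0, 30.0], [12.0, 28.0], [9.0, 35.0]])
observed = np.array([11.0, 40.0])

# Fraction of draws falling below the observed count, per location
exceedance = np.mean(modelled < observed, axis=0)
print(exceedance)  # [0.6667, 1.0]
```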
......
"""MCMC Test Rig for COVID-19 UK model"""
# pylint: disable=E402
import h5py
import pickle as pkl
from time import perf_counter
import h5py
import xarray
import tqdm
import yaml
import numpy as np
@@ -270,7 +272,12 @@ def draws_to_dict(draws):
def run_mcmc(
joint_log_prob_fn, current_state, initial_conditions, config, output_file
joint_log_prob_fn,
current_state,
param_bijector,
initial_conditions,
config,
output_file,
):
first_window_size, slow_window_size, last_window_size = _get_window_sizes(
@@ -325,13 +332,13 @@ def run_mcmc(
event_kernel_kwargs=event_kernel_kwargs,
trace_fn=trace_results_fn,
)
current_state = [s[-1] for s in draws]
draws[0] = param_bijector.inverse(draws[0])
posterior.write_samples(
draws_to_dict(draws),
first_dim_offset=offset,
draws_to_dict(draws), first_dim_offset=offset,
)
posterior.write_results(trace, first_dim_offset=offset)
offset += first_window_size
current_state = [s[-1] for s in draws]
# Slow adaptation sampling
hmc_kernel_kwargs["step_size"] = step_size
@@ -357,9 +364,9 @@ def run_mcmc(
hmc_kernel_kwargs["step_size"] = step_size
hmc_kernel_kwargs["momentum_distribution"] = momentum_distribution
current_state = [s[-1] for s in draws]
draws[0] = param_bijector.inverse(draws[0])
posterior.write_samples(
draws_to_dict(draws),
first_dim_offset=offset,
draws_to_dict(draws), first_dim_offset=offset,
)
posterior.write_results(trace, first_dim_offset=offset)
offset += window_num_draws
@@ -367,7 +374,7 @@ def run_mcmc(
# Fast adaptation sampling
print(f"Fast window {last_window_size}")
dual_averaging_kwargs["num_adaptation_steps"] = last_window_size
draws, trace, step_size, weighted_running_variance = _fast_adapt_window(
draws, trace, step_size, _ = _fast_adapt_window(
num_draws=last_window_size,
joint_log_prob_fn=joint_log_prob_fn,
initial_position=current_state,
@@ -377,9 +384,9 @@ def run_mcmc(
trace_fn=trace_results_fn,
)
current_state = [s[-1] for s in draws]
draws[0] = param_bijector.inverse(draws[0])
posterior.write_samples(
draws_to_dict(draws),
first_dim_offset=offset,
draws_to_dict(draws), first_dim_offset=offset,
)
posterior.write_results(trace, first_dim_offset=offset)
offset += last_window_size
@@ -400,13 +407,12 @@ def run_mcmc(
trace_fn=trace_results_fn,
)
current_state = [state_part[-1] for state_part in draws]
draws[0] = param_bijector.inverse(draws[0])
posterior.write_samples(
draws_to_dict(draws),
first_dim_offset=offset,
draws_to_dict(draws), first_dim_offset=offset,
)
posterior.write_results(
trace,
first_dim_offset=offset,
trace, first_dim_offset=offset,
)
offset += config["num_burst_samples"]
@@ -421,13 +427,13 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
else:
print("Using CPU")
with open(data_file, "rb") as f:
data = pkl.load(f)
data = xarray.open_dataset(data_file, group="constant_data")
cases = xarray.open_dataset(data_file, group="observations")["cases"]
# We load in cases and impute missing infections first, since this sets the
# time epoch which we are analysing.
# Impute censored events, return cases
events = model_spec.impute_censored_events(data["cases"].astype(DTYPE))
events = model_spec.impute_censored_events(cases.astype(DTYPE))
# Initial conditions are calculated by calculating the state
# at the beginning of the inference period
@@ -437,13 +443,16 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
# to set up a sensible initial state.
state = compute_state(
initial_state=tf.concat(
[data["N"][:, tf.newaxis], tf.zeros_like(events[:, 0, :])],
[
tf.constant(data["N"], DTYPE)[:, tf.newaxis],
tf.zeros_like(events[:, 0, :]),
],
axis=-1,
),
events=events,
stoichiometry=model_spec.STOICHIOMETRY,
)
start_time = state.shape[1] - data["cases"].shape[1]
start_time = state.shape[1] - cases.shape[1]
initial_state = state[:, start_time, :]
events = events[:, start_time:, :]
@@ -457,32 +466,32 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
num_steps=events.shape[1],
)
def joint_log_prob(unconstrained_params, events):
bij = tfb.Invert( # Forward transform unconstrains params
tfb.Blockwise(
[
tfb.Softplus(low=dtype_util.eps(DTYPE)),
tfb.Identity(),
tfb.Softplus(low=dtype_util.eps(DTYPE)),
tfb.Identity(),
],
block_sizes=[1, 2, 1, unconstrained_params.shape[-1] - 4],
)
param_bij = tfb.Invert( # Forward transform unconstrains params
tfb.Blockwise(
[
tfb.Softplus(low=dtype_util.eps(DTYPE)),
tfb.Identity(),
tfb.Softplus(low=dtype_util.eps(DTYPE)),
tfb.Identity(),
],
block_sizes=[1, 2, 1, model.event_shape["xi"][0] + 1],
)
params = bij.inverse(unconstrained_params)
return (
model.log_prob(
dict(
beta2=params[0],
gamma0=params[1],
gamma1=params[2],
sigma=params[3],
beta1=params[4],
xi=params[5:],
seir=events,
)
)
def joint_log_prob(unconstrained_params, events):
params = param_bij.inverse(unconstrained_params)
return model.log_prob(
dict(
beta2=params[0],
gamma0=params[1],
gamma1=params[2],
sigma=params[3],
beta1=params[4],
xi=params[5:],
seir=events,
)
+ bij.inverse_log_det_jacobian(unconstrained_params, event_ndims=1)
) + param_bij.inverse_log_det_jacobian(
unconstrained_params, event_ndims=1
)
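Hoisting `param_bij` out of `joint_log_prob` lets `run_mcmc` map unconstrained draws back to the constrained scale (`param_bij.inverse(draws[0])`) before writing samples, while the `inverse_log_det_jacobian` term keeps the unconstrained-space target a proper density. A reduced sketch of the same change-of-variables pattern; the block sizes and values here are illustrative, not the model's:

```python
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp

tfb = tfp.bijectors
DTYPE = np.float64  # assumed to match the model's DTYPE

# The forward direction unconstrains; inverse maps reals back to the support
param_bij = tfb.Invert(
    tfb.Blockwise(
        [tfb.Softplus(low=np.finfo(DTYPE).eps), tfb.Identity()],
        block_sizes=[1, 2],
    )
)

unconstrained = tf.constant([-3.0, 0.5, -0.5], dtype=DTYPE)
params = param_bij.inverse(unconstrained)  # first component now > 0
correction = param_bij.inverse_log_det_jacobian(unconstrained, event_ndims=1)

# A log-posterior on unconstrained space is then
#   model.log_prob(constrained_params) + correction
print(params.numpy(), correction.numpy())
```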
# MCMC tracing functions
@@ -508,14 +517,15 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
posterior = run_mcmc(
joint_log_prob_fn=joint_log_prob,
current_state=current_chain_state,
param_bijector=param_bij,
initial_conditions=initial_state,
config=config,
output_file=output_file,
)
posterior._file.create_dataset("initial_state", data=initial_state)
posterior._file.create_dataset(
"date_range",
data=np.array(data["date_range"])
"time",
data=np.array(cases.coords["time"])
.astype(str)
.astype(h5py.string_dtype()),
)
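HDF5 has no native datetime type, so the `time` coordinate is stored as variable-length strings via `h5py.string_dtype()` and parsed back on read. A self-contained sketch (the filename is illustrative):

```python
import h5py
import numpy as np
import pandas as pd

dates = pd.date_range("2020-10-01", periods=3)

with h5py.File("posterior_example.h5", "w") as f:
    f.create_dataset(
        "time",
        data=np.array(dates).astype(str).astype(h5py.string_dtype()),
    )

with h5py.File("posterior_example.h5", "r") as f:
    times = pd.to_datetime([t.decode() for t in f["time"][:]])
print(times)
```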
@@ -549,9 +559,7 @@ if __name__ == "__main__":
"-o", "--output", type=str, help="Output file", required=True
)
parser.add_argument(
"data_file",
type=str,
help="Data pickle file",
"data_file", type=str, help="Data pickle file",
)
args = parser.parse_args()
......
@@ -65,18 +65,13 @@ def insample_predictive_timeseries(input_files, output_dir, lag):
prediction_file, data_file = input_files
lag = int(lag)
prediction = xarray.open_dataset(prediction_file)["events"]
prediction = xarray.open_dataset(prediction_file, group="predictions")[
"events"
]
prediction = prediction[..., :lag, -1] # Just removals
with open(data_file, "rb") as f:
data = pkl.load(f)
cases = data["cases"]
lads = data["locations"]
# TODO remove legacy code!
if "lad19cd" in cases.dims:
cases = cases.rename({"lad19cd": "location"})
cases = xarray.open_dataset(data_file, group="observations")["cases"]
lads = xarray.open_dataset(data_file, group="constant_data")["locations"]
output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True)
@@ -93,8 +88,8 @@ def insample_predictive_timeseries(input_files, output_dir, lag):
pred_mean.loc[location, :],
pred_quants.loc[:, location, :],
cases.loc[location][-lag:],
cases.coords["date"][-lag:],
lads.loc[lads["lad19cd"] == location, "name"].iloc[0],
cases.coords["time"][-lag:],
lads.loc[location].data,
)
plt.savefig(output_dir.joinpath(f"{location.data}.png"))
plt.close()
......
@@ -7,6 +7,7 @@ import tensorflow as tf
from covid import model_spec
from covid.util import copy_nc_attrs
from gemlib.util import compute_state
@@ -56,8 +57,7 @@ def calc_posterior_ngm(samples, covar_data):
def next_generation_matrix(input_files, output_file):
with open(input_files[0], "rb") as f:
covar_data = pkl.load(f)
covar_data = xarray.open_dataset(input_files[0], group="constant_data")
with open(input_files[1], "rb") as f:
samples = pkl.load(f)
@@ -68,15 +68,16 @@ def next_generation_matrix(input_files, output_file):
ngm,
coords=[
np.arange(ngm.shape[0]),
covar_data["locations"]["lad19cd"],
covar_data["locations"]["lad19cd"],
covar_data.coords["location"],
covar_data.coords["location"],
],
dims=["iteration", "dest", "src"],
)
ngm = xarray.Dataset({"ngm": ngm})
# Output
ngm.to_netcdf(output_file)
ngm.to_netcdf(output_file, group="posterior_predictive")
copy_nc_attrs(input_files[0], output_file)
if __name__ == "__main__":
......
@@ -12,7 +12,9 @@ from covid.summary import (
def overall_rt(next_generation_matrix, output_file):
ngms = xarray.open_dataset(next_generation_matrix)["ngm"]
ngms = xarray.open_dataset(
next_generation_matrix, group="posterior_predictive"
)["ngm"]
b, _ = power_iteration(ngms)
rt = rayleigh_quotient(ngms, b)
q = np.arange(0.05, 1.0, 0.05)
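`overall_rt` condenses each sampled next-generation matrix to a single reproduction number: its dominant eigenvalue, obtained by power iteration and read off with the Rayleigh quotient. The helpers imported from `covid.summary` are vectorised over posterior samples; a scalar sketch of the same idea:

```python
import numpy as np

def power_iteration(matrix, num_iter=100):
    """Approximate the dominant eigenvector by repeated multiplication."""
    b = np.random.default_rng(0).uniform(size=matrix.shape[-1])
    for _ in range(num_iter):
        b = matrix @ b
        b = b / np.linalg.norm(b)
    return b

def rayleigh_quotient(matrix, b):
    """Eigenvalue estimate associated with (approximate) eigenvector b."""
    return (b @ matrix @ b) / (b @ b)

ngm = np.array([[1.2, 0.1], [0.3, 0.9]])  # toy next-generation matrix
b = power_iteration(ngm)
print(rayleigh_quotient(ngm, b))  # ~1.28, the dominant eigenvalue
```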
......
@@ -6,6 +6,7 @@ import pickle as pkl
import tensorflow as tf
from covid import model_spec
from covid.util import copy_nc_attrs