Commit 023cbc5f authored by Chris Jewell

Use xarray/netCDF4 throughout the pipeline

CHANGES

1. Most tasks touched to use the netCDF4 inferencedata file.
2. Added a netCDF4 metadata copy helper (copy_nc_attrs) to covid.util; see the sketch below.
3. Updated model_spec.py to use xarray.
4. Updated covid.ruffus_pipeline to use netCDF4 files, adding pipeline metadata.
5. Updated pyproject.toml.
6. Removed the chunk specification in inference.py; defer to automatic chunking for now.
parent eaa9500e
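Item 2 above refers to covid.util's copy_nc_attrs, which the tasks below import to propagate file-level metadata but which does not itself appear in this diff. A minimal sketch of what such an attribute-copying helper might look like, using only the standard netCDF4 API (the function name matches the import; the body is an assumption, not the actual covid.util implementation):

import netCDF4 as nc

def copy_nc_attrs(src_path, dest_path):
    # Copy root-group (global) attributes from one netCDF4 file to another.
    # Sketch only: the real covid.util helper may differ in signature and in
    # which attributes it propagates.
    with nc.Dataset(src_path, "r") as src, nc.Dataset(dest_path, "a") as dest:
        for name in src.ncattrs():
            dest.setncattr(name, src.getncattr(name))

Used this way, downstream outputs such as the prediction and next-generation-matrix files carry forward the pipeline_id and serialised config written into inferencedata.nc by _create_nc_file.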
@@ -86,6 +86,7 @@ class CasesData:
check_date_bounds(df, date_low, date_high)
check_date_format(df)
check_lad19cd_format(df)
df = df.rename(columns={"date": "time"})
return True
def adapt(df, config):
@@ -140,7 +141,7 @@ class CasesData:
dates = pd.date_range(date_low, date_high, closed="left")
multi_index = pd.MultiIndex.from_product([areacodes, dates])
ser = df["cases"].reindex(multi_index, fill_value=0.0)
ser.index.names = ["location", "date"]
ser.index.names = ["location", "time"]
ser.name = "cases"
return ser
@@ -182,13 +183,13 @@ class CasesData:
df = df.groupby(["lad19cd", "lab_report_date"]).count()
df = df.rename(columns={"specimen_date": "cases"})
df.index.names = ["lad19cd", "date"]
df.index.names = ["lad19cd", "time"]
df = df.sort_index()
# Fill in all dates, and add 0s for empty counts
dates = pd.date_range(date_low, date_high, closed="left")
multi_indexes = pd.MultiIndex.from_product(
[areacodes, dates], names=["location", "date"]
[areacodes, dates], names=["location", "time"]
)
results = df["cases"].reindex(multi_indexes, fill_value=0.0)
return results.sort_index()
......
@@ -3,6 +3,7 @@ well-known formats"""
from warnings import warn
import numpy as np
import xarray
import pandas as pd
__all__ = [
@@ -13,7 +14,7 @@ __all__ = [
]
def read_mobility(path, locations):
def read_mobility(path, locations=None):
"""Reads in CSV with mobility matrix.
CSV format: <To>,<id>,<id>,....
@@ -24,29 +25,36 @@ def read_mobility(path, locations):
:returns: a numpy matrix sorted by <id> on both rows and cols.
"""
mobility = pd.read_csv(path)
print("Locations: ", locations)
mobility = mobility[
mobility["From"].isin(locations) & mobility["To"].isin(locations)
]
mobility = mobility.sort_values(["From", "To"])
mobility = mobility.groupby(["From", "To"]).agg({"Flow": sum}).reset_index()
mob_matrix = mobility.pivot(index="To", columns="From", values="Flow")
mobility = mobility.rename(columns={"From": "src", "To": "dest"})
if locations is not None:
mobility = mobility[
mobility["src"].isin(locations) & mobility["dest"].isin(locations)
]
mobility = mobility.sort_values(["src", "dest"])
mobility = (
mobility.groupby(["src", "dest"]).agg({"Flow": sum}).reset_index()
)
mob_matrix = mobility.pivot(index="dest", columns="src", values="Flow")
mob_matrix[mob_matrix.isna()] = 0.0
return mob_matrix
return xarray.DataArray(
mob_matrix, dims=["location_dest", "location_src"], name="mobility"
)
def read_population(path, locations):
def read_population(path, locations=None):
"""Reads population CSV
:param path: CSV file
:param locations: locations to use
:returns: a pandas Series indexed by LTLAs
"""
pop = pd.read_csv(path, index_col="lad19cd")
pop = pop[pop.index.isin(locations)]
if locations is not None:
pop = pop[pop.index.isin(locations)]
pop = pop.sum(axis=1)
pop = pop.sort_index()
pop.name = "n"
return pop
pop.name = "population"
pop.index.name = "location"
return xarray.DataArray(pop)
def read_traffic_flow(
@@ -58,10 +66,13 @@ def read_traffic_flow(
"""
if path is None:
dates = np.arange(date_low, date_high, np.timedelta64(1, "D"))
return pd.DataFrame(np.ones(dates.shape[0], np.float64),
index=dates,
columns=["percent"])
return xarray.DataArray(
np.ones(dates.shape[0], np.float64),
name="flow",
dims=["date"],
coords=[dates],
)
commute_raw = pd.read_excel(
path, index_col="Date", skiprows=5, usecols=["Date", "Cars"]
)
@@ -76,8 +87,8 @@ def read_traffic_flow(
commute[commute.index < commute_raw.index[0]] = commute_raw.iloc[0, 0]
commute[commute.index > commute_raw.index[-1]] = commute_raw.iloc[-1, 0]
commute["Cars"] = commute["Cars"] / 100.0
commute.columns = ["percent"]
return commute
commute.columns = ["flow"]
return xarray.DataArray(commute)
def _merge_ltla(series):
......
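read_mobility and read_population now return labelled xarray.DataArray objects (dimensions location_dest/location_src and location respectively), so downstream code can select by LAD code rather than rely on row order. A small usage sketch; the file paths and the LAD code are hypothetical examples, not values taken from this repository:

mobility = read_mobility("data/mobility.csv")        # dims: location_dest, location_src
population = read_population("data/population.csv")  # dim: location, name: "population"

# Label-based selection replaces positional indexing into a sorted matrix
inflow = mobility.sel(location_dest="E06000001")
pop = population.sel(location="E06000001")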
@@ -2,6 +2,7 @@
import pandas as pd
import numpy as np
import xarray
import tensorflow as tf
import tensorflow_probability as tfp
@@ -43,20 +44,30 @@ def gather_data(config):
)
# tier_restriction = data.TierData.process(config)[:, :, [0, 2, 3, 4]]
date_range = [date_low, date_high]
weekday = (
pd.date_range(date_low, date_high - np.timedelta64(1, "D")).weekday < 5
dates = pd.date_range(*config["date_range"], closed="left")
weekday = xarray.DataArray(
dates.weekday < 5,
name="weekday",
dims=["time"],
coords=[dates.to_numpy()],
)
cases = data.CasesData.process(config).to_xarray()
return dict(
C=mobility.to_numpy().astype(DTYPE),
W=commute_volume.to_numpy().astype(DTYPE),
N=popsize.to_numpy().astype(DTYPE),
weekday=weekday.astype(DTYPE),
date_range=date_range,
locations=locations,
cases=cases,
return (
xarray.Dataset(
dict(
C=mobility.astype(DTYPE),
W=commute_volume.astype(DTYPE),
N=popsize.astype(DTYPE),
weekday=weekday.astype(DTYPE),
locations=xarray.DataArray(
locations["name"],
dims=["location"],
coords=[locations["lad19cd"]],
),
)
),
xarray.Dataset(dict(cases=cases)),
)
......
@@ -2,6 +2,10 @@
import os
import yaml
from datetime import datetime
from uuid import uuid1
import json
import netCDF4 as nc
import pandas as pd
import ruffus as rf
@@ -28,25 +32,45 @@ def _make_append_work_dir(work_dir):
return lambda filename: os.path.expandvars(os.path.join(work_dir, filename))
def _create_metadata(config):
return dict(
pipeline_id=uuid1().hex,
created_at=str(datetime.now()),
inference_library="GEM",
inference_library_version="0.1.alpha0",
pipeline_config=json.dumps(config, default=str),
)
def _create_nc_file(output_file, meta_data_dict):
nc_file = nc.Dataset(output_file, "w", format="NETCDF4")
for k, v in meta_data_dict.items():
setattr(nc_file, k, v)
nc_file.close()
def run_pipeline(global_config, results_directory, cli_options):
wd = _make_append_work_dir(results_directory)
pipeline_meta = _create_metadata(global_config)
# Pipeline starts here
@rf.mkdir(results_directory)
@rf.originate(wd("config.yaml"), global_config)
def save_config(output_file, config):
with open(output_file, "w") as f:
yaml.dump(config, f)
@rf.originate(wd("inferencedata.nc"), global_config)
def process_data(output_file, config):
_create_nc_file(output_file, pipeline_meta)
assemble_data(output_file, config["ProcessData"])
@rf.transform(
save_config,
process_data,
rf.formatter(),
wd("pipeline_data.pkl"),
wd("config.yaml"),
global_config,
)
def process_data(input_file, output_file, config):
assemble_data(output_file, config["ProcessData"])
def save_config(input_file, output_file, config):
with open(output_file, "w") as f:
yaml.dump(config, f)
@rf.transform(
process_data,
@@ -194,7 +218,15 @@ def run_pipeline(global_config, results_directory, cli_options):
# DSTL Summary
rf.transform(
[[process_data, insample7, insample14, medium_term, next_generation_matrix]],
[
[
process_data,
insample7,
insample14,
medium_term,
next_generation_matrix,
]
],
rf.formatter(),
wd("summary_longformat.xlsx"),
)(summary_longformat)
......
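_create_nc_file stores the metadata returned by _create_metadata (pipeline_id, created_at, inference library and version, and the JSON-serialised config) as global attributes on the root group of inferencedata.nc; writing the data groups afterwards with mode="a" should leave those attributes in place. A quick provenance check on any pipeline output, using only the standard netCDF4 API (the path is a hypothetical example):

import netCDF4 as nc

with nc.Dataset("results/inferencedata.nc", "r") as ds:
    for name in ds.ncattrs():
        print(name, "=", ds.getncattr(name))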
"""Function responsible for assembling all data required
to instantiate the COVID19 model"""
import pickle as pkl
import os
from covid.model_spec import gather_data
def assemble_data(output_file, config):
def assemble_data(filename, config):
all_data = gather_data(config)
with open(output_file, "wb") as f:
pkl.dump(all_data, f)
constant_data, observations = gather_data(config)
if os.path.exists(filename):
mode = "a"
else:
mode = "w"
constant_data.to_netcdf(filename, group="constant_data", mode=mode)
observations.to_netcdf(filename, group="observations", mode="a")
if __name__ == "__main__":
@@ -21,7 +23,7 @@ if __name__ == "__main__":
parser = ArgumentParser(description="Bundle data into a pickled dictionary")
parser.add_argument("config_file", help="Global config file")
parser.add_argument("output_file", help="Data bundle pkl file")
parser.add_argument("output_file", help="InferenceData file")
args = parser.parse_args()
with open(args.config_file, "r") as f:
......
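assemble_data writes the model inputs as two groups of a single netCDF4 file: constant_data (covariates such as C, W, N, weekday, and the location names) and observations (the cases array). Downstream tasks open only the group they need, as the inference and summary tasks below do. A minimal reading sketch with a hypothetical path:

import xarray

constant_data = xarray.open_dataset("results/inferencedata.nc", group="constant_data")
observations = xarray.open_dataset("results/inferencedata.nc", group="observations")

cases = observations["cases"]  # dims: location, time
popsize = constant_data["N"]   # dim: location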
@@ -14,15 +14,14 @@ def case_exceedance(input_files, lag):
"""
data_file, prediction_file = input_files
with open(data_file, "rb") as f:
data = pkl.load(f)
data = xarray.open_dataset(data_file, group="observations")
prediction = xarray.open_dataset(prediction_file)["events"]
prediction = xarray.open_dataset(prediction_file, group="predictions")[
"events"
]
modelled_cases = np.sum(prediction[..., :lag, -1], axis=-1)
observed_cases = np.sum(data["cases"][:, -lag:], axis=-1)
if observed_cases.dims[0] == "lad19cd":
observed_cases = observed_cases.rename({"lad19cd": "location"})
exceedance = np.mean(modelled_cases < observed_cases, axis=0)
return exceedance
......
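case_exceedance returns, for each location, the posterior predictive probability that the model under-predicts the observed case count over the lag-day comparison window: the fraction of posterior samples whose summed predicted cases fall below the summed observations. A tiny worked example with made-up numbers for a single location:

import numpy as np

# Four hypothetical posterior samples of total predicted cases
modelled_cases = np.array([90.0, 110.0, 95.0, 120.0])
observed_cases = 100.0

exceedance = np.mean(modelled_cases < observed_cases)  # 2 of 4 samples -> 0.5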
"""MCMC Test Rig for COVID-19 UK model"""
# pylint: disable=E402
import h5py
import pickle as pkl
from time import perf_counter
import h5py
import xarray
import tqdm
import yaml
import numpy as np
@@ -33,14 +35,14 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
else:
print("Using CPU")
with open(data_file, "rb") as f:
data = pkl.load(f)
data = xarray.open_dataset(data_file, group="constant_data")
cases = xarray.open_dataset(data_file, group="observations")["cases"]
# We load in cases and impute missing infections first, since this sets the
# time epoch which we are analysing.
# Impute censored events, return cases
print("Data shape:", data["cases"].shape)
events = model_spec.impute_censored_events(data["cases"].astype(DTYPE))
print("Data shape:", cases.shape)
events = model_spec.impute_censored_events(cases.astype(DTYPE))
# Initial conditions are calculated by calculating the state
# at the beginning of the inference period
@@ -50,13 +52,16 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
# to set up a sensible initial state.
state = compute_state(
initial_state=tf.concat(
[data["N"][:, tf.newaxis], tf.zeros_like(events[:, 0, :])],
[
tf.constant(data["N"], DTYPE)[:, tf.newaxis],
tf.zeros_like(events[:, 0, :]),
],
axis=-1,
),
events=events,
stoichiometry=model_spec.STOICHIOMETRY,
)
start_time = state.shape[1] - data["cases"].shape[1]
start_time = state.shape[1] - cases.shape[1]
initial_state = state[:, start_time, :]
events = events[:, start_time:, :]
@@ -251,10 +256,10 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
tf.random.set_seed(2)
current_state = [
np.array(
tf.constant(
[0.6, 0.0, 0.0, 0.1], dtype=DTYPE
), # , 0.0, 0.0, 0.0, 0.0, 0.0], dtype=DTYPE),
np.zeros(
tf.zeros(
model.model["xi"](0.0, 0.1).event_shape[-1] + 1,
dtype=DTYPE,
),
@@ -267,32 +272,20 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
posterior = Posterior(
output_file,
sample_dict={
"beta2": (samples[0][:, 0], (NUM_BURST_SAMPLES,)),
"gamma0": (samples[0][:, 1], (NUM_BURST_SAMPLES,)),
"gamma1": (samples[0][:, 2], (NUM_BURST_SAMPLES,)),
"sigma": (samples[0][:, 3], (NUM_BURST_SAMPLES,)),
"beta3": (
tf.zeros([1, 5], dtype=DTYPE),
(NUM_BURST_SAMPLES, 2),
), # (samples[0][:, 4:], (NUM_BURST_SAMPLES, 2)),
"beta1": (samples[1][:, 0], (NUM_BURST_SAMPLES,)),
"xi": (
samples[1][:, 1:],
(NUM_BURST_SAMPLES, samples[1].shape[1] - 1),
),
"events": (
samples[2],
(NUM_BURST_SAMPLES, min(32, events.shape[0]), 32, 1),
),
"beta2": samples[0][:, 0],
"gamma0": samples[0][:, 1],
"gamma1": samples[0][:, 2],
"sigma": samples[0][:, 3],
"beta3": tf.zeros([1, 5], dtype=DTYPE),
"beta1": samples[1][:, 0],
"xi": samples[1][:, 1:],
"events": samples[2],
},
results_dict=results,
num_samples=NUM_SAVED_SAMPLES,
)
posterior._file.create_dataset("initial_state", data=initial_state)
posterior._file.create_dataset(
"date_range",
data=np.array(data["date_range"]).astype(h5py.string_dtype()),
)
# We loop over successive calls to sample because we have to dump results
# to disc, or else end OOM (even on a 32GB system).
# with tf.profiler.experimental.Profile("/tmp/tf_logdir"):
......
@@ -65,18 +65,13 @@ def insample_predictive_timeseries(input_files, output_dir, lag):
prediction_file, data_file = input_files
lag = int(lag)
prediction = xarray.open_dataset(prediction_file)["events"]
prediction = xarray.open_dataset(prediction_file, group="predictions")[
"events"
]
prediction = prediction[..., :lag, -1] # Just removals
with open(data_file, "rb") as f:
data = pkl.load(f)
cases = data["cases"]
lads = data["locations"]
# TODO remove legacy code!
if "lad19cd" in cases.dims:
cases = cases.rename({"lad19cd": "location"})
cases = xarray.open_dataset(data_file, group="observations")["cases"]
lads = xarray.open_dataset(data_file, group="constant_data")["locations"]
output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True)
@@ -93,8 +88,8 @@ def insample_predictive_timeseries(input_files, output_dir, lag):
pred_mean.loc[location, :],
pred_quants.loc[:, location, :],
cases.loc[location][-lag:],
cases.coords["date"][-lag:],
lads.loc[lads["lad19cd"] == location, "name"].iloc[0],
cases.coords["time"][-lag:],
lads.loc[location].data,
)
plt.savefig(output_dir.joinpath(f"{location.data}.png"))
plt.close()
......
@@ -7,6 +7,7 @@ import tensorflow as tf
from covid import model_spec
from covid.util import copy_nc_attrs
from gemlib.util import compute_state
@@ -56,8 +57,7 @@ def calc_posterior_ngm(samples, covar_data):
def next_generation_matrix(input_files, output_file):
with open(input_files[0], "rb") as f:
covar_data = pkl.load(f)
covar_data = xarray.open_dataset(input_files[0], group="constant_data")
with open(input_files[1], "rb") as f:
samples = pkl.load(f)
@@ -68,15 +68,16 @@ def next_generation_matrix(input_files, output_file):
ngm,
coords=[
np.arange(ngm.shape[0]),
covar_data["locations"]["lad19cd"],
covar_data["locations"]["lad19cd"],
covar_data.coords["location"],
covar_data.coords["location"],
],
dims=["iteration", "dest", "src"],
)
ngm = xarray.Dataset({"ngm": ngm})
# Output
ngm.to_netcdf(output_file)
ngm.to_netcdf(output_file, group="posterior_predictive")
copy_nc_attrs(input_files[0], output_file)
if __name__ == "__main__":
......
@@ -12,7 +12,9 @@ from covid.summary import (
def overall_rt(next_generation_matrix, output_file):
ngms = xarray.open_dataset(next_generation_matrix)["ngm"]
ngms = xarray.open_dataset(
next_generation_matrix, group="posterior_predictive"
)["ngm"]
b, _ = power_iteration(ngms)
rt = rayleigh_quotient(ngms, b)
q = np.arange(0.05, 1.0, 0.05)
......
@@ -6,6 +6,7 @@ import pickle as pkl
import tensorflow as tf
from covid import model_spec
from covid.util import copy_nc_attrs
from gemlib.util import compute_state
@@ -71,16 +72,19 @@ def read_pkl(filename):
def predict(data, posterior_samples, output_file, initial_step, num_steps):
covar_data = read_pkl(data)
covar_data = xarray.open_dataset(data, group="constant_data")
cases = xarray.open_dataset(data, group="observations")
samples = read_pkl(posterior_samples)
if initial_step < 0:
initial_step = samples["seir"].shape[-2] + initial_step
dates = np.arange(covar_data["date_range"][0] + np.timedelta64(initial_step, "D"),
covar_data["date_range"][0] + np.timedelta64(initial_step + num_steps, "D"),
np.timedelta64(1, "D"))
del covar_data["date_range"]
origin_date = np.array(cases.coords["time"][0])
dates = np.arange(
origin_date + np.timedelta64(initial_step, "D"),
origin_date + np.timedelta64(initial_step + num_steps, "D"),
np.timedelta64(1, "D"),
)
estimated_init_state, predicted_events = predicted_incidence(
samples, covar_data, initial_step, num_steps
@@ -90,7 +94,7 @@ def predict(data, posterior_samples, output_file, initial_step, num_steps):
predicted_events,
coords=[
np.arange(predicted_events.shape[0]),
covar_data["locations"]["lad19cd"],
covar_data.coords["location"],
dates,
np.arange(predicted_events.shape[3]),
],
@@ -100,7 +104,7 @@ def predict(data, posterior_samples, output_file, initial_step, num_steps):
estimated_init_state,
coords=[
np.arange(estimated_init_state.shape[0]),
covar_data["locations"]["lad19cd"],
covar_data.coords["location"],
np.arange(estimated_init_state.shape[-1]),
],
dims=("iteration", "location", "state"),
@@ -108,7 +112,9 @@ def predict(data, posterior_samples, output_file, initial_step, num_steps):
ds = xarray.Dataset(
{"events": prediction, "initial_state": estimated_init_state}
)
ds.to_netcdf(output_file)
ds.to_netcdf(output_file, group="predictions")
ds.close()
copy_nc_attrs(data, output_file)
if __name__ == "__main__":
......
@@ -20,7 +20,7 @@ def rt(input_file, output_file):
:param output_file: a .csv of mean (ci) values
"""
ngm = xarray.open_dataset(input_file)["ngm"]
ngm = xarray.open_dataset(input_file, group="posterior_predictive")["ngm"]
rt = np.sum(ngm, axis=-2)
rt_summary = mean_and_ci(rt, name="Rt")
@@ -41,7 +41,7 @@ def infec_incidence(input_file, output_file):
:param output_file: csv with prediction summaries
"""
prediction = xarray.open_dataset(input_file)["events"]
prediction = xarray.open_dataset(input_file, group="predictions")["events"]
offset = 4
timepoints = SUMMARY_DAYS + offset
@@ -77,17 +77,15 @@ def prevalence(input_files, output_file):
offset = 4 # Account for recording lag
timepoints = SUMMARY_DAYS + offset
with open(input_files[0], "rb") as f:
data = pkl.load(f)
prediction = xarray.open_dataset(input_files[1])
data = xarray.open_dataset(input_files[0], group="constant_data")
prediction = xarray.open_dataset(input_files[1], group="predictions")
predicted_state = compute_state(
prediction["initial_state"], prediction["events"], STOICHIOMETRY
)
def calc_prev(state, name=None):
prev = np.sum(state[..., 1:3], axis=-1) / np.squeeze(data["N"])
prev = np.sum(state[..., 1:3], axis=-1) / np.array(data["N"])
return mean_and_ci(prev, name=name)
idx = prediction.coords["location"]
......
"""Summarises posterior distribution into a geopackage"""