diff --git a/covid/data/case_data.py b/covid/data/case_data.py
index f4779dd0d1a27df645916fc2dcf1ddd2f17ab6dc..b8ddbb0178fc63b65a064f4c0e8462a37f5ed1df 100644
--- a/covid/data/case_data.py
+++ b/covid/data/case_data.py
@@ -86,6 +86,7 @@ class CasesData:
         check_date_bounds(df, date_low, date_high)
         check_date_format(df)
         check_lad19cd_format(df)
+        df = df.rename(columns={"date": "time"})
         return True

     def adapt(df, config):
@@ -140,7 +141,7 @@ class CasesData:
         dates = pd.date_range(date_low, date_high, closed="left")
         multi_index = pd.MultiIndex.from_product([areacodes, dates])
         ser = df["cases"].reindex(multi_index, fill_value=0.0)
-        ser.index.names = ["location", "date"]
+        ser.index.names = ["location", "time"]
         ser.name = "cases"
         return ser

@@ -182,13 +183,13 @@ class CasesData:
         df = df.groupby(["lad19cd", "lab_report_date"]).count()
         df = df.rename(columns={"specimen_date": "cases"})
-        df.index.names = ["lad19cd", "date"]
+        df.index.names = ["lad19cd", "time"]
         df = df.sort_index()

         # Fill in all dates, and add 0s for empty counts
         dates = pd.date_range(date_low, date_high, closed="left")
         multi_indexes = pd.MultiIndex.from_product(
-            [areacodes, dates], names=["location", "date"]
+            [areacodes, dates], names=["location", "time"]
         )
         results = df["cases"].reindex(multi_indexes, fill_value=0.0)
         return results.sort_index()
diff --git a/covid/data/data.py b/covid/data/data.py
index c34e929e79d3f12bbd048e3a09d9bf40672779af..2cae110e7d724892e33a2b026e387a7a1da5f2b2 100644
--- a/covid/data/data.py
+++ b/covid/data/data.py
@@ -3,6 +3,7 @@ well-known formats"""
 from warnings import warn

 import numpy as np
+import xarray
 import pandas as pd

 __all__ = [
@@ -13,7 +14,7 @@ __all__ = [
 ]


-def read_mobility(path, locations):
+def read_mobility(path, locations=None):
     """Reads in CSV with mobility matrix.
     CSV format: <From>,<To>,<Flow>,....

@@ -24,29 +25,36 @@
    :returns: a numpy matrix sorted by <location> on both rows and cols.
""" mobility = pd.read_csv(path) - print("Locations: ", locations) - mobility = mobility[ - mobility["From"].isin(locations) & mobility["To"].isin(locations) - ] - mobility = mobility.sort_values(["From", "To"]) - mobility = mobility.groupby(["From", "To"]).agg({"Flow": sum}).reset_index() - mob_matrix = mobility.pivot(index="To", columns="From", values="Flow") + mobility = mobility.rename(columns={"From": "src", "To": "dest"}) + if locations is not None: + mobility = mobility[ + mobility["src"].isin(locations) & mobility["dest"].isin(locations) + ] + mobility = mobility.sort_values(["src", "dest"]) + mobility = ( + mobility.groupby(["src", "dest"]).agg({"Flow": sum}).reset_index() + ) + mob_matrix = mobility.pivot(index="dest", columns="src", values="Flow") mob_matrix[mob_matrix.isna()] = 0.0 - return mob_matrix + return xarray.DataArray( + mob_matrix, dims=["location_dest", "location_src"], name="mobility" + ) -def read_population(path, locations): +def read_population(path, locations=None): """Reads population CSV :param path: CSV file :param locations: locations to use :returns: a pandas Series indexed by LTLAs """ pop = pd.read_csv(path, index_col="lad19cd") - pop = pop[pop.index.isin(locations)] + if locations is not None: + pop = pop[pop.index.isin(locations)] pop = pop.sum(axis=1) pop = pop.sort_index() - pop.name = "n" - return pop + pop.name = "population" + pop.index.name = "location" + return xarray.DataArray(pop) def read_traffic_flow( @@ -58,10 +66,13 @@ def read_traffic_flow( """ if path is None: dates = np.arange(date_low, date_high, np.timedelta64(1, "D")) - return pd.DataFrame(np.ones(dates.shape[0], np.float64), - index=dates, - columns=["percent"]) - + return xarray.DataArray( + np.ones(dates.shape[0], np.float64), + name="flow", + dims=["date"], + coords=[dates], + ) + commute_raw = pd.read_excel( path, index_col="Date", skiprows=5, usecols=["Date", "Cars"] ) @@ -76,8 +87,8 @@ def read_traffic_flow( commute[commute.index < commute_raw.index[0]] = commute_raw.iloc[0, 0] commute[commute.index > commute_raw.index[-1]] = commute_raw.iloc[-1, 0] commute["Cars"] = commute["Cars"] / 100.0 - commute.columns = ["percent"] - return commute + commute.columns = ["flow"] + return xarray.DataArray(commute) def _merge_ltla(series): diff --git a/covid/model_spec.py b/covid/model_spec.py index 3e115d4895e6bf24f9d4ed91af574e6cb21cfec3..ce821e08dcbc97398ff8a1cafb57767339ab1d65 100644 --- a/covid/model_spec.py +++ b/covid/model_spec.py @@ -2,6 +2,7 @@ import pandas as pd import numpy as np +import xarray import tensorflow as tf import tensorflow_probability as tfp @@ -43,20 +44,30 @@ def gather_data(config): ) # tier_restriction = data.TierData.process(config)[:, :, [0, 2, 3, 4]] - date_range = [date_low, date_high] - weekday = ( - pd.date_range(date_low, date_high - np.timedelta64(1, "D")).weekday < 5 + dates = pd.date_range(*config["date_range"], closed="left") + weekday = xarray.DataArray( + dates.weekday < 5, + name="weekday", + dims=["time"], + coords=[dates.to_numpy()], ) cases = data.CasesData.process(config).to_xarray() - return dict( - C=mobility.to_numpy().astype(DTYPE), - W=commute_volume.to_numpy().astype(DTYPE), - N=popsize.to_numpy().astype(DTYPE), - weekday=weekday.astype(DTYPE), - date_range=date_range, - locations=locations, - cases=cases, + return ( + xarray.Dataset( + dict( + C=mobility.astype(DTYPE), + W=commute_volume.astype(DTYPE), + N=popsize.astype(DTYPE), + weekday=weekday.astype(DTYPE), + locations=xarray.DataArray( + locations["name"], + dims=["location"], + 
coords=[locations["lad19cd"]], + ), + ) + ), + xarray.Dataset(dict(cases=cases)), ) diff --git a/covid/ruffus_pipeline.py b/covid/ruffus_pipeline.py index 6850c8eaddee1ca734f3c5acc440deaefd064b0b..faa4ffb434ecde581c9c350eb34efaa0923115b2 100644 --- a/covid/ruffus_pipeline.py +++ b/covid/ruffus_pipeline.py @@ -2,6 +2,10 @@ import os import yaml +from datetime import datetime +from uuid import uuid1 +import json +import netCDF4 as nc import pandas as pd import ruffus as rf @@ -28,25 +32,45 @@ def _make_append_work_dir(work_dir): return lambda filename: os.path.expandvars(os.path.join(work_dir, filename)) +def _create_metadata(config): + return dict( + pipeline_id=uuid1().hex, + created_at=str(datetime.now()), + inference_library="GEM", + inference_library_version="0.1.alpha0", + pipeline_config=json.dumps(config, default=str), + ) + + +def _create_nc_file(output_file, meta_data_dict): + nc_file = nc.Dataset(output_file, "w", format="NETCDF4") + for k, v in meta_data_dict.items(): + setattr(nc_file, k, v) + nc_file.close() + + def run_pipeline(global_config, results_directory, cli_options): wd = _make_append_work_dir(results_directory) + pipeline_meta = _create_metadata(global_config) + # Pipeline starts here @rf.mkdir(results_directory) - @rf.originate(wd("config.yaml"), global_config) - def save_config(output_file, config): - with open(output_file, "w") as f: - yaml.dump(config, f) + @rf.originate(wd("inferencedata.nc"), global_config) + def process_data(output_file, config): + _create_nc_file(output_file, pipeline_meta) + assemble_data(output_file, config["ProcessData"]) @rf.transform( - save_config, + process_data, rf.formatter(), - wd("pipeline_data.pkl"), + wd("config.yaml"), global_config, ) - def process_data(input_file, output_file, config): - assemble_data(output_file, config["ProcessData"]) + def save_config(input_file, output_file, config): + with open(output_file, "w") as f: + yaml.dump(config, f) @rf.transform( process_data, @@ -194,7 +218,15 @@ def run_pipeline(global_config, results_directory, cli_options): # DSTL Summary rf.transform( - [[process_data, insample7, insample14, medium_term, next_generation_matrix]], + [ + [ + process_data, + insample7, + insample14, + medium_term, + next_generation_matrix, + ] + ], rf.formatter(), wd("summary_longformat.xlsx"), )(summary_longformat) diff --git a/covid/tasks/assemble_data.py b/covid/tasks/assemble_data.py index dedfe73fbd4345048977827d61d204e0c2292a42..a000742ffbeabe2d8a47b265a14cc0b0aecabafb 100644 --- a/covid/tasks/assemble_data.py +++ b/covid/tasks/assemble_data.py @@ -1,17 +1,19 @@ """Function responsible for assembling all data required to instantiate the COVID19 model""" - -import pickle as pkl - +import os from covid.model_spec import gather_data -def assemble_data(output_file, config): +def assemble_data(filename, config): - all_data = gather_data(config) - with open(output_file, "wb") as f: - pkl.dump(all_data, f) + constant_data, observations = gather_data(config) + if os.path.exists(filename): + mode = "a" + else: + mode = "w" + constant_data.to_netcdf(filename, group="constant_data", mode=mode) + observations.to_netcdf(filename, group="observations", mode="a") if __name__ == "__main__": @@ -21,7 +23,7 @@ if __name__ == "__main__": parser = ArgumentParser(description="Bundle data into a pickled dictionary") parser.add_argument("config_file", help="Global config file") - parser.add_argument("output_file", help="Data bundle pkl file") + parser.add_argument("output_file", help="InferenceData file") args = 

     with open(args.config_file, "r") as f:
diff --git a/covid/tasks/case_exceedance.py b/covid/tasks/case_exceedance.py
index 4ea37dc56ec1ba3a5eb4b670ac09fc82af135e10..8fde9015c4c1f56a9ee0a2286e600543b1c65016 100644
--- a/covid/tasks/case_exceedance.py
+++ b/covid/tasks/case_exceedance.py
@@ -14,15 +14,14 @@
     """
     data_file, prediction_file = input_files

-    with open(data_file, "rb") as f:
-        data = pkl.load(f)
+    data = xarray.open_dataset(data_file, group="observations")

-    prediction = xarray.open_dataset(prediction_file)["events"]
+    prediction = xarray.open_dataset(prediction_file, group="predictions")[
+        "events"
+    ]

     modelled_cases = np.sum(prediction[..., :lag, -1], axis=-1)
     observed_cases = np.sum(data["cases"][:, -lag:], axis=-1)
-    if observed_cases.dims[0] == "lad19cd":
-        observed_cases = observed_cases.rename({"lad19cd": "location"})
     exceedance = np.mean(modelled_cases < observed_cases, axis=0)

     return exceedance
diff --git a/covid/tasks/inference.py b/covid/tasks/inference.py
index 6105f342c23da7c2ce13ba582cf3c45b0e95d7b2..d1721e1818ef75ed75ef88e92b4fd1c54e7570e8 100644
--- a/covid/tasks/inference.py
+++ b/covid/tasks/inference.py
@@ -1,9 +1,11 @@
 """MCMC Test Rig for COVID-19 UK model"""
 # pylint: disable=E402

-import h5py
 import pickle as pkl
 from time import perf_counter
+
+import h5py
+import xarray
 import tqdm
 import yaml
 import numpy as np
@@ -33,14 +35,14 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
     else:
         print("Using CPU")

-    with open(data_file, "rb") as f:
-        data = pkl.load(f)
+    data = xarray.open_dataset(data_file, group="constant_data")
+    cases = xarray.open_dataset(data_file, group="observations")["cases"]

     # We load in cases and impute missing infections first, since this sets the
     # time epoch which we are analysing.

     # Impute censored events, return cases
-    print("Data shape:", data["cases"].shape)
-    events = model_spec.impute_censored_events(data["cases"].astype(DTYPE))
+    print("Data shape:", cases.shape)
+    events = model_spec.impute_censored_events(cases.astype(DTYPE))

     # Initial conditions are calculated by calculating the state
     # at the beginning of the inference period
@@ -50,13 +52,16 @@
     # to set up a sensible initial state.
     state = compute_state(
         initial_state=tf.concat(
-            [data["N"][:, tf.newaxis], tf.zeros_like(events[:, 0, :])],
+            [
+                tf.constant(data["N"], DTYPE)[:, tf.newaxis],
+                tf.zeros_like(events[:, 0, :]),
+            ],
             axis=-1,
         ),
         events=events,
         stoichiometry=model_spec.STOICHIOMETRY,
     )
-    start_time = state.shape[1] - data["cases"].shape[1]
+    start_time = state.shape[1] - cases.shape[1]
     initial_state = state[:, start_time, :]
     events = events[:, start_time:, :]

@@ -251,10 +256,10 @@
     tf.random.set_seed(2)

     current_state = [
-        np.array(
+        tf.constant(
             [0.6, 0.0, 0.0, 0.1], dtype=DTYPE
         ),  # , 0.0, 0.0, 0.0, 0.0, 0.0], dtype=DTYPE),
-        np.zeros(
+        tf.zeros(
             model.model["xi"](0.0, 0.1).event_shape[-1] + 1,
             dtype=DTYPE,
         ),
@@ -267,32 +272,20 @@
     posterior = Posterior(
         output_file,
         sample_dict={
-            "beta2": (samples[0][:, 0], (NUM_BURST_SAMPLES,)),
-            "gamma0": (samples[0][:, 1], (NUM_BURST_SAMPLES,)),
-            "gamma1": (samples[0][:, 2], (NUM_BURST_SAMPLES,)),
-            "sigma": (samples[0][:, 3], (NUM_BURST_SAMPLES,)),
-            "beta3": (
-                tf.zeros([1, 5], dtype=DTYPE),
-                (NUM_BURST_SAMPLES, 2),
-            ),  # (samples[0][:, 4:], (NUM_BURST_SAMPLES, 2)),
-            "beta1": (samples[1][:, 0], (NUM_BURST_SAMPLES,)),
-            "xi": (
-                samples[1][:, 1:],
-                (NUM_BURST_SAMPLES, samples[1].shape[1] - 1),
-            ),
-            "events": (
-                samples[2],
-                (NUM_BURST_SAMPLES, min(32, events.shape[0]), 32, 1),
-            ),
+            "beta2": samples[0][:, 0],
+            "gamma0": samples[0][:, 1],
+            "gamma1": samples[0][:, 2],
+            "sigma": samples[0][:, 3],
+            "beta3": tf.zeros([1, 5], dtype=DTYPE),
+            "beta1": samples[1][:, 0],
+            "xi": samples[1][:, 1:],
+            "events": samples[2],
         },
         results_dict=results,
         num_samples=NUM_SAVED_SAMPLES,
     )
     posterior._file.create_dataset("initial_state", data=initial_state)
-    posterior._file.create_dataset(
-        "date_range",
-        data=np.array(data["date_range"]).astype(h5py.string_dtype()),
-    )
+
     # We loop over successive calls to sample because we have to dump results
     # to disc, or else end OOM (even on a 32GB system).
     # with tf.profiler.experimental.Profile("/tmp/tf_logdir"):
diff --git a/covid/tasks/insample_predictive_timeseries.py b/covid/tasks/insample_predictive_timeseries.py
index 88b0969bf0d23f53dad82b4827b204dc1ce09164..bca9560b02d31bcfefde8b426e308d64b7959d74 100644
--- a/covid/tasks/insample_predictive_timeseries.py
+++ b/covid/tasks/insample_predictive_timeseries.py
@@ -65,18 +65,13 @@
     prediction_file, data_file = input_files
     lag = int(lag)

-    prediction = xarray.open_dataset(prediction_file)["events"]
+    prediction = xarray.open_dataset(prediction_file, group="predictions")[
+        "events"
+    ]
     prediction = prediction[..., :lag, -1]  # Just removals

-    with open(data_file, "rb") as f:
-        data = pkl.load(f)
-
-    cases = data["cases"]
-    lads = data["locations"]
-
-    # TODO remove legacy code!
- if "lad19cd" in cases.dims: - cases = cases.rename({"lad19cd": "location"}) + cases = xarray.open_dataset(data_file, group="observations")["cases"] + lads = xarray.open_dataset(data_file, group="constant_data")["locations"] output_dir = Path(output_dir) output_dir.mkdir(exist_ok=True) @@ -93,8 +88,8 @@ def insample_predictive_timeseries(input_files, output_dir, lag): pred_mean.loc[location, :], pred_quants.loc[:, location, :], cases.loc[location][-lag:], - cases.coords["date"][-lag:], - lads.loc[lads["lad19cd"] == location, "name"].iloc[0], + cases.coords["time"][-lag:], + lads.loc[location].data, ) plt.savefig(output_dir.joinpath(f"{location.data}.png")) plt.close() diff --git a/covid/tasks/next_generation_matrix.py b/covid/tasks/next_generation_matrix.py index a38df237bac27de3b57878bc621ecdf192cde369..71b57cb618d8d09e27b42aa9e27419f5719347a7 100644 --- a/covid/tasks/next_generation_matrix.py +++ b/covid/tasks/next_generation_matrix.py @@ -7,6 +7,7 @@ import tensorflow as tf from covid import model_spec +from covid.util import copy_nc_attrs from gemlib.util import compute_state @@ -56,8 +57,7 @@ def calc_posterior_ngm(samples, covar_data): def next_generation_matrix(input_files, output_file): - with open(input_files[0], "rb") as f: - covar_data = pkl.load(f) + covar_data = xarray.open_dataset(input_files[0], group="constant_data") with open(input_files[1], "rb") as f: samples = pkl.load(f) @@ -68,15 +68,16 @@ def next_generation_matrix(input_files, output_file): ngm, coords=[ np.arange(ngm.shape[0]), - covar_data["locations"]["lad19cd"], - covar_data["locations"]["lad19cd"], + covar_data.coords["location"], + covar_data.coords["location"], ], dims=["iteration", "dest", "src"], ) ngm = xarray.Dataset({"ngm": ngm}) # Output - ngm.to_netcdf(output_file) + ngm.to_netcdf(output_file, group="posterior_predictive") + copy_nc_attrs(input_files[0], output_file) if __name__ == "__main__": diff --git a/covid/tasks/overall_rt.py b/covid/tasks/overall_rt.py index adf74db83c199fcda81495b02ca374667717b30c..cf562d77666afe5dc7dc097af630f578284b0f57 100644 --- a/covid/tasks/overall_rt.py +++ b/covid/tasks/overall_rt.py @@ -12,7 +12,9 @@ from covid.summary import ( def overall_rt(next_generation_matrix, output_file): - ngms = xarray.open_dataset(next_generation_matrix)["ngm"] + ngms = xarray.open_dataset( + next_generation_matrix, group="posterior_predictive" + )["ngm"] b, _ = power_iteration(ngms) rt = rayleigh_quotient(ngms, b) q = np.arange(0.05, 1.0, 0.05) diff --git a/covid/tasks/predict.py b/covid/tasks/predict.py index 34ba49368cd4fa8d7237502dc0dab89be109dbf9..7b66555c24a70c27297ba0aad3df977e03164832 100644 --- a/covid/tasks/predict.py +++ b/covid/tasks/predict.py @@ -6,6 +6,7 @@ import pickle as pkl import tensorflow as tf from covid import model_spec +from covid.util import copy_nc_attrs from gemlib.util import compute_state @@ -71,16 +72,19 @@ def read_pkl(filename): def predict(data, posterior_samples, output_file, initial_step, num_steps): - covar_data = read_pkl(data) + covar_data = xarray.open_dataset(data, group="constant_data") + cases = xarray.open_dataset(data, group="observations") samples = read_pkl(posterior_samples) if initial_step < 0: initial_step = samples["seir"].shape[-2] + initial_step - dates = np.arange(covar_data["date_range"][0] + np.timedelta64(initial_step, "D"), - covar_data["date_range"][0] + np.timedelta64(initial_step + num_steps, "D"), - np.timedelta64(1, "D")) - del covar_data["date_range"] + origin_date = np.array(cases.coords["time"][0]) + dates = np.arange( + 
+        origin_date + np.timedelta64(initial_step, "D"),
+        origin_date + np.timedelta64(initial_step + num_steps, "D"),
+        np.timedelta64(1, "D"),
+    )

     estimated_init_state, predicted_events = predicted_incidence(
         samples, covar_data, initial_step, num_steps
     )
@@ -90,7 +94,7 @@
         predicted_events,
         coords=[
             np.arange(predicted_events.shape[0]),
-            covar_data["locations"]["lad19cd"],
+            covar_data.coords["location"],
             dates,
             np.arange(predicted_events.shape[3]),
         ],
@@ -100,7 +104,7 @@
         estimated_init_state,
         coords=[
             np.arange(estimated_init_state.shape[0]),
-            covar_data["locations"]["lad19cd"],
+            covar_data.coords["location"],
             np.arange(estimated_init_state.shape[-1]),
         ],
         dims=("iteration", "location", "state"),
@@ -108,7 +112,9 @@
     ds = xarray.Dataset(
         {"events": prediction, "initial_state": estimated_init_state}
     )
-    ds.to_netcdf(output_file)
+    ds.to_netcdf(output_file, group="predictions")
+    ds.close()
+    copy_nc_attrs(data, output_file)


 if __name__ == "__main__":
diff --git a/covid/tasks/summarize.py b/covid/tasks/summarize.py
index 6a2240a74f219b528128f94a619916ce795687cd..d3b728dd2a4f28833b1dd7068835d447111ba2b5 100644
--- a/covid/tasks/summarize.py
+++ b/covid/tasks/summarize.py
@@ -20,7 +20,7 @@ def rt(input_file, output_file):
     :param output_file: a .csv of mean (ci) values
     """

-    ngm = xarray.open_dataset(input_file)["ngm"]
+    ngm = xarray.open_dataset(input_file, group="posterior_predictive")["ngm"]

     rt = np.sum(ngm, axis=-2)
     rt_summary = mean_and_ci(rt, name="Rt")
@@ -41,7 +41,7 @@ def infec_incidence(input_file, output_file):
     :param output_file: csv with prediction summaries
     """

-    prediction = xarray.open_dataset(input_file)["events"]
+    prediction = xarray.open_dataset(input_file, group="predictions")["events"]

     offset = 4
     timepoints = SUMMARY_DAYS + offset
@@ -77,17 +77,15 @@ def prevalence(input_files, output_file):
     offset = 4  # Account for recording lag
     timepoints = SUMMARY_DAYS + offset

-    with open(input_files[0], "rb") as f:
-        data = pkl.load(f)
-
-    prediction = xarray.open_dataset(input_files[1])
+    data = xarray.open_dataset(input_files[0], group="constant_data")
+    prediction = xarray.open_dataset(input_files[1], group="predictions")

     predicted_state = compute_state(
         prediction["initial_state"], prediction["events"], STOICHIOMETRY
     )

     def calc_prev(state, name=None):
-        prev = np.sum(state[..., 1:3], axis=-1) / np.squeeze(data["N"])
+        prev = np.sum(state[..., 1:3], axis=-1) / np.array(data["N"])
         return mean_and_ci(prev, name=name)

     idx = prediction.coords["location"]
diff --git a/covid/tasks/summary_geopackage.py b/covid/tasks/summary_geopackage.py
index a43ec07d65e0a3b56c46b6683044f1825ab84729..04c3feaef5bafc23ba581c80dcaa99cca5eb873c 100644
--- a/covid/tasks/summary_geopackage.py
+++ b/covid/tasks/summary_geopackage.py
@@ -1,6 +1,7 @@
 """Summarises posterior distribution into a geopackage"""

 import pickle as pkl
+import xarray
 import pandas as pd
 import geopandas as gp

@@ -26,12 +27,11 @@ def summary_geopackage(input_files, output_file, config):
     """

     # Read in the first input file
-    with open(input_files.pop(0), "rb") as f:
-        data = pkl.load(f)
+    data = xarray.open_dataset(input_files.pop(0), group="constant_data")

     # Load and filter geopackage
     geo = gp.read_file(config["base_geopackage"], layer=config["base_layer"])
-    geo = geo[geo["lad19cd"].isin(data["locations"]["lad19cd"])]
+ geo = geo[geo["lad19cd"].isin(data.coords["location"])] geo = geo.sort_values(by="lad19cd") # Dump data into the geopackage diff --git a/covid/tasks/summary_longformat.py b/covid/tasks/summary_longformat.py index 416e5dee15da1b0afbdb8cb6042c5cc7f0962699..30fe58cc91467a888f0e7e4ae2be8da52b2b3417 100644 --- a/covid/tasks/summary_longformat.py +++ b/covid/tasks/summary_longformat.py @@ -36,7 +36,7 @@ def prevalence(prediction, popsize): ) prev_per_1e5 = ( prev[..., 1:3].sum(dim="state").reset_coords(drop=True) - / popsize[np.newaxis, :, np.newaxis] + / np.array(popsize)[np.newaxis, :, np.newaxis] * 100000 ) return xarray2summarydf(prev_per_1e5) @@ -66,7 +66,7 @@ def weekly_pred_cases_per_100k(prediction, popsize): ) # Divide by population sizes week_incidence = ( - week_incidence / popsize[np.newaxis, :, np.newaxis] * 100000 + week_incidence / np.array(popsize)[np.newaxis, :, np.newaxis] * 100000 ) return xarray2summarydf(week_incidence) @@ -84,24 +84,24 @@ def summary_longformat(input_files, output_file): location,value_name,value,q0.025,q0.975]` """ - with open(input_files[0], "rb") as f: - data = pkl.load(f) - da = data["cases"].rename({"date": "time"}) - df = da.to_dataframe(name="value").reset_index() + data = xarray.open_dataset(input_files[0], group="constant_data") + cases = xarray.open_dataset(input_files[0], group="observations")["cases"] + + df = cases.to_dataframe(name="value").reset_index() df["value_name"] = "newCasesBySpecimenDate" df["0.05"] = np.nan df["0.5"] = np.nan df["0.95"] = np.nan # Insample predictive incidence - insample = xarray.open_dataset(input_files[1]) + insample = xarray.open_dataset(input_files[1], group="predictions") insample_df = xarray2summarydf( insample["events"][..., 2].reset_coords(drop=True) ) insample_df["value_name"] = "insample7_Cases" df = pd.concat([df, insample_df], axis="index") - insample = xarray.open_dataset(input_files[2]) + insample = xarray.open_dataset(input_files[2], group="predictions") insample_df = xarray2summarydf( insample["events"][..., 2].reset_coords(drop=True) ) @@ -109,7 +109,7 @@ def summary_longformat(input_files, output_file): df = pd.concat([df, insample_df], axis="index") # Medium term absolute incidence - medium_term = xarray.open_dataset(input_files[3]) + medium_term = xarray.open_dataset(input_files[3], group="predictions") medium_df = xarray2summarydf( medium_term["events"][..., 2].reset_coords(drop=True) ) @@ -127,7 +127,7 @@ def summary_longformat(input_files, output_file): medium_df = xarray2summarydf( ( medium_term["events"][..., 2].reset_coords(drop=True) - / data["N"][np.newaxis, :, np.newaxis] + / np.array(data["N"])[np.newaxis, :, np.newaxis] ) * 100000 ) @@ -147,12 +147,14 @@ def summary_longformat(input_files, output_file): df = pd.concat([df, prev_df], axis="index") # Rt - ngms = xarray.load_dataset(input_files[4])["ngm"] + ngms = xarray.load_dataset(input_files[4], group="posterior_predictive")[ + "ngm" + ] rt = ngms.sum(dim="dest") rt = rt.rename({"src": "location"}) rt_summary = xarray2summarydf(rt) rt_summary["value_name"] = "R" - rt_summary["time"] = data["date_range"][1] + rt_summary["time"] = cases.coords["time"].data[-1] + np.timedelta64(1, "D") df = pd.concat([df, rt_summary], axis="index") quantiles = df.columns[df.columns.str.startswith("0.")] diff --git a/covid/tasks/within_between.py b/covid/tasks/within_between.py index 6b171b4dd4522912c761cb719d1119a9dc759194..04b4d66f3047015d29c49d62cdbf50eb0a099c54 100644 --- a/covid/tasks/within_between.py +++ b/covid/tasks/within_between.py @@ -3,6 
 import pickle as pkl

 import numpy as np
 import pandas as pd
+import xarray
 import tensorflow as tf

 from gemlib.util import compute_state
@@ -62,8 +63,7 @@ def within_between(input_files, output_file):
     :param output_file: a csv with within/between summary
     """

-    with open(input_files[0], "rb") as f:
-        covar_data = pkl.load(f)
+    covar_data = xarray.open_dataset(input_files[0], group="constant_data")

     with open(input_files[1], "rb") as f:
         samples = pkl.load(f)
@@ -85,7 +85,9 @@
             between_mean=np.mean(between, axis=0),
             p_within_gt_between=np.mean(within > between),
         ),
-        index=pd.Index(covar_data["locations"]["lad19cd"], name="location"),
+        index=pd.Index(
+            covar_data["locations"].coords["location"], name="location"
+        ),
     )

     df.to_csv(output_file)
diff --git a/covid/util.py b/covid/util.py
index 9811e1ef233bd723beb37b5353bd37c99d4d9d8f..0da877c079f59417ed8d51e7ebec52be70803af2 100644
--- a/covid/util.py
+++ b/covid/util.py
@@ -4,6 +4,7 @@ import yaml
 import numpy as np
 import pandas as pd
 import h5py
+import xarray
 import tensorflow as tf
 import tensorflow_probability as tfp
 from tensorflow_probability.python.internal import dtype_util
@@ -12,6 +13,15 @@ tfd = tfp.distributions
 tfs = tfp.stats


+def copy_nc_attrs(src, dest):
+    """Copies dataset attributes between two NetCDF datasets"""
+    with xarray.open_dataset(src) as s:
+        attrs = s.attrs
+    # Write empty root dataset with attributes
+    ds = xarray.Dataset(attrs=attrs)
+    ds.to_netcdf(dest, mode="a")
+
+
 def load_config(config_filename):
     with open(config_filename, "r") as f:
         return yaml.load(f, Loader=yaml.FullLoader)
diff --git a/pyproject.toml b/pyproject.toml
index 67b91f58b187844f9f64177597bae98aeab8d8c8..62082d45c95b08d8eff540c927d6ce6703361c09 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,6 +26,8 @@ ruffus = "^2.8.4"
 tensorflow = "^2.4.0"
 jedi = "^0.17.2"
 XlsxWriter = "^1.3.7"
+netCDF4 = "^1.5.6"
+dask = {extras = ["array"], version = "^2021.2.0"}

 [tool.poetry.dev-dependencies]
 ipython = "^7.18.1"
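
Note on the file layout this patch produces: pipeline stages now exchange a single NetCDF4 file instead of pickled dictionaries, with one group per data kind ("constant_data", "observations", "predictions", "posterior_predictive") and the pipeline metadata written by _create_nc_file stored as attributes of the root group. Below is a minimal sketch of how a downstream consumer might read such a file; the path "results/inferencedata.nc" is hypothetical, while the group, attribute, and variable names are the ones used in the patch.

    import xarray

    path = "results/inferencedata.nc"

    # Pipeline metadata (pipeline_id, created_at, ...) lives on the root group
    with xarray.open_dataset(path) as root:
        print(root.attrs["pipeline_id"], root.attrs["created_at"])

    # Each data kind lives in its own NetCDF group
    constant_data = xarray.open_dataset(path, group="constant_data")
    cases = xarray.open_dataset(path, group="observations")["cases"]

    print(constant_data["N"])    # population sizes, indexed by "location"
    print(cases.coords["time"])  # case counts are indexed by location and time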