Commit ffd59342 authored by Chris Jewell

Merge branch 'refactor' into 'master'

Split code into covid-pipeline and covid19uk

See merge request !42
parents 4a43ffde 769a4e8e
"""General argument parsing for all scripts"""
import argparse
def cli_args(args=None):
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", type=str, help="configuration file")
parser.add_argument(
"-r",
"--results",
type=str,
default=None,
help="override config file results dir",
)
args = parser.parse_args(args)
return args
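# Example usage (editor's illustrative sketch, not part of this commit; the
# file names are hypothetical): a script parses its own switches and can
# override the configured results directory.
#
#     args = cli_args(["-c", "config.yaml", "-r", "results/"])
#     print(args.config, args.results)   # "config.yaml" "results/"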
"""Tensorflow configuration options"""
import tensorflow as tf
import numpy as np
floatX = np.float64
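# Example (editor's illustrative sketch, not part of this commit): downstream
# modules can build tensors with the shared dtype so the model runs in float64.
#
#     x = tf.constant([1.0, 2.0], dtype=floatX)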
"""Methods to read in COVID-19 data and output
well-known formats"""
from warnings import warn
import numpy as np
import pandas as pd
__all__ = [
"read_mobility",
"read_population",
"read_traffic_flow",
"read_phe_cases",
]
def read_mobility(path):
"""Reads in CSV with mobility matrix.
CSV format: <To>,<id>,<id>,....
<id>,<val>,<val>,...
...
:returns: a numpy matrix sorted by <id> on both rows and cols.
"""
mobility = pd.read_csv(path)
mobility = mobility[
mobility["From"].str.startswith("E")
& mobility["To"].str.startswith("E")
]
mobility = mobility.sort_values(["From", "To"])
mobility = mobility.groupby(["From", "To"]).agg({"Flow": sum}).reset_index()
mob_matrix = mobility.pivot(index="To", columns="From", values="Flow")
mob_matrix[mob_matrix.isna()] = 0.0
return mob_matrix
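# Example usage (editor's illustrative sketch; "data/mobility.csv" is a
# hypothetical path): the long-format From,To,Flow file is filtered to
# English LADs and pivoted into a square To x From matrix.
#
#     mob = read_mobility("data/mobility.csv")
#     # rows and columns share the same sorted LAD codes if every LAD
#     # appears as both origin and destination
#     assert (mob.index == mob.columns).all()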
def read_population(path):
"""Reads population CSV
:returns: a pandas Series indexed by LTLAs
"""
pop = pd.read_csv(path, index_col="lad19cd")
pop = pop[pop.index.str.startswith("E")]
pop = pop.sum(axis=1)
pop = pop.sort_index()
pop.name = "n"
return pop
def read_traffic_flow(
path: str, date_low: np.datetime64, date_high: np.datetime64
):
"""Read traffic flow data, returning a timeseries between dates.
:param path: path to a traffic flow Excel file with Date and Cars columns
:returns: a Pandas timeseries
"""
commute_raw = pd.read_excel(
path, index_col="Date", skiprows=5, usecols=["Date", "Cars"]
)
commute_raw.index = pd.to_datetime(commute_raw.index, format="%Y-%m-%d")
commute_raw.sort_index(axis=0, inplace=True)
commute = pd.DataFrame(
index=np.arange(date_low, date_high, np.timedelta64(1, "D"))
)
commute = commute.merge(
commute_raw, left_index=True, right_index=True, how="left"
)
commute[commute.index < commute_raw.index[0]] = commute_raw.iloc[0, 0]
commute[commute.index > commute_raw.index[-1]] = commute_raw.iloc[-1, 0]
commute["Cars"] = commute["Cars"] / 100.0
commute.columns = ["percent"]
return commute
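# Example usage (editor's illustrative sketch; the path is hypothetical): days
# outside the observed range are padded with the first/last observed value,
# and the Cars figure is divided by 100 into the returned "percent" column.
#
#     flow = read_traffic_flow(
#         "data/traffic_flow.xlsx",
#         date_low=np.datetime64("2020-03-01"),
#         date_high=np.datetime64("2020-10-01"),
#     )
#     print(flow["percent"].head())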
def _merge_ltla(series):
london = ["E09000001", "E09000033"]
corn_scilly = ["E06000052", "E06000053"]
series.loc[series.isin(london)] = ",".join(london)
series.loc[series.isin(corn_scilly)] = ",".join(corn_scilly)
return series
def read_phe_cases(
path, date_low, date_high, pillar="both", date_type="specimen", ltlas=None
):
"""Reads a PHE Anonymised Line Listing for dates in [low_date, high_date)
:param path: path to PHE Anonymised Line Listing Data
:param low_date: lower date bound
:param high_date: upper date bound
:returns: a Pandas data frame of LTLAs x dates
"""
date_type_map = {"specimen": "specimen_date", "report": "lab_report_date"}
pillar_map = {"both": None, "1": "Pillar 1", "2": "Pillar 2"}
line_listing = pd.read_csv(
path, usecols=[date_type_map[date_type], "LTLA_code", "pillar"]
)[[date_type_map[date_type], "LTLA_code", "pillar"]]
line_listing.columns = ["date", "lad19cd", "pillar"]
line_listing["lad19cd"] = _merge_ltla(line_listing["lad19cd"])
# Select dates
line_listing["date"] = pd.to_datetime(
line_listing["date"], format="%d/%m/%Y"
)
line_listing = line_listing[
(date_low <= line_listing["date"]) & (line_listing["date"] < date_high)
]
# Choose pillar
if pillar_map[pillar] is not None:
line_listing = line_listing.loc[
line_listing["pillar"] == pillar_map[pillar]
]
# Drop na rows
orig_len = line_listing.shape[0]
line_listing = line_listing.dropna(axis=0)
warn(
f"Removed {orig_len - line_listing.shape[0]} rows of {orig_len} "
f"due to missing values "
f"({100.0 * (orig_len - line_listing.shape[0]) / orig_len:.1f}%)"
)
# Aggregate by date/region
case_counts = line_listing.groupby(["date", "lad19cd"]).size()
case_counts.name = "count"
# Re-index
dates = pd.date_range(date_low, date_high, closed="left")
if ltlas is None:
ltlas = case_counts.index.levels[1]
index = pd.MultiIndex.from_product(
[dates, ltlas], names=["date", "lad19cd"]
)
case_counts = case_counts.reindex(index, fill_value=0)
return case_counts.reset_index().pivot(
index="lad19cd", columns="date", values="count"
)
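# Example usage (editor's illustrative sketch; the line-listing path is
# hypothetical): returns an LTLA x date matrix of daily case counts,
# zero-filled for unobserved date/LTLA combinations.
#
#     cases = read_phe_cases(
#         "data/anonymised_line_listing.csv",
#         date_low=np.datetime64("2020-08-01"),
#         date_high=np.datetime64("2020-09-01"),
#         pillar="both",
#         date_type="specimen",
#     )
#     print(cases.shape)  # (number of LTLAs, 31 days)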
def read_tier_restriction_data(
tier_restriction_csv, lad19cd_lookup, date_low, date_high
):
data = pd.read_csv(tier_restriction_csv)
data.loc[:, "date"] = pd.to_datetime(data["date"])
# Group merged ltlas
london = ["City of London", "Westminster"]
corn_scilly = ["Cornwall", "Isles of Scilly"]
data.loc[data["ltla"].isin(london), "ltla"] = ":".join(london)
data.loc[data["ltla"].isin(corn_scilly), "ltla"] = ":".join(corn_scilly)
# Fix up dodgy names
data.loc[
data["ltla"] == "Blackburn With Darwen", "ltla"
] = "Blackburn with Darwen"
# Merge
data = lad19cd_lookup.merge(
data, how="left", left_on="lad19nm", right_on="ltla"
)
# Re-index
data.index = pd.MultiIndex.from_frame(data[["date", "lad19cd"]])
data = data[["tier_2", "tier_3", "national_lockdown"]]
data = data[~data.index.duplicated()]
dates = pd.date_range(date_low, date_high - pd.Timedelta(1, "D"))
lad19cd = lad19cd_lookup["lad19cd"].sort_values().unique()
new_index = pd.MultiIndex.from_product([dates, lad19cd])
data = data.reindex(new_index, fill_value=0.0)
warn(f"Tier summary: {np.mean(data, axis=0)}")
# Pack into [T, M, V] array.
arr_data = data.to_xarray().to_array()
return np.transpose(arr_data, axes=[1, 2, 0])
def read_challen_tier_restriction(tier_restriction_csv, date_low, date_high):
tiers = pd.read_csv(tier_restriction_csv)
tiers["date"] = pd.to_datetime(tiers["date"], format="%Y-%m-%d")
tiers["code"] = _merge_ltla(tiers["code"])
# Separate out December tiers
tiers.loc[
(tiers["date"] > np.datetime64("2020-12-02"))
& (tiers["tier"] == "three"),
"tier",
] = "dec_three"
tiers.loc[
(tiers["date"] > np.datetime64("2020-12-02"))
& (tiers["tier"] == "two"),
"tier",
] = "dec_two"
tiers.loc[
(tiers["date"] > np.datetime64("2020-12-02"))
& (tiers["tier"] == "one"),
"tier",
] = "dec_one"
index = pd.MultiIndex.from_frame(tiers[["date", "code", "tier"]])
index = index.sort_values()
index = index[~index.duplicated()]
ser = pd.Series(1.0, index=index, name="value")
ser = ser[date_low : (date_high - np.timedelta64(1, "D"))]
xarr = ser.to_xarray()
xarr.data[np.isnan(xarr.data)] = 0.0
return xarr.loc[..., ["two", "three", "dec_two", "dec_three"]]
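# Example usage (editor's illustrative sketch; the CSV path is hypothetical):
# yields a [date, code, tier] indicator array restricted to the modelled
# tier levels, with post-2 December tiers split out as "dec_*".
#
#     tiers = read_challen_tier_restriction(
#         "data/tier_restrictions.csv",
#         np.datetime64("2020-10-01"),
#         np.datetime64("2020-12-15"),
#     )
#     print(tiers.dims)  # ('date', 'code', 'tier')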
"""Provides functions to format data"""
import pandas as pd
def _expand_quantiles(q_dict):
"""Expand a dictionary of quantiles"""
q_str = [
"0.05",
"0.1",
"0.15",
"0.2",
"0.25",
"0.3",
"0.35",
"0.4",
"0.45",
"0.5",
"0.55",
"0.6",
"0.65",
"0.7",
"0.75",
"0.8",
"0.85",
"0.9",
"0.95",
]
quantiles = {f"Quantile {q}": None for q in q_str}
if q_dict is None:
return quantiles
for k, v in q_dict.items():
q_key = (
f"Quantile {float(k)}" # Coerce back to float to strip trailing 0s
)
if q_key not in quantiles.keys():
raise KeyError(f"quantile '{k}' not compatible with template form")
quantiles[q_key] = v
return [
pd.Series(v, name=k).reset_index(drop=True)
for k, v in quantiles.items()
]
def _split_dates(dates):
if dates is None:
return {"day": None, "month": None, "year": None}
if hasattr(dates, "__iter__"):
dx = pd.DatetimeIndex(dates)
else:
dx = pd.DatetimeIndex([dates])
return {"day": dx.day, "month": dx.month, "year": dx.year}
def make_dstl_template(
group=None,
model=None,
scenario=None,
model_type=None,
version=None,
creation_date=None,
value_date=None,
age_band=None,
geography=None,
value_type=None,
value=None,
quantiles=None,
):
"""Formats a DSTL-type Excel results template"""
# Process date
creation_date_parts = _split_dates(creation_date)
value_date_parts = _split_dates(value_date)
quantile_series = _expand_quantiles(quantiles)
fields = {
"Group": group,
"Model": model,
"Scenario": scenario,
"ModelType": model_type,
"Version": version,
"Creation Day": creation_date_parts["day"],
"Creation Month": creation_date_parts["month"],
"Creation Year": creation_date_parts["year"],
"Day of Value": value_date_parts["day"],
"Month of Value": value_date_parts["month"],
"Year of Value": value_date_parts["year"],
"AgeBand": age_band,
"Geography": geography,
"ValueType": value_type,
"Value": value,
}
fields = [
pd.Series(v, name=k).reset_index(drop=True) for k, v in fields.items()
]
return pd.concat(fields + quantile_series, axis="columns").ffill(
axis="index"
)
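# Example usage (editor's illustrative sketch; all values are made up): scalar
# metadata is broadcast down the frame by the forward-fill, while per-row
# columns such as Value keep their own length.
#
#     df = make_dstl_template(
#         group="Lancaster",
#         model="SpatialStochasticSEIR",
#         creation_date=pd.Timestamp("2020-11-01"),
#         value_date=pd.date_range("2020-11-01", periods=7),
#         value=range(7),
#     )
#     print(df.shape[0])  # 7 rows, one per value date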
"""A Ruffus-ised pipeline for COVID-19 analysis"""
import os
from os.path import expandvars
import warnings
import yaml
import datetime
import s3fs
import ruffus as rf
from covid.ruffus_pipeline import run_pipeline
def _import_global_config(config_file):
with open(config_file, "r") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
return config
if __name__ == "__main__":
# Ruffus wrapper around argparse used to give us ruffus
# cmd line switches as well as our own config
argparser = rf.cmdline.get_argparse(description="COVID-19 pipeline")
data_args = argparser.add_argument_group(
"Data options", "Options controlling input data"
)
data_args.add_argument(
"-c",
"--config",
type=str,
help="global configuration file",
required=True,
)
data_args.add_argument(
"-r",
"--results-directory",
type=str,
help="pipeline results directory",
required=True,
)
data_args.add_argument(
"--date-range",
type=lambda s: datetime.datetime.strptime(s, "%Y-%m-%d"),
nargs=2,
help="Date range [low high)",
metavar="ISO6801",
)
data_args.add_argument(
"--reported-cases", type=str, help="Path to case file"
)
data_args.add_argument(
"--commute-volume", type=str, help="Path to commute volume file"
)
data_args.add_argument(
"--case-date-type",
type=str,
help="Case date type (specimen | report)",
choices=["specimen", "report"],
)
data_args.add_argument(
"--pillar", type=str, help="Pillar", choices=["both", "1", "2"]
)
data_args.add_argument("--aws", action="store_true", help="Push to AWS")
cli_options = argparser.parse_args()
global_config = _import_global_config(cli_options.config)
if cli_options.date_range is not None:
global_config["ProcessData"]["date_range"][0] = cli_options.date_range[
0
]
global_config["ProcessData"]["date_range"][1] = cli_options.date_range[
1
]
if cli_options.reported_cases is not None:
global_config["ProcessData"]["CasesData"]["address"] = expandvars(
cli_options.reported_cases
)
if cli_options.commute_volume is not None:
global_config["ProcessData"]["commute_volume"] = expandvars(
cli_options.commute_volume
)
if cli_options.case_date_type is not None:
global_config["ProcessData"][
"case_date_type"
] = cli_options.case_date_type
if cli_options.pillar is not None:
opts = {
"both": ["Pillar 1", "Pillar 2"],
"1": ["Pillar 1"],
"2": ["Pillar 2"],
}
global_config["ProcessData"]["CasesData"]["pillars"] = opts[
cli_options.pillar
]
run_pipeline(global_config, cli_options.results_directory, cli_options)
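# Example invocation (editor's illustrative sketch; the script filename and
# paths are hypothetical, and any option left unset on the command line falls
# back to the value in the YAML config):
#
#     python covid_pipeline.py \
#         --config global_config.yaml \
#         --results-directory results/2020-12-01 \
#         --date-range 2020-08-01 2020-12-01 \
#         --pillar 2 --case-date-type specimen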
"Plot functions for Covid-19 data brick"
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
tfs = tfp.stats
def plot_prediction(prediction_period, sims, case_reports):
sims = tf.reduce_sum(sims, axis=-2) # Sum over all meta-populations
quantiles = [2.5, 50, 97.5]
dates = np.arange(prediction_period[0],
prediction_period[1],
np.timedelta64(1, 'D'))
total_infected = tfs.percentile(tf.reduce_sum(sims[:, :, 1:3], axis=2), q=quantiles, axis=0)
removed = tfs.percentile(sims[:, :, 3], q=quantiles, axis=0)
removed_observed = tfs.percentile(removed * 0.1, q=quantiles, axis=0)
fig = plt.figure()
filler = plt.fill_between(dates, total_infected[0, :], total_infected[2, :], color='lightgray', alpha=0.8, label="95% credible interval")
plt.fill_between(dates, removed[0, :], removed[2, :], color='lightgray', alpha=0.8)
plt.fill_between(dates, removed_observed[0, :], removed_observed[2, :], color='lightgray', alpha=0.8)
ti_line = plt.plot(dates, total_infected[1, :], '-', color='red', alpha=0.4, label="Infected")
rem_line = plt.plot(dates, removed[1, :], '-', color='blue', label="Removed")
ro_line = plt.plot(dates, removed_observed[1, :], '-', color='orange', label='Predicted detections')
data_range = [case_reports['DateVal'].to_numpy().min(), case_reports['DateVal'].to_numpy().max()]
one_day = np.timedelta64(1, 'D')
data_dates = np.arange(data_range[0], data_range[1]+one_day, one_day)
marks = plt.plot(data_dates, case_reports['CumCases'].to_numpy(), '+', label='Observed cases')
plt.legend([ti_line[0], rem_line[0], ro_line[0], filler, marks[0]],
["Infected", "Removed", "Predicted detections", "95% credible interval", "Observed counts"])
plt.grid(color='lightgray', linestyle='dotted')
plt.xlabel("Date")
plt.ylabel("Individuals")
fig.autofmt_xdate()
plt.show()
def plot_case_incidence(date_range, sims):
# Number of new cases per day
dates = np.arange(date_range[0], date_range[1])
new_cases = sims[:, :, :, 3].sum(axis=2)
new_cases = new_cases[:, 1:] - new_cases[:, :-1]
new_cases = tfs.percentile(new_cases, q=[2.5, 50, 97.5], axis=0)/10000.
fig = plt.figure()
plt.fill_between(dates[:-1], new_cases[0, :], new_cases[2, :], color='lightgray', label="95% credible interval")
plt.plot(dates[:-1], new_cases[1, :], '-', alpha=0.2, label='New cases')
plt.grid(color='lightgray', linestyle='dotted')
plt.xlabel("Date")
plt.ylabel("Incidence per 10,000")
fig.autofmt_xdate()
plt.show()
"""Represents the analytic pipeline as a ruffus chain"""
import os
import warnings
import yaml
from datetime import datetime
from uuid import uuid1
import json
import s3fs
import netCDF4 as nc
import pandas as pd
import ruffus as rf
from covid.tasks import (
assemble_data,
mcmc,
thin_posterior,
reproduction_number,
overall_rt,
predict,
summarize,
within_between,
case_exceedance,
summary_geopackage,
insample_predictive_timeseries,
summary_longformat,
)
__all__ = ["run_pipeline"]
def _make_append_work_dir(work_dir):
return lambda filename: os.path.expandvars(os.path.join(work_dir, filename))
def _create_metadata(config):
return dict(
pipeline_id=uuid1().hex,
created_at=str(datetime.now()),
inference_library="GEM",
inference_library_version="0.1.alpha0",
pipeline_config=json.dumps(config, default=str),
)
def _create_nc_file(output_file, meta_data_dict):
nc_file = nc.Dataset(output_file, "w", format="NETCDF4")
for k, v in meta_data_dict.items():
setattr(nc_file, k, v)
nc_file.close()
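# Example (editor's illustrative sketch, not part of this commit): the pipeline
# metadata is written as global attributes on an otherwise empty NetCDF4 file
# before data assembly runs.
#
#     meta = _create_metadata({"ProcessData": {}})
#     _create_nc_file("inferencedata.nc", meta)
#     # nc.Dataset("inferencedata.nc").pipeline_id -> hex UUID string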
def run_pipeline(global_config, results_directory, cli_options):
wd = _make_append_work_dir(results_directory)
pipeline_meta = _create_metadata(global_config)
# Pipeline starts here
@rf.mkdir(results_directory)
@rf.originate(wd("config.yaml"), global_config)
def save_config(output_file, config):
with open(output_file, "w") as f:
yaml.dump(config, f)
@rf.follows(save_config)
@rf.originate(wd("inferencedata.nc"), global_config)
def process_data(output_file, config):
_create_nc_file(output_file, pipeline_meta)
assemble_data(output_file, config["ProcessData"])
@rf.transform(
process_data,
rf.formatter(),
wd("posterior.hd5"),
global_config,
)
def run_mcmc(input_file, output_file, config):
mcmc(input_file, output_file, config["Mcmc"])
@rf.transform(
input=run_mcmc,
filter=rf.formatter(),
output=wd("thin_samples.pkl"),
extras=[global_config],
)
def thin_samples(input_file, output_file, config):
thin_posterior(input_file, output_file, config["ThinPosterior"])
# Rt related steps
rf.transform(
input=[[process_data, thin_samples]],
filter=rf.formatter(),
output=wd("reproduction_number.nc"),
)(reproduction_number)
rf.transform(
input=reproduction_number,
filter