Commit a221ca16 authored by Chris Jewell

Merge branch 'ruffus-pipeline' into 'production'

Ruffus pipeline

See merge request !16
parents b62ba678 a0e94d91
"""Covid data adaptors and support code"""
from covid.data.data import (
read_phe_cases,
read_mobility,
read_population,
read_traffic_flow,
)
from covid.data.tiers import TierData
from covid.data.area_code import AreaCodeData
from covid.data.case_data import CasesData
__all__ = [
"TierData",
"AreaCodeData",
"read_phe_cases",
"CasesData",
"read_mobility",
"read_population",
"read_traffic_flow",
......
......@@ -66,16 +66,6 @@ class AreaCodeData:
if settings["format"] == "ons":
print("Retrieving Area Code data from the ONS")
data = response.json()
if config["GenerateOutput"]["storeInputs"]:
fn = format_output_filename(
config["GenerateOutput"]["scrapedDataDir"]
+ "AreaCodeData_ONS.json",
config,
)
with open(fn, "w") as f:
json.dump(data, f)
df = AreaCodeData.getJSON(json.dumps(data))
return df
......@@ -162,28 +152,22 @@ class AreaCodeData:
"""
Adapt the area codes to the desired dataframe format
"""
output_settings = config["GenerateOutput"]
settings = config["AreaCodeData"]
output = settings["output"]
regions = settings["regions"]
if settings["input"] == "processed":
return df
if settings["format"].lower() == "ons":
df = AreaCodeData.adapt_ons(df, regions, output, config)
df = AreaCodeData.adapt_ons(df, regions)
# if we have a predefined list of LADs, filter them down
if "lad19cds" in config:
df = df[[x in config["lad19cds"] for x in df.lad19cd.values]]
if output_settings["storeProcessedInputs"] and output != "None":
output = format_output_filename(output, config)
df.to_csv(output, index=False)
return df
def adapt_ons(df, regions, output, config):
def adapt_ons(df, regions):
colnames = ["lad19cd", "name"]
df.columns = colnames
filters = df["lad19cd"].str.contains(str.join("|", regions))
......
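The region filter in adapt_ons keeps only area codes matching one of the requested prefixes. A minimal sketch of the same pandas pattern on toy data (codes and names illustrative):

import pandas as pd

df = pd.DataFrame(
    {
        "lad19cd": ["E06000001", "W06000001", "S12000033"],
        "name": ["Hartlepool", "Isle of Anglesey", "Aberdeen City"],
    }
)
regions = ["E", "W"]  # keep English and Welsh local authorities only
filters = df["lad19cd"].str.contains("|".join(regions))
df = df[filters]  # rows for E06000001 and W06000001 remain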
......@@ -14,12 +14,6 @@ def test_url():
"output": "processed_data/processed_lad19cd.csv",
"regions": ["E"],
},
"GenerateOutput": {
"storeInputs": True,
"scrapedDataDir": "scraped_data",
"storeProcessedInputs": True,
},
"Global": {"prependID": False, "prependDate": False},
}
df = AreaCodeData.process(config)
......
"""Loads COVID-19 case data"""
import numpy as np
import pandas as pd
from covid.data.util import (
invalidInput,
get_date_low_high,
check_date_bounds,
check_date_format,
check_lad19cd_format,
merge_lad_codes,
)
from covid.data import AreaCodeData
class CasesData:
def get(config):
"""
Retrieve a pandas DataFrame containing the cases/line list data.
"""
settings = config["CasesData"]
if settings["input"] == "url":
df = CasesData.getURL(settings["address"], config)
elif settings["input"] == "csv":
print(
"Reading case data from local CSV file at", settings["address"]
)
df = CasesData.getCSV(settings["address"])
elif settings["input"] == "processed":
print(
"Reading case data from preprocessed CSV at",
settings["address"],
)
df = pd.read_csv(settings["address"], index_col=0)
else:
invalidInput(settings["input"])
return df
def getURL(url, config):
"""
Placeholder, in case we wish to interface with an API.
"""
pass
def getCSV(file):
"""
Read the case line listing from a local CSV file, keeping only the required columns.
"""
columns = ["pillar", "LTLA_code", "specimen_date", "lab_report_date"]
dfs = pd.read_csv(file, chunksize=50000, iterator=True, usecols=columns)
df = pd.concat(dfs)
return df
def check(df, config):
"""
Check that data format seems correct
"""
nareas = len(config["lad19cds"])
date_low, date_high = get_date_low_high(config)
dates = pd.date_range(start=date_low, end=date_high, closed="left")
days = len(dates)
entries = days * nareas
dims = df.shape  # assumed: (rows, columns) of the case data frame
if not (
# long format: one row per (lad19cd, date) pair, at least 3 columns
((dims[1] >= 3) & (dims[0] == entries))
# wide format: one row per area, one column per day
| ((dims[1] == days) & (dims[0] == nareas))
):
print(df)
raise ValueError("Incorrect CasesData dimensions")
if "date" in df:
_df = df
elif df.columns.name == "date":
_df = pd.DataFrame({"date": df.columns})
else:
raise ValueError("Cannot determine date axis")
check_date_bounds(df, date_low, date_high)
check_date_format(df)
check_lad19cd_format(df)
return True
def adapt(df, config):
"""
Adapt the line listing data to the desired dataframe format.
"""
# Extract the yaml config settings
date_low, date_high = get_date_low_high(config)
settings = config["CasesData"]
pillars = settings["pillars"]
measure = settings["measure"].casefold()
# this key might not be stored in the config file
# if it's not, we need to grab it using AreaCodeData
if "lad19cds" not in config:
_df = AreaCodeData.process(config)  # expected to populate config["lad19cds"]
areacodes = config["lad19cds"]
if settings["input"] == "processed":
return df
if settings["format"].lower() == "phe":
df = CasesData.adapt_phe(
df,
date_low,
date_high,
pillars,
measure,
areacodes,
)
return df
def adapt_phe(df, date_low, date_high, pillars, measure, areacodes):
"""
Adapt the line listing data to the desired dataframe format.
"""
# Clean missing values
df.dropna(inplace=True)
df = df.rename(columns={"LTLA_code": "lad19cd"})
# Clean time formats
df["specimen_date"] = pd.to_datetime(df["specimen_date"], dayfirst=True)
df["lab_report_date"] = pd.to_datetime(
df["lab_report_date"], dayfirst=True
)
df["lad19cd"] = merge_lad_codes(df["lad19cd"])
# filters for pillars, date ranges, and areacodes if given
filters = df["pillar"].isin(pillars)
filters &= df["lad19cd"].isin(areacodes)
if measure == "specimen":
filters &= (date_low <= df["specimen_date"]) & (
df["specimen_date"] < date_high
)
else:
filters &= (date_low <= df["lab_report_date"]) & (
df["lab_report_date"] < date_high
)
df = df[filters]
df = df.drop(columns="pillar") # No longer need pillar column
# Aggregate counts
if measure == "specimen":
df = df.groupby(["lad19cd", "specimen_date"]).count()
df = df.rename(columns={"lab_report_date": "cases"})
else:
df = df.groupby(["lad19cd", "lab_report_date"]).count()
df = df.rename(columns={"specimen_date": "cases"})
df.index.names = ["lad19cd", "date"]
df = df.sort_index()
# Fill in all dates, and add 0s for empty counts
dates = pd.date_range(date_low, date_high, closed="left")
multi_indexes = pd.MultiIndex.from_product(
[areacodes, dates], names=["location", "date"]
)
results = df["cases"].reindex(multi_indexes, fill_value=0.0)
return results
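The zero-filling at the end of adapt_phe works by reindexing the grouped counts onto the full (area, date) grid. A self-contained sketch of that pandas technique, using toy codes and dates rather than the real line listing:

import pandas as pd

areacodes = ["E06000001", "E06000002"]
dates = pd.date_range("2020-10-01", "2020-10-04", closed="left")  # 3 days

# Sparse counts, as produced by the groupby().count() step above
counts = pd.Series(
    [5, 2],
    index=pd.MultiIndex.from_tuples(
        [("E06000001", dates[0]), ("E06000002", dates[2])],
        names=["location", "date"],
    ),
    name="cases",
)

# Dense (area, date) grid, zero where no cases were recorded
full_index = pd.MultiIndex.from_product(
    [areacodes, dates], names=["location", "date"]
)
results = counts.reindex(full_index, fill_value=0)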
def process(config):
df = CasesData.get(config)
df = CasesData.adapt(df, config)
return df
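Pulling together the keys used by CasesData above, a minimal configuration sketch for CasesData.process (paths, dates and codes illustrative; if "lad19cds" is omitted, AreaCodeData.process is expected to fill it in):

config = {
    "date_range": ["2020-10-01", "2020-12-01"],  # half-open [low, high)
    "lad19cds": ["E06000001", "E06000002"],
    "CasesData": {
        "input": "csv",
        "address": "data/linelisting.csv",
        "format": "phe",
        "pillars": ["Pillar 1", "Pillar 2"],
        "measure": "specimen",
    },
}
cases = CasesData.process(config)  # Series indexed by (location, date)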
......@@ -55,16 +55,8 @@ def merge_lad_values(df):
def get_date_low_high(config):
if "dates" in config:
low = config["dates"]["low"]
high = config["dates"]["high"]
else:
inference_period = [
np.datetime64(x) for x in config["Global"]["inference_period"]
]
low = inference_period[0]
high = inference_period[1]
return (low, high)
date_range = [np.datetime64(x) for x in config["date_range"]]
return tuple(date_range)
def check_date_format(df):
......
"""Implements the COVID SEIR model as a TFP Joint Distribution"""
import pandas as pd
import geopandas as gp
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
......@@ -19,7 +18,7 @@ XI_FREQ = 14 # baseline transmission changes every 14 days
NU = tf.constant(0.28, dtype=DTYPE) # E->I rate assumed known.
def read_covariates(config):
def gather_data(config):
"""Loads covariate data
:param paths: a dictionary of paths to data with keys {'mobility_matrix',
......@@ -27,31 +26,37 @@ def read_covariates(config):
:returns: a dictionary of covariate information to be consumed by the model
{'C': commute_matrix, 'W': traffic_flow, 'N': population_size}
"""
paths = config["data"]
date_low = np.datetime64(config["Global"]["inference_period"][0])
date_high = np.datetime64(config["Global"]["inference_period"][1])
mobility = data.read_mobility(paths["mobility_matrix"])
popsize = data.read_population(paths["population_size"])
date_low = np.datetime64(config["date_range"][0])
date_high = np.datetime64(config["date_range"][1])
mobility = data.read_mobility(config["mobility_matrix"])
popsize = data.read_population(config["population_size"])
commute_volume = data.read_traffic_flow(
paths["commute_volume"], date_low=date_low, date_high=date_high
config["commute_volume"], date_low=date_low, date_high=date_high
)
geo = gp.read_file(paths["geopackage"])
geo = geo.loc[geo["lad19cd"].str.startswith("E")]
# tier_restriction = data.read_challen_tier_restriction(
# paths["tier_restriction_csv"],
# date_low,
# date_high,
# )
locations = data.AreaCodeData.process(config)
tier_restriction = data.TierData.process(config)[:, :, 2:]
date_range = [date_low, date_high]
weekday = pd.date_range(date_low, date_high).weekday < 5
cases = data.CasesData.process(config).to_xarray()
# cases = data.read_phe_cases(
# config['reported_cases'],
# date_low,
# date_high,
# pillar=config['pillar'],
# date_type=config['case_date_type'],
# )
return dict(
C=mobility.to_numpy().astype(DTYPE),
W=commute_volume.to_numpy().astype(DTYPE),
N=popsize.to_numpy().astype(DTYPE),
L=tier_restriction.astype(DTYPE),
weekday=weekday.astype(DTYPE),
date_range=date_range,
locations=locations,
cases=cases,
)
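Judging from the keys read above, gather_data now takes its file locations straight from the top level of the processing config rather than a nested "data" block. A sketch of those keys (paths illustrative; the AreaCodeData, TierData and CasesData sub-sections shown elsewhere in this diff are consumed via their respective process(config) calls):

config = {
    "date_range": ["2020-08-01", "2020-12-01"],  # half-open [low, high)
    "mobility_matrix": "data/mobility.csv",
    "population_size": "data/population.csv",
    "commute_volume": "data/commute_volume.csv",
    # plus "AreaCodeData", "TierData" and "CasesData" sub-sections
}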
......@@ -143,6 +148,15 @@ def CovidUK(covariates, initial_state, initial_step, num_steps):
gamma0 = tf.convert_to_tensor(gamma0, DTYPE)
gamma1 = tf.convert_to_tensor(gamma1, DTYPE)
C = tf.convert_to_tensor(covariates["C"], dtype=DTYPE)
C = tf.linalg.set_diag(C, tf.zeros(C.shape[0], dtype=DTYPE))
Cstar = C + tf.transpose(C)
Cstar = tf.linalg.set_diag(Cstar, -tf.reduce_sum(C, axis=-2))
W = tf.convert_to_tensor(tf.squeeze(covariates["W"]), dtype=DTYPE)
N = tf.convert_to_tensor(tf.squeeze(covariates["N"]), dtype=DTYPE)
L = tf.convert_to_tensor(covariates["L"], DTYPE)
L = L - tf.reduce_mean(L, axis=(0, 1))
......@@ -150,14 +164,6 @@ def CovidUK(covariates, initial_state, initial_step, num_steps):
weekday = weekday - tf.reduce_mean(weekday, axis=-1)
def transition_rate_fn(t, state):
C = tf.convert_to_tensor(covariates["C"], dtype=DTYPE)
C = tf.linalg.set_diag(C, tf.zeros(C.shape[0], dtype=DTYPE))
Cstar = C + tf.transpose(C)
Cstar = tf.linalg.set_diag(Cstar, -tf.reduce_sum(C, axis=-2))
W = tf.constant(np.squeeze(covariates["W"]), dtype=DTYPE)
N = tf.constant(np.squeeze(covariates["N"]), dtype=DTYPE)
w_idx = tf.clip_by_value(tf.cast(t, tf.int64), 0, W.shape[0] - 1)
commute_volume = tf.gather(W, w_idx)
......@@ -166,7 +172,6 @@ def CovidUK(covariates, initial_state, initial_step, num_steps):
dtype=tf.int64,
)
xi_ = tf.gather(xi, xi_idx)
L_idx = tf.clip_by_value(tf.cast(t, tf.int64), 0, L.shape[0] - 1)
Lt = tf.gather(L, L_idx)
xB = tf.linalg.matvec(Lt, beta3)
......
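The covariate block hoisted out of transition_rate_fn builds a symmetrised mobility matrix whose diagonal holds the negative column sums. A toy TensorFlow sketch of that construction (values illustrative):

import tensorflow as tf

C = tf.constant([[0.0, 2.0, 1.0],
                 [3.0, 0.0, 4.0],
                 [5.0, 6.0, 0.0]])
C = tf.linalg.set_diag(C, tf.zeros(C.shape[0]))  # drop within-area flow
Cstar = C + tf.transpose(C)                      # symmetrise commuting flows
Cstar = tf.linalg.set_diag(Cstar, -tf.reduce_sum(C, axis=-2))
# each diagonal entry is now minus the corresponding column sum of C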
"""A Ruffus-ised pipeline for COVID-19 analysis"""
from os.path import expandvars
import yaml
import datetime
import ruffus as rf
from covid.ruffus_pipeline import run_pipeline
def _import_global_config(config_file):
with open(config_file, "r") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
return config
if __name__ == "__main__":
# Ruffus wrapper around argparse used to give us ruffus
# cmd line switches as well as our own config
argparser = rf.cmdline.get_argparse(description="COVID-19 pipeline")
argparser.add_argument(
"-c",
"--config",
type=str,
help="global configuration file",
required=True,
)
argparser.add_argument(
"-r",
"--results-directory",
type=str,
help="pipeline results directory",
required=True,
)
argparser.add_argument(
"--date-range",
type=lambda s: datetime.datetime.strptime(s, '%Y-%m-%d'),
nargs=2,
help="Date range [low high)",
metavar="ISO6801",
)
argparser.add_argument(
"--reported-cases", type=str, help="Path to case file"
)
argparser.add_argument(
"--commute-volume", type=str, help="Path to commute volume file"
)
argparser.add_argument(
"--case-date-type",
type=str,
help="Case date type (specimen | report)",
choices=["specimen", "report"],
)
argparser.add_argument(
"--pillar", type=str, help="Pillar", choices=["both", "1", "2"]
)
cli_options = argparser.parse_args()
global_config = _import_global_config(cli_options.config)
if cli_options.date_range is not None:
global_config["ProcessData"]["date_range"][0] = cli_options.date_range[
0
]
global_config["ProcessData"]["date_range"][1] = cli_options.date_range[
1
]
if cli_options.reported_cases is not None:
global_config["ProcessData"]["CasesData"]["address"] = expandvars(
cli_options.reported_cases
)
if cli_options.commute_volume is not None:
global_config["ProcessData"]["commute_volume"] = expandvars(
cli_options.commute_volume
)
if cli_options.case_date_type is not None:
global_config["ProcessData"][
"case_date_type"
] = cli_options.case_date_type
if cli_options.pillar is not None:
opts = {
"both": ["Pillar 1", "Pillar 2"],
"1": ["Pillar 1"],
"2": ["Pillar 2"],
}
global_config["ProcessData"]["CasesData"]["pillars"] = opts[
cli_options.pillar
]
run_pipeline(global_config, cli_options.results_directory, cli_options)
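For reference, a hypothetical invocation of this script (script name and paths illustrative; Ruffus's own switches such as --verbose are also available via rf.cmdline):

# python covid_pipeline.py \
#     --config example_config.yaml \
#     --results-directory results \
#     --date-range 2020-08-01 2020-12-01 \
#     --case-date-type specimen --pillar both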
"""Represents the analytic pipeline as a ruffus chain"""
import os
import yaml
import ruffus as rf
from covid.tasks import (
assemble_data,
mcmc,
thin_posterior,
next_generation_matrix,
overall_rt,
predict,
summarize,
within_between,
case_exceedance,
summary_geopackage,
insample_predictive_timeseries,
)
__all__ = ["run_pipeline"]
def _make_append_work_dir(work_dir):
return lambda filename: os.path.expandvars(os.path.join(work_dir, filename))
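run_pipeline below chains the analysis tasks with Ruffus's originate/transform decorators. A minimal, self-contained sketch of that pattern (file names illustrative):

import ruffus as rf

@rf.originate("a.txt")
def make_a(output_file):
    with open(output_file, "w") as f:
        f.write("start")

@rf.transform(make_a, rf.formatter(), "b.txt")
def make_b(input_file, output_file):
    with open(input_file) as fin, open(output_file, "w") as fout:
        fout.write(fin.read() + " -> transformed")

if __name__ == "__main__":
    rf.pipeline_run([make_b])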
def run_pipeline(global_config, results_directory, cli_options):
wd = _make_append_work_dir(results_directory)
# Pipeline starts here
@rf.mkdir(results_directory)
@rf.originate(wd("config.yaml"), global_config)
def save_config(output_file, config):
with open(output_file, "w") as f:
yaml.dump(config, f)
@rf.transform(
save_config,
rf.formatter(),
wd("pipeline_data.pkl"),
global_config,
)
def process_data(input_file, output_file, config):
assemble_data(output_file, config["ProcessData"])
@rf.transform(
process_data,
rf.formatter(),
wd("posterior.hd5"),
global_config,
)
def run_mcmc(input_file, output_file, config):
mcmc(input_file, output_file, config["Mcmc"])
@rf.transform(
input=run_mcmc,
filter=rf.formatter(),
output=wd("thin_samples.pkl"),
)
def thin_samples(input_file, output_file):
thin_posterior(input_file, output_file, global_config["ThinPosterior"])
# Rt related steps
rf.transform(
input=[[process_data, thin_samples]],
filter=rf.formatter(),
output=wd("ngm.pkl"),
)(next_generation_matrix)
rf.transform(
input=next_generation_matrix,
filter=rf.formatter(),
output=wd("national_rt.xlsx"),
)(overall_rt)
# In-sample prediction
@rf.transform(
input=[[process_data, thin_samples]],
filter=rf.formatter(),
output=wd("insample7.pkl"),
)
def insample7(input_files, output_file):
predict(
data=input_files[0],
posterior_samples=input_files[1],
output_file=output_file,
initial_step=-8,
num_steps=28,
)
@rf.transform(
input=[[process_data, thin_samples]],
filter=rf.formatter(),
output=wd("insample14.pkl"),
)
def insample14(input_files, output_file):
return predict(
data=input_files[0],
posterior_samples=input_files[1],
output_file=output_file,
initial_step=-14,
num_steps=28,
)
# Medium-term prediction
@rf.transform(
input=[[process_data, thin_samples]],
filter=rf.formatter(),
output=wd("medium_term.pkl"),
)
def medium_term(input_files, output_file):
return predict(
data=input_files[0],
posterior_samples=input_files[1],
output_file=output_file,
initial_step=-1,
num_steps=61,
)
# Summarisation
rf.transform(
input=next_generation_matrix,
filter=rf.formatter(),
output=wd("rt_summary.csv"),
)(summarize.rt)
rf.transform(
input=medium_term,
filter=rf.formatter(),
output=wd("infec_incidence_summary.csv"),
)(summarize.infec_incidence)
rf.transform(