Commit 42f4435f authored by Chris Jewell's avatar Chris Jewell
Browse files

Moved pipeline into covid package directory.

CHANGES:

1. Committed missing `case_data.py` file
2. Moved pipeline specification into `covid.ruffus_pipeline`
3. Moved pipeline initialisation into `covid.pipeline`
parent 80bbe105
"""Loads COVID-19 case data"""
import numpy as np
import pandas as pd
from covid.data.util import (
invalidInput,
get_date_low_high,
check_date_bounds,
check_date_format,
check_lad19cd_format,
merge_lad_codes,
)
from covid.data import AreaCodeData
class CasesData:
    """Loads and reshapes COVID-19 case (line listing) data.

    The methods are written as plain functions taking a ``config`` dict
    (the class is used as a namespace rather than instantiated).
    """

    def get(config):
        """
        Retrieve a pandas DataFrame containing the cases/line list data.

        :param config: global configuration dict; ``config["CasesData"]``
            selects the input source: ``"url"``, ``"csv"`` or ``"processed"``.
        :returns: a DataFrame of case data.
        """
        settings = config["CasesData"]
        if settings["input"] == "url":
            df = CasesData.getURL(settings["address"], config)
        elif settings["input"] == "csv":
            print(
                "Reading case data from local CSV file at", settings["address"]
            )
            df = CasesData.getCSV(settings["address"])
        elif settings["input"] == "processed":
            print(
                "Reading case data from preprocessed CSV at",
                settings["address"],
            )
            df = pd.read_csv(settings["address"], index_col=0)
        else:
            invalidInput(settings["input"])
        return df

    def getURL(url, config):
        """
        Placeholder, in case we wish to interface with an API.
        """
        pass

    def getCSV(file):
        """
        Format as per linelisting.

        Reads only the four line-listing columns, in 50k-row chunks to
        bound memory use on large files.

        :param file: path or file-like object for the line-listing CSV.
        :returns: a DataFrame with columns pillar, LTLA_code,
            specimen_date and lab_report_date.
        """
        columns = ["pillar", "LTLA_code", "specimen_date", "lab_report_date"]
        dfs = pd.read_csv(file, chunksize=50000, iterator=True, usecols=columns)
        df = pd.concat(dfs)
        return df

    def check(df, config):
        """
        Check that data format seems correct.

        Accepts either long format (rows = area x date pairs, >= 3 columns)
        or wide format (rows = areas, columns = dates).

        :raises ValueError: if dimensions or the date axis are wrong.
        :returns: True on success.
        """
        # BUG FIX: `dims` was referenced below but never assigned,
        # causing a NameError on every call.
        dims = df.shape
        nareas = len(config["lad19cds"])
        date_low, date_high = get_date_low_high(config)
        dates = pd.date_range(start=date_low, end=date_high, closed="left")
        days = len(dates)
        entries = days * nareas

        if not (
            ((dims[1] >= 3) & (dims[0] == entries))
            | ((dims[1] == days) & (dims[0] == nareas))
        ):
            print(df)
            raise ValueError("Incorrect CasesData dimensions")

        if "date" in df:
            _df = df
        elif df.columns.name == "date":
            _df = pd.DataFrame({"date": df.columns})
        else:
            raise ValueError("Cannot determine date axis")

        # NOTE(review): `_df` is built above but the checks below operate on
        # `df` — confirm whether they should receive `_df` instead.
        check_date_bounds(df, date_low, date_high)
        check_date_format(df)
        check_lad19cd_format(df)

        return True

    def adapt(df, config):
        """
        Adapt the line listing data to the desired dataframe format.

        :param df: raw case data as returned by :meth:`get`.
        :param config: global configuration dict.
        :returns: cases indexed by (location, date).
        """
        # Extract the yaml config settings
        date_low, date_high = get_date_low_high(config)
        settings = config["CasesData"]
        pillars = settings["pillars"]
        measure = settings["measure"].casefold()

        # this key might not be stored in the config file
        # if it's not, we need to grab it using AreaCodeData
        if "lad19cds" not in config:
            _df = AreaCodeData.process(config)
        areacodes = config["lad19cds"]

        # Preprocessed data is already in the target format
        if settings["input"] == "processed":
            return df

        if settings["format"].lower() == "phe":
            df = CasesData.adapt_phe(
                df,
                date_low,
                date_high,
                pillars,
                measure,
                areacodes,
            )
        return df

    def adapt_phe(df, date_low, date_high, pillars, measure, areacodes):
        """
        Adapt the line listing data to the desired dataframe format.

        :param df: raw PHE line listing.
        :param date_low: inclusive lower date bound.
        :param date_high: exclusive upper date bound.
        :param pillars: list of pillar labels to keep.
        :param measure: "specimen" to count by specimen date, otherwise
            counts are by lab report date.
        :param areacodes: LAD19 area codes to keep.
        :returns: a Series of case counts indexed by (location, date),
            with missing (area, date) pairs filled with 0.
        """
        # Clean missing values
        df.dropna(inplace=True)
        df = df.rename(columns={"LTLA_code": "lad19cd"})

        # Clean time formats
        df["specimen_date"] = pd.to_datetime(df["specimen_date"], dayfirst=True)
        df["lab_report_date"] = pd.to_datetime(
            df["lab_report_date"], dayfirst=True
        )

        df["lad19cd"] = merge_lad_codes(df["lad19cd"])

        # filters for pillars, date ranges, and areacodes if given
        filters = df["pillar"].isin(pillars)
        filters &= df["lad19cd"].isin(areacodes)
        if measure == "specimen":
            filters &= (date_low <= df["specimen_date"]) & (
                df["specimen_date"] < date_high
            )
        else:
            filters &= (date_low <= df["lab_report_date"]) & (
                df["lab_report_date"] < date_high
            )
        df = df[filters]
        df = df.drop(columns="pillar")  # No longer need pillar column

        # Aggregate counts.  After groupby().count() the remaining date
        # column holds the per-group row count, so it is renamed "cases".
        if measure == "specimen":
            df = df.groupby(["lad19cd", "specimen_date"]).count()
            df = df.rename(columns={"lab_report_date": "cases"})
        else:
            df = df.groupby(["lad19cd", "lab_report_date"]).count()
            df = df.rename(columns={"specimen_date": "cases"})

        df.index.names = ["lad19cd", "date"]
        df = df.sort_index()

        # Fill in all dates, and add 0s for empty counts
        dates = pd.date_range(date_low, date_high, closed="left")
        multi_indexes = pd.MultiIndex.from_product(
            [areacodes, dates], names=["location", "date"]
        )
        results = df["cases"].reindex(multi_indexes, fill_value=0.0)
        return results

    def process(config):
        """Fetch and adapt the case data in one step."""
        df = CasesData.get(config)
        df = CasesData.adapt(df, config)
        return df
"""A Ruffus-ised pipeline for COVID-19 analysis"""
from os.path import expandvars
import yaml
import datetime
import ruffus as rf
from covid.ruffus_pipeline import run_pipeline
def _import_global_config(config_file):
    """Load the global pipeline configuration from a YAML file.

    :param config_file: path to the YAML configuration file.
    :returns: the parsed configuration (typically a dict).
    """
    with open(config_file, "r") as stream:
        return yaml.load(stream, Loader=yaml.FullLoader)
if __name__ == "__main__":

    # Ruffus wrapper around argparse used to give us ruffus
    # cmd line switches as well as our own config
    argparser = rf.cmdline.get_argparse(description="COVID-19 pipeline")
    argparser.add_argument(
        "-c",
        "--config",
        type=str,
        help="global configuration file",
        required=True,
    )
    argparser.add_argument(
        "-r",
        "--results-directory",
        type=str,
        help="pipeline results directory",
        required=True,
    )
    argparser.add_argument(
        "--date-range",
        type=lambda s: datetime.datetime.strptime(s, "%Y-%m-%d"),
        nargs=2,
        help="Date range [low high)",
        # FIX: metavar was "ISO6801", a typo for the ISO 8601 date standard
        metavar="ISO8601",
    )
    argparser.add_argument(
        "--reported-cases", type=str, help="Path to case file"
    )
    argparser.add_argument(
        "--commute-volume", type=str, help="Path to commute volume file"
    )
    argparser.add_argument(
        "--case-date-type",
        type=str,
        help="Case date type (specimen | report)",
        choices=["specimen", "report"],
    )
    argparser.add_argument(
        "--pillar", type=str, help="Pillar", choices=["both", "1", "2"]
    )

    cli_options = argparser.parse_args()
    global_config = _import_global_config(cli_options.config)

    # Command-line options, when given, override the corresponding
    # entries in the configuration file.
    if cli_options.date_range is not None:
        global_config["ProcessData"]["date_range"][0] = cli_options.date_range[
            0
        ]
        global_config["ProcessData"]["date_range"][1] = cli_options.date_range[
            1
        ]

    if cli_options.reported_cases is not None:
        global_config["ProcessData"]["CasesData"]["address"] = expandvars(
            cli_options.reported_cases
        )

    if cli_options.commute_volume is not None:
        global_config["ProcessData"]["commute_volume"] = expandvars(
            cli_options.commute_volume
        )

    if cli_options.case_date_type is not None:
        global_config["ProcessData"][
            "case_date_type"
        ] = cli_options.case_date_type

    if cli_options.pillar is not None:
        # Map the CLI shorthand onto the pillar labels used in the
        # line-listing data.
        opts = {
            "both": ["Pillar 1", "Pillar 2"],
            "1": ["Pillar 1"],
            "2": ["Pillar 2"],
        }
        global_config["ProcessData"]["CasesData"]["pillars"] = opts[
            cli_options.pillar
        ]

    run_pipeline(global_config, cli_options.results_directory, cli_options)
"""A Ruffus-ised pipeline for COVID-19 analysis""" """Represents the analytic pipeline as a ruffus chain"""
import os import os
import yaml import yaml
import pandas as pd
import ruffus as rf import ruffus as rf
from covid.tasks import ( from covid.tasks import (
assemble_data, assemble_data,
mcmc, mcmc,
...@@ -19,40 +19,20 @@ from covid.tasks import ( ...@@ -19,40 +19,20 @@ from covid.tasks import (
insample_predictive_timeseries, insample_predictive_timeseries,
) )
__all__ = ["run_pipeline"]
def import_global_config(config_file):
with open(config_file, "r") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
return config
def join_and_expand(path1, path2): def _make_append_work_dir(work_dir):
return os.path.expand(os.path.join(path1, path2)) return lambda filename: os.path.expandvars(os.path.join(work_dir, filename))
if __name__ == "__main__": def run_pipeline(global_config, results_directory, cli_options):
argparser = rf.cmdline.get_argparse(description="COVID-19 pipeline")
argparser.add_argument(
"-c", "--config", type=str, help="global configuration file"
)
argparser.add_argument(
"-r", "--results-directory", type=str, help="pipeline results directory"
)
cli_options = argparser.parse_args()
global_config = import_global_config(cli_options.config)
# Output paths
BASEDIR = os.path.expandvars(cli_options.results_directory)
def work_dir(fn): wd = _make_append_work_dir(results_directory)
return os.path.join(BASEDIR, fn)
# Pipeline starts here # Pipeline starts here
@rf.mkdir(BASEDIR) @rf.mkdir(results_directory)
@rf.originate(work_dir("config.yaml"), global_config) @rf.originate(wd("config.yaml"), global_config)
def save_config(output_file, config): def save_config(output_file, config):
with open(output_file, "w") as f: with open(output_file, "w") as f:
yaml.dump(config, f) yaml.dump(config, f)
...@@ -60,7 +40,7 @@ if __name__ == "__main__": ...@@ -60,7 +40,7 @@ if __name__ == "__main__":
@rf.transform( @rf.transform(
save_config, save_config,
rf.formatter(), rf.formatter(),
work_dir("pipeline_data.pkl"), wd("pipeline_data.pkl"),
global_config, global_config,
) )
def process_data(input_file, output_file, config): def process_data(input_file, output_file, config):
...@@ -69,7 +49,7 @@ if __name__ == "__main__": ...@@ -69,7 +49,7 @@ if __name__ == "__main__":
@rf.transform( @rf.transform(
process_data, process_data,
rf.formatter(), rf.formatter(),
work_dir("posterior.hd5"), wd("posterior.hd5"),
global_config, global_config,
) )
def run_mcmc(input_file, output_file, config): def run_mcmc(input_file, output_file, config):
...@@ -78,7 +58,7 @@ if __name__ == "__main__": ...@@ -78,7 +58,7 @@ if __name__ == "__main__":
@rf.transform( @rf.transform(
input=run_mcmc, input=run_mcmc,
filter=rf.formatter(), filter=rf.formatter(),
output=work_dir("thin_samples.pkl"), output=wd("thin_samples.pkl"),
) )
def thin_samples(input_file, output_file): def thin_samples(input_file, output_file):
thin_posterior(input_file, output_file, config["ThinPosterior"]) thin_posterior(input_file, output_file, config["ThinPosterior"])
...@@ -87,20 +67,20 @@ if __name__ == "__main__": ...@@ -87,20 +67,20 @@ if __name__ == "__main__":
rf.transform( rf.transform(
input=[[process_data, thin_samples]], input=[[process_data, thin_samples]],
filter=rf.formatter(), filter=rf.formatter(),
output=work_dir("ngm.pkl"), output=wd("ngm.pkl"),
)(next_generation_matrix) )(next_generation_matrix)
rf.transform( rf.transform(
input=next_generation_matrix, input=next_generation_matrix,
filter=rf.formatter(), filter=rf.formatter(),
output=work_dir("national_rt.xlsx"), output=wd("national_rt.xlsx"),
)(overall_rt) )(overall_rt)
# In-sample prediction # In-sample prediction
@rf.transform( @rf.transform(
input=[[process_data, thin_samples]], input=[[process_data, thin_samples]],
filter=rf.formatter(), filter=rf.formatter(),
output=work_dir("insample7.pkl"), output=wd("insample7.pkl"),
) )
def insample7(input_files, output_file): def insample7(input_files, output_file):
predict( predict(
...@@ -114,7 +94,7 @@ if __name__ == "__main__": ...@@ -114,7 +94,7 @@ if __name__ == "__main__":
@rf.transform( @rf.transform(
input=[[process_data, thin_samples]], input=[[process_data, thin_samples]],
filter=rf.formatter(), filter=rf.formatter(),
output=work_dir("insample14.pkl"), output=wd("insample14.pkl"),
) )
def insample14(input_files, output_file): def insample14(input_files, output_file):
return predict( return predict(
...@@ -129,7 +109,7 @@ if __name__ == "__main__": ...@@ -129,7 +109,7 @@ if __name__ == "__main__":
@rf.transform( @rf.transform(
input=[[process_data, thin_samples]], input=[[process_data, thin_samples]],
filter=rf.formatter(), filter=rf.formatter(),
output=work_dir("medium_term.pkl"), output=wd("medium_term.pkl"),
) )
def medium_term(input_files, output_file): def medium_term(input_files, output_file):
return predict( return predict(
...@@ -144,31 +124,31 @@ if __name__ == "__main__": ...@@ -144,31 +124,31 @@ if __name__ == "__main__":
rf.transform( rf.transform(
input=next_generation_matrix, input=next_generation_matrix,
filter=rf.formatter(), filter=rf.formatter(),
output=work_dir("rt_summary.csv"), output=wd("rt_summary.csv"),
)(summarize.rt) )(summarize.rt)
rf.transform( rf.transform(
input=medium_term, input=medium_term,
filter=rf.formatter(), filter=rf.formatter(),
output=work_dir("infec_incidence_summary.csv"), output=wd("infec_incidence_summary.csv"),
)(summarize.infec_incidence) )(summarize.infec_incidence)
rf.transform( rf.transform(
input=[[process_data, thin_samples, medium_term]], input=[[process_data, thin_samples, medium_term]],
filter=rf.formatter(), filter=rf.formatter(),
output=work_dir("prevalence_summary.csv"), output=wd("prevalence_summary.csv"),
)(summarize.prevalence) )(summarize.prevalence)
rf.transform( rf.transform(
input=[[process_data, thin_samples]], input=[[process_data, thin_samples]],
filter=rf.formatter(), filter=rf.formatter(),
output=work_dir("within_between_summary.csv"), output=wd("within_between_summary.csv"),
)(within_between) )(within_between)
@rf.transform( @rf.transform(
input=[[process_data, insample7, insample14]], input=[[process_data, insample7, insample14]],
filter=rf.formatter(), filter=rf.formatter(),
output=work_dir("exceedance_summary.csv"), output=wd("exceedance_summary.csv"),
) )
def exceedance(input_files, output_file): def exceedance(input_files, output_file):
exceed7 = case_exceedance((input_files[0], input_files[1]), 7) exceed7 = case_exceedance((input_files[0], input_files[1]), 7)
...@@ -203,7 +183,7 @@ if __name__ == "__main__": ...@@ -203,7 +183,7 @@ if __name__ == "__main__":
] ]
], ],
rf.formatter(), rf.formatter(),
work_dir("prediction.gpkg"), wd("prediction.gpkg"),
global_config["Geopackage"], global_config["Geopackage"],
)(summary_geopackage) )(summary_geopackage)
......
...@@ -19,25 +19,4 @@ export XLA_FLAGS="--xla_gpu_cuda_data_dir=$CUDA_HOME" ...@@ -19,25 +19,4 @@ export XLA_FLAGS="--xla_gpu_cuda_data_dir=$CUDA_HOME"
echo Args: "$@" echo Args: "$@"
echo -n "Preparing config..." poetry run python -m covid.pipeline "$@"
CONFIG=`poetry run python -m covid.tasks.prepare_config "$@"`
echo "Done"
echo Using config: $CONFIG
echo Working directory: `pwd`
echo -n "Run inference..."
poetry run python -m covid.tasks.inference -c "$CONFIG"
echo "Done"
echo -n "Create summary..."
poetry run python -m covid.tasks.summary -c "$CONFIG"
echo "Done"
echo -n "Localisation summary..."
poetry run python -m covid.tasks.within_between -c "$CONFIG"
echo "Done"
echo -n "Hotspot detection..."
poetry run python -m covid.tasks.hotspot_detection -c "$CONFIG"
echo "Done"
...@@ -2,9 +2,10 @@ ...@@ -2,9 +2,10 @@
# Enqueues COVID-19 pipelines # Enqueues COVID-19 pipelines
CASES_FILE="data/Anonymised Combined Line List 20210104.csv" CASES_FILE="data/Anonymised Combined Line List 20210108.csv"
DATE_LOW="2020-10-09" COMMUTE_VOL_FILE="data/210108_OFF_SEN_COVID19_road_traffic_national_table.xlsx"
DATE_HIGH="2021-01-01" DATE_LOW="2020-10-13"
DATE_HIGH="2021-01-05"
TEMPLATE_CONFIG=template_config.yaml TEMPLATE_CONFIG=template_config.yaml
...@@ -12,7 +13,7 @@ TEMPLATE_CONFIG=template_config.yaml ...@@ -12,7 +13,7 @@ TEMPLATE_CONFIG=template_config.yaml
# Job submisison # Job submisison
switch-gpu switch-gpu
for PILLAR in both 1 for PILLAR in both 1 2
do do
for CASE_DATE_TYPE in specimen for CASE_DATE_TYPE in specimen
do do
...@@ -20,11 +21,11 @@ do ...@@ -20,11 +21,11 @@ do
JOB_NAME="covid_${DATE_HIGH}_${PILLAR}_${CASE_DATE_TYPE}" JOB_NAME="covid_${DATE_HIGH}_${PILLAR}_${CASE_DATE_TYPE}"
qsub -N $JOB_NAME covid_pipeline.sge \ qsub -N $JOB_NAME covid_pipeline.sge \
--reported-cases "$CASES_FILE" \ --reported-cases "$CASES_FILE" \
--commute-volume "$COMMUTE_VOL_FILE" \
--case-date-type $CASE_DATE_TYPE \ --case-date-type $CASE_DATE_TYPE \
--pillar $PILLAR \ --pillar $PILLAR \
--inference-period $DATE_LOW $DATE_HIGH \ --date-range $DATE_LOW $DATE_HIGH \
--results-dir "$RESULTS_DIR" \ --results-dir "$RESULTS_DIR" \
--output "$RESULTS_DIR/config.yaml" \ --config $TEMPLATE_CONFIG
$TEMPLATE_CONFIG
done done
done done
...@@ -7,9 +7,6 @@ ProcessData: ...@@ -7,9 +7,6 @@ ProcessData:
mobility_matrix: data/mergedflows.csv mobility_matrix: data/mergedflows.csv
population_size: data/c2019modagepop.csv population_size: data/c2019modagepop.csv
commute_volume: data/201231_OFF_SEN_COVID19_road_traffic_national_table.xlsx commute_volume: data/201231_OFF_SEN_COVID19_road_traffic_national_table.xlsx
reported_cases: data/Anonymised Combined Line List 20210104.csv
case_date_type: specimen
pillar: both
CasesData: CasesData:
input: csv input: csv
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment