Commit bfcaf904 authored by Chris Jewell's avatar Chris Jewell
Browse files

Major new pipelining structure

==============================

enqueue_pipeline.sh --> covid_pipeline.sge --> prepare_config.py --> inference.py --> summary.py

enqueue_pipeline.sh identifies the dataset and analysis period, launching 4 SGE jobs [P1, P1+2]\times[specimen_date, report_date]

Each SGE job runs the inference and summary scripts, using information contained in a config.yaml file.

The config.yaml file, together with the results outputs, is stored in a results directory specified in the config file
and overridden in the top-level enqueue_pipeline.sh script.
parent efe94dd2
"""General argument parsing for all scripts"""
import argparse
def cli_args():
    """Parse the command-line arguments shared by all pipeline scripts.

    :returns: an argparse.Namespace with a single ``config`` attribute
        holding the path to the configuration file (None if not given).
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("-c", "--config", type=str, help="configuration file")
    return arg_parser.parse_args()
......@@ -68,7 +68,7 @@ def _merge_ltla(series):
def read_phe_cases(
path, date_low, date_high, pillar=None, date_type="specimen", ltlas=None
path, date_low, date_high, pillar="both", date_type="specimen", ltlas=None
):
"""Reads a PHE Anonymised Line Listing for dates in [low_date, high_date)
:param path: path to PHE Anonymised Line Listing Data
......@@ -77,6 +77,7 @@ def read_phe_cases(
:returns: a Pandas data frame of LTLAs x dates
"""
date_type_map = {"specimen": "specimen_date", "report": "lab_report_date"}
pillar_map = {"both": None, "1": "Pillar 1", "2": "Pillar 2"}
line_listing = pd.read_csv(
path, usecols=[date_type_map[date_type], "LTLA_code", "pillar"]
......@@ -92,8 +93,8 @@ def read_phe_cases(
]
# Choose pillar
if pillar is not None:
line_listing = line_listing.loc[line_listing["pillar"] == pillar]
if pillar_map[pillar] is not None:
line_listing = line_listing.loc[line_listing["pillar"] == pillar_map[pillar]]
# Drop na rows
orig_len = line_listing.shape[0]
......
#!/bin/bash
#$ -S /bin/bash
#$ -N covidp1p2
#$ -q gpu
#$ -l ngpus=1
# -l ncpus=2
#$ -l h_vmem=32G
#$ -l h_rt=12:00:00
#$ -j y
#$ -cwd

# Set up the login environment and the CUDA toolchain on this GPU node.
. /etc/profile
. $HOME/.bashrc
module add cuda/10.1
conda activate covid19uk
export XLA_FLAGS="--xla_gpu_cuda_data_dir=$CUDA_HOME"

echo Args: "$@"

# Render the run-specific config from the template plus the CLI overrides;
# prepare_config.py prints the path of the written config file.
echo -n "Preparing config..."
CONFIG=$(python prepare_config.py "$@")
echo "Done"
echo Using config: $CONFIG
echo Working directory: $(pwd)

# Run the MCMC inference, then summarise the posterior.
echo -n "Run inference..."
python inference.py -c "$CONFIG"
echo "Done"
echo -n "Create summary..."
python summary.py -c "$CONFIG"
echo "Done"
#!/bin/bash
# Enqueues COVID-19 pipelines: one SGE job per (pillar, case-date-type) pair.

CASES_FILE="data/Anonymised Combined Line List 20201002.csv"
DATE_LOW="2020-07-19"
DATE_HIGH="2020-09-28"
TEMPLATE_CONFIG=template_config.yaml

# Job submission
switch-gpu
for PILLAR in both 1; do
    for CASE_DATE_TYPE in specimen report; do
        # Each job writes into its own results directory on scratch storage.
        RESULTS_DIR=$global_scratch/covid19/${DATE_HIGH}_${PILLAR}_${CASE_DATE_TYPE}
        JOB_NAME="covid_${DATE_HIGH}_${PILLAR}_${CASE_DATE_TYPE}"
        qsub -N $JOB_NAME covid_pipeline.sge \
            --reported-cases "$CASES_FILE" \
            --case-date-type $CASE_DATE_TYPE \
            --pillar $PILLAR \
            --inference-period $DATE_LOW $DATE_HIGH \
            --results-dir "$RESULTS_DIR" \
            --output "$RESULTS_DIR/config.yaml" \
            $TEMPLATE_CONFIG
    done
done
......@@ -6,7 +6,7 @@ import os
# Uncomment to block GPU use
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
from time import perf_counter
......@@ -24,6 +24,7 @@ from covid.impl.occult_events_mh import UncalibratedOccultUpdate, TransitionTopo
from covid.impl.gibbs import DeterministicScanKernel, GibbsStep, flatten_results
from covid.impl.multi_scan_kernel import MultiScanKernel
from covid.data import read_phe_cases
from covid.cli_arg_parse import cli_args
import model_spec
......@@ -41,16 +42,7 @@ DTYPE = model_spec.DTYPE
if __name__ == "__main__":
# Read in settings
parser = argparse.ArgumentParser()
parser.add_argument(
"-c",
"--config",
type=str,
default="example_config.yaml",
help="configuration file",
)
args = parser.parse_args()
print("Loading config file:", args.config)
args = cli_args()
with open(args.config, "r") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
......@@ -69,7 +61,8 @@ if __name__ == "__main__":
config["data"]["reported_cases"],
date_low=inference_period[0],
date_high=inference_period[1],
date_type="report",
date_type=config["data"]["case_date_type"],
pillar=config["data"]["pillar"],
).astype(DTYPE)
# Impute censored events, return cases
......@@ -258,7 +251,10 @@ if __name__ == "__main__":
# Output Files
posterior = h5py.File(
os.path.expandvars(config["output"]["posterior"]),
os.path.join(
os.path.expandvars(config["output"]["results_dir"]),
config["output"]["posterior"],
),
"w",
rdcc_nbytes=1024 ** 2 * 400,
rdcc_nslots=100000,
......
"""Prepare a config file for the COVID-19 pipeline"""
import sys
from pathlib import Path
import shutil
import numpy as np
import argparse
import yaml
def make_or_refresh_dir(path):
    """Ensure ``path`` exists as an empty directory.

    If the directory already exists it is deleted with all its contents and
    recreated, so each pipeline run starts from a clean results directory.

    :param path: the directory path to (re)create
    :raises OSError: if the directory cannot be removed or created
    """
    dirpath = Path(path)
    if dirpath.exists():
        # Wipe stale outputs from a previous run of the same analysis.
        shutil.rmtree(dirpath)
    # parents=True also creates missing intermediate directories, e.g. when
    # the scratch results root has not yet been created on this node
    # (a bare mkdir() would raise FileNotFoundError in that case).
    dirpath.mkdir(parents=True)
# Command-line interface: each option overrides the corresponding field of
# the template config file.
parser = argparse.ArgumentParser()
parser.add_argument(
    "-o",
    "--output",
    type=str,
    default="stdout",
    help="Output file. If unspecified or `stdout`, writes to stdout.",
)
parser.add_argument(
    "--inference-period",
    type=np.datetime64,
    nargs=2,
    help="Date range [low high)",
    metavar="ISO6801",
)
parser.add_argument("--reported-cases", type=str, help="Path to case file")
parser.add_argument("--commute-volume", type=str, help="Path to commute volume file")
parser.add_argument(
    "--case-date-type",
    type=str,
    help="Case date type (specimen | report)",
    choices=["specimen", "report"],
)
parser.add_argument("--pillar", type=str, help="Pillar", choices=["both", "1", "2"])
parser.add_argument("--results-dir", type=str, help="Results directory")
parser.add_argument("config_template", type=str, help="Config file template")
args = parser.parse_args()

template_file = args.config_template
output_file = args.output

# Load the template config; CLI overrides are applied on top of it.
with open(template_file, "r") as f:
    config = yaml.load(f, Loader=yaml.FullLoader)

# Data-section overrides: only replace fields explicitly given on the CLI.
for key in ["reported_cases", "commute_volume", "case_date_type", "pillar"]:
    val = getattr(args, key)
    if val is not None:
        config["data"][key] = val

if args.inference_period is not None:
    # Store dates as ISO strings so the config round-trips through YAML.
    config["settings"]["inference_period"] = [str(x) for x in args.inference_period]

# Output-section overrides.
for key in ["results_dir"]:
    val = getattr(args, key)
    if val is not None:
        config["output"][key] = val

# Start each analysis from a clean results directory.
make_or_refresh_dir(config["output"]["results_dir"])

# Emit the rendered config: either to stdout, or to a file whose path is
# echoed so the calling SGE script can capture it.
if output_file == "stdout":
    print(yaml.dump(config))
else:
    with open(output_file, "w") as f:
        yaml.dump(config, f)
    print(output_file)

sys.exit(0)
"""Calculate Rt given a posterior"""
import argparse
import os
import yaml
import h5py
import numpy as np
import pandas as pd
import geopandas as gp
import tensorflow as tf
from covid.cli_arg_parse import cli_args
from covid.model import (
rayleigh_quotient,
power_iteration,
)
from covid.impl.util import compute_state
from covid.summary import mean_and_ci
......@@ -18,9 +21,7 @@ import model_spec
DTYPE = model_spec.DTYPE
CONFIG_FILE = "example_config.yaml"
GIS_TEMPLATE = "data/uk_clip.gpkg"
GIS_OUTPUT = "example_gis_2020-09-25.gpkg"
GIS_TEMPLATE = "data/UK2019mod_pop.gpkg"
# Reproduction number calculation
def calc_R_it(theta, xi, events, init_state, covar_data):
......@@ -58,7 +59,7 @@ def predicted_incidence(theta, xi, init_state, init_step, num_steps):
:param events: a [B, M, S] batched state tensor
:param init_step: the initial time step
:param num_steps: the number of steps to simulate
:returns: a tensor of shape [B, M, num_steps, X] where X is the number of state
:returns: a tensor of shape [B, M, num_steps, X] where X is the number of state
transitions
"""
......@@ -104,20 +105,33 @@ def predicted_events(events, name=None):
if __name__ == "__main__":
args = cli_args()
# Get general config
with open(CONFIG_FILE, "r") as f:
with open(args.config, "r") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
inference_period = [
np.datetime64(x) for x in config["settings"]["inference_period"]
]
# Load covariate data
covar_data = model_spec.read_covariates(config["data"])
covar_data = model_spec.read_covariates(
config["data"], date_low=inference_period[0], date_high=inference_period[1]
)
# Load posterior file
posterior = h5py.File(
config["output"]["posterior"], "r", rdcc_nbytes=1024 ** 3, rdcc_nslots=1e6,
os.path.expandvars(
os.path.join(config["output"]["results_dir"], config["output"]["posterior"])
),
"r",
rdcc_nbytes=1024 ** 3,
rdcc_nslots=1e6,
)
# Pre-determined thinning of posterior (better done in MCMC?)
idx = slice(posterior["samples/theta"].shape[0]) # range(6000, 10000, 10)
idx = range(6000, 10000, 10)
theta = posterior["samples/theta"][idx]
xi = posterior["samples/xi"][idx]
events = posterior["samples/events"][idx]
......@@ -133,7 +147,9 @@ if __name__ == "__main__":
b, _ = power_iteration(ngms)
rt = rayleigh_quotient(ngms, b)
q = np.arange(0.05, 1.0, 0.05)
rt_quantiles = np.stack([q, np.quantile(rt, q)], axis=-1)
rt_quantiles = pd.DataFrame({"Rt": np.quantile(rt, q)}, index=q).T.to_excel(
os.path.join(config["output"]["results_dir"], config["output"]["national_rt"]),
)
# Prediction requires simulation from the last available timepoint for 28 + 4 + 1 days
# Note a 4 day recording lag in the case timeseries data requires that
......@@ -176,7 +192,7 @@ if __name__ == "__main__":
geodata[k] = arr
## GIS here
ltla = gp.read_file(GIS_TEMPLATE)
ltla = gp.read_file(GIS_TEMPLATE, layer="UK2019mod_pop_xgen")
ltla = ltla[ltla["lad19cd"].str.startswith("E")] # England only, for now.
ltla = ltla.sort_values("lad19cd")
rti = tf.reduce_sum(ngms, axis=-1)
......@@ -205,4 +221,7 @@ if __name__ == "__main__":
"(lad19cd|lad19nm$|prev|cases|Rt|popsize|geometry)", regex=True
),
]
ltla.to_file(GIS_OUTPUT, driver="GPKG")
ltla.to_file(
os.path.join(config["output"]["results_dir"], config["output"]["geopackage"]),
driver="GPKG",
)
# Covid_ODE model configuration
data:
mobility_matrix: data/mergedflows.csv
population_size: data/c2019modagepop.csv
commute_volume: data/200929_OFF_SEN_COVID19_road_traffic_national_table.xlsx
reported_cases: data/Anonymised Combined Line List 20201002.csv
case_date_type: specimen
pillar: both
parameter:
beta1: 0.291 # R0 2.4
beta2: 0.876 # Contact with commuters 1/3rd of the time
beta3: 1.0 # lockdown vs normal
omega: 1.0 # Non-linearity parameter for commuting volume
nu: 0.5 # E -> I transition rate
gamma: 0.45 # I -> R transition rate
xi:
- 2.07
- 0.15
- -1.08
- -0.78
- -0.73
- -0.51
settings:
inference_period:
- 2020-07-03
- 2020-09-29
time_step: 1.
prediction_period:
- 2020-02-19
- 2020-08-01
mcmc:
dmax: 21
nmax: 50
m: 1
occult_nmax: 5
num_event_time_updates: 35
num_bursts: 200
num_burst_samples: 1000
thin: 20
output:
results_dir: $global_scratch/covid19/
posterior: posterior.hd5
national_rt: national_rt.xlsx
geopackage: prediction.gpkg
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment