Commit 17d98f36 authored by Chris Jewell

Automatic web-scraping enabled

parent c6ec2155
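This commit swaps the manual line-listing CSV input for a fetch from the public COVID-19 dashboard API whenever the configuration selects a URL/JSON source. A minimal sketch of that code path follows, for orientation only; the endpoint and query parameters below are illustrative assumptions and are not taken from this diff:

    # Illustrative only: the real URL comes from the pipeline configuration.
    import json
    import requests
    import pandas as pd

    url = (
        "https://api.coronavirus.data.gov.uk/v2/data"
        "?areaType=ltla&metric=newCasesBySpecimenDate&format=json"
    )  # assumed example endpoint, not part of this commit
    response = requests.get(url)
    content = json.loads(response.content)
    # Same fetch-and-parse pattern as the new code below.
    cases = pd.read_json(json.dumps(content["body"]))
    print(cases.head())

When `settings["input"] == "url"` and `settings["format"] == "json"`, the resulting frame is routed through the new `CasesData.adapt_gov_api` adapter added below.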
"""Loads COVID-19 case data""" """Loads COVID-19 case data"""
from warnings import warn
import requests
import json
import numpy as np import numpy as np
import pandas as pd import pandas as pd
@@ -42,7 +45,10 @@ class CasesData:
        """
        Placeholder, in case we wish to interface with an API.
        """
-        pass
+        response = requests.get(url)
+        content = json.loads(response.content)
+        df = pd.read_json(json.dumps(content["body"]))
+        return df
    def getCSV(file):
        """
@@ -110,9 +116,34 @@ class CasesData:
                measure,
                areacodes,
            )
+        elif (settings["input"] == "url") and (settings["format"] == "json"):
+            df = CasesData.adapt_gov_api(
+                df, date_low, date_high, pillars, measure, areacodes
+            )
        return df
+    def adapt_gov_api(df, date_low, date_high, pillars, measure, areacodes):
+        warn("Using API data: 'pillar' and 'measure' will be ignored")
+        df = df.rename(
+            columns={"areaCode": "location", "newCasesBySpecimenDate": "cases"}
+        )
+        df = df[["location", "date", "cases"]]
+        df["date"] = pd.to_datetime(df["date"])
+        df["location"] = merge_lad_codes(df["location"])
+        df = df[df["location"].isin(areacodes)]
+        df.index = pd.MultiIndex.from_frame(df[["location", "date"]])
+        df = df.sort_index()
+        dates = pd.date_range(date_low, date_high, closed="left")
+        multi_index = pd.MultiIndex.from_product([areacodes, dates])
+        ser = df["cases"].reindex(multi_index, fill_value=0.0)
+        ser.index.names = ["location", "date"]
+        ser.name = "cases"
+        return ser
    def adapt_phe(df, date_low, date_high, pillars, measure, areacodes):
        """
        Adapt the line listing data to the desired dataframe format.
@@ -156,12 +187,11 @@
        # Fill in all dates, and add 0s for empty counts
        dates = pd.date_range(date_low, date_high, closed="left")
-        indexes = [(lad19, date) for date in dates for lad19 in areacodes]
        multi_indexes = pd.MultiIndex.from_product(
            [areacodes, dates], names=["location", "date"]
        )
        results = df["cases"].reindex(multi_indexes, fill_value=0.0)
-        return results
+        return results.sort_index()

    def process(config):
        df = CasesData.get(config)
...
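Both adapters finish by reindexing onto the full (location, date) grid so that combinations with no reported cases become explicit zeros. A toy illustration of that pandas pattern, with made-up area codes and dates:

    import pandas as pd

    # Sparse counts, as they might arrive from the API or line listing (toy data).
    df = pd.DataFrame(
        {
            "location": ["E06000001", "E06000001", "E06000002"],
            "date": pd.to_datetime(["2020-11-01", "2020-11-03", "2020-11-02"]),
            "cases": [5, 2, 7],
        }
    ).set_index(["location", "date"])

    areacodes = ["E06000001", "E06000002"]
    dates = pd.date_range("2020-11-01", "2020-11-04", closed="left")  # 3 days
    full_index = pd.MultiIndex.from_product(
        [areacodes, dates], names=["location", "date"]
    )

    # Missing (location, date) pairs are filled with explicit zeros.
    ser = df["cases"].reindex(full_index, fill_value=0.0)
    print(ser)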
"""Provides functions to format data"""
import pandas as pd
def _expand_quantiles(q_dict):
    """Expand a dictionary of quantiles"""
    q_str = [
        "0.05", "0.1", "0.15", "0.2", "0.25", "0.3", "0.35", "0.4", "0.45",
        "0.5", "0.55", "0.6", "0.65", "0.7", "0.75", "0.8", "0.85", "0.9",
        "0.95",
    ]
    quantiles = {f"Quantile {q}": None for q in q_str}
    if q_dict is None:
        return quantiles
    for k, v in q_dict.items():
        q_key = f"Quantile {k}"
        if q_key not in quantiles.keys():
            raise KeyError(f"quantile '{k}' not compatible with template form")
        quantiles[q_key] = v
    return [
        pd.Series(v, name=k).reset_index(drop=True)
        for k, v in quantiles.items()
    ]


def _split_dates(dates):
    if dates is None:
        return {"day": None, "month": None, "year": None}
    if hasattr(dates, "__iter__"):
        dx = pd.DatetimeIndex(dates)
    else:
        dx = pd.DatetimeIndex([dates])
    return {"day": dx.day, "month": dx.month, "year": dx.year}


def make_dstl_template(
    group=None,
    model=None,
    scenario=None,
    model_type=None,
    version=None,
    creation_date=None,
    value_date=None,
    age_band=None,
    geography=None,
    value_type=None,
    value=None,
    quantiles=None,
):
    """Formats a DSTL-type Excel results template"""

    # Process date
    creation_date_parts = _split_dates(creation_date)
    value_date_parts = _split_dates(value_date)
    quantile_series = _expand_quantiles(quantiles)

    fields = {
        "Group": group,
        "Model": model,
        "Scenario": scenario,
        "ModelType": model_type,
        "Version": version,
        "Creation Day": creation_date_parts["day"],
        "Creation Month": creation_date_parts["month"],
        "Creation Year": creation_date_parts["year"],
        "Day of Value": value_date_parts["day"],
        "Month of Value": value_date_parts["month"],
        "Year of Value": value_date_parts["year"],
        "AgeBand": age_band,
        "Geography": geography,
        "ValueType": value_type,
        "Value": value,
    }
    fields = [
        pd.Series(v, name=k).reset_index(drop=True) for k, v in fields.items()
    ]
    return pd.concat(fields + quantile_series, axis="columns").ffill(
        axis="index"
    )
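A minimal usage sketch of `make_dstl_template` as defined above; every value here is invented purely for illustration and does not come from the pipeline:

    import pandas as pd
    from covid.formats import make_dstl_template

    template = make_dstl_template(
        group="Lancaster",
        model="SpatialStochasticSEIR",
        creation_date=pd.Timestamp("2021-01-22"),
        value_date=pd.date_range("2021-01-23", periods=3),
        age_band="All",
        geography=pd.Series(["E06000001", "E06000002", "E06000003"]),
        value_type="Cases",
        value=pd.Series([10.0, 12.0, 7.0]),
        quantiles={
            "0.05": pd.Series([5.0, 6.0, 3.0]),
            "0.95": pd.Series([20.0, 22.0, 14.0]),
        },
    )
    print(template.head())

The `summary_longformat` task added later in this commit drives the same function with real pipeline output and writes the result to an Excel file.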
@@ -29,14 +29,18 @@ def gather_data(config):
    date_low = np.datetime64(config["date_range"][0])
    date_high = np.datetime64(config["date_range"][1])

-    mobility = data.read_mobility(config["mobility_matrix"])
-    popsize = data.read_population(config["population_size"])
+    locations = data.AreaCodeData.process(config)
+    mobility = data.read_mobility(
+        config["mobility_matrix"], locations["lad19cd"]
+    )
+    popsize = data.read_population(
+        config["population_size"], locations["lad19cd"]
+    )
    commute_volume = data.read_traffic_flow(
        config["commute_volume"], date_low=date_low, date_high=date_high
    )
-    locations = data.AreaCodeData.process(config)
-    tier_restriction = data.TierData.process(config)[:, :, [0, 2, 3, 4]]
+    # tier_restriction = data.TierData.process(config)[:, :, [0, 2, 3, 4]]

    date_range = [date_low, date_high]
    weekday = (
        pd.date_range(date_low, date_high - np.timedelta64(1, "D")).weekday < 5
@@ -47,7 +51,6 @@ def gather_data(config):
        C=mobility.to_numpy().astype(DTYPE),
        W=commute_volume.to_numpy().astype(DTYPE),
        N=popsize.to_numpy().astype(DTYPE),
-        L=tier_restriction.astype(DTYPE),
        weekday=weekday.astype(DTYPE),
        date_range=date_range,
        locations=locations,
@@ -217,9 +220,6 @@ def next_generation_matrix_fn(covar_data, param):
    """

    def fn(t, state):
-        L = tf.convert_to_tensor(covar_data["L"], DTYPE)
-        L = L - tf.reduce_mean(L, axis=(0, 1))
        C = tf.convert_to_tensor(covar_data["C"], dtype=DTYPE)
        C = tf.linalg.set_diag(C, -tf.reduce_sum(C, axis=-2))
        C = tf.linalg.set_diag(C, tf.zeros(C.shape[0], dtype=DTYPE))
@@ -237,13 +237,9 @@ def next_generation_matrix_fn(covar_data, param):
        )
        xi = tf.gather(param["xi"], xi_idx)

-        L_idx = tf.clip_by_value(tf.cast(t, tf.int64), 0, L.shape[0] - 1)
-        Lt = L[-1]  # Last timepoint
-        xB = tf.linalg.matvec(Lt, param["beta3"])
-        beta = tf.math.exp(xi + xB)
+        beta = tf.math.exp(xi)

-        ngm = beta[:, tf.newaxis] * (
+        ngm = beta * (
            tf.eye(Cstar.shape[0], dtype=state.dtype)
            + param["beta2"] * commute_volume * Cstar / N[tf.newaxis, :]
        )
...
@@ -18,6 +18,7 @@ from covid.tasks import (
    case_exceedance,
    summary_geopackage,
    insample_predictive_timeseries,
+    summary_longformat,
)

__all__ = ["run_pipeline"]
@@ -190,3 +191,12 @@ def run_pipeline(global_config, results_directory, cli_options):
    )(summary_geopackage)

    rf.cmdline.run(cli_options)
+
+    # DSTL Summary
+    rf.transform(
+        [[process_data, insample14, medium_term, next_generation_matrix]],
+        rf.formatter(),
+        wd("summary_longformat.xlsx"),
+    )(summary_longformat)
+
+    rf.cmdline.run(cli_options)
@@ -11,6 +11,8 @@ from covid.tasks.within_between import within_between
from covid.tasks.case_exceedance import case_exceedance
from covid.tasks.summary_geopackage import summary_geopackage
from covid.tasks.insample_predictive_timeseries import insample_predictive_timeseries
+from covid.tasks.summary_longformat import summary_longformat

__all__ = [
    "assemble_data",
@@ -24,4 +26,5 @@ __all__ = [
    "case_exceedance",
    "summary_geopackage",
    "insample_predictive_timeseries",
+    "summary_longformat",
]
@@ -39,7 +39,7 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
    # We load in cases and impute missing infections first, since this sets the
    # time epoch which we are analysing.

    # Impute censored events, return cases
-    print("Data shape:", data['cases'].shape)
+    print("Data shape:", data["cases"].shape)
    events = model_spec.impute_censored_events(data["cases"].astype(DTYPE))

    # Initial conditions are calculated by calculating the state
@@ -98,9 +98,9 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
                    tfp.bijectors.Exp(),
                    tfp.bijectors.Identity(),
                    tfp.bijectors.Exp(),
-                    #tfp.bijectors.Identity(),
+                    # tfp.bijectors.Identity(),
                ],
-                block_sizes=[1, 2, 1], #, 5],
+                block_sizes=[1, 2, 1],  # , 5],
            ),
            name=name,
        )
@@ -251,7 +251,9 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
    tf.random.set_seed(2)

    current_state = [
-        np.array([0.6, 0.0, 0.0, 0.1], dtype=DTYPE), #, 0.0, 0.0, 0.0, 0.0, 0.0], dtype=DTYPE),
+        np.array(
+            [0.6, 0.0, 0.0, 0.1], dtype=DTYPE
+        ),  # , 0.0, 0.0, 0.0, 0.0, 0.0], dtype=DTYPE),
        np.zeros(
            model.model["xi"](0.0, 0.1).event_shape[-1] + 1,
            dtype=DTYPE,
@@ -269,13 +271,16 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
            "gamma0": (samples[0][:, 1], (NUM_BURST_SAMPLES,)),
            "gamma1": (samples[0][:, 2], (NUM_BURST_SAMPLES,)),
            "sigma": (samples[0][:, 3], (NUM_BURST_SAMPLES,)),
-            "beta3": (tf.zeros([1,5], dtype=DTYPE), (NUM_BURST_SAMPLES, 2)), #(samples[0][:, 4:], (NUM_BURST_SAMPLES, 2)),
+            "beta3": (
+                tf.zeros([1, 5], dtype=DTYPE),
+                (NUM_BURST_SAMPLES, 2),
+            ),  # (samples[0][:, 4:], (NUM_BURST_SAMPLES, 2)),
            "beta1": (samples[1][:, 0], (NUM_BURST_SAMPLES,)),
            "xi": (
                samples[1][:, 1:],
                (NUM_BURST_SAMPLES, samples[1].shape[1] - 1),
            ),
-            "events": (samples[2], (NUM_BURST_SAMPLES, 64, 64, 1)),
+            "events": (samples[2], (NUM_BURST_SAMPLES, 32, 32, 1)),
        },
        results_dict=results,
        num_samples=NUM_SAVED_SAMPLES,
@@ -308,7 +313,9 @@ def mcmc(data_file, output_file, config, use_autograph=False, use_xla=True):
            "gamma0": samples[0][:, 1],
            "gamma1": samples[0][:, 2],
            "sigma": samples[0][:, 3],
-            "beta3": tf.zeros([samples[0].shape[0], 5], dtype=DTYPE), #samples[0][:, 4:],
+            "beta3": tf.zeros(
+                [samples[0].shape[0], 5], dtype=DTYPE
+            ),  # samples[0][:, 4:],
            "beta1": samples[1][:, 0],
            "xi": samples[1][:, 1:],
            "events": samples[2],
...
@@ -22,12 +22,11 @@ def predicted_incidence(posterior_samples, covar_data, init_step, num_steps):
    @tf.function
    def sim_fn(args):
-        beta1_, beta2_, beta3_, sigma_, xi_, gamma0_, gamma1_, init_ = args
+        beta1_, beta2_, sigma_, xi_, gamma0_, gamma1_, init_ = args

        par = dict(
            beta1=beta1_,
            beta2=beta2_,
-            beta3=beta3_,
            sigma=sigma_,
            xi=xi_,
            gamma0=gamma0_,
@@ -54,7 +53,6 @@ def predicted_incidence(posterior_samples, covar_data, init_step, num_steps):
        elems=(
            posterior_samples["beta1"],
            posterior_samples["beta2"],
-            posterior_samples["beta3"],
            posterior_samples["sigma"],
            posterior_samples["xi"],
            posterior_samples["gamma0"],
...
@@ -34,8 +34,6 @@ def summary_geopackage(input_files, output_file, config):
    geo = geo[geo["lad19cd"].isin(data["locations"]["lad19cd"])]
    geo = geo.sort_values(by="lad19cd")
-    geo["current_alert_level"] = _tier_enum(data["L"])

    # Dump data into the geopackage
    while len(input_files) > 0:
        fn = input_files.pop()
...
"""Produces a long-format summary of fitted model results"""
import pickle as pkl
from datetime import date
import numpy as np
import pandas as pd
import xarray
from gemlib.util import compute_state
from covid.model_spec import STOICHIOMETRY
from covid import model_spec
from covid.formats import make_dstl_template
def xarray2summarydf(arr):
    mean = arr.mean(dim="iteration").to_dataset(name="value")
    quantiles = arr.quantile(q=[0.05, 0.5, 0.95], dim="iteration").to_dataset(
        dim="quantile"
    )
    ds = mean.merge(quantiles).rename_vars(
        {0.05: "0.05", 0.5: "0.5", 0.95: "0.95"}
    )
    return ds.to_dataframe().reset_index()


def prevalence(events, popsize):
    prev = compute_state(events.attrs["initial_state"], events, STOICHIOMETRY)
    prev = xarray.DataArray(
        prev.numpy(),
        coords=[
            np.arange(prev.shape[0]),
            events.coords["location"],
            events.coords["time"],
            np.arange(prev.shape[-1]),
        ],
        dims=["iteration", "location", "time", "state"],
    )
    prev_per_1e5 = (
        prev[..., 1:3].sum(dim="state").reset_coords(drop=True)
        / popsize[np.newaxis, :, np.newaxis]
        * 100000
    )
    return xarray2summarydf(prev_per_1e5)


def summary_longformat(input_files, output_file):
    """Draws together pipeline results into a long format
    csv file.

    :param input_files: a list of filenames [data_pkl,
                                             insample14_pkl,
                                             medium_term_pred_pkl,
                                             ngm_pkl]
    :param output_file: the output CSV with columns `[date,
                        location,value_name,value,q0.025,q0.975]`
    """
    with open(input_files[0], "rb") as f:
        data = pkl.load(f)
    da = data["cases"].rename({"date": "time"})
    df = da.to_dataframe(name="value").reset_index()
    df["value_name"] = "newCasesBySpecimenDate"
    df["0.05"] = np.nan
    df["0.5"] = np.nan
    df["0.95"] = np.nan

    # Insample predictive incidence
    with open(input_files[1], "rb") as f:
        insample = pkl.load(f)
    insample_df = xarray2summarydf(insample[..., 2].reset_coords(drop=True))
    insample_df["value_name"] = "insample14_Cases"
    df = pd.concat([df, insample_df], axis="index")

    # Medium term incidence
    with open(input_files[2], "rb") as f:
        medium_term = pkl.load(f)
    medium_df = xarray2summarydf(medium_term[..., 2].reset_coords(drop=True))
    medium_df["value_name"] = "Cases"
    df = pd.concat([df, medium_df], axis="index")

    # Medium term prevalence
    prev_df = prevalence(medium_term, data["N"])
    prev_df["value_name"] = "Prevalence"
    df = pd.concat([df, prev_df], axis="index")

    # Rt
    with open(input_files[3], "rb") as f:
        ngms = pkl.load(f)
    rt = ngms.sum(dim="dest")
    rt = rt.rename({"src": "location"})
    rt_summary = xarray2summarydf(rt)
    rt_summary["value_name"] = "R"
    rt_summary["time"] = data["date_range"][1]
    df = pd.concat([df, rt_summary], axis="index")

    return make_dstl_template(
        group="Lancaster",
        model="SpatialStochasticSEIR",
        creation_date=date.today(),
        version=model_spec.VERSION,
        age_band="All",
        geography=df["location"],
        value_date=df["time"],
        value_type=df["value_name"],
        quantiles={
            "0.05": df["0.05"],
            "0.5": df["0.5"],
            "0.95": df["0.95"],
        },
    ).to_excel(output_file)
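For reference, the ruffus wiring added to run_pipeline above effectively calls the new task as sketched below; the pickle filenames are placeholders for the upstream task outputs, chosen only to show the argument order described in the docstring:

    summary_longformat(
        input_files=[
            "pipeline_data.pkl",   # data_pkl: assembled case/covariate data
            "insample14.pkl",      # insample14_pkl: in-sample predictions
            "medium_term.pkl",     # medium_term_pred_pkl: medium-term predictions
            "ngm.pkl",             # ngm_pkl: next-generation matrices
        ],
        output_file="summary_longformat.xlsx",
    )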
@@ -2,7 +2,6 @@
# Enqueues COVID-19 pipelines

-CASES_FILE="data/Anonymised Combined Line List 20210125.csv"
COMMUTE_VOL_FILE="data/210122_OFF_SEN_COVID19_road_traffic_national_table.xlsx"
DATE_LOW="2020-10-30"
DATE_HIGH="2021-01-22"
@@ -12,20 +11,10 @@ JSV_SCRIPT=/usr/shared_apps/packages/sge-8.1.9-gpu/default/common/sge_request.js
# Job submission
switch-gpu

-for PILLAR in both 1
-do
-for CASE_DATE_TYPE in specimen
-do
-RESULTS_DIR=$global_scratch/covid19/${DATE_HIGH}_${PILLAR}_${CASE_DATE_TYPE}_notier
-JOB_NAME="covid_${DATE_HIGH}_${PILLAR}_${CASE_DATE_TYPE}_notier"
-qsub -N $JOB_NAME covid_pipeline.sge \
-    --reported-cases "$CASES_FILE" \
+RESULTS_DIR=$global_scratch/covid19/scotland_${DATE_HIGH}_notier
+JOB_NAME="covid_scotland_${DATE_HIGH}_notier"
+qsub -N $JOB_NAME covid_pipeline.sge \
    --commute-volume "$COMMUTE_VOL_FILE" \
-    --case-date-type $CASE_DATE_TYPE \
-    --pillar $PILLAR \
    --date-range $DATE_LOW $DATE_HIGH \
    --results-dir "$RESULTS_DIR" \
    --config $TEMPLATE_CONFIG