Commit 7cf93af6 authored by Chris Jewell

Merge branch 'tier_restriction' into production

parents d185fc3f b59b68a7
@@ -3,10 +3,10 @@
import argparse
def cli_args():
def cli_args(args=None):
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", type=str, help="configuration file")
args = parser.parse_args()
args = parser.parse_args(args)
return args
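# Editor's sketch (not part of the commit): the new optional `args` parameter
# lets the parser be driven without touching sys.argv, e.g. from a test.
# The config filename is hypothetical.
#
#     >>> from covid.cli_arg_parse import cli_args
#     >>> cli_args(["--config", "example_config.yaml"]).config
#     'example_config.yaml'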
@@ -5,7 +5,12 @@ from warnings import warn
import numpy as np
import pandas as pd
__all__ = ["read_mobility", "read_population", "read_traffic_flow", "read_phe_cases"]
__all__ = [
"read_mobility",
"read_population",
"read_traffic_flow",
"read_phe_cases",
]
def read_mobility(path):
@@ -19,7 +24,8 @@ def read_mobility(path):
"""
mobility = pd.read_csv(path)
mobility = mobility[
mobility["From"].str.startswith("E") & mobility["To"].str.startswith("E")
mobility["From"].str.startswith("E")
& mobility["To"].str.startswith("E")
]
mobility = mobility.sort_values(["From", "To"])
mobility = mobility.groupby(["From", "To"]).agg({"Flow": sum}).reset_index()
@@ -40,7 +46,9 @@ def read_population(path):
return pop
def read_traffic_flow(path: str, date_low: np.datetime64, date_high: np.datetime64):
def read_traffic_flow(
path: str, date_low: np.datetime64, date_high: np.datetime64
):
"""Read traffic flow data, returning a timeseries between dates.
:param path: path to a traffic flow file with <Date>,<Cars> columns
:returns: a Pandas timeseries
@@ -50,8 +58,12 @@ def read_traffic_flow(path: str, date_low: np.datetime64, date_high: np.datetime
)
commute_raw.index = pd.to_datetime(commute_raw.index, format="%Y-%m-%d")
commute_raw.sort_index(axis=0, inplace=True)
commute = pd.DataFrame(index=np.arange(date_low, date_high, np.timedelta64(1, "D")))
commute = commute.merge(commute_raw, left_index=True, right_index=True, how="left")
commute = pd.DataFrame(
index=np.arange(date_low, date_high, np.timedelta64(1, "D"))
)
commute = commute.merge(
commute_raw, left_index=True, right_index=True, how="left"
)
commute[commute.index < commute_raw.index[0]] = commute_raw.iloc[0, 0]
commute[commute.index > commute_raw.index[-1]] = commute_raw.iloc[-1, 0]
commute["Cars"] = commute["Cars"] / 100.0
@@ -87,14 +99,18 @@ def read_phe_cases(
line_listing["lad19cd"] = _merge_ltla(line_listing["lad19cd"])
# Select dates
line_listing["date"] = pd.to_datetime(line_listing["date"], format="%d/%m/%Y")
line_listing["date"] = pd.to_datetime(
line_listing["date"], format="%d/%m/%Y"
)
line_listing = line_listing[
(date_low <= line_listing["date"]) & (line_listing["date"] < date_high)
]
# Choose pillar
if pillar_map[pillar] is not None:
line_listing = line_listing.loc[line_listing["pillar"] == pillar_map[pillar]]
line_listing = line_listing.loc[
line_listing["pillar"] == pillar_map[pillar]
]
# Drop na rows
orig_len = line_listing.shape[0]
@@ -112,8 +128,47 @@ due to missing values ({100. * (orig_len - line_listing.shape[0])/orig_len}%)"
dates = pd.date_range(date_low, date_high, closed="left")
if ltlas is None:
ltlas = case_counts.index.levels[1]
index = pd.MultiIndex.from_product([dates, ltlas], names=["date", "lad19cd"])
index = pd.MultiIndex.from_product(
[dates, ltlas], names=["date", "lad19cd"]
)
case_counts = case_counts.reindex(index, fill_value=0)
return case_counts.reset_index().pivot(
index="lad19cd", columns="date", values="count"
)
def read_tier_restriction_data(
tier_restriction_csv, lad19cd_lookup, date_low, date_high
):
data = pd.read_csv(tier_restriction_csv)
data.loc[:, "date"] = pd.to_datetime(data["date"])
# Group merged ltlas
london = ["City of London", "Westminster"]
corn_scilly = ["Cornwall", "Isles of Scilly"]
data.loc[data["ltla"].isin(london), "ltla"] = ":".join(london)
data.loc[data["ltla"].isin(corn_scilly), "ltla"] = ":".join(corn_scilly)
# Fix up dodgy names
data.loc[
data["ltla"] == "Blackburn With Darwen", "ltla"
] = "Blackburn with Darwen"
# Merge
data = lad19cd_lookup.merge(
data, how="left", left_on="lad19nm", right_on="ltla"
)
# Re-index
data.index = pd.MultiIndex.from_frame(data[["date", "lad19cd"]])
data = data[["tier_2", "tier_3"]]
data = data[~data.index.duplicated()]
dates = pd.date_range(date_low, date_high - pd.Timedelta(1, "D"))
lad19cd = lad19cd_lookup["lad19cd"].sort_values().unique()
new_index = pd.MultiIndex.from_product([dates, lad19cd])
data = data.reindex(new_index, fill_value=0.0)
warn(f"Tier summary: {np.mean(data, axis=0)}")
# Pack into [T, M, V] array.
arr_data = data.to_xarray().to_array()
return np.transpose(arr_data, axes=[1, 2, 0])
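# Editor's sketch of the interface, inferred from the body above (the CSV
# path below is hypothetical):
#   tier_restriction_csv: CSV with "date", "ltla", "tier_2", "tier_3" columns
#   lad19cd_lookup: DataFrame with "lad19nm" and "lad19cd" columns
#   returns: a [T, M, V] array with T days in [date_low, date_high), M LTLAs
#            sorted by lad19cd, and V = 2 indicators (tier_2, tier_3)
#
#     >>> tiers = read_tier_restriction_data(
#     ...     "data/tiers.csv", lad19cd_lookup, date_low, date_high
#     ... )
#     >>> tiers.shape[-1]
#     2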
@@ -33,3 +33,7 @@ echo "Done"
echo -n "Create summary..."
poetry run python summary.py -c "$CONFIG"
echo "Done"
echo -n "Localisation summary..."
poetry run python within_between.py -c "$CONFIG"
echo "Done"
"""Methods to read in COVID-19 data and output
well-known formats"""
from warnings import warn
import numpy as np
import pandas as pd
def read_mobility(path):
"""Reads in CSV with mobility matrix.
CSV long format: <From>,<To>,<Flow> columns, one row per
origin-destination pair (only rows where both codes start with "E" are kept).
:returns: a Pandas DataFrame flow matrix (To x From), sorted by area code on both rows and cols.
"""
mobility = pd.read_csv(path)
mobility = mobility[
mobility["From"].str.startswith("E") & mobility["To"].str.startswith("E")
]
mobility = mobility.sort_values(["From", "To"])
mobility = mobility.groupby(["From", "To"]).agg({"Flow": sum}).reset_index()
mob_matrix = mobility.pivot(index="To", columns="From", values="Flow")
mob_matrix[mob_matrix.isna()] = 0.0
return mob_matrix
def read_population(path):
"""Reads population CSV
:returns: a pandas Series indexed by LTLAs
"""
pop = pd.read_csv(path, index_col="lad19cd")
pop = pop[pop.index.str.startswith("E")]
pop = pop.sum(axis=1)
pop = pop.sort_index()
pop.name = "n"
return pop
def read_traffic_flow(path: str, date_low: np.datetime64, date_high: np.datetime64):
"""Read traffic flow data, returning a timeseries between dates.
:param path: path to a traffic flow spreadsheet with <Date>,<Cars> columns
:returns: a Pandas timeseries
"""
commute_raw = pd.read_excel(
path, index_col="Date", skiprows=5, usecols=["Date", "Cars"]
)
commute_raw.index = pd.to_datetime(commute_raw.index, format="%Y-%m-%d")
commute_raw.sort_index(axis=0, inplace=True)
commute = pd.DataFrame(index=np.arange(date_low, date_high, np.timedelta64(1, "D")))
commute = commute.merge(commute_raw, left_index=True, right_index=True, how="left")
commute[commute.index < commute_raw.index[0]] = commute_raw.iloc[0, 0]
commute[commute.index > commute_raw.index[-1]] = commute_raw.iloc[-1, 0]
commute["Cars"] = commute["Cars"] / 100.0
commute.columns = ["percent"]
return commute
def _merge_ltla(series):
london = ["E09000001", "E09000033"]
corn_scilly = ["E06000052", "E06000053"]
series.loc[series.isin(london)] = ",".join(london)
series.loc[series.isin(corn_scilly)] = ",".join(corn_scilly)
return series
def read_phe_cases(
path, date_low, date_high, pillar=None, date_type="specimen", ltlas=None
):
"""Reads a PHE Anonymised Line Listing for dates in [low_date, high_date)
:param path: path to PHE Anonymised Line Listing Data
:param date_low: lower date bound
:param date_high: upper date bound
:returns: a Pandas data frame of LTLAs x dates
"""
date_type_map = {"specimen": "specimen_date", "report": "lab_report_date"}
line_listing = pd.read_csv(
path, usecols=[date_type_map[date_type], "LTLA_code", "pillar"]
)[[date_type_map[date_type], "LTLA_code", "pillar"]]
line_listing.columns = ["date", "lad19cd", "pillar"]
line_listing["lad19cd"] = _merge_ltla(line_listing["lad19cd"])
line_listing["date"] = pd.to_datetime(line_listing["date"], format="%d/%m/%Y")
# Select dates
line_listing = line_listing[
(date_low <= line_listing["date"])
& (line_listing["date"] < (date_high - np.timedelta64(1, "D")))
]
# Choose pillar
if pillar is not None:
line_listing = line_listing.loc[line_listing["pillar"] == pillar]
# Drop na rows
orig_len = line_listing.shape[0]
line_listing = line_listing.dropna(axis=0)
warn(
f"Removed {orig_len - line_listing.shape[0]} rows of {orig_len} \
due to missing values ({100. * (orig_len - line_listing.shape[0])/orig_len}%)"
)
# Aggregate by date/region
case_counts = line_listing.groupby(["date", "lad19cd"]).size()
case_counts.name = "count"
# Re-index
dates = pd.date_range(date_low, date_high, closed="left")
if ltlas is None:
ltlas = case_counts.index.levels[1]
index = pd.MultiIndex.from_product([dates, ltlas], names=["date", "lad19cd"])
case_counts = case_counts.reindex(index, fill_value=0)
return case_counts.reset_index().pivot(
index="lad19cd", columns="date", values="count"
)
@@ -2,9 +2,9 @@
# Enqueues COVID-19 pipelines
CASES_FILE="data/Anonymised Combined Line List 20201102.csv"
DATE_LOW="2020-08-07"
DATE_HIGH="2020-10-30"
CASES_FILE="data/Anonymised Combined Line List 20201109.csv"
DATE_LOW="2020-08-14"
DATE_HIGH="2020-11-06"
TEMPLATE_CONFIG=template_config.yaml
@@ -10,16 +10,13 @@ import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
from tensorflow_probability.python.experimental import unnest
from gemlib.util import compute_state
from gemlib.mcmc import UncalibratedEventTimesUpdate
from gemlib.mcmc import UncalibratedOccultUpdate, TransitionTopology
from gemlib.mcmc import GibbsKernel
from gemlib.mcmc.gibbs_kernel import GibbsKernelResults
from gemlib.mcmc.gibbs_kernel import flatten_results
from gemlib.mcmc import MultiScanKernel
from gemlib.mcmc import AdaptiveRandomWalkMetropolis
from gemlib.mcmc import Posterior
from covid.data import read_phe_cases
from covid.cli_arg_parse import cli_args
@@ -102,18 +99,19 @@ if __name__ == "__main__":
initial_state=initial_state,
initial_step=0,
num_steps=events.shape[1],
priors=convert_priors(config['mcmc']['prior']),
priors=convert_priors(config["mcmc"]["prior"]),
)
# Full joint log posterior distribution
# $\pi(\theta, \xi, y^{se}, y^{ei} | y^{ir})$
def logp(theta, xi, events):
def logp(block0, block1, events):
return model.log_prob(
dict(
beta1=xi[0],
beta2=theta[0],
gamma=theta[1],
xi=xi[1:],
beta2=block0[0],
gamma=block0[1],
beta1=block1[0],
beta3=block1[1:3],
xi=block1[3:],
seir=events,
)
)
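# Editor's note on the new blocking, read off the dict above: block0 packs
# [beta2, gamma] and block1 packs [beta1, beta3 (2 elements), xi], so
# block1 has length len(xi) + 3, matching the np.zeros(... + 3) initial
# state set up further down.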
@@ -127,12 +125,13 @@ if __name__ == "__main__":
# Q(Z^{ei}, Z^{ei\prime}) (partially-censored)
# Q(Z^{se}, Z^{se\prime}) (occult)
# Q(Z^{ei}, Z^{ei\prime}) (occult)
def make_theta_kernel(shape, name):
def make_blk0_kernel(shape, name):
def fn(target_log_prob_fn, _):
return tfp.mcmc.TransformedTransitionKernel(
inner_kernel=AdaptiveRandomWalkMetropolis(
target_log_prob_fn=target_log_prob_fn,
initial_covariance=np.eye(shape[0], dtype=model_spec.DTYPE) * 1e-1,
initial_covariance=np.eye(shape[0], dtype=model_spec.DTYPE)
* 1e-1,
covariance_burnin=200,
),
bijector=tfp.bijectors.Exp(),
@@ -141,11 +140,12 @@ if __name__ == "__main__":
return fn
def make_xi_kernel(shape, name):
def make_blk1_kernel(shape, name):
def fn(target_log_prob_fn, _):
return AdaptiveRandomWalkMetropolis(
target_log_prob_fn=target_log_prob_fn,
initial_covariance=np.eye(shape[0], dtype=model_spec.DTYPE) * 1e-1,
initial_covariance=np.eye(shape[0], dtype=model_spec.DTYPE)
* 1e-1,
covariance_burnin=200,
name=name,
)
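# Editor's note: the two block kernels above differ only in that block0 is
# wrapped in a TransformedTransitionKernel with an Exp bijector, so the
# random walk happens on the log scale and beta2/gamma stay positive,
# while block1 is updated on the unconstrained scale.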
@@ -207,32 +207,46 @@ if __name__ == "__main__":
# MCMC tracing functions
def trace_results_fn(_, results):
"""Returns log_prob, accepted, q_ratio"""
def f(result):
proposed_results = unnest.get_innermost(result, "proposed_results")
log_prob = proposed_results.target_log_prob
accepted = tf.cast(
unnest.get_innermost(result, "is_accepted"), log_prob.dtype
)
q_ratio = proposed_results.log_acceptance_correction
if hasattr(proposed_results, "extra"):
proposed = tf.cast(proposed_results.extra, log_prob.dtype)
return tf.concat(
[[log_prob], [accepted], [q_ratio], proposed], axis=0
)
return tf.concat([[log_prob], [accepted], [q_ratio]], axis=0)
"""Packs results into a dictionary"""
results_dict = {}
res0 = results.inner_results
results_dict["theta"] = {
"is_accepted": res0[0].inner_results.is_accepted,
"target_log_prob": res0[
0
].inner_results.accepted_results.target_log_prob,
}
results_dict["xi"] = {
"is_accepted": res0[1].is_accepted,
"target_log_prob": res0[1].accepted_results.target_log_prob,
}
def get_move_results(results):
return {
"is_accepted": results.is_accepted,
"target_log_prob": results.accepted_results.target_log_prob,
"proposed_delta": tf.stack(
[
results.accepted_results.m,
results.accepted_results.t,
results.accepted_results.delta_t,
results.accepted_results.x_star,
]
),
}
def recurse(f, results):
if isinstance(results, GibbsKernelResults):
return [recurse(f, x) for x in results.inner_results]
return f(results)
res1 = res0[2].inner_results
results_dict["move/S->E"] = get_move_results(res1[0])
results_dict["move/E->I"] = get_move_results(res1[1])
results_dict["occult/S->E"] = get_move_results(res1[2])
results_dict["occult/E->I"] = get_move_results(res1[3])
return recurse(f, results)
return results_dict
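# Editor's note: the traced dictionary mirrors the kernel order set up in
# the Gibbs scheme below ("theta", "xi", then the four move/occult
# kernels); every entry carries is_accepted and target_log_prob, and the
# move entries also stack (m, t, delta_t, x_star) as "proposed_delta",
# which posterior.write_results() consumes later.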
# Build MCMC algorithm here. This will be run in bursts for memory economy
@tf.function(autograph=False, experimental_compile=True)
def sample(n_samples, init_state, previous_results=None):
def sample(n_samples, init_state, thin=0, previous_results=None):
with tf.name_scope("main_mcmc_sample_loop"):
init_state = init_state.copy()
@@ -240,16 +254,18 @@ if __name__ == "__main__":
gibbs_schema = GibbsKernel(
target_log_prob_fn=logp,
kernel_list=[
(0, make_theta_kernel(init_state[0].shape, "theta")),
(1, make_xi_kernel(init_state[1].shape, "xi")),
(0, make_blk0_kernel(init_state[0].shape, "theta")),
(1, make_blk1_kernel(init_state[1].shape, "xi")),
(2, make_event_multiscan_kernel),
],
name="gibbs0",
)
samples, results, final_results = tfp.mcmc.sample_chain(
n_samples,
init_state,
kernel=gibbs_schema,
num_steps_between_results=thin,
previous_kernel_results=previous_results,
return_final_kernel_results=True,
trace_fn=trace_results_fn,
@@ -262,84 +278,36 @@ if __name__ == "__main__":
####################################
# MCMC Control
NUM_BURSTS = config["mcmc"]["num_bursts"]
NUM_BURST_SAMPLES = config["mcmc"]["num_burst_samples"]
NUM_EVENT_TIME_UPDATES = config["mcmc"]["num_event_time_updates"]
THIN_BURST_SAMPLES = NUM_BURST_SAMPLES // config["mcmc"]["thin"]
NUM_SAVED_SAMPLES = THIN_BURST_SAMPLES * NUM_BURSTS
NUM_BURSTS = int(config["mcmc"]["num_bursts"])
NUM_BURST_SAMPLES = int(config["mcmc"]["num_burst_samples"])
NUM_EVENT_TIME_UPDATES = int(config["mcmc"]["num_event_time_updates"])
NUM_SAVED_SAMPLES = NUM_BURST_SAMPLES * NUM_BURSTS
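# Editor's note: thinning now happens inside the sampler
# (num_steps_between_results=thin in sample() above), so every sample a
# burst returns is saved and NUM_SAVED_SAMPLES no longer divides by
# config["mcmc"]["thin"] as the removed THIN_BURST_SAMPLES logic did.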
# RNG stuff
tf.random.set_seed(2)
current_state = [
np.array([0.65, 0.48], dtype=DTYPE),
np.zeros(model.model["xi"](0.0).event_shape[-1] + 1, dtype=DTYPE),
np.zeros(model.model["xi"](0.0).event_shape[-1] + 3, dtype=DTYPE),
events,
]
# Output Files
posterior = h5py.File(
# Output file
samples, results, _ = sample(1, current_state)
posterior = Posterior(
os.path.join(
os.path.expandvars(config["output"]["results_dir"]),
config["output"]["posterior"],
),
"w",
rdcc_nbytes=1024 ** 2 * 400,
rdcc_nslots=100000,
libver="latest",
)
event_size = [NUM_SAVED_SAMPLES] + list(current_state[2].shape)
posterior.create_dataset("initial_state", data=initial_state)
# Ideally we insert the inference period into the posterior file
# as this allows us to post-attribute it to the data. Maybe better
# to simply save the data into it as well.
posterior.create_dataset("config", data=yaml.dump(config))
theta_samples = posterior.create_dataset(
"samples/theta",
[NUM_SAVED_SAMPLES, current_state[0].shape[0]],
dtype=np.float64,
)
xi_samples = posterior.create_dataset(
"samples/xi",
[NUM_SAVED_SAMPLES, current_state[1].shape[0]],
dtype=np.float64,
)
event_samples = posterior.create_dataset(
"samples/events",
event_size,
dtype=DTYPE,
chunks=(32, 32, 32, 1),
compression="szip",
compression_opts=("nn", 16),
sample_dict={"theta": (samples[0], (NUM_BURST_SAMPLES, 1)),
"xi": (samples[1], (NUM_BURST_SAMPLES, 1)),
"events": (samples[2], (NUM_BURST_SAMPLES, 64, 64, 1)),
},
results_dict=results,
num_samples=NUM_SAVED_SAMPLES,
)
output_results = [
posterior.create_dataset(
"results/theta", (NUM_SAVED_SAMPLES, 3), dtype=DTYPE,
),
posterior.create_dataset(
"results/xi", (NUM_SAVED_SAMPLES, 3), dtype=DTYPE,
),
posterior.create_dataset(
"results/move/S->E",
(NUM_SAVED_SAMPLES, 3 + num_metapop),
dtype=DTYPE,
),
posterior.create_dataset(
"results/move/E->I",
(NUM_SAVED_SAMPLES, 3 + num_metapop),
dtype=DTYPE,
),
posterior.create_dataset(
"results/occult/S->E", (NUM_SAVED_SAMPLES, 6), dtype=DTYPE
),
posterior.create_dataset(
"results/occult/E->I", (NUM_SAVED_SAMPLES, 6), dtype=DTYPE
),
]
posterior.swmr_mode = True
posterior._file.create_dataset("initial_state", data=initial_state)
posterior._file.create_dataset("config", data=yaml.dump(config))
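# Editor's note (interface inferred from this call site, not from gemlib
# documentation): Posterior replaces the hand-rolled h5py output removed
# here; sample_dict appears to map each variable name to (a template
# sample tensor, a per-sample chunk shape), and the write_samples() /
# write_results() / flush() calls below append each burst at
# first_dim_offset = i * NUM_BURST_SAMPLES.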
print("Initial logpi:", logp(*current_state))
@@ -347,68 +315,79 @@ if __name__ == "__main__":
# to disc, or else end OOM (even on a 32GB system).
# with tf.profiler.experimental.Profile("/tmp/tf_logdir"):
final_results = None
for i in tqdm.tqdm(range(NUM_BURSTS), unit_scale=NUM_BURST_SAMPLES):
for i in tqdm.tqdm(
range(NUM_BURSTS), unit_scale=NUM_BURST_SAMPLES * config["mcmc"]["thin"]
):
samples, results, final_results = sample(
NUM_BURST_SAMPLES,
init_state=current_state,
thin=config["mcmc"]["thin"] - 1,
previous_results=final_results,
)
current_state = [s[-1] for s in samples]
s = slice(
i * THIN_BURST_SAMPLES, i * THIN_BURST_SAMPLES + THIN_BURST_SAMPLES
)
idx = tf.constant(range(0, NUM_BURST_SAMPLES, config["mcmc"]["thin"]))
theta_samples[s, ...] = tf.gather(samples[0], idx)
xi_samples[s, ...] = tf.gather(samples[1], idx)
# cov = np.cov(
# np.log(theta_samples[: (i * NUM_BURST_SAMPLES + NUM_BURST_SAMPLES), ...]),
# rowvar=False,
# )
print(current_state[0].numpy(), flush=True)
# print(cov, flush=True)
# if (i * NUM_BURST_SAMPLES) > 1000 and np.all(np.isfinite(cov)):
# theta_scale = 2.38 ** 2 * cov / 2.0
start = perf_counter()
event_samples[s, ...] = tf.gather(samples[2], idx)
posterior.write_samples(
{"theta": samples[0], "xi": samples[1], "events": samples[2]},
first_dim_offset=i * NUM_BURST_SAMPLES,
)
posterior.write_results(results, first_dim_offset=i * NUM_BURST_SAMPLES)
end = perf_counter()
flat_results = flatten_results(results)
for i, ro in enumerate(output_results):
ro[s, ...] = tf.gather(flat_results[i], idx)
posterior.flush()
print("Storage time:", end - start, "seconds")
print(
"Acceptance theta:",
tf.reduce_mean(tf.cast(flat_results[0][:, 1], tf.float32)),
tf.reduce_mean(
tf.cast(results["theta"]["is_accepted"], tf.float32)
),
)
print(
"Acceptance xi:",
tf.reduce_mean(tf.cast(flat_results[1][:, 1], tf.float32)),
tf.reduce_mean(
tf.cast(results["theta"]["is_accepted"], tf.float32),
),
)
print(
"Acceptance move S->E:",
tf.reduce_mean(tf.cast(flat_results[2][:, 1], tf.float32)),
tf.reduce_mean(
tf.cast(results["move/S->E"]["is_accepted"], tf.float32)
),
)
print(
"Acceptance move E->I:",
tf.reduce_mean(tf.cast(flat_results[3][:, 1], tf.float32)),
tf.reduce_mean(
tf.cast(results["move/E->I"]["is_accepted"], tf.float32)
),
)
print(
"Acceptance occult S->E:",
tf.reduce_mean(tf.cast(flat_results[4][:, 1], tf.float32)),
tf.reduce_mean(
tf.cast(results["occult/S->E"]["is_accepted"], tf.float32)
),
)
print(