Commit 167f49a0 authored by Chris Jewell

Merge branch 'master' into ghmaster

parents 2a381548 d975a3fb
*~
__pycache__
poetry.lock
.ruffus_history.sqlite
# covid19uk: Bayesian stochastic spatial modelling for COVID-19 in the UK
This Python package implements a spatial stochastic SEIR model for COVID-19 in the UK,
using Local Authority District level positive test data, population data, and mobility
information. Details of the model implementation may be found in `doc/lancs_space_model_concept.pdf`.

## Files
* `covid` Python package
* `model_spec.py` defines the CovidUK model using `tfp.JointDistributionNamed`, plus helper functions
* `inference.py` demonstrates MCMC fitting of the model
* `simulate.py` demonstrates simulating from the model
* `example_config.yaml` example configuration file containing data paths and MCMC settings
* `data` a directory containing example data (see below)
* `environment.yaml` conda description of the required environment. Create with `conda env create -f environment.yaml`
* `summary.py` python script to summarise MCMC results into a GeoPackage file.
## Example data files
* `data/example_cases.csv` a file containing example case data for 43 local authorities in England, collected and presented on PHE's [website](https://coronavirus.data.gov.uk)
* `data/example_population.csv` a file containing local authority population data in the UK, taken from the ONS prediction for December 2019
* `data/example_mobility.csv` inter local authority mobility matrix taken from UK Census 2011 commuting data
* `data/example_traffic_flow` a relative measure of traffic flow taken from mobility metrics from providers such as Google and Facebook. Data have been smoothed to represent a summary of the original data.
## Workflow
The pipeline implements the entire analysis chain, from case data through parameter inference and predictive
simulation to results summarised as long-format XLSX and geopackage documents.
The pipeline is run using the [`ruffus`](http://ruffus.org.uk) computational pipeline library.
The package relies heavily on the [TensorFlow](https://tensorflow.org) and
[TensorFlow Probability](https://tensorflow.org/probability) machine learning libraries, and is
optimised for GPU acceleration (tested on NVIDIA K40 and V100 cards). This package also imports
an experimental library [`gemlib`](http://fhm-chicas-code.lancs.ac.uk/GEM/gemlib) hosted at Lancaster University.
This library is under active development as part of the [Wellcome Trust](https://wellcome.ac.uk) `GEM` project.
The pipeline gathers data from [the official UK Government website for data and insights on Coronavirus](https://coronavirus.data.gov.uk),
together with population and mobility data taken from the [UK
Office for National Statistics Open Geography Portal](https://geoportal.statistics.gov.uk).
### Quickstart
```bash
$ poetry install   # Python dependencies
$ python -m covid.pipeline --config example_config.yaml --results-dir <output_dir>
```
The global pipeline configuration file `example_config.yaml` contains a section for each pipeline
stage where required. See the file itself for documentation.
### Model specification
The `covid.model_spec` module contains the model specified as a `tensorflow_probability.distributions.JointDistributionNamed`
instance `Covid19UK`. This module also contains a model version number, and constants such as the stoichiometry matrix
characterising the state transition model, an accompanying next generation matrix function, a function to assemble data
specific to the model, and a function to initialise censored event data explored by the MCMC inference algorithm.
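
For orientation, the snippet below is a minimal sketch of the `JointDistributionNamed` pattern used here; the priors and the final component are placeholders, not the actual `Covid19UK` specification.

```python
# Minimal sketch of a tfp JointDistributionNamed model (placeholder priors,
# not the real Covid19UK specification).
import tensorflow_probability as tfp

tfd = tfp.distributions

model = tfd.JointDistributionNamed(
    dict(
        beta=tfd.Gamma(concentration=1.0, rate=1.0),       # transmission rate prior
        gamma=tfd.Gamma(concentration=100.0, rate=400.0),  # removal rate prior
        # The real model conditions a censored SEIR event distribution on
        # beta and gamma here; a Normal stands in purely for illustration.
        events=lambda beta, gamma: tfd.Normal(loc=beta - gamma, scale=1.0),
    )
)

sample = model.sample()          # a dict of draws, one entry per named component
log_prob = model.log_prob(sample)
```

Expressing the model this way means `model.sample()` and `model.log_prob()` come for free, which is what the simulation and MCMC stages exploit.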
### Pipeline stages
Each pipeline stage loads input and saves output to disc. This is inherent to the `ruffus` pipeline
architecture, and makes it possible to run individual pipeline stages manually as well as to
inspect the data passed between stages.
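
For readers unfamiliar with `ruffus`, the chaining of file-based stages looks roughly like the sketch below; the task names, bodies, and file suffixes are illustrative, not the actual `covid.tasks` definitions.

```python
# Schematic ruffus pipeline: each stage reads its predecessor's output file
# and writes its own (task and file names are illustrative only).
import pickle
from ruffus import originate, transform, suffix, pipeline_run

@originate("pipeline_data.pkl")
def assemble_data(output_file):
    # The real task downloads and bundles case, population and mobility data.
    with open(output_file, "wb") as f:
        pickle.dump({"cases": []}, f)

@transform(assemble_data, suffix(".pkl"), ".hd5")
def mcmc(input_file, output_file):
    # The real task runs the data-augmentation MCMC and writes posterior draws.
    with open(output_file, "wb") as f:
        f.write(b"posterior draws")

if __name__ == "__main__":
    pipeline_run([mcmc])  # ruffus runs assemble_data first, then mcmc
```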
1. Data assembly: `covid.tasks.assemble_data` downloads or loads data from various sources, clips
it to the required date range, and bundles it into a pickled Python dictionary `<output_dir>/pipeline_data.pkl`.
2. Inference: `covid.tasks.mcmc` runs the data augmentation MCMC algorithm described in the concept note, producing
a (large!) HDF5 file `posterior.hd5` containing draws from the joint posterior distribution.
3. Sample thinning: `covid.tasks.thin_posterior` further thins the posterior draws contained in the HDF5 file into a (much
smaller) pickled Python dictionary `<output_dir>/thin_samples.pkl`
4. Next generation matrix: `covid.tasks.next_generation_matrix` computes the posterior next generation matrix for the
epidemic, from which measures of Local Authority District level and National-level reproduction number can be derived.
This posterior is saved in `<output_dir>/ngm.pkl`.
5. National Rt: `covid.tasks.overall_rt` evaluates the dominant eigenvalue of the next generation matrix samples using
power iteration and the Rayleigh quotient method (a toy sketch of this calculation is given after this list). The dominant
eigenvalue of the inter-LAD next generation matrix gives the national reproduction number estimate.
6. Prediction: `covid.tasks.predict` calculates the Bayesian predictive distribution of the epidemic given the observed
data and joint posterior distribution. This is used in two ways:
a. in-sample predictions are made for the latest 7 and 14 day time intervals in the observed data time window. These
are saved as `<output_dir>/insample7.pkl` and `<output_dir>/insample14.pkl` `xarray` data structures.
b. medium-term predictions are made by simulating forward 56 days from the day after the end of the observed data time window. This is saved as the `<output_dir>/medium_term.pkl` `xarray` data structure.
7. Summary output:
a. LAD-level reproduction number: `covid.tasks.summarize.rt` takes the column sums of the next generation matrix as the
LAD-level reproduction number. This is saved in `<output_dir>/rt_summary.csv`.
b. Incidence summary: `covid.tasks.summarize.infec_incidence` calculates mean and quantile information for the medium term prediction, `<output_dir>/infec_incidence_summary.csv`.
c. Prevalence summary: `covid.tasks.summarize.prevalence` calculates the predicted prevalence of COVID-19 infection
(model E+I compartments) at LAD level, `<output_dir>/prevalence_summary.csv`.
d. Population attributable risk fraction for infection: `covid.tasks.within_between` calculates the population
attributable fraction of within-LAD versus between-LAD infection risk, `<output_dir>/within_between_summary.csv`.
e. Case exceedance: `covid.tasks.case_exceedance` calculates the probability that observed cases in the last 7 and 14
days of the observed timeseries exceed the predictive distribution. This highlights regions that are behaving
atypically given the model, `<output_dir>/exceedance_summary.csv`.
8. In-sample predictive plots: `covid.tasks.insample_predictive_timeseries` plots graphs of the in-sample predictive
distribution for the last 7 and 14 days within the observed data time window, `<output_dir>/insample_plots7` and
`<output_dir>/insample_plots14`.
9. Geopackage summary: `covid.tasks.summary_geopackage` assembles summary information into a `geopackage` GIS file,
`<output_dir>/prediction.pkg`.
10. Long format summary: `covid.tasks.summary_longformat` assembles reproduction number, observed data, in-sample, and medium-term
predictive incidence and prevalence (per 100000 people) into a long-format XLSX file.
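
As mentioned in stage 5 above, the dominant eigenvalue is obtained by power iteration with a Rayleigh quotient. The following is a toy NumPy sketch of that calculation, not the `covid.tasks.overall_rt` implementation:

```python
# Toy dominant-eigenvalue estimate via power iteration + Rayleigh quotient.
import numpy as np

def dominant_eigenvalue(ngm, num_iter=1000, tol=1e-12):
    x = np.ones(ngm.shape[0]) / np.sqrt(ngm.shape[0])
    for _ in range(num_iter):
        y = ngm @ x
        y /= np.linalg.norm(y)
        if np.linalg.norm(y - x) < tol:
            x = y
            break
        x = y
    # Rayleigh quotient: eigenvalue estimate associated with eigenvector x
    return (x @ ngm @ x) / (x @ x)

ngm = np.array([[1.2, 0.1],
                [0.3, 0.8]])        # toy 2x2 inter-LAD next generation matrix
print(dominant_eigenvalue(ngm))     # ~1.26, the "national R" for this toy matrix
```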
## COVID-19 Lancaster University data statement
UTLA: Upper Tier Local Authority
LAD: Local Authority District
### Files
* `covid` Python package
* `example_config.yaml` example configuration file containing data paths and MCMC settings
* `data` a directory containing example data (see below)
* `pyproject.toml` a PEP 518-compliant file describing the `poetry` build system and dependencies.
## Example data files
* `data/example_cases.csv` a file containing example case data for 43 local authorities in England, collected and presented on PHE's [website](https://coronavirus.data.gov.uk)
* `data/c2019modagepop.csv` a file containing local authority population data in the UK, taken from the ONS prediction for December 2019. Local authorities [City of Westminster, City of London] and [Cornwall, Isles of Scilly] have been aggregated to meet commute data processing requirements.
* `data/mergedflows.csv` inter local authority mobility matrix taken from UK Census 2011 commuting data and aggregated up from Middle Super Output Area level (respecting aggregated LADs as above).
* `data/UK2019mod_pop.gpkg` a geopackage containing UK Local Authority Districts (2019) polygons together with population and areal metrics.
"""General argument parsing for all scripts"""
import argparse
def cli_args(args=None):
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", type=str, help="configuration file")
parser.add_argument(
"-r",
"--results",
type=str,
default=None,
help="override config file results dir",
)
args = parser.parse_args(args)
return args
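# Usage sketch (illustrative argument values): passing an explicit list is
# handy in tests; with no argument the parser falls back to sys.argv.
if __name__ == "__main__":
    example = cli_args(["--config", "example_config.yaml", "--results", "results"])
    print(example.config, example.results)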
"""Methods to read in COVID-19 data and output
well-known formats"""
from warnings import warn
import numpy as np
import pandas as pd
__all__ = [
"read_mobility",
"read_population",
"read_traffic_flow",
"read_phe_cases",
]
def read_mobility(path):
"""Reads in CSV with mobility matrix.
CSV format: <To>,<id>,<id>,....
<id>,<val>,<val>,...
...
:returns: a numpy matrix sorted by <id> on both rows and cols.
"""
mobility = pd.read_csv(path)
mobility = mobility[
mobility["From"].str.startswith("E")
& mobility["To"].str.startswith("E")
]
mobility = mobility.sort_values(["From", "To"])
mobility = mobility.groupby(["From", "To"]).agg({"Flow": sum}).reset_index()
mob_matrix = mobility.pivot(index="To", columns="From", values="Flow")
mob_matrix[mob_matrix.isna()] = 0.0
return mob_matrix
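# Usage sketch (illustrative; the path is hypothetical and the CSV must have
# long-format "From", "To" and "Flow" columns):
#   mobility = read_mobility("data/mergedflows.csv")
#   mobility.shape   # (n_LADs, n_LADs) dense To x From matrix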
def read_population(path):
"""Reads population CSV
:returns: a pandas Series indexed by LTLAs
"""
pop = pd.read_csv(path, index_col="lad19cd")
pop = pop[pop.index.str.startswith("E")]
pop = pop.sum(axis=1)
pop = pop.sort_index()
pop.name = "n"
return pop
def read_traffic_flow(
path: str, date_low: np.datetime64, date_high: np.datetime64
):
"""Read traffic flow data, returning a timeseries between dates.
:param path: path to a traffic flow spreadsheet (read with `pd.read_excel`) containing Date and Cars columns
:returns: a pandas DataFrame with a single "percent" column indexed by date
"""
commute_raw = pd.read_excel(
path, index_col="Date", skiprows=5, usecols=["Date", "Cars"]
)
commute_raw.index = pd.to_datetime(commute_raw.index, format="%Y-%m-%d")
commute_raw.sort_index(axis=0, inplace=True)
commute = pd.DataFrame(
index=np.arange(date_low, date_high, np.timedelta64(1, "D"))
)
commute = commute.merge(
commute_raw, left_index=True, right_index=True, how="left"
)
commute[commute.index < commute_raw.index[0]] = commute_raw.iloc[0, 0]
commute[commute.index > commute_raw.index[-1]] = commute_raw.iloc[-1, 0]
commute["Cars"] = commute["Cars"] / 100.0
commute.columns = ["percent"]
return commute
def _merge_ltla(series):
london = ["E09000001", "E09000033"]
corn_scilly = ["E06000052", "E06000053"]
series.loc[series.isin(london)] = ",".join(london)
series.loc[series.isin(corn_scilly)] = ",".join(corn_scilly)
return series
def read_phe_cases(
path, date_low, date_high, pillar="both", date_type="specimen", ltlas=None
):
"""Reads a PHE Anonymised Line Listing for dates in [low_date, high_date)
:param path: path to PHE Anonymised Line Listing Data
:param date_low: lower date bound
:param date_high: upper date bound
:returns: a Pandas data frame of LTLAs x dates
"""
date_type_map = {"specimen": "specimen_date", "report": "lab_report_date"}
pillar_map = {"both": None, "1": "Pillar 1", "2": "Pillar 2"}
line_listing = pd.read_csv(
path, usecols=[date_type_map[date_type], "LTLA_code", "pillar"]
)[[date_type_map[date_type], "LTLA_code", "pillar"]]
line_listing.columns = ["date", "lad19cd", "pillar"]
line_listing["lad19cd"] = _merge_ltla(line_listing["lad19cd"])
# Select dates
line_listing["date"] = pd.to_datetime(
line_listing["date"], format="%d/%m/%Y"
)
line_listing = line_listing[
(date_low <= line_listing["date"]) & (line_listing["date"] < date_high)
]
# Choose pillar
if pillar_map[pillar] is not None:
line_listing = line_listing.loc[
line_listing["pillar"] == pillar_map[pillar]
]
# Drop na rows
orig_len = line_listing.shape[0]
line_listing = line_listing.dropna(axis=0)
warn(
f"Removed {orig_len - line_listing.shape[0]} rows of {orig_len} \
due to missing values ({100. * (orig_len - line_listing.shape[0])/orig_len}%)"
)
# Aggregate by date/region
case_counts = line_listing.groupby(["date", "lad19cd"]).size()
case_counts.name = "count"
# Re-index
dates = pd.date_range(date_low, date_high, closed="left")
if ltlas is None:
ltlas = case_counts.index.levels[1]
index = pd.MultiIndex.from_product(
[dates, ltlas], names=["date", "lad19cd"]
)
case_counts = case_counts.reindex(index, fill_value=0)
return case_counts.reset_index().pivot(
index="lad19cd", columns="date", values="count"
)
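# Usage sketch (illustrative; the line-listing path and date window are hypothetical):
#   cases = read_phe_cases(
#       "data/anonymised_line_listing.csv",
#       date_low=np.datetime64("2020-10-01"),
#       date_high=np.datetime64("2020-11-01"),
#       pillar="both",
#       date_type="specimen",
#   )
#   cases.shape   # (number of LTLAs, number of days)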
def read_tier_restriction_data(
tier_restriction_csv, lad19cd_lookup, date_low, date_high
):
data = pd.read_csv(tier_restriction_csv)
data.loc[:, "date"] = pd.to_datetime(data["date"])
# Group merged ltlas
london = ["City of London", "Westminster"]
corn_scilly = ["Cornwall", "Isles of Scilly"]
data.loc[data["ltla"].isin(london), "ltla"] = ":".join(london)
data.loc[data["ltla"].isin(corn_scilly), "ltla"] = ":".join(corn_scilly)
# Fix up dodgy names
data.loc[
data["ltla"] == "Blackburn With Darwen", "ltla"
] = "Blackburn with Darwen"
# Merge
data = lad19cd_lookup.merge(
data, how="left", left_on="lad19nm", right_on="ltla"
)
# Re-index
data.index = pd.MultiIndex.from_frame(data[["date", "lad19cd"]])
data = data[["tier_2", "tier_3", "national_lockdown"]]
data = data[~data.index.duplicated()]
dates = pd.date_range(date_low, date_high - pd.Timedelta(1, "D"))
lad19cd = lad19cd_lookup["lad19cd"].sort_values().unique()
new_index = pd.MultiIndex.from_product([dates, lad19cd])
data = data.reindex(new_index, fill_value=0.0)
warn(f"Tier summary: {np.mean(data, axis=0)}")
# Pack into [T, M, V] array.
arr_data = data.to_xarray().to_array()
return np.transpose(arr_data, axes=[1, 2, 0])
def read_challen_tier_restriction(tier_restriction_csv, date_low, date_high):
tiers = pd.read_csv(tier_restriction_csv)
tiers["date"] = pd.to_datetime(tiers["date"], format="%Y-%m-%d")
tiers["code"] = _merge_ltla(tiers["code"])
# Separate out December tiers
tiers.loc[
(tiers["date"] > np.datetime64("2020-12-02"))
& (tiers["tier"] == "three"),
"tier",
] = "dec_three"
tiers.loc[
(tiers["date"] > np.datetime64("2020-12-02"))
& (tiers["tier"] == "two"),
"tier",
] = "dec_two"
tiers.loc[
(tiers["date"] > np.datetime64("2020-12-02"))
& (tiers["tier"] == "one"),
"tier",
] = "dec_one"
index = pd.MultiIndex.from_frame(tiers[["date", "code", "tier"]])
index = index.sort_values()
index = index[~index.duplicated()]
ser = pd.Series(1.0, index=index, name="value")
ser = ser[date_low : (date_high - np.timedelta64(1, "D"))]
xarr = ser.to_xarray()
xarr.data[np.isnan(xarr.data)] = 0.0
return xarr.loc[..., ["two", "three", "dec_two", "dec_three"]]
"""Covid data adaptors and support code"""
from covid.data.data import (
read_mobility,
read_population,
read_traffic_flow,
)
from covid.data.tiers import TierData
from covid.data.area_code import AreaCodeData
from covid.data.case_data import CasesData
__all__ = [
"TierData",
"AreaCodeData",
"CasesData",
"read_mobility",
"read_population",
"read_traffic_flow",
]
"""Retrieves LAD19 area codes"""
import pandas as pd
import requests
from http import HTTPStatus
import json
from covid.data.util import (
merge_lad_codes,
check_lad19cd_format,
invalidInput,
format_output_filename,
)
class AreaCodeData:
def get(config):
"""
Retrieve a response containing a list of all the LAD codes
"""
settings = config["AreaCodeData"]
if settings["input"] == "url":
df = AreaCodeData.getURL(settings["address"], config)
df.columns = [x.lower() for x in df.columns]
elif settings["input"] == "json":
print(
"Reading Area Code data from local JSON file at",
settings["address"],
)
df = AreaCodeData.getJSON(settings["address"])
elif settings["input"] == "csv":
print(
"Reading Area Code data from local CSV file at",
settings["address"],
)
df = AreaCodeData.getCSV(settings["address"])
elif settings["input"] == "processed":
print(
"Reading Area Code data from preprocessed CSV at",
settings["address"],
)
df = pd.read_csv(settings["address"])
else:
invalidInput(settings["input"])
return df
def getConfig(config):
# Create a dataframe from the LADs specified in config
df = pd.DataFrame(config["lad19cds"], columns=["lad19cd"])
df["name"] = "n/a" # placeholder names for now.
return df
def getURL(url, config):
settings = config["AreaCodeData"]
fields = ["LAD19CD", "LAD19NM"]
api_params = {"outFields": str.join(",", fields), "f": "json"}
response = requests.get(url, params=api_params, timeout=5)
if response.status_code >= HTTPStatus.BAD_REQUEST:
raise RuntimeError(f"Request failed: {response.text}")
if settings["format"] == "ons":
print("Retrieving Area Code data from the ONS")
data = response.json()
df = AreaCodeData.getJSON(json.dumps(data))
return df
def cmlad11_to_lad19(cmlad11):
"""
Converts CM (census merged) 2011 codes to LAD 2019 codes
"""
# The below URL converts from CMLAD2011CD to LAD11CD
# url = "http://infuse.ukdataservice.ac.uk/showcase/mergedgeographies/Merging-Local-Authorities-Lookup.xlsx"
# response = requests.get(url, timeout=5)
# if response.status_code >= HTTPStatus.BAD_REQUEST:
# raise RuntimeError(f'Request failed: {response.text}')
#
# data = io.BytesIO(response.content)
#
# cm11_to_lad11_map = pd.read_excel(data)
# cached
cm11_to_lad11_map = pd.read_excel(
"data/Merging-Local-Authorities-Lookup.xlsx"
)
cm11_to_lad11_dict = dict(
zip(
cm11_to_lad11_map["Merging Local Authority Code"],
cm11_to_lad11_map["Standard Local Authority Code"],
)
)
lad19cds = cmlad11.apply(
lambda x: cm11_to_lad11_dict[x]
if x in cm11_to_lad11_dict.keys()
else x
)
mapping = {
"E06000028": "E06000058", # "Bournemouth" : "Bournemouth, Christchurch and Poole",
"E06000029": "E06000058", # "Poole" : "Bournemouth, Christchurch and Poole",
"E07000048": "E06000058", # "Christchurch" : "Bournemouth, Christchurch and Poole",
"E07000050": "E06000059", # "North Dorset" : "Dorset",
"E07000049": "E06000059", # "East Dorset" : "Dorset",
"E07000052": "E06000059", # "West Dorset" : "Dorset",
"E07000051": "E06000059", # "Purbeck" : "Dorset",
"E07000053": "E06000059", # "Weymouth and Portland" : "Dorset",
"E07000191": "E07000246", # "West Somerset" : "Somerset West and Taunton",
"E07000190": "E07000246", # "Taunton Deane" : "Somerset West and Taunton",
"E07000205": "E07000244", # "Suffolk Coastal" : "East Suffolk",
"E07000206": "E07000244", # "Waveney" : "East Suffolk",
"E07000204": "E07000245", # "St Edmundsbury" : "West Suffolk",
"E07000201": "E07000245", # "Forest Heath" : "West Suffolk",
"E07000097": "E07000242", # East Hertforshire
"E07000101": "E07000243", # Stevenage
"E07000100": "E07000240", # St Albans
"E08000020": "E08000037", # Gateshead
"E06000048": "E06000057", # Northumberland
"E07000104": "E07000241", # Welwyn Hatfield
}
lad19cds = lad19cds.apply(
lambda x: mapping[x] if x in mapping.keys() else x
)
lad19cds = merge_lad_codes(lad19cds)
return lad19cds
def getJSON(file):
data = pd.read_json(file, orient="index").T["features"][0]
data = [record["attributes"] for record in data]
df = pd.DataFrame.from_records(data)
return df
def getCSV(file):
return pd.read_csv(file)
def check(df, config):
"""
Check that data format seems correct
"""
check_lad19cd_format(df)
return True
def adapt(df, config):
"""
Adapt the area codes to the desired dataframe format
"""
settings = config["AreaCodeData"]
regions = settings["regions"]
if settings["input"] == "processed":
return df
if settings["format"].lower() == "ons":
df = AreaCodeData.adapt_ons(df, regions)
# if we have a predefined list of LADs, filter them down
if "lad19cds" in config:
df = df[[x in config["lad19cds"] for x in df.lad19cd.values]]
return df
def adapt_ons(df, regions):
colnames = ["lad19cd", "name"]
df.columns = colnames
filters = df["lad19cd"].str.contains(str.join("|", regions))
df = df[filters]
df["lad19cd"] = merge_lad_codes(df["lad19cd"])
df = df.drop_duplicates(subset="lad19cd")
return df
def process(config):
df = AreaCodeData.get(config)
df = AreaCodeData.adapt(df, config)
if AreaCodeData.check(df, config):
config["lad19cds"] = df["lad19cd"].tolist()
return df
"""Tests area codes"""
import pytest
def test_url():
from covid.data import AreaCodeData
config = {
"AreaCodeData": {
"input": "json",
"address": "https://services1.arcgis.com/ESMARspQHYMw9BZ9/arcgis/rest/services/LAD_APR_2019_UK_NC/FeatureServer/0/query?where=1%3D1&outFields=LAD19CD,FID&returnGeometry=false&returnDistinctValues=true&orderByFields=LAD19CD&outSR=4326&f=json",
"format": "ons",
"output": "processed_data/processed_lad19cd.csv",
"regions": ["E"],
},
}
df = AreaCodeData.process(config)
print(df)
"""Loads COVID-19 case data"""
from warnings import warn
import requests
import json
import numpy as np
import pandas as pd
from covid.data.util import (
invalidInput,
get_date_low_high,
check_date_bounds,
check_date_format,
check_lad19cd_format,
merge_lad_codes,
)
from covid.data import AreaCodeData
class CasesData:
def get(config):
"""