Commit 1d912c7a authored by Chris Jewell

Incorporated FNC framework for downloading and using Tier data

parent 2425aa7d
"""Covid data adaptors and support code"""
from covid.data.data import (
read_phe_cases,
read_mobility,
read_population,
read_traffic_flow,
)
from covid.data.tiers import TierData
from covid.data.area_code import AreaCodeData
__all__ = [
"TierData",
"AreaCodeData",
"read_phe_cases",
"read_mobility",
"read_population",
"read_traffic_flow",
]
"""Retrieves LAD19 area codes"""
import pandas as pd
import requests
from http import HTTPStatus
import json
from covid.data.util import (
merge_lad_codes,
check_lad19cd_format,
invalidInput,
format_output_filename,
)


class AreaCodeData:
    def get(config):
        """
        Retrieve a response containing a list of all the LAD codes
        """
        settings = config["AreaCodeData"]
        if settings["input"] == "url":
            df = AreaCodeData.getURL(settings["address"], config)
            df.columns = [x.lower() for x in df.columns]
        elif settings["input"] == "json":
            print(
                "Reading Area Code data from local JSON file at",
                settings["address"],
            )
            df = AreaCodeData.getJSON(settings["address"])
        elif settings["input"] == "csv":
            print(
                "Reading Area Code data from local CSV file at",
                settings["address"],
            )
            df = AreaCodeData.getCSV(settings["address"])
        elif settings["input"] == "processed":
            print(
                "Reading Area Code data from preprocessed CSV at",
                settings["address"],
            )
            df = pd.read_csv(settings["address"])
        else:
            invalidInput(settings["input"])
        return df
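
    # Illustrative config sketch (keys as read above; values are
    # hypothetical): "input" selects the source, "address" points at it.
    #
    #   config["AreaCodeData"] = {
    #       "input": "csv",              # one of: url | json | csv | processed
    #       "address": "data/lads.csv",  # URL or local file path
    #       "format": "ons",
    #       "output": "processed_data/processed_lad19cd.csv",
    #       "regions": ["E"],
    #   }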

    def getConfig(config):
        # Create a dataframe from the LADs specified in config
        df = pd.DataFrame(config["lad19cds"], columns=["lad19cd"])
        df["name"] = "n/a"  # placeholder names for now
        return df

    def getURL(url, config):
        settings = config["AreaCodeData"]
        fields = ["LAD19CD", "LAD19NM"]
        api_params = {"outFields": ",".join(fields), "f": "json"}
        response = requests.get(url, params=api_params, timeout=5)
        if response.status_code >= HTTPStatus.BAD_REQUEST:
            raise RuntimeError(f"Request failed: {response.text}")
        if settings["format"] == "ons":
            print("Retrieving Area Code data from the ONS")
            data = response.json()
            if config["GenerateOutput"]["storeInputs"]:
                fn = format_output_filename(
                    config["GenerateOutput"]["scrapedDataDir"]
                    + "AreaCodeData_ONS.json",
                    config,
                )
                with open(fn, "w") as f:
                    json.dump(data, f)
            df = AreaCodeData.getJSON(json.dumps(data))
        else:
            # Unrecognised format: report via the shared helper rather than
            # falling through to `return df` with `df` unbound
            invalidInput(settings["format"])
        return df

    def cmlad11_to_lad19(cmlad11):
        """
        Converts CM (census merged) 2011 codes to LAD 2019 codes
        """
        # The below URL converts from CMLAD2011CD to LAD11CD
        # url = "http://infuse.ukdataservice.ac.uk/showcase/mergedgeographies/Merging-Local-Authorities-Lookup.xlsx"
        # response = requests.get(url, timeout=5)
        # if response.status_code >= HTTPStatus.BAD_REQUEST:
        #     raise RuntimeError(f'Request failed: {response.text}')
        #
        # data = io.BytesIO(response.content)
        #
        # cm11_to_lad11_map = pd.read_excel(data)

        # Cached copy of the lookup spreadsheet
        cm11_to_lad11_map = pd.read_excel(
            "data/Merging-Local-Authorities-Lookup.xlsx"
        )
        cm11_to_lad11_dict = dict(
            zip(
                cm11_to_lad11_map["Merging Local Authority Code"],
                cm11_to_lad11_map["Standard Local Authority Code"],
            )
        )
        lad19cds = cmlad11.apply(
            lambda x: cm11_to_lad11_dict[x] if x in cm11_to_lad11_dict else x
        )

        # Map 2011 LAD codes onto their 2019 successors (local government
        # reorganisations between 2011 and 2019)
        mapping = {
            "E06000028": "E06000058",  # Bournemouth -> Bournemouth, Christchurch and Poole
            "E06000029": "E06000058",  # Poole -> Bournemouth, Christchurch and Poole
            "E07000048": "E06000058",  # Christchurch -> Bournemouth, Christchurch and Poole
            "E07000050": "E06000059",  # North Dorset -> Dorset
            "E07000049": "E06000059",  # East Dorset -> Dorset
            "E07000052": "E06000059",  # West Dorset -> Dorset
            "E07000051": "E06000059",  # Purbeck -> Dorset
            "E07000053": "E06000059",  # Weymouth and Portland -> Dorset
            "E07000191": "E07000246",  # West Somerset -> Somerset West and Taunton
            "E07000190": "E07000246",  # Taunton Deane -> Somerset West and Taunton
            "E07000205": "E07000244",  # Suffolk Coastal -> East Suffolk
            "E07000206": "E07000244",  # Waveney -> East Suffolk
            "E07000204": "E07000245",  # St Edmundsbury -> West Suffolk
            "E07000201": "E07000245",  # Forest Heath -> West Suffolk
            "E07000097": "E07000242",  # East Hertfordshire
            "E07000101": "E07000243",  # Stevenage
            "E07000100": "E07000240",  # St Albans
            "E08000020": "E08000037",  # Gateshead
            "E06000048": "E06000057",  # Northumberland
            "E07000104": "E07000241",  # Welwyn Hatfield
        }
        lad19cds = lad19cds.apply(lambda x: mapping[x] if x in mapping else x)
        lad19cds = merge_lad_codes(lad19cds)
        return lad19cds
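
    # Sketch of the mapping (values grounded in the table above, assuming the
    # cached lookup spreadsheet is present): superseded 2011 codes are
    # rewritten to their 2019 successors, current codes pass through.
    #
    #   >>> AreaCodeData.cmlad11_to_lad19(pd.Series(["E06000028", "E06000057"]))
    #   0    E06000058    (Bournemouth -> Bournemouth, Christchurch and Poole)
    #   1    E06000057    (already current, unchanged)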

    def getJSON(file):
        data = pd.read_json(file, orient="index").T["features"][0]
        data = [record["attributes"] for record in data]
        df = pd.DataFrame.from_records(data)
        return df

    def getCSV(file):
        return pd.read_csv(file)

    def check(df, config):
        """
        Check that the data format seems correct
        """
        check_lad19cd_format(df)
        return True

    def adapt(df, config):
        """
        Adapt the area codes to the desired dataframe format
        """
        output_settings = config["GenerateOutput"]
        settings = config["AreaCodeData"]
        output = settings["output"]
        regions = settings["regions"]
        if settings["input"] == "processed":
            return df
        if settings["format"].lower() == "ons":
            df = AreaCodeData.adapt_ons(df, regions, output, config)
        # If we have a predefined list of LADs, filter them down
        if "lad19cds" in config:
            df = df[[x in config["lad19cds"] for x in df.lad19cd.values]]
        if output_settings["storeProcessedInputs"] and output != "None":
            output = format_output_filename(output, config)
            df.to_csv(output, index=False)
        return df

    def adapt_ons(df, regions, output, config):
        colnames = ["lad19cd", "name"]
        df.columns = colnames
        filters = df["lad19cd"].str.contains("|".join(regions))
        df = df[filters]
        df["lad19cd"] = merge_lad_codes(df["lad19cd"])
        df = df.drop_duplicates(subset="lad19cd")
        return df

    def process(config):
        df = AreaCodeData.get(config)
        df = AreaCodeData.adapt(df, config)
        if AreaCodeData.check(df, config):
            config["lad19cds"] = df["lad19cd"].tolist()
        return df
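
# Example usage, mirroring the unit test below: process() fetches the codes,
# adapts them to lad19cd/name columns, checks the format, and caches the
# retained codes back into the config.
#
#   df = AreaCodeData.process(config)
#   config["lad19cds"]  # side effect: list of lad19cd strings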
"""Tests area codes"""
import pytest
def test_url():
from covid.data import AreaCodeData
config = {
"AreaCodeData": {
"input": "json",
"address": "https://services1.arcgis.com/ESMARspQHYMw9BZ9/arcgis/rest/services/LAD_APR_2019_UK_NC/FeatureServer/0/query?where=1%3D1&outFields=LAD19CD,FID&returnGeometry=false&returnDistinctValues=true&orderByFields=LAD19CD&outSR=4326&f=json",
"format": "ons",
"output": "processed_data/processed_lad19cd.csv",
"regions": ["E"],
},
"GenerateOutput": {
"storeInputs": True,
"scrapedDataDir": "scraped_data",
"storeProcessedInputs": True,
},
"Global": {"prependID": False, "prependDate": False},
}
df = AreaCodeData.process(config)
print(df)
"""Methods to read in COVID-19 data and output
well-known formats"""
from warnings import warn
import numpy as np
import pandas as pd
__all__ = [
"read_mobility",
"read_population",
"read_traffic_flow",
"read_phe_cases",
]


def read_mobility(path):
    """Reads in a long-format CSV of mobility flows.

    CSV format: From,To,Flow columns, one row per origin-destination pair.

    :returns: a pandas DataFrame of flows, pivoted to a matrix indexed and
        sorted by area code on both rows and columns.
    """
    mobility = pd.read_csv(path)
    mobility = mobility[
        mobility["From"].str.startswith("E")
        & mobility["To"].str.startswith("E")
    ]
    mobility = mobility.sort_values(["From", "To"])
    mobility = (
        mobility.groupby(["From", "To"]).agg({"Flow": sum}).reset_index()
    )
    mob_matrix = mobility.pivot(index="To", columns="From", values="Flow")
    mob_matrix[mob_matrix.isna()] = 0.0
    return mob_matrix
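
# Sketch of the expected CSV layout (codes and flows are hypothetical):
#
#   From,To,Flow
#   E06000001,E06000002,1024
#   E06000002,E06000001,876
#
#   mob = read_mobility("data/mobility.csv")  # square matrix, NaN -> 0.0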


def read_population(path):
    """Reads population CSV

    :returns: a pandas Series indexed by LTLAs
    """
    pop = pd.read_csv(path, index_col="lad19cd")
    pop = pop[pop.index.str.startswith("E")]
    pop = pop.sum(axis=1)
    pop = pop.sort_index()
    pop.name = "n"
    return pop
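
# Sketch of the expected population CSV (column names other than lad19cd are
# hypothetical; all non-index columns are summed into one count per LTLA):
#
#   lad19cd,age_0_4,age_5_9,...
#   E06000001,5892,6104,...
#
#   pop = read_population("data/population.csv")  # Series "n", LTLA-indexed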


def read_traffic_flow(
    path: str, date_low: np.datetime64, date_high: np.datetime64
):
    """Read traffic flow data, returning a timeseries between dates.

    :param path: path to a traffic flow spreadsheet with `Date` and `Cars`
        columns (read with `pd.read_excel`)
    :returns: a Pandas timeseries
    """
    commute_raw = pd.read_excel(
        path, index_col="Date", skiprows=5, usecols=["Date", "Cars"]
    )
    commute_raw.index = pd.to_datetime(commute_raw.index, format="%Y-%m-%d")
    commute_raw.sort_index(axis=0, inplace=True)
    commute = pd.DataFrame(
        index=np.arange(date_low, date_high, np.timedelta64(1, "D"))
    )
    commute = commute.merge(
        commute_raw, left_index=True, right_index=True, how="left"
    )
    # Extend the first and last observations to cover the full date range
    commute[commute.index < commute_raw.index[0]] = commute_raw.iloc[0, 0]
    commute[commute.index > commute_raw.index[-1]] = commute_raw.iloc[-1, 0]
    commute["Cars"] = commute["Cars"] / 100.0
    commute.columns = ["percent"]
    return commute
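
# Usage sketch (hypothetical path; dates as numpy datetime64):
#
#   flow = read_traffic_flow(
#       "data/traffic_flow.xlsx",
#       date_low=np.datetime64("2020-10-12"),
#       date_high=np.datetime64("2021-01-04"),
#   )
#   # -> daily "percent" series on [date_low, date_high), edge-padded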


def _merge_ltla(series):
    """Replace the City of London/Westminster and Cornwall/Isles of Scilly
    codes with a single merged code for each pair."""
    london = ["E09000001", "E09000033"]
    corn_scilly = ["E06000052", "E06000053"]
    series.loc[series.isin(london)] = ",".join(london)
    series.loc[series.isin(corn_scilly)] = ",".join(corn_scilly)
    return series
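
# Sketch: either member of a merged pair is rewritten to the composite code.
#
#   >>> _merge_ltla(pd.Series(["E09000001", "E06000053"]))
#   0    E09000001,E09000033
#   1    E06000052,E06000053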


def read_phe_cases(
    path, date_low, date_high, pillar="both", date_type="specimen", ltlas=None
):
    """Reads a PHE Anonymised Line Listing for dates in [date_low, date_high)

    :param path: path to PHE Anonymised Line Listing Data
    :param date_low: lower date bound (inclusive)
    :param date_high: upper date bound (exclusive)
    :returns: a Pandas data frame of LTLAs x dates
    """
    date_type_map = {"specimen": "specimen_date", "report": "lab_report_date"}
    pillar_map = {"both": None, "1": "Pillar 1", "2": "Pillar 2"}

    line_listing = pd.read_csv(
        path, usecols=[date_type_map[date_type], "LTLA_code", "pillar"]
    )[[date_type_map[date_type], "LTLA_code", "pillar"]]
    line_listing.columns = ["date", "lad19cd", "pillar"]
    line_listing["lad19cd"] = _merge_ltla(line_listing["lad19cd"])

    # Select dates
    line_listing["date"] = pd.to_datetime(
        line_listing["date"], format="%d/%m/%Y"
    )
    line_listing = line_listing[
        (date_low <= line_listing["date"]) & (line_listing["date"] < date_high)
    ]

    # Choose pillar
    if pillar_map[pillar] is not None:
        line_listing = line_listing.loc[
            line_listing["pillar"] == pillar_map[pillar]
        ]

    # Drop NA rows
    orig_len = line_listing.shape[0]
    line_listing = line_listing.dropna(axis=0)
    num_removed = orig_len - line_listing.shape[0]
    warn(
        f"Removed {num_removed} rows of {orig_len} due to missing values "
        f"({100.0 * num_removed / orig_len:.1f}%)"
    )

    # Aggregate by date/region
    case_counts = line_listing.groupby(["date", "lad19cd"]).size()
    case_counts.name = "count"

    # Re-index
    dates = pd.date_range(date_low, date_high, closed="left")
    if ltlas is None:
        ltlas = case_counts.index.levels[1]
    index = pd.MultiIndex.from_product(
        [dates, ltlas], names=["date", "lad19cd"]
    )
    case_counts = case_counts.reindex(index, fill_value=0)

    return case_counts.reset_index().pivot(
        index="lad19cd", columns="date", values="count"
    )
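
# Usage sketch (hypothetical path; dates as numpy datetime64):
#
#   cases = read_phe_cases(
#       "data/anonymised_line_listing.csv",
#       date_low=np.datetime64("2020-10-12"),
#       date_high=np.datetime64("2021-01-04"),
#       pillar="2",
#       date_type="specimen",
#   )
#   # -> DataFrame of case counts, LTLAs as rows, dates as columns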


def read_tier_restriction_data(
    tier_restriction_csv, lad19cd_lookup, date_low, date_high
):
    """Reads tier restriction data, returning a [T, M, V] array of
    tier indicators by date and LTLA."""
    data = pd.read_csv(tier_restriction_csv)
    data.loc[:, "date"] = pd.to_datetime(data["date"])

    # Group merged LTLAs
    london = ["City of London", "Westminster"]
    corn_scilly = ["Cornwall", "Isles of Scilly"]
    data.loc[data["ltla"].isin(london), "ltla"] = ":".join(london)
    data.loc[data["ltla"].isin(corn_scilly), "ltla"] = ":".join(corn_scilly)

    # Fix up dodgy names
    data.loc[
        data["ltla"] == "Blackburn With Darwen", "ltla"
    ] = "Blackburn with Darwen"

    # Merge
    data = lad19cd_lookup.merge(
        data, how="left", left_on="lad19nm", right_on="ltla"
    )

    # Re-index
    data.index = pd.MultiIndex.from_frame(data[["date", "lad19cd"]])
    data = data[["tier_2", "tier_3", "national_lockdown"]]
    data = data[~data.index.duplicated()]
    dates = pd.date_range(date_low, date_high - pd.Timedelta(1, "D"))
    lad19cd = lad19cd_lookup["lad19cd"].sort_values().unique()
    new_index = pd.MultiIndex.from_product([dates, lad19cd])
    data = data.reindex(new_index, fill_value=0.0)
    warn(f"Tier summary: {np.mean(data, axis=0)}")

    # Pack into a [T, M, V] array
    arr_data = data.to_xarray().to_array()
    return np.transpose(arr_data, axes=[1, 2, 0])


def read_challen_tier_restriction(tier_restriction_csv, date_low, date_high):
    """Reads the Challen tier restriction CSV into an xarray of tier
    indicators indexed by date, area code, and tier."""
    tiers = pd.read_csv(tier_restriction_csv)
    tiers["date"] = pd.to_datetime(tiers["date"], format="%Y-%m-%d")
    tiers["code"] = _merge_ltla(tiers["code"])

    # Separate out December tiers
    date_mask = tiers["date"] > np.datetime64("2020-12-02")
    tiers.loc[date_mask & (tiers["tier"] == "three"), "tier"] = "dec_three"
    tiers.loc[date_mask & (tiers["tier"] == "two"), "tier"] = "dec_two"
    tiers.loc[date_mask & (tiers["tier"] == "one"), "tier"] = "dec_one"

    index = pd.MultiIndex.from_frame(tiers[["date", "code", "tier"]])
    index = index.sort_values()
    index = index[~index.duplicated()]
    ser = pd.Series(1.0, index=index, name="value")
    ser = ser[date_low : (date_high - np.timedelta64(1, "D"))]
    xarr = ser.to_xarray()
    xarr.data[np.isnan(xarr.data)] = 0.0
    return xarr.loc[..., ["two", "three", "dec_two", "dec_three"]]
"""Tests Tier Data"""
import numpy as np
from covid.data import TierData
def test_url_tier_data():
config = {
"AreaCodeData": {
"input": "json",
"address": "https://services1.arcgis.com/ESMARspQHYMw9BZ9/arcgis/rest/services/LAD_APR_2019_UK_NC/FeatureServer/0/query?where=1%3D1&outFields=LAD19CD,LAD19NM&returnGeometry=false&returnDistinctValues=true&orderByFields=LAD19CD&outSR=4326&f=json",
"format": "ons",
"output": "processed_data/processed_lad19cd.csv",
"regions": ["E"],
},
"TierData": {
"input": "api",
"address": None,
"format": "api",
},
"GenerateOutput": {
"storeInputs": True,
"scrapedDataDir": "scraped_data",
"storeProcessedInputs": True,
},
"Global": {
"prependID": False,
"prependDate": False,
"inference_period": ["2020-10-12", "2021-01-04"],
},
}
xarr = TierData.process(config)
print("xarr", xarr)
np.testing.assert_array_equal(xarr.shape, [315, 84, 6])
"""Import COVID Tier data"""
import numpy as np
import pandas as pd
from covid.data.area_code import AreaCodeData
from covid.data.util import get_date_low_high, invalidInput, merge_lad_codes
class TierData:
def get(config):
"""
Retrieve an xarray DataArray of the tier data
"""
settings = config["TierData"]
if settings["input"] == "csv":
df = TierData.getCSV(settings["address"])
elif settings["input"] == "api":
df = TierData.getCSV(
"https://api.coronavirus.data.gov.uk/v2/data?areaType=ltla&metric=alertLevel&format=csv"
)
else:
invalidInput(settings["input"])
return df
def getCSV(file):
"""
Read TierData CSV from file
"""
return pd.read_csv(file)
def check(xarray, config):
"""
Check the data format
"""
return True

    def adapt(df, config):
        """
        Adapt the dataframe to the desired format.
        """
        settings = config["TierData"]

        # TODO this key might not be stored in the config file
        # if it's not, we need to grab it using AreaCodeData
        if "lad19cds" not in config:
            areacodes = AreaCodeData.process(config)["lad19cd"]
        else:
            areacodes = config["lad19cds"]

        # Below is assuming inference_period dates
        date_low, date_high = get_date_low_high(config)

        if settings["format"].lower() == "tidy":
            xarray = TierData.adapt_xarray(
                df, date_low, date_high, areacodes, settings
            )
        elif settings["format"].lower() == "api":
            xarray = TierData.adapt_api_xarray(
                df, date_low, date_high, areacodes, settings
            )
        else:
            # Guard against unrecognised formats, which would otherwise
            # leave `xarray` unbound below
            invalidInput(settings["format"])
        return xarray

    def adapt_api_xarray(tiers, date_low, date_high, lads, settings):
        """
        Adapt web API data to the desired format
        """
        tiers["date"] = pd.to_datetime(tiers["date"], format="%Y-%m-%d")
        tiers["lad19cd"] = merge_lad_codes(tiers["areaCode"])
        tiers["alert_level"] = tiers["alertLevel"]
        tiers = tiers[["date", "lad19cd", "alert_level"]]
        if len(lads) > 0:
            tiers = tiers[tiers["lad19cd"].isin(lads)]

        date_range = pd.date_range(
            date_low, date_high - np.timedelta64(1, "D")
        )

        def interpolate(df):
            df.index = pd.Index(pd.to_datetime(df["date"]), name="date")
            df = df.drop(columns="date").sort_index()
            df = df.reindex(date_range)
            df["alert_level"] = (
                df["alert_level"].ffill().backfill().astype("int")
            )
            return df[["alert_level"]]

        tiers = tiers.groupby(["lad19cd"]).apply(interpolate)
        tiers = tiers.reset_index()
        tiers.columns = ["lad19cd", "date", "alert_level"]

        index = pd.MultiIndex.from_frame(tiers)
        index = index.sort_values()
        index = index[~index.duplicated()]
        ser = pd.Series(1, index=index, name="value")
        ser = ser.loc[
            pd.IndexSlice[:, date_low : (date_high - np.timedelta64(1, "D")), :]
        ]
        xarr = ser.to_xarray()
        xarr.data[np.isnan(xarr.data)] = 0.0
        # Return a [T, M, V] structure
        return np.transpose(xarr, axes=[1, 0, 2])
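
    # Sketch of the interpolate() step above: each LAD's alert levels are
    # reindexed onto the full date range, then padded backward and forward
    # (dates and levels here are hypothetical):
    #
    #   date        alert_level         date        alert_level
    #   2020-10-14  1            --->   2020-10-12  1  (backfilled)
    #   2020-10-20  2                   2020-10-20  2  (ffilled onwards)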

    def adapt_xarray(tiers, date_low, date_high, lads, settings):
        """
        Adapt to a filtered xarray object
        """
        tiers["date"] = pd.to_datetime(tiers["date"], format="%Y-%m-%d")
        tiers["code"] = merge_lad_codes(tiers["code"])

        # Separate out December tiers
        date_mask = tiers["date"] > np.datetime64("2020-12-02")
        tiers.loc[
            date_mask & (tiers["tier"] == "three"),
            "tier",
        ] = "dec_three"
        tiers.loc[
            date_mask & (tiers["tier"] == "two"),
            "tier",
        ] = "dec_two"
        tiers.loc[
            date_mask & (tiers["tier"] == "one"),
            "tier",