Commit 2fe0a475 authored by Chris Jewell

Merge branch 'master' into mod-hmc-gibbs

parents 13b9c97f d76bb321
"""Loads COVID-19 case data"""
import time
from warnings import warn
import requests
import json
@@ -45,17 +46,30 @@ class CasesData:
"""
Placeholder, in case we wish to interface with an API.
"""
response = requests.get(url)
content = json.loads(response.content)
df = pd.read_json(json.dumps(content["body"]))
return df
max_tries = 5
secs = 5
for i in range(max_tries):
    try:
        print("Attempting to download...", end="", flush=True)
        response = requests.get(url)
        content = json.loads(response.content)
        df = pd.DataFrame.from_dict(content["body"])
        print("Success", flush=True)
        return df
    except (requests.ConnectionError, requests.RequestException) as e:
        print("Failed", flush=True)
        print(e)
        time.sleep(secs * 2 ** i)
raise ConnectionError(
    f"Data download timed out after {max_tries} attempts"
)
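The replacement download path retries up to five times with exponential backoff (5, 10, 20, 40, 80 seconds) before giving up. A minimal standalone sketch of the same pattern, with the helper name and defaults chosen purely for illustration:

import time
import requests

def fetch_json_with_backoff(url, max_tries=5, base_delay=5):
    """Illustrative helper: GET a JSON document, backing off exponentially on failure."""
    for attempt in range(max_tries):
        try:
            response = requests.get(url)
            response.raise_for_status()  # treat HTTP errors as failures too
            return response.json()
        except requests.RequestException as err:
            print(f"Attempt {attempt + 1} failed: {err}")
            time.sleep(base_delay * 2 ** attempt)
    raise ConnectionError(f"Download failed after {max_tries} attempts")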
def getCSV(file):
"""
Format as per linelisting
"""
columns = ["pillar", "LTLA_code", "specimen_date", "lab_report_date"]
dfs = pd.read_csv(file, chunksize=50000, iterator=True, usecols=columns)
dfs = pd.read_csv(file, chunksize=50000, iterator=True)
df = pd.concat(dfs)
return df
@@ -117,7 +131,7 @@ class CasesData:
measure,
areacodes,
)
elif (settings["input"] == "url") and (settings["format"] == "json"):
elif settings["format"].lower() == "gov":
df = CasesData.adapt_gov_api(
df, date_low, date_high, pillars, measure, areacodes
)
@@ -149,6 +163,8 @@ class CasesData:
"""
Adapt the line listing data to the desired dataframe format.
"""
df = df[["pillar", "LTLA_code", "specimen_date", "lab_report_date"]]
# Clean missing values
df.dropna(inplace=True)
df = df.rename(columns={"LTLA_code": "lad19cd"})
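The adapter pairs an explicit selection of the four line-listing columns with the cleaning and renaming steps. A toy frame (values invented) run through the same three steps:

import pandas as pd

toy = pd.DataFrame(
    {
        "pillar": ["Pillar 1", "Pillar 2"],
        "LTLA_code": ["E06000001", None],
        "specimen_date": ["2020-10-01", "2020-10-02"],
        "lab_report_date": ["2020-10-03", "2020-10-04"],
        "extra_column": [1, 2],  # discarded by the column selection
    }
)
toy = toy[["pillar", "LTLA_code", "specimen_date", "lab_report_date"]]
toy = toy.dropna()  # the row with a missing LTLA_code is removed
toy = toy.rename(columns={"LTLA_code": "lad19cd"})
print(toy)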
...
@@ -163,7 +163,6 @@ def CovidUK(covariates, initial_state, initial_step, num_steps):
C = tf.convert_to_tensor(covariates["C"], dtype=DTYPE)
C = tf.linalg.set_diag(C, tf.zeros(C.shape[0], dtype=DTYPE))
Cstar = C + tf.transpose(C)
Cstar = tf.linalg.set_diag(Cstar, -tf.reduce_sum(C, axis=-2))
@@ -263,7 +262,6 @@ def next_generation_matrix_fn(covar_data, param):
def fn(t, state):
C = tf.convert_to_tensor(covar_data["C"], dtype=DTYPE)
C = tf.linalg.set_diag(C, -tf.reduce_sum(C, axis=-2))
C = tf.linalg.set_diag(C, tf.zeros(C.shape[0], dtype=DTYPE))
Cstar = C + tf.transpose(C)
Cstar = tf.linalg.set_diag(Cstar, -tf.reduce_sum(C, axis=-2))
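Both hunks leave the contact matrix C with a zero diagonal and place the negative column sums on the diagonal of the symmetrised Cstar instead. A toy example (matrix values made up) showing the effect of those three lines:

import tensorflow as tf

DTYPE = tf.float64
C = tf.constant([[9.0, 2.0], [1.0, 9.0]], dtype=DTYPE)        # made-up commuting matrix
C = tf.linalg.set_diag(C, tf.zeros(C.shape[0], dtype=DTYPE))   # discard the diagonal
Cstar = C + tf.transpose(C)                                    # symmetrise off-diagonals
Cstar = tf.linalg.set_diag(Cstar, -tf.reduce_sum(C, axis=-2))  # diagonal = -column sums of C
print(Cstar.numpy())  # [[-1.  3.] [ 3. -2.]]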
...
@@ -64,9 +64,7 @@ if __name__ == "__main__":
data_args.add_argument(
"--pillar", type=str, help="Pillar", choices=["both", "1", "2"]
)
data_args.add_argument(
"--aws", action='store_true', help="Push to AWS"
)
data_args.add_argument("--aws", action="store_true", help="Push to AWS")
cli_options = argparser.parse_args()
global_config = _import_global_config(cli_options.config)
...
@@ -6,6 +6,7 @@ import yaml
from datetime import datetime
from uuid import uuid1
import json
import s3fs
import netCDF4 as nc
import s3fs
import pandas as pd
@@ -230,6 +231,7 @@ def run_pipeline(global_config, results_directory, cli_options):
wd("summary_longformat.xlsx"),
)(summary_longformat)
# Copy results to AWS
@rf.active_if(cli_options.aws)
@rf.transform(
input=[
@@ -248,7 +250,6 @@ def run_pipeline(global_config, results_directory, cli_options):
obj_path = f"{config['bucket']}/{output_file}"
s3 = s3fs.S3FileSystem(profile=config["profile"])
if not s3.exists(obj_path):
    print(f"Copy {input_file} to {obj_path}", flush=True)
    s3.put(input_file, obj_path)
else:
    warnings.warn(f"Path '{obj_path}' already exists, not uploading.")
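The new pipeline stage, gated by the --aws flag via the active_if decorator, copies each result file to S3 only when the object is not already present. A condensed sketch of that guard, with the bucket, key, and profile names purely illustrative:

import warnings
import s3fs

def push_if_absent(local_path, bucket, key, profile):
    """Illustrative: upload local_path to s3://bucket/key unless it already exists."""
    s3 = s3fs.S3FileSystem(profile=profile)
    obj_path = f"{bucket}/{key}"
    if not s3.exists(obj_path):
        print(f"Copy {local_path} to {obj_path}", flush=True)
        s3.put(local_path, obj_path)
    else:
        warnings.warn(f"Path '{obj_path}' already exists, not uploading.")

push_if_absent("summary_longformat.xlsx", "mybucket", "summary_longformat.xlsx", "myprofile")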
...
@@ -592,11 +592,8 @@ if __name__ == "__main__":
parser.add_argument(
"-o", "--output", type=str, help="Output file", required=True
)
parser.add_argument(
"data_file",
type=str,
help="Data pickle file",
)
parser.add_argument("data_file", type=str, help="Data NetCDF file")
args = parser.parse_args()
with open(args.config, "r") as f:
...
@@ -3,6 +3,7 @@
import numpy as np
import xarray
import pickle as pkl
import pandas as pd
import tensorflow as tf
from covid import model_spec
@@ -75,11 +76,17 @@ def predict(data, posterior_samples, output_file, initial_step, num_steps):
origin_date = np.array(cases.coords["time"][0])
dates = np.arange(
origin_date + np.timedelta64(initial_step, "D"),
origin_date,
origin_date + np.timedelta64(initial_step + num_steps, "D"),
np.timedelta64(1, "D"),
)
covar_data["weekday"] = xarray.DataArray(
(pd.to_datetime(dates).weekday < 5).astype(model_spec.DTYPE),
coords=[dates],
dims=["prediction_time"],
)
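The added block extends the weekday covariate across the prediction window: a daily date range is built with numpy datetime arithmetic and mapped onto a Monday-to-Friday indicator. A self-contained toy version (origin date chosen arbitrarily):

import numpy as np
import pandas as pd
import xarray

origin = np.datetime64("2020-11-01")  # arbitrary origin for the example
dates = np.arange(origin, origin + np.timedelta64(7, "D"), np.timedelta64(1, "D"))
weekday = (pd.to_datetime(dates).weekday < 5).astype(np.float64)  # 1.0 Mon-Fri, 0.0 Sat/Sun
da = xarray.DataArray(weekday, coords=[dates], dims=["prediction_time"])
print(da)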
estimated_init_state, predicted_events = predicted_incidence(
samples, initial_state, covar_data, initial_step, num_steps
)
@@ -89,7 +96,7 @@ def predict(data, posterior_samples, output_file, initial_step, num_steps):
coords=[
np.arange(predicted_events.shape[0]),
covar_data.coords["location"],
dates,
dates[initial_step:],
np.arange(predicted_events.shape[3]),
],
dims=("iteration", "location", "time", "event"),
...
@@ -13,7 +13,7 @@ ProcessData:
address: https://api.coronavirus.data.gov.uk/v2/data?areaType=ltla&metric=newCasesBySpecimenDate&format=json
pillars: None # Capability to filter Pillar 1 and 2 testing data from PHE confidential line listing
measure: None # Capability to filter date of test report from PHE confidential line listing
format: json
format: gov
AreaCodeData:
input: json
@@ -40,4 +40,8 @@ ThinPosterior: # Post-process further chain thinning HDF5 -> .pkl.
Geopackage: # covid.tasks.summary_geopackage
base_geopackage: data/UK2019mod_pop.gpkg
base_layer: UK2019mod_pop_xgen
\ No newline at end of file
base_layer: UK2019mod_pop_xgen
#AWSS3:
# bucket: mybucket
# profile: myprofile
@@ -37,10 +37,13 @@ black = "^20.8b1"
pytest = "^6.2.1"
jedi = "^0.17.2"
[tool.poetry-dynamic-versioning]
enable = true
[tool.black]
line-length = 80
include = '\.pyi?$'
[build-system]
requires = ["poetry-core>=1.0.0"]
requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning"]
build-backend = "poetry.core.masonry.api"