Commit d8069a6a authored by Chris Jewell's avatar Chris Jewell
Browse files

Merge branch 'add-aws' into 'master'

Add option to push to AWS buckets

See merge request !34
parents bf52d237 2ea8e250
"""A Ruffus-ised pipeline for COVID-19 analysis"""
import os
from os.path import expandvars
import warnings
import yaml
import datetime
import s3fs
import ruffus as rf
from covid.ruffus_pipeline import run_pipeline
......@@ -61,6 +64,7 @@ if __name__ == "__main__":
data_args.add_argument(
"--pillar", type=str, help="Pillar", choices=["both", "1", "2"]
)
data_args.add_argument("--aws", action="store_true", help="Push to AWS")
cli_options = argparser.parse_args()
global_config = _import_global_config(cli_options.config)
......
"""Represents the analytic pipeline as a ruffus chain"""
import os
import warnings
import yaml
from datetime import datetime
from uuid import uuid1
import json
import s3fs
import netCDF4 as nc
import pandas as pd
import ruffus as rf
......@@ -184,15 +186,15 @@ def run_pipeline(global_config, results_directory, cli_options):
df.to_csv(output_file)
# Plot in-sample
@rf.transform(
input=[insample7, insample14],
filter=rf.formatter(".+/insample(?P<LAG>\d+).nc"),
add_inputs=rf.add_inputs(process_data),
output="{path[0]}/insample_plots{LAG[0]}",
extras=["{LAG[0]}"],
)
def plot_insample_predictive_timeseries(input_files, output_dir, lag):
insample_predictive_timeseries(input_files, output_dir, lag)
# @rf.transform(
# input=[insample7, insample14],
# filter=rf.formatter(".+/insample(?P<LAG>\d+).nc"),
# add_inputs=rf.add_inputs(process_data),
# output="{path[0]}/insample_plots{LAG[0]}",
# extras=["{LAG[0]}"],
# )
# def plot_insample_predictive_timeseries(input_files, output_dir, lag):
# insample_predictive_timeseries(input_files, output_dir, lag)
# Geopackage
rf.transform(
......@@ -228,4 +230,28 @@ def run_pipeline(global_config, results_directory, cli_options):
wd("summary_longformat.xlsx"),
)(summary_longformat)
# Copy results to AWS
@rf.active_if(cli_options.aws)
@rf.transform(
input=[
process_data,
run_mcmc,
insample7,
insample14,
medium_term,
reproduction_number,
],
filter=rf.formatter(),
output="{subdir[0][0]}/{basename[0]}{ext[0]}",
extras=[global_config["AWSS3"]],
)
def upload_to_aws(input_file, output_file, config):
obj_path = f"{config['bucket']}/{output_file}"
s3 = s3fs.S3FileSystem(profile=config["profile"])
if not s3.exists(obj_path):
s3.put(input_file, obj_path)
else:
warnings.warn(f"Path '{obj_path}' already exists, not uploading.")
rf.cmdline.run(cli_options)
......@@ -42,4 +42,8 @@ ThinPosterior: # Post-process further chain thinning HDF5 -> .pkl.
Geopackage: # covid.tasks.summary_geopackage
base_geopackage: data/UK2019mod_pop.gpkg
base_layer: UK2019mod_pop_xgen
\ No newline at end of file
base_layer: UK2019mod_pop_xgen
#AWSS3:
# bucket: mybucket
# profile: myprofile
......@@ -28,6 +28,7 @@ jedi = "^0.17.2"
XlsxWriter = "^1.3.7"
netCDF4 = "^1.5.6"
dask = {extras = ["array"], version = "^2021.2.0"}
s3fs = "^0.5.2"
[tool.poetry.dev-dependencies]
ipython = "^7.18.1"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment