Commit 16bad2b7 authored by Chris Jewell's avatar Chris Jewell
Browse files

Add push to AWS S3 option

CHANGES:

1. Add option to upload results to AWS S3 bucket
2. Remove generation of insample plots from pipeline
parent bf52d237
"""A Ruffus-ised pipeline for COVID-19 analysis""" """A Ruffus-ised pipeline for COVID-19 analysis"""
import os
from os.path import expandvars from os.path import expandvars
import warnings
import yaml import yaml
import datetime import datetime
import s3fs
import ruffus as rf import ruffus as rf
from covid.ruffus_pipeline import run_pipeline from covid.ruffus_pipeline import run_pipeline
...@@ -61,6 +64,9 @@ if __name__ == "__main__": ...@@ -61,6 +64,9 @@ if __name__ == "__main__":
data_args.add_argument( data_args.add_argument(
"--pillar", type=str, help="Pillar", choices=["both", "1", "2"] "--pillar", type=str, help="Pillar", choices=["both", "1", "2"]
) )
data_args.add_argument(
"--aws", action='store_true', help="Push to AWS"
)
cli_options = argparser.parse_args() cli_options = argparser.parse_args()
global_config = _import_global_config(cli_options.config) global_config = _import_global_config(cli_options.config)
...@@ -99,3 +105,13 @@ if __name__ == "__main__": ...@@ -99,3 +105,13 @@ if __name__ == "__main__":
] ]
run_pipeline(global_config, cli_options.results_directory, cli_options) run_pipeline(global_config, cli_options.results_directory, cli_options)
if cli_options.aws is True:
bucket_name = global_config['AWSS3']['bucket']
obj_name = os.path.split(cli_options.results_directory)[1]
obj_path = f"{bucket_name}/{obj_name}"
s3 = s3fs.S3FileSystem(profile=global_config["AWSS3"]["profile"])
if not s3.exists(obj_path):
s3.put(cli_options.results_directory, obj_path, recursive=True)
else:
warnings.warn(f"Path '{obj_path}' already exists, not uploading.")
...@@ -184,15 +184,15 @@ def run_pipeline(global_config, results_directory, cli_options): ...@@ -184,15 +184,15 @@ def run_pipeline(global_config, results_directory, cli_options):
df.to_csv(output_file) df.to_csv(output_file)
# Plot in-sample # Plot in-sample
@rf.transform( # @rf.transform(
input=[insample7, insample14], # input=[insample7, insample14],
filter=rf.formatter(".+/insample(?P<LAG>\d+).nc"), # filter=rf.formatter(".+/insample(?P<LAG>\d+).nc"),
add_inputs=rf.add_inputs(process_data), # add_inputs=rf.add_inputs(process_data),
output="{path[0]}/insample_plots{LAG[0]}", # output="{path[0]}/insample_plots{LAG[0]}",
extras=["{LAG[0]}"], # extras=["{LAG[0]}"],
) # )
def plot_insample_predictive_timeseries(input_files, output_dir, lag): # def plot_insample_predictive_timeseries(input_files, output_dir, lag):
insample_predictive_timeseries(input_files, output_dir, lag) # insample_predictive_timeseries(input_files, output_dir, lag)
# Geopackage # Geopackage
rf.transform( rf.transform(
......
...@@ -28,6 +28,7 @@ jedi = "^0.17.2" ...@@ -28,6 +28,7 @@ jedi = "^0.17.2"
XlsxWriter = "^1.3.7" XlsxWriter = "^1.3.7"
netCDF4 = "^1.5.6" netCDF4 = "^1.5.6"
dask = {extras = ["array"], version = "^2021.2.0"} dask = {extras = ["array"], version = "^2021.2.0"}
s3fs = "^0.5.2"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
ipython = "^7.18.1" ipython = "^7.18.1"
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment