pipeline.py 2.93 KB
Newer Older
1
2
"""A Ruffus-ised pipeline for COVID-19 analysis"""

Chris Jewell's avatar
Chris Jewell committed
3
import os
4
from os.path import expandvars
Chris Jewell's avatar
Chris Jewell committed
5
import warnings
6
7
import yaml
import datetime
Chris Jewell's avatar
Chris Jewell committed
8
import s3fs
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import ruffus as rf

from covid.ruffus_pipeline import run_pipeline


def _import_global_config(config_file):
    """Load the global pipeline configuration from a YAML file.

    :param config_file: path of the YAML configuration file to read
    :returns: the parsed document, exactly as produced by ``yaml.load``
              using ``yaml.FullLoader``
    """
    # The file handle is released as soon as parsing has finished.
    with open(config_file, "r") as stream:
        return yaml.load(stream, Loader=yaml.FullLoader)


if __name__ == "__main__":
    # Script entry point: parse the command line, load the YAML global
    # configuration, apply any command-line overrides to it, then hand
    # everything to run_pipeline.

    # Ruffus wrapper around argparse, used to give us the Ruffus
    # command-line switches as well as our own configuration options.
    argparser = rf.cmdline.get_argparse(description="COVID-19 pipeline")
    data_args = argparser.add_argument_group(
        "Data options", "Options controlling input data"
    )

    # Mandatory options.
    data_args.add_argument(
        "-c",
        "--config",
        type=str,
        help="global configuration file",
        required=True,
    )
    data_args.add_argument(
        "-r",
        "--results-directory",
        type=str,
        help="pipeline results directory",
        required=True,
    )

    # Optional overrides for values in the "ProcessData" config section.
    # Each one defaults to None, meaning "keep what the config file says".
    data_args.add_argument(
        "--date-range",
        # Both bounds are parsed as YYYY-MM-DD into datetime.datetime objects.
        type=lambda s: datetime.datetime.strptime(s, "%Y-%m-%d"),
        nargs=2,
        help="Date range [low high)",
        # Fixed: was "ISO6801", a transposition of the ISO 8601 date standard.
        metavar="ISO8601",
    )
    data_args.add_argument(
        "--reported-cases", type=str, help="Path to case file"
    )
    data_args.add_argument(
        "--commute-volume", type=str, help="Path to commute volume file"
    )
    data_args.add_argument(
        "--case-date-type",
        type=str,
        help="Case date type (specimen | report)",
        choices=["specimen", "report"],
    )
    data_args.add_argument(
        "--pillar", type=str, help="Pillar", choices=["both", "1", "2"]
    )
    # Not consumed in this block; it reaches run_pipeline via cli_options.
    data_args.add_argument("--aws", action="store_true", help="Push to AWS")

    cli_options = argparser.parse_args()
    global_config = _import_global_config(cli_options.config)

    # Command-line values, when supplied, take precedence over the config
    # file. The "ProcessData" section is only touched inside these branches,
    # so it is not required to exist when no override is given.
    if cli_options.date_range is not None:
        # Overwrite the two bounds in place; the config must already hold a
        # two-element "date_range" sequence.
        date_range = global_config["ProcessData"]["date_range"]
        date_range[0] = cli_options.date_range[0]
        date_range[1] = cli_options.date_range[1]

    if cli_options.reported_cases is not None:
        # Environment variables in the path (e.g. $HOME) are expanded.
        global_config["ProcessData"]["CasesData"]["address"] = expandvars(
            cli_options.reported_cases
        )

    if cli_options.commute_volume is not None:
        global_config["ProcessData"]["commute_volume"] = expandvars(
            cli_options.commute_volume
        )

    if cli_options.case_date_type is not None:
        global_config["ProcessData"][
            "case_date_type"
        ] = cli_options.case_date_type

    if cli_options.pillar is not None:
        # Translate the short command-line choice into the list of pillar
        # labels stored in the configuration.
        opts = {
            "both": ["Pillar 1", "Pillar 2"],
            "1": ["Pillar 1"],
            "2": ["Pillar 2"],
        }
        global_config["ProcessData"]["CasesData"]["pillars"] = opts[
            cli_options.pillar
        ]

    # The full argparse namespace is passed along as well, so the Ruffus
    # switches and --aws are available to the pipeline runner.
    run_pipeline(global_config, cli_options.results_directory, cli_options)