pipeline.py 2.69 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""A Ruffus-ised pipeline for COVID-19 analysis"""

from os.path import expandvars
import yaml
import datetime
import ruffus as rf

from covid.ruffus_pipeline import run_pipeline


def _import_global_config(config_file):
    """Load the global pipeline configuration from a YAML file.

    :param config_file: path of the YAML configuration file to read
    :returns: the parsed document, exactly as produced by ``yaml.load``
              with ``yaml.FullLoader``
    """
    # NOTE(review): FullLoader is more permissive than SafeLoader; this
    # presumes the configuration file comes from a trusted source.
    with open(config_file, "r") as stream:
        return yaml.load(stream, Loader=yaml.FullLoader)


if __name__ == "__main__":
    # Script entry point: parse the command line, load the YAML global
    # configuration, overlay any command-line overrides onto it, and pass
    # the result to ``run_pipeline``.

    def iso8601_date(text):
        """Parse a ``YYYY-MM-DD`` string into a ``datetime.datetime``.

        Raises ``ValueError`` for malformed input; argparse turns that into
        a usage error.  A named function is used instead of a lambda so the
        error message does not refer to ``<lambda>``.
        """
        return datetime.datetime.strptime(text, "%Y-%m-%d")

    # Ruffus wrapper around argparse used to give us ruffus
    # cmd line switches as well as our own config
    argparser = rf.cmdline.get_argparse(description="COVID-19 pipeline")
    argparser.add_argument(
        "-c",
        "--config",
        type=str,
        help="global configuration file",
        required=True,
    )
    argparser.add_argument(
        "-r",
        "--results-directory",
        type=str,
        help="pipeline results directory",
        required=True,
    )
    argparser.add_argument(
        "--date-range",
        type=iso8601_date,
        nargs=2,
        help="Date range [low high)",
        metavar="ISO8601",
    )
    argparser.add_argument(
        "--reported-cases", type=str, help="Path to case file"
    )
    argparser.add_argument(
        "--commute-volume", type=str, help="Path to commute volume file"
    )
    argparser.add_argument(
        "--case-date-type",
        type=str,
        help="Case date type (specimen | report)",
        choices=["specimen", "report"],
    )
    argparser.add_argument(
        "--pillar", type=str, help="Pillar", choices=["both", "1", "2"]
    )

    cli_options = argparser.parse_args()
    global_config = _import_global_config(cli_options.config)

    # Every option below is optional: when it is absent (None) the value
    # loaded from the configuration file is left untouched.

    if cli_options.date_range is not None:
        # In-place update: the existing list object is kept and only its
        # first two entries (low, high) are replaced.
        global_config["ProcessData"]["date_range"][:2] = (
            cli_options.date_range
        )

    if cli_options.reported_cases is not None:
        # Environment variables in the supplied path are expanded.
        global_config["ProcessData"]["CasesData"]["address"] = expandvars(
            cli_options.reported_cases
        )

    if cli_options.commute_volume is not None:
        global_config["ProcessData"]["commute_volume"] = expandvars(
            cli_options.commute_volume
        )

    if cli_options.case_date_type is not None:
        global_config["ProcessData"][
            "case_date_type"
        ] = cli_options.case_date_type

    if cli_options.pillar is not None:
        # Translate the short command-line choice into the list of pillar
        # labels stored in the configuration.
        pillar_labels = {
            "both": ["Pillar 1", "Pillar 2"],
            "1": ["Pillar 1"],
            "2": ["Pillar 2"],
        }
        global_config["ProcessData"]["CasesData"]["pillars"] = pillar_labels[
            cli_options.pillar
        ]

    # The parsed options are forwarded as well as the merged configuration.
    run_pipeline(global_config, cli_options.results_directory, cli_options)