ruffus_pipeline.py 5.47 KB
Newer Older
Chris Jewell's avatar
Chris Jewell committed
1
2
3
4
"""A Ruffus-ised pipeline for COVID-19 analysis"""

import os
import yaml
5
import pandas as pd
Chris Jewell's avatar
Chris Jewell committed
6
7
8
import ruffus as rf

from covid.tasks import (
9
10
    assemble_data,
    mcmc,
Chris Jewell's avatar
Chris Jewell committed
11
12
13
14
    thin_posterior,
    next_generation_matrix,
    overall_rt,
    predict,
15
16
17
18
19
    summarize,
    within_between,
    case_exceedance,
    summary_geopackage,
    # lancs_spreadsheet,
Chris Jewell's avatar
Chris Jewell committed
20
21
22
)


23
def import_global_config(config_file):
    """Load the pipeline's global configuration from a YAML file.

    :param config_file: path to the YAML configuration file
    :returns: the parsed document exactly as returned by ``yaml.load``
              (the pipeline indexes it by section name, e.g. ``"Mcmc"``)
    """
    # NOTE(review): FullLoader is kept so existing configuration files
    # keep loading unchanged; if configs may come from untrusted sources,
    # confirm whether yaml.safe_load is sufficient and switch to it.
    with open(config_file, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)

    return config


def join_and_expand(path1, path2):
    """Join two path components and expand ``~`` and environment variables.

    :param path1: leading path component
    :param path2: trailing path component
    :returns: the joined path with a leading ``~``/``~user`` and any
              ``$VAR``/``${VAR}`` references expanded
    """
    # os.path offers expanduser() and expandvars() but no generic
    # expand(), so both expansions are applied explicitly.
    joined = os.path.join(path1, path2)
    return os.path.expandvars(os.path.expanduser(joined))


if __name__ == "__main__":

    # Start from the parser supplied by ruffus.cmdline and add the two
    # pipeline-specific options to it.
    argparser = rf.cmdline.get_argparse(description="COVID-19 pipeline")
    # Both options are mandatory: the code below passes them straight to
    # open() and os.path.expandvars(), which raise TypeError on None.
    argparser.add_argument(
        "-c",
        "--config",
        type=str,
        required=True,
        help="global configuration file",
    )
    argparser.add_argument(
        "-r",
        "--results-directory",
        type=str,
        required=True,
        help="pipeline results directory",
    )
    cli_options = argparser.parse_args()
    global_config = import_global_config(cli_options.config)

    # Output paths
    BASEDIR = os.path.expandvars(cli_options.results_directory)
49
50
51

    def work_dir(fn):
        """Return the location of ``fn`` inside the results directory BASEDIR."""
        location = os.path.join(BASEDIR, fn)
        return location
Chris Jewell's avatar
Chris Jewell committed
52
53
54

    # Pipeline starts here
    @rf.mkdir(BASEDIR)
    @rf.originate(work_dir("config.yaml"), global_config)
    def save_config(output_file, config):
        """Write the global configuration into the results directory.

        :param output_file: destination path (``config.yaml`` under BASEDIR)
        :param config: configuration object to serialise as YAML
        """
        with open(output_file, "w") as f:
            yaml.dump(config, f)
Chris Jewell's avatar
Chris Jewell committed
59
60
61
62

    @rf.transform(
        save_config,
        rf.formatter(),
        work_dir("pipeline_data.pkl"),
        global_config,
    )
    def process_data(input_file, output_file, config):
        """Run ``assemble_data`` to produce ``pipeline_data.pkl``.

        :param input_file: output of ``save_config``; not read here, it is
                           only the upstream dependency of this task
        :param output_file: destination path handed to ``assemble_data``
        :param config: global configuration; its ``"ProcessData"`` section
                       is passed to ``assemble_data``
        """
        assemble_data(output_file, config["ProcessData"])
Chris Jewell's avatar
Chris Jewell committed
68

69
    @rf.transform(
        process_data,
        rf.formatter(),
        work_dir("posterior.hd5"),
        global_config,
    )
    def run_mcmc(input_file, output_file, config):
        """Run ``mcmc`` on the assembled data to produce ``posterior.hd5``.

        :param input_file: output of ``process_data``
        :param output_file: destination path handed to ``mcmc``
        :param config: global configuration; its ``"Mcmc"`` section is
                       passed to ``mcmc``
        """
        mcmc(input_file, output_file, config["Mcmc"])
Chris Jewell's avatar
Chris Jewell committed
77

78
79
    @rf.transform(
        input=run_mcmc,
        filter=rf.formatter(),
        output=work_dir("thin_samples.pkl"),
    )
    def thin_samples(input_file, output_file):
        """Run ``thin_posterior`` on the MCMC output to produce ``thin_samples.pkl``.

        :param input_file: output of ``run_mcmc``
        :param output_file: destination path handed to ``thin_posterior``
        """
        # This task receives no extra parameters from the decorator, so
        # the settings are read from the enclosing global_config rather
        # than from a (non-existent) ``config`` argument.
        thin_posterior(input_file, output_file, global_config["ThinPosterior"])
Chris Jewell's avatar
Chris Jewell committed
85
86
87

    # Rt related steps
    # Register the imported next_generation_matrix function as a task fed
    # by the process_data and thin_samples outputs.
    rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=work_dir("ngm.pkl"),
    )(next_generation_matrix)
Chris Jewell's avatar
Chris Jewell committed
92
93
94
95

    # Register the imported overall_rt function as a task fed by the
    # next_generation_matrix output.
    rf.transform(
        input=next_generation_matrix,
        filter=rf.formatter(),
        output=work_dir("national_rt.xlsx"),
    )(overall_rt)
Chris Jewell's avatar
Chris Jewell committed
98
99
100

    # In-sample prediction
    @rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=work_dir("insample7.pkl"),
    )
    def insample7(input_files, output_file):
        """Run ``predict`` with ``initial_step=-8`` to produce ``insample7.pkl``.

        :param input_files: sequence whose first item is the
                            ``process_data`` output and whose second item
                            is the ``thin_samples`` output
        :param output_file: destination path handed to ``predict``
        """
        predict(
            data=input_files[0],
            posterior_samples=input_files[1],
            output_file=output_file,
            initial_step=-8,
            num_steps=28,
        )

    @rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=work_dir("insample14.pkl"),
    )
    def insample14(input_files, output_file):
        """Run ``predict`` with ``initial_step=-14`` to produce ``insample14.pkl``.

        :param input_files: sequence whose first item is the
                            ``process_data`` output and whose second item
                            is the ``thin_samples`` output
        :param output_file: destination path handed to ``predict``
        :returns: whatever ``predict`` returns
        """
        return predict(
            data=input_files[0],
            posterior_samples=input_files[1],
            output_file=output_file,
            initial_step=-14,
            num_steps=28,
        )

    # Medium-term prediction
    @rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=work_dir("medium_term.pkl"),
    )
    def medium_term(input_files, output_file):
        """Run ``predict`` from ``initial_step=-1`` for 61 steps.

        :param input_files: sequence whose first item is the
                            ``process_data`` output and whose second item
                            is the ``thin_samples`` output
        :param output_file: destination path handed to ``predict``
                            (``medium_term.pkl``)
        :returns: whatever ``predict`` returns
        """
        return predict(
            data=input_files[0],
            posterior_samples=input_files[1],
            output_file=output_file,
            initial_step=-1,
            num_steps=61,
        )

143
    # Summarisation
Chris Jewell's avatar
Chris Jewell committed
144
    # Register summarize.rt as a task fed by the next_generation_matrix
    # output.
    rf.transform(
        input=next_generation_matrix,
        filter=rf.formatter(),
        output=work_dir("rt_summary.csv"),
    )(summarize.rt)

    # Remaining summary tasks, registered in order from a small table of
    # (upstream inputs, output file name, task function).
    summary_specs = (
        (
            medium_term,
            "infec_incidence_summary.csv",
            summarize.infec_incidence,
        ),
        (
            [[process_data, thin_samples, medium_term]],
            "prevalence_summary.csv",
            summarize.prevalence,
        ),
        (
            [[process_data, thin_samples]],
            "within_between_summary.csv",
            within_between,
        ),
    )
    for summary_input, summary_name, summary_func in summary_specs:
        register_summary = rf.transform(
            input=summary_input,
            filter=rf.formatter(),
            output=work_dir(summary_name),
        )
        register_summary(summary_func)

    @rf.transform(
        input=[[process_data, insample7, insample14]],
        filter=rf.formatter(),
        output=work_dir("exceedance_summary.csv"),
    )
    def exceedance(input_files, output_file):
        """Write the ``case_exceedance`` results for both in-sample runs as CSV.

        :param input_files: sequence of the ``process_data``,
                            ``insample7`` and ``insample14`` outputs, in
                            that order
        :param output_file: destination CSV path
        """
        # Each call pairs the assembled data with one in-sample prediction.
        exceed7 = case_exceedance((input_files[0], input_files[1]), 7)
        exceed14 = case_exceedance((input_files[0], input_files[2]), 14)
        # Rows are labelled by the "location" coordinate of the 7-step
        # result; presumably both results share it -- TODO confirm.
        df = pd.DataFrame(
            {"Pr(pred<obs)_7": exceed7, "Pr(pred<obs)_14": exceed14},
            index=exceed7.coords["location"],
        )
        df.to_csv(output_file)

    # @rf.transform(
    #     input=[[process_data, insample7, insample14, medium_term]],
    #     filter=rf.formatter(),
    #     output=work_dir("total_predictive_timeseries.pdf")
    # )(total_predictive_timeseries)

    # Geopackage
    # Upstream tasks whose outputs feed the geopackage; they are wrapped
    # in a single inner list like the other multi-input tasks in this file.
    geopackage_inputs = [
        process_data,
        summarize.rt,
        summarize.infec_incidence,
        summarize.prevalence,
        within_between,
        exceedance,
    ]
    register_geopackage = rf.transform(
        [geopackage_inputs],
        rf.formatter(),
        work_dir("prediction.gpkg"),
        global_config["Geopackage"],
    )
    register_geopackage(summary_geopackage)
Chris Jewell's avatar
Chris Jewell committed
204

205
    # Hand the parsed command-line options to ruffus.cmdline.run, after
    # all tasks above have been registered.
    rf.cmdline.run(cli_options)