ruffus_pipeline.py 5.41 KB
Newer Older
Chris Jewell's avatar
Chris Jewell committed
1
2
3
4
"""A Ruffus-ised pipeline for COVID-19 analysis"""

import os
import yaml
5
import pandas as pd
Chris Jewell's avatar
Chris Jewell committed
6
7
8
import ruffus as rf

from covid.tasks import (
9
10
    assemble_data,
    mcmc,
Chris Jewell's avatar
Chris Jewell committed
11
12
13
14
    thin_posterior,
    next_generation_matrix,
    overall_rt,
    predict,
15
16
17
18
19
    summarize,
    within_between,
    case_exceedance,
    summary_geopackage,
    # lancs_spreadsheet,
Chris Jewell's avatar
Chris Jewell committed
20
21
22
)


23
def import_global_config(config_file):
    """Load the global pipeline configuration from a YAML file.

    :param config_file: path to the YAML configuration file.
    :returns: the parsed YAML document (``None`` if the file is empty).
    """
    # Explicit encoding so the result does not depend on the platform locale.
    with open(config_file, "r", encoding="utf-8") as f:
        # NOTE(review): FullLoader is kept to preserve existing behaviour;
        # yaml.safe_load would suffice for a plain config and is safer if
        # config files can ever come from an untrusted source.
        config = yaml.load(f, Loader=yaml.FullLoader)

    return config


def join_and_expand(path1, path2):
    """Join two path components and expand the result.

    ``os.path.expand`` (used previously) does not exist, so the original
    always raised ``AttributeError``. Environment variables are expanded
    first, matching how the results directory is handled, and then a
    leading ``~`` is expanded to the user's home directory.

    :param path1: leading path component (may contain ``$VAR`` or ``~``).
    :param path2: trailing path component.
    :returns: the joined, expanded path as a string.
    """
    joined = os.path.join(path1, path2)
    return os.path.expanduser(os.path.expandvars(joined))


if __name__ == "__main__":

    argparser = rf.cmdline.get_argparse(description="COVID-19 pipeline")
    # Both options are mandatory: without them import_global_config() and
    # os.path.expandvars() below would fail with an opaque TypeError on None.
    argparser.add_argument(
        "-c",
        "--config",
        type=str,
        required=True,
        help="global configuration file",
    )
    argparser.add_argument(
        "-r",
        "--results-directory",
        type=str,
        required=True,
        help="pipeline results directory",
    )
    cli_options = argparser.parse_args()
    global_config = import_global_config(cli_options.config)

    # Output paths
    BASEDIR = os.path.expandvars(cli_options.results_directory)

    def work_dir(fn):
        """Return the path of ``fn`` inside the results directory."""
        return os.path.join(BASEDIR, fn)

    # Pipeline starts here
    @rf.mkdir(BASEDIR)
    @rf.originate(work_dir("config.yaml"), global_config)
    def save_config(output_file, config):
        """Write a copy of the global configuration into the results dir."""
        with open(output_file, "w") as f:
            yaml.dump(config, f)

    @rf.transform(
        save_config,
        rf.formatter(),
        work_dir("pipeline_data.pkl"),
        global_config,
    )
    def process_data(input_file, output_file, config):
        """Assemble input data using the ``ProcessData`` config section.

        ``input_file`` (the saved config) is not read here; depending on
        ``save_config`` only orders this task after it.
        """
        assemble_data(output_file, config["ProcessData"])

    @rf.transform(
        process_data,
        rf.formatter(),
        work_dir("posterior.hd5"),
        global_config,
    )
    def run_mcmc(input_file, output_file, config):
        """Run MCMC on the assembled data using the ``Mcmc`` config section."""
        mcmc(input_file, output_file, config["Mcmc"])

    @rf.transform(
        input=run_mcmc,
        filter=rf.formatter(),
        output=work_dir("thin_samples.pkl"),
    )
    def thin_samples(input_file, output_file):
        """Thin the posterior by passing sample indices ``range(100)``."""
        thin_posterior(input_file, output_file, range(100))

    # Rt related steps
    rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=work_dir("ngm.pkl"),
    )(next_generation_matrix)

    rf.transform(
        input=next_generation_matrix,
        filter=rf.formatter(),
        output=work_dir("national_rt.xlsx"),
    )(overall_rt)

    # In-sample prediction
    @rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=work_dir("insample7.pkl"),
    )
    def insample7(input_files, output_file):
        """Predict from the thinned posterior (initial_step=-8, 28 steps).

        ``input_files`` is ``[process_data output, thin_samples output]``.
        """
        # Return the result, consistent with insample14 and medium_term.
        return predict(
            data=input_files[0],
            posterior_samples=input_files[1],
            output_file=output_file,
            initial_step=-8,
            num_steps=28,
        )

    @rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=work_dir("insample14.pkl"),
    )
    def insample14(input_files, output_file):
        """Predict from the thinned posterior (initial_step=-14, 28 steps).

        ``input_files`` is ``[process_data output, thin_samples output]``.
        """
        return predict(
            data=input_files[0],
            posterior_samples=input_files[1],
            output_file=output_file,
            initial_step=-14,
            num_steps=28,
        )

    # Medium-term prediction
    @rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=work_dir("medium_term.pkl"),
    )
    def medium_term(input_files, output_file):
        """Predict from the thinned posterior (initial_step=-1, 61 steps).

        ``input_files`` is ``[process_data output, thin_samples output]``.
        """
        return predict(
            data=input_files[0],
            posterior_samples=input_files[1],
            output_file=output_file,
            initial_step=-1,
            num_steps=61,
        )

    # Summarisation
    rf.transform(
        input=next_generation_matrix,
        filter=rf.formatter(),
        output=work_dir("rt_summary.csv"),
    )(summarize.rt)

    rf.transform(
        input=medium_term,
        filter=rf.formatter(),
        output=work_dir("infec_incidence_summary.csv"),
    )(summarize.infec_incidence)

    rf.transform(
        input=[[process_data, thin_samples, medium_term]],
        filter=rf.formatter(),
        output=work_dir("prevalence_summary.csv"),
    )(summarize.prevalence)

    rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=work_dir("within_between_summary.csv"),
    )(within_between)

    @rf.transform(
        input=[[process_data, insample7, insample14]],
        filter=rf.formatter(),
        output=work_dir("exceedance_summary.csv"),
    )
    def exceedance(input_files, output_file):
        """Write case_exceedance results for insample7/insample14 to CSV.

        ``input_files`` is ``[process_data, insample7, insample14]`` outputs.
        """
        exceed7 = case_exceedance((input_files[0], input_files[1]), 7)
        exceed14 = case_exceedance((input_files[0], input_files[2]), 14)
        df = pd.DataFrame(
            {"Pr(pred<obs)_7": exceed7, "Pr(pred<obs)_14": exceed14}
        )
        df.to_csv(output_file)

    # @rf.transform(
    #     input=[[process_data, insample7, insample14, medium_term]],
    #     filter=rf.formatter(),
    #     output=work_dir("total_predictive_timeseries.pdf")
    # )(total_predictive_timeseries)

    # Geopackage
    rf.transform(
        [
            [
                process_data,
                summarize.rt,
                summarize.infec_incidence,
                summarize.prevalence,
                within_between,
                exceedance,
            ]
        ],
        rf.formatter(),
        work_dir("prediction.gpkg"),
        global_config["Geopackage"],
    )(summary_geopackage)

    rf.cmdline.run(cli_options)