"""Represents the analytic pipeline as a ruffus chain"""

import os
import yaml
Chris Jewell's avatar
Chris Jewell committed
5
import pandas as pd
Chris Jewell's avatar
Chris Jewell committed
6
7
import ruffus as rf

8

Chris Jewell's avatar
Chris Jewell committed
9
from covid.tasks import (
10
11
    assemble_data,
    mcmc,
Chris Jewell's avatar
Chris Jewell committed
12
13
14
15
    thin_posterior,
    next_generation_matrix,
    overall_rt,
    predict,
16
17
18
19
    summarize,
    within_between,
    case_exceedance,
    summary_geopackage,
20
    insample_predictive_timeseries,
Chris Jewell's avatar
Chris Jewell committed
21
    summary_longformat,
Chris Jewell's avatar
Chris Jewell committed
22
23
)

24
# Only ``run_pipeline`` is exported by ``from <module> import *``; the
# helper ``_make_append_work_dir`` is private.
__all__ = ["run_pipeline"]
Chris Jewell's avatar
Chris Jewell committed
25
26


27
28
def _make_append_work_dir(work_dir):
    """Return a function that builds paths inside ``work_dir``.

    The returned callable takes a ``filename``, joins it onto ``work_dir``
    with ``os.path.join`` and then expands any ``$VAR`` / ``${VAR}``
    environment-variable references in the joined path.
    """

    def _append(filename):
        joined_path = os.path.join(work_dir, filename)
        return os.path.expandvars(joined_path)

    return _append
Chris Jewell's avatar
Chris Jewell committed
29
30


31
def run_pipeline(global_config, results_directory, cli_options):
    """Declare the analysis as a ruffus task graph and run it.

    Tasks are registered with ruffus inside this function in one of two ways:
    decorators on local functions, or calling ``rf.transform(...)`` on a task
    function imported from ``covid.tasks``. ``rf.cmdline.run`` is called
    exactly once, after every task (including the DSTL long-format summary)
    has been declared.

    :param global_config: configuration mapping. The whole mapping is saved
        as ``config.yaml``. Sub-sections are passed to individual tasks
        (``"ProcessData"``, ``"Mcmc"``, ``"ThinPosterior"``, ``"Geopackage"``).
    :param results_directory: directory receiving every output file.
        Environment-variable references (``$VAR``) in the path are expanded.
    :param cli_options: ruffus command-line options, forwarded unchanged to
        ``rf.cmdline.run``.
    """
    wd = _make_append_work_dir(results_directory)

    # Pipeline starts here
    # ``wd`` expands environment variables in every output path, so the
    # directory created here is expanded the same way. Otherwise a path like
    # "$HOME/results" would be created literally while outputs go elsewhere.
    @rf.mkdir(os.path.expandvars(results_directory))
    @rf.originate(wd("config.yaml"), global_config)
    def save_config(output_file, config):
        """Write the run configuration to ``output_file`` as YAML."""
        with open(output_file, "w") as f:
            yaml.dump(config, f)

    @rf.transform(
        save_config,
        rf.formatter(),
        wd("pipeline_data.pkl"),
        global_config,
    )
    def process_data(input_file, output_file, config):
        """Assemble the input data into ``output_file``.

        ``input_file`` (the saved config) is not read; it only makes this
        task depend on ``save_config``. Settings come from
        ``config["ProcessData"]``.
        """
        assemble_data(output_file, config["ProcessData"])

    @rf.transform(
        process_data,
        rf.formatter(),
        wd("posterior.hd5"),
        global_config,
    )
    def run_mcmc(input_file, output_file, config):
        """Run ``mcmc`` on the assembled data using ``config["Mcmc"]``."""
        mcmc(input_file, output_file, config["Mcmc"])

    @rf.transform(
        input=run_mcmc,
        filter=rf.formatter(),
        output=wd("thin_samples.pkl"),
        extras=[global_config],
    )
    def thin_samples(input_file, output_file, config):
        """Thin the MCMC output using ``config["ThinPosterior"]``."""
        thin_posterior(input_file, output_file, config["ThinPosterior"])

    # Rt related steps.
    # The double brackets give one job whose input is the list of both
    # upstream outputs (data file first, thinned samples second).
    rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=wd("ngm.pkl"),
    )(next_generation_matrix)

    rf.transform(
        input=next_generation_matrix,
        filter=rf.formatter(),
        output=wd("national_rt.xlsx"),
    )(overall_rt)

    # In-sample prediction
    @rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=wd("insample7.pkl"),
    )
    def insample7(input_files, output_file):
        """In-sample prediction consumed by the 7-day exceedance summary."""
        # NOTE(review): starts at -8 whereas insample14 starts at -14;
        # presumably deliberate -- confirm.
        return predict(
            data=input_files[0],
            posterior_samples=input_files[1],
            output_file=output_file,
            initial_step=-8,
            num_steps=28,
        )

    @rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=wd("insample14.pkl"),
    )
    def insample14(input_files, output_file):
        """In-sample prediction consumed by the 14-day exceedance summary."""
        return predict(
            data=input_files[0],
            posterior_samples=input_files[1],
            output_file=output_file,
            initial_step=-14,
            num_steps=28,
        )

    # Medium-term prediction
    @rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=wd("medium_term.pkl"),
    )
    def medium_term(input_files, output_file):
        """Forward prediction of 61 steps starting from step -1."""
        return predict(
            data=input_files[0],
            posterior_samples=input_files[1],
            output_file=output_file,
            initial_step=-1,
            num_steps=61,
        )

    # Summarisation
    rf.transform(
        input=next_generation_matrix,
        filter=rf.formatter(),
        output=wd("rt_summary.csv"),
    )(summarize.rt)

    rf.transform(
        input=medium_term,
        filter=rf.formatter(),
        output=wd("infec_incidence_summary.csv"),
    )(summarize.infec_incidence)

    rf.transform(
        input=[[process_data, thin_samples, medium_term]],
        filter=rf.formatter(),
        output=wd("prevalence_summary.csv"),
    )(summarize.prevalence)

    rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=wd("within_between_summary.csv"),
    )(within_between)

    @rf.transform(
        input=[[process_data, insample7, insample14]],
        filter=rf.formatter(),
        output=wd("exceedance_summary.csv"),
    )
    def exceedance(input_files, output_file):
        """Tabulate 7- and 14-day case exceedance per location as CSV."""
        exceed7 = case_exceedance((input_files[0], input_files[1]), 7)
        exceed14 = case_exceedance((input_files[0], input_files[2]), 14)
        df = pd.DataFrame(
            {"Pr(pred<obs)_7": exceed7, "Pr(pred<obs)_14": exceed14},
            index=exceed7.coords["location"],
        )
        df.to_csv(output_file)

    # Plot in-sample.
    # Raw string: "\d" is an invalid escape in a normal string literal, and
    # the dot before "pkl" is escaped so it matches only a literal ".".
    @rf.transform(
        input=[insample7, insample14],
        filter=rf.formatter(r".+/insample(?P<LAG>\d+)\.pkl"),
        add_inputs=rf.add_inputs(process_data),
        output="{path[0]}/insample_plots{LAG[0]}",
        extras=["{LAG[0]}"],
    )
    def plot_insample_predictive_timeseries(input_files, output_dir, lag):
        """Plot in-sample predictive time series for one prediction lag."""
        insample_predictive_timeseries(input_files, output_dir, lag)

    # Geopackage
    rf.transform(
        [
            [
                process_data,
                summarize.rt,
                summarize.infec_incidence,
                summarize.prevalence,
                within_between,
                exceedance,
            ]
        ],
        rf.formatter(),
        wd("prediction.gpkg"),
        global_config["Geopackage"],
    )(summary_geopackage)

    # DSTL Summary
    rf.transform(
        [[process_data, insample14, medium_term, next_generation_matrix]],
        rf.formatter(),
        wd("summary_longformat.xlsx"),
    )(summary_longformat)

    # Run once, only after every task above has been registered.
    rf.cmdline.run(cli_options)