ruffus_pipeline.py 6.06 KB
Newer Older
1
"""Represents the analytic pipeline as a ruffus chain"""
Chris Jewell's avatar
Chris Jewell committed
2
3
4

import os
import yaml
5
6
7
8
from datetime import datetime
from uuid import uuid1
import json
import netCDF4 as nc
Chris Jewell's avatar
Chris Jewell committed
9
import pandas as pd
Chris Jewell's avatar
Chris Jewell committed
10
11
import ruffus as rf

12

Chris Jewell's avatar
Chris Jewell committed
13
from covid.tasks import (
14
15
    assemble_data,
    mcmc,
Chris Jewell's avatar
Chris Jewell committed
16
17
18
19
    thin_posterior,
    next_generation_matrix,
    overall_rt,
    predict,
20
21
22
23
    summarize,
    within_between,
    case_exceedance,
    summary_geopackage,
24
    insample_predictive_timeseries,
Chris Jewell's avatar
Chris Jewell committed
25
    summary_longformat,
Chris Jewell's avatar
Chris Jewell committed
26
27
)

28
# Public API of this module: only the pipeline entry point is exported.
__all__ = ["run_pipeline"]
Chris Jewell's avatar
Chris Jewell committed
29
30


31
32
def _make_append_work_dir(work_dir):
    """Build a helper that resolves file names against ``work_dir``.

    The returned callable joins ``work_dir`` with the file name it is given
    and then expands any environment variables in the joined path.
    """

    def _in_work_dir(filename):
        joined = os.path.join(work_dir, filename)
        return os.path.expandvars(joined)

    return _in_work_dir
Chris Jewell's avatar
Chris Jewell committed
33
34


35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def _create_metadata(config):
    """Assemble the metadata dictionary describing one pipeline run.

    The result holds a fresh ``uuid1`` hex identifier, the current local
    time as a string, the inference library name and version, and the
    configuration serialised to JSON (values JSON cannot encode natively
    are converted with ``str``).
    """
    run_id = uuid1().hex
    timestamp = str(datetime.now())
    serialised_config = json.dumps(config, default=str)
    return {
        "pipeline_id": run_id,
        "created_at": timestamp,
        "inference_library": "GEM",
        "inference_library_version": "0.1.alpha0",
        "pipeline_config": serialised_config,
    }


def _create_nc_file(output_file, meta_data_dict):
    """Create a NetCDF4 file whose global attributes are ``meta_data_dict``.

    :param output_file: path of the dataset to create; it is opened with
        mode ``"w"`` and format ``"NETCDF4"``.
    :param meta_data_dict: mapping of attribute name to attribute value;
        each item is written as an attribute of the dataset.
    """
    # The context manager closes the dataset even if writing an attribute
    # raises, so a failed run does not leave the file handle open.
    with nc.Dataset(output_file, "w", format="NETCDF4") as nc_file:
        for name, value in meta_data_dict.items():
            # setncattr is the explicit netCDF attribute setter, so a key that
            # matches a reserved Python attribute name is still stored.
            nc_file.setncattr(name, value)


52
def run_pipeline(global_config, results_directory, cli_options):
    """Register the analysis tasks with ruffus and run the pipeline.

    Every task writes its output into ``results_directory``; output paths
    are built by joining the directory with a fixed file name and expanding
    environment variables in the result.

    :param global_config: configuration mapping.  The whole mapping is
        written to ``config.yaml`` and serialised into the metadata of
        ``inferencedata.nc``; the sections ``"ProcessData"``, ``"Mcmc"``,
        ``"ThinPosterior"`` and ``"Geopackage"`` are handed to the
        corresponding tasks.
    :param results_directory: directory that receives all pipeline outputs.
    :param cli_options: options object passed unchanged to
        ``rf.cmdline.run``.
    """

    wd = _make_append_work_dir(results_directory)

    pipeline_meta = _create_metadata(global_config)

    # Pipeline starts here
    # NOTE(review): results_directory is given to mkdir as-is, whereas wd()
    # expands environment variables -- confirm both agree for "$VAR" paths.
    @rf.mkdir(results_directory)
    @rf.originate(wd("config.yaml"), global_config)
    def save_config(output_file, config):
        with open(output_file, "w") as f:
            yaml.dump(config, f)

    @rf.follows(save_config)
    @rf.originate(wd("inferencedata.nc"), global_config)
    def process_data(output_file, config):
        # Create the file with the run metadata first, then let
        # assemble_data work on that same path.
        _create_nc_file(output_file, pipeline_meta)
        assemble_data(output_file, config["ProcessData"])

    @rf.transform(
        process_data,
        rf.formatter(),
        wd("posterior.hd5"),
        global_config,
    )
    def run_mcmc(input_file, output_file, config):
        mcmc(input_file, output_file, config["Mcmc"])

    @rf.transform(
        input=run_mcmc,
        filter=rf.formatter(),
        output=wd("thin_samples.pkl"),
        extras=[global_config],
    )
    def thin_samples(input_file, output_file, config):
        thin_posterior(input_file, output_file, config["ThinPosterior"])

    # Rt related steps
    rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=wd("ngm.nc"),
    )(next_generation_matrix)

    rf.transform(
        input=next_generation_matrix,
        filter=rf.formatter(),
        output=wd("national_rt.xlsx"),
    )(overall_rt)

    # In-sample prediction
    @rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=wd("insample7.nc"),
    )
    def insample7(input_files, output_file):
        predict(
            data=input_files[0],
            posterior_samples=input_files[1],
            output_file=output_file,
            initial_step=-7,
            num_steps=28,
        )

    @rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=wd("insample14.nc"),
    )
    def insample14(input_files, output_file):
        return predict(
            data=input_files[0],
            posterior_samples=input_files[1],
            output_file=output_file,
            initial_step=-14,
            num_steps=28,
        )

    # Medium-term prediction
    @rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=wd("medium_term.nc"),
    )
    def medium_term(input_files, output_file):
        return predict(
            data=input_files[0],
            posterior_samples=input_files[1],
            output_file=output_file,
            initial_step=-1,
            num_steps=61,
        )

    # Summarisation
    rf.transform(
        input=next_generation_matrix,
        filter=rf.formatter(),
        output=wd("rt_summary.csv"),
    )(summarize.rt)

    rf.transform(
        input=medium_term,
        filter=rf.formatter(),
        output=wd("infec_incidence_summary.csv"),
    )(summarize.infec_incidence)

    rf.transform(
        input=[[process_data, medium_term]],
        filter=rf.formatter(),
        output=wd("prevalence_summary.csv"),
    )(summarize.prevalence)

    rf.transform(
        input=[[process_data, thin_samples]],
        filter=rf.formatter(),
        output=wd("within_between_summary.csv"),
    )(within_between)

    @rf.transform(
        input=[[process_data, insample7, insample14]],
        filter=rf.formatter(),
        output=wd("exceedance_summary.csv"),
    )
    def exceedance(input_files, output_file):
        # input_files is [data, 7-step in-sample file, 14-step in-sample file]
        exceed7 = case_exceedance((input_files[0], input_files[1]), 7)
        exceed14 = case_exceedance((input_files[0], input_files[2]), 14)
        df = pd.DataFrame(
            {"Pr(pred<obs)_7": exceed7, "Pr(pred<obs)_14": exceed14},
            index=exceed7.coords["location"],
        )
        df.to_csv(output_file)

    # Plot in-sample
    @rf.transform(
        input=[insample7, insample14],
        # Raw string: "\d" is an invalid escape in a normal string literal.
        filter=rf.formatter(r".+/insample(?P<LAG>\d+).nc"),
        add_inputs=rf.add_inputs(process_data),
        output="{path[0]}/insample_plots{LAG[0]}",
        extras=["{LAG[0]}"],
    )
    def plot_insample_predictive_timeseries(input_files, output_dir, lag):
        insample_predictive_timeseries(input_files, output_dir, lag)

    # Geopackage
    rf.transform(
        [
            [
                process_data,
                summarize.rt,
                summarize.infec_incidence,
                summarize.prevalence,
                within_between,
                exceedance,
            ]
        ],
        rf.formatter(),
        wd("prediction.gpkg"),
        global_config["Geopackage"],
    )(summary_geopackage)

    # DSTL Summary
    rf.transform(
        [
            [
                process_data,
                insample7,
                insample14,
                medium_term,
                next_generation_matrix,
            ]
        ],
        rf.formatter(),
        wd("summary_longformat.xlsx"),
    )(summary_longformat)

    # Run exactly once, and only after every task above has been registered,
    # so that the command-line options act on the complete task graph.
    rf.cmdline.run(cli_options)