forked from open-covid-19/data
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cache.py
124 lines (102 loc) · 4.43 KB
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import warnings
import subprocess
from pathlib import Path
from datetime import datetime
from functools import partial
from argparse import ArgumentParser
from typing import Any, Callable, Dict, List
from lib.constants import SRC
def parse_command(cmd: str) -> List[str]:
    """
    Translate a data source's `cmd` field into the argument list used to
    launch the corresponding fetch command as a subprocess.

    Arguments:
        cmd: One of "curl", "static_fetch", "dynamic_fetch", or
            "dynamic_custom/<script-name>.js".
    Returns:
        List[str]: Tokens suitable for passing to `subprocess.Popen`.
    Raises:
        ValueError: If `cmd` is not a recognized command, or the custom
            dynamic script is not a NodeJS (.js) script.
    """
    if cmd == "curl":
        return ["python3", str(SRC / "cache" / "commands" / "curl_fetch.py")]
    if cmd == "static_fetch":
        return ["python3", str(SRC / "cache" / "commands" / "static_fetch.py")]
    if cmd == "dynamic_fetch":
        return ["node", str(SRC / "cache" / "commands" / "dynamic_fetch.js")]
    if cmd.startswith("dynamic_custom/"):
        script_name = cmd.split("/")[-1]
        script_extension = script_name.split(".")[-1]
        # Raise explicitly instead of `assert`, which is stripped out when
        # Python runs with the -O flag and would let bad scripts through
        if script_extension != "js":
            raise ValueError("Dynamic script must be a NodeJS script")
        return ["node", str(SRC / "cache" / "commands" / "dynamic_custom" / script_name)]
    raise ValueError(f"Unknown command {cmd}")
def process_source(cwd: Path, error_handler: Callable[[str], None], data_source: Dict[str, Any]):
    """
    Use the appropriate download command for the given data source.

    Arguments:
        cwd: Directory where the command's output file will be written.
        error_handler: Called with a message on stderr output, non-zero exit,
            timeout, or any other failure; it decides whether to warn or abort.
        data_source: Config record; `cmd` selects the fetch command, `output`
            names the output file, and every other key/value pair is forwarded
            to the command as a `--key value` option.
    """
    cmd_tokens = parse_command(data_source["cmd"])
    cmd_tokens += ["--output", str(cwd / data_source["output"])]
    for option, value in data_source.items():
        if option not in ("cmd", "output"):
            # Non-string values (lists, dicts, numbers) are passed as JSON
            value = value if isinstance(value, str) else json.dumps(value)
            cmd_tokens += [f"--{option}", value]
    print(">", " ".join(cmd_tokens))
    process = subprocess.Popen(cmd_tokens, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Wait for process to finish and get err streams
    try:
        stdout, stderr = process.communicate(timeout=30)
        # Write error to our stderr output
        if stderr:
            error_handler(stderr.decode("UTF-8"))
        # If there's any output, pipe it through
        if stdout:
            print(stdout.decode("UTF-8"))
        # Verify that the return code is zero
        if process.returncode != 0:
            error_handler(f"Exit code: {process.returncode}")
    except subprocess.TimeoutExpired as exc:
        # Per the subprocess docs, the child is NOT killed when communicate()
        # times out — kill it and reap it so it does not leak
        process.kill()
        process.communicate()
        error_handler(getattr(exc, "message", str(exc)))
    except Exception as exc:
        # Catching all remaining errors so the caller decides whether to proceed
        error_handler(getattr(exc, "message", str(exc)))
# Command-line arguments for this script
argument_parser = ArgumentParser()
argument_parser.add_argument("--continue-on-error", action="store_true", default=False)
args = argument_parser.parse_args()


def error_handler(error_message: str):
    """ Define error handling behavior depending on arguments """
    # Abort unless the user explicitly asked to keep going on failures
    if not args.continue_on_error:
        raise RuntimeError(error_message)
    warnings.warn(error_message)
# Create the output folder for the nearest hour in UTC time
utc_hour_label = datetime.utcnow().strftime("%Y-%m-%d-%H")
output_path = SRC / ".." / "output" / "cache"
snapshot_path = output_path / utc_hour_label
snapshot_path.mkdir(parents=True, exist_ok=True)

# Iterate over each source and process it
with (SRC / "cache" / "config.json").open("r") as config_file:
    config_sources = json.load(config_file)
for data_source in config_sources:
    process_source(snapshot_path, error_handler, data_source)
# Build a "sitemap" of the cache output folder: maps each cached file's stem
# to the list of snapshot-relative paths where a copy of it exists
sitemap: Dict[str, List[str]] = {}
for snapshot in output_path.iterdir():
    # Skip hidden entries and the sitemap itself
    if snapshot.name.startswith(".") or snapshot.name == "sitemap.json":
        continue
    if snapshot.is_file():
        warnings.warn(f"Unexpected file seen in root of {snapshot}")
        continue
    for cached_file in snapshot.iterdir():
        if not cached_file.is_file():
            warnings.warn(f"Unexpected folder seen in directory {cached_file}")
            continue
        sitemap_key = cached_file.stem
        sitemap.setdefault(sitemap_key, []).append(str(cached_file.relative_to(output_path)))

# Sort each snapshot list once after collection; the original re-sorted the
# whole list on every append, which is accidentally O(n^2 log n)
for snapshot_list in sitemap.values():
    snapshot_list.sort()

# Output the sitemap
with open(output_path / "sitemap.json", "w") as fd:
    json.dump(sitemap, fd)