Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

👍 fixes recovery data in api/v2 jhu #316

Closed
wants to merge 16 commits into from
10 changes: 10 additions & 0 deletions .deepsource.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version = 1

test_patterns = ["tests/**"]

[[analyzers]]
name = "python"
enabled = true

[analyzers.meta]
runtime_version = "3.x.x"
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ htmlcov/
nosetests.xml
coverage.xml
*,cover
locustfile.py

# Translations
*.mo
Expand Down
6 changes: 5 additions & 1 deletion app/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from ..services.location.nyt import NYTLocationService

# Mapping of services to data-sources.
DATA_SOURCES = {"jhu": JhuLocationService(), "csbs": CSBSLocationService(), "nyt": NYTLocationService()}
DATA_SOURCES = {
"jhu": JhuLocationService(),
"csbs": CSBSLocationService(),
"nyt": NYTLocationService(),
}


def data_source(source):
Expand Down
13 changes: 11 additions & 2 deletions app/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@


def save(
name: str, content: Union[str, Dict, List], write_mode: str = "w", indent: int = 2, **json_dumps_kwargs
name: str,
content: Union[str, Dict, List],
write_mode: str = "w",
indent: int = 2,
**json_dumps_kwargs,
) -> pathlib.Path:
"""Save content to a file. If content is a dictionary, use json.dumps()."""
path = DATA / name
Expand All @@ -35,7 +39,12 @@ class AIO:

@classmethod
async def save(
cls, name: str, content: Union[str, Dict, List], write_mode: str = "w", indent: int = 2, **json_dumps_kwargs
cls,
name: str,
content: Union[str, Dict, List],
write_mode: str = "w",
indent: int = 2,
**json_dumps_kwargs,
):
"""Save content to a file. If content is a dictionary, use json.dumps()."""
path = DATA / name
Expand Down
8 changes: 6 additions & 2 deletions app/location/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Location: # pylint: disable=too-many-instance-attributes
"""

def __init__(
self, id, country, province, coordinates, last_updated, confirmed, deaths, recovered
self, id, country, province, coordinates, last_updated, confirmed, deaths, recovered,
): # pylint: disable=too-many-arguments
# General info.
self.id = id
Expand Down Expand Up @@ -66,7 +66,11 @@ def serialize(self):
# Last updated.
"last_updated": self.last_updated,
# Latest data (statistics).
"latest": {"confirmed": self.confirmed, "deaths": self.deaths, "recovered": self.recovered},
"latest": {
"confirmed": self.confirmed,
"deaths": self.deaths,
"recovered": self.recovered,
},
}


Expand Down
6 changes: 5 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@

# Enable CORS.
APP.add_middleware(
CORSMiddleware, allow_credentials=True, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"],
CORSMiddleware,
allow_credentials=True,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
APP.add_middleware(GZipMiddleware, minimum_size=1000)

Expand Down
6 changes: 5 additions & 1 deletion app/routers/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ async def all_categories():
"deaths": deaths,
"recovered": recovered,
# Latest.
"latest": {"confirmed": confirmed["latest"], "deaths": deaths["latest"], "recovered": recovered["latest"],},
"latest": {
"confirmed": confirmed["latest"],
"deaths": deaths["latest"],
"recovered": recovered["latest"],
},
}


Expand Down
14 changes: 11 additions & 3 deletions app/routers/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,17 @@ async def get_locations(

# Do filtering.
try:
locations = [location for location in locations if str(getattr(location, key)).lower() == str(value)]
locations = [
location
for location in locations
if str(getattr(location, key)).lower() == str(value)
]
except AttributeError:
pass
if not locations:
raise HTTPException(404, detail=f"Source `{source}` does not have the desired location data.")
raise HTTPException(
404, detail=f"Source `{source}` does not have the desired location data.",
)

# Return final serialized data.
return {
Expand All @@ -84,7 +90,9 @@ async def get_locations(

# pylint: disable=invalid-name
@V2.get("/locations/{id}", response_model=LocationResponse)
async def get_location_by_id(request: Request, id: int, source: Sources = "jhu", timelines: bool = True):
async def get_location_by_id(
request: Request, id: int, source: Sources = "jhu", timelines: bool = True
):
"""
Getting specific location by id.
"""
Expand Down
97 changes: 54 additions & 43 deletions app/services/location/csbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from asyncache import cached
from cachetools import TTLCache

from ...caches import check_cache, load_cache
from ...coordinates import Coordinates
from ...location.csbs import CSBSLocation
from ...utils import httputils
Expand Down Expand Up @@ -34,7 +35,7 @@ async def get(self, loc_id): # pylint: disable=arguments-differ
BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"


@cached(cache=TTLCache(maxsize=1, ttl=3600))
@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
Retrieves county locations; locations are cached for 1 hour
Expand All @@ -44,48 +45,58 @@ async def get_locations():
"""
data_id = "csbs.locations"
LOGGER.info(f"{data_id} Requesting data...")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

# Ensure county is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Coordinates.
coordinates = Coordinates(item["Latitude"], item["Longitude"]) # pylint: disable=unused-variable

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
locations = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

# Ensure county is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),
)
)
)
LOGGER.info(f"{data_id} Data normalized")

LOGGER.info(f"{data_id} Data normalized")
# save the results to distributed cache
# TODO: fix json serialization
try:
await load_cache(data_id, locations)
except TypeError as type_err:
LOGGER.error(type_err)

# Return the locations.
return locations
54 changes: 42 additions & 12 deletions app/services/location/jhu.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ async def get(self, loc_id): # pylint: disable=arguments-differ


# Base URL for fetching category.
BASE_URL = (
"https://raw.githubusercontent.com/CSSEGISandData/2019-nCoV/master/csse_covid_19_data/csse_covid_19_time_series/"
)
BASE_URL = "https://raw.githubusercontent.com/CSSEGISandData/2019-nCoV/master/csse_covid_19_data/csse_covid_19_time_series/"


@cached(cache=TTLCache(maxsize=128, ttl=1800))
@cached(cache=TTLCache(maxsize=4, ttl=1800))
async def get_category(category):
"""
Retrieves the data for the provided category. The data is cached for 30 minutes locally, 1 hour via shared Redis.
Expand Down Expand Up @@ -129,7 +127,7 @@ async def get_category(category):
return results


@cached(cache=TTLCache(maxsize=1024, ttl=1800))
@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
Retrieves the locations from the categories. The locations are cached locally for 30 minutes.
Expand All @@ -142,22 +140,31 @@ async def get_locations():
# Get all of the data categories locations.
confirmed = await get_category("confirmed")
deaths = await get_category("deaths")
# recovered = await get_category("recovered")
recovered = await get_category("recovered")

locations_confirmed = confirmed["locations"]
locations_deaths = deaths["locations"]
# locations_recovered = recovered["locations"]
locations_recovered = recovered["locations"]

# Final locations to return.
locations = []

# ***************************************************************************
# TODO: This iteration approach assumes the indexes remain the same across
# categories and opens us up to a CRITICAL ERROR. The removal of a column in the
# data source would break the API or SHIFT all of the confirmed, deaths, and
# recovered data, producing incorrect data for consumers.
# ***************************************************************************
# Go through locations.
for index, location in enumerate(locations_confirmed):
# Get the timelines.

# TEMP: Fix for merging recovery data. See TODO above for more details.
key = (location["country"], location["province"])

timelines = {
"confirmed": locations_confirmed[index]["history"],
"deaths": locations_deaths[index]["history"],
# 'recovered' : locations_recovered[index]['history'],
"confirmed": location["history"],
"deaths": parse_history(key, locations_deaths, index),
"recovered": parse_history(key, locations_recovered, index),
}

# Grab coordinates.
Expand Down Expand Up @@ -188,11 +195,34 @@ async def get_locations():
for date, amount in timelines["deaths"].items()
}
),
"recovered": Timeline({}),
"recovered": Timeline(
{
datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
for date, amount in timelines["recovered"].items()
}
),
},
)
)
LOGGER.info(f"{data_id} Data normalized")

# Finally, return the locations.
return locations


def parse_history(key: tuple, locations: list, index: int):
    """
    Return the history for the location identified by ``key``.

    The entry at ``locations[index]`` is tried first, because the category
    lists are normally aligned row-for-row. If that entry belongs to a
    different country/province (for example, one category has an extra or
    missing row, which shifts every later index), the list is scanned for an
    entry whose ``(country, province)`` pair equals ``key``, so a single
    misaligned row no longer blanks the history of every location after it.

    :param key: ``(country, province)`` pair of the location being merged.
    :param locations: list of location dicts with "country", "province" and
        "history" entries.
    :param index: position at which the location is expected to be found.
    :returns: the matching location's "history" value, or an empty dict when
        no entry matches ``key``.

    TEMP: stop-gap until the merge is refactored to be keyed by location
    rather than by position.
    """
    # Fast path: the categories are aligned, so the expected index holds our key.
    try:
        candidate = locations[index]
        if key == (candidate["country"], candidate["province"]):
            return candidate["history"]
    except (IndexError, KeyError):
        LOGGER.debug(f"iteration data merge error: {index} {key}")

    # Slow path: the index is out of range or points at another location.
    # Scan by key instead of dropping the data; this is O(len(locations)) but
    # only runs for misaligned rows.
    for candidate in locations:
        try:
            if key == (candidate["country"], candidate["province"]):
                return candidate["history"]
        except KeyError:
            # Malformed entry (missing a field); it cannot be our match.
            continue

    return {}