30 changes: 30 additions & 0 deletions app/services/caching/location_caching_proxy.py
@@ -0,0 +1,30 @@


from datetime import datetime, timedelta

import requests

CACHE_TIME_MIN = 20


class LocationCachingProxy:

def __init__(self) -> None:
# Maps request URL -> (fetch timestamp, requests.Response).
self.cache = {}

def cached_get(self, request_url):
cached_data = self.cache.get(request_url)

if cached_data:
cache_time = cached_data[0]
# Serve the cached response while it is still fresh.
if (datetime.now() - timedelta(minutes=CACHE_TIME_MIN)) < cache_time:
return cached_data[1]

# Cache miss or stale entry: fetch a new response and store it with a timestamp.
new_data = requests.get(request_url)
self.cache[request_url] = (datetime.now(), new_data)

return new_data


PROXY = LocationCachingProxy()
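
For context, a minimal usage sketch of the new proxy (not part of the diff): the import path below simply mirrors the new file's location, and the URL is the CSBS endpoint used in csbs.py; treat both as assumptions about how the module is wired up rather than as code from this PR.

from app.services.caching.location_caching_proxy import PROXY

URL = "https://facts.csbs.org/covid-19/covid19_county.csv"

first = PROXY.cached_get(URL)    # cache miss: performs the HTTP GET and stores (timestamp, response)
second = PROXY.cached_get(URL)   # within CACHE_TIME_MIN minutes: returns the cached requests.Response
rows = second.text.splitlines()  # callers parse the CSV from the cached response body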
90 changes: 35 additions & 55 deletions app/services/location/csbs.py
@@ -3,14 +3,13 @@
import logging
from datetime import datetime

from asyncache import cached
from cachetools import TTLCache

from ...caches import check_cache, load_cache
from ...coordinates import Coordinates
from ...location.csbs import CSBSLocation
from ...utils import httputils
from . import LocationService
from ...caches import PROXY

LOGGER = logging.getLogger("services.location.csbs")

@@ -35,68 +34,49 @@ async def get(self, loc_id): # pylint: disable=arguments-differ
BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"


@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
Retrieves county locations; upstream responses are cached by the caching proxy for CACHE_TIME_MIN minutes

:returns: The locations.
:rtype: dict
"""
data_id = "csbs.locations"
LOGGER.info(f"{data_id} Requesting data...")
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
locations = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

# Ensure country is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),
)
result = PROXY.cached_get(BASE_URL)

data = list(csv.DictReader(result.text.splitlines()))

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

# Ensure country is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(
last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),
)
LOGGER.info(f"{data_id} Data normalized")
# save the results to distributed cache
# TODO: fix json serialization
try:
await load_cache(data_id, locations)
except TypeError as type_err:
LOGGER.error(type_err)
)

# Return the locations.
return locations
117 changes: 52 additions & 65 deletions app/services/location/jhu.py
Expand Up @@ -5,8 +5,6 @@
from datetime import datetime
from pprint import pformat as pf

from asyncache import cached
from cachetools import TTLCache

from ...caches import check_cache, load_cache
from ...coordinates import Coordinates
@@ -16,6 +14,8 @@
from ...utils import date as date_util
from ...utils import httputils
from . import LocationService
from ...caches import PROXY


LOGGER = logging.getLogger("services.location.jhu")
PID = os.getpid()
@@ -44,7 +44,6 @@ async def get(self, loc_id): # pylint: disable=arguments-differ
BASE_URL = "https://raw.githubusercontent.com/CSSEGISandData/2019-nCoV/master/csse_covid_19_data/csse_covid_19_time_series/"


@cached(cache=TTLCache(maxsize=4, ttl=1800))
async def get_category(category):
"""
Retrieves the data for the provided category. Responses are cached in memory by the caching proxy for CACHE_TIME_MIN minutes.
@@ -57,86 +56,74 @@ async def get_category(category):
data_id = f"jhu.{category}"

# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
results = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
# URL to request data from.
url = BASE_URL + "time_series_covid19_%s_global.csv" % category

# Request the data
LOGGER.info(f"{data_id} Requesting data...")
async with httputils.CLIENT_SESSION.get(url) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")
# URL to request data from.
url = BASE_URL + "time_series_covid19_%s_global.csv" % category

# Parse the CSV.
data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")
# Request the data

# The normalized locations.
locations = []
result = PROXY.cached_get(url)

for item in data:
# Filter out all the dates.
dates = dict(filter(lambda element: date_util.is_date(element[0]), item.items()))
# Parse the CSV.
data = list(csv.DictReader(result.text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

# Make location history from dates.
history = {date: int(float(amount or 0)) for date, amount in dates.items()}
# The normalized locations.
locations = []

# Country for this location.
country = item["Country/Region"]
for item in data:
# Filter out all the dates.
dates = dict(
filter(lambda element: date_util.is_date(element[0]), item.items()))

# Latest data insert value.
latest = list(history.values())[-1]
# Make location history from dates.
history = {date: int(float(amount or 0))
for date, amount in dates.items()}

# Normalize the item and append to locations.
locations.append(
{
# General info.
"country": country,
"country_code": countries.country_code(country),
"province": item["Province/State"],
# Coordinates.
"coordinates": {"lat": item["Lat"], "long": item["Long"],},
# History.
"history": history,
# Latest statistic.
"latest": int(latest or 0),
}
)
LOGGER.debug(f"{data_id} Data normalized")
# Country for this location.
country = item["Country/Region"]

# Latest total.
latest = sum(map(lambda location: location["latest"], locations))
# Latest data insert value.
latest = list(history.values())[-1]

# Return the final data.
results = {
"locations": locations,
"latest": latest,
"last_updated": datetime.utcnow().isoformat() + "Z",
"source": "https://github.com/ExpDev07/coronavirus-tracker-api",
}
# save the results to distributed cache
await load_cache(data_id, results)

LOGGER.info(f"{data_id} results:\n{pf(results, depth=1)}")
# Normalize the item and append to locations.
locations.append(
{
# General info.
"country": country,
"country_code": countries.country_code(country),
"province": item["Province/State"],
# Coordinates.
"coordinates": {"lat": item["Lat"], "long": item["Long"], },
# History.
"history": history,
# Latest statistic.
"latest": int(latest or 0),
}
)
LOGGER.debug(f"{data_id} Data normalized")

# Latest total.
latest = sum(map(lambda location: location["latest"], locations))

# Return the final data.
results = {
"locations": locations,
"latest": latest,
"last_updated": datetime.utcnow().isoformat() + "Z",
"source": "https://github.com/ExpDev07/coronavirus-tracker-api",
}
# save the results to distributed cache
return results


@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
Retrieves the locations from the categories. The locations are cached for 1 hour.

:returns: The locations.
:rtype: List[Location]
"""
data_id = "jhu.locations"
LOGGER.info(f"pid:{PID}: {data_id} Requesting data...")
# Get all of the data categories locations.
confirmed = await get_category("confirmed")
deaths = await get_category("deaths")
@@ -178,7 +165,8 @@ async def get_locations():
location["country"],
location["province"],
# Coordinates.
Coordinates(latitude=coordinates["lat"], longitude=coordinates["long"]),
Coordinates(
latitude=coordinates["lat"], longitude=coordinates["long"]),
# Last update.
datetime.utcnow().isoformat() + "Z",
# Timelines (parse dates as ISO).
Expand All @@ -204,7 +192,6 @@ async def get_locations():
},
)
)
LOGGER.info(f"{data_id} Data normalized")

# Finally, return the locations.
return locations