diff --git a/app/services/caching/location_caching_proxy.py b/app/services/caching/location_caching_proxy.py
new file mode 100644
index 00000000..bc51593c
--- /dev/null
+++ b/app/services/caching/location_caching_proxy.py
@@ -0,0 +1,27 @@
+from datetime import datetime, timedelta
+
+import requests
+
+CACHE_TIME_MIN = 20
+
+
+class LocationCachingProxy:
+
+    def __init__(self) -> None:
+        self.cache = {}
+
+    def cached_get(self, request_url):
+        cached_data = self.cache.get(request_url)
+
+        if cached_data:
+            cache_time = cached_data[0]
+            if (datetime.now() - timedelta(minutes=CACHE_TIME_MIN)) < cache_time:
+                return cached_data[1]
+
+        new_data = requests.get(request_url)
+        self.cache[request_url] = (datetime.now(), new_data)
+
+        return new_data
+
+
+PROXY = LocationCachingProxy()
diff --git a/app/services/location/csbs.py b/app/services/location/csbs.py
index 444ebad6..7b2e082a 100644
--- a/app/services/location/csbs.py
+++ b/app/services/location/csbs.py
@@ -3,14 +3,13 @@
 import logging
 from datetime import datetime
 
-from asyncache import cached
-from cachetools import TTLCache
 
 from ...caches import check_cache, load_cache
 from ...coordinates import Coordinates
 from ...location.csbs import CSBSLocation
 from ...utils import httputils
 from . import LocationService
+from app.services.caching.location_caching_proxy import PROXY
 
 LOGGER = logging.getLogger("services.location.csbs")
 
@@ -35,7 +34,6 @@ async def get(self, loc_id):  # pylint: disable=arguments-differ
 BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"
 
 
-@cached(cache=TTLCache(maxsize=1, ttl=1800))
 async def get_locations():
     """
     Retrieves county locations; locations are cached for 1 hour
@@ -43,60 +41,42 @@ async def get_locations():
     :returns: The locations.
     :rtype: dict
     """
-    data_id = "csbs.locations"
-    LOGGER.info(f"{data_id} Requesting data...")
     # check shared cache
-    cache_results = await check_cache(data_id)
-    if cache_results:
-        LOGGER.info(f"{data_id} using shared cache results")
-        locations = cache_results
-    else:
-        LOGGER.info(f"{data_id} shared cache empty")
-        async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
-            text = await response.text()
-
-        LOGGER.debug(f"{data_id} Data received")
-
-        data = list(csv.DictReader(text.splitlines()))
-        LOGGER.debug(f"{data_id} CSV parsed")
-
-        locations = []
-
-        for i, item in enumerate(data):
-            # General info.
-            state = item["State Name"]
-            county = item["County Name"]
-
-            # Ensure country is specified.
-            if county in {"Unassigned", "Unknown"}:
-                continue
-
-            # Date string without "EDT" at end.
-            last_update = " ".join(item["Last Update"].split(" ")[0:2])
-
-            # Append to locations.
-            locations.append(
-                CSBSLocation(
-                    # General info.
-                    i,
-                    state,
-                    county,
-                    # Coordinates.
-                    Coordinates(item["Latitude"], item["Longitude"]),
-                    # Last update (parse as ISO).
-                    datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
-                    # Statistics.
-                    int(item["Confirmed"] or 0),
-                    int(item["Death"] or 0),
-                )
-            )
-        LOGGER.info(f"{data_id} Data normalized")
-        # save the results to distributed cache
-        # TODO: fix json serialization
-        try:
-            await load_cache(data_id, locations)
-        except TypeError as type_err:
-            LOGGER.error(type_err)
+    result = PROXY.cached_get(BASE_URL)
+
+    data = list(csv.DictReader(result.text.splitlines()))
+
+    locations = []
+
+    for i, item in enumerate(data):
+        # General info.
+        state = item["State Name"]
+        county = item["County Name"]
+
+        # Ensure country is specified.
+        if county in {"Unassigned", "Unknown"}:
+            continue
+
+        # Date string without "EDT" at end.
+        last_update = " ".join(item["Last Update"].split(" ")[0:2])
+
+        # Append to locations.
+        locations.append(
+            CSBSLocation(
+                # General info.
+                i,
+                state,
+                county,
+                # Coordinates.
+                Coordinates(item["Latitude"], item["Longitude"]),
+                # Last update (parse as ISO).
+                datetime.strptime(
+                    last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
+                # Statistics.
+                int(item["Confirmed"] or 0),
+                int(item["Death"] or 0),
+            )
+        )
 
     # Return the locations.
     return locations
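A quick sketch of how the new proxy is exercised (illustrative only, not part of the patch; the variable names are invented for the example, and it assumes network access plus the requests dependency used above):

    # Illustrative usage of LocationCachingProxy, mirroring the csbs.py call site above.
    from app.services.caching.location_caching_proxy import PROXY

    CSBS_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"

    # The first call performs a real requests.get() and stores (timestamp, response),
    # keyed by URL, in the proxy's in-memory dict.
    first = PROXY.cached_get(CSBS_URL)

    # A second call within CACHE_TIME_MIN (20 minutes) returns the cached
    # requests.Response without another network round trip.
    second = PROXY.cached_get(CSBS_URL)
    assert second is first

    print(len(first.text.splitlines()), "CSV rows, including the header")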
diff --git a/app/services/location/jhu.py b/app/services/location/jhu.py
index ebed3960..d7459d99 100644
--- a/app/services/location/jhu.py
+++ b/app/services/location/jhu.py
@@ -5,8 +5,6 @@
 from datetime import datetime
 from pprint import pformat as pf
 
-from asyncache import cached
-from cachetools import TTLCache
 
 from ...caches import check_cache, load_cache
 from ...coordinates import Coordinates
@@ -16,6 +14,8 @@
 from ...utils import date as date_util
 from ...utils import httputils
 from . import LocationService
+from app.services.caching.location_caching_proxy import PROXY
+
 
 LOGGER = logging.getLogger("services.location.jhu")
 PID = os.getpid()
@@ -44,7 +44,6 @@ async def get(self, loc_id):  # pylint: disable=arguments-differ
 BASE_URL = "https://raw.githubusercontent.com/CSSEGISandData/2019-nCoV/master/csse_covid_19_data/csse_covid_19_time_series/"
 
 
-@cached(cache=TTLCache(maxsize=4, ttl=1800))
 async def get_category(category):
     """
     Retrieves the data for the provided category. The data is cached for 30 minutes locally, 1 hour via shared Redis.
@@ -57,77 +56,67 @@
     data_id = f"jhu.{category}"
 
     # check shared cache
-    cache_results = await check_cache(data_id)
-    if cache_results:
-        LOGGER.info(f"{data_id} using shared cache results")
-        results = cache_results
-    else:
-        LOGGER.info(f"{data_id} shared cache empty")
-        # URL to request data from.
-        url = BASE_URL + "time_series_covid19_%s_global.csv" % category
-
-        # Request the data
-        LOGGER.info(f"{data_id} Requesting data...")
-        async with httputils.CLIENT_SESSION.get(url) as response:
-            text = await response.text()
-        LOGGER.debug(f"{data_id} Data received")
-
-        # Parse the CSV.
-        data = list(csv.DictReader(text.splitlines()))
-        LOGGER.debug(f"{data_id} CSV parsed")
-
-        # The normalized locations.
-        locations = []
-
-        for item in data:
-            # Filter out all the dates.
-            dates = dict(filter(lambda element: date_util.is_date(element[0]), item.items()))
-
-            # Make location history from dates.
-            history = {date: int(float(amount or 0)) for date, amount in dates.items()}
-
-            # Country for this location.
-            country = item["Country/Region"]
-
-            # Latest data insert value.
-            latest = list(history.values())[-1]
-
-            # Normalize the item and append to locations.
-            locations.append(
-                {
-                    # General info.
-                    "country": country,
-                    "country_code": countries.country_code(country),
-                    "province": item["Province/State"],
-                    # Coordinates.
-                    "coordinates": {"lat": item["Lat"], "long": item["Long"],},
-                    # History.
-                    "history": history,
-                    # Latest statistic.
-                    "latest": int(latest or 0),
-                }
-            )
-        LOGGER.debug(f"{data_id} Data normalized")
-
-        # Latest total.
-        latest = sum(map(lambda location: location["latest"], locations))
-
-        # Return the final data.
-        results = {
-            "locations": locations,
-            "latest": latest,
-            "last_updated": datetime.utcnow().isoformat() + "Z",
-            "source": "https://github.com/ExpDev07/coronavirus-tracker-api",
-        }
-        # save the results to distributed cache
-        await load_cache(data_id, results)
-
-    LOGGER.info(f"{data_id} results:\n{pf(results, depth=1)}")
+    # URL to request data from.
+    url = BASE_URL + "time_series_covid19_%s_global.csv" % category
+
+    # Request the data
+    result = PROXY.cached_get(url)
+
+    # Parse the CSV.
+    data = list(csv.DictReader(result.text.splitlines()))
+    LOGGER.debug(f"{data_id} CSV parsed")
+
+    # The normalized locations.
+    locations = []
+
+    for item in data:
+        # Filter out all the dates.
+        dates = dict(
+            filter(lambda element: date_util.is_date(element[0]), item.items()))
+
+        # Make location history from dates.
+        history = {date: int(float(amount or 0))
+                   for date, amount in dates.items()}
+
+        # Country for this location.
+        country = item["Country/Region"]
+
+        # Latest data insert value.
+        latest = list(history.values())[-1]
+
+        # Normalize the item and append to locations.
+        locations.append(
+            {
+                # General info.
+                "country": country,
+                "country_code": countries.country_code(country),
+                "province": item["Province/State"],
+                # Coordinates.
+                "coordinates": {"lat": item["Lat"], "long": item["Long"]},
+                # History.
+                "history": history,
+                # Latest statistic.
+                "latest": int(latest or 0),
+            }
+        )
+    LOGGER.debug(f"{data_id} Data normalized")
+
+    # Latest total.
+    latest = sum(map(lambda location: location["latest"], locations))
+
+    # Return the final data.
+    results = {
+        "locations": locations,
+        "latest": latest,
+        "last_updated": datetime.utcnow().isoformat() + "Z",
+        "source": "https://github.com/ExpDev07/coronavirus-tracker-api",
+    }
+    # NOTE: results are no longer persisted to the shared Redis cache here.
     return results
 
 
-@cached(cache=TTLCache(maxsize=1, ttl=1800))
 async def get_locations():
     """
     Retrieves the locations from the categories. The locations are cached for 1 hour.
@@ -135,8 +124,6 @@ async def get_locations():
     :returns: The locations.
     :rtype: List[Location]
     """
-    data_id = "jhu.locations"
-    LOGGER.info(f"pid:{PID}: {data_id} Requesting data...")
     # Get all of the data categories locations.
     confirmed = await get_category("confirmed")
     deaths = await get_category("deaths")
@@ -178,7 +165,8 @@ async def get_locations():
                 location["country"],
                 location["province"],
                 # Coordinates.
-                Coordinates(latitude=coordinates["lat"], longitude=coordinates["long"]),
+                Coordinates(
+                    latitude=coordinates["lat"], longitude=coordinates["long"]),
                 # Last update.
                 datetime.utcnow().isoformat() + "Z",
                 # Timelines (parse dates as ISO).
@@ -204,7 +192,6 @@ async def get_locations():
                 },
             )
         )
-    LOGGER.info(f"{data_id} Data normalized")
 
     # Finally, return the locations.
     return locations
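For the JHU service the refresh interval is now governed by the proxy rather than by asyncache/TTLCache or the shared Redis cache. A rough caller sketch, illustrative only and assuming the app's modules import cleanly outside the running FastAPI service:

    import asyncio

    from app.services.location import jhu

    async def main():
        # Each category maps to its own CSV URL, so each gets its own cache entry;
        # repeated calls for the same category within CACHE_TIME_MIN reuse the
        # cached response instead of re-downloading the CSV.
        confirmed = await jhu.get_category("confirmed")
        deaths = await jhu.get_category("deaths")
        print(confirmed["latest"], "confirmed,", deaths["latest"], "deaths,",
              len(confirmed["locations"]), "locations")

    asyncio.run(main())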
diff --git a/app/services/location/nyt.py b/app/services/location/nyt.py
index 1f25ec34..23a531f8 100644
--- a/app/services/location/nyt.py
+++ b/app/services/location/nyt.py
@@ -1,11 +1,9 @@
 """app.services.location.nyt.py"""
+from app.services.caching.location_caching_proxy import PROXY
 import csv
 import logging
 from datetime import datetime
 
-from asyncache import cached
-from cachetools import TTLCache
-
 from ...caches import check_cache, load_cache
 from ...coordinates import Coordinates
 from ...location.nyt import NYTLocation
@@ -67,7 +65,6 @@ def get_grouped_locations_dict(data):
     return grouped_locations
 
 
-@cached(cache=TTLCache(maxsize=1, ttl=1800))
 async def get_locations():
     """
     Returns a list containing parsed NYT data by US county. The data is cached for 1 hour.
@@ -75,71 +72,53 @@ async def get_locations():
     :returns: The complete data for US Counties.
     :rtype: dict
     """
-    data_id = "nyt.locations"
-    # Request the data.
-    LOGGER.info(f"{data_id} Requesting data...")
-    # check shared cache
-    cache_results = await check_cache(data_id)
-    if cache_results:
-        LOGGER.info(f"{data_id} using shared cache results")
-        locations = cache_results
-    else:
-        LOGGER.info(f"{data_id} shared cache empty")
-        async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
-            text = await response.text()
-
-        LOGGER.debug(f"{data_id} Data received")
-
-        # Parse the CSV.
-        data = list(csv.DictReader(text.splitlines()))
-        LOGGER.debug(f"{data_id} CSV parsed")
-
-        # Group together locations (NYT data ordered by dates not location).
-        grouped_locations = get_grouped_locations_dict(data)
-
-        # The normalized locations.
-        locations = []
-
-        for idx, (county_state, histories) in enumerate(grouped_locations.items()):
-            # Make location history for confirmed and deaths from dates.
-            # List is tuples of (date, amount) in order of increasing dates.
-            confirmed_list = histories["confirmed"]
-            confirmed_history = {date: int(amount or 0) for date, amount in confirmed_list}
-
-            deaths_list = histories["deaths"]
-            deaths_history = {date: int(amount or 0) for date, amount in deaths_list}
-
-            # Normalize the item and append to locations.
-            locations.append(
-                NYTLocation(
-                    id=idx,
-                    state=county_state[1],
-                    county=county_state[0],
-                    coordinates=Coordinates(None, None),  # NYT does not provide coordinates
-                    last_updated=datetime.utcnow().isoformat() + "Z",  # since last request
-                    timelines={
-                        "confirmed": Timeline(
-                            timeline={
-                                datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
-                                for date, amount in confirmed_history.items()
-                            }
-                        ),
-                        "deaths": Timeline(
-                            timeline={
-                                datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
-                                for date, amount in deaths_history.items()
-                            }
-                        ),
-                        "recovered": Timeline(),
-                    },
-                )
-            )
-        LOGGER.info(f"{data_id} Data normalized")
-        # save the results to distributed cache
-        # TODO: fix json serialization
-        try:
-            await load_cache(data_id, locations)
-        except TypeError as type_err:
-            LOGGER.error(type_err)
+
+    result = PROXY.cached_get(BASE_URL)
+
+    data = list(csv.DictReader(result.text.splitlines()))
+
+    # Group together locations (NYT data ordered by dates not location).
+    grouped_locations = get_grouped_locations_dict(data)
+
+    # The normalized locations.
+    locations = []
+
+    for idx, (county_state, histories) in enumerate(grouped_locations.items()):
+        # Make location history for confirmed and deaths from dates.
+        # List is tuples of (date, amount) in order of increasing dates.
+        confirmed_list = histories["confirmed"]
+        confirmed_history = {date: int(amount or 0)
+                             for date, amount in confirmed_list}
+
+        deaths_list = histories["deaths"]
+        deaths_history = {date: int(amount or 0)
+                          for date, amount in deaths_list}
+
+        # Normalize the item and append to locations.
+        locations.append(
+            NYTLocation(
+                id=idx,
+                state=county_state[1],
+                county=county_state[0],
+                # NYT does not provide coordinates
+                coordinates=Coordinates(None, None),
+                last_updated=datetime.utcnow().isoformat() + "Z",  # since last request
+                timelines={
+                    "confirmed": Timeline(
+                        timeline={
+                            datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
+                            for date, amount in confirmed_history.items()
+                        }
+                    ),
+                    "deaths": Timeline(
+                        timeline={
+                            datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
+                            for date, amount in deaths_history.items()
+                        }
+                    ),
+                    "recovered": Timeline(),
+                },
+            )
+        )
 
     return locations
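And a sketch of the expiry behavior the proxy is meant to provide (again illustrative and not part of the patch; it backdates the stored timestamp by reaching into PROXY.cache directly, which a unit test would normally do with a fake clock instead):

    from datetime import timedelta

    from app.services.caching.location_caching_proxy import CACHE_TIME_MIN, PROXY

    URL = "https://facts.csbs.org/covid-19/covid19_county.csv"

    stale = PROXY.cached_get(URL)

    # Make the cached entry look older than CACHE_TIME_MIN minutes.
    timestamp, response = PROXY.cache[URL]
    PROXY.cache[URL] = (timestamp - timedelta(minutes=CACHE_TIME_MIN + 1), response)

    # The next call sees an expired entry and performs a fresh requests.get().
    fresh = PROXY.cached_get(URL)
    assert fresh is not stale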