diff --git a/app/data/__init__.py b/app/data/__init__.py
index 60a75dac..7ba2712f 100644
--- a/app/data/__init__.py
+++ b/app/data/__init__.py
@@ -1,14 +1,8 @@
 """app.data"""
-from ..services.location.csbs import CSBSLocationService
-from ..services.location.jhu import JhuLocationService
-from ..services.location.nyt import NYTLocationService
+from ..services.location import get_data_sources
 
 # Mapping of services to data-sources.
-DATA_SOURCES = {
-    "jhu": JhuLocationService(),
-    "csbs": CSBSLocationService(),
-    "nyt": NYTLocationService(),
-}
+DATA_SOURCES = get_data_sources()
 
 
 def data_source(source):
diff --git a/app/services/location/__init__.py b/app/services/location/__init__.py
index 6d292b54..9565b68b 100644
--- a/app/services/location/__init__.py
+++ b/app/services/location/__init__.py
@@ -7,22 +7,26 @@ class LocationService(ABC):
     Service for retrieving locations.
     """
 
-    @abstractmethod
     async def get_all(self):
-        """
-        Gets and returns all of the locations.
+        # Get the locations.
+        locations = await self.get_locations()
+        return locations
-
-        :returns: The locations.
-        :rtype: List[Location]
-        """
-        raise NotImplementedError
+
+    async def get(self, loc_id):  # pylint: disable=arguments-differ
+        # Get location at the index equal to the provided id.
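+        # NOTE: a location id is assumed to be a positional index into the
+        # list returned by get_all(); an out-of-range id raises IndexError.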
+        locations = await self.get_all()
+        return locations[loc_id]
 
     @abstractmethod
-    async def get(self, id):  # pylint: disable=redefined-builtin,invalid-name
-        """
-        Gets and returns location with the provided id.
-
-        :returns: The location.
-        :rtype: Location
-        """
+    async def get_locations(self):
         raise NotImplementedError
+
+
+def get_data_sources():
+    from .csbs import CSBSLocationService
+    from .jhu import JhuLocationService
+    from .nyt import NYTLocationService
+
+    return {"jhu": JhuLocationService(), "csbs": CSBSLocationService(), "nyt": NYTLocationService()}
\ No newline at end of file
diff --git a/app/services/location/csbs.py b/app/services/location/csbs.py
index 444ebad6..53a3de15 100644
--- a/app/services/location/csbs.py
+++ b/app/services/location/csbs.py
@@ -14,89 +14,77 @@
 LOGGER = logging.getLogger("services.location.csbs")
 
-
-class CSBSLocationService(LocationService):
-    """
-    Service for retrieving locations from csbs
-    """
-
-    async def get_all(self):
-        # Get the locations.
-        locations = await get_locations()
-        return locations
-
-    async def get(self, loc_id):  # pylint: disable=arguments-differ
-        # Get location at the index equal to the provided id.
-        locations = await self.get_all()
-        return locations[loc_id]
-
-
 # Base URL for fetching data
 BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"
 
-
-@cached(cache=TTLCache(maxsize=1, ttl=1800))
-async def get_locations():
+class CSBSLocationService(LocationService):
     """
-    Retrieves county locations; locations are cached for 1 hour
-
-    :returns: The locations.
-    :rtype: dict
+    Service for retrieving locations from CSBS
     """
-    data_id = "csbs.locations"
-    LOGGER.info(f"{data_id} Requesting data...")
-    # check shared cache
-    cache_results = await check_cache(data_id)
-    if cache_results:
-        LOGGER.info(f"{data_id} using shared cache results")
-        locations = cache_results
-    else:
-        LOGGER.info(f"{data_id} shared cache empty")
-        async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
-            text = await response.text()
-
-        LOGGER.debug(f"{data_id} Data received")
-
-        data = list(csv.DictReader(text.splitlines()))
-        LOGGER.debug(f"{data_id} CSV parsed")
-
-        locations = []
-
-        for i, item in enumerate(data):
-            # General info.
-            state = item["State Name"]
-            county = item["County Name"]
-
-            # Ensure country is specified.
-            if county in {"Unassigned", "Unknown"}:
-                continue
-
-            # Date string without "EDT" at end.
-            last_update = " ".join(item["Last Update"].split(" ")[0:2])
-
-            # Append to locations.
-            locations.append(
-                CSBSLocation(
-                    # General info.
-                    i,
-                    state,
-                    county,
-                    # Coordinates.
-                    Coordinates(item["Latitude"], item["Longitude"]),
-                    # Last update (parse as ISO).
-                    datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
-                    # Statistics.
-                    int(item["Confirmed"] or 0),
-                    int(item["Death"] or 0),
-                )
-            )
-        LOGGER.info(f"{data_id} Data normalized")
-        # save the results to distributed cache
-        # TODO: fix json serialization
-        try:
-            await load_cache(data_id, locations)
-        except TypeError as type_err:
-            LOGGER.error(type_err)
-
-    # Return the locations.
-    return locations
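+    # NOTE: TTLCache(maxsize=1, ttl=1800) holds one entry (the full location
+    # list), so results are cached for 30 minutes per service instance.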
+    @cached(cache=TTLCache(maxsize=1, ttl=1800))
+    async def get_locations(self):
+        """
+        Retrieves county locations; locations are cached for 30 minutes.
+
+        :returns: The locations.
+        :rtype: List[CSBSLocation]
+        """
+        data_id = "csbs.locations"
+        LOGGER.info(f"{data_id} Requesting data...")
+        # Check the shared cache.
+        cache_results = await check_cache(data_id)
+        if cache_results:
+            LOGGER.info(f"{data_id} using shared cache results")
+            locations = cache_results
+        else:
+            LOGGER.info(f"{data_id} shared cache empty")
+            async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
+                text = await response.text()
+
+            LOGGER.debug(f"{data_id} Data received")
+
+            data = list(csv.DictReader(text.splitlines()))
+            LOGGER.debug(f"{data_id} CSV parsed")
+
+            locations = []
+
+            for i, item in enumerate(data):
+                # General info.
+                state = item["State Name"]
+                county = item["County Name"]
+
+                # Ensure a county is specified.
+                if county in {"Unassigned", "Unknown"}:
+                    continue
+
+                # Date string without "EDT" at end.
+                last_update = " ".join(item["Last Update"].split(" ")[0:2])
+
+                # Append to locations.
+                locations.append(
+                    CSBSLocation(
+                        # General info.
+                        i,
+                        state,
+                        county,
+                        # Coordinates.
+                        Coordinates(item["Latitude"], item["Longitude"]),
+                        # Last update (parse as ISO).
+                        datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
+                        # Statistics.
+                        int(item["Confirmed"] or 0),
+                        int(item["Death"] or 0),
+                    )
+                )
+            LOGGER.info(f"{data_id} Data normalized")
+            # Save the results to the distributed cache.
+            # TODO: fix json serialization
+            try:
+                await load_cache(data_id, locations)
+            except TypeError as type_err:
+                LOGGER.error(type_err)
+
+        # Return the locations.
+        return locations
diff --git a/app/services/location/jhu.py b/app/services/location/jhu.py
index ebed3960..80b9eeef 100644
--- a/app/services/location/jhu.py
+++ b/app/services/location/jhu.py
@@ -26,15 +26,90 @@ class JhuLocationService(LocationService):
     Service for retrieving locations from Johns Hopkins CSSE (https://github.com/CSSEGISandData/COVID-19).
     """
 
-    async def get_all(self):
-        # Get the locations.
-        locations = await get_locations()
+    @cached(cache=TTLCache(maxsize=1, ttl=1800))
+    async def get_locations(self):
+        """
+        Retrieves the locations from the categories. The locations are cached for 30 minutes.
+
+        :returns: The locations.
+        :rtype: List[Location]
+        """
+        data_id = "jhu.locations"
+        LOGGER.info(f"pid:{PID}: {data_id} Requesting data...")
+        # Get all of the data categories' locations.
+        confirmed = await get_category("confirmed")
+        deaths = await get_category("deaths")
+        recovered = await get_category("recovered")
+
+        locations_confirmed = confirmed["locations"]
+        locations_deaths = deaths["locations"]
+        locations_recovered = recovered["locations"]
+
+        # Final locations to return.
+        locations = []
+        # ***************************************************************************
+        # TODO: This iteration approach assumes the indexes remain the same and
+        # opens us up to a CRITICAL ERROR. The removal of a column in the data
+        # source would break the API or SHIFT all the data (confirmed, deaths,
+        # recovered), producing incorrect data for consumers.
+        # ***************************************************************************
+        # Go through locations.
+        for index, location in enumerate(locations_confirmed):
+            # Get the timelines.
+
+            # TEMP: Fix for merging recovery data. See TODO above for more details.
+            key = (location["country"], location["province"])
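+            # NOTE: parse_history is assumed to validate this key against the entry
+            # at `index`, falling back to an empty history on a mismatch.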
+
+            timelines = {
+                "confirmed": location["history"],
+                "deaths": parse_history(key, locations_deaths, index),
+                "recovered": parse_history(key, locations_recovered, index),
+            }
+
+            # Grab coordinates.
+            coordinates = location["coordinates"]
+
+            # Create location (supporting timelines) and append.
+            locations.append(
+                TimelinedLocation(
+                    # General info.
+                    index,
+                    location["country"],
+                    location["province"],
+                    # Coordinates.
+                    Coordinates(latitude=coordinates["lat"], longitude=coordinates["long"]),
+                    # Last update.
+                    datetime.utcnow().isoformat() + "Z",
+                    # Timelines (parse dates as ISO).
+                    {
+                        "confirmed": Timeline(
+                            timeline={
+                                datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
+                                for date, amount in timelines["confirmed"].items()
+                            }
+                        ),
+                        "deaths": Timeline(
+                            timeline={
+                                datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
+                                for date, amount in timelines["deaths"].items()
+                            }
+                        ),
+                        "recovered": Timeline(
+                            timeline={
+                                datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
+                                for date, amount in timelines["recovered"].items()
+                            }
+                        ),
+                    },
+                )
+            )
+        LOGGER.info(f"{data_id} Data normalized")
+
+        # Finally, return the locations.
         return locations
 
-    async def get(self, loc_id):  # pylint: disable=arguments-differ
-        # Get location at the index equal to provided id.
-        locations = await self.get_all()
-        return locations[loc_id]
 
 # ---------------------------------------------------------------
@@ -127,87 +202,6 @@ async def get_category(category):
     return results
 
 
-@cached(cache=TTLCache(maxsize=1, ttl=1800))
-async def get_locations():
-    """
-    Retrieves the locations from the categories. The locations are cached for 1 hour.
-
-    :returns: The locations.
-    :rtype: List[Location]
-    """
-    data_id = "jhu.locations"
-    LOGGER.info(f"pid:{PID}: {data_id} Requesting data...")
-    # Get all of the data categories locations.
-    confirmed = await get_category("confirmed")
-    deaths = await get_category("deaths")
-    recovered = await get_category("recovered")
-
-    locations_confirmed = confirmed["locations"]
-    locations_deaths = deaths["locations"]
-    locations_recovered = recovered["locations"]
-
-    # Final locations to return.
-    locations = []
-    # ***************************************************************************
-    # TODO: This iteration approach assumes the indexes remain the same
-    # and opens us to a CRITICAL ERROR. The removal of a column in the data source
-    # would break the API or SHIFT all the data confirmed, deaths, recovery producting
-    # incorrect data to consumers.
-    # ***************************************************************************
-    # Go through locations.
-    for index, location in enumerate(locations_confirmed):
-        # Get the timelines.
-
-        # TEMP: Fix for merging recovery data. See TODO above for more details.
-        key = (location["country"], location["province"])
-
-        timelines = {
-            "confirmed": location["history"],
-            "deaths": parse_history(key, locations_deaths, index),
-            "recovered": parse_history(key, locations_recovered, index),
-        }
-
-        # Grab coordinates.
-        coordinates = location["coordinates"]
-
-        # Create location (supporting timelines) and append.
-        locations.append(
-            TimelinedLocation(
-                # General info.
-                index,
-                location["country"],
-                location["province"],
-                # Coordinates.
-                Coordinates(latitude=coordinates["lat"], longitude=coordinates["long"]),
-                # Last update.
-                datetime.utcnow().isoformat() + "Z",
-                # Timelines (parse dates as ISO).
-                {
-                    "confirmed": Timeline(
-                        timeline={
-                            datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
-                            for date, amount in timelines["confirmed"].items()
-                        }
-                    ),
-                    "deaths": Timeline(
-                        timeline={
-                            datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
-                            for date, amount in timelines["deaths"].items()
-                        }
-                    ),
-                    "recovered": Timeline(
-                        timeline={
-                            datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
-                            for date, amount in timelines["recovered"].items()
-                        }
-                    ),
-                },
-            )
-        )
-    LOGGER.info(f"{data_id} Data normalized")
-
-    # Finally, return the locations.
-    return locations
 
 
 def parse_history(key: tuple, locations: list, index: int):
diff --git a/app/services/location/nyt.py b/app/services/location/nyt.py
index 1f25ec34..d6a3b419 100644
--- a/app/services/location/nyt.py
+++ b/app/services/location/nyt.py
@@ -15,21 +15,90 @@
 LOGGER = logging.getLogger("services.location.nyt")
 
-
 class NYTLocationService(LocationService):
     """
     Service for retrieving locations from New York Times (https://github.com/nytimes/covid-19-data).
     """
 
-    async def get_all(self):
-        # Get the locations.
-        locations = await get_locations()
+    @cached(cache=TTLCache(maxsize=1, ttl=1800))
+    async def get_locations(self):
+        """
+        Returns a list containing parsed NYT data by US county. The data is cached for 30 minutes.
+
+        :returns: The complete data for US counties.
+        :rtype: List[Location]
+        """
+        data_id = "nyt.locations"
+        # Request the data.
+        LOGGER.info(f"{data_id} Requesting data...")
+        # Check the shared cache.
+        cache_results = await check_cache(data_id)
+        if cache_results:
+            LOGGER.info(f"{data_id} using shared cache results")
+            locations = cache_results
+        else:
+            LOGGER.info(f"{data_id} shared cache empty")
+            async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
+                text = await response.text()
+
+            LOGGER.debug(f"{data_id} Data received")
+
+            # Parse the CSV.
+            data = list(csv.DictReader(text.splitlines()))
+            LOGGER.debug(f"{data_id} CSV parsed")
+
+            # Group together locations (NYT data is ordered by date, not location).
+            grouped_locations = get_grouped_locations_dict(data)
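+            # NOTE: each key is assumed to be a (county, state) tuple mapped to
+            # date-ordered confirmed and deaths histories.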
+
+            # The normalized locations.
+            locations = []
+
+            for idx, (county_state, histories) in enumerate(grouped_locations.items()):
+                # Make location history for confirmed and deaths from dates.
+                # List is tuples of (date, amount) in order of increasing dates.
+                confirmed_list = histories["confirmed"]
+                confirmed_history = {date: int(amount or 0) for date, amount in confirmed_list}
+
+                deaths_list = histories["deaths"]
+                deaths_history = {date: int(amount or 0) for date, amount in deaths_list}
+
+                # Normalize the item and append to locations.
+                locations.append(
+                    NYTLocation(
+                        id=idx,
+                        state=county_state[1],
+                        county=county_state[0],
+                        coordinates=Coordinates(None, None),  # NYT does not provide coordinates
+                        last_updated=datetime.utcnow().isoformat() + "Z",  # since last request
+                        timelines={
+                            "confirmed": Timeline(
+                                timeline={
+                                    datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
+                                    for date, amount in confirmed_history.items()
+                                }
+                            ),
+                            "deaths": Timeline(
+                                timeline={
+                                    datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
+                                    for date, amount in deaths_history.items()
+                                }
+                            ),
+                            "recovered": Timeline(),
+                        },
+                    )
+                )
+            LOGGER.info(f"{data_id} Data normalized")
+            # Save the results to the distributed cache.
+            # TODO: fix json serialization
+            try:
+                await load_cache(data_id, locations)
+            except TypeError as type_err:
+                LOGGER.error(type_err)
+
         return locations
 
-    async def get(self, loc_id):  # pylint: disable=arguments-differ
-        # Get location at the index equal to provided id.
-        locations = await self.get_all()
-        return locations[loc_id]
 
 # ---------------------------------------------------------------
@@ -67,79 +136,3 @@ def get_grouped_locations_dict(data):
     return grouped_locations
 
 
-@cached(cache=TTLCache(maxsize=1, ttl=1800))
-async def get_locations():
-    """
-    Returns a list containing parsed NYT data by US county. The data is cached for 1 hour.
-
-    :returns: The complete data for US Counties.
-    :rtype: dict
-    """
-    data_id = "nyt.locations"
-    # Request the data.
-    LOGGER.info(f"{data_id} Requesting data...")
-    # check shared cache
-    cache_results = await check_cache(data_id)
-    if cache_results:
-        LOGGER.info(f"{data_id} using shared cache results")
-        locations = cache_results
-    else:
-        LOGGER.info(f"{data_id} shared cache empty")
-        async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
-            text = await response.text()
-
-        LOGGER.debug(f"{data_id} Data received")
-
-        # Parse the CSV.
-        data = list(csv.DictReader(text.splitlines()))
-        LOGGER.debug(f"{data_id} CSV parsed")
-
-        # Group together locations (NYT data ordered by dates not location).
-        grouped_locations = get_grouped_locations_dict(data)
-
-        # The normalized locations.
-        locations = []
-
-        for idx, (county_state, histories) in enumerate(grouped_locations.items()):
-            # Make location history for confirmed and deaths from dates.
-            # List is tuples of (date, amount) in order of increasing dates.
-            confirmed_list = histories["confirmed"]
-            confirmed_history = {date: int(amount or 0) for date, amount in confirmed_list}
-
-            deaths_list = histories["deaths"]
-            deaths_history = {date: int(amount or 0) for date, amount in deaths_list}
-
-            # Normalize the item and append to locations.
-            locations.append(
-                NYTLocation(
-                    id=idx,
-                    state=county_state[1],
-                    county=county_state[0],
-                    coordinates=Coordinates(None, None),  # NYT does not provide coordinates
-                    last_updated=datetime.utcnow().isoformat() + "Z",  # since last request
-                    timelines={
-                        "confirmed": Timeline(
-                            timeline={
-                                datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
-                                for date, amount in confirmed_history.items()
-                            }
-                        ),
-                        "deaths": Timeline(
-                            timeline={
-                                datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
-                                for date, amount in deaths_history.items()
-                            }
-                        ),
-                        "recovered": Timeline(),
-                    },
-                )
-            )
-        LOGGER.info(f"{data_id} Data normalized")
-        # save the results to distributed cache
-        # TODO: fix json serialization
-        try:
-            await load_cache(data_id, locations)
-        except TypeError as type_err:
-            LOGGER.error(type_err)
-
-    return locations
diff --git a/tests/test_csbs.py b/tests/test_csbs.py
index 828a5b65..b8c547de 100644
--- a/tests/test_csbs.py
+++ b/tests/test_csbs.py
@@ -27,7 +27,8 @@ def read_file(self):
 
 @pytest.mark.asyncio
 async def test_get_locations(mock_client_session):
-    data = await csbs.get_locations()
+    data = await csbs.CSBSLocationService().get_locations()
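+    # get_locations() is now an instance method on the service class.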
     assert isinstance(data, list)
 
 
diff --git a/tests/test_jhu.py b/tests/test_jhu.py
index 65f960db..fd1709b7 100644
--- a/tests/test_jhu.py
+++ b/tests/test_jhu.py
@@ -14,7 +14,7 @@ async def test_get_locations(mock_client_session):
     with mock.patch("app.services.location.jhu.datetime") as mock_datetime:
         mock_datetime.utcnow.return_value.isoformat.return_value = DATETIME_STRING
         mock_datetime.strptime.side_effect = mocked_strptime_isoformat
-        output = await jhu.get_locations()
+        output = await jhu.JhuLocationService().get_locations()
 
     assert isinstance(output, list)
     assert isinstance(output[0], location.Location)
diff --git a/tests/test_nyt.py b/tests/test_nyt.py
index ca9c9dca..b2a69a2c 100644
--- a/tests/test_nyt.py
+++ b/tests/test_nyt.py
@@ -16,7 +16,7 @@ async def test_get_locations(mock_client_session):
     with mock.patch("app.services.location.nyt.datetime") as mock_datetime:
         mock_datetime.utcnow.return_value.isoformat.return_value = DATETIME_STRING
         mock_datetime.strptime.side_effect = mocked_strptime_isoformat
-        locations = await nyt.get_locations()
+        locations = await nyt.NYTLocationService().get_locations()
 
     assert isinstance(locations, list)