added adapter design pattern for location services
simran7s committed Aug 12, 2021
commit 0719d24fa4fec463a67cda05e4bf5db437fc61b9
134 changes: 67 additions & 67 deletions app/services/location/csbs.py
@@ -30,73 +30,73 @@ async def get(self, loc_id): # pylint: disable=arguments-differ
locations = await self.get_all()
return locations[loc_id]

@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
    Retrieves county locations; locations are cached for 30 minutes.

    :returns: The locations.
    :rtype: List[CSBSLocation]
"""
data_id = "csbs.locations"
LOGGER.info(f"{data_id} Requesting data...")
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
locations = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

# Ensure country is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(
last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),
)
)
LOGGER.info(f"{data_id} Data normalized")
# save the results to distributed cache
# TODO: fix json serialization
try:
await load_cache(data_id, locations)
except TypeError as type_err:
LOGGER.error(type_err)

# Return the locations.
return locations


# Base URL for fetching data
BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"


@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
    Retrieves county locations; locations are cached for 30 minutes.

    :returns: The locations.
    :rtype: List[CSBSLocation]
"""
data_id = "csbs.locations"
LOGGER.info(f"{data_id} Requesting data...")
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
locations = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

            # Ensure county is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),
)
)
LOGGER.info(f"{data_id} Data normalized")
# save the results to distributed cache
# TODO: fix json serialization
try:
await load_cache(data_id, locations)
except TypeError as type_err:
LOGGER.error(type_err)

# Return the locations.
return locations
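
A note on the "TODO: fix json serialization" above: load_cache(data_id, locations) fails with a TypeError because the CSBSLocation objects are not JSON-serializable. Below is a minimal sketch of one possible direction, flattening the locations into plain dicts before writing to the shared cache; the attribute names are assumptions about the CSBSLocation model, not code in this commit.

# Hypothetical helper, not part of this diff: flatten CSBSLocation objects into
# JSON-safe dicts so load_cache() can serialize them. Attribute names are assumed.
def to_cacheable(locations):
    return [
        {
            "id": loc.id,
            "state": loc.state,
            "county": loc.county,
            "coordinates": {
                "latitude": loc.coordinates.latitude,
                "longitude": loc.coordinates.longitude,
            },
            "last_updated": loc.last_updated,
            "confirmed": loc.confirmed,
            "deaths": loc.deaths,
        }
        for loc in locations
    ]

# Usage sketch inside get_locations(), replacing the bare call:
#     await load_cache(data_id, to_cacheable(locations))
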
174 changes: 88 additions & 86 deletions app/services/location/jhu.py
@@ -36,6 +36,89 @@ async def get(self, loc_id): # pylint: disable=arguments-differ
locations = await self.get_all()
return locations[loc_id]

@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
    Retrieves the locations from the categories. The locations are cached for 30 minutes.

:returns: The locations.
:rtype: List[Location]
"""
data_id = "jhu.locations"
LOGGER.info(f"pid:{PID}: {data_id} Requesting data...")
# Get all of the data categories locations.
confirmed = await get_category("confirmed")
deaths = await get_category("deaths")
recovered = await get_category("recovered")

locations_confirmed = confirmed["locations"]
locations_deaths = deaths["locations"]
locations_recovered = recovered["locations"]

# Final locations to return.
locations = []
# ***************************************************************************
# TODO: This iteration approach assumes the indexes remain the same
# and opens us to a CRITICAL ERROR. The removal of a column in the data source
    # would break the API or SHIFT all of the confirmed, deaths, and recovered data,
    # producing incorrect data for consumers.
# ***************************************************************************
# Go through locations.
for index, location in enumerate(locations_confirmed):
# Get the timelines.

# TEMP: Fix for merging recovery data. See TODO above for more details.
key = (location["country"], location["province"])

timelines = {
"confirmed": location["history"],
"deaths": parse_history(key, locations_deaths, index),
"recovered": parse_history(key, locations_recovered, index),
}

# Grab coordinates.
coordinates = location["coordinates"]

# Create location (supporting timelines) and append.
locations.append(
TimelinedLocation(
# General info.
index,
location["country"],
location["province"],
# Coordinates.
Coordinates(
latitude=coordinates["lat"], longitude=coordinates["long"]),
# Last update.
datetime.utcnow().isoformat() + "Z",
# Timelines (parse dates as ISO).
{
"confirmed": Timeline(
timeline={
datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
for date, amount in timelines["confirmed"].items()
}
),
"deaths": Timeline(
timeline={
datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
for date, amount in timelines["deaths"].items()
}
),
"recovered": Timeline(
timeline={
datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
for date, amount in timelines["recovered"].items()
}
),
},
)
)
LOGGER.info(f"{data_id} Data normalized")

# Finally, return the locations.
return locations


# ---------------------------------------------------------------

@@ -82,10 +165,12 @@ async def get_category(category):

for item in data:
# Filter out all the dates.
dates = dict(filter(lambda element: date_util.is_date(element[0]), item.items()))
dates = dict(
filter(lambda element: date_util.is_date(element[0]), item.items()))

# Make location history from dates.
history = {date: int(float(amount or 0)) for date, amount in dates.items()}
history = {date: int(float(amount or 0))
for date, amount in dates.items()}

# Country for this location.
country = item["Country/Region"]
@@ -101,7 +186,7 @@ async def get_category(category):
"country_code": countries.country_code(country),
"province": item["Province/State"],
# Coordinates.
"coordinates": {"lat": item["Lat"], "long": item["Long"],},
"coordinates": {"lat": item["Lat"], "long": item["Long"], },
# History.
"history": history,
# Latest statistic.
@@ -127,89 +212,6 @@ async def get_category(category):
return results


@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
    Retrieves the locations from the categories. The locations are cached for 30 minutes.

:returns: The locations.
:rtype: List[Location]
"""
data_id = "jhu.locations"
LOGGER.info(f"pid:{PID}: {data_id} Requesting data...")
# Get all of the data categories locations.
confirmed = await get_category("confirmed")
deaths = await get_category("deaths")
recovered = await get_category("recovered")

locations_confirmed = confirmed["locations"]
locations_deaths = deaths["locations"]
locations_recovered = recovered["locations"]

# Final locations to return.
locations = []
# ***************************************************************************
# TODO: This iteration approach assumes the indexes remain the same
# and opens us to a CRITICAL ERROR. The removal of a column in the data source
    # would break the API or SHIFT all of the confirmed, deaths, and recovered data,
    # producing incorrect data for consumers.
# ***************************************************************************
# Go through locations.
for index, location in enumerate(locations_confirmed):
# Get the timelines.

# TEMP: Fix for merging recovery data. See TODO above for more details.
key = (location["country"], location["province"])

timelines = {
"confirmed": location["history"],
"deaths": parse_history(key, locations_deaths, index),
"recovered": parse_history(key, locations_recovered, index),
}

# Grab coordinates.
coordinates = location["coordinates"]

# Create location (supporting timelines) and append.
locations.append(
TimelinedLocation(
# General info.
index,
location["country"],
location["province"],
# Coordinates.
Coordinates(latitude=coordinates["lat"], longitude=coordinates["long"]),
# Last update.
datetime.utcnow().isoformat() + "Z",
# Timelines (parse dates as ISO).
{
"confirmed": Timeline(
timeline={
datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
for date, amount in timelines["confirmed"].items()
}
),
"deaths": Timeline(
timeline={
datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
for date, amount in timelines["deaths"].items()
}
),
"recovered": Timeline(
timeline={
datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
for date, amount in timelines["recovered"].items()
}
),
},
)
)
LOGGER.info(f"{data_id} Data normalized")

# Finally, return the locations.
return locations


def parse_history(key: tuple, locations: list, index: int):
"""
Helper for validating and extracting history content from
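
The TODO block in get_locations flags that merging the deaths and recovered series purely by list index is fragile; parse_history takes a (country, province) key alongside the index, which points at key-based validation. A rough sketch of how such a helper could behave follows, illustrative only since the actual body is collapsed in this diff; the dictionary keys (country, province, history) match those built in get_category above.

def parse_history_sketch(key: tuple, locations: list, index: int) -> dict:
    """Illustrative stand-in for parse_history: trust the index only when its
    (country, province) pair matches the key from the confirmed series."""
    if index < len(locations):
        candidate = locations[index]
        if (candidate["country"], candidate["province"]) == key:
            return candidate["history"]
    # Index did not line up with the confirmed series; fall back to a key lookup.
    for location in locations:
        if (location["country"], location["province"]) == key:
            return location["history"]
    # No match: return an empty timeline rather than mismatched data.
    return {}
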
23 changes: 23 additions & 0 deletions app/services/location/locationServiceAdapter.py
@@ -0,0 +1,23 @@
"""
Implements the Adapter pattern for LocationService.
"""
from services.location import LocationService


class Adapter(LocationService):
    """Exposes location retrieval through the LocationService interface."""

async def get_all(self):
# Get the locations.
locations = await LocationService.get_locations()
return locations

async def get(self, loc_id): # pylint: disable=arguments-differ
# Get location at the index equal to the provided id.
locations = await LocationService.get_locations()
return locations[loc_id]
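
One way the new Adapter could be bound to a concrete data source is sketched below; the module path for csbs and the delegation to its TTL-cached, module-level get_locations() are assumptions, not code in this commit.

from services.location import LocationService
from services.location.csbs import get_locations as get_csbs_locations


class CSBSAdapter(LocationService):
    """Hypothetical concrete adapter wiring the CSBS retriever to the interface."""

    async def get_all(self):
        # Delegate to the cached, module-level retriever.
        return await get_csbs_locations()

    async def get(self, loc_id):  # pylint: disable=arguments-differ
        locations = await get_csbs_locations()
        return locations[loc_id]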