Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 81 additions & 69 deletions app/services/location/csbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class CSBSLocationService(LocationService):

async def get_all(self):
# Get the locations.
locations = await get_locations()
locations = await CSBSGateway.get_instance().get_locations()
return locations

async def get(self, loc_id): # pylint: disable=arguments-differ
Expand All @@ -31,72 +31,84 @@ async def get(self, loc_id): # pylint: disable=arguments-differ
return locations[loc_id]


# Base URL for fetching data
BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"


@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
Retrieves county locations; locations are cached for 1 hour

:returns: The locations.
:rtype: dict
"""
data_id = "csbs.locations"
LOGGER.info(f"{data_id} Requesting data...")
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
locations = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

# Ensure country is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),
class CSBSGateway(LocationGateway):
__instance = None

@staticmethod
def get_instance():
if CSBSGateway.__instance is None:
CSBSGateway.__instance = CSBSGateway()

return CSBSGateway.__instance

def __init__(self):
self.BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"

if CSBSGateway.__instance is not None:
raise Exception("The system already has one instance")

@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations(self):
"""
Retrieves county locations; locations are cached for 1 hour

:returns: The locations.
:rtype: dict
"""
data_id = "csbs.locations"
LOGGER.info(f"{data_id} Requesting data...")
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
locations = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
async with httputils.CLIENT_SESSION.get(self.BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

# Ensure country is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),
)
)
)
LOGGER.info(f"{data_id} Data normalized")
# save the results to distributed cache
# TODO: fix json serialization
try:
await load_cache(data_id, locations)
except TypeError as type_err:
LOGGER.error(type_err)

# Return the locations.
return locations
LOGGER.info(f"{data_id} Data normalized")
# save the results to distributed cache
# TODO: fix json serialization
try:
await load_cache(data_id, locations)
except TypeError as type_err:
LOGGER.error(type_err)

# Return the locations.
return locations
Loading