Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions app/data/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,34 @@
"""app.data"""
from ..services.location.csbs import CSBSLocationService
from ..services.location.jhu import JhuLocationService
from ..services.location.nyt import NYTLocationService
from ..services.location.csbs import CSBSGateway
from ..services.location.jhu import JHUGateway
from ..services.location.nyt import NYTGateway

from ..services.location import LocationGateway, LocationService



class ServiceFactory:
    """Factory that wires a data-source gateway into a LocationService."""

    def create_service(self, source_name: str) -> "LocationService":
        """
        Create a LocationService backed by the named data-source gateway.

        :param source_name: One of "jhu", "csbs", "nyt" (case-insensitive).
        :returns: A LocationService wrapping the matching gateway.
        :rtype: LocationService
        :raises ValueError: If the source name is not recognized.
        """
        source_name = source_name.lower()

        gateway: LocationGateway

        if source_name == 'jhu':
            gateway = JHUGateway("https://raw.githubusercontent.com/CSSEGISandData/2019-nCoV/master/csse_covid_19_data/csse_covid_19_time_series/")
        elif source_name == 'csbs':
            gateway = CSBSGateway("https://facts.csbs.org/covid-19/covid19_county.csv")
        elif source_name == 'nyt':
            gateway = NYTGateway("https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv")
        else:
            # Fail loudly instead of falling through with `gateway` unbound.
            raise ValueError(f"Unknown data source: {source_name}")

        # BUG FIX: the original assigned the service to a local and never
        # returned it, so create_service() always returned None.
        return LocationService(gateway)


# Mapping of services to data-sources.
# NOTE: a diff-merge artifact had left duplicate keys here; the stale
# entries referenced JhuLocationService/CSBSLocationService/NYTLocationService,
# which are no longer imported and would raise NameError at import time.
DATA_SOURCES = {
    "jhu": ServiceFactory().create_service("jhu"),
    "csbs": ServiceFactory().create_service("csbs"),
    "nyt": ServiceFactory().create_service("nyt"),
}


Expand All @@ -19,3 +40,7 @@ def data_source(source):
:rtype: LocationService
"""
return DATA_SOURCES.get(source.lower())




15 changes: 8 additions & 7 deletions app/routers/v1.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
"""app.routers.v1.py"""
from fastapi import APIRouter

from ..services.location.jhu import get_category
from ..services.location.jhu import JHUGateway

V1 = APIRouter()
gateway = JHUGateway("https://raw.githubusercontent.com/CSSEGISandData/2019-nCoV/master/csse_covid_19_data/csse_covid_19_time_series/")


@V1.get("/all")
async def all_categories():
"""Get all the categories."""
confirmed = await get_category("confirmed")
deaths = await get_category("deaths")
recovered = await get_category("recovered")
confirmed = await gateway.get_category("confirmed")
deaths = await gateway.get_category("deaths")
recovered = await gateway.get_category("recovered")

return {
# Data.
Expand All @@ -30,22 +31,22 @@ async def all_categories():
@V1.get("/confirmed")
async def get_confirmed():
    """Confirmed cases, fetched through the module-level JHU gateway."""
    # BUG FIX: a duplicated pre-refactor line also called the removed free
    # function `get_category`, which is no longer imported (NameError).
    confirmed_data = await gateway.get_category("confirmed")

    return confirmed_data


@V1.get("/deaths")
async def get_deaths():
    """Total deaths, fetched through the module-level JHU gateway."""
    # BUG FIX: a duplicated pre-refactor line also called the removed free
    # function `get_category`, which is no longer imported (NameError).
    deaths_data = await gateway.get_category("deaths")

    return deaths_data


@V1.get("/recovered")
async def get_recovered():
    """Recovered cases, fetched through the module-level JHU gateway."""
    # BUG FIX: a duplicated pre-refactor line also called the removed free
    # function `get_category`, which is no longer imported (NameError).
    recovered_data = await gateway.get_category("recovered")

    return recovered_data
38 changes: 25 additions & 13 deletions app/services/location/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,39 @@
from abc import ABC, abstractmethod


class LocationService:
    """
    Service for retrieving locations from a configured data-source gateway.
    """

    def __init__(self, gateway: "LocationGateway"):
        # Gateway that performs the actual retrieval/parsing for this service.
        self.gateway = gateway

    async def get_all(self):
        """
        Gets and returns all of the locations via the gateway.

        :returns: The locations.
        :rtype: List[Location]
        """
        # BUG FIX: diff-merge garbling had left a stray @abstractmethod
        # decorator and a `raise NotImplementedError` remnant fused into
        # this concrete method; this is the intended implementation.
        locations = await self.gateway.get_locations()
        return locations

    async def get(self, loc_id):  # pylint: disable=arguments-differ
        """
        Gets and returns the location with the provided id.

        :param loc_id: Index of the location within the full list.
        :returns: The location.
        :rtype: Location
        """
        # Get location at the index equal to the provided id.
        locations = await self.get_all()
        return locations[loc_id]

    def set_gateway(self, gateway: "LocationGateway"):
        """Swap the underlying data-source gateway."""
        self.gateway = gateway


class LocationGateway(ABC):
    """
    Abstract gateway that performs the real retrieval and parsing for a
    kind of location data-source.
    """

    @abstractmethod
    async def get_locations(self):
        """
        Parse all locations from the data-source.

        :returns: The locations.
        :rtype: List[Location]
        """
        # BUG FIX: diff-merge garbling had fused the old `async def get(self, id)`
        # signature line above this method; only get_locations is abstract here.
        raise NotImplementedError
159 changes: 76 additions & 83 deletions app/services/location/csbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,93 +10,86 @@
from ...coordinates import Coordinates
from ...location.csbs import CSBSLocation
from ...utils import httputils
from . import LocationService
from . import LocationService, LocationGateway

LOGGER = logging.getLogger("services.location.csbs")


class CSBSLocationService(LocationService):
    """
    Service for retrieving locations from csbs.

    NOTE(review): legacy implementation — it delegates to a module-level
    ``get_locations()`` free function; the refactor elsewhere in this file
    replaces this with LocationService + CSBSGateway. Verify it is still
    needed before keeping it.
    """

    async def get_all(self):
        # Get the locations from the module-level cached fetcher.
        locations = await get_locations()
        return locations

    async def get(self, loc_id):  # pylint: disable=arguments-differ
        # Get location at the index equal to the provided id.
        locations = await self.get_all()
        return locations[loc_id]


# Base URL for fetching data
BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"


@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
Retrieves county locations; locations are cached for 1 hour

:returns: The locations.
:rtype: dict
"""
data_id = "csbs.locations"
LOGGER.info(f"{data_id} Requesting data...")
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
locations = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

# Ensure country is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),

class CSBSGateway(LocationGateway):
    """
    Gateway that retrieves and parses US county locations from the CSBS
    covid19_county CSV feed.
    """

    def __init__(self, base_url):
        # Base URL of the CSBS county-level CSV file.
        # (Kept as BASE_URL to preserve the original attribute name.)
        self.BASE_URL = base_url

    # NOTE(review): caching an instance method keys the cache on `self` and
    # keeps the instance alive for the cache lifetime — confirm this gateway
    # is effectively a singleton before relying on maxsize=1.
    @cached(cache=TTLCache(maxsize=1, ttl=1800))
    async def get_locations(self):
        """
        Retrieves county locations; locations are cached for 30 minutes
        (ttl=1800 seconds — the old docstring incorrectly said 1 hour).

        :returns: The locations.
        :rtype: List[CSBSLocation]
        """
        data_id = "csbs.locations"
        LOGGER.info(f"{data_id} Requesting data...")
        # check shared cache
        cache_results = await check_cache(data_id)
        if cache_results:
            LOGGER.info(f"{data_id} using shared cache results")
            locations = cache_results
        else:
            LOGGER.info(f"{data_id} shared cache empty")
            async with httputils.CLIENT_SESSION.get(self.BASE_URL) as response:
                text = await response.text()

            LOGGER.debug(f"{data_id} Data received")

            data = list(csv.DictReader(text.splitlines()))
            LOGGER.debug(f"{data_id} CSV parsed")

            locations = []

            for i, item in enumerate(data):
                # General info.
                state = item["State Name"]
                county = item["County Name"]

                # Skip rows without a concrete county.
                if county in {"Unassigned", "Unknown"}:
                    continue

                # Date string without "EDT" at end.
                last_update = " ".join(item["Last Update"].split(" ")[0:2])

                # Append to locations.
                locations.append(
                    CSBSLocation(
                        # General info.
                        i,
                        state,
                        county,
                        # Coordinates.
                        Coordinates(item["Latitude"], item["Longitude"]),
                        # Last update (parse as ISO).
                        datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
                        # Statistics.
                        int(item["Confirmed"] or 0),
                        int(item["Death"] or 0),
                    )
                )
            LOGGER.info(f"{data_id} Data normalized")
            # save the results to distributed cache
            # TODO: fix json serialization
            try:
                await load_cache(data_id, locations)
            except TypeError as type_err:
                LOGGER.error(type_err)

        # Return the locations.
        return locations





Loading