7 changes: 4 additions & 3 deletions app/main.py
@@ -16,7 +16,8 @@
from .config import get_settings
from .data import data_source
from .routers import V1, V2
from .utils.httputils import setup_client_session, teardown_client_session

from .services.ProcessSystem import ProcessSystem

# ############
# FastAPI App
@@ -37,8 +38,8 @@
version="2.0.4",
docs_url="/",
redoc_url="/docs",
on_startup=[setup_client_session],
on_shutdown=[teardown_client_session],
    on_startup=[ProcessSystem.get_instance().setup_client_session],
    on_shutdown=[ProcessSystem.get_instance().teardown_client_session],
)

# #####################
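Note on the hunk above: FastAPI's on_startup and on_shutdown parameters take sequences of callables that the framework itself awaits, so the handler methods are passed without trailing parentheses; calling them at import time would register coroutine objects instead. A minimal sketch of the resulting wiring as it would appear in app/main.py, assuming the ProcessSystem module added later in this diff; the title value is an illustrative placeholder:

# Minimal sketch (not part of the PR): wiring the lifecycle handlers.
from fastapi import FastAPI

from .services.ProcessSystem import ProcessSystem

process_system = ProcessSystem.get_instance()

APP = FastAPI(
    title="example-api",  # illustrative placeholder
    # Pass bound coroutine functions; FastAPI awaits them on startup/shutdown.
    on_startup=[process_system.setup_client_session],
    on_shutdown=[process_system.teardown_client_session],
)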
14 changes: 7 additions & 7 deletions app/routers/v1.py
@@ -1,17 +1,17 @@
"""app.routers.v1.py"""
from fastapi import APIRouter

from ..services.location.jhu import get_category
from ..services.location.jhu import JHUGateway

V1 = APIRouter()


@V1.get("/all")
async def all_categories():
"""Get all the categories."""
confirmed = await get_category("confirmed")
deaths = await get_category("deaths")
recovered = await get_category("recovered")
confirmed = await JHUGateway.get_instance().get_category("confirmed")
deaths = await JHUGateway.get_instance().get_category("deaths")
recovered = await JHUGateway.get_instance().get_category("recovered")

return {
# Data.
@@ -30,22 +30,22 @@ async def all_categories():
@V1.get("/confirmed")
async def get_confirmed():
"""Confirmed cases."""
confirmed_data = await get_category("confirmed")
confirmed_data = await JHUGateway.get_instance().get_category("confirmed")

return confirmed_data


@V1.get("/deaths")
async def get_deaths():
"""Total deaths."""
deaths_data = await get_category("deaths")
deaths_data = await JHUGateway.get_instance().get_category("deaths")

return deaths_data


@V1.get("/recovered")
async def get_recovered():
"""Recovered cases."""
recovered_data = await get_category("recovered")
recovered_data = await JHUGateway.get_instance().get_category("recovered")

return recovered_data
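The JHU gateway's definition is not included in this excerpt (the app/services/location/jhu.py hunk is not shown), so the interface the router relies on has to be inferred. A minimal sketch of that assumed interface, mirroring the singleton gateway pattern used by CSBSGateway later in this diff; the method body is a placeholder:

# Assumed interface of JHUGateway (defined in app/services/location/jhu.py,
# not shown in this excerpt).
class JHUGateway:
    __instance = None

    @staticmethod
    def get_instance():
        # Lazily create and reuse a single shared gateway.
        if JHUGateway.__instance is None:
            JHUGateway.__instance = JHUGateway()
        return JHUGateway.__instance

    async def get_category(self, category: str) -> dict:
        # Fetch and normalize one JHU category: "confirmed", "deaths" or "recovered".
        ...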
80 changes: 80 additions & 0 deletions app/services/ProcessSystem.py
@@ -0,0 +1,80 @@
from aiohttp import ClientSession

from .location.csbs import CSBSLocationService
from .location.jhu import JhuLocationService
from .location.nyt import NYTLocationService

class ProcessSystem:
    """
    Facade over the system's location data sources and the shared HTTP client session.

    Implemented as a singleton; obtain the shared instance via get_instance().
    """
__instance = None

    @staticmethod
    def get_instance():
        if ProcessSystem.__instance is None:
            ProcessSystem.__instance = ProcessSystem()

        return ProcessSystem.__instance


    def __init__(self):
        if ProcessSystem.__instance is not None:
            raise Exception("ProcessSystem is a singleton; use get_instance() instead")

self.data_sources = {
"jhu": JhuLocationService(),
"csbs": CSBSLocationService(),
"nyt": NYTLocationService()
}

self.CLIENT_SESSION = None

async def get_all_locations(self, source: str):
"""
Extract all the locations in one source
"""

if source not in self.data_sources:
return None
return await self.data_sources[source].get_all()

    async def get_specify_location(self, source: str, id):
        """
        Return one location from the given data source by its id, or None if the source is unknown.
        """

if source not in self.data_sources:
return None
return await self.data_sources[source].get(id)


def get_all_sources(self):
"""
return the current sources of the system
"""
return list(self.data_sources.keys())

"""
http util related functions
"""

async def setup_client_session(self):
self.CLIENT_SESSION = ClientSession()

async def teardown_client_session(self):
await self.CLIENT_SESSION.close()

def get_client_session(self):
return self.CLIENT_SESSION
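
Taken together, ProcessSystem acts as a facade: routers can resolve any registered source and fetch its locations without importing the individual services. A minimal usage sketch, assuming a hypothetical route module under app/routers/ that is not part of this PR:

# Hypothetical route module (illustration only): querying data sources
# through the ProcessSystem facade instead of importing each service directly.
from fastapi import APIRouter, HTTPException

from ..services.ProcessSystem import ProcessSystem

ROUTER = APIRouter()


@ROUTER.get("/sources")
async def sources():
    return {"sources": ProcessSystem.get_instance().get_all_sources()}


@ROUTER.get("/locations")
async def locations(source: str = "jhu"):
    result = await ProcessSystem.get_instance().get_all_locations(source)
    if result is None:
        # get_all_locations returns None for unregistered sources.
        raise HTTPException(status_code=404, detail=f"Unknown source: {source}")
    return {"locations": result}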






152 changes: 82 additions & 70 deletions app/services/location/csbs.py
@@ -9,7 +9,7 @@
from ...caches import check_cache, load_cache
from ...coordinates import Coordinates
from ...location.csbs import CSBSLocation
from ...utils import httputils
from ..ProcessSystem import ProcessSystem
from . import LocationService

LOGGER = logging.getLogger("services.location.csbs")
@@ -22,7 +22,7 @@ class CSBSLocationService(LocationService):

async def get_all(self):
# Get the locations.
locations = await get_locations()
locations = await CSBSGateway.get_instance().get_locations()
return locations

async def get(self, loc_id): # pylint: disable=arguments-differ
@@ -31,72 +31,84 @@ async def get(self, loc_id): # pylint: disable=arguments-differ
return locations[loc_id]


# Base URL for fetching data
BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"


@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
Retrieves county locations; locations are cached for 1 hour

:returns: The locations.
:rtype: dict
"""
data_id = "csbs.locations"
LOGGER.info(f"{data_id} Requesting data...")
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
locations = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

# Ensure country is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),
class CSBSGateway(LocationGateway):
__instance = None

@staticmethod
def get_instance():
if CSBSGateway.__instance is None:
CSBSGateway.__instance = CSBSGateway()

return CSBSGateway.__instance

    def __init__(self):
        if CSBSGateway.__instance is not None:
            raise Exception("CSBSGateway is a singleton; use get_instance() instead")

        self.BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"

@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations(self):
"""
Retrieves county locations; locations are cached for 1 hour

:returns: The locations.
:rtype: dict
"""
data_id = "csbs.locations"
LOGGER.info(f"{data_id} Requesting data...")
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
locations = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
async with ProcessSystem.get_instance().get_client_session().get(self.BASE_URL) as response:
text = await response.text()

LOGGER.debug(f"{data_id} Data received")

data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

locations = []

for i, item in enumerate(data):
# General info.
state = item["State Name"]
county = item["County Name"]

# Ensure country is specified.
if county in {"Unassigned", "Unknown"}:
continue

# Date string without "EDT" at end.
last_update = " ".join(item["Last Update"].split(" ")[0:2])

# Append to locations.
locations.append(
CSBSLocation(
# General info.
i,
state,
county,
# Coordinates.
Coordinates(item["Latitude"], item["Longitude"]),
# Last update (parse as ISO).
datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
# Statistics.
int(item["Confirmed"] or 0),
int(item["Death"] or 0),
)
)
)
LOGGER.info(f"{data_id} Data normalized")
# save the results to distributed cache
# TODO: fix json serialization
try:
await load_cache(data_id, locations)
except TypeError as type_err:
LOGGER.error(type_err)

# Return the locations.
return locations
LOGGER.info(f"{data_id} Data normalized")
# save the results to distributed cache
# TODO: fix json serialization
try:
await load_cache(data_id, locations)
except TypeError as type_err:
LOGGER.error(type_err)

# Return the locations.
return locations
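Because CSBSGateway fetches through the session managed by ProcessSystem, that session has to exist before get_locations() is awaited; in the application this is guaranteed by the startup hook registered in app/main.py. A minimal standalone sketch, assuming network access and that the shared-cache helpers are configured; the script itself is not part of the PR:

# Standalone sketch (illustration only): exercising the gateway outside FastAPI.
import asyncio

from app.services.ProcessSystem import ProcessSystem
from app.services.location.csbs import CSBSGateway


async def main():
    system = ProcessSystem.get_instance()
    await system.setup_client_session()  # the gateway reads this shared session
    try:
        locations = await CSBSGateway.get_instance().get_locations()
        print(f"Fetched {len(locations)} county locations")
    finally:
        await system.teardown_client_session()  # always close the aiohttp session


asyncio.run(main())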