10 changes: 2 additions & 8 deletions app/data/__init__.py
@@ -1,14 +1,8 @@
"""app.data"""
from ..services.location.csbs import CSBSLocationService
from ..services.location.jhu import JhuLocationService
from ..services.location.nyt import NYTLocationService
from ..services.location import getDataSources

# Mapping of services to data-sources.
DATA_SOURCES = {
"jhu": JhuLocationService(),
"csbs": CSBSLocationService(),
"nyt": NYTLocationService(),
}
DATA_SOURCES = getDataSources()


def data_source(source):
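The mapping of data-source keys to service instances is now built by getDataSources() at call time instead of being hard-coded at import time. A minimal consumption sketch, assuming data_source(source), whose body is truncated in the diff above, performs a plain dictionary lookup:

    from app.data import DATA_SOURCES, data_source

    # DATA_SOURCES is the dict returned by getDataSources(), mapping source
    # keys to service instances: {"jhu": ..., "csbs": ..., "nyt": ...}.
    jhu = DATA_SOURCES["jhu"]

    # Assumed to be the equivalent lookup through the (truncated) helper.
    jhu = data_source("jhu")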
64 changes: 34 additions & 30 deletions app/io.py
@@ -9,36 +9,41 @@
 DATA = HERE.joinpath("..", "data").resolve()
 
 
-def save(
-    name: str,
-    content: Union[str, Dict, List],
-    write_mode: str = "w",
-    indent: int = 2,
-    **json_dumps_kwargs,
-) -> pathlib.Path:
-    """Save content to a file. If content is a dictionary, use json.dumps()."""
-    path = DATA / name
-    if isinstance(content, (dict, list)):
-        content = json.dumps(content, indent=indent, **json_dumps_kwargs)
-    with open(DATA / name, mode=write_mode) as f_out:
-        f_out.write(content)
-    return path
-
-
-def load(name: str, **json_kwargs) -> Union[str, Dict, List]:
-    """Loads content from a file. If file ends with '.json', call json.load() and return a Dictionary."""
-    path = DATA / name
-    with open(path) as f_in:
-        if path.suffix == ".json":
-            return json.load(f_in, **json_kwargs)
-        return f_in.read()
+class IO:
+    @classmethod
+    def __processContent__(cls, content: Union[str, Dict, List], indent: int = 2, **json_dumps_kwargs):
+        if isinstance(content, (dict, list)):
+            content = json.dumps(content, indent=indent, **json_dumps_kwargs)
+        return content
 
 
-class AIO:
+class AIO(IO):
     """Async compatible file io operations."""
 
+    @classmethod
+    def save(
+        cls,
+        name: str,
+        content: Union[str, Dict, List],
+        write_mode: str = "w",
+        indent: int = 2,
+        **json_dumps_kwargs,
+    ) -> pathlib.Path:
+        """Save content to a file. If content is a dictionary, use json.dumps()."""
+        path = DATA / name
+        content = cls.__processContent__(content, indent, **json_dumps_kwargs)
+        with open(DATA / name, mode=write_mode) as f_out:
+            f_out.write(content)
+        return path
+
+    @classmethod
+    def load(cls, name: str, **json_kwargs) -> Union[str, Dict, List]:
+        """Loads content from a file. If file ends with '.json', call json.load() and return a Dictionary."""
+        path = DATA / name
+        with open(path) as f_in:
+            if path.suffix == ".json":
+                return json.load(f_in, **json_kwargs)
+            return f_in.read()
+
+    # Below are async compatible file io operations.
     @classmethod
-    async def save(
+    async def async_save(
         cls,
         name: str,
         content: Union[str, Dict, List],
@@ -48,14 +53,13 @@ async def save(
     ):
         """Save content to a file. If content is a dictionary, use json.dumps()."""
         path = DATA / name
-        if isinstance(content, (dict, list)):
-            content = json.dumps(content, indent=indent, **json_dumps_kwargs)
+        content = cls.__processContent__(content, indent, **json_dumps_kwargs)
         async with aiofiles.open(DATA / name, mode=write_mode) as f_out:
             await f_out.write(content)
         return path
 
     @classmethod
-    async def load(cls, name: str, **json_kwargs) -> Union[str, Dict, List]:
+    async def async_load(cls, name: str, **json_kwargs) -> Union[str, Dict, List]:
         """Loads content from a file. If file ends with '.json', call json.load() and return a Dictionary."""
         path = DATA / name
         async with aiofiles.open(path) as f_in:
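After the refactor, dict/list serialization lives in IO.__processContent__ and the asynchronous variants carry an async_ prefix. A usage sketch, assuming AIO inherits IO's helper as rendered above so that cls.__processContent__ resolves, and with a hypothetical file name:

    import asyncio
    from app.io import AIO

    # Sync write: the dict is serialized by __processContent__ before writing.
    path = AIO.save("example.json", {"confirmed": 42})

    async def main():
        # Async read: returns a parsed dict because the suffix is ".json".
        data = await AIO.async_load("example.json")
        print(data)

    asyncio.run(main())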
28 changes: 14 additions & 14 deletions app/services/location/__init__.py
@@ -7,22 +7,22 @@ class LocationService(ABC):
     Service for retrieving locations.
     """
 
-    @abstractmethod
     async def get_all(self):
-        """
-        Gets and returns all of the locations.
+        # Get the locations.
+        locations = await self.get_locations()
+        return locations
 
-        :returns: The locations.
-        :rtype: List[Location]
-        """
-        raise NotImplementedError
+    async def get(self, loc_id):  # pylint: disable=arguments-differ
+        # Get location at the index equal to the provided id.
+        locations = await self.get_all()
+        return locations[loc_id]
 
-    @abstractmethod
-    async def get(self, id):  # pylint: disable=redefined-builtin,invalid-name
-        """
-        Gets and returns location with the provided id.
-
-        :returns: The location.
-        :rtype: Location
-        """
+    async def get_locations(self):
         raise NotImplementedError
+
+
+def getDataSources():
+    from .csbs import CSBSLocationService
+    from .jhu import JhuLocationService
+    from .nyt import NYTLocationService
+    return {"jhu": JhuLocationService(), "csbs": CSBSLocationService(), "nyt": NYTLocationService()}
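With this change, get_all() and get() have concrete implementations in the base class, so a subclass only needs to supply get_locations(). A minimal hypothetical subclass illustrating the inherited call chain:

    class InMemoryLocationService(LocationService):
        # Hypothetical service; a real one would fetch and normalize remote data.
        async def get_locations(self):
            return ["location-0", "location-1"]

    # await InMemoryLocationService().get(1) resolves to "location-1" through
    # the inherited get() -> get_all() -> get_locations() chain.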
146 changes: 66 additions & 80 deletions app/services/location/csbs.py
@@ -14,89 +14,75 @@
 
 LOGGER = logging.getLogger("services.location.csbs")
 
 
-class CSBSLocationService(LocationService):
-    """
-    Service for retrieving locations from csbs
-    """
-
-    async def get_all(self):
-        # Get the locations.
-        locations = await get_locations()
-        return locations
-
-    async def get(self, loc_id):  # pylint: disable=arguments-differ
-        # Get location at the index equal to the provided id.
-        locations = await self.get_all()
-        return locations[loc_id]
-
-
 # Base URL for fetching data
 BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"
 
 
-@cached(cache=TTLCache(maxsize=1, ttl=1800))
-async def get_locations():
-    """
-    Retrieves county locations; locations are cached for 30 minutes
-
-    :returns: The locations.
-    :rtype: dict
-    """
-    data_id = "csbs.locations"
-    LOGGER.info(f"{data_id} Requesting data...")
-    # check shared cache
-    cache_results = await check_cache(data_id)
-    if cache_results:
-        LOGGER.info(f"{data_id} using shared cache results")
-        locations = cache_results
-    else:
-        LOGGER.info(f"{data_id} shared cache empty")
-        async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
-            text = await response.text()
-
-        LOGGER.debug(f"{data_id} Data received")
-
-        data = list(csv.DictReader(text.splitlines()))
-        LOGGER.debug(f"{data_id} CSV parsed")
-
-        locations = []
-
-        for i, item in enumerate(data):
-            # General info.
-            state = item["State Name"]
-            county = item["County Name"]
-
-            # Ensure county is specified.
-            if county in {"Unassigned", "Unknown"}:
-                continue
-
-            # Date string without "EDT" at end.
-            last_update = " ".join(item["Last Update"].split(" ")[0:2])
-
-            # Append to locations.
-            locations.append(
-                CSBSLocation(
-                    # General info.
-                    i,
-                    state,
-                    county,
-                    # Coordinates.
-                    Coordinates(item["Latitude"], item["Longitude"]),
-                    # Last update (parse as ISO).
-                    datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
-                    # Statistics.
-                    int(item["Confirmed"] or 0),
-                    int(item["Death"] or 0),
-                )
-            )
-    LOGGER.info(f"{data_id} Data normalized")
-    # save the results to distributed cache
-    # TODO: fix json serialization
-    try:
-        await load_cache(data_id, locations)
-    except TypeError as type_err:
-        LOGGER.error(type_err)
-
-    # Return the locations.
-    return locations
+class CSBSLocationService(LocationService):
+    """
+    Service for retrieving locations from csbs
+    """
+
+    @cached(cache=TTLCache(maxsize=1, ttl=1800))
+    async def get_locations(self):
+        """
+        Retrieves county locations; locations are cached for 30 minutes
+
+        :returns: The locations.
+        :rtype: dict
+        """
+        data_id = "csbs.locations"
+        LOGGER.info(f"{data_id} Requesting data...")
+        # check shared cache
+        cache_results = await check_cache(data_id)
+        if cache_results:
+            LOGGER.info(f"{data_id} using shared cache results")
+            locations = cache_results
+        else:
+            LOGGER.info(f"{data_id} shared cache empty")
+            async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
+                text = await response.text()
+
+            LOGGER.debug(f"{data_id} Data received")
+
+            data = list(csv.DictReader(text.splitlines()))
+            LOGGER.debug(f"{data_id} CSV parsed")
+
+            locations = []
+
+            for i, item in enumerate(data):
+                # General info.
+                state = item["State Name"]
+                county = item["County Name"]
+
+                # Ensure county is specified.
+                if county in {"Unassigned", "Unknown"}:
+                    continue
+
+                # Date string without "EDT" at end.
+                last_update = " ".join(item["Last Update"].split(" ")[0:2])
+
+                # Append to locations.
+                locations.append(
+                    CSBSLocation(
+                        # General info.
+                        i,
+                        state,
+                        county,
+                        # Coordinates.
+                        Coordinates(item["Latitude"], item["Longitude"]),
+                        # Last update (parse as ISO).
+                        datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
+                        # Statistics.
+                        int(item["Confirmed"] or 0),
+                        int(item["Death"] or 0),
+                    )
+                )
+        LOGGER.info(f"{data_id} Data normalized")
+        # save the results to distributed cache
+        # TODO: fix json serialization
+        try:
+            await load_cache(data_id, locations)
+        except TypeError as type_err:
+            LOGGER.error(type_err)
+
+        # Return the locations.
+        return locations
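Moving get_locations() onto the service class means callers reach it only through the inherited LocationService interface. A sketch, assuming it runs inside an async context with httputils.CLIENT_SESSION already initialized by the application:

    service = CSBSLocationService()

    # get_all() (inherited) awaits get_locations(), which consults the shared
    # cache first and otherwise fetches BASE_URL; the TTLCache memoizes the
    # result for 30 minutes (ttl=1800 seconds).
    locations = await service.get_all()
    first_county = await service.get(0)  # index-based lookup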