use shared cache for jhu data
Kilo59 committed Apr 27, 2020
commit de1cda3a0da085157752886be59f39f1033e7158
45 changes: 21 additions & 24 deletions app/caches.py
@@ -1,53 +1,50 @@
"""app.caches.py"""
# from walrus import Database
import logging
import asyncio
import functools
import logging
from typing import Union

import aiocache

import config
from .config import get_settings

LOGGER = logging.getLogger(name="app.caches")

SETTINGS = config.get_settings()
SETTINGS = get_settings()

if SETTINGS.rediscloud_url:
REDIS_URL = SETTINGS.rediscloud_url
LOGGER.info("Using Rediscloud")
else:
REDIS_URL = SETTINGS.local_redis_url
LOGGER.info("Using Local Redis")


@functools.lru_cache()
def get_cache(namespace, redis=False) -> aiocache.RedisCache:
def get_cache(namespace) -> Union[aiocache.RedisCache, aiocache.SimpleMemoryCache]:
"""Retunr """
if redis:
if REDIS_URL:
LOGGER.info("using RedisCache")
return aiocache.RedisCache(
endpoint=REDIS_URL.host,
port=REDIS_URL.port,
password=REDIS_URL.password,
namespace=namespace,
create_connection_timeout=5,
)
LOGGER.info("using SimpleMemoryCache")
return aiocache.SimpleMemoryCache(namespace=namespace)


CACHE = get_cache("test", redis=False)


async def cach_test():
try:
await CACHE.set("foo", {"foobar": "bar"}, ttl=30)
except OSError as redis_err:
LOGGER.error(f"Redis Error: {redis_err}")
return
print(await CACHE.get("foo"))
await CACHE.close()
async def check_cache(data_id: str, namespace: str = None):
cache = get_cache(namespace)
result = await cache.get(data_id, None)
LOGGER.info(f"{data_id} cache pulled")
await cache.close()
return result


if __name__ == "__main__":
# print(REDIS_DB)
# h = REDIS_DB.Hash("Test Hash")
# h["foo"] = "bar"
# print(h)
asyncio.get_event_loop().run_until_complete(cach_test())
async def load_cache(data_id: str, data, namespace: str = None, cache_life: int = 3600):
cache = get_cache(namespace)
await cache.set(data_id, data, ttl=cache_life)
LOGGER.info(f"{data_id} cache loaded")
await cache.close()
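
A minimal usage sketch of the new shared-cache helpers. The "demo.payload" key is a placeholder and the app.caches import path is assumed from the file layout in this diff; with no Redis URL configured, the calls fall through to the in-process SimpleMemoryCache.

```python
# Hypothetical round-trip through check_cache/load_cache; nothing here is part
# of the PR itself beyond the two imported helpers.
import asyncio

from app.caches import check_cache, load_cache


async def round_trip():
    # Store a payload for 60 seconds in the shared cache.
    await load_cache("demo.payload", {"foo": "bar"}, cache_life=60)
    # Read it back; check_cache returns None on a miss or an expired entry.
    cached = await check_cache("demo.payload")
    print(cached)  # {'foo': 'bar'}


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(round_trip())
```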
2 changes: 1 addition & 1 deletion app/config.py
@@ -10,7 +10,7 @@
class _Settings(BaseSettings):
port: int = 5000
rediscloud_url: HttpUrl = None
local_redis_url: AnyUrl = "redis://localhost:6379"
local_redis_url: AnyUrl = None


@functools.lru_cache()
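With this change the local Redis URL is no longer assumed, so the app falls back to an in-memory cache unless a Redis URL is supplied. A small sketch of exercising the settings follows; the environment-variable names assume pydantic's default BaseSettings field-to-env mapping and are not introduced by the PR.

```python
# Hypothetical check of the settings behaviour after this change.
import os

from app.config import get_settings

# With neither variable set, both URLs default to None, so get_cache() falls
# back to SimpleMemoryCache.
os.environ.pop("REDISCLOUD_URL", None)
os.environ.pop("LOCAL_REDIS_URL", None)
get_settings.cache_clear()  # get_settings is wrapped in functools.lru_cache
print(get_settings().local_redis_url)  # None

# Opting back in to a local Redis instance:
os.environ["LOCAL_REDIS_URL"] = "redis://localhost:6379"
get_settings.cache_clear()
print(get_settings().local_redis_url)  # redis://localhost:6379
```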
109 changes: 58 additions & 51 deletions app/services/location/jhu.py
@@ -8,6 +8,7 @@
from asyncache import cached
from cachetools import TTLCache

from ...caches import check_cache, get_cache, load_cache
from ...coordinates import Coordinates
from ...location import TimelinedLocation
from ...timeline import Timeline
@@ -57,68 +58,74 @@ async def get_category(category):
category = category.lower()
data_id = f"jhu.{category}"

# TODO: check cache
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
results = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
# URL to request data from.
url = BASE_URL + "time_series_covid19_%s_global.csv" % category

# URL to request data from.
url = BASE_URL + "time_series_covid19_%s_global.csv" % category
# Request the data
LOGGER.info(f"{data_id} Requesting data...")
async with httputils.CLIENT_SESSION.get(url) as response:
text = await response.text()

# Request the data
LOGGER.info(f"{data_id} Requesting data...")
async with httputils.CLIENT_SESSION.get(url) as response:
text = await response.text()
LOGGER.debug(f"{data_id} Data received")

LOGGER.debug(f"{data_id} Data received")
# Parse the CSV.
data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")

# Parse the CSV.
data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")
# The normalized locations.
locations = []

# The normalized locations.
locations = []
for item in data:
# Filter out all the dates.
dates = dict(filter(lambda element: date_util.is_date(element[0]), item.items()))

for item in data:
# Filter out all the dates.
dates = dict(filter(lambda element: date_util.is_date(element[0]), item.items()))
# Make location history from dates.
history = {date: int(amount or 0) for date, amount in dates.items()}

# Make location history from dates.
history = {date: int(amount or 0) for date, amount in dates.items()}
# Country for this location.
country = item["Country/Region"]

# Country for this location.
country = item["Country/Region"]
# Latest data insert value.
latest = list(history.values())[-1]

# Latest data insert value.
latest = list(history.values())[-1]
# Normalize the item and append to locations.
locations.append(
{
# General info.
"country": country,
"country_code": countries.country_code(country),
"province": item["Province/State"],
# Coordinates.
"coordinates": {"lat": item["Lat"], "long": item["Long"],},
# History.
"history": history,
# Latest statistic.
"latest": int(latest or 0),
}
)
LOGGER.debug(f"{data_id} Data normalized")

# Latest total.
latest = sum(map(lambda location: location["latest"], locations))

# Return the final data.
results = {
"locations": locations,
"latest": latest,
"last_updated": datetime.utcnow().isoformat() + "Z",
"source": "https://github.com/ExpDev07/coronavirus-tracker-api",
}
# save the results to distributed cache
await load_cache(data_id, results)

# Normalize the item and append to locations.
locations.append(
{
# General info.
"country": country,
"country_code": countries.country_code(country),
"province": item["Province/State"],
# Coordinates.
"coordinates": {"lat": item["Lat"], "long": item["Long"],},
# History.
"history": history,
# Latest statistic.
"latest": int(latest or 0),
}
)
LOGGER.debug(f"{data_id} Data normalized")

# Latest total.
latest = sum(map(lambda location: location["latest"], locations))

# Return the final data.
results = {
"locations": locations,
"latest": latest,
"last_updated": datetime.utcnow().isoformat() + "Z",
"source": "https://github.com/ExpDev07/coronavirus-tracker-api",
}
LOGGER.info(f"{data_id} results:\n{pf(results, depth=1)}")
# save the results to distributed cache
# TODO: async
return results


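For readability, a condensed sketch of the cache-aside flow that get_category follows after this change. fetch_and_normalize is a stand-in for the CSV download and normalization steps shown above, not a function introduced by the PR; only check_cache and load_cache come from this commit.

```python
# Condensed, hypothetical sketch of the new cache-first flow in get_category.
from datetime import datetime

from app.caches import check_cache, load_cache


async def fetch_and_normalize(category: str) -> list:
    # Placeholder for requesting the JHU CSV and building the location dicts.
    return [{"latest": 0}]


async def get_category_sketch(category: str) -> dict:
    data_id = f"jhu.{category.lower()}"

    # 1. Try the shared cache (Redis if configured, in-memory otherwise).
    cached = await check_cache(data_id)
    if cached:
        return cached

    # 2. Cache miss: fetch and normalize the upstream data.
    locations = await fetch_and_normalize(category)
    results = {
        "locations": locations,
        "latest": sum(location["latest"] for location in locations),
        "last_updated": datetime.utcnow().isoformat() + "Z",
        "source": "https://github.com/ExpDev07/coronavirus-tracker-api",
    }

    # 3. Populate the shared cache so later requests and other workers reuse it.
    await load_cache(data_id, results)
    return results
```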
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,5 +1,5 @@
[tool.black]
line-length = 100
line-length = 120
target-version = ['py36', 'py37', 'py38']
include = '\.pyi?$'
exclude = '''
@@ -23,4 +23,4 @@ multi_line_output = 3
include_trailing_comma = "True"
force_grid_wrap = 0
use_parentheses = "True"
line_length = 100
line_length = 120
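
Not part of the diff, but as a quick sanity check the formatter can be run against the app package to confirm it picks up the new 120-character limit from pyproject.toml; this assumes Black is installed in the project environment.

```python
# Hypothetical sanity check: Black reads [tool.black] from pyproject.toml,
# so no --line-length flag is needed here.
import subprocess

subprocess.run(["black", "--check", "app"], check=False)
```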