
Commit 16e2355: use shared cache for jhu data
1 parent 9cefc72

File tree: 4 files changed, +82 -78 lines
(app/caches.py, app/config.py, app/services/location/jhu.py, pyproject.toml)

app/caches.py

Lines changed: 21 additions & 24 deletions
@@ -1,53 +1,50 @@
 """app.caches.py"""
-# from walrus import Database
-import logging
-import asyncio
 import functools
+import logging
+from typing import Union
 
 import aiocache
 
-import config
+from .config import get_settings
 
 LOGGER = logging.getLogger(name="app.caches")
 
-SETTINGS = config.get_settings()
+SETTINGS = get_settings()
 
 if SETTINGS.rediscloud_url:
     REDIS_URL = SETTINGS.rediscloud_url
+    LOGGER.info("Using Rediscloud")
 else:
     REDIS_URL = SETTINGS.local_redis_url
+    LOGGER.info("Using Local Redis")
 
 
 @functools.lru_cache()
-def get_cache(namespace, redis=False) -> aiocache.RedisCache:
+def get_cache(namespace) -> Union[aiocache.RedisCache, aiocache.SimpleMemoryCache]:
     """Return a Redis cache if configured, else an in-memory cache."""
-    if redis:
+    if REDIS_URL:
+        LOGGER.info("using RedisCache")
         return aiocache.RedisCache(
             endpoint=REDIS_URL.host,
             port=REDIS_URL.port,
             password=REDIS_URL.password,
             namespace=namespace,
             create_connection_timeout=5,
         )
+    LOGGER.info("using SimpleMemoryCache")
     return aiocache.SimpleMemoryCache(namespace=namespace)
 
 
-CACHE = get_cache("test", redis=False)
-
-
-async def cach_test():
-    try:
-        await CACHE.set("foo", {"foobar": "bar"}, ttl=30)
-    except OSError as redis_err:
-        LOGGER.error(f"Redis Error: {redis_err}")
-        return
-    print(await CACHE.get("foo"))
-    await CACHE.close()
+async def check_cache(data_id: str, namespace: str = None):
+    cache = get_cache(namespace)
+    result = await cache.get(data_id, None)
+    LOGGER.info(f"{data_id} cache pulled")
+    await cache.close()
+    return result
 
 
-if __name__ == "__main__":
-    # print(REDIS_DB)
-    # h = REDIS_DB.Hash("Test Hash")
-    # h["foo"] = "bar"
-    # print(h)
-    asyncio.get_event_loop().run_until_complete(cach_test())
+async def load_cache(data_id: str, data, namespace: str = None, cache_life: int = 3600):
+    cache = get_cache(namespace)
+    await cache.set(data_id, data, ttl=cache_life)
+    LOGGER.info(f"{data_id} cache loaded")
+    await cache.close()
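
The check_cache/load_cache pair replaces the old cach_test smoke test. An equivalent ad-hoc check against the new helpers might look like this sketch, assuming the package is importable as app and reusing the event-loop idiom the removed __main__ block used:

    import asyncio

    from app.caches import check_cache, load_cache

    async def smoke_test():
        # Round-trip a throwaway value through whichever backend get_cache selected.
        await load_cache("smoke", {"foobar": "bar"}, cache_life=30)
        print(await check_cache("smoke"))  # expect {'foobar': 'bar'}

    asyncio.get_event_loop().run_until_complete(smoke_test())

Because get_cache is wrapped in functools.lru_cache, both helpers reuse the same cache object for a given namespace within one process.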

app/config.py

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@
 class _Settings(BaseSettings):
     port: int = 5000
     rediscloud_url: HttpUrl = None
-    local_redis_url: AnyUrl = "redis://localhost:6379"
+    local_redis_url: AnyUrl = None
 
 
 @functools.lru_cache()
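
With the default removed, a bare environment leaves both URLs as None, so get_cache drops to SimpleMemoryCache rather than touching Redis. Restoring a shared cache means supplying one of the URLs through the environment; a sketch with placeholder values, assuming pydantic's BaseSettings maps the field names to these (case-insensitive) variable names:

    # .env for local development (placeholder value)
    LOCAL_REDIS_URL=redis://localhost:6379
    # or, on a deployment with the Rediscloud add-on (placeholder value)
    REDISCLOUD_URL=redis://user:password@hostname:port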

app/services/location/jhu.py

Lines changed: 58 additions & 51 deletions
@@ -8,6 +8,7 @@
 from asyncache import cached
 from cachetools import TTLCache
 
+from ...caches import check_cache, get_cache, load_cache
 from ...coordinates import Coordinates
 from ...location import TimelinedLocation
 from ...timeline import Timeline
@@ -57,68 +58,74 @@ async def get_category(category):
     category = category.lower()
     data_id = f"jhu.{category}"
 
-    # TODO: check cache
+    # check shared cache
+    cache_results = await check_cache(data_id)
+    if cache_results:
+        LOGGER.info(f"{data_id} using shared cache results")
+        results = cache_results
+    else:
+        LOGGER.info(f"{data_id} shared cache empty")
+        # URL to request data from.
+        url = BASE_URL + "time_series_covid19_%s_global.csv" % category
 
-    # URL to request data from.
-    url = BASE_URL + "time_series_covid19_%s_global.csv" % category
+        # Request the data
+        LOGGER.info(f"{data_id} Requesting data...")
+        async with httputils.CLIENT_SESSION.get(url) as response:
+            text = await response.text()
 
-    # Request the data
-    LOGGER.info(f"{data_id} Requesting data...")
-    async with httputils.CLIENT_SESSION.get(url) as response:
-        text = await response.text()
+        LOGGER.debug(f"{data_id} Data received")
 
-    LOGGER.debug(f"{data_id} Data received")
+        # Parse the CSV.
+        data = list(csv.DictReader(text.splitlines()))
+        LOGGER.debug(f"{data_id} CSV parsed")
 
-    # Parse the CSV.
-    data = list(csv.DictReader(text.splitlines()))
-    LOGGER.debug(f"{data_id} CSV parsed")
+        # The normalized locations.
+        locations = []
 
-    # The normalized locations.
-    locations = []
+        for item in data:
+            # Filter out all the dates.
+            dates = dict(filter(lambda element: date_util.is_date(element[0]), item.items()))
 
-    for item in data:
-        # Filter out all the dates.
-        dates = dict(filter(lambda element: date_util.is_date(element[0]), item.items()))
+            # Make location history from dates.
+            history = {date: int(amount or 0) for date, amount in dates.items()}
 
-        # Make location history from dates.
-        history = {date: int(amount or 0) for date, amount in dates.items()}
+            # Country for this location.
+            country = item["Country/Region"]
 
-        # Country for this location.
-        country = item["Country/Region"]
+            # Latest data insert value.
+            latest = list(history.values())[-1]
 
-        # Latest data insert value.
-        latest = list(history.values())[-1]
+            # Normalize the item and append to locations.
+            locations.append(
+                {
+                    # General info.
+                    "country": country,
+                    "country_code": countries.country_code(country),
+                    "province": item["Province/State"],
+                    # Coordinates.
+                    "coordinates": {"lat": item["Lat"], "long": item["Long"]},
+                    # History.
+                    "history": history,
+                    # Latest statistic.
+                    "latest": int(latest or 0),
+                }
+            )
+        LOGGER.debug(f"{data_id} Data normalized")
+
+        # Latest total.
+        latest = sum(map(lambda location: location["latest"], locations))
+
+        # Return the final data.
+        results = {
+            "locations": locations,
+            "latest": latest,
+            "last_updated": datetime.utcnow().isoformat() + "Z",
+            "source": "https://github.com/ExpDev07/coronavirus-tracker-api",
+        }
+        # save the results to distributed cache
+        await load_cache(data_id, results)
 
-        # Normalize the item and append to locations.
-        locations.append(
-            {
-                # General info.
-                "country": country,
-                "country_code": countries.country_code(country),
-                "province": item["Province/State"],
-                # Coordinates.
-                "coordinates": {"lat": item["Lat"], "long": item["Long"]},
-                # History.
-                "history": history,
-                # Latest statistic.
-                "latest": int(latest or 0),
-            }
-        )
-    LOGGER.debug(f"{data_id} Data normalized")
-
-    # Latest total.
-    latest = sum(map(lambda location: location["latest"], locations))
-
-    # Return the final data.
-    results = {
-        "locations": locations,
-        "latest": latest,
-        "last_updated": datetime.utcnow().isoformat() + "Z",
-        "source": "https://github.com/ExpDev07/coronavirus-tracker-api",
-    }
     LOGGER.info(f"{data_id} results:\n{pf(results, depth=1)}")
-    # save the results to distributed cache
-    # TODO: async
     return results
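
The reworked get_category is a cache-aside read: consult the shared cache, rebuild from the upstream CSV only on a miss, then publish the result so other workers skip the download. Stripped of the JHU-specific parsing, the control flow reduces to this sketch, where the rebuild callable is a hypothetical stand-in for the fetch-and-normalize work:

    from app.caches import check_cache, load_cache

    async def get_cached(data_id: str, rebuild, cache_life: int = 3600):
        """Cache-aside: return shared results, rebuilding and publishing on a miss."""
        results = await check_cache(data_id)
        if results is None:
            results = await rebuild()  # expensive fetch-and-normalize step
            await load_cache(data_id, results, cache_life=cache_life)
        return results

Note the asyncache/cachetools imports at the top of the file are untouched, so an in-process TTL cache likely still sits in front of this shared layer.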

pyproject.toml

Lines changed: 2 additions & 2 deletions
@@ -1,5 +1,5 @@
 [tool.black]
-line-length = 100
+line-length = 120
 target-version = ['py36', 'py37', 'py38']
 include = '\.pyi?$'
 exclude = '''
@@ -23,4 +23,4 @@ multi_line_output = 3
 include_trailing_comma = "True"
 force_grid_wrap = 0
 use_parentheses = "True"
-line_length = 100
+line_length = 120
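
Pinning black and isort to the same width keeps the two formatters from rewriting each other's output. A quick way to confirm the tree is clean under the new setting, assuming both tools are installed and read this pyproject.toml (isort 5 flag syntax):

    black --check .
    isort --check-only .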
