forked from ExpDev07/coronavirus-tracker-api
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnyt.py
More file actions
145 lines (116 loc) · 4.97 KB
/
nyt.py
File metadata and controls
145 lines (116 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""app.services.location.nyt.py"""
import csv
import logging
from datetime import datetime
from asyncache import cached
from cachetools import TTLCache
from ...caches import check_cache, load_cache
from ...coordinates import Coordinates
from ...location.nyt import NYTLocation
from ...models import Timeline
from ...utils import httputils
from . import LocationService
LOGGER = logging.getLogger("services.location.nyt")
class NYTLocationService(LocationService):
"""
Service for retrieving locations from New York Times (https://github.com/nytimes/covid-19-data).
"""
async def get_all(self):
# Get the locations.
locations = await get_locations()
return locations
async def get(self, loc_id): # pylint: disable=arguments-differ
# Get location at the index equal to provided id.
locations = await self.get_all()
return locations[loc_id]
# ---------------------------------------------------------------
# Base URL for fetching category.
BASE_URL = "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv"
def get_grouped_locations_dict(data):
"""
Helper function to group history for locations into one dict.
:returns: The complete data for each unique US county
:rdata: dict
"""
grouped_locations = {}
# in increasing order of dates
for row in data:
county_state = (row["county"], row["state"])
date = row["date"]
confirmed = row["cases"]
deaths = row["deaths"]
# initialize if not existing
if county_state not in grouped_locations:
grouped_locations[county_state] = {"confirmed": [], "deaths": []}
# append confirmed tuple to county_state (date, # confirmed)
grouped_locations[county_state]["confirmed"].append((date, confirmed))
# append deaths tuple to county_state (date, # deaths)
grouped_locations[county_state]["deaths"].append((date, deaths))
return grouped_locations
@cached(cache=TTLCache(maxsize=1, ttl=1800))
async def get_locations():
"""
Returns a list containing parsed NYT data by US county. The data is cached for 1 hour.
:returns: The complete data for US Counties.
:rtype: dict
"""
data_id = "nyt.locations"
# Request the data.
LOGGER.info(f"{data_id} Requesting data...")
# check shared cache
cache_results = await check_cache(data_id)
if cache_results:
LOGGER.info(f"{data_id} using shared cache results")
locations = cache_results
else:
LOGGER.info(f"{data_id} shared cache empty")
async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
text = await response.text()
LOGGER.debug(f"{data_id} Data received")
# Parse the CSV.
data = list(csv.DictReader(text.splitlines()))
LOGGER.debug(f"{data_id} CSV parsed")
# Group together locations (NYT data ordered by dates not location).
grouped_locations = get_grouped_locations_dict(data)
# The normalized locations.
locations = []
for idx, (county_state, histories) in enumerate(grouped_locations.items()):
# Make location history for confirmed and deaths from dates.
# List is tuples of (date, amount) in order of increasing dates.
confirmed_list = histories["confirmed"]
confirmed_history = {date: int(amount or 0) for date, amount in confirmed_list}
deaths_list = histories["deaths"]
deaths_history = {date: int(amount or 0) for date, amount in deaths_list}
# Normalize the item and append to locations.
locations.append(
NYTLocation(
id=idx,
state=county_state[1],
county=county_state[0],
coordinates=Coordinates(None, None), # NYT does not provide coordinates
last_updated=datetime.utcnow().isoformat() + "Z", # since last request
timelines={
"confirmed": Timeline(
timeline={
datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
for date, amount in confirmed_history.items()
}
),
"deaths": Timeline(
timeline={
datetime.strptime(date, "%Y-%m-%d").isoformat() + "Z": amount
for date, amount in deaths_history.items()
}
),
"recovered": Timeline(),
},
)
)
LOGGER.info(f"{data_id} Data normalized")
# save the results to distributed cache
# TODO: fix json serialization
try:
await load_cache(data_id, locations)
except TypeError as type_err:
LOGGER.error(type_err)
return locations