
Commit 32889a3

Ali Tayyabi authored and committed
Added aggregate pattern for location service
1 parent 1c7e4ae commit 32889a3

8 files changed: 240 additions, 277 deletions


app/data/__init__.py

Lines changed: 2 additions & 8 deletions
@@ -1,14 +1,8 @@
 """app.data"""
-from ..services.location.csbs import CSBSLocationService
-from ..services.location.jhu import JhuLocationService
-from ..services.location.nyt import NYTLocationService
+from ..services.location import getDataSources

 # Mapping of services to data-sources.
-DATA_SOURCES = {
-    "jhu": JhuLocationService(),
-    "csbs": CSBSLocationService(),
-    "nyt": NYTLocationService(),
-}
+DATA_SOURCES = getDataSources()


 def data_source(source):
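
For orientation, a tiny self-contained sketch of the idea this file now relies on: the mapping from source name to service instance is produced by a factory function instead of being written out inline. The placeholder classes below stand in for the real services and are not part of the repository:

# Placeholder services, standing in for JhuLocationService / CSBSLocationService / NYTLocationService.
class _Jhu:
    pass


class _Csbs:
    pass


def getDataSources():
    # Same shape as app.services.location.getDataSources(): source key -> service instance.
    return {"jhu": _Jhu(), "csbs": _Csbs()}


DATA_SOURCES = getDataSources()

# A data_source-style lookup is then a plain dict access.
print(type(DATA_SOURCES["jhu"]).__name__)  # _Jhu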

app/services/location/__init__.py

Lines changed: 14 additions & 14 deletions
@@ -7,22 +7,22 @@ class LocationService(ABC):
     Service for retrieving locations.
     """

-    @abstractmethod
     async def get_all(self):
-        """
-        Gets and returns all of the locations.
+        # Get the locations.
+        locations = await self.get_locations()
+        return locations

-        :returns: The locations.
-        :rtype: List[Location]
-        """
-        raise NotImplementedError
+    async def get(self, loc_id):  # pylint: disable=arguments-differ
+        # Get location at the index equal to the provided id.
+        locations = await self.get_all()
+        return locations[loc_id]

     @abstractmethod
-    async def get(self, id):  # pylint: disable=redefined-builtin,invalid-name
-        """
-        Gets and returns location with the provided id.
-
-        :returns: The location.
-        :rtype: Location
-        """
+    async def get_locations(self):
         raise NotImplementedError
+
+def getDataSources():
+    from .csbs import CSBSLocationService
+    from .jhu import JhuLocationService
+    from .nyt import NYTLocationService
+    return {"jhu": JhuLocationService(), "csbs": CSBSLocationService(), "nyt": NYTLocationService()}

app/services/location/csbs.py

Lines changed: 66 additions & 80 deletions
@@ -14,89 +14,75 @@

 LOGGER = logging.getLogger("services.location.csbs")

-
-class CSBSLocationService(LocationService):
-    """
-    Service for retrieving locations from csbs
-    """
-
-    async def get_all(self):
-        # Get the locations.
-        locations = await get_locations()
-        return locations
-
-    async def get(self, loc_id):  # pylint: disable=arguments-differ
-        # Get location at the index equal to the provided id.
-        locations = await self.get_all()
-        return locations[loc_id]
-
-
 # Base URL for fetching data
 BASE_URL = "https://facts.csbs.org/covid-19/covid19_county.csv"

-
-@cached(cache=TTLCache(maxsize=1, ttl=1800))
-async def get_locations():
+class CSBSLocationService(LocationService):
     """
-    Retrieves county locations; locations are cached for 1 hour
-
-    :returns: The locations.
-    :rtype: dict
+    Service for retrieving locations from csbs
     """
-    data_id = "csbs.locations"
-    LOGGER.info(f"{data_id} Requesting data...")
-    # check shared cache
-    cache_results = await check_cache(data_id)
-    if cache_results:
-        LOGGER.info(f"{data_id} using shared cache results")
-        locations = cache_results
-    else:
-        LOGGER.info(f"{data_id} shared cache empty")
-        async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
-            text = await response.text()
-
-        LOGGER.debug(f"{data_id} Data received")
-
-        data = list(csv.DictReader(text.splitlines()))
-        LOGGER.debug(f"{data_id} CSV parsed")
-
-        locations = []
-
-        for i, item in enumerate(data):
-            # General info.
-            state = item["State Name"]
-            county = item["County Name"]
-
-            # Ensure country is specified.
-            if county in {"Unassigned", "Unknown"}:
-                continue
-
-            # Date string without "EDT" at end.
-            last_update = " ".join(item["Last Update"].split(" ")[0:2])
-
-            # Append to locations.
-            locations.append(
-                CSBSLocation(
-                    # General info.
-                    i,
-                    state,
-                    county,
-                    # Coordinates.
-                    Coordinates(item["Latitude"], item["Longitude"]),
-                    # Last update (parse as ISO).
-                    datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
-                    # Statistics.
-                    int(item["Confirmed"] or 0),
-                    int(item["Death"] or 0),
+    @cached(cache=TTLCache(maxsize=1, ttl=1800))
+    async def get_locations(self):
+        """
+        Retrieves county locations; locations are cached for 1 hour
+
+        :returns: The locations.
+        :rtype: dict
+        """
+        data_id = "csbs.locations"
+        LOGGER.info(f"{data_id} Requesting data...")
+        # check shared cache
+        cache_results = await check_cache(data_id)
+        if cache_results:
+            LOGGER.info(f"{data_id} using shared cache results")
+            locations = cache_results
+        else:
+            LOGGER.info(f"{data_id} shared cache empty")
+            async with httputils.CLIENT_SESSION.get(BASE_URL) as response:
+                text = await response.text()
+
+            LOGGER.debug(f"{data_id} Data received")
+
+            data = list(csv.DictReader(text.splitlines()))
+            LOGGER.debug(f"{data_id} CSV parsed")
+
+            locations = []
+
+            for i, item in enumerate(data):
+                # General info.
+                state = item["State Name"]
+                county = item["County Name"]
+
+                # Ensure country is specified.
+                if county in {"Unassigned", "Unknown"}:
+                    continue
+
+                # Date string without "EDT" at end.
+                last_update = " ".join(item["Last Update"].split(" ")[0:2])
+
+                # Append to locations.
+                locations.append(
+                    CSBSLocation(
+                        # General info.
+                        i,
+                        state,
+                        county,
+                        # Coordinates.
+                        Coordinates(item["Latitude"], item["Longitude"]),
+                        # Last update (parse as ISO).
+                        datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z",
+                        # Statistics.
+                        int(item["Confirmed"] or 0),
+                        int(item["Death"] or 0),
+                    )
                 )
-            )
-    LOGGER.info(f"{data_id} Data normalized")
-    # save the results to distributed cache
-    # TODO: fix json serialization
-    try:
-        await load_cache(data_id, locations)
-    except TypeError as type_err:
-        LOGGER.error(type_err)
-
-    # Return the locations.
-    return locations
+        LOGGER.info(f"{data_id} Data normalized")
+        # save the results to distributed cache
+        # TODO: fix json serialization
+        try:
+            await load_cache(data_id, locations)
+        except TypeError as type_err:
+            LOGGER.error(type_err)
+
+        # Return the locations.
+        return locations
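
As a small worked example of the date normalization inside get_locations above (the sample timestamp is made up, not taken from the feed):

from datetime import datetime

# A CSBS "Last Update" value typically carries a trailing timezone label.
raw = "2020-03-28 14:25 EDT"  # hypothetical sample value

# Drop the trailing "EDT" by keeping only the first two space-separated parts.
last_update = " ".join(raw.split(" ")[0:2])  # "2020-03-28 14:25"

# Parse and re-emit as ISO 8601 with a "Z" suffix, as the service does.
iso = datetime.strptime(last_update, "%Y-%m-%d %H:%M").isoformat() + "Z"
print(iso)  # 2020-03-28T14:25:00Z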

app/services/location/jhu.py

Lines changed: 80 additions & 88 deletions
@@ -26,15 +26,88 @@ class JhuLocationService(LocationService):
     Service for retrieving locations from Johns Hopkins CSSE (https://github.com/CSSEGISandData/COVID-19).
     """

-    async def get_all(self):
-        # Get the locations.
-        locations = await get_locations()
+    @cached(cache=TTLCache(maxsize=1, ttl=1800))
+    async def get_locations(self):
+        """
+        Retrieves the locations from the categories. The locations are cached for 1 hour.
+
+        :returns: The locations.
+        :rtype: List[Location]
+        """
+        data_id = "jhu.locations"
+        LOGGER.info(f"pid:{PID}: {data_id} Requesting data...")
+        # Get all of the data categories locations.
+        confirmed = await get_category("confirmed")
+        deaths = await get_category("deaths")
+        recovered = await get_category("recovered")
+
+        locations_confirmed = confirmed["locations"]
+        locations_deaths = deaths["locations"]
+        locations_recovered = recovered["locations"]
+
+        # Final locations to return.
+        locations = []
+        # ***************************************************************************
+        # TODO: This iteration approach assumes the indexes remain the same
+        # and opens us to a CRITICAL ERROR. The removal of a column in the data source
+        # would break the API or SHIFT all the data confirmed, deaths, recovery producting
+        # incorrect data to consumers.
+        # ***************************************************************************
+        # Go through locations.
+        for index, location in enumerate(locations_confirmed):
+            # Get the timelines.
+
+            # TEMP: Fix for merging recovery data. See TODO above for more details.
+            key = (location["country"], location["province"])
+
+            timelines = {
+                "confirmed": location["history"],
+                "deaths": parse_history(key, locations_deaths, index),
+                "recovered": parse_history(key, locations_recovered, index),
+            }
+
+            # Grab coordinates.
+            coordinates = location["coordinates"]
+
+            # Create location (supporting timelines) and append.
+            locations.append(
+                TimelinedLocation(
+                    # General info.
+                    index,
+                    location["country"],
+                    location["province"],
+                    # Coordinates.
+                    Coordinates(latitude=coordinates["lat"], longitude=coordinates["long"]),
+                    # Last update.
+                    datetime.utcnow().isoformat() + "Z",
+                    # Timelines (parse dates as ISO).
+                    {
+                        "confirmed": Timeline(
+                            timeline={
+                                datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
+                                for date, amount in timelines["confirmed"].items()
+                            }
+                        ),
+                        "deaths": Timeline(
+                            timeline={
+                                datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
+                                for date, amount in timelines["deaths"].items()
+                            }
+                        ),
+                        "recovered": Timeline(
+                            timeline={
+                                datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
+                                for date, amount in timelines["recovered"].items()
+                            }
+                        ),
+                    },
+                )
+            )
+        LOGGER.info(f"{data_id} Data normalized")
+
+        # Finally, return the locations.
         return locations

-    async def get(self, loc_id):  # pylint: disable=arguments-differ
-        # Get location at the index equal to provided id.
-        locations = await self.get_all()
-        return locations[loc_id]


 # ---------------------------------------------------------------
@@ -127,87 +200,6 @@ async def get_category(category):
     return results


-@cached(cache=TTLCache(maxsize=1, ttl=1800))
-async def get_locations():
-    """
-    Retrieves the locations from the categories. The locations are cached for 1 hour.
-
-    :returns: The locations.
-    :rtype: List[Location]
-    """
-    data_id = "jhu.locations"
-    LOGGER.info(f"pid:{PID}: {data_id} Requesting data...")
-    # Get all of the data categories locations.
-    confirmed = await get_category("confirmed")
-    deaths = await get_category("deaths")
-    recovered = await get_category("recovered")
-
-    locations_confirmed = confirmed["locations"]
-    locations_deaths = deaths["locations"]
-    locations_recovered = recovered["locations"]
-
-    # Final locations to return.
-    locations = []
-    # ***************************************************************************
-    # TODO: This iteration approach assumes the indexes remain the same
-    # and opens us to a CRITICAL ERROR. The removal of a column in the data source
-    # would break the API or SHIFT all the data confirmed, deaths, recovery producting
-    # incorrect data to consumers.
-    # ***************************************************************************
-    # Go through locations.
-    for index, location in enumerate(locations_confirmed):
-        # Get the timelines.
-
-        # TEMP: Fix for merging recovery data. See TODO above for more details.
-        key = (location["country"], location["province"])
-
-        timelines = {
-            "confirmed": location["history"],
-            "deaths": parse_history(key, locations_deaths, index),
-            "recovered": parse_history(key, locations_recovered, index),
-        }
-
-        # Grab coordinates.
-        coordinates = location["coordinates"]
-
-        # Create location (supporting timelines) and append.
-        locations.append(
-            TimelinedLocation(
-                # General info.
-                index,
-                location["country"],
-                location["province"],
-                # Coordinates.
-                Coordinates(latitude=coordinates["lat"], longitude=coordinates["long"]),
-                # Last update.
-                datetime.utcnow().isoformat() + "Z",
-                # Timelines (parse dates as ISO).
-                {
-                    "confirmed": Timeline(
-                        timeline={
-                            datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
-                            for date, amount in timelines["confirmed"].items()
-                        }
-                    ),
-                    "deaths": Timeline(
-                        timeline={
-                            datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
-                            for date, amount in timelines["deaths"].items()
-                        }
-                    ),
-                    "recovered": Timeline(
-                        timeline={
-                            datetime.strptime(date, "%m/%d/%y").isoformat() + "Z": amount
-                            for date, amount in timelines["recovered"].items()
-                        }
-                    ),
-                },
-            )
-        )
-    LOGGER.info(f"{data_id} Data normalized")
-
-    # Finally, return the locations.
-    return locations


 def parse_history(key: tuple, locations: list, index: int):
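
The TODO above warns that merging the confirmed, deaths and recovered feeds by list index is fragile; the key = (country, province) tuple passed to parse_history is the temporary mitigation. Below is a minimal sketch of what such a keyed merge can look like; the find_history helper and the sample rows are illustrative only and are not the repository's parse_history implementation:

def find_history(key: tuple, locations: list) -> dict:
    # Hypothetical keyed merge: match on (country, province) instead of
    # trusting that every category list shares the same row order.
    for location in locations:
        if (location["country"], location["province"]) == key:
            return location["history"]
    return {}


# Illustrative rows: the deaths feed is ordered differently than confirmed.
confirmed = [{"country": "US", "province": "New York", "history": {"3/27/20": 100}}]
deaths = [
    {"country": "US", "province": "Washington", "history": {"3/27/20": 5}},
    {"country": "US", "province": "New York", "history": {"3/27/20": 9}},
]

key = (confirmed[0]["country"], confirmed[0]["province"])
print(find_history(key, deaths))  # {'3/27/20': 9}, despite the index shift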
