Commit f998e0c

add async task scheduler and mail scheduler

1 parent 97f34dc commit f998e0c
24 files changed: +441 -175 lines changed

apps/startup/apps.py

Lines changed: 1 addition & 0 deletions
@@ -11,4 +11,5 @@ class MyAppConfig(AppConfig):
     def ready(self):
         print(
             f"server started at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
+        # start scraping
         Thread(target=start).start()

apps/storage/query.py

Lines changed: 1 addition & 1 deletion
@@ -13,4 +13,4 @@ def find_min(collection_name, filter: dict, sort_key: str, db_name="tickets"):
 
 def find_many_ascending_order(collection_name, filter: dict, sort_key: str, db_name="tickets"):
     sort_seq = [(sort_key, pymongo.ASCENDING)]
-    return find_many(collection_name, filter, db_name=db_name, sort=sort_seq)
+    return find_many(collection_name, filter, db_name=db_name, sort=sort_seq)

apps/ticketscraping/constants.py

Lines changed: 22 additions & 3 deletions
@@ -1,3 +1,4 @@
+from functools import reduce
 from uuid import uuid4
 
 ANTIBOT_JS_CODE_URL = "https://epsf.ticketmaster.com/eps-d"
@@ -18,20 +19,38 @@ def get_top_picks_url(
     "BEST_HISTORY_SEATS": "best-history-seats"
 }
 
+SUBSCRIBE_REQUEST_PROPS = {
+    'NAME': 'name',
+    'TARGET_PRICE': 'target_price',
+    'TOLERANCE': 'tolerance',
+    'TICKET_NUM': 'ticket_num',
+    'TM_EVENT_ID': 'tm_event_id'
+}
+
+def filter_obj_from_attrs(obj, atts: dict[str,str]):
+    res = {}
+    for key in atts.values():
+        if key in obj:
+            res[key] = obj[key]
+    return res
+
+
 # metric thresholds
+MINIMUM_HISTORY_DATA = 3
 PERCENT_OF_CHANGE = 0.5
-PERCENTILE_HISTORY_PRICES = 0.5
+PERCENTILE_HISTORY_PRICES = 0.25
+ALERT_SEATS_MAX_COUNT = 3
 
 def get_top_picks_header(): return {
     **BASIC_REQ_HEADER,
     "tmps-correlation-id": str(uuid4())
 }
 
-def get_top_picks_query_params(qty, priceInterval): return {
+def get_top_picks_query_params(qty: int, target_price: int, tolerance: int): return {
     'show': 'places maxQuantity sections',
     'mode': 'primary:ppsectionrow resale:ga_areas platinum:all',
     'qty': qty,
-    'q': f"and(not(\'accessible\'),any(listprices,$and(gte(@,{priceInterval[0]}),lte(@,{priceInterval[1]}))))",
+    'q': f"and(not(\'accessible\'),any(listprices,$and(gte(@,{target_price - tolerance}),lte(@,{target_price + tolerance}))))",
     'includeStandard': 'true',
    'includeResale': 'true',
    'includePlatinumInventoryType': 'false',
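
For context on the new constants: filter_obj_from_attrs keeps only the values of SUBSCRIBE_REQUEST_PROPS that are actually present on an incoming object, so a raw subscribe payload is reduced to the whitelisted fields. A minimal sketch of the intended use, with a made-up payload (the extra key and all values below are purely illustrative):

# hypothetical subscribe payload; keys not listed in SUBSCRIBE_REQUEST_PROPS are dropped
payload = {
    'name': 'my concert alert',
    'target_price': 150,
    'tolerance': 20,
    'ticket_num': 2,
    'tm_event_id': 'ABC123',
    'csrf_token': 'should-not-be-stored'  # not whitelisted, filtered out
}

doc = filter_obj_from_attrs(payload, SUBSCRIBE_REQUEST_PROPS)
# doc == {'name': 'my concert alert', 'target_price': 150, 'tolerance': 20,
#         'ticket_num': 2, 'tm_event_id': 'ABC123'}

The same target_price/tolerance pair feeds get_top_picks_query_params, whose q filter restricts list prices to the closed interval [target_price - tolerance, target_price + tolerance]; with the values above that is 130 to 170, replacing the old explicit priceInterval tuple.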

apps/ticketscraping/prepare_reese84token.py

Lines changed: 4 additions & 3 deletions
@@ -6,7 +6,7 @@
 from . import constants
 
 
-def getReese84Token():
+def getReese84Token()->tuple[str, int]:
     def readFileContentToString(filename):
         f = open(filename, 'r')
         content = f.read()
@@ -19,7 +19,7 @@ def readFileContentToString(filename):
     # trim the code to the function that is only used
     match_obj = re.search(constants.FN_MATCHING_REGEX, antibot_js_code_full)
     if not match_obj:
-        return None
+        raise Exception('reese84 manufacture fails')
     start, end = match_obj.span()
     antibot_js_code_trim = antibot_js_code_full[start:end]
 
@@ -51,4 +51,5 @@ def readFileContentToString(filename):
     # invoke the get token api to get the reese84 token
     token_json_res = requests.post(
         constants.TOKEN_INTERROGATION_URL, headers=constants.BASIC_REQ_HEADER, json=token)
-    return token_json_res.json()
+    json_obj = token_json_res.json()
+    return json_obj['token'], json_obj['renewInSec']

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+from .scheduler_process import SchedulerProcess
+
+print(__name__)
+async_tasks_scheduler = SchedulerProcess()
+async_tasks_scheduler.start()

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+import threading
+from multiprocessing import Queue
+from ..tasks.util.scheduler import Scheduler
+from ..tasks.asynchronous import run_async_tasks
+
+def child_process(queue: Queue):
+    scheduler = Scheduler('asynchronous tasks scheduler', run_async_tasks)
+    sched_thread = threading.Thread(target=scheduler.run)
+    sched_thread.start()
+    while True:
+        received_params = queue.get()
+        scheduler.enter(*received_params)

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+# from .scheduler_process import SchedulerProcess
+# from .mail_scheduler_child_process import child_process
+
+# mail_scheduler = SchedulerProcess(child_process)
+# mail_scheduler.start()

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+import threading
+from multiprocessing import Queue
+from ..tasks.util.scheduler import Scheduler
+
+def child_process(queue: Queue):
+    # TODO
+    scheduler = Scheduler('mail scheduler', lambda x: x)
+    sched_thread = threading.Thread(target=scheduler.run)
+    sched_thread.start()
+    while True:
+        received_params = queue.get()
+        scheduler.enter(*received_params)

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+import threading
+from multiprocessing import Process, Queue
+from .async_tasks_child_process import child_process
+
+class SchedulerProcess:
+    def __init__(self):
+        self.queue = Queue()
+        self.started = False
+        self.starter_thread = None
+
+    def generate_child_process(self):
+        cp = Process(target=child_process, args=(self.queue,))
+        cp.start()
+        self.started = True
+        cp.join()
+
+    def start(self):
+        self.starter_thread = threading.Thread(
+            target=self.generate_child_process)
+        self.starter_thread.start()
+
+    def produce(self, *args):
+        if not self.started:
+            return
+        self.queue.put(args)
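
Read together with the child-process modules above, SchedulerProcess is a one-way pipeline: produce() drops its positional arguments onto a multiprocessing Queue, and the child process unpacks each received tuple straight into Scheduler.enter(). The custom Scheduler class itself is not part of this commit, so the sketch below assumes its enter() mirrors the standard library's sched.scheduler.enter(delay, priority, action, argument); the task function and its argument are hypothetical:

# hedged usage sketch; async_tasks_scheduler is the module-level instance
# created in the package __init__ shown earlier
def notify_subscriber(subscribe_id):
    ...  # hypothetical async task body

# ask the scheduler process to run notify_subscriber(...) ~5 seconds from now,
# assuming enter() takes (delay, priority, action, argument) like sched.scheduler
async_tasks_scheduler.produce(5, 1, notify_subscriber, ('someSubscribeId',))

Because each tuple crosses a process boundary, everything passed to produce() has to be picklable, and calls made before the child process has been spawned are silently dropped by the started guard.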

apps/ticketscraping/scraping.py

Lines changed: 17 additions & 15 deletions
@@ -14,17 +14,18 @@
 class Reese84TokenUpdating():
     def __init__(self):
         self.is_running = False
-        self.reese84_token = {}
+        self.reese84_token = ''
+        self.reese84_renewInSec = 0
         self.token_semaphore = Semaphore(0)
         self.scheduler = sched.scheduler(time.time, time.sleep)
 
     def initialize_reese84_token(self):
         """
         This method should not be called directly.
         """
-        self.reese84_token = getReese84Token()
+        self.reese84_token, self.reese84_renewInSec = getReese84Token()
         self.token_semaphore.release() # produce a new token
-        self.scheduler.enter(self.reese84_token['renewInSec'] -
+        self.scheduler.enter(self.reese84_renewInSec -
                              constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token)
 
     def renew_reese84_token(self):
@@ -35,7 +36,7 @@ def renew_reese84_token(self):
         self.token_semaphore.acquire() # invalidate a token
         self.reese84_token = getReese84Token()
         self.token_semaphore.release()
-        self.scheduler.enter(self.reese84_token['renewInSec'] -
+        self.scheduler.enter(self.reese84_renewInSec -
                              constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token)
 
     def start(self):
@@ -48,14 +49,15 @@ def start(self):
 
 
 class TicketScraping(threading.Thread):
-    def __init__(self, token_generator: Reese84TokenUpdating, event_id, subscribe_id, num_seats=2, price_range=(0, 200)):
+    def __init__(self, token_generator: Reese84TokenUpdating, event_id: str, subscribe_id: str, num_seats: int, target_price: int, tolerance: int):
         threading.Thread.__init__(self)
         self.is_running = False
         self.is_stopping = False
         self.event_id = event_id
         self.subscribe_id = subscribe_id
         self.num_seats = num_seats
-        self.price_range = price_range
+        self.target_price = target_price
+        self.tolerance = tolerance
         self.token_gen = token_generator
         self.scheduler = sched.scheduler(time.time, time.sleep)
         self.initialDelay = random.randint(
@@ -83,17 +85,17 @@ def ticket_scraping(self):
         # scrape the top-picks from ticketmaster
         top_picks_url = constants.get_top_picks_url(self.event_id)
         top_picks_q_params = constants.get_top_picks_query_params(
-            self.num_seats, self.price_range)
+            self.num_seats, self.target_price, self.tolerance)
         top_picks_header = constants.get_top_picks_header()
         res = requests.get(top_picks_url, headers=top_picks_header, params=top_picks_q_params,
-                           cookies=dict(reese84=self.token_gen.reese84_token['token']))
-        # print(res.json())
+                           cookies=dict(reese84=self.token_gen.reese84_token)) # type: ignore
 
         # prune and format the received picks
         picks_obj = format_seats(res.json(), self.subscribe_id)
 
         # periodic task: update collections best_available_seats and best_history_seats
-        run_periodic_task(picks_obj, self.subscribe_id)
+        # and automatically spawn async tasks
+        run_periodic_task(picks_obj, self.subscribe_id, self.target_price)
 
         print("Got the ticket info from TM. /", res.status_code)
         self.scheduler.enter(constants.TICKET_SCRAPING_INTERVAL,
@@ -102,8 +104,8 @@ def ticket_scraping(self):
     def get_id(self):
         # returns id of the respective thread
         if hasattr(self, '_thread_id'):
-            return self._thread_id
-        for id, thread in threading._active.items():
+            return self._thread_id # type: ignore
+        for id, thread in threading._active.items(): # type: ignore
             if thread is self:
                 return id
 
@@ -135,7 +137,7 @@ def start():
         '$or': [{'markPaused': {'$exists': False}}, {'markPaused': False}]})
     for evt in events:
         ticket_scraping = TicketScraping(
-            reese_token_gen, evt["tm_event_id"], evt["_id"], evt["ticket_num"], evt["price_range"])
+            reese_token_gen, evt["tm_event_id"], evt["_id"], evt["ticket_num"], evt["target_price"], evt["tolerance"])
         print(ticket_scraping.initialDelay, "s")
         scraping_list[ticket_scraping.subscribe_id] = ticket_scraping
     for scraping_thread in scraping_list.values():
@@ -156,7 +158,7 @@ def start():
             # spawn a thread to do scraping operations
             full_doc = change['fullDocument']
             ticket_scraping = TicketScraping(
-                reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["price_range"])
+                reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"])
             print(ticket_scraping.initialDelay, "s")
             scraping_list[ticket_scraping.subscribe_id] = ticket_scraping
             ticket_scraping.start()
@@ -177,7 +179,7 @@ def start():
             # resume scraping if currently paused
             if doc_id not in scraping_list:
                 ticket_scraping = TicketScraping(
-                    reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["price_range"])
+                    reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"])
                 print(ticket_scraping.initialDelay, "s")
                 scraping_list[ticket_scraping.subscribe_id] = ticket_scraping
                 ticket_scraping.start()
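
For reference, the subscription documents that drive start() are now expected to carry target_price and tolerance instead of price_range. A hedged example of the shape the code above reads (values are illustrative; only keys referenced in this diff are shown):

# illustrative subscription document as consumed by start()
evt = {
    '_id': '<subscribe_id>',        # used as the TicketScraping subscribe_id
    'tm_event_id': '<tm event id>',
    'ticket_num': 2,
    'target_price': 150,
    'tolerance': 20,                # scraping searches list prices in [130, 170]
    'markPaused': False,            # initial load only selects docs where this is absent or False
}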
