Skip to content

Commit 5890286

Browse files
author
Jack Li
committed
add subscription watch and apis
1 parent d277c4c commit 5890286

File tree

7 files changed

+150
-31
lines changed

7 files changed

+150
-31
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
/**/js/node_modules
33
/**/js/antibot-simulation.js
44
/**/tmp
5+
.vscode

startupApp/apps.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from django.apps import AppConfig
22
from ticketscraping.scraping import start
33
from datetime import datetime
4+
from threading import Thread
45

56

67
class MyAppConfig(AppConfig):
@@ -11,4 +12,4 @@ def ready(self):
1112
print(
1213
f"server started at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
1314
print("=== database connection is established ===")
14-
start()
15+
Thread(target=start).start()

storage/base.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,18 @@ def delete_many__(coll: collection.Collection, filter: dict):
1515
return coll.delete_many(filter)
1616

1717
def find_one_and_replace__(coll: collection.Collection, filter: dict, new_doc: dict):
    """Replace the first document matching ``filter`` with ``new_doc``.

    The replacement is stamped with a fresh ``last_modified`` value from
    ``get_meta_attrs()``. No new ``_id`` is generated here: a replacement
    whose ``_id`` differs from the stored document is rejected by MongoDB,
    so only the metadata attributes are merged in.

    The caller's ``new_doc`` is left untouched; a stamped copy is sent.

    Returns whatever ``coll.find_one_and_replace`` returns.
    """
    replacement = {**new_doc, **get_meta_attrs()}
    return coll.find_one_and_replace(filter, replacement)
1920

21+
def find_one_and_update__(coll: collection.Collection, filter: dict, update: dict = None):
    """Apply ``update`` to the first document matching ``filter``.

    The metadata attributes from ``get_meta_attrs()`` (currently
    ``last_modified``) are merged into the ``$set`` operator, which is
    created when the update document has none.

    ``update`` stays optional for backward compatibility; when omitted the
    update consists of the metadata stamp only. The caller's ``update``
    dict and its nested ``$set`` dict are not mutated.

    Returns whatever ``coll.find_one_and_update`` returns.
    """
    stamped = dict(update) if update else {}
    # Metadata goes last so it overrides any caller-supplied value.
    stamped["$set"] = {**stamped.get("$set", {}), **get_meta_attrs()}
    return coll.find_one_and_update(filter, stamped)
29+
2030
def find_one_and_delete__(coll: collection.Collection, filter: dict):
    """Delete the first document matching ``filter`` in ``coll``.

    Returns whatever ``coll.find_one_and_delete`` returns.
    """
    removed = coll.find_one_and_delete(filter)
    return removed
2232

@@ -26,8 +36,16 @@ def find_one__(coll: collection.Collection, filter: dict, projection):
2636
def find_many__(coll: collection.Collection, filter: dict, projection, **kwargs):
    """Run ``coll.find`` with the given filter and projection.

    Extra keyword arguments are forwarded to ``coll.find`` unchanged. The
    result of ``coll.find`` is returned as-is, without being materialised.
    """
    cursor = coll.find(filter=filter, projection=projection, **kwargs)
    return cursor
2838

39+
def watch__(coll: collection.Collection, **kwargs):
    """Open a watch on ``coll``, forwarding every keyword argument.

    Returns whatever ``coll.watch`` returns.
    """
    stream = coll.watch(**kwargs)
    return stream
41+
2942
def gen_additional_attrs():
    """Build the attributes added to a document: a new id plus metadata.

    Returns a dict with a random UUID4 string under ``_id`` followed by
    everything ``get_meta_attrs()`` provides.
    """
    attrs = {"_id": str(uuid4())}
    attrs.update(get_meta_attrs())
    return attrs
47+
48+
def get_meta_attrs():
    """Return the metadata attributes stamped on every write.

    Currently a single key, ``last_modified``, holding the value of
    ``datetime.today()`` truncated to whole seconds.
    """
    now = datetime.today()
    return {"last_modified": now.replace(microsecond=0)}

storage/storage.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ def find_one_and_replace(collection_name, filter: dict, new_doc: dict, db_name="
3636
coll = db[collection_name]
3737
return find_one_and_replace__(coll, filter, new_doc)
3838

39+
# find one and update
40+
def find_one_and_update(collection_name, filter: dict, update, db_name="tickets"):
    """Update one document in ``collection_name`` of database ``db_name``.

    Resolves the collection through ``get_db_handle`` and delegates to
    ``find_one_and_update__``, returning its result.
    """
    coll = get_db_handle(db_name)[collection_name]
    return find_one_and_update__(coll, filter, update)
44+
3945
# find one and delete
4046
def find_one_and_delete(collection_name, filter: dict, db_name="tickets"):
4147
db = get_db_handle(db_name)
@@ -52,4 +58,11 @@ def find_one(collection_name, filter: dict, projection=None, db_name="tickets"):
5258
def find_many(collection_name, filter: dict, projection=None, db_name="tickets", **kwargs):
    """Return all documents matching ``filter`` as a list.

    Resolves the collection through ``get_db_handle``, delegates to
    ``find_many__`` (extra keyword arguments are forwarded) and
    materialises the result with ``list``.
    """
    coll = get_db_handle(db_name)[collection_name]
    cursor = find_many__(coll, filter, projection, **kwargs)
    return list(cursor)
62+
63+
# watch changes
64+
def watch(collection_name, db_name="tickets", **kwargs):
    """Open a watch on ``collection_name`` of database ``db_name``.

    Resolves the collection through ``get_db_handle`` and delegates to
    ``watch__``, forwarding every keyword argument and returning its result.
    """
    coll = get_db_handle(db_name)[collection_name]
    return watch__(coll, **kwargs)
68+

ticketscraping/scraping.py

Lines changed: 99 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,28 @@
11
import sched
22
import time
3+
import ctypes
34
import random
45
import requests
5-
from threading import Thread
6+
import threading
67
from . import constants
78
from threading import Semaphore
89
from .prepare_reese84token import getReese84Token
910
from storage.storage import *
1011

12+
1113
class Reese84TokenUpdating():
1214
def __init__(self):
1315
self.is_running = False
1416
self.reese84_token = {}
1517
self.token_semaphore = Semaphore(0)
1618
self.scheduler = sched.scheduler(time.time, time.sleep)
17-
19+
1820
def initialize_reese84_token(self):
    """
    This method should not be called directly.

    Fetches the first token via ``getReese84Token()``, releases the token
    semaphore once to signal that a token is available, and schedules
    ``renew_reese84_token`` to run ``TOKEN_RENEW_SEC_OFFSET`` seconds before
    the token's ``renewInSec`` elapses.
    """
    self.reese84_token = getReese84Token()
    # produce a new token
    self.token_semaphore.release()
    renew_delay = (self.reese84_token['renewInSec']
                   - constants.TOKEN_RENEW_SEC_OFFSET)
    self.scheduler.enter(
        renew_delay,
        constants.TOKEN_RENEW_PRIORITY,
        self.renew_reese84_token,
    )
2628

@@ -29,24 +31,26 @@ def renew_reese84_token(self):
2931
This method should not be called directly.
3032
"""
3133
print("renewing token")
32-
self.token_semaphore.acquire() # invalidate a token
34+
self.token_semaphore.acquire() # invalidate a token
3335
self.reese84_token = getReese84Token()
3436
self.token_semaphore.release()
3537
self.scheduler.enter(self.reese84_token['renewInSec'] -
3638
constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token)
37-
39+
3840
def start(self):
3941
# if the scheduler is already started - do nothing
40-
if self.is_running: return
42+
if self.is_running:
43+
return
4144
self.is_running = True
4245
self.initialize_reese84_token()
4346
self.scheduler.run()
44-
4547

4648

47-
class TicketScraping():
49+
class TicketScraping(threading.Thread):
4850
def __init__(self, token_generator: Reese84TokenUpdating, event_id, subscribe_id, num_seats=2, price_range=(0, 200)):
51+
threading.Thread.__init__(self)
4952
self.is_running = False
53+
self.is_stopping = False
5054
self.event_id = event_id
5155
self.subscribe_id = subscribe_id
5256
self.num_seats = num_seats
@@ -56,6 +60,19 @@ def __init__(self, token_generator: Reese84TokenUpdating, event_id, subscribe_id
5660
self.initialDelay = random.randint(
5761
1, constants.TICKET_SCRAPING_INTERVAL)
5862

63+
def flag_for_termination(self):
64+
# cancel all scheduled jobs
65+
list(map(self.scheduler.cancel, self.scheduler.queue))
66+
# raise exception to terminate the thread
67+
thread_id = self.get_id()
68+
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id,
69+
ctypes.py_object(SystemExit))
70+
print(
71+
f"Ticket scraping with subscription id={self.subscribe_id} marked for termination.")
72+
if res > 1:
73+
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
74+
print(f'Failed to terminate the thread with id={thread_id}')
75+
5976
def ticket_scraping(self):
6077
if self.token_gen.token_semaphore._value <= 0:
6178
# retry after a delay
@@ -71,34 +88,89 @@ def ticket_scraping(self):
7188
# print(res.json())
7289
print("Got the ticket info from TM. /", res.status_code)
7390
self.scheduler.enter(constants.TICKET_SCRAPING_INTERVAL,
74-
constants.TICKET_SCRAPING_PRIORITY, self.ticket_scraping)
91+
constants.TICKET_SCRAPING_PRIORITY, self.ticket_scraping)
7592

76-
def start(self):
77-
# if the scheduler is already started - do nothing
78-
if self.is_running:
79-
return
80-
self.is_running = True
81-
self.ticket_scraping()
82-
# randomize start time to scatter out event of API fetching
83-
time.sleep(self.initialDelay)
84-
self.scheduler.run()
93+
def get_id(self):
    """Return the identifier of the running thread, or ``None``.

    An explicit ``_thread_id`` attribute wins when present. Otherwise the
    public ``ident`` is returned, but only while the thread is alive: the
    identifier of a finished thread may be reused by another thread, so it
    must not be handed out.
    """
    if hasattr(self, '_thread_id'):
        return self._thread_id
    return self.ident if self.is_alive() else None
100+
101+
def run(self):
102+
try:
103+
# if the scheduler is already started - do nothing
104+
if self.is_running:
105+
return
106+
self.is_running = True
107+
self.is_stopping = False
108+
self.ticket_scraping()
109+
# randomize start time to scatter out event of API fetching
110+
time.sleep(self.initialDelay)
111+
self.scheduler.run()
112+
finally:
113+
print(
114+
f"Ticket scraping with subscription id={self.subscribe_id} has been terminated.")
85115

86116

87117
def _build_scraper(token_gen, doc):
    """Create (without starting) a scraping thread for one subscription document."""
    scraper = TicketScraping(
        token_gen, doc["tm_event_id"], doc["_id"], doc["ticket_num"], doc["price_range"])
    print(scraper.initialDelay, "s")
    return scraper


def _launch_scraper(token_gen, doc, registry):
    """Create, register and start a scraping thread for one subscription document."""
    scraper = _build_scraper(token_gen, doc)
    registry[scraper.subscribe_id] = scraper
    scraper.start()


def _halt_scraper(doc_id, registry):
    """Terminate and unregister the scraping thread tracked under ``doc_id``, if any."""
    scraper = registry.pop(doc_id, None)
    if scraper is not None:
        scraper.flag_for_termination()


def start():
    """Start token renewal, start all scrapers, then follow subscription changes.

    1. Starts the reese84 token renewing thread.
    2. Starts one ``TicketScraping`` thread per event that is not marked
       paused (``markPaused`` missing or ``False``).
    3. Watches the events collection forever and keeps the set of running
       scrapers in sync: delete -> stop, insert -> start,
       replace/update -> stop when ``markPaused`` is true, otherwise start
       if not already running.

    This function never returns; run it on its own thread.
    """
    # reese84 token renewing thread
    reese_token_gen = Reese84TokenUpdating()
    serverThread_reese = threading.Thread(target=reese_token_gen.start)
    serverThread_reese.start()

    # ticket scraping threads, keyed by subscription id
    scraping_list = dict()
    events = find_many(constants.DATABASE["EVENTS"], {
        '$or': [{'markPaused': {'$exists': False}}, {'markPaused': False}]})
    for evt in events:
        ticket_scraping = _build_scraper(reese_token_gen, evt)
        scraping_list[ticket_scraping.subscribe_id] = ticket_scraping
    for scraping_thread in scraping_list.values():
        scraping_thread.start()

    # listen for changes in ticket scraping subscriptions
    watch_pipeline = [{'$match': {'operationType': {
        '$in': ['delete', 'insert', 'replace', 'update']}}}]
    while True:
        # The outer loop reopens the stream should it ever end.
        with watch(constants.DATABASE["EVENTS"], pipeline=watch_pipeline, full_document='updateLookup') as stream:
            for change in stream:
                operation = change['operationType']
                if operation == "delete":
                    # stop the thread
                    _halt_scraper(change['documentKey']['_id'], scraping_list)
                elif operation == "insert":
                    # spawn a thread to do scraping operations
                    _launch_scraper(
                        reese_token_gen, change['fullDocument'], scraping_list)
                else:
                    # replace or update - pause or resume ticket scraping
                    full_doc = change.get('fullDocument')
                    if not full_doc:
                        # The looked-up document can be missing, e.g. when it
                        # was deleted right after the update.
                        print("Change event has no full document, skip processing.")
                        continue
                    if 'markPaused' not in full_doc:
                        print("'markPaused' flag is unset, skip processing.")
                        # Skip only this event; leaving the loop would close
                        # the stream and could miss changes while reopening.
                        continue
                    doc_id = full_doc['_id']
                    if full_doc['markPaused'] == True:
                        # pause scraping
                        _halt_scraper(doc_id, scraping_list)
                    elif doc_id not in scraping_list:
                        # resume scraping if currently paused
                        _launch_scraper(
                            reese_token_gen, full_doc, scraping_list)
                # display current number of ticket scraping
                print(f"{len(scraping_list)} ticket scraping threads now.")

trackerapi/urls.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22
from . import views
33

44
# Routes of the tracker API, each mapped to its view function.
urlpatterns = [
    path('subscribe/', views.subscribe_tm_event_price_tracking),
    path('unsubscribe/', views.unsubscribe_tm_event_price_tracking),
]

trackerapi/views.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from django.shortcuts import render
22
from django.http import HttpResponse, HttpRequest
3-
from storage.storage import insert_one
3+
from storage.storage import insert_one, find_one_and_update
44
import ticketscraping.constants as constants
55
import json
66

@@ -21,3 +21,16 @@ def subscribe_tm_event_price_tracking(req: HttpRequest):
2121
}
2222
insert_one(constants.DATABASE['EVENTS'], doc)
2323
return HttpResponse('OK', status=200)
24+
25+
def unsubscribe_tm_event_price_tracking(req: HttpRequest):
    """Pause price tracking for a subscription.

    Expects a POST whose JSON body is an object with a non-empty
    ``subscription_id``. The matching event document gets
    ``markPaused: True``.

    Responses:
        200 - subscription found and marked paused.
        400 - body is not valid JSON, ``subscription_id`` is missing or
              empty, or no subscription has that id.
        405 - any method other than POST.
    """
    if req.method != 'POST':
        # Every path must return a response; falling through returned None.
        response = HttpResponse('Method not allowed.', status=405)
        response['Allow'] = 'POST'
        return response

    try:
        body = json.loads(req.body)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError
        return HttpResponse('Request is invalid.', status=400)

    # validation
    subscription_id = body.get('subscription_id') if isinstance(body, dict) else None
    if not subscription_id:
        return HttpResponse('Request is invalid.', status=400)

    res = find_one_and_update(
        constants.DATABASE['EVENTS'],
        {"_id": subscription_id},
        {'$set': {'markPaused': True}},
    )
    if res:
        return HttpResponse('OK', status=200)
    return HttpResponse('Subscription id not found.', status=400)

0 commit comments

Comments
 (0)