Skip to content

Commit 2572203

Browse files
Merge pull request #15 from Jackiebibili/schedulers
add async task scheduler and mail scheduler and synchronize startup process
2 parents 078b4a1 + 15cca37 commit 2572203

23 files changed

+565
-47
lines changed

apps/startup/apps.py

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,40 @@
11
from django.apps import AppConfig
2-
from ..ticketscraping.scraping import start
32
from datetime import datetime
43
from threading import Thread
4+
from multiprocessing import Process
5+
6+
7+
def run_prepare():
    """Entry point of the scraping child process.

    Project modules are imported inside the function so that their
    import-time code runs in the child process rather than in the parent.
    """
    started_at = datetime.now().strftime('%d/%m/%Y %H:%M:%S')
    print(f"ticket scraping service started at {started_at}")

    # Sender socket: run connect() on a helper thread and block until it
    # returns, i.e. until the async tasks handler has connected.
    from apps.ticketscraping.schedulers.async_tasks_scheduler import async_tasks_scheduler
    connector = Thread(target=async_tasks_scheduler.connect)
    connector.start()
    connector.join()

    # Only after the connection step has finished, start the scraping itself.
    from apps.ticketscraping.scraping import start
    start()
22+
23+
24+
def run():
    """Launch the scraping process and the async-tasks receiver process.

    The scraping process is started as a daemon. The receiver launcher is a
    regular process and this function blocks until that launcher exits.
    """
    scraper = Process(target=run_prepare, daemon=True)
    scraper.start()

    # Receiver socket. The import is aliased so it does not shadow this
    # function's own name.
    from apps.ticketscraping.connection.asyn_tasks_receiver import run as start_receiver
    receiver_launcher = Process(target=start_receiver)
    receiver_launcher.start()
    receiver_launcher.join()
533

634

735
class MyAppConfig(AppConfig):
    """App configuration for ``apps.startup``; starts the tracker services."""

    verbose_name = "start tmtracker"
    name = "apps.startup"

    def ready(self):
        """Call the module-level ``run()`` to start the background services."""
        run()

apps/storage/base.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,18 @@ def find_one_and_update__(coll: collection.Collection, filter: dict, update=dict
3030
def find_one_and_delete__(coll: collection.Collection, filter: dict):
    """Call ``coll.find_one_and_delete(filter)`` and return its result."""
    removed = coll.find_one_and_delete(filter)
    return removed
3232

33-
def find_one__(coll: collection.Collection, filter: dict, projection, **kwargs):
    """Call ``coll.find_one`` with the filter and projection and return its result.

    Extra keyword arguments (for example ``sort``) are forwarded unchanged.
    """
    matched = coll.find_one(filter=filter, projection=projection, **kwargs)
    return matched
3535

3636
def find_many__(coll: collection.Collection, filter: dict, projection, **kwargs):
    """Call ``coll.find`` with the filter and projection and return its result.

    Extra keyword arguments are forwarded unchanged.
    """
    found = coll.find(filter=filter, projection=projection, **kwargs)
    return found
3838

39+
def count_docs__(coll: collection.Collection, filter: dict):
    """Return ``coll.count_documents`` for the given filter."""
    total = coll.count_documents(filter=filter)
    return total
41+
42+
def estimated_count_docs__(coll: collection.Collection):
    """Return ``coll.estimated_document_count()`` (no filter is applied)."""
    estimate = coll.estimated_document_count()
    return estimate
44+
3945
def watch__(coll: collection.Collection, **kwargs):
    """Call ``coll.watch`` with the given keyword arguments and return its result."""
    stream = coll.watch(**kwargs)
    return stream
4147

apps/storage/query.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from .storage import *
2+
import pymongo
3+
4+
# find the document with the largest `sort_key` value among those matching the filter
5+
def find_max(collection_name, filter: dict, sort_key: str, db_name="tickets"):
    """Return the matching document that sorts first by ``sort_key`` descending.

    Delegates to ``find_one`` with a single-key descending sort.
    """
    return find_one(
        collection_name,
        filter,
        db_name=db_name,
        sort=[(sort_key, pymongo.DESCENDING)],
    )
8+
9+
# find the document with the smallest `sort_key` value among those matching the filter
10+
def find_min(collection_name, filter: dict, sort_key: str, db_name="tickets"):
    """Return the matching document that sorts first by ``sort_key`` ascending.

    Delegates to ``find_one`` with a single-key ascending sort.
    """
    return find_one(
        collection_name,
        filter,
        db_name=db_name,
        sort=[(sort_key, pymongo.ASCENDING)],
    )
13+
14+
def find_many_ascending_order(collection_name, filter: dict, sort_key: str, db_name="tickets"):
    """Return what ``find_many`` yields for the filter, sorted by ``sort_key`` ascending."""
    return find_many(
        collection_name,
        filter,
        db_name=db_name,
        sort=[(sort_key, pymongo.ASCENDING)],
    )

apps/storage/storage.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,29 @@ def find_one_and_delete(collection_name, filter: dict, db_name="tickets"):
5151
return find_one_and_delete__(coll, filter)
5252

5353
# find one
54-
def find_one(collection_name, filter: dict, projection=None, db_name="tickets", **kwargs):
    """Look up ``collection_name`` in database ``db_name`` and run ``find_one__`` on it.

    Extra keyword arguments (for example ``sort``) are forwarded unchanged.
    """
    target = get_db_handle(db_name)[collection_name]
    return find_one__(target, filter, projection, **kwargs)
5858

5959
# find many
6060
def find_many(collection_name, filter: dict, projection=None, db_name="tickets", **kwargs):
    """Run ``find_many__`` on the named collection and return the results as a list.

    Extra keyword arguments are forwarded unchanged.
    """
    target = get_db_handle(db_name)[collection_name]
    cursor = find_many__(target, filter, projection, **kwargs)
    # Materialise the iterable so callers get a plain list.
    return [*cursor]
6464

65+
# count with filter
66+
def count_docs(collection_name, filter: dict, db_name="tickets"):
    """Return ``count_docs__`` for the named collection and the given filter."""
    target = get_db_handle(db_name)[collection_name]
    return count_docs__(target, filter)
70+
71+
# estimated count of all docs in a collection (an estimate, no filter applied)
72+
def estimated_count_docs(collection_name, db_name="tickets"):
    """Return ``estimated_count_docs__`` for the named collection (no filter)."""
    target = get_db_handle(db_name)[collection_name]
    return estimated_count_docs__(target)
76+
6577
# watch changes
6678
def watch(collection_name, db_name="tickets", **kwargs):
6779
db = get_db_handle(db_name)
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# start sockets
2+
from threading import Thread
3+
from multiprocessing import Process
4+
from apps.ticketscraping.connection.receiver_process import ReceiverProcess
5+
from apps.ticketscraping.constants import SERVICE_LOCALHOST, ASYNC_TASKS_RECEIVER_PORT
6+
7+
8+
def run_prepare():
    """Start the mail receiver, connect the mail scheduler, then serve async tasks.

    Project modules are imported inside the function, in the order they are
    needed. This call does not return: it ends in ``serve_forever()``.
    """
    # Receiver socket for mail, run in a daemon child process.
    from apps.ticketscraping.connection.mail_receiver import run as start_mail_receiver
    mail_proc = Process(target=start_mail_receiver, daemon=True)
    mail_proc.start()

    # Sender socket: run connect() on a helper thread and block until it
    # returns, i.e. until the mailer has connected.
    from apps.ticketscraping.schedulers.mail_scheduler import mail_scheduler
    connector = Thread(target=mail_scheduler.connect)
    connector.start()
    connector.join()

    # Finally serve incoming async-task messages in this process.
    from apps.ticketscraping.tasks.asynchronous import run_async_tasks
    task_receiver = ReceiverProcess(
        run_async_tasks, SERVICE_LOCALHOST, ASYNC_TASKS_RECEIVER_PORT)
    task_receiver.connect()
    task_receiver.serve_forever()
26+
27+
28+
def run():
    """Start ``run_prepare`` in a separate non-daemon process without joining it."""
    worker = Process(target=run_prepare)
    worker.start()


if __name__ == "__main__":
    run()
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from apps.ticketscraping.connection.receiver_process import ReceiverProcess
2+
# from ..tasks.asynchronous import run_async_tasks
3+
from apps.ticketscraping.constants import SERVICE_LOCALHOST, MAIL_RECEIVER_PORT
4+
5+
def run():
    """Connect a receiver on the mail port and print every message it gets.

    This call does not return: it ends in ``serve_forever()``.
    """
    def _echo(x):
        # Placeholder action: just print the single received argument.
        print(x)

    mail_receiver = ReceiverProcess(_echo, SERVICE_LOCALHOST, MAIL_RECEIVER_PORT)
    mail_receiver.connect()
    mail_receiver.serve_forever()


if __name__ == "__main__":
    run()
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from multiprocessing.connection import Client
2+
from threading import Semaphore
3+
4+
class Receiver:
    """Client end of a ``multiprocessing.connection`` channel.

    ``connect()`` opens a ``Client`` connection to ``(hostname, port)``;
    ``recv()`` then returns one received object per call. A binary
    semaphore serialises concurrent ``recv()`` calls on the connection.
    """

    def __init__(self, hostname: str, port: int):
        # Binary semaphore used as a mutex around conn.recv().
        self.lock = Semaphore(1)
        self.hostname = hostname
        self.port = port
        # Set by connect(); None means "not connected yet".
        self.conn = None

    def connect(self):
        """Open the client connection to ``(hostname, port)``."""
        self.conn = Client(address=(self.hostname, self.port,))

    def recv(self):
        """Return the next object received on the connection.

        Raises:
            Exception: if ``connect()`` has not been called successfully.
            Whatever ``conn.recv()`` raises (e.g. ``EOFError`` when the
            peer closes) is propagated to the caller.
        """
        if self.conn is None:
            raise Exception('connection is not established')
        # The context manager releases the lock even when recv() raises,
        # so a failed read cannot leave the lock held forever.
        with self.lock:
            return self.conn.recv()

    def __del__(self):
        # getattr guards against a partially initialised instance.
        conn = getattr(self, 'conn', None)
        if conn is not None:
            conn.close()
24+
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from .receiver import Receiver
2+
3+
class ReceiverProcess(Receiver):
    """A ``Receiver`` that passes every received message to a callback."""

    def __init__(self, action, hostname: str, port: int):
        """Store ``action`` and initialise the base receiver with host and port."""
        super().__init__(hostname, port)
        self.action = action

    def serve_forever(self):
        """Loop forever: receive a message and call ``action`` with it unpacked.

        The loop has no exit condition; it ends only when ``recv()`` or
        ``action`` raises.
        """
        while True:
            self.action(*self.recv())
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from multiprocessing.connection import Listener
2+
from threading import Semaphore
3+
4+
class Sender:
    """Listening end of a ``multiprocessing.connection`` channel.

    ``connect()`` listens on ``(hostname, port)`` and blocks until one
    client connects; ``send()`` then sends its arguments as a tuple. A
    binary semaphore serialises concurrent ``send()`` calls.
    """

    def __init__(self, hostname: str, port: int):
        # Binary semaphore used as a mutex around conn.send().
        self.lock = Semaphore(1)
        self.hostname = hostname
        self.port = port
        # Set by connect(); None means "not connected yet".
        self.conn = None

    def connect(self):
        """Listen on ``(hostname, port)`` and block until one client connects."""
        # Only a single connection is accepted, so the listener is closed
        # as soon as accept() returns instead of being left to the GC.
        with Listener(address=(self.hostname, self.port)) as listener:
            self.conn = listener.accept()
        print("conn accepted ", self.port)

    def send(self, *args):
        """Send ``args`` as one tuple over the connection and return ``True``.

        Raises:
            Exception: if ``connect()`` has not been called successfully.
            Whatever ``conn.send()`` raises is propagated to the caller.
        """
        if self.conn is None:
            raise Exception('connection is not established')
        # The context manager releases the lock even when send() raises,
        # so a failed write cannot leave the lock held forever.
        with self.lock:
            self.conn.send(args)
        return True

    def __del__(self):
        # getattr guards against a partially initialised instance.
        conn = getattr(self, 'conn', None)
        if conn is not None:
            conn.close()

apps/ticketscraping/constants.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
from uuid import uuid4
22

3+
# services - async action handlers
4+
ASYNC_TASKS_RECEIVER_PORT = 8100
5+
MAIL_RECEIVER_PORT = 8200
6+
SERVICE_LOCALHOST = 'localhost'
7+
38
ANTIBOT_JS_CODE_URL = "https://epsf.ticketmaster.com/eps-d"
49
TOKEN_INTERROGATION_URL = "https://epsf.ticketmaster.com/eps-d?d=www.ticketmaster.com"
510

@@ -17,16 +22,39 @@ def get_top_picks_url(
1722
"BEST_AVAILABLE_SEATS": "best-available-seats",
1823
"BEST_HISTORY_SEATS": "best-history-seats"
1924
}
25+
26+
SUBSCRIBE_REQUEST_PROPS = {
27+
'NAME': 'name',
28+
'TARGET_PRICE': 'target_price',
29+
'TOLERANCE': 'tolerance',
30+
'TICKET_NUM': 'ticket_num',
31+
'TM_EVENT_ID': 'tm_event_id'
32+
}
33+
34+
def filter_obj_from_attrs(obj, atts: dict[str, str]):
    """Return a new dict holding only the whitelisted keys present in ``obj``.

    Args:
        obj: Container supporting ``in`` and ``[]`` lookups (e.g. a dict).
        atts: Mapping whose *values* are the key names to keep; its own
            keys are ignored.

    Returns:
        A dict with ``obj[key]`` for every value ``key`` of ``atts`` that
        is present in ``obj``. Keys missing from ``obj`` are skipped.
    """
    return {key: obj[key] for key in atts.values() if key in obj}
40+
41+
42+
# metric thresholds
43+
MINIMUM_HISTORY_DATA = 3
44+
PERCENT_OF_CHANGE = 0.5
45+
PERCENTILE_HISTORY_PRICES = 0.25
46+
ALERT_SEATS_MAX_COUNT = 3
47+
2048
def get_top_picks_header():
    """Return a copy of ``BASIC_REQ_HEADER`` with a fresh ``tmps-correlation-id``.

    A new UUID4 string is generated on every call.
    """
    header = dict(BASIC_REQ_HEADER)
    header["tmps-correlation-id"] = str(uuid4())
    return header
2452

25-
def get_top_picks_query_params(qty, priceInterval): return {
53+
def get_top_picks_query_params(qty: int, target_price: int, tolerance: int): return {
2654
'show': 'places maxQuantity sections',
2755
'mode': 'primary:ppsectionrow resale:ga_areas platinum:all',
2856
'qty': qty,
29-
'q': f"and(not(\'accessible\'),any(listprices,$and(gte(@,{priceInterval[0]}),lte(@,{priceInterval[1]}))))",
57+
'q': f"and(not(\'accessible\'),any(listprices,$and(gte(@,{target_price - tolerance}),lte(@,{target_price + tolerance}))))",
3058
'includeStandard': 'true',
3159
'includeResale': 'true',
3260
'includePlatinumInventoryType': 'false',

0 commit comments

Comments
 (0)