From cd4083601cf3bdaa030d80c3637098cbe94f42bd Mon Sep 17 00:00:00 2001 From: Jack Li Date: Mon, 17 Oct 2022 13:17:57 -0500 Subject: [PATCH 1/3] Add async task --- apps/storage/base.py | 10 ++- apps/storage/query.py | 16 +++++ apps/storage/storage.py | 16 ++++- apps/ticketscraping/constants.py | 5 ++ apps/ticketscraping/tasks/asynchronous.py | 83 +++++++++++++++++++++++ apps/ticketscraping/tasks/periodic.py | 5 +- 6 files changed, 129 insertions(+), 6 deletions(-) create mode 100644 apps/storage/query.py create mode 100644 apps/ticketscraping/tasks/asynchronous.py diff --git a/apps/storage/base.py b/apps/storage/base.py index 24bab63..abd6eb4 100644 --- a/apps/storage/base.py +++ b/apps/storage/base.py @@ -30,12 +30,18 @@ def find_one_and_update__(coll: collection.Collection, filter: dict, update=dict def find_one_and_delete__(coll: collection.Collection, filter: dict): return coll.find_one_and_delete(filter) -def find_one__(coll: collection.Collection, filter: dict, projection): - return coll.find_one(filter=filter, projection=projection) +def find_one__(coll: collection.Collection, filter: dict, projection, **kwargs): + return coll.find_one(filter=filter, projection=projection, **kwargs) def find_many__(coll: collection.Collection, filter: dict, projection, **kwargs): return coll.find(filter=filter, projection=projection, **kwargs) +def count_docs__(coll: collection.Collection, filter: dict): + return coll.count_documents(filter=filter) + +def estimated_count_docs__(coll: collection.Collection): + return coll.estimated_document_count() + def watch__(coll: collection.Collection, **kwargs): return coll.watch(**kwargs) diff --git a/apps/storage/query.py b/apps/storage/query.py new file mode 100644 index 0000000..70b5be4 --- /dev/null +++ b/apps/storage/query.py @@ -0,0 +1,16 @@ +from .storage import * +import pymongo + +# find the max value in a collection +def find_max(collection_name, filter: dict, sort_key: str, db_name="tickets"): + sort_seq = [(sort_key, 
pymongo.DESCENDING)] + return find_one(collection_name, filter, db_name=db_name, sort=sort_seq) + +# find the min value in a collection +def find_min(collection_name, filter: dict, sort_key: str, db_name="tickets"): + sort_seq = [(sort_key, pymongo.ASCENDING)] + return find_one(collection_name, filter, db_name=db_name, sort=sort_seq) + +def find_many_ascending_order(collection_name, filter: dict, sort_key: str, db_name="tickets"): + sort_seq = [(sort_key, pymongo.ASCENDING)] + return find_many(collection_name, filter, db_name=db_name, sort=sort_seq) \ No newline at end of file diff --git a/apps/storage/storage.py b/apps/storage/storage.py index ef1d48d..f9c3256 100644 --- a/apps/storage/storage.py +++ b/apps/storage/storage.py @@ -51,10 +51,10 @@ def find_one_and_delete(collection_name, filter: dict, db_name="tickets"): return find_one_and_delete__(coll, filter) # find one -def find_one(collection_name, filter: dict, projection=None, db_name="tickets"): +def find_one(collection_name, filter: dict, projection=None, db_name="tickets", **kwargs): db = get_db_handle(db_name) coll = db[collection_name] - return find_one__(coll, filter, projection) + return find_one__(coll, filter, projection, **kwargs) # find many def find_many(collection_name, filter: dict, projection=None, db_name="tickets", **kwargs): @@ -62,6 +62,18 @@ def find_many(collection_name, filter: dict, projection=None, db_name="tickets", coll = db[collection_name] return list(find_many__(coll, filter, projection, **kwargs)) +# count with filter +def count_docs(collection_name, filter: dict, db_name="tickets"): + db = get_db_handle(db_name) + coll = db[collection_name] + return count_docs__(coll, filter) + +# count all docs in a collection +def estimated_count_docs(collection_name, db_name="tickets"): + db = get_db_handle(db_name) + coll = db[collection_name] + return estimated_count_docs__(coll) + # watch changes def watch(collection_name, db_name="tickets", **kwargs): db = get_db_handle(db_name) diff 
--git a/apps/ticketscraping/constants.py b/apps/ticketscraping/constants.py index 87d60d7..0c4f2f4 100644 --- a/apps/ticketscraping/constants.py +++ b/apps/ticketscraping/constants.py @@ -17,6 +17,11 @@ def get_top_picks_url( "BEST_AVAILABLE_SEATS": "best-available-seats", "BEST_HISTORY_SEATS": "best-history-seats" } + +# metric thresholds +PERCENT_OF_CHANGE = 0.5 +PERCENTILE_HISTORY_PRICES = 0.5 + def get_top_picks_header(): return { **BASIC_REQ_HEADER, "tmps-correlation-id": str(uuid4()) diff --git a/apps/ticketscraping/tasks/asynchronous.py b/apps/ticketscraping/tasks/asynchronous.py new file mode 100644 index 0000000..6fa3600 --- /dev/null +++ b/apps/ticketscraping/tasks/asynchronous.py @@ -0,0 +1,83 @@ +from ...storage.storage import count_docs +from ...storage.query import find_max, find_min, find_many_ascending_order +from ...ticketscraping import constants +from ..models.pick import Pick + +# metric 1 + + +def percent_of_change_metric(pick: Pick, scraping_id: str) -> float: + # Find the % of change + max_seat = find_max(constants.DATABASE['BEST_HISTORY_SEATS'], { + "scraping_id": scraping_id}, 'price') + min_seat = find_min(constants.DATABASE['BEST_HISTORY_SEATS'], { + "scraping_id": scraping_id}, 'price') + max_price = max_seat.get('price', 0) + min_price = min_seat.get('price', 0) + + # min and max are identical - abort + if max_price == min_price: + raise Exception('min and max prices are identical') + + percent_change = (pick.price - min_price) / (max_price - min_price) + + # price of change exceeds the metric value - abort + if percent_change > constants.PERCENT_OF_CHANGE: + raise Exception( + f'price of change ({percent_change}) exceeds the metric value') + + return percent_change + +# metric 2 + + +def percentile_metric(pick: Pick, scraping_id: str) -> float: + rank = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], + {"scraping_id": scraping_id, "price": {"$lte": pick.price}}) + total_count = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], 
+ { + "scraping_id": scraping_id}) + + # no history seats data - abort + if total_count == 0: + raise Exception('no history seats data') + + percentile = rank / total_count + + # percentile of history prices exceeds the metric value - abort + if percentile > constants.PERCENTILE_HISTORY_PRICES: + raise Exception( + 'percentile of history prices ({percentile}) exceeds the metric value') + + return percentile + + +def get_exact_same_seats(pick: Pick, scraping_id: str): + return find_many_ascending_order(constants.DATABASE['BEST_HISTORY_SEATS'], + {"scraping_id": scraping_id, "section": pick.section, + "row": pick.row, "seat_columns": pick.seat_columns}, + 'last_modified') + + +def run_async_task(pick: Pick, scraping_id: str): + try: + # Find the % of change + percent_change = percent_of_change_metric(pick, scraping_id) + # Find the percentile of the seat based on some criteria(e.g. rank or price). + percentile = percentile_metric(pick, scraping_id) + # If found the exact same seat based on(sec, row?, seat?), get the history price(s) of the seat. 
+ same_seats = get_exact_same_seats(pick, scraping_id) + + print(f"percent change: {percent_change*100}") + print(f"percentile: {percentile*100}") + print(f"same seats in chronological order") + print(f"new seat price: {pick.price}") + print(f"history seat prices:") + print(list(map(lambda seat: seat.get('price', -1), same_seats))) + + # TODO + # Alert the user based on alert conditions + + except Exception as ex: + print(ex) + pass diff --git a/apps/ticketscraping/tasks/periodic.py b/apps/ticketscraping/tasks/periodic.py index d0ce8ac..0b8ecec 100644 --- a/apps/ticketscraping/tasks/periodic.py +++ b/apps/ticketscraping/tasks/periodic.py @@ -1,6 +1,7 @@ from ...storage.storage import find_many, insert_many, delete_many from ...ticketscraping import constants from ..models.pick import Pick +from .asynchronous import run_async_task def generate_picks_set_from_picks(picks): def __helper(pick: dict): @@ -65,7 +66,7 @@ def run_periodic_task(picks: dict, scraping_id: str): # Save C to best_history_seats. insert_history_seats(overwritten_seats) - # TODO # Use D to invoke a handler to analyze them against the best_history_seats asynchronously. 
- + for seat in new_seats: + run_async_task(seat, scraping_id) pass \ No newline at end of file From 97f34dc6accf391effda50ebc013b8434cb45b59 Mon Sep 17 00:00:00 2001 From: Jack Li Date: Mon, 17 Oct 2022 15:11:51 -0500 Subject: [PATCH 2/3] Add alert strategies --- apps/ticketscraping/tasks/asynchronous.py | 20 ++++++++------ .../tasks/strategies/similarPrice.py | 21 +++++++++++++++ .../tasks/strategies/similarRank.py | 27 +++++++++++++++++++ 3 files changed, 60 insertions(+), 8 deletions(-) create mode 100644 apps/ticketscraping/tasks/strategies/similarPrice.py create mode 100644 apps/ticketscraping/tasks/strategies/similarRank.py diff --git a/apps/ticketscraping/tasks/asynchronous.py b/apps/ticketscraping/tasks/asynchronous.py index 6fa3600..61c5556 100644 --- a/apps/ticketscraping/tasks/asynchronous.py +++ b/apps/ticketscraping/tasks/asynchronous.py @@ -2,6 +2,7 @@ from ...storage.query import find_max, find_min, find_many_ascending_order from ...ticketscraping import constants from ..models.pick import Pick +from .strategies import similarRank # metric 1 @@ -68,15 +69,18 @@ def run_async_task(pick: Pick, scraping_id: str): # If found the exact same seat based on(sec, row?, seat?), get the history price(s) of the seat. 
same_seats = get_exact_same_seats(pick, scraping_id) - print(f"percent change: {percent_change*100}") - print(f"percentile: {percentile*100}") - print(f"same seats in chronological order") - print(f"new seat price: {pick.price}") - print(f"history seat prices:") - print(list(map(lambda seat: seat.get('price', -1), same_seats))) - - # TODO # Alert the user based on alert conditions + count, seats = similarRank.price_decrease_similar_rank( + pick, scraping_id) + if(count > 0): + print(f"percent change: {percent_change*100}") + print(f"percentile: {percentile*100}") + print(f"same seats in chronological order") + print(f"new seat - price: {pick.price} rank: {pick.quality} section: {pick.section} row: {pick.row}") + print(f"history exact same seat prices:") + print(list(map(lambda seat: seat.get('price', -1), same_seats))) + print(f"strategy compared to history seat ({count}) prices:") + print(list(map(lambda seat: f"sec: {seat.get('section')}; row: {seat.get('row')}; rank: {seat.get('quality')}; price: {seat.get('price')}", seats))) except Exception as ex: print(ex) diff --git a/apps/ticketscraping/tasks/strategies/similarPrice.py b/apps/ticketscraping/tasks/strategies/similarPrice.py new file mode 100644 index 0000000..cf46613 --- /dev/null +++ b/apps/ticketscraping/tasks/strategies/similarPrice.py @@ -0,0 +1,21 @@ +from ....storage.storage import count_docs +from ....ticketscraping import constants +from ...models.pick import Pick + + +def rank_increase_similar_price(pick: Pick, scraping_id: str): + return count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], + { + "scraping_id": scraping_id, + "price": {"$gte": pick.price}, + "quality": {"$lt": pick.quality}, + }) > 0 + + +def rank_decrease_similar_price(pick: Pick, scraping_id: str): + return count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], + { + "scraping_id": scraping_id, + "price": {"$lte": pick.price}, + "quality": {"$gt": pick.quality}, + }) > 0 diff --git 
a/apps/ticketscraping/tasks/strategies/similarRank.py b/apps/ticketscraping/tasks/strategies/similarRank.py new file mode 100644 index 0000000..3afbf0c --- /dev/null +++ b/apps/ticketscraping/tasks/strategies/similarRank.py @@ -0,0 +1,27 @@ +from ....storage.storage import count_docs, find_many +from ....ticketscraping import constants +from ...models.pick import Pick + + +def price_decrease_similar_rank(pick: Pick, scraping_id: str): + filter_obj = { + "scraping_id": scraping_id, + "price": {"$gt": pick.price}, + "quality": {"$lte": pick.quality}, + } + count = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj) + seats = find_many(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj, sort=[ + ("quality", -1)]) if count > 0 else None + return (count, seats) + + +def price_increase_similar_rank(pick: Pick, scraping_id: str): + filter_obj = { + "scraping_id": scraping_id, + "price": {"$lt": pick.price}, + "quality": {"$gte": pick.quality}, + } + count = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj) + seats = find_many(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj, sort=[ + ("quality", 1)]) if count > 0 else None + return (count, seats) From f998e0cdf45ee709aee72d1cdd6340f862f47249 Mon Sep 17 00:00:00 2001 From: Jack Li Date: Sun, 23 Oct 2022 14:32:58 -0500 Subject: [PATCH 3/3] add async task scheduler and mail scheduler --- apps/startup/apps.py | 1 + apps/storage/query.py | 2 +- apps/ticketscraping/constants.py | 25 ++- apps/ticketscraping/prepare_reese84token.py | 7 +- .../schedulers/async_task_scheduler.py | 5 + .../schedulers/async_tasks_child_process.py | 12 ++ .../schedulers/mail_scheduler.py | 5 + .../mail_scheduler_child_process.py | 12 ++ .../schedulers/scheduler_process.py | 25 +++ apps/ticketscraping/scraping.py | 32 ++-- apps/ticketscraping/tasks/asynchronous.py | 123 ++++-------- apps/ticketscraping/tasks/periodic.py | 10 +- .../tasks/strategies/quarters_seats.py | 178 ++++++++++++++++++ 
.../tasks/strategies/similarPrice.py | 21 --- .../tasks/strategies/similarRank.py | 27 --- apps/ticketscraping/tasks/util/decorators.py | 36 ++++ apps/ticketscraping/tasks/util/math.py | 9 + apps/ticketscraping/tasks/util/scheduler.py | 14 ++ apps/trackerapi/error_handler.py | 18 ++ apps/trackerapi/views.py | 30 +-- start.py | 15 ++ startserver.sh | 5 + tmtracker/settings.py | 2 +- utils.py | 2 +- 24 files changed, 441 insertions(+), 175 deletions(-) create mode 100644 apps/ticketscraping/schedulers/async_task_scheduler.py create mode 100644 apps/ticketscraping/schedulers/async_tasks_child_process.py create mode 100644 apps/ticketscraping/schedulers/mail_scheduler.py create mode 100644 apps/ticketscraping/schedulers/mail_scheduler_child_process.py create mode 100644 apps/ticketscraping/schedulers/scheduler_process.py create mode 100644 apps/ticketscraping/tasks/strategies/quarters_seats.py delete mode 100644 apps/ticketscraping/tasks/strategies/similarPrice.py delete mode 100644 apps/ticketscraping/tasks/strategies/similarRank.py create mode 100644 apps/ticketscraping/tasks/util/decorators.py create mode 100644 apps/ticketscraping/tasks/util/math.py create mode 100644 apps/ticketscraping/tasks/util/scheduler.py create mode 100644 apps/trackerapi/error_handler.py create mode 100644 start.py create mode 100755 startserver.sh diff --git a/apps/startup/apps.py b/apps/startup/apps.py index da34cfa..9865b0d 100644 --- a/apps/startup/apps.py +++ b/apps/startup/apps.py @@ -11,4 +11,5 @@ class MyAppConfig(AppConfig): def ready(self): print( f"server started at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}") + # start scraping Thread(target=start).start() diff --git a/apps/storage/query.py b/apps/storage/query.py index 70b5be4..96f18a7 100644 --- a/apps/storage/query.py +++ b/apps/storage/query.py @@ -13,4 +13,4 @@ def find_min(collection_name, filter: dict, sort_key: str, db_name="tickets"): def find_many_ascending_order(collection_name, filter: dict, sort_key: str, 
db_name="tickets"): sort_seq = [(sort_key, pymongo.ASCENDING)] - return find_many(collection_name, filter, db_name=db_name, sort=sort_seq) \ No newline at end of file + return find_many(collection_name, filter, db_name=db_name, sort=sort_seq) diff --git a/apps/ticketscraping/constants.py b/apps/ticketscraping/constants.py index 0c4f2f4..4c9fb59 100644 --- a/apps/ticketscraping/constants.py +++ b/apps/ticketscraping/constants.py @@ -1,3 +1,4 @@ +from functools import reduce from uuid import uuid4 ANTIBOT_JS_CODE_URL = "https://epsf.ticketmaster.com/eps-d" @@ -18,20 +19,38 @@ def get_top_picks_url( "BEST_HISTORY_SEATS": "best-history-seats" } +SUBSCRIBE_REQUEST_PROPS = { + 'NAME': 'name', + 'TARGET_PRICE': 'target_price', + 'TOLERANCE': 'tolerance', + 'TICKET_NUM': 'ticket_num', + 'TM_EVENT_ID': 'tm_event_id' +} + +def filter_obj_from_attrs(obj, atts: dict[str,str]): + res = {} + for key in atts.values(): + if key in obj: + res[key] = obj[key] + return res + + # metric thresholds +MINIMUM_HISTORY_DATA = 3 PERCENT_OF_CHANGE = 0.5 -PERCENTILE_HISTORY_PRICES = 0.5 +PERCENTILE_HISTORY_PRICES = 0.25 +ALERT_SEATS_MAX_COUNT = 3 def get_top_picks_header(): return { **BASIC_REQ_HEADER, "tmps-correlation-id": str(uuid4()) } -def get_top_picks_query_params(qty, priceInterval): return { +def get_top_picks_query_params(qty: int, target_price: int, tolerance: int): return { 'show': 'places maxQuantity sections', 'mode': 'primary:ppsectionrow resale:ga_areas platinum:all', 'qty': qty, - 'q': f"and(not(\'accessible\'),any(listprices,$and(gte(@,{priceInterval[0]}),lte(@,{priceInterval[1]}))))", + 'q': f"and(not(\'accessible\'),any(listprices,$and(gte(@,{target_price - tolerance}),lte(@,{target_price + tolerance}))))", 'includeStandard': 'true', 'includeResale': 'true', 'includePlatinumInventoryType': 'false', diff --git a/apps/ticketscraping/prepare_reese84token.py b/apps/ticketscraping/prepare_reese84token.py index 2fbde23..0b23f2a 100644 --- 
a/apps/ticketscraping/prepare_reese84token.py +++ b/apps/ticketscraping/prepare_reese84token.py @@ -6,7 +6,7 @@ from . import constants -def getReese84Token(): +def getReese84Token()->tuple[str, int]: def readFileContentToString(filename): f = open(filename, 'r') content = f.read() @@ -19,7 +19,7 @@ def readFileContentToString(filename): # trim the code to the function that is only used match_obj = re.search(constants.FN_MATCHING_REGEX, antibot_js_code_full) if not match_obj: - return None + raise Exception('reese84 manufacture fails') start, end = match_obj.span() antibot_js_code_trim = antibot_js_code_full[start:end] @@ -51,4 +51,5 @@ def readFileContentToString(filename): # invoke the get token api to get the reese84 token token_json_res = requests.post( constants.TOKEN_INTERROGATION_URL, headers=constants.BASIC_REQ_HEADER, json=token) - return token_json_res.json() + json_obj = token_json_res.json() + return json_obj['token'], json_obj['renewInSec'] diff --git a/apps/ticketscraping/schedulers/async_task_scheduler.py b/apps/ticketscraping/schedulers/async_task_scheduler.py new file mode 100644 index 0000000..6b48101 --- /dev/null +++ b/apps/ticketscraping/schedulers/async_task_scheduler.py @@ -0,0 +1,5 @@ +from .scheduler_process import SchedulerProcess + +print(__name__) +async_tasks_scheduler = SchedulerProcess() +async_tasks_scheduler.start() diff --git a/apps/ticketscraping/schedulers/async_tasks_child_process.py b/apps/ticketscraping/schedulers/async_tasks_child_process.py new file mode 100644 index 0000000..3b09033 --- /dev/null +++ b/apps/ticketscraping/schedulers/async_tasks_child_process.py @@ -0,0 +1,12 @@ +import threading +from multiprocessing import Queue +from ..tasks.util.scheduler import Scheduler +from ..tasks.asynchronous import run_async_tasks + +def child_process(queue: Queue): + scheduler = Scheduler('asynchronous tasks scheduler', run_async_tasks) + sched_thread = threading.Thread(target=scheduler.run) + sched_thread.start() + while True: + 
received_params = queue.get() + scheduler.enter(*received_params) diff --git a/apps/ticketscraping/schedulers/mail_scheduler.py b/apps/ticketscraping/schedulers/mail_scheduler.py new file mode 100644 index 0000000..d5f7af7 --- /dev/null +++ b/apps/ticketscraping/schedulers/mail_scheduler.py @@ -0,0 +1,5 @@ +# from .scheduler_process import SchedulerProcess +# from .mail_scheduler_child_process import child_process + +# mail_scheduler = SchedulerProcess(child_process) +# mail_scheduler.start() diff --git a/apps/ticketscraping/schedulers/mail_scheduler_child_process.py b/apps/ticketscraping/schedulers/mail_scheduler_child_process.py new file mode 100644 index 0000000..bb74362 --- /dev/null +++ b/apps/ticketscraping/schedulers/mail_scheduler_child_process.py @@ -0,0 +1,12 @@ +import threading +from multiprocessing import Queue +from ..tasks.util.scheduler import Scheduler + +def child_process(queue: Queue): + #TODO + scheduler = Scheduler('mail scheduler', lambda x: x) + sched_thread = threading.Thread(target=scheduler.run) + sched_thread.start() + while True: + received_params = queue.get() + scheduler.enter(*received_params) diff --git a/apps/ticketscraping/schedulers/scheduler_process.py b/apps/ticketscraping/schedulers/scheduler_process.py new file mode 100644 index 0000000..fae2b74 --- /dev/null +++ b/apps/ticketscraping/schedulers/scheduler_process.py @@ -0,0 +1,25 @@ +import threading +from multiprocessing import Process, Queue +from .async_tasks_child_process import child_process + +class SchedulerProcess: + def __init__(self): + self.queue = Queue() + self.started = False + self.starter_thread = None + + def generate_child_process(self): + cp = Process(target=child_process, args=(self.queue,)) + cp.start() + self.started = True + cp.join() + + def start(self): + self.starter_thread = threading.Thread( + target=self.generate_child_process) + self.starter_thread.start() + + def produce(self, *args): + if not self.started: + return + self.queue.put(args) diff 
--git a/apps/ticketscraping/scraping.py b/apps/ticketscraping/scraping.py index 2a7efed..2e3d3d4 100644 --- a/apps/ticketscraping/scraping.py +++ b/apps/ticketscraping/scraping.py @@ -14,7 +14,8 @@ class Reese84TokenUpdating(): def __init__(self): self.is_running = False - self.reese84_token = {} + self.reese84_token = '' + self.reese84_renewInSec = 0 self.token_semaphore = Semaphore(0) self.scheduler = sched.scheduler(time.time, time.sleep) @@ -22,9 +23,9 @@ def initialize_reese84_token(self): """ This method should not be called directly. """ - self.reese84_token = getReese84Token() + self.reese84_token, self.reese84_renewInSec = getReese84Token() self.token_semaphore.release() # produce a new token - self.scheduler.enter(self.reese84_token['renewInSec'] - + self.scheduler.enter(self.reese84_renewInSec - constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token) def renew_reese84_token(self): @@ -35,7 +36,7 @@ def renew_reese84_token(self): self.token_semaphore.acquire() # invalidate a token self.reese84_token = getReese84Token() self.token_semaphore.release() - self.scheduler.enter(self.reese84_token['renewInSec'] - + self.scheduler.enter(self.reese84_renewInSec - constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token) def start(self): @@ -48,14 +49,15 @@ def start(self): class TicketScraping(threading.Thread): - def __init__(self, token_generator: Reese84TokenUpdating, event_id, subscribe_id, num_seats=2, price_range=(0, 200)): + def __init__(self, token_generator: Reese84TokenUpdating, event_id: str, subscribe_id: str, num_seats: int, target_price: int, tolerance: int): threading.Thread.__init__(self) self.is_running = False self.is_stopping = False self.event_id = event_id self.subscribe_id = subscribe_id self.num_seats = num_seats - self.price_range = price_range + self.target_price = target_price + self.tolerance = tolerance self.token_gen = token_generator self.scheduler = 
sched.scheduler(time.time, time.sleep) self.initialDelay = random.randint( @@ -83,17 +85,17 @@ def ticket_scraping(self): # scrape the top-picks from ticketmaster top_picks_url = constants.get_top_picks_url(self.event_id) top_picks_q_params = constants.get_top_picks_query_params( - self.num_seats, self.price_range) + self.num_seats, self.target_price, self.tolerance) top_picks_header = constants.get_top_picks_header() res = requests.get(top_picks_url, headers=top_picks_header, params=top_picks_q_params, - cookies=dict(reese84=self.token_gen.reese84_token['token'])) - # print(res.json()) + cookies=dict(reese84=self.token_gen.reese84_token)) # type: ignore # prune and format the received picks picks_obj = format_seats(res.json(), self.subscribe_id) # periodic task: update collections best_available_seats and best_history_seats - run_periodic_task(picks_obj, self.subscribe_id) + # and automatically spawn async tasks + run_periodic_task(picks_obj, self.subscribe_id, self.target_price) print("Got the ticket info from TM. 
/", res.status_code) self.scheduler.enter(constants.TICKET_SCRAPING_INTERVAL, @@ -102,8 +104,8 @@ def ticket_scraping(self): def get_id(self): # returns id of the respective thread if hasattr(self, '_thread_id'): - return self._thread_id - for id, thread in threading._active.items(): + return self._thread_id # type: ignore + for id, thread in threading._active.items(): # type: ignore if thread is self: return id @@ -135,7 +137,7 @@ def start(): '$or': [{'markPaused': {'$exists': False}}, {'markPaused': False}]}) for evt in events: ticket_scraping = TicketScraping( - reese_token_gen, evt["tm_event_id"], evt["_id"], evt["ticket_num"], evt["price_range"]) + reese_token_gen, evt["tm_event_id"], evt["_id"], evt["ticket_num"], evt["target_price"], evt["tolerance"]) print(ticket_scraping.initialDelay, "s") scraping_list[ticket_scraping.subscribe_id] = ticket_scraping for scraping_thread in scraping_list.values(): @@ -156,7 +158,7 @@ def start(): # spawn a thread to do scraping operations full_doc = change['fullDocument'] ticket_scraping = TicketScraping( - reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["price_range"]) + reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"]) print(ticket_scraping.initialDelay, "s") scraping_list[ticket_scraping.subscribe_id] = ticket_scraping ticket_scraping.start() @@ -177,7 +179,7 @@ def start(): # resume scraping if currently paused if doc_id not in scraping_list: ticket_scraping = TicketScraping( - reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["price_range"]) + reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"]) print(ticket_scraping.initialDelay, "s") scraping_list[ticket_scraping.subscribe_id] = ticket_scraping ticket_scraping.start() diff --git a/apps/ticketscraping/tasks/asynchronous.py 
b/apps/ticketscraping/tasks/asynchronous.py index 61c5556..96acd28 100644 --- a/apps/ticketscraping/tasks/asynchronous.py +++ b/apps/ticketscraping/tasks/asynchronous.py @@ -1,87 +1,42 @@ from ...storage.storage import count_docs -from ...storage.query import find_max, find_min, find_many_ascending_order from ...ticketscraping import constants from ..models.pick import Pick -from .strategies import similarRank - -# metric 1 - - -def percent_of_change_metric(pick: Pick, scraping_id: str) -> float: - # Find the % of change - max_seat = find_max(constants.DATABASE['BEST_HISTORY_SEATS'], { - "scraping_id": scraping_id}, 'price') - min_seat = find_min(constants.DATABASE['BEST_HISTORY_SEATS'], { - "scraping_id": scraping_id}, 'price') - max_price = max_seat.get('price', 0) - min_price = min_seat.get('price', 0) - - # min and max are identical - abort - if max_price == min_price: - raise Exception('min and max prices are identical') - - percent_change = (pick.price - min_price) / (max_price - min_price) - - # price of change exceeds the metric value - abort - if percent_change > constants.PERCENT_OF_CHANGE: - raise Exception( - f'price of change ({percent_change}) exceeds the metric value') - - return percent_change - -# metric 2 - - -def percentile_metric(pick: Pick, scraping_id: str) -> float: - rank = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], - {"scraping_id": scraping_id, "price": {"$lte": pick.price}}) - total_count = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], - { - "scraping_id": scraping_id}) - - # no history seats data - abort - if total_count == 0: - raise Exception('no history seats data') - - percentile = rank / total_count - - # percentile of history prices exceeds the metric value - abort - if percentile > constants.PERCENTILE_HISTORY_PRICES: - raise Exception( - 'percentile of history prices ({percentile}) exceeds the metric value') - - return percentile - - -def get_exact_same_seats(pick: Pick, scraping_id: str): - return 
find_many_ascending_order(constants.DATABASE['BEST_HISTORY_SEATS'], - {"scraping_id": scraping_id, "section": pick.section, - "row": pick.row, "seat_columns": pick.seat_columns}, - 'last_modified') - - -def run_async_task(pick: Pick, scraping_id: str): - try: - # Find the % of change - percent_change = percent_of_change_metric(pick, scraping_id) - # Find the percentile of the seat based on some criteria(e.g. rank or price). - percentile = percentile_metric(pick, scraping_id) - # If found the exact same seat based on(sec, row?, seat?), get the history price(s) of the seat. - same_seats = get_exact_same_seats(pick, scraping_id) - - # Alert the user based on alert conditions - count, seats = similarRank.price_decrease_similar_rank( - pick, scraping_id) - if(count > 0): - print(f"percent change: {percent_change*100}") - print(f"percentile: {percentile*100}") - print(f"same seats in chronological order") - print(f"new seat - price: {pick.price} rank: {pick.quality} section: {pick.section} row: {pick.row}") - print(f"history exact same seat prices:") - print(list(map(lambda seat: seat.get('price', -1), same_seats))) - print(f"strategy compared to history seat ({count}) prices:") - print(list(map(lambda seat: f"sec: {seat.get('section')}; row: {seat.get('row')}; rank: {seat.get('quality')}; price: {seat.get('price')}", seats))) - - except Exception as ex: - print(ex) - pass +import typing, threading +from .util.decorators import StorageInsertionOrder, wrap_fn_return +from .strategies.quarters_seats import QuartersSeats +# from ..schedulers.mail_scheduler import mail_scheduler + + +def run_async_tasks(picks: typing.Iterable[Pick], scraping_id: str, target_price: int): + store = StorageInsertionOrder(size=len(list(picks))) + print("inside async tasks") + def fn(arg: tuple[int, Pick]): + idx, pick = arg + thread = threading.Thread(target=wrap_fn_return(run_async_task, store.set, idx), args=(pick, scraping_id, target_price)) + thread.start() + return thread + + threads = 
list(map(fn, enumerate(picks))) + + for thread in threads: thread.join() + + # filter out new seats that do not need alerts + store.filter() + # better seats first + store.sort() + # get seats used in alerting + alert_seats = store.sublist(0, min(store.size, constants.ALERT_SEATS_MAX_COUNT)) + # get the alert information + alert_contents = list(map(lambda qs: qs.get_alert_content(), alert_seats)) # type: ignore + # send the alert to user + mail_scheduler.produce((alert_contents,)) # type: ignore + + +def run_async_task(pick: Pick, scraping_id: str, target_price: int): + # Alert the user based on alert conditions + qs = QuartersSeats(pick, scraping_id, target_price) + if not qs.shouldAlert(): + return None + + # success + return qs diff --git a/apps/ticketscraping/tasks/periodic.py b/apps/ticketscraping/tasks/periodic.py index 0b8ecec..a48a574 100644 --- a/apps/ticketscraping/tasks/periodic.py +++ b/apps/ticketscraping/tasks/periodic.py @@ -1,7 +1,7 @@ from ...storage.storage import find_many, insert_many, delete_many from ...ticketscraping import constants from ..models.pick import Pick -from .asynchronous import run_async_task +from ..schedulers.async_task_scheduler import async_tasks_scheduler def generate_picks_set_from_picks(picks): def __helper(pick: dict): @@ -39,8 +39,7 @@ def insert_history_seats(seats: set[Pick]): return insert_many(constants.DATABASE['BEST_HISTORY_SEATS'], list(map(lambda seat: vars(seat), seats))) - -def run_periodic_task(picks: dict, scraping_id: str): +def run_periodic_task(picks, scraping_id: str, target_price: int): # B the list of new best available seats new_best_avail = generate_picks_set_from_picks(picks) # A be the list of current best available seats @@ -67,6 +66,5 @@ def run_periodic_task(picks: dict, scraping_id: str): insert_history_seats(overwritten_seats) # Use D to invoke a handler to analyze them against the best_history_seats asynchronously. 
- for seat in new_seats: - run_async_task(seat, scraping_id) - pass \ No newline at end of file + async_tasks_scheduler.produce( + (new_seats, scraping_id, target_price)) diff --git a/apps/ticketscraping/tasks/strategies/quarters_seats.py b/apps/ticketscraping/tasks/strategies/quarters_seats.py new file mode 100644 index 0000000..2e35131 --- /dev/null +++ b/apps/ticketscraping/tasks/strategies/quarters_seats.py @@ -0,0 +1,178 @@ +import pymongo +from ....storage.storage import count_docs, find_many +from ....storage.query import find_max, find_min, find_many_ascending_order +from ....ticketscraping import constants +from ...models.pick import Pick +from ..util.math import percentile + + +class QuartersSeats(): + top_better_history_seats_sort = [ + ('rank', pymongo.DESCENDING), ('price', pymongo.ASCENDING)] + + def __init__(self, pick: Pick, scraping_id: str, target_price: int): + self._pick = pick + self._scraping_id = scraping_id + self.target_price = target_price + self.percentile = 1.0 + + def __lt__(self, other): + # smaller the percentile, better the pick + return self.percentile > other.percentile + + def __eq__(self, other): + return self.percentile == other.percentile + + @property + def pick(self): + return self._pick + + @property + def scraping_id(self): + return self._scraping_id + + def get_alert_content(self): + # alert user + # Find the exact same seat based on(sec, row?, seat?) 
+ same_seats = self.get_exact_same_seats() + # rank = self._num_before + 1 + # top best history = get_top_better_history_seats() + # notify user with info + return '' + pass + + def get_top_better_history_seats(self): + return self.find_better_history_seats(sort=self.top_better_history_seats_sort) + + def shouldAlert(self): + try: + # Find price match + self.target_price_metric() + # Find enough history seats data + percentile = self.percentile_metric() + # Find the % of change + percent_change = self.percent_of_change_metric() + # Find percentile of seats in quarters + self.quarter_percentile_metric() + except Exception as ex: + print(ex) + return False + + # success + print(f"percent change out of max-min: {percent_change*100}") + print(f"all history seats percentile: {percentile*100}") + print( + f"new seat - price: {self.pick.price} rank: {self.pick.quality} section: {self.pick.section} row: {self.pick.row}") + print(f"quarters history seats percentile: {self.percentile*100}") + return True + + + def quarter_percentile_metric(self): + if self.get_percentile() > constants.PERCENTILE_HISTORY_PRICES: + raise Exception('the seat is not recommended') + + def get_percentile(self): + self._num_before = self.count_better_history_seats() + self._num_total = self.count_quarters_history_seats() + self.percentile = percentile(self._num_before, self._num_total) + return self.percentile + + def count_quarters_history_seats(self): + filter_obj = self.__get_quarters_history_seats_filter__() + return count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj) + + def __get_quarters_history_seats_filter__(self): + return { + "scraping_id": self._scraping_id, + "$or": [ + { + "price": {"$lte": self._pick.price}, + "quality": {"$gte": self._pick.quality}, + }, + { + "price": {"$gt": self._pick.price}, + "quality": {"$lt": self._pick.quality}, + } + ] + } + + def __get_better_history_seats_filter__(self): + return { + "scraping_id": self._scraping_id, + "$or": [ + { + "price": 
{"$lt": self._pick.price}, + "quality": {"$gte": self._pick.quality}, + }, + { + "price": {"$lte": self._pick.price}, + "quality": {"$gt": self._pick.quality}, + } + ] + } + + def find_better_history_seats(self, **kwargs): + limit = kwargs.get('limit') + sort_seq = kwargs.get('sort') + filter_obj = self.__get_better_history_seats_filter__() + return find_many(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj, sort=sort_seq, limit=limit) + + def count_better_history_seats(self): + filter_obj = self.__get_better_history_seats_filter__() + return count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj) + + def target_price_metric(self): + # exceed target price - abort + if self.pick.price > self.target_price: + raise Exception('price of the seat is not low enough') + + + def percent_of_change_metric(self) -> float: + # Find the % of change + max_seat = find_max(constants.DATABASE['BEST_HISTORY_SEATS'], { + "scraping_id": self.scraping_id}, 'price') + min_seat = find_min(constants.DATABASE['BEST_HISTORY_SEATS'], { + "scraping_id": self.scraping_id}, 'price') + max_price = 0 if type(max_seat) is not dict else max_seat.get('price', 0) + min_price = 0 if type(min_seat) is not dict else min_seat.get('price', 0) + + # min and max are identical - abort + if max_price == min_price: + raise Exception('min and max prices are identical') + + percent_change = (self.pick.price - min_price) / (max_price - min_price) + + # # price of change exceeds the metric value - abort + # if percent_change > constants.PERCENT_OF_CHANGE: + # raise Exception( + # f'price of change ({percent_change}) exceeds the metric value') + + return percent_change + + + def percentile_metric(self) -> float: + rank = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], + {"scraping_id": self.scraping_id, "price": {"$lte": self.pick.price}}) + total_count = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], + { + "scraping_id": self.scraping_id}) + + # no history seats data - abort + if total_count 
< constants.MINIMUM_HISTORY_DATA: + raise Exception('no enough history seats data (count < 3)') + + percentile = rank / total_count + + # # percentile of history prices exceeds the metric value - abort + # if percentile > constants.PERCENTILE_HISTORY_PRICES: + # raise Exception( + # 'percentile of history prices ({percentile}) exceeds the metric value') + + return percentile + + + def get_exact_same_seats(self): + return find_many_ascending_order(constants.DATABASE['BEST_HISTORY_SEATS'], + {"scraping_id": self.scraping_id, "section": self.pick.section, + "row": self.pick.row, "seat_columns": self.pick.seat_columns}, + 'last_modified') diff --git a/apps/ticketscraping/tasks/strategies/similarPrice.py b/apps/ticketscraping/tasks/strategies/similarPrice.py deleted file mode 100644 index cf46613..0000000 --- a/apps/ticketscraping/tasks/strategies/similarPrice.py +++ /dev/null @@ -1,21 +0,0 @@ -from ....storage.storage import count_docs -from ....ticketscraping import constants -from ...models.pick import Pick - - -def rank_increase_similar_price(pick: Pick, scraping_id: str): - return count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], - { - "scraping_id": scraping_id, - "price": {"$gte": pick.price}, - "quality": {"$lt": pick.quality}, - }) > 0 - - -def rank_decrease_similar_price(pick: Pick, scraping_id: str): - return count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], - { - "scraping_id": scraping_id, - "price": {"$lte": pick.price}, - "quality": {"$gt": pick.quality}, - }) > 0 diff --git a/apps/ticketscraping/tasks/strategies/similarRank.py b/apps/ticketscraping/tasks/strategies/similarRank.py deleted file mode 100644 index 3afbf0c..0000000 --- a/apps/ticketscraping/tasks/strategies/similarRank.py +++ /dev/null @@ -1,27 +0,0 @@ -from ....storage.storage import count_docs, find_many -from ....ticketscraping import constants -from ...models.pick import Pick - - -def price_decrease_similar_rank(pick: Pick, scraping_id: str): - filter_obj = { - "scraping_id": 
scraping_id, - "price": {"$gt": pick.price}, - "quality": {"$lte": pick.quality}, - } - count = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj) - seats = find_many(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj, sort=[ - ("quality", -1)]) if count > 0 else None - return (count, seats) - - -def price_increase_similar_rank(pick: Pick, scraping_id: str): - filter_obj = { - "scraping_id": scraping_id, - "price": {"$lt": pick.price}, - "quality": {"$gte": pick.quality}, - } - count = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj) - seats = find_many(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj, sort=[ - ("quality", 1)]) if count > 0 else None - return (count, seats) diff --git a/apps/ticketscraping/tasks/util/decorators.py b/apps/ticketscraping/tasks/util/decorators.py new file mode 100644 index 0000000..aa2929c --- /dev/null +++ b/apps/ticketscraping/tasks/util/decorators.py @@ -0,0 +1,36 @@ +class StorageInsertionOrder: + def __init__(self, size=0): + self.store = [0] * size + + def __iter__(self): + return iter(self.store) + + @property + def size(self): + return len(self.store) + + def add(self, item): + self.store.append(item) + + def get(self, index: int): + return self.store[index] + + def set(self, index: int, item): + self.store[index] = item + + def sort(self): + self.store.sort() + + def filter(self): + self.store = list(filter(lambda item: item is not None, self.store)) + + def sublist(self, from_idx: int, to_idx: int): + return self.store[from_idx:to_idx] + + +def wrap_fn_return(fn, storing_fn, index): + def inner_fn(*args, **kwargs): + res = fn(*args, **kwargs) + storing_fn(index, res) + return res + return inner_fn \ No newline at end of file diff --git a/apps/ticketscraping/tasks/util/math.py b/apps/ticketscraping/tasks/util/math.py new file mode 100644 index 0000000..a19642c --- /dev/null +++ b/apps/ticketscraping/tasks/util/math.py @@ -0,0 +1,9 @@ +def percentile(num_before: int, total: int) -> float: + 
denominator = total + if denominator == 0: + denominator = 1 + return num_before / denominator + +def percentileInBetween(num_before: int, num_after: int)->float: + denominator = num_before + num_after + 1 + return percentile(num_before, denominator) diff --git a/apps/ticketscraping/tasks/util/scheduler.py b/apps/ticketscraping/tasks/util/scheduler.py new file mode 100644 index 0000000..09ae528 --- /dev/null +++ b/apps/ticketscraping/tasks/util/scheduler.py @@ -0,0 +1,14 @@ +import time +from sched import scheduler + +class Scheduler: + def __init__(self, name: str, action): + self.name = name + self.scheduler = scheduler(time.time, time.sleep) + self.action = action + + def enter(self, *args, **kwargs): + return self.scheduler.enter(0, 1, self.action, args, kwargs) + + def run(self): + self.scheduler.run() diff --git a/apps/trackerapi/error_handler.py b/apps/trackerapi/error_handler.py new file mode 100644 index 0000000..b278909 --- /dev/null +++ b/apps/trackerapi/error_handler.py @@ -0,0 +1,18 @@ +from django.http import JsonResponse + +# custom exceptions +class BadRequestException(Exception): + pass + +# error handler decorator +def wrap_error_handler(fn): + def inner_fn(*args, **kwargs): + try: + return fn(*args, **kwargs) + except BadRequestException as ex: + # bad request + return JsonResponse({"error": str(ex)}, status=400) + except Exception as ex: + # internal server error + return JsonResponse({"error": "something went wrong."}, status=500) + return inner_fn \ No newline at end of file diff --git a/apps/trackerapi/views.py b/apps/trackerapi/views.py index 635d7d2..39a98c9 100644 --- a/apps/trackerapi/views.py +++ b/apps/trackerapi/views.py @@ -1,36 +1,40 @@ from django.shortcuts import render from django.http import HttpResponse, HttpRequest from ..storage.storage import insert_one, find_one_and_update +from .error_handler import BadRequestException, wrap_error_handler from ..ticketscraping import constants import json # Create your views here. 
+@wrap_error_handler def subscribe_tm_event_price_tracking(req: HttpRequest): if req.method == 'POST': body = json.loads(req.body) # validation - for key in ['name', 'price_range', 'ticket_num', 'tm_event_id']: + for key in constants.SUBSCRIBE_REQUEST_PROPS.values(): if key not in body: - return HttpResponse('Request is invalid.', status=400) - doc = { - "name": body["name"], - "price_range": body["price_range"], - "ticket_num": body["ticket_num"], - "tm_event_id": body["tm_event_id"] - } + raise BadRequestException('Request is invalid.') + # validation + target_price = body["target_price"] + tolerance = body["tolerance"] + if target_price - tolerance < 0: + raise BadRequestException('Lowest price cannot be negative.') + + doc = constants.filter_obj_from_attrs(body, constants.SUBSCRIBE_REQUEST_PROPS) + insert_one(constants.DATABASE['EVENTS'], doc) return HttpResponse('OK', status=200) + +@wrap_error_handler def unsubscribe_tm_event_price_tracking(req: HttpRequest): if req.method == 'POST': body = json.loads(req.body) - # validation id = body['subscription_id'] - if not id: - return HttpResponse('Request is invalid.', status=400) - res = find_one_and_update(constants.DATABASE['EVENTS'], {"_id": id}, {'$set': {'markPaused': True, }}) + + res = find_one_and_update(constants.DATABASE['EVENTS'], {"_id": id}, {'$set': {'markPaused': True}}) if res: return HttpResponse('OK', status=200) else: - return HttpResponse('Subscription id not found.', status=400) + raise BadRequestException('Subscription id not found.') diff --git a/start.py b/start.py new file mode 100644 index 0000000..a50e45f --- /dev/null +++ b/start.py @@ -0,0 +1,15 @@ +from apps.ticketscraping.scraping import start +from datetime import datetime +from threading import Thread +from multiprocessing import Process + +def run(): + print( + f"ticket scraping service started at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}") + # start scraping + start() + +if __name__ == '__main__': + p = Process(target=run) + 
p.start() + p.join() \ No newline at end of file diff --git a/startserver.sh b/startserver.sh new file mode 100755 index 0000000..931dacf --- /dev/null +++ b/startserver.sh @@ -0,0 +1,5 @@ +# start the ticket scraping service +python start.py & + +# start the API +python manage.py runserver --noreload \ No newline at end of file diff --git a/tmtracker/settings.py b/tmtracker/settings.py index 8c06096..cb09a96 100644 --- a/tmtracker/settings.py +++ b/tmtracker/settings.py @@ -32,7 +32,7 @@ 'django.contrib.contenttypes', # 'django.contrib.messages', 'django.contrib.staticfiles', - 'apps.startup', + # 'apps.startup', 'apps.trackerapi', ] diff --git a/utils.py b/utils.py index 4f1ced6..70dd09f 100644 --- a/utils.py +++ b/utils.py @@ -1,4 +1,4 @@ -from pymongo import MongoClient +from pymongo.mongo_client import MongoClient from secret import CONN_SRV client = MongoClient(CONN_SRV)