diff --git a/apps/startup/apps.py b/apps/startup/apps.py index da34cfa..5de7b92 100644 --- a/apps/startup/apps.py +++ b/apps/startup/apps.py @@ -1,7 +1,35 @@ from django.apps import AppConfig -from ..ticketscraping.scraping import start from datetime import datetime from threading import Thread +from multiprocessing import Process + + +def run_prepare(): + # import module inside the child process to prevent execution in the parent process + print( + f"ticket scraping service started at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}") + + # start sender socket + from apps.ticketscraping.schedulers.async_tasks_scheduler import async_tasks_scheduler + conn_thread = Thread(target=async_tasks_scheduler.connect) + conn_thread.start() + # wait for async tasks handler to connect + conn_thread.join() + + # start itself (scraping) + from apps.ticketscraping.scraping import start + start() + + +def run(): + # starter + p = Process(target=run_prepare, daemon=True) + p.start() + # start receiver socket + from apps.ticketscraping.connection.asyn_tasks_receiver import run + conn_process = Process(target=run) + conn_process.start() + conn_process.join() class MyAppConfig(AppConfig): @@ -9,6 +37,4 @@ class MyAppConfig(AppConfig): verbose_name = "start tmtracker" def ready(self): - print( - f"server started at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}") - Thread(target=start).start() + run() diff --git a/apps/storage/base.py b/apps/storage/base.py index 24bab63..abd6eb4 100644 --- a/apps/storage/base.py +++ b/apps/storage/base.py @@ -30,12 +30,18 @@ def find_one_and_update__(coll: collection.Collection, filter: dict, update=dict def find_one_and_delete__(coll: collection.Collection, filter: dict): return coll.find_one_and_delete(filter) -def find_one__(coll: collection.Collection, filter: dict, projection): - return coll.find_one(filter=filter, projection=projection) +def find_one__(coll: collection.Collection, filter: dict, projection, **kwargs): + return coll.find_one(filter=filter, projection=projection, **kwargs) def find_many__(coll: collection.Collection, filter: dict, projection, **kwargs): return coll.find(filter=filter, projection=projection, **kwargs) +def count_docs__(coll: collection.Collection, filter: dict): + return coll.count_documents(filter=filter) + +def estimated_count_docs__(coll: collection.Collection): + return coll.estimated_document_count() + def watch__(coll: collection.Collection, **kwargs): return coll.watch(**kwargs) diff --git a/apps/storage/query.py b/apps/storage/query.py new file mode 100644 index 0000000..96f18a7 --- /dev/null +++ b/apps/storage/query.py @@ -0,0 +1,16 @@ +from .storage import * +import pymongo + +# find the max value in a collection +def find_max(collection_name, filter: dict, sort_key: str, db_name="tickets"): + sort_seq = [(sort_key, pymongo.DESCENDING)] + return find_one(collection_name, filter, db_name=db_name, sort=sort_seq) + +# find the min value in a collection +def find_min(collection_name, filter: dict, sort_key: str, db_name="tickets"): + sort_seq = [(sort_key, pymongo.ASCENDING)] + return find_one(collection_name, filter, db_name=db_name, sort=sort_seq) + +def find_many_ascending_order(collection_name, filter: dict, sort_key: str, db_name="tickets"): + sort_seq = [(sort_key, pymongo.ASCENDING)] + return find_many(collection_name, filter, db_name=db_name, sort=sort_seq) diff --git a/apps/storage/storage.py b/apps/storage/storage.py index ef1d48d..f9c3256 100644 --- a/apps/storage/storage.py +++ b/apps/storage/storage.py @@ -51,10 +51,10 @@ def find_one_and_delete(collection_name, filter: dict, db_name="tickets"): return find_one_and_delete__(coll, filter) # find one -def find_one(collection_name, filter: dict, projection=None, db_name="tickets"): +def find_one(collection_name, filter: dict, projection=None, db_name="tickets", **kwargs): db = get_db_handle(db_name) coll = db[collection_name] - return find_one__(coll, filter, projection) + return find_one__(coll, filter, projection, **kwargs) # find many def find_many(collection_name, filter: dict, projection=None, db_name="tickets", **kwargs): @@ -62,6 +62,18 @@ def find_many(collection_name, filter: dict, projection=None, db_name="tickets", coll = db[collection_name] return list(find_many__(coll, filter, projection, **kwargs)) +# count with filter +def count_docs(collection_name, filter: dict, db_name="tickets"): + db = get_db_handle(db_name) + coll = db[collection_name] + return count_docs__(coll, filter) + +# count all docs in a collection +def estimated_count_docs(collection_name, db_name="tickets"): + db = get_db_handle(db_name) + coll = db[collection_name] + return estimated_count_docs__(coll) + # watch changes def watch(collection_name, db_name="tickets", **kwargs): db = get_db_handle(db_name) diff --git a/apps/ticketscraping/connection/asyn_tasks_receiver.py b/apps/ticketscraping/connection/asyn_tasks_receiver.py new file mode 100644 index 0000000..bc5efe1 --- /dev/null +++ b/apps/ticketscraping/connection/asyn_tasks_receiver.py @@ -0,0 +1,35 @@ +# start sockets +from threading import Thread +from multiprocessing import Process +from apps.ticketscraping.connection.receiver_process import ReceiverProcess +from apps.ticketscraping.constants import SERVICE_LOCALHOST, ASYNC_TASKS_RECEIVER_PORT + + +def run_prepare(): + # start receiver socket + from apps.ticketscraping.connection.mail_receiver import run + conn_process = Process(target=run, daemon=True) + conn_process.start() + + # start sender socket + from apps.ticketscraping.schedulers.mail_scheduler import mail_scheduler + conn_thread = Thread(target=mail_scheduler.connect) + conn_thread.start() + # wait for mailer to connect + conn_thread.join() + + # start itself + from apps.ticketscraping.tasks.asynchronous import run_async_tasks + receiver = ReceiverProcess(run_async_tasks, SERVICE_LOCALHOST, ASYNC_TASKS_RECEIVER_PORT) + receiver.connect() + receiver.serve_forever() + + +def run(): + # starter + p = Process(target=run_prepare) + p.start() + + +if __name__ == '__main__': + run() diff --git a/apps/ticketscraping/connection/mail_receiver.py b/apps/ticketscraping/connection/mail_receiver.py new file mode 100644 index 0000000..6ea71be --- /dev/null +++ b/apps/ticketscraping/connection/mail_receiver.py @@ -0,0 +1,13 @@ +from apps.ticketscraping.connection.receiver_process import ReceiverProcess +# from ..tasks.asynchronous import run_async_tasks +from apps.ticketscraping.constants import SERVICE_LOCALHOST, MAIL_RECEIVER_PORT + +def run(): + # start itself + receiver = ReceiverProcess(lambda x: print( + x), SERVICE_LOCALHOST, MAIL_RECEIVER_PORT) + receiver.connect() + receiver.serve_forever() + +if __name__ == '__main__': + run() diff --git a/apps/ticketscraping/connection/receiver.py b/apps/ticketscraping/connection/receiver.py new file mode 100644 index 0000000..2216267 --- /dev/null +++ b/apps/ticketscraping/connection/receiver.py @@ -0,0 +1,24 @@ +from multiprocessing.connection import Client +from threading import Semaphore + +class Receiver: + def __init__(self, hostname: str, port: int): + self.lock = Semaphore(1) + self.hostname = hostname + self.port = port + self.conn = None + + def connect(self): + self.conn = Client(address=(self.hostname, self.port,)) + + def recv(self): + if self.conn is None: + raise Exception('connection is not established') + self.lock.acquire() + res = self.conn.recv() + self.lock.release() + return res + + def __del__(self): + if self.conn is not None: self.conn.close() + diff --git a/apps/ticketscraping/connection/receiver_process.py b/apps/ticketscraping/connection/receiver_process.py new file mode 100644 index 0000000..dc2bf08 --- /dev/null +++ b/apps/ticketscraping/connection/receiver_process.py @@ -0,0 +1,11 @@ +from .receiver import Receiver + +class ReceiverProcess(Receiver): + def __init__(self, action, hostname: str, port: int): + super().__init__(hostname, port) + self.action = action + + def serve_forever(self): + while True: + res = self.recv() + self.action(*res) diff --git a/apps/ticketscraping/connection/sender.py b/apps/ticketscraping/connection/sender.py new file mode 100644 index 0000000..aa15e5e --- /dev/null +++ b/apps/ticketscraping/connection/sender.py @@ -0,0 +1,26 @@ +from multiprocessing.connection import Listener +from threading import Semaphore + +class Sender: + def __init__(self, hostname: str, port: int): + self.lock = Semaphore(1) + self.hostname = hostname + self.port = port + self.conn = None + + def connect(self): + listener = Listener(address=(self.hostname, self.port)) + self.conn = listener.accept() + print("conn accepted ", self.port) + + def send(self, *args): + if self.conn is None: + raise Exception('connection is not established') + self.lock.acquire() + self.conn.send(args) + self.lock.release() + return True + + def __del__(self): + if self.conn is not None: + self.conn.close() diff --git a/apps/ticketscraping/constants.py b/apps/ticketscraping/constants.py index 87d60d7..116a20f 100644 --- a/apps/ticketscraping/constants.py +++ b/apps/ticketscraping/constants.py @@ -1,5 +1,10 @@ from uuid import uuid4 +# services - async action handlers +ASYNC_TASKS_RECEIVER_PORT = 8100 +MAIL_RECEIVER_PORT = 8200 +SERVICE_LOCALHOST = 'localhost' + ANTIBOT_JS_CODE_URL = "https://epsf.ticketmaster.com/eps-d" TOKEN_INTERROGATION_URL = "https://epsf.ticketmaster.com/eps-d?d=www.ticketmaster.com" @@ -17,16 +22,39 @@ def get_top_picks_url( "BEST_AVAILABLE_SEATS": "best-available-seats", "BEST_HISTORY_SEATS": "best-history-seats" } + +SUBSCRIBE_REQUEST_PROPS = { + 'NAME': 'name', + 'TARGET_PRICE': 'target_price', + 'TOLERANCE': 'tolerance', + 'TICKET_NUM': 'ticket_num', + 'TM_EVENT_ID': 'tm_event_id' +} + +def filter_obj_from_attrs(obj, atts: dict[str,str]): + res = {} + for key in atts.values(): + if key in obj: + res[key] = obj[key] + return res + + +# metric thresholds +MINIMUM_HISTORY_DATA = 3 +PERCENT_OF_CHANGE = 0.5 +PERCENTILE_HISTORY_PRICES = 0.25 +ALERT_SEATS_MAX_COUNT = 3 + def get_top_picks_header(): return { **BASIC_REQ_HEADER, "tmps-correlation-id": str(uuid4()) } -def get_top_picks_query_params(qty, priceInterval): return { +def get_top_picks_query_params(qty: int, target_price: int, tolerance: int): return { 'show': 'places maxQuantity sections', 'mode': 'primary:ppsectionrow resale:ga_areas platinum:all', 'qty': qty, - 'q': f"and(not(\'accessible\'),any(listprices,$and(gte(@,{priceInterval[0]}),lte(@,{priceInterval[1]}))))", + 'q': f"and(not(\'accessible\'),any(listprices,$and(gte(@,{target_price - tolerance}),lte(@,{target_price + tolerance}))))", 'includeStandard': 'true', 'includeResale': 'true', 'includePlatinumInventoryType': 'false', diff --git a/apps/ticketscraping/prepare_reese84token.py b/apps/ticketscraping/prepare_reese84token.py index 2fbde23..0b23f2a 100644 --- a/apps/ticketscraping/prepare_reese84token.py +++ b/apps/ticketscraping/prepare_reese84token.py @@ -6,7 +6,7 @@ from . import constants -def getReese84Token(): +def getReese84Token()->tuple[str, int]: def readFileContentToString(filename): f = open(filename, 'r') content = f.read() @@ -19,7 +19,7 @@ def readFileContentToString(filename): # trim the code to the function that is only used match_obj = re.search(constants.FN_MATCHING_REGEX, antibot_js_code_full) if not match_obj: - return None + raise Exception('reese84 manufacture fails') start, end = match_obj.span() antibot_js_code_trim = antibot_js_code_full[start:end] @@ -51,4 +51,5 @@ def readFileContentToString(filename): # invoke the get token api to get the reese84 token token_json_res = requests.post( constants.TOKEN_INTERROGATION_URL, headers=constants.BASIC_REQ_HEADER, json=token) - return token_json_res.json() + json_obj = token_json_res.json() + return json_obj['token'], json_obj['renewInSec'] diff --git a/apps/ticketscraping/schedulers/async_tasks_scheduler.py b/apps/ticketscraping/schedulers/async_tasks_scheduler.py new file mode 100644 index 0000000..3c4ec81 --- /dev/null +++ b/apps/ticketscraping/schedulers/async_tasks_scheduler.py @@ -0,0 +1,4 @@ +from ..connection.sender import Sender +from ..constants import SERVICE_LOCALHOST, ASYNC_TASKS_RECEIVER_PORT + +async_tasks_scheduler = Sender(SERVICE_LOCALHOST, ASYNC_TASKS_RECEIVER_PORT) diff --git a/apps/ticketscraping/schedulers/mail_scheduler.py b/apps/ticketscraping/schedulers/mail_scheduler.py new file mode 100644 index 0000000..6b3b954 --- /dev/null +++ b/apps/ticketscraping/schedulers/mail_scheduler.py @@ -0,0 +1,5 @@ +from ..connection.sender import Sender +from ..constants import SERVICE_LOCALHOST, MAIL_RECEIVER_PORT + + +mail_scheduler = Sender(SERVICE_LOCALHOST, MAIL_RECEIVER_PORT) diff --git a/apps/ticketscraping/scraping.py b/apps/ticketscraping/scraping.py index 2a7efed..2e3d3d4 100644 --- a/apps/ticketscraping/scraping.py +++ b/apps/ticketscraping/scraping.py @@ -14,7 +14,8 @@ class Reese84TokenUpdating(): def __init__(self): self.is_running = False - self.reese84_token = {} + self.reese84_token = '' + self.reese84_renewInSec = 0 self.token_semaphore = Semaphore(0) self.scheduler = sched.scheduler(time.time, time.sleep) @@ -22,9 +23,9 @@ def initialize_reese84_token(self): """ This method should not be called directly. """ - self.reese84_token = getReese84Token() + self.reese84_token, self.reese84_renewInSec = getReese84Token() self.token_semaphore.release() # produce a new token - self.scheduler.enter(self.reese84_token['renewInSec'] - + self.scheduler.enter(self.reese84_renewInSec - constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token) def renew_reese84_token(self): @@ -35,7 +36,7 @@ def renew_reese84_token(self): self.token_semaphore.acquire() # invalidate a token self.reese84_token = getReese84Token() self.token_semaphore.release() - self.scheduler.enter(self.reese84_token['renewInSec'] - + self.scheduler.enter(self.reese84_renewInSec - constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token) def start(self): @@ -48,14 +49,15 @@ def start(self): class TicketScraping(threading.Thread): - def __init__(self, token_generator: Reese84TokenUpdating, event_id, subscribe_id, num_seats=2, price_range=(0, 200)): + def __init__(self, token_generator: Reese84TokenUpdating, event_id: str, subscribe_id: str, num_seats: int, target_price: int, tolerance: int): threading.Thread.__init__(self) self.is_running = False self.is_stopping = False self.event_id = event_id self.subscribe_id = subscribe_id self.num_seats = num_seats - self.price_range = price_range + self.target_price = target_price + self.tolerance = tolerance self.token_gen = token_generator self.scheduler = sched.scheduler(time.time, time.sleep) self.initialDelay = random.randint( @@ -83,17 +85,17 @@ def ticket_scraping(self): # scrape the top-picks from ticketmaster top_picks_url = constants.get_top_picks_url(self.event_id) top_picks_q_params = constants.get_top_picks_query_params( - self.num_seats, self.price_range) + self.num_seats, self.target_price, self.tolerance) top_picks_header = constants.get_top_picks_header() res = requests.get(top_picks_url, headers=top_picks_header, params=top_picks_q_params, - cookies=dict(reese84=self.token_gen.reese84_token['token'])) - # print(res.json()) + cookies=dict(reese84=self.token_gen.reese84_token)) # type: ignore # prune and format the received picks picks_obj = format_seats(res.json(), self.subscribe_id) # periodic task: update collections best_available_seats and best_history_seats - run_periodic_task(picks_obj, self.subscribe_id) + # and automatically spawn async tasks + run_periodic_task(picks_obj, self.subscribe_id, self.target_price) print("Got the ticket info from TM. /", res.status_code) self.scheduler.enter(constants.TICKET_SCRAPING_INTERVAL, @@ -102,8 +104,8 @@ def ticket_scraping(self): def get_id(self): # returns id of the respective thread if hasattr(self, '_thread_id'): - return self._thread_id - for id, thread in threading._active.items(): + return self._thread_id # type: ignore + for id, thread in threading._active.items(): # type: ignore if thread is self: return id @@ -135,7 +137,7 @@ def start(): '$or': [{'markPaused': {'$exists': False}}, {'markPaused': False}]}) for evt in events: ticket_scraping = TicketScraping( - reese_token_gen, evt["tm_event_id"], evt["_id"], evt["ticket_num"], evt["price_range"]) + reese_token_gen, evt["tm_event_id"], evt["_id"], evt["ticket_num"], evt["target_price"], evt["tolerance"]) print(ticket_scraping.initialDelay, "s") scraping_list[ticket_scraping.subscribe_id] = ticket_scraping for scraping_thread in scraping_list.values(): @@ -156,7 +158,7 @@ def start(): # spawn a thread to do scraping operations full_doc = change['fullDocument'] ticket_scraping = TicketScraping( - reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["price_range"]) + reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"]) print(ticket_scraping.initialDelay, "s") scraping_list[ticket_scraping.subscribe_id] = ticket_scraping ticket_scraping.start() @@ -177,7 +179,7 @@ def start(): # resume scraping if currently paused if doc_id not in scraping_list: ticket_scraping = TicketScraping( - reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["price_range"]) + reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"]) print(ticket_scraping.initialDelay, "s") scraping_list[ticket_scraping.subscribe_id] = ticket_scraping ticket_scraping.start() diff --git a/apps/ticketscraping/tasks/asynchronous.py b/apps/ticketscraping/tasks/asynchronous.py new file mode 100644 index 0000000..77ce4d2 --- /dev/null +++ b/apps/ticketscraping/tasks/asynchronous.py @@ -0,0 +1,43 @@ +from .. import constants +from ..models.pick import Pick +import typing, threading +from .util.decorators import StorageInsertionOrder, wrap_fn_return +from .strategies.quarters_seats import QuartersSeats +from ..schedulers.mail_scheduler import mail_scheduler + + +def run_async_tasks(picks: typing.Iterable[Pick], scraping_id: str, target_price: int): + picks_size = len(list(picks)) + if picks_size == 0: return + + store = StorageInsertionOrder(size=picks_size) + def fn(arg: tuple[int, Pick]): + idx, pick = arg + thread = threading.Thread(target=wrap_fn_return(run_async_task, store.set, idx), args=(pick, scraping_id, target_price)) + thread.start() + return thread + + threads = list(map(fn, enumerate(picks))) + + for thread in threads: thread.join() + + # filter out new seats that do not need alerts + store.filter() + # better seats first + store.sort() + # get seats used in alerting + alert_seats = store.sublist(0, min(store.size, constants.ALERT_SEATS_MAX_COUNT)) + # get the alert information + alert_contents = list(map(lambda qs: qs.get_alert_content(), alert_seats)) # type: ignore + # send the alert to user + mail_scheduler.send(alert_contents) + + +def run_async_task(pick: Pick, scraping_id: str, target_price: int): + # Alert the user based on alert conditions + qs = QuartersSeats(pick, scraping_id, target_price) + if not qs.shouldAlert(): + return None + + # success + return qs diff --git a/apps/ticketscraping/tasks/periodic.py b/apps/ticketscraping/tasks/periodic.py index d0ce8ac..74cbe3f 100644 --- a/apps/ticketscraping/tasks/periodic.py +++ b/apps/ticketscraping/tasks/periodic.py @@ -1,6 +1,7 @@ from ...storage.storage import find_many, insert_many, delete_many from ...ticketscraping import constants from ..models.pick import Pick +from ..schedulers.async_tasks_scheduler import async_tasks_scheduler def generate_picks_set_from_picks(picks): def __helper(pick: dict): @@ -38,8 +39,7 @@ def insert_history_seats(seats: set[Pick]): return insert_many(constants.DATABASE['BEST_HISTORY_SEATS'], list(map(lambda seat: vars(seat), seats))) - -def run_periodic_task(picks: dict, scraping_id: str): +def run_periodic_task(picks, scraping_id: str, target_price: int): # B the list of new best available seats new_best_avail = generate_picks_set_from_picks(picks) # A be the list of current best available seats @@ -65,7 +65,5 @@ def run_periodic_task(picks: dict, scraping_id: str): # Save C to best_history_seats. insert_history_seats(overwritten_seats) - # TODO # Use D to invoke a handler to analyze them against the best_history_seats asynchronously. - - pass \ No newline at end of file + async_tasks_scheduler.send(new_seats, scraping_id, target_price) diff --git a/apps/ticketscraping/tasks/strategies/quarters_seats.py b/apps/ticketscraping/tasks/strategies/quarters_seats.py new file mode 100644 index 0000000..2e35131 --- /dev/null +++ b/apps/ticketscraping/tasks/strategies/quarters_seats.py @@ -0,0 +1,178 @@ +import pymongo +from ....storage.storage import count_docs, find_many +from ....storage.query import find_max, find_min, find_many_ascending_order +from ....ticketscraping import constants +from ...models.pick import Pick +from ..util.math import percentile + + +class QuartersSeats(): + top_better_history_seats_sort = [ + ('rank', pymongo.DESCENDING), ('price', pymongo.ASCENDING)] + + def __init__(self, pick: Pick, scraping_id: str, target_price: int): + self._pick = pick + self._scraping_id = scraping_id + self.target_price = target_price + self.percentile = 1.0 + + def __lt__(self, other): + # smaller the percentile, better the pick + return self.percentile > other.percentile + + def __eq__(self, other): + return self.percentile == other.percentile + + @property + def pick(self): + return self._pick + + @property + def scraping_id(self): + return self._scraping_id + + def get_alert_content(self): + # alert user + # Find the exact same seat based on(sec, row?, seat?) + same_seats = self.get_exact_same_seats() + # rank = self._num_before + 1 + # top best history = get_top_better_history_seats() + # notify user with info + return '' + pass + + def get_top_better_history_seats(self): + return self.find_better_history_seats(sort=self.top_better_history_seats_sort) + + def shouldAlert(self): + try: + # Find price match + self.target_price_metric() + # Find enough history seats data + percentile = self.percentile_metric() + # Find the % of change + percent_change = self.percent_of_change_metric() + # Find percentile of seats in quarters + self.quarter_percentile_metric() + except Exception as ex: + print(ex) + return False + + # success + print(f"percent change out of max-min: {percent_change*100}") + print(f"all history seats percentile: {percentile*100}") + print( + f"new seat - price: {self.pick.price} rank: {self.pick.quality} section: {self.pick.section} row: {self.pick.row}") + print(f"quarters history seats percentile: {self.percentile*100}") + return True + + + def quarter_percentile_metric(self): + if self.get_percentile() > constants.PERCENTILE_HISTORY_PRICES: + raise Exception('the seat is not recommended') + + def get_percentile(self): + self._num_before = self.count_better_history_seats() + self._num_total = self.count_quarters_history_seats() + self.percentile = percentile(self._num_before, self._num_total) + return self.percentile + + def count_quarters_history_seats(self): + filter_obj = self.__get_quarters_history_seats_filter__() + return count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj) + + def __get_quarters_history_seats_filter__(self): + return { + "scraping_id": self._scraping_id, + "$or": [ + { + "price": {"$lte": self._pick.price}, + "quality": {"$gte": self._pick.quality}, + }, + { + "price": {"$gt": self._pick.price}, + "quality": {"$lt": self._pick.quality}, + } + ] + } + + def __get_better_history_seats_filter__(self): + return { + "scraping_id": self._scraping_id, + "$or": [ + { + "price": {"$lt": self._pick.price}, + "quality": {"$gte": self._pick.quality}, + }, + { + "price": {"$lte": self._pick.price}, + "quality": {"$gt": self._pick.quality}, + } + ] + } + + def find_better_history_seats(self, **kwargs): + limit = kwargs.get('limit') + sort_seq = kwargs.get('sort') + filter_obj = self.__get_better_history_seats_filter__() + return find_many(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj, sort=sort_seq, limit=limit) + + def count_better_history_seats(self): + filter_obj = self.__get_better_history_seats_filter__() + return count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj) + + def target_price_metric(self): + # exceed target price - abort + if self.pick.price > self.target_price: + raise Exception('price of the seat is not low enough') + + + def percent_of_change_metric(self) -> float: + # Find the % of change + max_seat = find_max(constants.DATABASE['BEST_HISTORY_SEATS'], { + "scraping_id": self.scraping_id}, 'price') + min_seat = find_min(constants.DATABASE['BEST_HISTORY_SEATS'], { + "scraping_id": self.scraping_id}, 'price') + max_price = 0 if type(max_seat) is not dict else max_seat.get('price', 0) + min_price = 0 if type(min_seat) is not dict else min_seat.get('price', 0) + + # min and max are identical - abort + if max_price == min_price: + raise Exception('min and max prices are identical') + + percent_change = (self.pick.price - min_price) / (max_price - min_price) + + # # price of change exceeds the metric value - abort + # if percent_change > constants.PERCENT_OF_CHANGE: + # raise Exception( + # f'price of change ({percent_change}) exceeds the metric value') + + return percent_change + + + def percentile_metric(self) -> float: + rank = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], + {"scraping_id": self.scraping_id, "price": {"$lte": self.pick.price}}) + total_count = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], + { + "scraping_id": self.scraping_id}) + + # no history seats data - abort + if total_count < constants.MINIMUM_HISTORY_DATA: + raise Exception('no enough history seats data (count < 3)') + + percentile = rank / total_count + + # # percentile of history prices exceeds the metric value - abort + # if percentile > constants.PERCENTILE_HISTORY_PRICES: + # raise Exception( + # 'percentile of history prices ({percentile}) exceeds the metric value') + + return percentile + + + def get_exact_same_seats(self): + return find_many_ascending_order(constants.DATABASE['BEST_HISTORY_SEATS'], + {"scraping_id": self.scraping_id, "section": self.pick.section, + "row": self.pick.row, "seat_columns": self.pick.seat_columns}, + 'last_modified') diff --git a/apps/ticketscraping/tasks/util/decorators.py b/apps/ticketscraping/tasks/util/decorators.py new file mode 100644 index 0000000..aa2929c --- /dev/null +++ b/apps/ticketscraping/tasks/util/decorators.py @@ -0,0 +1,36 @@ +class StorageInsertionOrder: + def __init__(self, size=0): + self.store = [0] * size + + def __iter__(self): + return iter(self.store) + + @property + def size(self): + return len(self.store) + + def add(self, item): + self.store.append(item) + + def get(self, index: int): + return self.store[index] + + def set(self, index: int, item): + self.store[index] = item + + def sort(self): + sorted(self.store) + + def filter(self): + self.store = list(filter(lambda item: item is not None, self.store)) + + def sublist(self, from_idx: int, to_idx: int): + return self.store[from_idx:to_idx] + + +def wrap_fn_return(fn, storing_fn, index): + def inner_fn(*args, **kwargs): + res = fn(*args, **kwargs) + storing_fn(index, res) + return res + return inner_fn \ No newline at end of file diff --git a/apps/ticketscraping/tasks/util/math.py b/apps/ticketscraping/tasks/util/math.py new file mode 100644 index 0000000..a19642c --- /dev/null +++ b/apps/ticketscraping/tasks/util/math.py @@ -0,0 +1,9 @@ +def percentile(num_before: int, total: int) -> float: + denominator = total + if denominator == 0: + denominator = 1 + return num_before / denominator + +def percentileInBetween(num_before: int, num_after: int)->float: + denominator = num_before + num_after + 1 + return percentile(num_before, denominator) diff --git a/apps/ticketscraping/tasks/util/scheduler.py b/apps/ticketscraping/tasks/util/scheduler.py new file mode 100644 index 0000000..0d171eb --- /dev/null +++ b/apps/ticketscraping/tasks/util/scheduler.py @@ -0,0 +1,23 @@ +import time +from sched import scheduler +from ...constants import AWAKE_SCHEDULER_INTERVAL + +class Scheduler: + def __init__(self, name: str, action): + self.name = name + self.scheduler = scheduler(time.time, time.sleep) + self.awake_scheduler = scheduler(time.time, time.sleep) + self.action = action + + def enter(self, *args, **kwargs): + return self.scheduler.enter(0, 1, self.action, args, kwargs) + + def iter_awake_scheduler(self): + # awake the main scheduler + self.scheduler.run() # blocking + self.awake_scheduler.enter( + AWAKE_SCHEDULER_INTERVAL, 1, self.iter_awake_scheduler) + + def run(self): + self.iter_awake_scheduler() + self.awake_scheduler.run() diff --git a/apps/trackerapi/error_handler.py b/apps/trackerapi/error_handler.py new file mode 100644 index 0000000..b278909 --- /dev/null +++ b/apps/trackerapi/error_handler.py @@ -0,0 +1,18 @@ +from django.http import JsonResponse + +# custom exceptions +class BadRequestException(Exception): + pass + +# error handler decorator +def wrap_error_handler(fn): + def inner_fn(*args, **kwargs): + try: + return fn(*args, **kwargs) + except BadRequestException as ex: + # bad request + return JsonResponse({"error": str(ex)}, status=400) + except Exception as ex: + # internal server error + return JsonResponse({"error": "something went wrong."}, status=500) + return inner_fn \ No newline at end of file diff --git a/apps/trackerapi/views.py b/apps/trackerapi/views.py index 635d7d2..39a98c9 100644 --- a/apps/trackerapi/views.py +++ b/apps/trackerapi/views.py @@ -1,36 +1,40 @@ from django.shortcuts import render from django.http import HttpResponse, HttpRequest from ..storage.storage import insert_one, find_one_and_update +from .error_handler import BadRequestException, wrap_error_handler from ..ticketscraping import constants import json # Create your views here. +@wrap_error_handler def subscribe_tm_event_price_tracking(req: HttpRequest): if req.method == 'POST': body = json.loads(req.body) # validation - for key in ['name', 'price_range', 'ticket_num', 'tm_event_id']: + for key in constants.SUBSCRIBE_REQUEST_PROPS.values(): if key not in body: - return HttpResponse('Request is invalid.', status=400) - doc = { - "name": body["name"], - "price_range": body["price_range"], - "ticket_num": body["ticket_num"], - "tm_event_id": body["tm_event_id"] - } + raise BadRequestException('Request is invalid.') + # validation + target_price = body["target_price"] + tolerance = body["tolerance"] + if target_price - tolerance < 0: + raise BadRequestException('Lowest price cannot be negative.') + + doc = constants.filter_obj_from_attrs(body, constants.SUBSCRIBE_REQUEST_PROPS) + insert_one(constants.DATABASE['EVENTS'], doc) return HttpResponse('OK', status=200) + +@wrap_error_handler def unsubscribe_tm_event_price_tracking(req: HttpRequest): if req.method == 'POST': body = json.loads(req.body) - # validation id = body['subscription_id'] - if not id: - return HttpResponse('Request is invalid.', status=400) - res = find_one_and_update(constants.DATABASE['EVENTS'], {"_id": id}, {'$set': {'markPaused': True, }}) + + res = find_one_and_update(constants.DATABASE['EVENTS'], {"_id": id}, {'$set': {'markPaused': True}}) if res: return HttpResponse('OK', status=200) else: - return HttpResponse('Subscription id not found.', status=400) + raise BadRequestException('Subscription id not found.') diff --git a/utils.py b/utils.py index 4f1ced6..70dd09f 100644 --- a/utils.py +++ b/utils.py @@ -1,4 +1,4 @@ -from pymongo import MongoClient +from pymongo.mongo_client import MongoClient from secret import CONN_SRV client = MongoClient(CONN_SRV)