def price_formatter(price):
    """Render a price value for display in alert messages.

    Numbers are formatted as dollar amounts ("$12.50"), strings (e.g. the
    "n/a" placeholder) are passed through unchanged, and anything else
    yields an empty string.

    Bug fixed: the original check was `type(price) is float or int`, which
    is always truthy (`or int`), so every input — including "n/a" — hit
    `"{:.2f}".format(...)` and raised for non-numeric values.
    """
    if isinstance(price, (int, float)):
        return "$" + "{:.2f}".format(price)
    if isinstance(price, str):
        return price
    # unknown type (e.g. None): render nothing rather than crash
    return ''
= seat.get("seat_columns", "n/a") + last_modified = seat.get("last_modified", "n/a") + return apply( + [price, section, row, seat_columns, last_modified], + [price_formatter, default_formatter, default_formatter, + format_seat_columns, format_date], + delimiter) + +def format_price_only_seat(seat: dict, delimiter="\t"): + price = seat.get("price", "n/a") + last_modified = seat.get("last_modified", "n/a") + return apply([price, last_modified], [price_formatter, format_date], delimiter) + +def format_seat(seat: dict, price_only=False, delimiter="\t"): + if price_only: + return format_price_only_seat(seat, delimiter) + else: + return format_full_seat(seat, delimiter) + +def format_seats(seats: list, price_only=False, delimiter="\t"): + return "\n".join([format_seat(seat, price_only, delimiter) for seat in seats]) + + +def format_entire_mail(pick: Pick, target_price: int, percentile: float, rank: int, num_total: int, top_history_seats: list, same_seats: list): + """ + structure of message: + 1. greetings + 2. attributes of new seats + 3. top 3 comparable history seats + 4. exact same seats if possible + 5. signature + """ + p1 = ( + f"Hi!" + ) + p2 = ( + f"Congratulations! Ticket tracker reminds you that your ticket subscription request with target price {price_formatter(target_price)} " + f"found better budget seats (price, section, row, seats) at ({format_full_seat(vars(pick), delimiter=', ')}). " + f"{decimal_to_percent(percentile)} of all comparable seats in the history are better than the newly found seats, that is, " + f"they rank no.{rank} out of {num_total} comparable seats in the history." 
def send_email(receiver_emails: list[str], messages: list[str]):
    """Send each alert message to the given recipients via the module SMTP server.

    `server_send_email` wraps `smtplib.SMTP.sendmail`, which returns a dict
    of *refused* recipients — an empty dict means every recipient was
    accepted, and it never returns None.

    Bugs fixed:
    - the old `if err is not None` check treated every successful send as a
      failure (sendmail's {} result is not None);
    - only `messages[0]` was sent even though a list of messages is accepted.
    Failures are logged per-message (best effort) rather than aborting the batch.
    """
    if not messages:
        return
    for message in messages:
        try:
            refused = server_send_email(server, receiver_emails, message)
            # non-empty dict => some recipients were rejected by the server
            if refused:
                raise Exception(f'could not send email to: {list(refused)}')
        except Exception as ex:
            print(ex)
b/apps/startup/apps.py @@ -1,7 +1,34 @@ from django.apps import AppConfig -from ..ticketscraping.scraping import start from datetime import datetime from threading import Thread +from multiprocessing import Process + + +def run_prepare(): + # import module inside the child process to prevent execution in the parent process + print( + f"ticket scraping service started at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}") + + # start sender socket + from apps.ticketscraping.schedulers.async_tasks_scheduler import async_tasks_scheduler + conn_thread = Thread(target=async_tasks_scheduler.connect) + conn_thread.start() + # wait for async tasks handler to connect + conn_thread.join() + + # start itself (scraping) + from apps.ticketscraping.scraping import start + start() + + +def run(): + # starter + p = Process(target=run_prepare, daemon=True) + p.start() + # start receiver socket + from apps.ticketscraping.connection.asyn_tasks_receiver import run + conn_process = Process(target=run) + conn_process.start() class MyAppConfig(AppConfig): @@ -9,6 +36,4 @@ class MyAppConfig(AppConfig): verbose_name = "start tmtracker" def ready(self): - print( - f"server started at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}") - Thread(target=start).start() + run() diff --git a/apps/storage/base.py b/apps/storage/base.py index 24bab63..abd6eb4 100644 --- a/apps/storage/base.py +++ b/apps/storage/base.py @@ -30,12 +30,18 @@ def find_one_and_update__(coll: collection.Collection, filter: dict, update=dict def find_one_and_delete__(coll: collection.Collection, filter: dict): return coll.find_one_and_delete(filter) -def find_one__(coll: collection.Collection, filter: dict, projection): - return coll.find_one(filter=filter, projection=projection) +def find_one__(coll: collection.Collection, filter: dict, projection, **kwargs): + return coll.find_one(filter=filter, projection=projection, **kwargs) def find_many__(coll: collection.Collection, filter: dict, projection, **kwargs): return 
coll.find(filter=filter, projection=projection, **kwargs) +def count_docs__(coll: collection.Collection, filter: dict): + return coll.count_documents(filter=filter) + +def estimated_count_docs__(coll: collection.Collection): + return coll.estimated_document_count() + def watch__(coll: collection.Collection, **kwargs): return coll.watch(**kwargs) diff --git a/apps/storage/query.py b/apps/storage/query.py new file mode 100644 index 0000000..96f18a7 --- /dev/null +++ b/apps/storage/query.py @@ -0,0 +1,16 @@ +from .storage import * +import pymongo + +# find the max value in a collection +def find_max(collection_name, filter: dict, sort_key: str, db_name="tickets"): + sort_seq = [(sort_key, pymongo.DESCENDING)] + return find_one(collection_name, filter, db_name=db_name, sort=sort_seq) + +# find the min value in a collection +def find_min(collection_name, filter: dict, sort_key: str, db_name="tickets"): + sort_seq = [(sort_key, pymongo.ASCENDING)] + return find_one(collection_name, filter, db_name=db_name, sort=sort_seq) + +def find_many_ascending_order(collection_name, filter: dict, sort_key: str, db_name="tickets"): + sort_seq = [(sort_key, pymongo.ASCENDING)] + return find_many(collection_name, filter, db_name=db_name, sort=sort_seq) diff --git a/apps/storage/storage.py b/apps/storage/storage.py index ef1d48d..f9c3256 100644 --- a/apps/storage/storage.py +++ b/apps/storage/storage.py @@ -51,10 +51,10 @@ def find_one_and_delete(collection_name, filter: dict, db_name="tickets"): return find_one_and_delete__(coll, filter) # find one -def find_one(collection_name, filter: dict, projection=None, db_name="tickets"): +def find_one(collection_name, filter: dict, projection=None, db_name="tickets", **kwargs): db = get_db_handle(db_name) coll = db[collection_name] - return find_one__(coll, filter, projection) + return find_one__(coll, filter, projection, **kwargs) # find many def find_many(collection_name, filter: dict, projection=None, db_name="tickets", **kwargs): @@ -62,6 
+62,18 @@ def find_many(collection_name, filter: dict, projection=None, db_name="tickets", coll = db[collection_name] return list(find_many__(coll, filter, projection, **kwargs)) +# count with filter +def count_docs(collection_name, filter: dict, db_name="tickets"): + db = get_db_handle(db_name) + coll = db[collection_name] + return count_docs__(coll, filter) + +# count all docs in a collection +def estimated_count_docs(collection_name, db_name="tickets"): + db = get_db_handle(db_name) + coll = db[collection_name] + return estimated_count_docs__(coll) + # watch changes def watch(collection_name, db_name="tickets", **kwargs): db = get_db_handle(db_name) diff --git a/apps/ticketscraping/connection/asyn_tasks_receiver.py b/apps/ticketscraping/connection/asyn_tasks_receiver.py new file mode 100644 index 0000000..bc5efe1 --- /dev/null +++ b/apps/ticketscraping/connection/asyn_tasks_receiver.py @@ -0,0 +1,35 @@ +# start sockets +from threading import Thread +from multiprocessing import Process +from apps.ticketscraping.connection.receiver_process import ReceiverProcess +from apps.ticketscraping.constants import SERVICE_LOCALHOST, ASYNC_TASKS_RECEIVER_PORT + + +def run_prepare(): + # start receiver socket + from apps.ticketscraping.connection.mail_receiver import run + conn_process = Process(target=run, daemon=True) + conn_process.start() + + # start sender socket + from apps.ticketscraping.schedulers.mail_scheduler import mail_scheduler + conn_thread = Thread(target=mail_scheduler.connect) + conn_thread.start() + # wait for mailer to connect + conn_thread.join() + + # start itself + from apps.ticketscraping.tasks.asynchronous import run_async_tasks + receiver = ReceiverProcess(run_async_tasks, SERVICE_LOCALHOST, ASYNC_TASKS_RECEIVER_PORT) + receiver.connect() + receiver.serve_forever() + + +def run(): + # starter + p = Process(target=run_prepare) + p.start() + + +if __name__ == '__main__': + run() diff --git a/apps/ticketscraping/connection/mail_receiver.py 
class Receiver:
    """Client side of a multiprocessing connection.

    `connect()` dials the Sender's Listener; `recv()` is serialized by a
    semaphore so multiple threads can share one connection safely.
    """

    def __init__(self, hostname: str, port: int):
        # binary semaphore used as a mutex over the connection
        self.lock = Semaphore(1)
        self.hostname = hostname
        self.port = port
        self.conn = None  # established lazily by connect()

    def connect(self):
        """Connect to the listening Sender at (hostname, port)."""
        self.conn = Client(address=(self.hostname, self.port))

    def recv(self):
        """Receive one message from the connection.

        Raises if connect() has not been called. Bug fixed: the lock is now
        released even when `conn.recv()` raises — previously an exception
        between acquire() and release() left the semaphore held forever,
        deadlocking every later recv().
        """
        if self.conn is None:
            raise Exception('connection is not established')
        with self.lock:
            return self.conn.recv()

    def __del__(self):
        # best-effort cleanup of the underlying connection
        if self.conn is not None:
            self.conn.close()
class Sender:
    """Server side of a multiprocessing connection.

    `connect()` listens on (hostname, port) and blocks until one Receiver
    connects; `send()` is serialized by a semaphore so multiple threads can
    share the connection safely.
    """

    def __init__(self, hostname: str, port: int):
        # binary semaphore used as a mutex over the connection
        self.lock = Semaphore(1)
        self.hostname = hostname
        self.port = port
        # kept as an attribute so it can be closed on teardown
        # (previously a local that leaked the listening socket)
        self.listener = None
        self.conn = None

    def connect(self):
        """Listen and block until a single Receiver connects."""
        self.listener = Listener(address=(self.hostname, self.port))
        self.conn = self.listener.accept()
        print("conn accepted ", self.port)

    def send(self, *args):
        """Send the positional args as one tuple message.

        Raises if connect() has not completed. Bug fixed: the lock is now
        released even when `conn.send()` raises — previously an exception
        left the semaphore held forever, blocking all later sends.
        """
        if self.conn is None:
            raise Exception('connection is not established')
        with self.lock:
            self.conn.send(args)
        return True

    def __del__(self):
        # best-effort cleanup of the connection and the listening socket
        if self.conn is not None:
            self.conn.close()
        if self.listener is not None:
            self.listener.close()
+PERCENTILE_HISTORY_PRICES = 0.25 + +# alert content constants +ALERT_SEATS_MAX_COUNT = 3 +TOP_COMPARED_HISTORY_SEATS = 3 + def get_top_picks_header(): return { **BASIC_REQ_HEADER, "tmps-correlation-id": str(uuid4()) } -def get_top_picks_query_params(qty, priceInterval): return { +def get_top_picks_query_params(qty: int, target_price: int, tolerance: int): return { 'show': 'places maxQuantity sections', 'mode': 'primary:ppsectionrow resale:ga_areas platinum:all', 'qty': qty, - 'q': f"and(not(\'accessible\'),any(listprices,$and(gte(@,{priceInterval[0]}),lte(@,{priceInterval[1]}))))", + 'q': f"and(not(\'accessible\'),any(listprices,$and(gte(@,{target_price - tolerance}),lte(@,{target_price + tolerance}))))", 'includeStandard': 'true', 'includeResale': 'true', 'includePlatinumInventoryType': 'false', @@ -43,7 +78,7 @@ def get_top_picks_query_params(qty, priceInterval): return { TOKEN_RENEW_PRIORITY = 1 TICKET_SCRAPING_PRIORITY = 3 TICKET_SCRAPING_INTERVAL = 60 -TICKET_SCRAPING_TOKEN_AWAIT_MAX_INTERVAL = 5 +TICKET_SCRAPING_TOKEN_AWAIT_MAX_INTERVAL = 10 INJECTOR_LOCATION = "js/injector.js" INJECTOR_HEADER_LOCATION = "js/injector-header.js" diff --git a/apps/ticketscraping/prepare_reese84token.py b/apps/ticketscraping/prepare_reese84token.py index 2fbde23..0b23f2a 100644 --- a/apps/ticketscraping/prepare_reese84token.py +++ b/apps/ticketscraping/prepare_reese84token.py @@ -6,7 +6,7 @@ from . 
import constants -def getReese84Token(): +def getReese84Token()->tuple[str, int]: def readFileContentToString(filename): f = open(filename, 'r') content = f.read() @@ -19,7 +19,7 @@ def readFileContentToString(filename): # trim the code to the function that is only used match_obj = re.search(constants.FN_MATCHING_REGEX, antibot_js_code_full) if not match_obj: - return None + raise Exception('reese84 manufacture fails') start, end = match_obj.span() antibot_js_code_trim = antibot_js_code_full[start:end] @@ -51,4 +51,5 @@ def readFileContentToString(filename): # invoke the get token api to get the reese84 token token_json_res = requests.post( constants.TOKEN_INTERROGATION_URL, headers=constants.BASIC_REQ_HEADER, json=token) - return token_json_res.json() + json_obj = token_json_res.json() + return json_obj['token'], json_obj['renewInSec'] diff --git a/apps/ticketscraping/schedulers/async_tasks_scheduler.py b/apps/ticketscraping/schedulers/async_tasks_scheduler.py new file mode 100644 index 0000000..3c4ec81 --- /dev/null +++ b/apps/ticketscraping/schedulers/async_tasks_scheduler.py @@ -0,0 +1,4 @@ +from ..connection.sender import Sender +from ..constants import SERVICE_LOCALHOST, ASYNC_TASKS_RECEIVER_PORT + +async_tasks_scheduler = Sender(SERVICE_LOCALHOST, ASYNC_TASKS_RECEIVER_PORT) diff --git a/apps/ticketscraping/schedulers/mail_scheduler.py b/apps/ticketscraping/schedulers/mail_scheduler.py new file mode 100644 index 0000000..6b3b954 --- /dev/null +++ b/apps/ticketscraping/schedulers/mail_scheduler.py @@ -0,0 +1,5 @@ +from ..connection.sender import Sender +from ..constants import SERVICE_LOCALHOST, MAIL_RECEIVER_PORT + + +mail_scheduler = Sender(SERVICE_LOCALHOST, MAIL_RECEIVER_PORT) diff --git a/apps/ticketscraping/scraping.py b/apps/ticketscraping/scraping.py index 2a7efed..f72d020 100644 --- a/apps/ticketscraping/scraping.py +++ b/apps/ticketscraping/scraping.py @@ -10,21 +10,36 @@ from ..storage.storage import * from .seat_analysis import format_seats from 
.tasks.periodic import run_periodic_task +import traceback class Reese84TokenUpdating(): def __init__(self): self.is_running = False - self.reese84_token = {} + self._reese84_token = '' + self.reese84_renewInSec = 0 + self.token_access_semaphore = Semaphore(1) # one can access at a time self.token_semaphore = Semaphore(0) self.scheduler = sched.scheduler(time.time, time.sleep) + @property + def reese84_token(self): + self.token_semaphore.acquire() + self.token_access_semaphore.acquire() + token = self._reese84_token + self.token_semaphore.release() + self.token_access_semaphore.release() + + return token + def initialize_reese84_token(self): """ This method should not be called directly. """ - self.reese84_token = getReese84Token() - self.token_semaphore.release() # produce a new token - self.scheduler.enter(self.reese84_token['renewInSec'] - + self.token_access_semaphore.acquire() + self._reese84_token, self.reese84_renewInSec = getReese84Token() + self.token_semaphore.release() # produce a new token + self.token_access_semaphore.release() + self.scheduler.enter(self.reese84_renewInSec - constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token) def renew_reese84_token(self): @@ -33,9 +48,11 @@ def renew_reese84_token(self): """ print("renewing token") self.token_semaphore.acquire() # invalidate a token - self.reese84_token = getReese84Token() - self.token_semaphore.release() - self.scheduler.enter(self.reese84_token['renewInSec'] - + self.token_access_semaphore.acquire() + self._reese84_token, self.reese84_renewInSec = getReese84Token() + self.token_semaphore.release() # produce a token + self.token_access_semaphore.release() + self.scheduler.enter(self.reese84_renewInSec - constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token) def start(self): @@ -48,14 +65,16 @@ def start(self): class TicketScraping(threading.Thread): - def __init__(self, token_generator: Reese84TokenUpdating, event_id, 
subscribe_id, num_seats=2, price_range=(0, 200)): + def __init__(self, token_generator: Reese84TokenUpdating, event_id: str, subscribe_id: str, num_seats: int, target_price: int, tolerance: int, emails: list[str]): threading.Thread.__init__(self) self.is_running = False self.is_stopping = False self.event_id = event_id self.subscribe_id = subscribe_id self.num_seats = num_seats - self.price_range = price_range + self.target_price = target_price + self.tolerance = tolerance + self.emails = emails self.token_gen = token_generator self.scheduler = sched.scheduler(time.time, time.sleep) self.initialDelay = random.randint( @@ -75,25 +94,26 @@ def flag_for_termination(self): print(f'Failed to terminate the thread with id={thread_id}') def ticket_scraping(self): - if self.token_gen.token_semaphore._value <= 0: - # phase: retry after a delay - self.scheduler.enter(constants.TICKET_SCRAPING_TOKEN_AWAIT_MAX_INTERVAL, - constants.TICKET_SCRAPING_PRIORITY, self.ticket_scraping) - return # scrape the top-picks from ticketmaster top_picks_url = constants.get_top_picks_url(self.event_id) top_picks_q_params = constants.get_top_picks_query_params( - self.num_seats, self.price_range) + self.num_seats, self.target_price, self.tolerance) top_picks_header = constants.get_top_picks_header() - res = requests.get(top_picks_url, headers=top_picks_header, params=top_picks_q_params, - cookies=dict(reese84=self.token_gen.reese84_token['token'])) - # print(res.json()) - + res = None + try: + res = requests.get(top_picks_url, headers=top_picks_header, params=top_picks_q_params, + cookies=dict(reese84=self.token_gen.reese84_token)) # type: ignore + except Exception: + # retry after a delay + self.scheduler.enter(constants.TICKET_SCRAPING_TOKEN_AWAIT_MAX_INTERVAL, + constants.TICKET_SCRAPING_PRIORITY, self.ticket_scraping) + return # prune and format the received picks picks_obj = format_seats(res.json(), self.subscribe_id) # periodic task: update collections best_available_seats and 
best_history_seats - run_periodic_task(picks_obj, self.subscribe_id) + # and automatically spawn async tasks + run_periodic_task(picks_obj, self.subscribe_id, self.target_price, self.emails) print("Got the ticket info from TM. /", res.status_code) self.scheduler.enter(constants.TICKET_SCRAPING_INTERVAL, @@ -102,8 +122,8 @@ def ticket_scraping(self): def get_id(self): # returns id of the respective thread if hasattr(self, '_thread_id'): - return self._thread_id - for id, thread in threading._active.items(): + return self._thread_id # type: ignore + for id, thread in threading._active.items(): # type: ignore if thread is self: return id @@ -114,9 +134,9 @@ def run(self): return self.is_running = True self.is_stopping = False - self.ticket_scraping() # randomize start time to scatter out event of API fetching time.sleep(self.initialDelay) + self.ticket_scraping() self.scheduler.run() finally: print( @@ -135,7 +155,7 @@ def start(): '$or': [{'markPaused': {'$exists': False}}, {'markPaused': False}]}) for evt in events: ticket_scraping = TicketScraping( - reese_token_gen, evt["tm_event_id"], evt["_id"], evt["ticket_num"], evt["price_range"]) + reese_token_gen, evt["tm_event_id"], evt["_id"], evt["ticket_num"], evt["target_price"], evt["tolerance"], evt["client_emails"]) print(ticket_scraping.initialDelay, "s") scraping_list[ticket_scraping.subscribe_id] = ticket_scraping for scraping_thread in scraping_list.values(): @@ -156,7 +176,7 @@ def start(): # spawn a thread to do scraping operations full_doc = change['fullDocument'] ticket_scraping = TicketScraping( - reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["price_range"]) + reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"], full_doc["client_emails"]) print(ticket_scraping.initialDelay, "s") scraping_list[ticket_scraping.subscribe_id] = ticket_scraping ticket_scraping.start() @@ -177,7 +197,7 @@ 
def run_async_tasks(picks: typing.Iterable[Pick], scraping_id: str, target_price: int, emails: list[str]):
    """Evaluate newly found seats concurrently and email alerts for the best ones.

    One thread per pick runs `run_async_task`; results land in a
    StorageInsertionOrder slot matching their submission index, so output
    order is deterministic regardless of thread scheduling.

    Bug fixed: the iterable was consumed twice (`len(list(picks))` then
    `enumerate(picks)`) — a generator argument would appear empty on the
    second pass and no tasks would run. It is now materialized once.
    """
    picks = list(picks)
    if not picks:
        return

    store = StorageInsertionOrder(size=len(picks))

    def spawn(indexed_pick: tuple[int, Pick]):
        idx, pick = indexed_pick
        thread = threading.Thread(
            target=wrap_fn_return(run_async_task, store.set, idx),
            args=(pick, scraping_id, target_price))
        thread.start()
        return thread

    threads = [spawn(item) for item in enumerate(picks)]
    for thread in threads:
        thread.join()

    # drop picks that did not qualify for an alert (run_async_task returned None)
    store.filter()
    # best seats first (QuartersSeats.__lt__ orders by percentile)
    store.sort()
    # cap the number of seats included in the alert email
    alert_seats = store.sublist(0, min(store.size, constants.ALERT_SEATS_MAX_COUNT))
    alert_contents = [qs.get_alert_content() for qs in alert_seats]  # type: ignore
    # hand off to the mailer process
    mail_scheduler.send(emails, alert_contents)
int): + # Alert the user based on alert conditions + qs = QuartersSeats(pick, scraping_id, target_price) + if not qs.shouldAlert(): + return None + + # success + return qs diff --git a/apps/ticketscraping/tasks/periodic.py b/apps/ticketscraping/tasks/periodic.py index d0ce8ac..0f0fcda 100644 --- a/apps/ticketscraping/tasks/periodic.py +++ b/apps/ticketscraping/tasks/periodic.py @@ -1,6 +1,7 @@ from ...storage.storage import find_many, insert_many, delete_many from ...ticketscraping import constants from ..models.pick import Pick +from ..schedulers.async_tasks_scheduler import async_tasks_scheduler def generate_picks_set_from_picks(picks): def __helper(pick: dict): @@ -38,8 +39,7 @@ def insert_history_seats(seats: set[Pick]): return insert_many(constants.DATABASE['BEST_HISTORY_SEATS'], list(map(lambda seat: vars(seat), seats))) - -def run_periodic_task(picks: dict, scraping_id: str): +def run_periodic_task(picks, scraping_id: str, target_price: int, emails: list[str]): # B the list of new best available seats new_best_avail = generate_picks_set_from_picks(picks) # A be the list of current best available seats @@ -65,7 +65,5 @@ def run_periodic_task(picks: dict, scraping_id: str): # Save C to best_history_seats. insert_history_seats(overwritten_seats) - # TODO # Use D to invoke a handler to analyze them against the best_history_seats asynchronously. 
# (tail of a preceding diff hunk omitted here — its file header is outside this chunk)

# --- apps/ticketscraping/tasks/strategies/quarters_seats.py ---
import pymongo
from ....storage.storage import count_docs, find_many
from ....storage.query import find_max, find_min, find_many_ascending_order
from ....ticketscraping import constants
from ...models.pick import Pick
from ..util.math import percentile
from ....pushnotification.msg_formatter import format_entire_mail


class QuartersSeats():
    """Alert strategy: decide whether a newly scraped seat (a Pick) is good
    enough to notify subscribers about, by ranking it against the stored
    history of best seats for the same scraping_id.
    """

    # Sort spec for "better" history seats: best rank first, cheapest first.
    top_better_history_seats_sort = [
        ('rank', pymongo.DESCENDING), ('price', pymongo.ASCENDING)]

    def __init__(self, pick: Pick, scraping_id: str, target_price: int):
        self._pick = pick
        self._scraping_id = scraping_id
        self.target_price = target_price
        # percentile of this pick among history seats; 1.0 (worst) until computed
        self.percentile = 1.0
        self._num_before = 0
        self._num_total = 0

    def __lt__(self, other):
        # smaller the percentile, better the pick
        # NOTE(review): comparison is inverted (uses >) — presumably so a
        # min-oriented container surfaces worse picks first; confirm intent.
        return self.percentile > other.percentile

    def __eq__(self, other):
        # NOTE(review): defining __eq__ without __hash__ makes instances
        # unhashable; fine for sorting/heaps, not for sets/dict keys.
        return self.percentile == other.percentile

    @property
    def pick(self):
        return self._pick

    @property
    def scraping_id(self):
        return self._scraping_id

    def get_alert_content(self):
        """Build the notification mail body for this pick.

        Includes the new seat's rank/percentile (counting the new seat
        itself), the top comparable history seats, and any exact same seats
        seen before.
        """
        cur_rank_new_seat = self._num_before + 1
        cur_total = self._num_total + 1
        percentile = self.percentile
        # top best history seats: used for comparison
        top_best_history_seats = self.get_top_better_history_seats()
        # exact same seat in the history based on (sec, row?, seat?)
        same_seats = self.get_exact_same_seats()
        # notify user with info
        return format_entire_mail(pick=self.pick, target_price=self.target_price, percentile=percentile, rank=cur_rank_new_seat, num_total=cur_total, top_history_seats=top_best_history_seats, same_seats=same_seats)

    def get_top_better_history_seats(self, limit=constants.TOP_COMPARED_HISTORY_SEATS):
        """Return up to `limit` history seats at least as good as this pick."""
        return self.find_better_history_seats(sort=self.top_better_history_seats_sort, limit=limit)

    def shouldAlert(self):
        """Run all metrics; any metric aborts by raising, which means 'no alert'."""
        try:
            # Find price match
            self.target_price_metric()
            # Find enough history seats data
            percentile = self.percentile_metric()
            # Find the % of change
            percent_change = self.percent_of_change_metric()
            # Find percentile of seats in quarters
            self.quarter_percentile_metric()
        except Exception as ex:
            # metrics signal rejection via exceptions; log and decline
            print(ex)
            return False

        # success
        print(f"percent change out of max-min: {percent_change*100}")
        print(f"all history seats percentile: {percentile*100}")
        print(
            f"new seat - price: {self.pick.price} rank: {self.pick.quality} section: {self.pick.section} row: {self.pick.row}")
        print(f"quarters history seats percentile: {self.percentile*100}")
        return True

    def quarter_percentile_metric(self):
        """Abort unless this pick's quarters-percentile beats the threshold."""
        if self.get_percentile() > constants.PERCENTILE_HISTORY_PRICES:
            raise Exception('the seat is not recommended')

    def get_percentile(self):
        """Compute and cache this pick's percentile among quarters history seats."""
        self._num_before = self.count_better_history_seats()
        self._num_total = self.count_quarters_history_seats()
        if self._num_total < constants.MINIMUM_HISTORY_DATA:
            raise Exception('no enough history seats data(count < 3)')
        self.percentile = percentile(self._num_before, self._num_total)
        return self.percentile

    def count_quarters_history_seats(self):
        filter_obj = self.__get_quarters_history_seats_filter__()
        return count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj)

    def __get_quarters_history_seats_filter__(self):
        # Seats in the "comparable quarters": either dominated on both axes
        # (cheaper-or-equal AND better-or-equal quality) or strictly worse on
        # both (pricier AND lower quality).
        return {
            "scraping_id": self._scraping_id,
            "$or": [
                {
                    "price": {"$lte": self._pick.price},
                    "quality": {"$gte": self._pick.quality},
                },
                {
                    "price": {"$gt": self._pick.price},
                    "quality": {"$lt": self._pick.quality},
                }
            ]
        }

    def __get_better_history_seats_filter__(self):
        # History seats at least as cheap AND at least as high quality.
        return {
            "scraping_id": self._scraping_id,
            "price": {"$lte": self._pick.price},
            "quality": {"$gte": self._pick.quality},
        }

    def find_better_history_seats(self, **kwargs):
        limit = kwargs.get('limit')
        sort_seq = kwargs.get('sort')
        filter_obj = self.__get_better_history_seats_filter__()
        return find_many(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj, sort=sort_seq, limit=limit)

    def count_better_history_seats(self):
        filter_obj = self.__get_better_history_seats_filter__()
        return count_docs(constants.DATABASE['BEST_HISTORY_SEATS'], filter_obj)

    def target_price_metric(self):
        # exceed target price - abort
        if self.pick.price > self.target_price:
            raise Exception('price of the seat is not low enough')

    def percent_of_change_metric(self) -> float:
        """Position of this pick's price within the historical [min, max] range."""
        max_seat = find_max(constants.DATABASE['BEST_HISTORY_SEATS'], {
            "scraping_id": self.scraping_id}, 'price')
        min_seat = find_min(constants.DATABASE['BEST_HISTORY_SEATS'], {
            "scraping_id": self.scraping_id}, 'price')
        max_price = 0 if type(max_seat) is not dict else max_seat.get('price', 0)
        min_price = 0 if type(min_seat) is not dict else min_seat.get('price', 0)

        # min and max are identical - abort
        if max_price == min_price:
            raise Exception('min and max prices are identical')

        percent_change = (self.pick.price - min_price) / (max_price - min_price)

        # # price of change exceeds the metric value - abort
        # if percent_change > constants.PERCENT_OF_CHANGE:
        #     raise Exception(
        #         f'price of change ({percent_change}) exceeds the metric value')

        return percent_change

    def percentile_metric(self) -> float:
        """Percentile of this pick's price among ALL history seats (price only)."""
        rank = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'],
                          {"scraping_id": self.scraping_id, "price": {"$lte": self.pick.price}})
        total_count = count_docs(constants.DATABASE['BEST_HISTORY_SEATS'],
                                 {
            "scraping_id": self.scraping_id})

        # no history seats data - abort
        if total_count < constants.MINIMUM_HISTORY_DATA:
            raise Exception('no enough history seats data (count < 3)')

        percentile = rank / total_count

        # # percentile of history prices exceeds the metric value - abort
        # if percentile > constants.PERCENTILE_HISTORY_PRICES:
        #     raise Exception(
        #         'percentile of history prices ({percentile}) exceeds the metric value')

        return percentile

    def get_exact_same_seats(self):
        """History entries for the exact same seat (section, row, seat_columns)."""
        return find_many_ascending_order(constants.DATABASE['BEST_HISTORY_SEATS'],
                                         {"scraping_id": self.scraping_id, "section": self.pick.section,
                                          "row": self.pick.row, "seat_columns": self.pick.seat_columns},
                                         'last_modified')


# --- apps/ticketscraping/tasks/util/decorators.py ---
class StorageInsertionOrder:
    """A thin list wrapper that records items by insertion order, with
    index get/set, in-place sort, None-filtering and sublist views."""

    def __init__(self, size=0):
        # pre-sized slots initialised to 0
        self.store = [0] * size

    def __iter__(self):
        return iter(self.store)

    @property
    def size(self):
        return len(self.store)

    def add(self, item):
        self.store.append(item)

    def get(self, index: int):
        return self.store[index]

    def set(self, index: int, item):
        self.store[index] = item

    def sort(self):
        # BUG FIX: the original called sorted(self.store) and discarded the
        # returned list, leaving self.store unsorted. Sort in place instead.
        self.store.sort()

    def filter(self):
        # drop None entries in place
        self.store = list(filter(lambda item: item is not None, self.store))

    def sublist(self, from_idx: int, to_idx: int):
        return self.store[from_idx:to_idx]


def wrap_fn_return(fn, storing_fn, index):
    """Wrap fn so its return value is also recorded via storing_fn(index, result)."""
    def inner_fn(*args, **kwargs):
        res = fn(*args, **kwargs)
        storing_fn(index, res)
        return res
    return inner_fn
# --- apps/ticketscraping/tasks/util/math.py ---
def percentile(num_before: int, total: int) -> float:
    """Return num_before / total, guarding against division by zero.

    A total of 0 is treated as 1, so an empty population yields 0.0
    rather than raising ZeroDivisionError.
    """
    denominator = total
    if denominator == 0:
        denominator = 1
    return num_before / denominator


def percentileInBetween(num_before: int, num_after: int) -> float:
    """Percentile of one new item placed between num_before and num_after items."""
    denominator = num_before + num_after + 1
    return percentile(num_before, denominator)


# --- apps/trackerapi/error_handler.py ---
# NOTE(review): the original module imports at file top:
#   from django.http import JsonResponse
# (kept as a note here; JsonResponse is only referenced at call time)

# custom exceptions
class BadRequestException(Exception):
    """Raised by views for client errors; mapped to HTTP 400 by wrap_error_handler."""
    pass


# error handler decorator
def wrap_error_handler(fn):
    """Decorator mapping BadRequestException -> 400 JSON and any other
    exception -> 500 JSON; the wrapped function's result passes through
    unchanged on success."""
    def inner_fn(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except BadRequestException as ex:
            # bad request
            return JsonResponse({"error": str(ex)}, status=400)
        except Exception as ex:
            # FIX: the original swallowed the exception silently ('ex' was
            # unused). Log it — consistent with the project's print(ex)
            # style elsewhere — before returning a generic 500.
            print(ex)
            # internal server error
            return JsonResponse({"error": "something went wrong."}, status=500)
    return inner_fn
# --- apps/trackerapi/views.py (post-diff state; module imports appear earlier in the file) ---

@wrap_error_handler
def subscribe_tm_event_price_tracking(req: HttpRequest):
    """POST: validate and persist a Ticketmaster event price-tracking subscription.

    Required body keys come from constants.SUBSCRIBE_REQUEST_PROPS; raises
    BadRequestException (-> 400 via wrap_error_handler) on invalid input.
    """
    if req.method != 'POST':
        # FIX: the original fell through and returned None for non-POST
        # requests, which Django treats as an error. Answer 405 explicitly.
        return HttpResponse(status=405)
    body = json.loads(req.body)
    # validation: every required property must be present
    for key in constants.SUBSCRIBE_REQUEST_PROPS.values():
        if key not in body:
            raise BadRequestException('Request is invalid.')
    # validation: the lowest acceptable price (target - tolerance) must be >= 0
    target_price = body["target_price"]
    tolerance = body["tolerance"]
    if target_price - tolerance < 0:
        raise BadRequestException('Lowest price cannot be negative.')

    # keep only the declared subscription properties before persisting
    doc = constants.filter_obj_from_attrs(body, constants.SUBSCRIBE_REQUEST_PROPS)
    insert_one(constants.DATABASE['EVENTS'], doc)
    return HttpResponse('OK', status=200)


@wrap_error_handler
def unsubscribe_tm_event_price_tracking(req: HttpRequest):
    """POST: pause an existing subscription by its id (soft unsubscribe)."""
    if req.method != 'POST':
        # FIX: same non-POST None-return issue as subscribe — answer 405.
        return HttpResponse(status=405)
    body = json.loads(req.body)
    # FIX: the diff dropped the presence check, so a missing key raised
    # KeyError -> generic 500. Validate and answer 400 instead. Also avoid
    # shadowing the builtin 'id'.
    subscription_id = body.get('subscription_id')
    if not subscription_id:
        raise BadRequestException('Request is invalid.')

    res = find_one_and_update(constants.DATABASE['EVENTS'],
                              {"_id": subscription_id},
                              {'$set': {'markPaused': True}})
    if res:
        return HttpResponse('OK', status=200)
    else:
        raise BadRequestException('Subscription id not found.')


# --- secret.py (post-diff state, reconstructed for reference) ---
# import os
# CONN_SRV = os.environ.get('CONN_SRV')
# MAILER_PW = os.environ.get('MAILER_PW', '')

# --- utils.py (post-diff state, reconstructed for reference) ---
# from pymongo.mongo_client import MongoClient
# from secret import CONN_SRV
# client = MongoClient(CONN_SRV)