diff --git a/apps/pushnotification/msg_formatter.py b/apps/pushnotification/msg_formatter.py new file mode 100644 index 0000000..527795c --- /dev/null +++ b/apps/pushnotification/msg_formatter.py @@ -0,0 +1,100 @@ +from datetime import datetime +from ..ticketscraping.models.pick import Pick + +def price_formatter(price): + price_str = '' + if type(price) is float or int: + price_str = "$" + "{:.2f}".format(price) + elif type(price) is str: + price_str = price + return price_str + +def decimal_to_percent(num: float): + return "{:.2f}".format(num*100) + "%" + +def format_date(date: datetime): + return date.isoformat() + +def default_formatter(s: str): + return s + +def format_seat_columns(cols): + if type(cols) is str: + return cols + elif type(cols) is list: + return "(" + ",".join(cols) + ")" + return '-' + +def apply_format(s, formatter)->str: + return formatter(s) + +def apply(values: list, formatters: list, delimiter="\t"): + if len(values) != len(formatters): + raise Exception('values and formatters must have the same length') + s = [] + for i in range(len(values)): + s.append(apply_format(values[i], formatters[i])) + return delimiter.join(s) + +def format_full_seat(seat: dict, delimiter="\t"): + price = seat.get("price", "n/a") + section = seat.get("section", "n/a") + row = seat.get("row", "n/a") + seat_columns = seat.get("seat_columns", "n/a") + last_modified = seat.get("last_modified", "n/a") + return apply( + [price, section, row, seat_columns, last_modified], + [price_formatter, default_formatter, default_formatter, + format_seat_columns, format_date], + delimiter) + +def format_price_only_seat(seat: dict, delimiter="\t"): + price = seat.get("price", "n/a") + last_modified = seat.get("last_modified", "n/a") + return apply([price, last_modified], [price_formatter, format_date], delimiter) + +def format_seat(seat: dict, price_only=False, delimiter="\t"): + if price_only: + return format_price_only_seat(seat, delimiter) + else: + return format_full_seat(seat, delimiter) + +def format_seats(seats: list, price_only=False, delimiter="\t"): + return "\n".join([format_seat(seat, price_only, delimiter) for seat in seats]) + + +def format_entire_mail(pick: Pick, target_price: int, percentile: float, rank: int, num_total: int, top_history_seats: list, same_seats: list): + """ + structure of message: + 1. greetings + 2. attributes of new seats + 3. top 3 comparable history seats + 4. exact same seats if possible + 5. signature + """ + p1 = ( + f"Hi!" + ) + p2 = ( + f"Congratulations! Ticket tracker reminds you that your ticket subscription request with target price {price_formatter(target_price)} " + f"found better budget seats (price, section, row, seats) at ({format_full_seat(vars(pick), delimiter=', ')}). " + f"{decimal_to_percent(percentile)} of all comparable seats in the history are better than the newly found seats, that is, " + f"they rank no.{rank} out of {num_total} comparable seats in the history." + ) + p3 = ( + f"You can compare to history seats that are better than the newly found seats:" + f"{chr(10)}" + f"{format_seats(top_history_seats, price_only=False)}" + ) if len(top_history_seats) > 0 else "" + p4 = ( + f"The newly found seats have history prices:" + f"{chr(10)}" + f"{format_seats(same_seats, price_only=True)}" + ) if len(same_seats) > 0 else "" + p5 = ( + f"Bests," + f"{chr(10)}" + f"Ticketmaster Ticket Tracker" + ) + paras = list(filter(lambda p: len(p) > 0, [p1, p2, p3, p4, p5])) + return "\n\n".join(paras) diff --git a/apps/pushnotification/smtp.py b/apps/pushnotification/smtp.py index b07b527..3578339 100644 --- a/apps/pushnotification/smtp.py +++ b/apps/pushnotification/smtp.py @@ -14,14 +14,6 @@ def server_login(server: SMTP_SSL, password: str): return server.login(constants.sender_email, password) -def auth_server(server: SMTP_SSL): - server_login(server, constants.app_password) - - -server = init_server() -# auth_server(server) - - def server_send_email(server: SMTP_SSL, receiver_emails: list[str], message: str): em = EmailMessage() em['From'] = constants.sender_email @@ -32,10 +24,21 @@ def server_send_email(server: SMTP_SSL, receiver_emails: list[str], message: str return server.sendmail(constants.sender_email, receiver_emails, em.as_string()) -def send_email(receiver_emails: list[str], message: str): +def send_email(receiver_emails: list[str], messages: list[str]): + if len(messages) == 0: + return + # print(messages[0]) try: - err = server_send_email(server, receiver_emails, message) + err = server_send_email(server, receiver_emails, messages[0]) if err is not None: raise Exception('could not send email to the receiver') except Exception as ex: print(ex) + + +server = init_server() + + +def auth_server(): + global server + server_login(server, constants.app_password) diff --git a/apps/startup/apps.py b/apps/startup/apps.py index 5de7b92..87613c1 100644 --- a/apps/startup/apps.py +++ b/apps/startup/apps.py @@ -29,7 +29,6 @@ def run(): from apps.ticketscraping.connection.asyn_tasks_receiver import run conn_process = Process(target=run) conn_process.start() - conn_process.join() class MyAppConfig(AppConfig): diff --git a/apps/ticketscraping/connection/mail_receiver.py b/apps/ticketscraping/connection/mail_receiver.py index 6ea71be..d5fff51 100644 --- a/apps/ticketscraping/connection/mail_receiver.py +++ b/apps/ticketscraping/connection/mail_receiver.py @@ -1,11 +1,11 @@ from apps.ticketscraping.connection.receiver_process import ReceiverProcess -# from ..tasks.asynchronous import run_async_tasks +from apps.pushnotification.smtp import send_email, auth_server from apps.ticketscraping.constants import SERVICE_LOCALHOST, MAIL_RECEIVER_PORT def run(): # start itself - receiver = ReceiverProcess(lambda x: print( - x), SERVICE_LOCALHOST, MAIL_RECEIVER_PORT) + auth_server() + receiver = ReceiverProcess(send_email, SERVICE_LOCALHOST, MAIL_RECEIVER_PORT) receiver.connect() receiver.serve_forever() diff --git a/apps/ticketscraping/constants.py b/apps/ticketscraping/constants.py index 116a20f..42ad566 100644 --- a/apps/ticketscraping/constants.py +++ b/apps/ticketscraping/constants.py @@ -25,6 +25,8 @@ def get_top_picks_url( SUBSCRIBE_REQUEST_PROPS = { 'NAME': 'name', + 'CLIENT_NAME': 'client_name', + 'CLIENT_EMAILS': 'client_emails', 'TARGET_PRICE': 'target_price', 'TOLERANCE': 'tolerance', 'TICKET_NUM': 'ticket_num', @@ -36,6 +38,8 @@ def filter_obj_from_attrs(obj, atts: dict[str,str]): for key in atts.values(): if key in obj: res[key] = obj[key] + if len(res) != len(atts): + raise Exception('lack of attributes') return res @@ -43,7 +47,10 @@ def filter_obj_from_attrs(obj, atts: dict[str,str]): MINIMUM_HISTORY_DATA = 3 PERCENT_OF_CHANGE = 0.5 PERCENTILE_HISTORY_PRICES = 0.25 + +# alert content constants ALERT_SEATS_MAX_COUNT = 3 +TOP_COMPARED_HISTORY_SEATS = 3 def get_top_picks_header(): return { **BASIC_REQ_HEADER, @@ -71,7 +78,7 @@ def get_top_picks_query_params(qty: int, target_price: int, tolerance: int): ret TOKEN_RENEW_PRIORITY = 1 TICKET_SCRAPING_PRIORITY = 3 TICKET_SCRAPING_INTERVAL = 60 -TICKET_SCRAPING_TOKEN_AWAIT_MAX_INTERVAL = 5 +TICKET_SCRAPING_TOKEN_AWAIT_MAX_INTERVAL = 10 INJECTOR_LOCATION = "js/injector.js" INJECTOR_HEADER_LOCATION = "js/injector-header.js" diff --git a/apps/ticketscraping/scraping.py b/apps/ticketscraping/scraping.py index 2e3d3d4..f72d020 100644 --- a/apps/ticketscraping/scraping.py +++ b/apps/ticketscraping/scraping.py @@ -10,21 +10,35 @@ from ..storage.storage import * from .seat_analysis import format_seats from .tasks.periodic import run_periodic_task +import traceback class Reese84TokenUpdating(): def __init__(self): self.is_running = False - self.reese84_token = '' + self._reese84_token = '' self.reese84_renewInSec = 0 + self.token_access_semaphore = Semaphore(1) # one can access at a time self.token_semaphore = Semaphore(0) self.scheduler = sched.scheduler(time.time, time.sleep) + @property + def reese84_token(self): + self.token_semaphore.acquire() + self.token_access_semaphore.acquire() + token = self._reese84_token + self.token_semaphore.release() + self.token_access_semaphore.release() + + return token + def initialize_reese84_token(self): """ This method should not be called directly. """ - self.reese84_token, self.reese84_renewInSec = getReese84Token() - self.token_semaphore.release() # produce a new token + self.token_access_semaphore.acquire() + self._reese84_token, self.reese84_renewInSec = getReese84Token() + self.token_semaphore.release() # produce a new token + self.token_access_semaphore.release() self.scheduler.enter(self.reese84_renewInSec - constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token) @@ -34,8 +48,10 @@ def renew_reese84_token(self): """ print("renewing token") self.token_semaphore.acquire() # invalidate a token - self.reese84_token = getReese84Token() - self.token_semaphore.release() + self.token_access_semaphore.acquire() + self._reese84_token, self.reese84_renewInSec = getReese84Token() + self.token_semaphore.release() # produce a token + self.token_access_semaphore.release() self.scheduler.enter(self.reese84_renewInSec - constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token) @@ -49,7 +65,7 @@ def start(self): class TicketScraping(threading.Thread): - def __init__(self, token_generator: Reese84TokenUpdating, event_id: str, subscribe_id: str, num_seats: int, target_price: int, tolerance: int): + def __init__(self, token_generator: Reese84TokenUpdating, event_id: str, subscribe_id: str, num_seats: int, target_price: int, tolerance: int, emails: list[str]): threading.Thread.__init__(self) self.is_running = False self.is_stopping = False @@ -58,6 +74,7 @@ def __init__(self, token_generator: Reese84TokenUpdating, event_id: str, subscri self.num_seats = num_seats self.target_price = target_price self.tolerance = tolerance + self.emails = emails self.token_gen = token_generator self.scheduler = sched.scheduler(time.time, time.sleep) self.initialDelay = random.randint( @@ -77,25 +94,26 @@ def flag_for_termination(self): print(f'Failed to terminate the thread with id={thread_id}') def ticket_scraping(self): - if self.token_gen.token_semaphore._value <= 0: - # phase: retry after a delay - self.scheduler.enter(constants.TICKET_SCRAPING_TOKEN_AWAIT_MAX_INTERVAL, - constants.TICKET_SCRAPING_PRIORITY, self.ticket_scraping) - return # scrape the top-picks from ticketmaster top_picks_url = constants.get_top_picks_url(self.event_id) top_picks_q_params = constants.get_top_picks_query_params( self.num_seats, self.target_price, self.tolerance) top_picks_header = constants.get_top_picks_header() - res = requests.get(top_picks_url, headers=top_picks_header, params=top_picks_q_params, - cookies=dict(reese84=self.token_gen.reese84_token)) # type: ignore - + res = None + try: + res = requests.get(top_picks_url, headers=top_picks_header, params=top_picks_q_params, + cookies=dict(reese84=self.token_gen.reese84_token)) # type: ignore + except Exception: + # retry after a delay + self.scheduler.enter(constants.TICKET_SCRAPING_TOKEN_AWAIT_MAX_INTERVAL, + constants.TICKET_SCRAPING_PRIORITY, self.ticket_scraping) + return # prune and format the received picks picks_obj = format_seats(res.json(), self.subscribe_id) # periodic task: update collections best_available_seats and best_history_seats # and automatically spawn async tasks - run_periodic_task(picks_obj, self.subscribe_id, self.target_price) + run_periodic_task(picks_obj, self.subscribe_id, self.target_price, self.emails) print("Got the ticket info from TM. /", res.status_code) self.scheduler.enter(constants.TICKET_SCRAPING_INTERVAL, @@ -116,9 +134,9 @@ def run(self): return self.is_running = True self.is_stopping = False - self.ticket_scraping() # randomize start time to scatter out event of API fetching time.sleep(self.initialDelay) + self.ticket_scraping() self.scheduler.run() finally: print( @@ -137,7 +155,7 @@ def start(): '$or': [{'markPaused': {'$exists': False}}, {'markPaused': False}]}) for evt in events: ticket_scraping = TicketScraping( - reese_token_gen, evt["tm_event_id"], evt["_id"], evt["ticket_num"], evt["target_price"], evt["tolerance"]) + reese_token_gen, evt["tm_event_id"], evt["_id"], evt["ticket_num"], evt["target_price"], evt["tolerance"], evt["client_emails"]) print(ticket_scraping.initialDelay, "s") scraping_list[ticket_scraping.subscribe_id] = ticket_scraping for scraping_thread in scraping_list.values(): @@ -158,7 +176,7 @@ def start(): # spawn a thread to do scraping operations full_doc = change['fullDocument'] ticket_scraping = TicketScraping( - reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"]) + reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"], full_doc["client_emails"]) print(ticket_scraping.initialDelay, "s") scraping_list[ticket_scraping.subscribe_id] = ticket_scraping ticket_scraping.start() @@ -179,7 +197,7 @@ def start(): # resume scraping if currently paused if doc_id not in scraping_list: ticket_scraping = TicketScraping( - reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"]) + reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"], full_doc["client_emails"]) print(ticket_scraping.initialDelay, "s") scraping_list[ticket_scraping.subscribe_id] = ticket_scraping ticket_scraping.start() diff --git a/apps/ticketscraping/tasks/asynchronous.py b/apps/ticketscraping/tasks/asynchronous.py index 77ce4d2..8ef3e36 100644 --- a/apps/ticketscraping/tasks/asynchronous.py +++ b/apps/ticketscraping/tasks/asynchronous.py @@ -6,7 +6,7 @@ from ..schedulers.mail_scheduler import mail_scheduler -def run_async_tasks(picks: typing.Iterable[Pick], scraping_id: str, target_price: int): +def run_async_tasks(picks: typing.Iterable[Pick], scraping_id: str, target_price: int, emails: list[str]): picks_size = len(list(picks)) if picks_size == 0: return @@ -30,7 +30,7 @@ def fn(arg: tuple[int, Pick]): # get the alert information alert_contents = list(map(lambda qs: qs.get_alert_content(), alert_seats)) # type: ignore # send the alert to user - mail_scheduler.send(alert_contents) + mail_scheduler.send(emails, alert_contents) def run_async_task(pick: Pick, scraping_id: str, target_price: int): diff --git a/apps/ticketscraping/tasks/periodic.py b/apps/ticketscraping/tasks/periodic.py index 74cbe3f..0f0fcda 100644 --- a/apps/ticketscraping/tasks/periodic.py +++ b/apps/ticketscraping/tasks/periodic.py @@ -39,7 +39,7 @@ def insert_history_seats(seats: set[Pick]): return insert_many(constants.DATABASE['BEST_HISTORY_SEATS'], list(map(lambda seat: vars(seat), seats))) -def run_periodic_task(picks, scraping_id: str, target_price: int): +def run_periodic_task(picks, scraping_id: str, target_price: int, emails: list[str]): # B the list of new best available seats new_best_avail = generate_picks_set_from_picks(picks) # A be the list of current best available seats @@ -66,4 +66,4 @@ def run_periodic_task(picks, scraping_id: str, target_price: int): insert_history_seats(overwritten_seats) # Use D to invoke a handler to analyze them against the best_history_seats asynchronously. - async_tasks_scheduler.send(new_seats, scraping_id, target_price) + async_tasks_scheduler.send(new_seats, scraping_id, target_price, emails) diff --git a/apps/ticketscraping/tasks/strategies/quarters_seats.py b/apps/ticketscraping/tasks/strategies/quarters_seats.py index 2e35131..5ac1d8c 100644 --- a/apps/ticketscraping/tasks/strategies/quarters_seats.py +++ b/apps/ticketscraping/tasks/strategies/quarters_seats.py @@ -4,7 +4,7 @@ from ....ticketscraping import constants from ...models.pick import Pick from ..util.math import percentile - +from ....pushnotification.msg_formatter import format_entire_mail class QuartersSeats(): top_better_history_seats_sort = [ @@ -15,6 +15,8 @@ def __init__(self, pick: Pick, scraping_id: str, target_price: int): self._scraping_id = scraping_id self.target_price = target_price self.percentile = 1.0 + self._num_before = 0 + self._num_total = 0 def __lt__(self, other): # smaller the percentile, better the pick @@ -32,17 +34,23 @@ def scraping_id(self): return self._scraping_id def get_alert_content(self): - # alert user - # Find the exact same seat based on(sec, row?, seat?) + # alert user with content + # new seat info (price, sec, row?, seats?) + # new seat rank, total, and percentile + cur_rank_new_seat = self._num_before + 1 + cur_total = self._num_total + 1 + percentile = self.percentile + # top best history seats: used for comparison + top_best_history_seats = self.get_top_better_history_seats() + # all history seats: used for comparison + + # exact same seat in the history based on (sec, row?, seat?) same_seats = self.get_exact_same_seats() - # rank = self._num_before + 1 - # top best history = get_top_better_history_seats() # notify user with info - return '' - pass + return format_entire_mail(pick=self.pick, target_price=self.target_price, percentile=percentile, rank=cur_rank_new_seat, num_total=cur_total, top_history_seats=top_best_history_seats, same_seats=same_seats) - def get_top_better_history_seats(self): - return self.find_better_history_seats(sort=self.top_better_history_seats_sort) + def get_top_better_history_seats(self, limit=constants.TOP_COMPARED_HISTORY_SEATS): + return self.find_better_history_seats(sort=self.top_better_history_seats_sort, limit=limit) def shouldAlert(self): try: @@ -74,6 +82,8 @@ def quarter_percentile_metric(self): def get_percentile(self): self._num_before = self.count_better_history_seats() self._num_total = self.count_quarters_history_seats() + if self._num_total < constants.MINIMUM_HISTORY_DATA: + raise Exception('no enough history seats data(count < 3)') self.percentile = percentile(self._num_before, self._num_total) return self.percentile @@ -99,16 +109,8 @@ def __get_quarters_history_seats_filter__(self): def __get_better_history_seats_filter__(self): return { "scraping_id": self._scraping_id, - "$or": [ - { - "price": {"$lt": self._pick.price}, - "quality": {"$gte": self._pick.quality}, - }, - { - "price": {"$lte": self._pick.price}, - "quality": {"$gt": self._pick.quality}, - } - ] + "price": {"$lte": self._pick.price}, + "quality": {"$gte": self._pick.quality}, } def find_better_history_seats(self, **kwargs): diff --git a/apps/ticketscraping/tasks/util/scheduler.py b/apps/ticketscraping/tasks/util/scheduler.py deleted file mode 100644 index 0d171eb..0000000 --- a/apps/ticketscraping/tasks/util/scheduler.py +++ /dev/null @@ -1,23 +0,0 @@ -import time -from sched import scheduler -from ...constants import AWAKE_SCHEDULER_INTERVAL - -class Scheduler: - def __init__(self, name: str, action): - self.name = name - self.scheduler = scheduler(time.time, time.sleep) - self.awake_scheduler = scheduler(time.time, time.sleep) - self.action = action - - def enter(self, *args, **kwargs): - return self.scheduler.enter(0, 1, self.action, args, kwargs) - - def iter_awake_scheduler(self): - # awake the main scheduler - self.scheduler.run() # blocking - self.awake_scheduler.enter( - AWAKE_SCHEDULER_INTERVAL, 1, self.iter_awake_scheduler) - - def run(self): - self.iter_awake_scheduler() - self.awake_scheduler.run()