Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions apps/pushnotification/msg_formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from datetime import datetime
from ..ticketscraping.models.pick import Pick

def price_formatter(price):
    """Render a price for display in a notification message.

    Numeric prices (``int`` or ``float``) are rendered as dollars with two
    decimals, e.g. ``$12.50``. String prices (such as the ``"n/a"``
    placeholder used when a seat has no price) are returned unchanged.
    Any other value yields an empty string.
    """
    # The previous check `type(price) is float or int` was always true,
    # because the bare `int` class is truthy. Strings therefore reached the
    # numeric format and raised ValueError, and the string branch was
    # unreachable.
    if isinstance(price, (int, float)):
        return f"${price:.2f}"
    if isinstance(price, str):
        return price
    return ''

def decimal_to_percent(num: float):
    """Convert a fraction (e.g. ``0.25``) to a two-decimal percent string (``"25.00%"``)."""
    scaled = num * 100
    return f"{scaled:.2f}%"

def format_date(date: datetime):
    """Return *date* as an ISO 8601 string.

    The seat formatters in this module pass the string ``"n/a"`` when a seat
    has no ``last_modified`` value. Strings are returned unchanged so that
    placeholder does not fail on the missing ``isoformat`` method.
    """
    if isinstance(date, str):
        return date
    return date.isoformat()

def default_formatter(s: str) -> str:
    """Identity formatter: hand the value back untouched."""
    unchanged = s
    return unchanged

def format_seat_columns(cols):
    """Render the seat-column field of a seat.

    A string is returned as-is, a list is rendered as a parenthesised
    comma-separated group such as ``(1,2,3)``, and anything else becomes ``-``.
    """
    kind = type(cols)
    if kind is str:
        return cols
    if kind is not list:
        return '-'
    joined = ",".join(cols)
    return f"({joined})"

def apply_format(s, formatter) -> str:
    """Run *formatter* on *s* and return whatever the formatter produces."""
    formatted = formatter(s)
    return formatted

def apply(values: list, formatters: list, delimiter="\t"):
    """Format each value with its positional formatter and join the results.

    ``values[i]`` is passed to ``formatters[i]``; the formatted pieces are
    joined with *delimiter*.

    Raises:
        Exception: if the two lists differ in length.
    """
    if len(values) != len(formatters):
        raise Exception('values and formatters must have the same length')
    # Lengths are equal here, so zip pairs every value with its formatter.
    pieces = (fmt(value) for value, fmt in zip(values, formatters))
    return delimiter.join(pieces)

def format_full_seat(seat: dict, delimiter="\t"):
    """Render one seat as price, section, row, seat columns and last-modified.

    Each field is read from *seat* (falling back to ``"n/a"`` when the key is
    absent), passed through its formatter, and the results are joined with
    *delimiter*.
    """
    # (dict key, formatter) pairs, in output order.
    layout = (
        ("price", price_formatter),
        ("section", default_formatter),
        ("row", default_formatter),
        ("seat_columns", format_seat_columns),
        ("last_modified", format_date),
    )
    field_values = [seat.get(key, "n/a") for key, _ in layout]
    field_formatters = [fmt for _, fmt in layout]
    return apply(field_values, field_formatters, delimiter)

def format_price_only_seat(seat: dict, delimiter="\t"):
    """Render only the price and last-modified fields of a seat.

    Missing keys fall back to ``"n/a"``; the two formatted fields are joined
    with *delimiter*.
    """
    field_values = [seat.get(key, "n/a") for key in ("price", "last_modified")]
    field_formatters = [price_formatter, format_date]
    return apply(field_values, field_formatters, delimiter)

def format_seat(seat: dict, price_only=False, delimiter="\t"):
    """Render a seat in the price-only layout or the full layout, per *price_only*."""
    render = format_price_only_seat if price_only else format_full_seat
    return render(seat, delimiter)

def format_seats(seats: list, price_only=False, delimiter="\t"):
    """Render every seat in *seats* on its own line.

    Each seat is formatted with ``format_seat`` using the given *price_only*
    and *delimiter*; the lines are joined with a newline.
    """
    rendered_lines = (format_seat(entry, price_only, delimiter) for entry in seats)
    return "\n".join(rendered_lines)


def format_entire_mail(pick: Pick, target_price: int, percentile: float, rank: int, num_total: int, top_history_seats: list, same_seats: list):
    """Compose the plain-text body of a seat-alert e-mail.

    The message is built from up to five paragraphs separated by blank lines:

    1. greeting
    2. attributes of the newly found seats and their rank in the history
    3. comparable history seats (omitted when *top_history_seats* is empty)
    4. price history of the exact same seats (omitted when *same_seats* is empty)
    5. signature
    """
    greeting = "Hi!"

    # Paragraph 2: the attributes of `pick` are rendered through
    # format_full_seat with a ", " delimiter.
    announcement = "".join([
        f"Congratulations! Ticket tracker reminds you that your ticket subscription request with target price {price_formatter(target_price)} ",
        f"found better budget seats (price, section, row, seats) at ({format_full_seat(vars(pick), delimiter=', ')}). ",
        f"{decimal_to_percent(percentile)} of all comparable seats in the history are better than the newly found seats, that is, ",
        f"they rank no.{rank} out of {num_total} comparable seats in the history.",
    ])

    history_comparison = ""
    if len(top_history_seats) > 0:
        history_comparison = (
            "You can compare to history seats that are better than the newly found seats:"
            + "\n"
            + format_seats(top_history_seats, price_only=False)
        )

    price_history = ""
    if len(same_seats) > 0:
        price_history = (
            "The newly found seats have history prices:"
            + "\n"
            + format_seats(same_seats, price_only=True)
        )

    signature = "Bests," + "\n" + "Ticketmaster Ticket Tracker"

    sections = [greeting, announcement, history_comparison, price_history, signature]
    # Skip the optional paragraphs that ended up empty.
    return "\n\n".join(section for section in sections if len(section) > 0)
23 changes: 13 additions & 10 deletions apps/pushnotification/smtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@ def server_login(server: SMTP_SSL, password: str):
return server.login(constants.sender_email, password)


def auth_server(server: SMTP_SSL):
server_login(server, constants.app_password)


server = init_server()
# auth_server(server)


def server_send_email(server: SMTP_SSL, receiver_emails: list[str], message: str):
em = EmailMessage()
em['From'] = constants.sender_email
Expand All @@ -32,10 +24,21 @@ def server_send_email(server: SMTP_SSL, receiver_emails: list[str], message: str
return server.sendmail(constants.sender_email, receiver_emails, em.as_string())


def send_email(receiver_emails: list[str], message: str):
def send_email(receiver_emails: list[str], messages: list[str]):
if len(messages) == 0:
return
# print(messages[0])
try:
err = server_send_email(server, receiver_emails, message)
err = server_send_email(server, receiver_emails, messages[0])
if err is not None:
raise Exception('could not send email to the receiver')
except Exception as ex:
print(ex)


server = init_server()


def auth_server():
global server
server_login(server, constants.app_password)
1 change: 0 additions & 1 deletion apps/startup/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def run():
from apps.ticketscraping.connection.asyn_tasks_receiver import run
conn_process = Process(target=run)
conn_process.start()
conn_process.join()


class MyAppConfig(AppConfig):
Expand Down
6 changes: 3 additions & 3 deletions apps/ticketscraping/connection/mail_receiver.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from apps.ticketscraping.connection.receiver_process import ReceiverProcess
# from ..tasks.asynchronous import run_async_tasks
from apps.pushnotification.smtp import send_email, auth_server
from apps.ticketscraping.constants import SERVICE_LOCALHOST, MAIL_RECEIVER_PORT

def run():
# start itself
receiver = ReceiverProcess(lambda x: print(
x), SERVICE_LOCALHOST, MAIL_RECEIVER_PORT)
auth_server()
receiver = ReceiverProcess(send_email, SERVICE_LOCALHOST, MAIL_RECEIVER_PORT)
receiver.connect()
receiver.serve_forever()

Expand Down
9 changes: 8 additions & 1 deletion apps/ticketscraping/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def get_top_picks_url(

SUBSCRIBE_REQUEST_PROPS = {
'NAME': 'name',
'CLIENT_NAME': 'client_name',
'CLIENT_EMAILS': 'client_emails',
'TARGET_PRICE': 'target_price',
'TOLERANCE': 'tolerance',
'TICKET_NUM': 'ticket_num',
Expand All @@ -36,14 +38,19 @@ def filter_obj_from_attrs(obj, atts: dict[str,str]):
for key in atts.values():
if key in obj:
res[key] = obj[key]
if len(res) != len(atts):
raise Exception('lack of attributes')
return res


# metric thresholds
MINIMUM_HISTORY_DATA = 3
PERCENT_OF_CHANGE = 0.5
PERCENTILE_HISTORY_PRICES = 0.25

# alert content constants
ALERT_SEATS_MAX_COUNT = 3
TOP_COMPARED_HISTORY_SEATS = 3

def get_top_picks_header(): return {
**BASIC_REQ_HEADER,
Expand Down Expand Up @@ -71,7 +78,7 @@ def get_top_picks_query_params(qty: int, target_price: int, tolerance: int): ret
TOKEN_RENEW_PRIORITY = 1
TICKET_SCRAPING_PRIORITY = 3
TICKET_SCRAPING_INTERVAL = 60
TICKET_SCRAPING_TOKEN_AWAIT_MAX_INTERVAL = 5
TICKET_SCRAPING_TOKEN_AWAIT_MAX_INTERVAL = 10

INJECTOR_LOCATION = "js/injector.js"
INJECTOR_HEADER_LOCATION = "js/injector-header.js"
Expand Down
56 changes: 37 additions & 19 deletions apps/ticketscraping/scraping.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,35 @@
from ..storage.storage import *
from .seat_analysis import format_seats
from .tasks.periodic import run_periodic_task
import traceback

class Reese84TokenUpdating():
def __init__(self):
self.is_running = False
self.reese84_token = ''
self._reese84_token = ''
self.reese84_renewInSec = 0
self.token_access_semaphore = Semaphore(1) # one can access at a time
self.token_semaphore = Semaphore(0)
self.scheduler = sched.scheduler(time.time, time.sleep)

@property
def reese84_token(self):
self.token_semaphore.acquire()
self.token_access_semaphore.acquire()
token = self._reese84_token
self.token_semaphore.release()
self.token_access_semaphore.release()

return token

def initialize_reese84_token(self):
"""
This method should not be called directly.
"""
self.reese84_token, self.reese84_renewInSec = getReese84Token()
self.token_semaphore.release() # produce a new token
self.token_access_semaphore.acquire()
self._reese84_token, self.reese84_renewInSec = getReese84Token()
self.token_semaphore.release() # produce a new token
self.token_access_semaphore.release()
self.scheduler.enter(self.reese84_renewInSec -
constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token)

Expand All @@ -34,8 +48,10 @@ def renew_reese84_token(self):
"""
print("renewing token")
self.token_semaphore.acquire() # invalidate a token
self.reese84_token = getReese84Token()
self.token_semaphore.release()
self.token_access_semaphore.acquire()
self._reese84_token, self.reese84_renewInSec = getReese84Token()
self.token_semaphore.release() # produce a token
self.token_access_semaphore.release()
self.scheduler.enter(self.reese84_renewInSec -
constants.TOKEN_RENEW_SEC_OFFSET, constants.TOKEN_RENEW_PRIORITY, self.renew_reese84_token)

Expand All @@ -49,7 +65,7 @@ def start(self):


class TicketScraping(threading.Thread):
def __init__(self, token_generator: Reese84TokenUpdating, event_id: str, subscribe_id: str, num_seats: int, target_price: int, tolerance: int):
def __init__(self, token_generator: Reese84TokenUpdating, event_id: str, subscribe_id: str, num_seats: int, target_price: int, tolerance: int, emails: list[str]):
threading.Thread.__init__(self)
self.is_running = False
self.is_stopping = False
Expand All @@ -58,6 +74,7 @@ def __init__(self, token_generator: Reese84TokenUpdating, event_id: str, subscri
self.num_seats = num_seats
self.target_price = target_price
self.tolerance = tolerance
self.emails = emails
self.token_gen = token_generator
self.scheduler = sched.scheduler(time.time, time.sleep)
self.initialDelay = random.randint(
Expand All @@ -77,25 +94,26 @@ def flag_for_termination(self):
print(f'Failed to terminate the thread with id={thread_id}')

def ticket_scraping(self):
if self.token_gen.token_semaphore._value <= 0:
# phase: retry after a delay
self.scheduler.enter(constants.TICKET_SCRAPING_TOKEN_AWAIT_MAX_INTERVAL,
constants.TICKET_SCRAPING_PRIORITY, self.ticket_scraping)
return
# scrape the top-picks from ticketmaster
top_picks_url = constants.get_top_picks_url(self.event_id)
top_picks_q_params = constants.get_top_picks_query_params(
self.num_seats, self.target_price, self.tolerance)
top_picks_header = constants.get_top_picks_header()
res = requests.get(top_picks_url, headers=top_picks_header, params=top_picks_q_params,
cookies=dict(reese84=self.token_gen.reese84_token)) # type: ignore

res = None
try:
res = requests.get(top_picks_url, headers=top_picks_header, params=top_picks_q_params,
cookies=dict(reese84=self.token_gen.reese84_token)) # type: ignore
except Exception:
# retry after a delay
self.scheduler.enter(constants.TICKET_SCRAPING_TOKEN_AWAIT_MAX_INTERVAL,
constants.TICKET_SCRAPING_PRIORITY, self.ticket_scraping)
return
# prune and format the received picks
picks_obj = format_seats(res.json(), self.subscribe_id)

# periodic task: update collections best_available_seats and best_history_seats
# and automatically spawn async tasks
run_periodic_task(picks_obj, self.subscribe_id, self.target_price)
run_periodic_task(picks_obj, self.subscribe_id, self.target_price, self.emails)

print("Got the ticket info from TM. /", res.status_code)
self.scheduler.enter(constants.TICKET_SCRAPING_INTERVAL,
Expand All @@ -116,9 +134,9 @@ def run(self):
return
self.is_running = True
self.is_stopping = False
self.ticket_scraping()
# randomize start time to scatter out event of API fetching
time.sleep(self.initialDelay)
self.ticket_scraping()
self.scheduler.run()
finally:
print(
Expand All @@ -137,7 +155,7 @@ def start():
'$or': [{'markPaused': {'$exists': False}}, {'markPaused': False}]})
for evt in events:
ticket_scraping = TicketScraping(
reese_token_gen, evt["tm_event_id"], evt["_id"], evt["ticket_num"], evt["target_price"], evt["tolerance"])
reese_token_gen, evt["tm_event_id"], evt["_id"], evt["ticket_num"], evt["target_price"], evt["tolerance"], evt["client_emails"])
print(ticket_scraping.initialDelay, "s")
scraping_list[ticket_scraping.subscribe_id] = ticket_scraping
for scraping_thread in scraping_list.values():
Expand All @@ -158,7 +176,7 @@ def start():
# spawn a thread to do scraping operations
full_doc = change['fullDocument']
ticket_scraping = TicketScraping(
reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"])
reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"], full_doc["client_emails"])
print(ticket_scraping.initialDelay, "s")
scraping_list[ticket_scraping.subscribe_id] = ticket_scraping
ticket_scraping.start()
Expand All @@ -179,7 +197,7 @@ def start():
# resume scraping if currently paused
if doc_id not in scraping_list:
ticket_scraping = TicketScraping(
reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"])
reese_token_gen, full_doc["tm_event_id"], full_doc["_id"], full_doc["ticket_num"], full_doc["target_price"], full_doc["tolerance"], full_doc["client_emails"])
print(ticket_scraping.initialDelay, "s")
scraping_list[ticket_scraping.subscribe_id] = ticket_scraping
ticket_scraping.start()
Expand Down
4 changes: 2 additions & 2 deletions apps/ticketscraping/tasks/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ..schedulers.mail_scheduler import mail_scheduler


def run_async_tasks(picks: typing.Iterable[Pick], scraping_id: str, target_price: int):
def run_async_tasks(picks: typing.Iterable[Pick], scraping_id: str, target_price: int, emails: list[str]):
picks_size = len(list(picks))
if picks_size == 0: return

Expand All @@ -30,7 +30,7 @@ def fn(arg: tuple[int, Pick]):
# get the alert information
alert_contents = list(map(lambda qs: qs.get_alert_content(), alert_seats)) # type: ignore
# send the alert to user
mail_scheduler.send(alert_contents)
mail_scheduler.send(emails, alert_contents)


def run_async_task(pick: Pick, scraping_id: str, target_price: int):
Expand Down
4 changes: 2 additions & 2 deletions apps/ticketscraping/tasks/periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def insert_history_seats(seats: set[Pick]):
return insert_many(constants.DATABASE['BEST_HISTORY_SEATS'], list(map(lambda seat: vars(seat), seats)))


def run_periodic_task(picks, scraping_id: str, target_price: int):
def run_periodic_task(picks, scraping_id: str, target_price: int, emails: list[str]):
# B the list of new best available seats
new_best_avail = generate_picks_set_from_picks(picks)
# A be the list of current best available seats
Expand All @@ -66,4 +66,4 @@ def run_periodic_task(picks, scraping_id: str, target_price: int):
insert_history_seats(overwritten_seats)

# Use D to invoke a handler to analyze them against the best_history_seats asynchronously.
async_tasks_scheduler.send(new_seats, scraping_id, target_price)
async_tasks_scheduler.send(new_seats, scraping_id, target_price, emails)
Loading