forked from canada-ca/tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathresult_queue.py
More file actions
185 lines (155 loc) · 7.01 KB
/
result_queue.py
File metadata and controls
185 lines (155 loc) · 7.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""A WSGI app that receives JSON scan results via HTTP and enqueues them to be dispatched to the results processor"""
import os
import sys
import time
import requests
import logging
import json
import emoji
import traceback
import asyncio
from threading import Thread
from flask import Flask, request
from redis import Redis, ConnectionPool
from rq import Queue, Retry, Worker
# Log to stdout at INFO level.
logging.basicConfig(level=logging.INFO, stream=sys.stdout)

# Base URL of the result processor; overridable through the environment.
PROCESSOR_URL = os.getenv(
    "PROCESSOR_URL", "http://result-processor.scanners.svc.cluster.local"
)

# One shared connection pool / client for the local Redis instance.
pool = ConnectionPool(host="127.0.0.1", port=6379, db=0)
redis = Redis(connection_pool=pool)

# One RQ queue per scan type, keyed by the route name that feeds it.
default_queues = {
    queue_name: Queue(queue_name, connection=redis)
    for queue_name in ("https", "ssl", "dns")
}
https_queue = default_queues["https"]
ssl_queue = default_queues["ssl"]
dns_queue = default_queues["dns"]
def Server(process_name, queues=default_queues):
    """Flask app that adds incoming JSON scan results to Redis queues to be dispatched later.

    Routes are /https, /ssl and /dns (POST only).
    Needs a Redis server to function and RQ workers must be started for results to be dispatched.

    :param str process_name: process name to run flask app as.
    :param dict queues: dict with RQ Queues for each scanner, keyed by
        "https", "ssl" and "dns".
    :return: This Flask app.
    :rtype: Flask
    """
    flask_app = Flask(process_name)
    flask_app.config["queues"] = queues

    def _enqueue_result(queue_key, label, dispatcher):
        """Enqueue the current request's JSON body for dispatch by an RQ worker.

        Shared implementation of the three routes, which differ only in the
        queue they target, the label used in messages and the dispatch
        callable that the worker will run.

        :param str queue_key: key of the target queue in the app's "queues" config.
        :param str label: scan type as it appears in log/response messages.
        :param callable dispatcher: function the RQ worker calls with the payload.
        :return: a message indicating whether the request was enqueued successfully.
        :rtype: str
        """
        logging.info(f"{label} result processing request received.")
        try:
            # force=True parses the body as JSON regardless of Content-Type.
            payload = request.get_json(force=True)
            # A missing queue yields None here; the resulting AttributeError
            # is reported through the generic handler below.
            designated_queue = flask_app.config["queues"].get(queue_key, None)
            designated_queue.enqueue(
                dispatcher,
                payload,
                retry=Retry(max=3),
                job_timeout=86400,  # 24 hours, in seconds
                result_ttl=86400,  # 24 hours, in seconds
            )
            msg = f"{label} result processing request enqueued."
            logging.info(msg)
        except Exception as e:
            # NOTE(review): the message is returned as a plain string, so the
            # client presumably receives a 200 response even on failure --
            # confirm whether callers rely on that before changing it.
            msg = f"An unexpected error occurred while attempting to enqueue {label} result processing request: ({type(e).__name__}: {str(e)})"
            logging.error(msg)
            logging.error(f"Full traceback: {traceback.format_exc()}")
        return msg

    # The dispatch_* callables are referenced inside the view bodies (not while
    # Server() runs) so they are resolved at request time; they are defined
    # later in this module than the module-level Server() call.

    @flask_app.route("/https", methods=["POST"])
    def enqueue_https():
        """Enqueues a result processing request received at /https to the HTTPS Redis Queue.

        :return: a message indicating whether the result processing request was enqueued successfully.
        :rtype: str
        """
        return _enqueue_result("https", "HTTPS", dispatch_https)

    @flask_app.route("/ssl", methods=["POST"])
    def enqueue_ssl():
        """Enqueues a result processing request received at /ssl to the SSL Redis Queue.

        :return: a message indicating whether the result processing request was enqueued successfully.
        :rtype: str
        """
        return _enqueue_result("ssl", "SSL", dispatch_ssl)

    @flask_app.route("/dns", methods=["POST"])
    def enqueue_dns():
        """Enqueues a result processing request received at /dns to the DNS Redis Queue.

        :return: a message indicating whether the result processing request was enqueued successfully.
        :rtype: str
        """
        return _enqueue_result("dns", "DNS", dispatch_dns)

    return flask_app
# Module-level application object, built with the default Redis-backed queues.
# The dispatch_* functions defined below are only looked up when a route is
# actually invoked, so creating the app before they exist is safe.
app = Server(__name__)
def dispatch_https(payload):
    """Dispatches an HTTPS result processing request to the result processor.

    Enqueued alongside the request to be executed by an RQ worker. The payload
    is POSTed as JSON to ``PROCESSOR_URL``; an HTTP error status from the
    processor is logged and reported as a failure instead of a success.

    :param dict payload: JSON HTTPS scan results for a domain from a scanner.
    :return: A message indicating whether the request was dispatched successfully.
    :rtype: str
    """
    logging.info("Dispatching HTTPS result processing request")
    try:
        # NOTE(review): no timeout is passed, so an unresponsive processor can
        # block this worker until the RQ job timeout expires.
        response = requests.post(PROCESSOR_URL, json=payload)
        # requests does not raise on 4xx/5xx by itself; without this check a
        # request rejected by the processor would be reported as dispatched.
        response.raise_for_status()
        return "Dispatched HTTPS result processing request."
    except Exception as e:
        # NOTE(review): the error is swallowed and a message returned, so the
        # job presumably counts as successful and the Retry configured at
        # enqueue time would never fire -- confirm whether that is intended.
        msg = f"An unexpected error occurred while attempting to dispatch HTTPS result processing request: ({type(e).__name__}: {str(e)})"
        logging.error(msg)
        logging.error(f"Full traceback: {traceback.format_exc()}")
        return msg
def dispatch_ssl(payload):
    """Dispatches an SSL result processing request to the result processor.

    Enqueued alongside the request to be executed by an RQ worker. The payload
    is POSTed as JSON to ``PROCESSOR_URL``; an HTTP error status from the
    processor is logged and reported as a failure instead of a success.

    :param dict payload: JSON SSL scan results for a domain from a scanner.
    :return: A message indicating whether the request was dispatched successfully.
    :rtype: str
    """
    logging.info("Dispatching SSL result processing request")
    try:
        # NOTE(review): no timeout is passed, so an unresponsive processor can
        # block this worker until the RQ job timeout expires.
        response = requests.post(PROCESSOR_URL, json=payload)
        # requests does not raise on 4xx/5xx by itself; without this check a
        # request rejected by the processor would be reported as dispatched.
        response.raise_for_status()
        return "Dispatched SSL result processing request."
    except Exception as e:
        # NOTE(review): the error is swallowed and a message returned, so the
        # job presumably counts as successful and the Retry configured at
        # enqueue time would never fire -- confirm whether that is intended.
        msg = f"An unexpected error occurred while attempting to dispatch SSL result processing request: ({type(e).__name__}: {str(e)})"
        logging.error(msg)
        logging.error(f"Full traceback: {traceback.format_exc()}")
        return msg
def dispatch_dns(payload):
    """Dispatches a DNS result processing request to the result processor.

    Enqueued alongside the request to be executed by an RQ worker. The payload
    is POSTed as JSON to ``PROCESSOR_URL``; an HTTP error status from the
    processor is logged and reported as a failure instead of a success.

    :param dict payload: JSON DNS scan results for a domain from a scanner.
    :return: A message indicating whether the request was dispatched successfully.
    :rtype: str
    """
    logging.info("Dispatching DNS result processing request")
    try:
        # NOTE(review): no timeout is passed, so an unresponsive processor can
        # block this worker until the RQ job timeout expires.
        response = requests.post(PROCESSOR_URL, json=payload)
        # requests does not raise on 4xx/5xx by itself; without this check a
        # request rejected by the processor would be reported as dispatched.
        response.raise_for_status()
        return "Dispatched DNS result processing request."
    except Exception as e:
        # NOTE(review): the error is swallowed and a message returned, so the
        # job presumably counts as successful and the Retry configured at
        # enqueue time would never fire -- confirm whether that is intended.
        msg = f"An unexpected error occurred while attempting to dispatch DNS result processing request: ({type(e).__name__}: {str(e)})"
        logging.error(msg)
        logging.error(f"Full traceback: {traceback.format_exc()}")
        return msg