forked from canada-ca/tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscan_queue.py
More file actions
203 lines (167 loc) · 7.48 KB
/
scan_queue.py
File metadata and controls
203 lines (167 loc) · 7.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""A WSGI app that receives JSON scan requests via HTTP and enqueues them to be dispatched to the appropriate scanner"""
import os
import sys
import time
import requests
import logging
import json
import emoji
import traceback
import asyncio
from threading import Thread
from flask import Flask, request
from redis import Redis, ConnectionPool
from rq import Queue, Retry, Worker
# Base URLs of the scanner services; each one can be overridden through the environment.
HTTPS_URL = os.environ.get("HTTPS_URL", "http://https-scanner.scanners.svc.cluster.local")
SSL_URL = os.environ.get("SSL_URL", "http://ssl-scanner.scanners.svc.cluster.local")
DNS_URL = os.environ.get("DNS_URL", "http://dns-scanner.scanners.svc.cluster.local")

# Pub/sub connection settings; only referenced by the disabled pubsub_redis client below.
PUBSUB_HOST = os.environ.get("PUBSUB_HOST")
PUBSUB_PORT = os.environ.get("PUBSUB_PORT")

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# ConnectionPool for Redis server running in the same container as this app; see Dockerfile
pool = ConnectionPool(host="127.0.0.1", port=6379, db=0)
queue_redis = Redis(connection_pool=pool)
# pubsub_redis = Redis(host=PUBSUB_HOST, port=PUBSUB_PORT, db=0)

# RQ queues for scan dispatches, one per scanner, keyed by scan type.
# RQ workers must be running for the enqueued jobs to be executed.
default_queues = {
    scan_type: Queue(scan_type, connection=queue_redis)
    for scan_type in ("https", "ssl", "dns")
}
https_queue = default_queues["https"]
ssl_queue = default_queues["ssl"]
dns_queue = default_queues["dns"]
# def publish_update(scan_type, user_key, message):
# try:
# pubsub_redis.publish(f"scan/{scan_type}/{user_key}", message)
# except Exception as e:
# logging.error(f"Unexpected error occurred while attempting to publish update to redis queue: {traceback.format_exc()}")
def Server(process_name, queues=default_queues):
    """Flask app that adds incoming JSON scan requests to Redis queues to be dispatched later.

    Routes are /https, /ssl and /dns (POST only). Each route parses the request body as JSON and
    enqueues it, together with the matching ``dispatch_*`` function, on the RQ queue of the same name.
    Needs a Redis server to function and RQ workers must be started for scans to be dispatched.

    :param str process_name: process name to run flask app as.
    :param dict queues: dict with RQ Queues for each scanner, keyed by "https", "ssl" and "dns".
    :return: This Flask app.
    :rtype: Flask
    """
    flask_app = Flask(process_name)
    flask_app.config["queues"] = queues

    # Passed to RQ as both job_timeout and result_ttl, which RQ interprets in seconds (24 hours).
    one_day_seconds = 86400

    def _enqueue_scan(queue_name, label, dispatcher):
        """Enqueue the current request's JSON body on the named queue and report the outcome.

        :param str queue_name: key of the target queue in the app's "queues" config.
        :param str label: scan type as it appears in log and response messages (e.g. "HTTPS").
        :param dispatcher: function the RQ worker will call with the payload.
        :return: a message indicating whether the request was enqueued successfully.
        :rtype: str
        """
        logging.info(f"{label} scan request received.")
        try:
            # force=True parses the body as JSON regardless of the request's Content-Type header.
            payload = request.get_json(force=True)
            designated_queue = flask_app.config["queues"].get(queue_name, None)
            if designated_queue is None:
                # Fail with a descriptive error instead of an AttributeError on None.
                raise LookupError(f"No queue configured for '{queue_name}' scans")
            designated_queue.enqueue(
                dispatcher,
                payload,
                retry=Retry(max=3),
                job_timeout=one_day_seconds,
                result_ttl=one_day_seconds,
            )
            msg = f"{label} scan request enqueued."
            logging.info(msg)
        except Exception:
            # Any failure (bad JSON, missing queue, Redis error) is logged and reported in the
            # response body; the response status is left at Flask's default.
            msg = f"An unexpected error occurred while attempting to enqueue {label} scan request"
            logging.error(msg + f"\nFull traceback: {traceback.format_exc()}")
            # publish_update(queue_name, json.loads(payload)["user_key"], msg)
        return msg

    @flask_app.route("/https", methods=["POST"])
    def enqueue_https():
        """Enqueues a request received at /https to the HTTPS Redis Queue

        :return: a message indicating whether the request was enqueued successfully.
        :rtype: str
        """
        return _enqueue_scan("https", "HTTPS", dispatch_https)

    @flask_app.route("/ssl", methods=["POST"])
    def enqueue_ssl():
        """Enqueues a request received at /ssl to the SSL Redis Queue

        :return: a message indicating whether the request was enqueued successfully.
        :rtype: str
        """
        return _enqueue_scan("ssl", "SSL", dispatch_ssl)

    @flask_app.route("/dns", methods=["POST"])
    def enqueue_dns():
        """Enqueues a request received at /dns to the DNS Redis Queue

        :return: a message indicating whether the request was enqueued successfully.
        :rtype: str
        """
        return _enqueue_scan("dns", "DNS", dispatch_dns)

    return flask_app
# Module-level application object, built with the default RQ queues.
app = Server(process_name=__name__)
def dispatch_https(payload):
    """Dispatches a scan request to be performed by the HTTPS scanner.

    Enqueued alongside the request to be executed by an RQ worker. The payload is POSTed as JSON
    to ``HTTPS_URL``; a 4xx/5xx response from the scanner is reported as a failed dispatch.

    :param dict payload: JSON request containing info about the domain to be scanned
    :return: A message indicating whether the request was dispatched successfully
    :rtype: str
    """
    logging.info("Dispatching HTTPS scan request")
    try:
        # NOTE(review): no timeout is passed, so this call waits for as long as the scanner takes
        # to answer - confirm that is intended.
        response = requests.post(HTTPS_URL, json=payload)
        # requests does not raise on error status codes by itself; without this check a rejected
        # request would still be reported as dispatched.
        response.raise_for_status()
    except Exception as e:
        msg = f"An unexpected error occurred while attempting to dispatch HTTPS scan request: ({type(e).__name__}: {str(e)})"
        logging.error(msg)
        logging.error(f"Full traceback: {traceback.format_exc()}")
        # NOTE(review): the failure is returned rather than raised, so the RQ job presumably still
        # counts as finished and a retry policy set at enqueue time would not fire - confirm.
        return msg
    return "Dispatched HTTPS scan request."
def dispatch_ssl(payload):
    """Dispatches a scan request to be performed by the SSL scanner.

    Enqueued alongside the request to be executed by an RQ worker. The payload is POSTed as JSON
    to ``SSL_URL``; a 4xx/5xx response from the scanner is reported as a failed dispatch.

    :param dict payload: JSON request containing info about the domain to be scanned
    :return: A message indicating whether the request was dispatched successfully
    :rtype: str
    """
    logging.info("Dispatching SSL scan request")
    try:
        # NOTE(review): no timeout is passed, so this call waits for as long as the scanner takes
        # to answer - confirm that is intended.
        response = requests.post(SSL_URL, json=payload)
        # requests does not raise on error status codes by itself; without this check a rejected
        # request would still be reported as dispatched.
        response.raise_for_status()
    except Exception as e:
        msg = f"An unexpected error occurred while attempting to dispatch SSL scan request: ({type(e).__name__}: {str(e)})"
        logging.error(msg)
        logging.error(f"Full traceback: {traceback.format_exc()}")
        # NOTE(review): the failure is returned rather than raised, so the RQ job presumably still
        # counts as finished and a retry policy set at enqueue time would not fire - confirm.
        return msg
    return "Dispatched SSL scan request."
def dispatch_dns(payload):
    """Dispatches a scan request to be performed by the DNS scanner.

    Enqueued alongside the request to be executed by an RQ worker. The payload is POSTed as JSON
    to ``DNS_URL``; a 4xx/5xx response from the scanner is reported as a failed dispatch.

    :param dict payload: JSON request containing info about the domain to be scanned.
    :return: A message indicating whether the request was dispatched successfully.
    :rtype: str
    """
    logging.info("Dispatching DNS scan request")
    try:
        # NOTE(review): no timeout is passed, so this call waits for as long as the scanner takes
        # to answer - confirm that is intended.
        response = requests.post(DNS_URL, json=payload)
        # requests does not raise on error status codes by itself; without this check a rejected
        # request would still be reported as dispatched.
        response.raise_for_status()
    except Exception as e:
        msg = f"An unexpected error occurred while attempting to dispatch DNS scan request: ({type(e).__name__}: {str(e)})"
        logging.error(msg)
        logging.error(f"Full traceback: {traceback.format_exc()}")
        # NOTE(review): the failure is returned rather than raised, so the RQ job presumably still
        # counts as finished and a retry policy set at enqueue time would not fire - confirm.
        return msg
    return "Dispatched DNS scan request."