forked from canada-ca/tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
123 lines (95 loc) · 3.85 KB
/
server.py
File metadata and controls
123 lines (95 loc) · 3.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import sys
import os
import time
import requests
import logging
import json
import emoji
import asyncio
import traceback
import datetime as dt
from concurrent.futures import TimeoutError
from starlette.applications import Starlette
from starlette.routing import Route, Mount, WebSocketRoute
from starlette.responses import Response
from dns_scanner import DMARCScanner, DKIMScanner
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
QUEUE_URL = os.getenv(
"RESULT_QUEUE_URL", "http://result-queue.scanners.svc.cluster.local"
)
OTS_QUEUE_URL = os.getenv(
"OTS_RESULT_QUEUE_URL", "http://ots-result-queue.scanners.svc.cluster.local"
)
DEST_URL = lambda ots : OTS_QUEUE_URL if ots else QUEUE_URL
def dispatch_results(payload, client, ots):
client.post(DEST_URL(ots) + "/dns", json=payload)
logging.info("Scan results dispatched to result queue")
def Server(server_client=requests):
async def scan(scan_request):
logging.info("Scan request received")
inbound_payload = await scan_request.json()
start_time = dt.datetime.now()
try:
domain = inbound_payload["domain"]
user_key = inbound_payload["user_key"]
selectors = inbound_payload.get("selectors", [])
domain_key = inbound_payload["domain_key"]
shared_id = inbound_payload["shared_id"]
except KeyError:
logging.error(f"Invalid scan request format received: {str(inbound_payload)}")
return Response("Invalid Format", status_code=400)
logging.info(f"Performing scan on {inbound_payload['domain']}...")
try:
scanner = DMARCScanner(domain)
start = time.time()
future = scanner.run()
scan_results = future.result()
if len(selectors) != 0:
scanner = DKIMScanner(domain, selectors)
start = time.time()
future = scanner.run()
scan_results["dkim"] = future.result()
else:
logging.info("No DKIM selector strings provided")
scan_results["dkim"] = {"error": "missing"}
except TimeoutError:
logging.error(f"Timeout while scanning {domain} (Aborted after {round(time.time()-start, 2)} seconds)")
outbound_payload = {
"results": {
"dmarc": {"error": "missing"},
"spf": {"error": "missing"},
"mx": {"error": "missing"},
"dkim": {"error": "missing"},
},
"scan_type": "dns",
"user_key": user_key,
"domain_key": domain_key,
"shared_id": shared_id
}
dispatch_results(outbound_payload, server_client, (user_key is not None))
return Response("Timeout occurred while scanning", status_code=500)
outbound_payload = {
"results": scan_results,
"scan_type": "dns",
"user_key": user_key,
"domain_key": domain_key,
"shared_id": shared_id
}
logging.info(f"Scan results: {str(scan_results)}")
end_time = dt.datetime.now()
elapsed_time = end_time - start_time
dispatch_results(outbound_payload, server_client, (user_key is not None))
logging.info(f"DNS scan completed in {elapsed_time.total_seconds()} seconds.")
return Response("Scan completed")
async def startup():
logging.info(emoji.emojize("ASGI server started :rocket:"))
async def shutdown():
logging.info(emoji.emojize("ASGI server shutting down..."))
routes = [
Route("/", scan, methods=["POST"]),
]
starlette_app = Starlette(
debug=True, routes=routes, on_startup=[startup], on_shutdown=[shutdown]
)
return starlette_app
app = Server()