import datetime
import requests
from typing import List
from graphql import GraphQLError
from app import logger
from db import db_session
from models import Web_scans, Mail_scans, Domains
# Base URL of the scan queue service; fire_scan appends the "/dns", "/https"
# and "/ssl" endpoint paths to it.
# NOTE(review): the scheme is plain http, so this module adds no transport
# encryption or authentication to queue requests. Confirm that the cluster
# (network policy, service mesh mTLS or equivalent) restricts which workloads
# can reach and impersonate this service.
QUEUE_URL = "http://scan-queue.scanners.svc.cluster.local"
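# Upper bound, in seconds, on each call to the scan queue so that a stalled
# queue cannot hold the requesting worker indefinitely.
# NOTE(review): 10 is a reviewer-chosen default; tune it to the queue's
# expected latency.
_QUEUE_TIMEOUT_SECONDS = 10


def _enqueue(endpoint: str, payload: dict, expected: str) -> None:
    """
    Post one scan request to the queue and verify its acknowledgement
    :param endpoint: Path appended to QUEUE_URL (e.g. "/dns")
    :param payload: JSON body sent to the queue
    :param expected: Exact response text the queue returns on success
    :raises RuntimeError: If the response text is not the expected acknowledgement
    """
    response = requests.post(
        QUEUE_URL + endpoint, json=payload, timeout=_QUEUE_TIMEOUT_SECONDS
    )
    # An explicit comparison rather than `assert`: assertions are stripped
    # under `python -O`, which would report a failed enqueue as a success.
    if response.text != expected:
        raise RuntimeError(
            f"scan queue {endpoint} returned status {response.status_code} "
            "without the expected acknowledgement"
        )


def fire_scan(
    user_id: int, domain_id: int, url: str, scan_type: str, selectors: List[str]
):
    """
    Functionality to send request to scanners and request a domain to get scanned
    :param user_id: The id of the requesting user
    :param domain_id: The id of the domain
    :param url: URL passed in through the request
    :param scan_type: Type of scan to be performed ("web" or "mail", lowercase)
    :param selectors: List of DKIM selector strings (only used for "mail" scans)
    :return: Confirmation message stating that the scan request(s) were enqueued
    :raises GraphQLError: If scan_type is invalid or the scan could not be enqueued
    """
    scan_datetime = datetime.datetime.utcnow()

    # Create the scan row for the requested scan type
    if scan_type == "mail":
        scan_model = Mail_scans
        new_scan = Mail_scans(
            domain_id=domain_id,
            scan_date=scan_datetime,
            selectors=selectors,
            initiated_by=user_id,
        )
    elif scan_type == "web":
        scan_model = Web_scans
        new_scan = Web_scans(
            domain_id=domain_id, scan_date=scan_datetime, initiated_by=user_id
        )
    else:
        logger.warning(
            f"User: {user_id} attempted to request a scan with an invalid type."
        )
        raise GraphQLError("Error, unable to request scan.")
    db_session.add(new_scan)

    # Update Domain Tables Last Run
    Domains.query.filter(Domains.id == domain_id).update({"last_run": scan_datetime})
    db_session.commit()

    scan_id = None
    try:
        # Use the id of the row created above. Looking up "the newest row in
        # the table" instead would let two concurrent requests pick up each
        # other's scan id, attaching results to (or deleting) the wrong scan.
        scan_id = new_scan.id

        # NOTE(review): `url` is forwarded to the scanners unchanged; this
        # function does no validation of its own, so it presumably relies on
        # the caller having validated the domain - confirm.
        payload = {"scan_id": scan_id, "domain": url}

        if scan_type == "mail":
            payload["selectors"] = selectors
            _enqueue("/dns", payload, "DNS scan request enqueued.")
            confirmation = "Mail scan request enqueued."
        else:
            # NOTE(review): if "/https" is accepted and "/ssl" then fails, the
            # scan row is deleted below while the HTTPS job stays queued with
            # a scan_id that no longer exists.
            _enqueue("/https", payload, "HTTPS scan request enqueued.")
            _enqueue("/ssl", payload, "SSL scan request enqueued.")
            confirmation = "Web scan requests enqueued."

    except Exception as e:
        # If scan fails delete this scan entry to keep db clear of scans with no data
        try:
            if scan_id is not None:
                db_session.query(scan_model).filter(
                    scan_model.id == scan_id
                ).delete()
                db_session.commit()
        except Exception as se:
            logger.error(
                f"User: {user_id} an error occurred while scan was being deleted from database: {str(se)}"
            )
            raise GraphQLError("Error, unable to request scan.")

        logger.error(
            f"User: {user_id} attempted to request a scan but an error arouse with the dispatcher: {str(e)}"
        )
        # The client gets a generic message; the detail stays in the log.
        raise GraphQLError("Error, unable to request scan.")

    return confirmation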