forked from canada-ca/tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathremove_domain.py
More file actions
126 lines (107 loc) · 4.52 KB
/
remove_domain.py
File metadata and controls
126 lines (107 loc) · 4.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import graphene
from graphql import GraphQLError
from app import logger
from db import db_session
from functions.auth_wrappers import require_token
from functions.auth_functions import is_user_write
from functions.input_validators import cleanse_input
from models import (
Domains,
Web_scans,
Mail_scans,
Dkim_scans,
Dmarc_scans,
Https_scans,
Mx_scans,
Spf_scans,
Ssl_scans,
)
from scalars.url import URL
class RemoveDomainInputObject(graphene.InputObjectType):
    """
    Input payload accepted by the removeDomain mutation.

    Fields:
        url: required URL scalar naming the domain to remove.
    """

    url = URL(
        required=True,
        description="URL of domain that is being removed",
    )
def _delete_scans(parent_model, child_models, scans):
    """
    Issue deletes for each scan in ``scans`` and its per-protocol result rows.

    For every scan, the rows of each model in ``child_models`` whose ``id``
    equals the scan's ``id`` are deleted first, then the ``parent_model`` row
    itself. Nothing is committed here; the caller owns the transaction.

    :param parent_model: model holding the scan rows (Web_scans / Mail_scans)
    :param child_models: models whose rows share their ``id`` with the scan
    :param scans: already-loaded ``parent_model`` rows to remove
    """
    for scan in scans:
        # Children first, parent last - same order as the original inline code.
        for child_model in child_models:
            child_model.query.filter(child_model.id == scan.id).delete()
        parent_model.query.filter(parent_model.id == scan.id).delete()


class RemoveDomain(graphene.Mutation):
    """
    This mutation allows the removal of unused domains

    The domain is looked up by its (cleansed) URL. If it exists and the caller
    has write access to the owning organization, all web scans, mail scans and
    their per-protocol result rows are deleted, followed by the domain itself.

    Result field ``status`` is ``True`` when everything was deleted and
    committed, ``False`` when a database error occurred (the transaction is
    rolled back in that case).
    """

    class Arguments:
        input = RemoveDomainInputObject(
            required=True,
            description="Input object containing all required fields for "
            "mutation to run.",
        )

    status = graphene.Boolean()

    @require_token
    def mutate(self, info, **kwargs):
        """
        Remove a domain and every scan record that references it.

        :param info: GraphQL resolve info (unused here)
        :param kwargs: expected to carry ``input`` (with a ``url`` entry),
            ``user_id`` and ``user_roles``; the latter two are presumably
            injected by ``require_token`` - confirm against that decorator
        :return: ``RemoveDomain(status=True)`` on success,
            ``RemoveDomain(status=False)`` if a database error occurred
        :raises GraphQLError: if the domain does not exist or the user lacks
            write access to the domain's organization
        """
        user_id = kwargs.get("user_id")
        user_roles = kwargs.get("user_roles")
        domain = cleanse_input(kwargs.get("input", {}).get("url"))

        # Check to see if domain exists
        domain_orm = Domains.query.filter(Domains.domain == domain).first()
        if domain_orm is None:
            logger.warning(
                f"User: {user_id} tried to remove domain: {domain}, but it does not exist."
            )
            raise GraphQLError("Error, unable to remove domain.")

        # Check permissions
        if not is_user_write(
            user_roles=user_roles, org_id=domain_orm.organization_id
        ):
            logger.warning(
                f"User: {user_id} tried to remove {domain} but does not have proper access to remove this domain."
            )
            raise GraphQLError("Error, unable to remove domain.")

        try:
            # Reuse the row loaded above instead of querying the domain again.
            domain_id = domain_orm.id

            # Remove all related web scans
            webscans = (
                db_session.query(Web_scans)
                .filter(Web_scans.domain_id == domain_id)
                .all()
            )
            try:
                _delete_scans(Web_scans, (Https_scans, Ssl_scans), webscans)
            except Exception as e:
                # Discard the deletes issued so far so they cannot be
                # committed later through the shared session.
                db_session.rollback()
                logger.error(
                    f"User: {user_id} tried to remove {domain} web scans but a database error occurred: {str(e)}"
                )
                return RemoveDomain(status=False)

            # Remove all related mail scans
            mailscans = (
                db_session.query(Mail_scans)
                .filter(Mail_scans.domain_id == domain_id)
                .all()
            )
            try:
                _delete_scans(
                    Mail_scans,
                    (Dkim_scans, Dmarc_scans, Mx_scans, Spf_scans),
                    mailscans,
                )
            except Exception as e:
                # Also undoes the web-scan deletes issued above.
                db_session.rollback()
                logger.error(
                    f"User: {user_id} tried to remove {domain} mail scans but a database error occurred: {str(e)}"
                )
                return RemoveDomain(status=False)

            # Scans are gone; remove the domain itself and make it permanent.
            Domains.query.filter(Domains.domain == domain).delete()
            db_session.commit()
            logger.info(
                f"User: {user_id} successfully removed {domain} and all related scans."
            )
            return RemoveDomain(status=True)

        except Exception as e:
            db_session.rollback()
            logger.error(
                f"User: {user_id} tried to remove {domain} but a database error occurred: {str(e)}"
            )
            return RemoveDomain(status=False)