forked from canada-ca/tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathremove_organization.py
More file actions
166 lines (147 loc) · 6.47 KB
/
remove_organization.py
File metadata and controls
166 lines (147 loc) · 6.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import graphene
from graphql import GraphQLError
from app import logger
from db import db_session
from functions.auth_wrappers import require_token
from functions.auth_functions import is_super_admin
from functions.input_validators import cleanse_input
from models import (
Organizations,
User_affiliations,
Domains,
Web_scans,
Mail_scans,
Ssl_scans,
Spf_scans,
Https_scans,
Dkim_scans,
Dmarc_scans,
Mx_scans,
)
from scalars.slug import Slug
class RemoveOrganizationInput(graphene.InputObjectType):
    """
    Input object containing the required fields for the removeOrganization
    mutation.
    """

    # NOTE(review): the docstring wording above is kept verbatim because
    # graphene appears to expose it as the type description in the schema.

    # Mandatory slug identifying which organization should be removed.
    slug = Slug(
        description=(
            "The slugified organization name of the organization you wish to remove."
        ),
        required=True,
    )
class RemoveOrganization(graphene.Mutation):
    """
    Mutation allows the removal of an organization inside the database.
    """

    # NOTE(review): the docstring wording above is kept verbatim because
    # graphene appears to expose it as the mutation description in the schema.

    class Arguments:
        input = RemoveOrganizationInput(
            required=True,
            description="Input fields required for the removeOrganization mutation.",
        )

    # True once the organization and its dependent rows were deleted and
    # committed; False when a database error aborted the removal.
    status = graphene.Boolean()

    @require_token
    def mutate(self, info, **kwargs):
        """
        Remove an organization together with its domains, their web and mail
        scans, and its user affiliations.

        ``user_id``, ``user_roles`` and ``input`` are read from the keyword
        arguments (presumably the first two are supplied by ``require_token``
        -- confirm against that decorator).

        All deletes run as one unit of work: if any step raises, the session
        is rolled back so that no partially deleted organization is left
        pending in the shared session, and ``status=False`` is returned.

        Returns:
            RemoveOrganization: ``status=True`` after a successful commit,
            ``status=False`` when a database step failed.

        Raises:
            GraphQLError: if the slug is ``super-admin``, if no organization
            has the given slug, or if the caller is not a super admin. The
            message is identical in all three cases.
        """
        # Get arguments from mutation
        user_id = kwargs.get("user_id")
        user_roles = kwargs.get("user_roles")
        slug = cleanse_input(kwargs.get("input", {}).get("slug"))

        # Restrict the deletion of SA Org
        if slug == "super-admin":
            logger.warning(f"User: {user_id} tried to remove super-admin org.")
            raise GraphQLError("Error, unable to remove organization.")

        # Check to see if org exists
        org_orm = (
            db_session.query(Organizations).filter(Organizations.slug == slug).first()
        )
        if org_orm is None:
            logger.warning(
                f"User: {user_id} tried to remove {slug} but org does not exist."
            )
            raise GraphQLError("Error, unable to remove organization.")

        # Check Permissions
        if not is_super_admin(user_roles=user_roles):
            logger.warning(
                f"User: {user_id} tried to remove {slug} organization but does not have access to remove organizations."
            )
            raise GraphQLError("Error, unable to remove organization.")

        # XXX shouldn't cascade delete do all of this for us?
        # `stage` names what is currently being removed so that the single
        # error handler below can report where the failure happened.
        stage = "the organization"
        try:
            domains = Domains.query.filter(
                Domains.organization_id == org_orm.id
            ).all()

            for domain in domains:
                # Web scans: remove the per-protocol rows first, then the
                # web scan row they share an id with.
                stage = "web scans"
                web_scans = Web_scans.query.filter(
                    Web_scans.domain_id == domain.id
                ).all()
                for scan in web_scans:
                    Https_scans.query.filter(Https_scans.id == scan.id).delete()
                    Ssl_scans.query.filter(Ssl_scans.id == scan.id).delete()
                    Web_scans.query.filter(Web_scans.id == scan.id).delete()

                # Mail scans: same pattern as the web scans above.
                stage = "mail scans"
                mail_scans = Mail_scans.query.filter(
                    Mail_scans.domain_id == domain.id
                ).all()
                for scan in mail_scans:
                    Dkim_scans.query.filter(Dkim_scans.id == scan.id).delete()
                    Dmarc_scans.query.filter(Dmarc_scans.id == scan.id).delete()
                    Mx_scans.query.filter(Mx_scans.id == scan.id).delete()
                    Spf_scans.query.filter(Spf_scans.id == scan.id).delete()
                    Mail_scans.query.filter(Mail_scans.id == scan.id).delete()

                # The domain itself goes last, once nothing refers to it.
                stage = "domains"
                Domains.query.filter(Domains.id == domain.id).delete()

            stage = "user affiliations"
            User_affiliations.query.filter(
                User_affiliations.organization_id == org_orm.id
            ).delete()

            stage = "the organization"
            db_session.delete(org_orm)
            db_session.commit()
        except Exception as e:
            # Discard every pending delete so a failure part-way through
            # cannot be committed later by another user of the session.
            db_session.rollback()
            logger.error(
                f"User: {user_id} tried removing {slug}, but error occured when removing {stage} {str(e)}"
            )
            return RemoveOrganization(status=False)

        logger.info(f"User: {user_id} successfully removed {slug} organization.")
        return RemoveOrganization(status=True)