forked from canada-ca/tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathuser.py
More file actions
133 lines (115 loc) · 4.95 KB
/
user.py
File metadata and controls
133 lines (115 loc) · 4.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import pyotp
import os
from sqlalchemy.orm import load_only
from graphql import GraphQLError
from app import logger
from db import db_session
from functions.auth_wrappers import require_token
from functions.auth_functions import is_super_admin, is_admin
from models import (
Users,
User_affiliations,
)
from schemas.user.user import User
@require_token
def resolve_user(self, info, **kwargs):
    """
    Resolve the user profile that the requesting user is allowed to see.

    When no ``user_name`` argument is given, the requesting user's own
    profile is returned. When ``user_name`` is given, the requested profile
    is returned only if the requester is a super admin, is the requested user
    themselves, or has admin rights in at least one organization that the
    requested user is affiliated with.

    :param self: User SQLAlchemyObject type defined in the schemas directory
    :param info: Request information sent to the server from a client
    :param kwargs: Field arguments (``user_name``) together with the
        ``user_id`` and ``user_roles`` entries read below (presumably
        injected by ``require_token`` -- confirm against the wrapper)
    :return: A query filtered down to the visible user, or, on the org-admin
        path, a one-element list holding the requested user
    :raises GraphQLError: If the requested user does not exist, is not
        affiliated with any organization, or the requester has no admin
        access to an organization shared with the requested user
    """
    # Get information from kwargs
    user_id = kwargs.get("user_id")
    user_roles = kwargs.get("user_roles")
    user_name = kwargs.get("user_name", None)

    # Org ids the requesting user holds a role in; a set keeps the
    # membership test in the loop below cheap
    org_ids = {role["org_id"] for role in user_roles}

    # Get initial query
    query = User.get_query(info)

    # No specific user requested: return the requesting user's own profile
    if user_name is None:
        logger.info(f"User {user_id} successfully retrieved their own information.")
        return query.filter(Users.id == user_id)

    # Get the requested user (only the id column is needed)
    req_user_orm = (
        db_session.query(Users)
        .filter(Users.user_name == user_name)
        .options(load_only("id"))
        .first()
    )

    # Check to see if user actually exists.
    # NOTE(review): every failure below raises the same generic message,
    # presumably so callers cannot tell which accounts exist -- keep the
    # messages identical when editing.
    if req_user_orm is None:
        logger.warning(
            f"User: {user_id}, tried to find this user {user_name} but the account does not exist."
        )
        raise GraphQLError("Error, user cannot be found.")
    req_user_id = req_user_orm.id

    # Get org id's that the requested user belongs to
    req_org_orms = (
        db_session.query(User_affiliations)
        .filter(User_affiliations.user_id == req_user_id)
        .options(load_only("organization_id"))
        .all()
    )

    # Check to ensure the user belongs to at least one organization.
    # ``.all()`` yields a list and never ``None``, so "no affiliations" has
    # to be detected as an empty result.
    if not req_org_orms:
        logger.warning(
            f"User: {user_id}, tried to find this user {user_name} but the account does not belong to any organization."
        )
        raise GraphQLError("Error, user cannot be found.")

    # Compile list of org id's the requested user belongs to
    req_user_org_ids = [
        affiliation.organization_id for affiliation in req_org_orms
    ]

    # Super admins may view any user
    if is_super_admin(user_roles=user_roles):
        logger.info(
            f"Super admin {user_id} successfully retrieved the user information for this user {user_name}."
        )
        return query.filter(Users.id == req_user_id)

    # Users may always view themselves, even when asking by user name
    if user_id == req_user_id:
        logger.info(
            f"User {user_id} successfully retrieved their own user information using this username {user_name}."
        )
        return query.filter(Users.id == req_user_id)

    # Otherwise the requester needs admin rights in an org shared with the
    # requested user; the first such org is enough to grant access
    for req_org_id in req_user_org_ids:
        if req_org_id in org_ids and is_admin(
            user_roles=user_roles, org_id=req_org_id
        ):
            requested_user = query.filter(Users.id == req_user_id).first()
            logger.info(
                f"User {user_id} successfully retrieved the user information for this user {user_name}."
            )
            # This path returns a list rather than a query; kept as a
            # one-element list so the return shape is unchanged
            return [requested_user]

    # Give error if requesting user has no admin access to a shared org
    logger.warning(
        f"User: {user_id}, tried to find this user {user_name} but does not have admin access to any similar organizations."
    )
    raise GraphQLError("Error, user cannot be found.")
def resolve_generate_otp_url(self, info, email):
    """
    This resolver adds an api endpoint that returns a url for OTP validation

    :param self: Not used by this resolver
    :param info: Request information; not used by this resolver
    :param email: The email address that will be associated with the URL generated
    :returns: The url that was generated.
    :raises GraphQLError: If the ``BASE32_SECRET`` environment variable is
        unset or empty
    """
    # This needs to be a 16 char base32 secret key
    secret = os.getenv("BASE32_SECRET")

    # os.getenv returns None for an unset variable; fail loudly instead of
    # building a provisioning URL from a missing secret
    if not secret:
        logger.error("Unable to generate OTP url: BASE32_SECRET is not configured.")
        raise GraphQLError("Error, unable to generate OTP url.")

    totp = pyotp.totp.TOTP(secret)
    return totp.provisioning_uri(email, issuer_name="Tracker")