|
2 | 2 | This is where we handle everything regarding to authorization
|
3 | 3 | and authentication. Also stores helper functions related to it.
|
4 | 4 | """
|
| 5 | +import re |
| 6 | + |
| 7 | +import jwt |
5 | 8 | from faker import Faker
|
| 9 | +from flask import request |
| 10 | +from flask_restplus import abort |
| 11 | +from flask_restplus._http import HTTPStatus |
| 12 | +from jwt import DecodeError, ExpiredSignatureError |
6 | 13 |
|
7 | 14 | fake = Faker()
|
8 | 15 |
|
9 | 16 | dev_secret_key: str = None
|
10 | 17 |
|
| 18 | +authorizations = { |
| 19 | + "TimeTracker JWT": { |
| 20 | + 'type': 'apiKey', |
| 21 | + 'in': 'header', |
| 22 | + 'name': 'Authorization', |
| 23 | + 'description': "Specify in the value **'Bearer <JWT>'**, where JWT is the token", |
| 24 | + } |
| 25 | +} |
| 26 | + |
| 27 | +iss_claim_pattern = re.compile( |
| 28 | + r"securityioet.b2clogin.com/(?P<tenant_id>[0-9a-f]{8}\-[0-9a-f]{4}\-4[0-9a-f]{3}\-[89ab][0-9a-f]{3}\-[0-9a-f]{12})") |
| 29 | + |
11 | 30 |
|
12 | 31 | def current_user_id() -> str:
|
13 |
| - """ |
14 |
| - Returns the id of the authenticated user in |
15 |
| - Azure Active Directory |
16 |
| - """ |
17 |
| - return 'anonymous' |
| 32 | + oid_claim = get_token_json().get("oid") |
| 33 | + if oid_claim is None: |
| 34 | + abort(message='The claim "oid" is missing in the JWT', code=HTTPStatus.UNAUTHORIZED) |
| 35 | + |
| 36 | + return oid_claim |
18 | 37 |
|
19 | 38 |
|
20 | 39 | def current_user_tenant_id() -> str:
|
21 |
| - # TODO Get this from the JWT |
22 |
| - return "ioet" |
| 40 | + iss_claim = get_token_json().get("iss") |
| 41 | + if iss_claim is None: |
| 42 | + abort(message='The claim "iss" is missing in the JWT', code=HTTPStatus.UNAUTHORIZED) |
| 43 | + |
| 44 | + tenant_id = parse_tenant_id_from_iss_claim(iss_claim) |
| 45 | + if tenant_id is None: |
| 46 | + abort(message='The format of the claim "iss" cannot be understood. ' |
| 47 | + 'Please contact the development team.', |
| 48 | + code=HTTPStatus.UNAUTHORIZED) |
23 | 49 |
|
| 50 | + return tenant_id |
24 | 51 |
|
25 |
| -def generate_dev_secret_key(): |
26 |
| - from time_tracker_api import flask_app as app |
27 |
| - """ |
28 |
| - Generates a security key for development purposes |
29 |
| - :return: str |
30 |
| - """ |
| 52 | + |
| 53 | +def get_or_generate_dev_secret_key(): |
31 | 54 | global dev_secret_key
|
32 |
| - dev_secret_key = fake.password(length=16, special_chars=True, digits=True, upper_case=True, lower_case=True) |
33 |
| - if app.config.get("FLASK_DEBUG", False): # pragma: no cover |
34 |
| - print('*********************************************************') |
35 |
| - print("The generated secret is \"%s\"" % dev_secret_key) |
36 |
| - print('*********************************************************') |
| 55 | + if dev_secret_key is None: |
| 56 | + from time_tracker_api import flask_app as app |
| 57 | + """ |
| 58 | + Generates a security key for development purposes |
| 59 | + :return: str |
| 60 | + """ |
| 61 | + dev_secret_key = fake.password(length=16, special_chars=True, digits=True, upper_case=True, lower_case=True) |
| 62 | + if app.config.get("FLASK_DEBUG", False): # pragma: no cover |
| 63 | + print('*********************************************************') |
| 64 | + print("The generated secret is \"%s\"" % dev_secret_key) |
| 65 | + print('*********************************************************') |
37 | 66 | return dev_secret_key
|
| 67 | + |
| 68 | + |
| 69 | +def parse_jwt(authentication_header_content): |
| 70 | + if authentication_header_content is not None: |
| 71 | + parsed_content = authentication_header_content.split("Bearer ") |
| 72 | + |
| 73 | + if len(parsed_content) > 1: |
| 74 | + return jwt.decode(parsed_content[1], verify=False) |
| 75 | + |
| 76 | + return None |
| 77 | + |
| 78 | + |
| 79 | +def get_authorization_jwt(): |
| 80 | + auth_header = request.headers.get('Authorization') |
| 81 | + return parse_jwt(auth_header) |
| 82 | + |
| 83 | + |
| 84 | +def get_token_json(): |
| 85 | + try: |
| 86 | + return get_authorization_jwt() |
| 87 | + except DecodeError: |
| 88 | + abort(message='Malformed token', code=HTTPStatus.UNAUTHORIZED) |
| 89 | + except ExpiredSignatureError: |
| 90 | + abort(message='Expired token', code=HTTPStatus.UNAUTHORIZED) |
| 91 | + |
| 92 | + |
| 93 | +def parse_tenant_id_from_iss_claim(iss_claim: str) -> str: |
| 94 | + m = iss_claim_pattern.search(iss_claim) |
| 95 | + if m is not None: |
| 96 | + return m.group('tenant_id') |
| 97 | + |
| 98 | + return None |
0 commit comments