diff --git a/tests/time_tracker_api/users/users_namespace_test.py b/tests/time_tracker_api/users/users_namespace_test.py index 46ff68fa..95293235 100644 --- a/tests/time_tracker_api/users/users_namespace_test.py +++ b/tests/time_tracker_api/users/users_namespace_test.py @@ -3,6 +3,7 @@ from flask.testing import FlaskClient from flask_restplus._http import HTTPStatus from utils.azure_users import AzureConnection +from pytest import mark def test_users_response_contains_expected_props( @@ -76,3 +77,32 @@ def test_on_delete_update_user_role_is_being_called_with_valid_arguments( assert HTTPStatus.OK == response.status_code update_user_role_mock.assert_called_once_with(user_id, role=None) + + +@patch('utils.azure_users.AzureConnection.update_role', new_callable=Mock) +@mark.parametrize( + 'role_id,action,is_grant', + [ + ('admin', 'grant', True), + ('admin', 'revoke', False), + ('test', 'grant', True), + ('test', 'revoke', False), + ], +) +def test_update_role_is_called_properly_on_each_action( + update_role_mock, + client: FlaskClient, + valid_header: dict, + user_id: str, + role_id, + action, + is_grant, +): + response = client.post( + f'/users/{user_id}/roles/{role_id}/{action}', headers=valid_header, + ) + + assert HTTPStatus.OK == response.status_code + update_role_mock.assert_called_once_with( + user_id, role_id, is_grant=is_grant + ) diff --git a/time_tracker_api/users/users_namespace.py b/time_tracker_api/users/users_namespace.py index 095c93b6..5aacf8bc 100644 --- a/time_tracker_api/users/users_namespace.py +++ b/time_tracker_api/users/users_namespace.py @@ -87,3 +87,32 @@ class UserRole(Resource): def delete(self, user_id, role_id): """Delete user's role""" return AzureConnection().update_user_role(user_id, role=None) + + +@ns.route('//roles//grant') +@ns.param('user_id', 'The user identifier') +@ns.param('role_id', 'The role name identifier') +class GrantRole(Resource): + @ns.doc('grant_role') + @ns.marshal_with(user_response_fields) + def post(self, user_id, role_id): + """ + Grant role to user + Available options for `role_id`: + ``` + - test + - admin + ``` + """ + return AzureConnection().update_role(user_id, role_id, is_grant=True) + + +@ns.route('//roles//revoke') +@ns.param('user_id', 'The user identifier') +@ns.param('role_id', 'The role name identifier') +class RevokeRole(Resource): + @ns.doc('revoke_role') + @ns.marshal_with(user_response_fields) + def post(self, user_id, role_id): + """Revoke role to user""" + return AzureConnection().update_role(user_id, role_id, is_grant=False) diff --git a/utils/azure_users.py b/utils/azure_users.py index 15ec85b6..d3c660ec 100644 --- a/utils/azure_users.py +++ b/utils/azure_users.py @@ -45,6 +45,20 @@ def __init__(self, id, name, email, role): self.role = role +HTTP_PATCH_HEADERS = { + 'Content-type': 'application/json', + 'Accept': 'application/json', +} + +ROLE_FIELD_VALUES = { + 'admin': ( + 'extension_1d76efa96f604499acc0c0ee116a1453_role', + 'time_tracker_admin', + ), + 'test': ('waitforrealvalue', 'waitforrealvalue'), +} + + class AzureConnection: def __init__(self, config=MSConfig): self.client = msal.ConfidentialClientApplication( @@ -78,10 +92,6 @@ def users(self) -> List[AzureUser]: return [self.to_azure_user(item) for item in response.json()['value']] def update_user_role(self, id, role): - headers = { - 'Content-type': 'application/json', - 'Accept': 'application/json', - } endpoint = "{endpoint}/users/{user_id}?api-version=1.6".format( endpoint=self.config.ENDPOINT, user_id=id ) @@ -90,7 +100,7 @@ def update_user_role(self, id, role): endpoint, auth=BearerAuth(self.access_token), data=json.dumps(data), - headers=headers, + headers=HTTP_PATCH_HEADERS, ) assert 204 == response.status_code @@ -108,3 +118,30 @@ def to_azure_user(self, item) -> AzureUser: email = item['otherMails'][0] if there_is_email else '' role = item[self.role_field] if there_is_role else None return AzureUser(id, name, email, role) + + def update_role(self, user_id, role_id, is_grant): + endpoint = "{endpoint}/users/{user_id}?api-version=1.6".format( + endpoint=self.config.ENDPOINT, user_id=user_id + ) + + data = self.get_role_data(role_id, is_grant) + response = requests.patch( + endpoint, + auth=BearerAuth(self.access_token), + data=json.dumps(data), + headers=HTTP_PATCH_HEADERS, + ) + assert 204 == response.status_code + + response = requests.get(endpoint, auth=BearerAuth(self.access_token)) + assert 200 == response.status_code + + return self.to_azure_user(response.json()) + + def get_role_data(self, role_id, is_grant=True): + assert role_id in ROLE_FIELD_VALUES.keys() + field_name, field_value = ROLE_FIELD_VALUES[role_id] + if is_grant: + return {field_name: field_value} + else: + return {field_name: None}