-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
90 lines (66 loc) · 2.6 KB
/
database.py
File metadata and controls
90 lines (66 loc) · 2.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""
Agnostic database assets
Put here your utils and class independent of
the database solution.
To know more about protocols and subtyping check out PEP-0544
"""
import abc
from flask import Flask
from commons.data_access_layer.cosmos_db import CosmosDBDao
from commons.data_access_layer.database import EventContext
from time_tracker_api.security import current_user_id, current_user_tenant_id, current_role_user, roles
class CRUDDao(abc.ABC):
@abc.abstractmethod
def get_all(self, conditions: dict = None, **kwargs):
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def get(self, id):
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def create(self, project):
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def update(self, id, data, description=None):
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def delete(self, id):
raise NotImplementedError # pragma: no cover
class ApiEventContext(EventContext):
def __init__(self, container_id: str, action: str, description: str = None, user_id: str = None,
tenant_id: str = None, session_id: str = None, user_role: str = None):
super(ApiEventContext, self).__init__(container_id, action, description)
self._user_id = user_id
self._user_role = user_role
self._tenant_id = tenant_id
self._session_id = session_id
@property
def user_id(self) -> str:
if self._user_id is None:
self._user_id = current_user_id()
return self._user_id
@property
def user_role(self) -> str:
return self._user_role if self._user_role else current_role_user()
@property
def tenant_id(self) -> str:
if self._tenant_id is None:
self._tenant_id = current_user_tenant_id()
return self._tenant_id
@property
def session_id(self) -> str:
return self._session_id
@property
def is_admin(self):
return True if self.user_role == roles.get("admin").get("name") else False
class APICosmosDBDao(CosmosDBDao):
def create_event_context(self, action: str = None, description: str = None):
return ApiEventContext(self.repository.container.id, action,
description=description)
def init_app(app: Flask) -> None:
init_cosmos_db(app)
def init_sql(app: Flask) -> None:
from commons.data_access_layer.sql import init_app
init_app(app)
def init_cosmos_db(app: Flask) -> None:
from commons.data_access_layer.cosmos_db import init_app
init_app(app)