forked from canada-ca/tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexternal_graphql_api_request.py
More file actions
72 lines (60 loc) · 2.39 KB
/
external_graphql_api_request.py
File metadata and controls
72 lines (60 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import json
import os
from gql import Client
from gql.transport.requests import RequestsHTTPTransport
from graphql import GraphQLError
from app import logger
def create_transport(api_domain, auth_token) -> RequestsHTTPTransport:
    """
    Build the HTTP transport used to send requests to an external API.

    :param api_domain: URL of the external API, passed to the transport as
        its ``url``
    :param auth_token: Authorization token for privileged access on the
        external API, placed in the ``Authorization`` header
    :return: A newly created RequestsHTTPTransport object configured with
        ``verify=True`` and ``retries=3``
    """
    auth_headers = {"Authorization": auth_token}
    return RequestsHTTPTransport(
        url=api_domain,
        verify=True,
        retries=3,
        headers=auth_headers,
    )
def create_client(api_domain, auth_token) -> Client:
    """
    Build the gql client that will execute queries against the external API.

    :param api_domain: External API URL, forwarded to create_transport
    :param auth_token: Authorization token, forwarded to create_transport
    :return: A gql Client bound to the new transport, created with
        ``fetch_schema_from_transport=True``
    """
    http_transport = create_transport(api_domain=api_domain, auth_token=auth_token)
    return Client(transport=http_transport, fetch_schema_from_transport=True)
def send_request(
    variables: dict,
    query,
    summary_table=False,
    api_domain=None,
    auth_token=None,
) -> dict:
    """
    Send a query to the external dmarc-report-api and return its response.

    :param variables: Variable values passed along with the query when it is
        executed
    :param query: The query handed to the gql client's ``execute`` method
    :param summary_table: When True, a failed request yields an empty dict
        instead of raising
    :param api_domain: URL on which the external API is running. When None,
        the ``DMARC_REPORT_API_URL`` environment variable is read at call time
    :param auth_token: Authorization token allowing privileged access to the
        external API. When None, the ``DMARC_REPORT_API_TOKEN`` environment
        variable is read at call time
    :return: Dictionary of data retrieved from the external API, or ``{}``
        when the request fails and ``summary_table`` is True
    :raises GraphQLError: When the request fails and ``summary_table`` is
        False
    """
    # Resolve the environment fallbacks here rather than in the signature:
    # default values are evaluated once, when the function is defined, so a
    # call to os.getenv there would freeze whatever the environment held at
    # import time.
    if api_domain is None:
        api_domain = os.getenv("DMARC_REPORT_API_URL")
    if auth_token is None:
        auth_token = os.getenv("DMARC_REPORT_API_TOKEN")

    try:
        # Client creation stays inside the try so that a failure while
        # building the client is reported the same way as a failed query.
        client = create_client(api_domain=api_domain, auth_token=auth_token)
        return client.execute(query, variables)
    except Exception as e:
        logger.error(f"Error occurred on the dmarc-report-api side: {str(e)}")
        if summary_table:
            return {}
        # Chain explicitly so the underlying failure is kept as the cause.
        raise GraphQLError("Error when querying dmarc-report-api.") from e