-
Notifications
You must be signed in to change notification settings - Fork 296
/
Copy pathclient.py
173 lines (137 loc) · 6.32 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import logging
import typing
from typing import Optional, Tuple
from django.utils import timezone
from rest_framework import status
from slack_sdk.errors import SlackApiError as SlackSDKApiError
from slack_sdk.http_retry import HttpRequest, HttpResponse, RetryHandler, RetryState, default_retry_handlers
from slack_sdk.web import SlackResponse, WebClient
from apps.slack.errors import (
SlackAPIError,
SlackAPIRatelimitError,
SlackAPIServerError,
SlackAPITokenError,
UnexpectedResponse,
)
if typing.TYPE_CHECKING:
from apps.slack.models import SlackTeamIdentity
logger = logging.getLogger(__name__)
class SlackServerErrorRetryHandler(RetryHandler):
"""Retry failed Slack API calls on Slack server errors"""
def _can_retry(
self,
*,
state: RetryState,
request: HttpRequest,
response: Optional[HttpResponse] = None,
error: Optional[Exception] = None,
) -> bool:
# Retry Slack API call on 5xx errors
if response and response.status_code in [
status.HTTP_500_INTERNAL_SERVER_ERROR,
status.HTTP_503_SERVICE_UNAVAILABLE,
status.HTTP_504_GATEWAY_TIMEOUT,
]:
return True
# Retry Slack API call on "internal_error" and "fatal_error" errors
if response and response.body and response.body.get("error") in SlackAPIServerError.errors:
return True
return False
server_error_retry_handler = SlackServerErrorRetryHandler(max_retry_count=2)
class SlackClient(WebClient):
def __init__(self, slack_team_identity: "SlackTeamIdentity", timeout: int = 30) -> None:
super().__init__(
token=slack_team_identity.bot_access_token,
timeout=timeout,
retry_handlers=default_retry_handlers() + [server_error_retry_handler],
)
self.slack_team_identity = slack_team_identity
def paginated_api_call(self, method: str, paginated_key: str, **kwargs):
"""
`paginated_key` represents a key from the response which is paginated. For example "users" or "channels"
"""
api_method = getattr(self, method)
response = api_method(**kwargs)
cumulative_response = response.data
while (
"response_metadata" in response
and "next_cursor" in response["response_metadata"]
and response["response_metadata"]["next_cursor"] != ""
):
kwargs["cursor"] = response["response_metadata"]["next_cursor"]
response = api_method(**kwargs).data
cumulative_response[paginated_key] += response[paginated_key]
return cumulative_response
def paginated_api_call_with_ratelimit(
self, method: str, paginated_key: str, **kwargs
) -> Tuple[dict, Optional[str], bool]:
"""
This method does paginated api calls and handle slack rate limit errors in order to return collected data
and have the ability to continue doing paginated requests from the last successful cursor.
Return last successful cursor instead of next cursor to avoid data loss during delay time.
`paginated_key` represents a key from the response which is paginated. For example "users" or "channels"
"""
api_method = getattr(self, method)
cumulative_response = {}
cursor = kwargs["cursor"]
rate_limited = False
try:
response = api_method(**kwargs).data
cumulative_response = response
cursor = response["response_metadata"]["next_cursor"]
while (
"response_metadata" in response
and "next_cursor" in response["response_metadata"]
and response["response_metadata"]["next_cursor"] != ""
):
next_cursor = response["response_metadata"]["next_cursor"]
kwargs["cursor"] = next_cursor
response = api_method(**kwargs).data
cumulative_response[paginated_key] += response[paginated_key]
cursor = next_cursor
except SlackAPIRatelimitError:
rate_limited = True
return cumulative_response, cursor, rate_limited
def api_call(self, *args, **kwargs) -> SlackResponse:
"""Wrap Slack SDK api_call with more granular error handling and logging"""
try:
response = super().api_call(*args, **kwargs)
self._unmark_token_revoked() # unmark token as revoked if the API call was successful
return response
except SlackSDKApiError as e:
logger.error(
"Slack API call error! slack_team_identity={} args={} kwargs={} status={} error={} response={}".format(
self.slack_team_identity.pk,
args,
kwargs,
e.response["status"] if isinstance(e.response, dict) else e.response.status_code,
e.response.get("error"),
e.response,
)
)
# narrow down the error
error_class = self._get_error_class(e.response)
# mark / unmark token as revoked
if error_class is SlackAPITokenError:
self._mark_token_revoked()
else:
self._unmark_token_revoked()
# raise the narrowed down error class
raise error_class(e.response) from e
@staticmethod
def _get_error_class(response: UnexpectedResponse | SlackResponse) -> typing.Type[SlackAPIError]:
"""Get an appropriate error class for the response"""
if isinstance(response, dict): # UnexpectedResponse
return SlackAPIServerError
for error_class in SlackAPIError.__subclasses__():
if response["error"] in error_class.errors:
return error_class
return SlackAPIError
def _mark_token_revoked(self) -> None:
if not self.slack_team_identity.detected_token_revoked:
self.slack_team_identity.detected_token_revoked = timezone.now()
self.slack_team_identity.save(update_fields=["detected_token_revoked"])
def _unmark_token_revoked(self) -> None:
if self.slack_team_identity.detected_token_revoked:
self.slack_team_identity.detected_token_revoked = None
self.slack_team_identity.save(update_fields=["detected_token_revoked"])