Skip to content

Commit

Permalink
🐛 Source Pinterest: Fix backoff waiting time (#32672)
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 authored and git-phu committed Nov 28, 2023
1 parent 57a3075 commit 9604fbc
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 5cb7e5fe-38c2-11ec-8d3d-0242ac130003
dockerImageTag: 0.8.1
dockerImageTag: 0.8.2
dockerRepository: airbyte/source-pinterest
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class PinterestAnalyticsReportStream(PinterestAnalyticsStream):
Details - https://developers.pinterest.com/docs/api/v5/#operation/analytics/create_report"""

# The request for this stream is sent with POST.
http_method = "POST"
# Budget handed to the backoff decorators as `max_time`: 60 * 10 = 600.
# backoff measures `max_time` in seconds, so this is ten minutes.
report_wait_timeout = 60 * 10
# Handed to the backoff decorator as `max_tries` for report generation failures.
report_generation_maximum_retries = 5

@property
Expand Down Expand Up @@ -69,17 +69,17 @@ def request_params(

def backoff_max_time(func):
    """Decorator: retry ``func`` on ``RetryableException`` at a constant interval.

    The retry policy is built on every call rather than at decoration time so
    that the time budget can be read from the instance the method is bound to
    (``self.report_wait_timeout``).

    :param func: method to wrap; it is invoked as ``func(self, *args, **kwargs)``.
    :return: a wrapper with the same call signature as ``func``.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        # `report_wait_timeout` is passed through unchanged: backoff measures
        # `max_time` in seconds, so scaling it by 60 would inflate the wait
        # budget sixty-fold.
        retrying = backoff.on_exception(
            backoff.constant,
            RetryableException,
            max_time=self.report_wait_timeout,
            interval=10,
        )(func)
        return retrying(self, *args, **kwargs)

    return wrapped

def backoff_max_tries(func):
    """Decorator: retry ``func`` on ``ReportGenerationFailure`` with exponential waits.

    The retry policy is built on every call rather than at decoration time so
    that both limits can be read from the instance the method is bound to.

    :param func: method to wrap; it is invoked as ``func(self, *args, **kwargs)``.
    :return: a wrapper with the same call signature as ``func``.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        # Two limits apply together: the attempt count and the overall time
        # budget. Without `max_time` the exponential waits are bounded only
        # by the number of tries.
        retrying = backoff.on_exception(
            backoff.expo,
            ReportGenerationFailure,
            max_tries=self.report_generation_maximum_retries,
            max_time=self.report_wait_timeout,
        )(func)
        return retrying(self, *args, **kwargs)

    return wrapped

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class NonJSONResponse(Exception):
pass


class RateLimitExceeded(Exception):
    """Signals that a rate-limited request would have to wait too long to be retried."""


class PinterestStream(HttpStream, ABC):
url_base = "https://api.pinterest.com/v5/"
primary_key = "id"
Expand Down Expand Up @@ -90,7 +94,12 @@ def should_retry(self, response: requests.Response) -> bool:
def backoff_time(self, response: requests.Response) -> Optional[float]:
    """Return how long to wait before retrying a rate-limited request.

    :param response: the HTTP response being evaluated for a retry.
    :return: the value of the ``X-RateLimit-Reset`` header as a float (``0.0``
        when the header is absent) for a 429 response; ``None`` for any other
        status code.
    :raises RateLimitExceeded: when the advertised wait is longer than 600
        seconds (10 minutes).
    """
    if response.status_code != requests.codes.too_many_requests:
        return None

    self.logger.error(f"For stream {self.name} rate limit exceeded.")
    sleep_time = float(response.headers.get("X-RateLimit-Reset", 0))
    # Fail instead of sleeping when the reset is more than ten minutes away.
    # This check must run before the value is returned, otherwise it is dead code.
    if sleep_time > 600:
        raise RateLimitExceeded(
            f"Rate limit exceeded for stream {self.name}. Waiting time is longer than 10 minutes: {sleep_time}s."
        )
    return sleep_time


class PinterestSubStream(HttpSubStream):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,14 @@ def test_check_wrong_date_connection(wrong_date_config):

@responses.activate
def test_check_connection_expired_token(test_config):
    """check_connection returns a re-authentication hint when the token endpoint answers 401."""
    # Register the mocked 401 once; a second identical registration adds nothing.
    responses.add(responses.POST, "https://api.pinterest.com/v5/oauth/token", status=401)
    source = SourcePinterest()
    logger_mock = MagicMock()
    assert source.check_connection(logger_mock, test_config) == (
        False,
        "Try to re-authenticate because current refresh token is not valid. "
        "401 Client Error: Unauthorized for url: https://api.pinterest.com/v5/oauth/token",
    )


def test_get_authenticator(test_config):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
Keywords,
PinterestStream,
PinterestSubStream,
RateLimitExceeded,
UserAccountAnalytics,
)

Expand Down Expand Up @@ -148,6 +149,12 @@ def test_non_json_response(requests_mock):
"test_response, test_headers, status_code, expected",
[
({"code": 7, "message": "Some other error message"}, {"X-RateLimit-Reset": "2"}, 429, 2.0),
(
{"code": 7, "message": "Some other error message"},
{"X-RateLimit-Reset": "2000"},
429,
(RateLimitExceeded, "Rate limit exceeded for stream boards. Waiting time is longer than 10 minutes: 2000.0s."),
),
],
)
def test_backoff_on_rate_limit_error(requests_mock, test_response, status_code, test_headers, expected):
Expand All @@ -161,8 +168,13 @@ def test_backoff_on_rate_limit_error(requests_mock, test_response, status_code,
)

response = requests.get(url)
result = stream.backoff_time(response)
assert result == expected

if isinstance(expected, tuple):
with pytest.raises(expected[0], match=expected[1]):
stream.backoff_time(response)
else:
result = stream.backoff_time(response)
assert result == expected


@pytest.mark.parametrize(
Expand Down
Loading

0 comments on commit 9604fbc

Please sign in to comment.