From 463378f1a0baba10e4895b40336500b3a1f06ff1 Mon Sep 17 00:00:00 2001 From: pnilan Date: Mon, 21 Oct 2024 10:15:45 -0700 Subject: [PATCH 1/3] calls auth(request) on retry backoff/retry attempts --- .../python/airbyte_cdk/sources/streams/http/http_client.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py index 0924ef28021f..72b2021f6e19 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py @@ -249,6 +249,8 @@ def _send( self._request_attempt_count[request] = 1 else: self._request_attempt_count[request] += 1 + if isinstance(self._session.auth, AuthBase): + self._session.auth(request) self._logger.debug( "Making outbound API request", extra={"headers": request.headers, "url": request.url, "request_body": request.body} From 07e618e49cd25c13876cd077801d7e637ef9112a Mon Sep 17 00:00:00 2001 From: pnilan Date: Mon, 21 Oct 2024 10:30:05 -0700 Subject: [PATCH 2/3] chore: type-check --- .../python/airbyte_cdk/sources/streams/http/http_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py index 72b2021f6e19..59eacb7cab7d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py @@ -4,6 +4,7 @@ import logging import os +import orjson import urllib from pathlib import Path from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union @@ -45,7 +46,6 @@ from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message from airbyte_cdk.utils.traced_exception import AirbyteTracedException -from orjson import orjson from requests.auth import AuthBase BODY_REQUEST_METHODS = ("GET", "POST", "PUT", "PATCH") From cd3e214e2c96cd4d5230b764d270159235bdd7d3 Mon Sep 17 00:00:00 2001 From: pnilan Date: Mon, 21 Oct 2024 11:08:40 -0700 Subject: [PATCH 3/3] reduce _send complexity by pulling our error resolution handling --- .../sources/streams/http/http_client.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py index 59eacb7cab7d..cccbc4b8c01c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http_client.py @@ -4,11 +4,11 @@ import logging import os -import orjson import urllib from pathlib import Path from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union +import orjson import requests import requests_cache from airbyte_cdk.models import ( @@ -249,7 +249,7 @@ def _send( self._request_attempt_count[request] = 1 else: self._request_attempt_count[request] += 1 - if isinstance(self._session.auth, AuthBase): + if hasattr(self._session, "auth") and isinstance(self._session.auth, AuthBase): self._session.auth(request) self._logger.debug( @@ -287,6 +287,20 @@ def _send( lambda: formatter(response), # type: ignore # log_formatter is always cast to a callable ) + self._handle_error_resolution( + response=response, exc=exc, request=request, error_resolution=error_resolution, exit_on_rate_limit=exit_on_rate_limit + ) + + return response # type: ignore # will either return a valid response of type requests.Response or raise an exception + + def _handle_error_resolution( + self, + response: Optional[requests.Response], + exc: Optional[requests.RequestException], + request: requests.PreparedRequest, + error_resolution: ErrorResolution, + exit_on_rate_limit: Optional[bool] = False, + ) -> None: # Emit stream status RUNNING with the reason RATE_LIMITED to log that the rate limit has been reached if error_resolution.response_action == ResponseAction.RATE_LIMITED: # TODO: Update to handle with message repository when concurrent message repository is ready @@ -364,8 +378,6 @@ def _send( self._logger.error(response.text) raise e - return response # type: ignore # will either return a valid response of type requests.Response or raise an exception - @property def name(self) -> str: return self._name