airbyte-cdk: Improve Error Handling in Legacy CDK #37576

Merged
merged 51 commits into from
May 17, 2024
Changes from 4 commits
Commits
05002a9
Add initial http_request_sender and http_error_handler files, structure
pnilan Apr 24, 2024
d0493c0
Moves error_mapping to http_error_handler, adds error_message and bac…
pnilan Apr 24, 2024
d098074
Update for correct usage of max_tries/max_retries
pnilan Apr 24, 2024
ce3cef7
Updates class initialization and imports to __init__
pnilan Apr 24, 2024
72f9c49
Adds new `ErrorHandler` and `RetryStrategy` ABCs, adds new default `D…
pnilan Apr 26, 2024
93c3d0b
Revert `http.py`
pnilan Apr 26, 2024
e8ad170
chore: format code
pnilan Apr 26, 2024
3e28627
Update exports
pnilan Apr 26, 2024
0c0522b
Rename `RetryStrategy` to `BackoffStrategy`, add `Decoder` and `Error…
pnilan Apr 30, 2024
8559b7d
Update HttpClient & DefaultBackoffStrategy for correctness
pnilan Apr 30, 2024
369bfe0
Update abstractmethods access and error handling
pnilan Apr 30, 2024
b9bd8d1
Moves `raise_on_http_errors` to `HttpClient`
pnilan Apr 30, 2024
e15f59d
Adds unit tests for json decoder
pnilan Apr 30, 2024
efa8c4f
Add tests for default backoff strategy, http status error handler, an…
pnilan May 1, 2024
4ec752a
Update `HttpClient` to accept `authenticator` parameter, update Decod…
pnilan May 1, 2024
f11a4a0
Update `JsonDecoder` and related tests to handle invalid responses
pnilan May 3, 2024
5017a5a
Added testing for error handler and HttpClient
pnilan May 3, 2024
56f6c40
Added caching tests and updated clear_cache in `HttpClient`
pnilan May 4, 2024
91f53dc
Updates HttpStatusErrorHandler for clarity and improved default error…
pnilan May 4, 2024
c94829f
Updated test to remove explicit session setting
pnilan May 4, 2024
96a2e45
chore: format code
pnilan May 4, 2024
7a5e5c2
Moves inner function to private class method
pnilan May 7, 2024
1407d6d
Update per PR comments
pnilan May 8, 2024
1d1578b
Remove decoder
pnilan May 8, 2024
a0cc4f5
chore: format code
pnilan May 8, 2024
bafb8ed
Adds additional test for session.send raising exception.
pnilan May 8, 2024
44b67ca
chore: format code
pnilan May 8, 2024
0800251
Merge branch 'master' into pnilan/airbyte-cdk-improved-errors-and-sends
pnilan May 8, 2024
56003f9
Update test_http_client for explicit error mapping
pnilan May 10, 2024
b8b52d0
Update response/exc evaluation
pnilan May 13, 2024
7aa4c22
Update test_http_client to explicitly set error mapping.
pnilan May 13, 2024
a0f6035
update HttpClient and rate limiting to include http-client specific b…
pnilan May 13, 2024
c57b6f3
chore: format code
pnilan May 13, 2024
79f12cf
Update rate_limiting for mypy type checking
pnilan May 13, 2024
c8ed520
chore: format code
pnilan May 13, 2024
3676867
Merge branch 'master' into pnilan/airbyte-cdk-improved-errors-and-sends
pnilan May 13, 2024
8a4256a
Merge branch 'master' into pnilan/airbyte-cdk-improved-errors-and-sends
pnilan May 13, 2024
b78dd1c
Update poetry lock
pnilan May 13, 2024
a3aa7f9
Update to ignore type for backoff handler decorators
pnilan May 13, 2024
d68b6fa
chore: format code
pnilan May 13, 2024
023931b
update mypy ignore
pnilan May 13, 2024
c40f1fd
chore: lint
pnilan May 13, 2024
94bd7a4
Update BackoffExceptions for HttpStream compatibility
pnilan May 14, 2024
0854220
Update HttpClient for updated BackoffException
pnilan May 14, 2024
d1278e0
Update `test_using_cache` for http stream to clear cache
pnilan May 14, 2024
1166a12
Update tests to mock injected objects
pnilan May 14, 2024
a17828f
Update for mypy type checking
pnilan May 14, 2024
7f006d5
fix backoff_strategy initialization in http client
pnilan May 14, 2024
e0e5cfb
Update variable instantiation
pnilan May 14, 2024
cd945ce
chore: lint
pnilan May 14, 2024
ce8b8a2
Updates to resolve PR comments
pnilan May 16, 2024
__init__.py
@@ -5,5 +5,7 @@
 # Initialize Streams Package
 from .exceptions import UserDefinedBackoffException
 from .http import HttpStream, HttpSubStream
+from .http_request_sender import HttpRequestSender
+from .http_error_handler import HttpErrorHandler

-__all__ = ["HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
+__all__ = ["HttpStream", "HttpSubStream", "UserDefinedBackoffException", "HttpRequestSender", "HttpErrorHandler"]
http_error_handler.py
@@ -0,0 +1,129 @@
import logging
import requests
from enum import Enum
from typing import Any, Mapping, Optional
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from airbyte_cdk.models import FailureType

class ResponseAction(Enum):

    SUCCESS = "SUCCESS"
    RETRY = "RETRY"
    FAIL = "FAIL"
    IGNORE = "IGNORE"

class HttpErrorHandler():

    # Default mapping from HTTP status code to the action to take and the failure type to report.
    error_mapping: Mapping[int, Mapping[str, Any]] = {
        400: { "action": ResponseAction.FAIL, "failure_type": FailureType.config_error },
        401: { "action": ResponseAction.FAIL, "failure_type": FailureType.config_error },
        403: { "action": ResponseAction.FAIL, "failure_type": FailureType.config_error },
        404: { "action": ResponseAction.FAIL, "failure_type": FailureType.system_error },
        408: { "action": ResponseAction.RETRY, "failure_type": FailureType.transient_error },
        429: { "action": ResponseAction.RETRY, "failure_type": FailureType.transient_error },
        500: { "action": ResponseAction.RETRY, "failure_type": FailureType.transient_error },
        502: { "action": ResponseAction.RETRY, "failure_type": FailureType.transient_error },
        503: { "action": ResponseAction.RETRY, "failure_type": FailureType.transient_error },
    }

    def __init__(
        self,
        logger: logging.Logger,
    ) -> None:
        self._logger = logger
        self._error_mapping = self.error_mapping

    def validate_response(self, response: requests.Response) -> requests.Response:

        if response.ok:
            return response

        response_status = self._error_mapping.get(response.status_code, None)

        if not response_status:
            raise ValueError(f"Unexpected status code: {response.status_code}")

        if response_status['action'] == ResponseAction.FAIL:
            error_message = (
                response_status.get("error_message") or f"Request to {response.request.url} failed with status code {response.status_code} and error message {self.parse_response_error_message(response)}"
            )
            raise AirbyteTracedException(
                internal_message=error_message,
                message=error_message,
                failure_type=response_status['failure_type']
            )

        elif response_status['action'] == ResponseAction.IGNORE:
            self._logger.info(
                f"Ignoring response for failed request with HTTP status {response.status_code} with error message {self.parse_response_error_message(response)}"
            )

        # RETRY and IGNORE responses are returned unchanged; the caller decides whether to retry.
        return response

    @classmethod
    def parse_response_error_message(cls, response: requests.Response) -> Optional[str]:
        """
        Parses the raw response object from a failed request into a user-friendly error message.
        By default, this method tries to grab the error message from JSON responses by following common API patterns. Override to parse differently.

        :param response:
        :return: A user-friendly message that indicates the cause of the error
        """
        # default logic to grab error from common fields
        def _try_get_error(value: Any) -> Any:
            if isinstance(value, str):
                return value
            elif isinstance(value, list):
                error_list = [_try_get_error(v) for v in value]
                return ", ".join(v for v in error_list if v is not None)
            elif isinstance(value, dict):
                new_value = (
                    value.get("message")
                    or value.get("messages")
                    or value.get("error")
                    or value.get("errors")
                    or value.get("failures")
                    or value.get("failure")
                    or value.get("details")
                    or value.get("detail")
                )
                return _try_get_error(new_value)
            return None

        try:
            body = response.json()
            error = _try_get_error(body)
            return str(error) if error else None
        except requests.exceptions.JSONDecodeError:
            return None

    def should_retry(self, response: requests.Response) -> bool:
        # Status codes missing from the mapping are never retried.
        mapped_response = self._error_mapping.get(response.status_code)
        return mapped_response is not None and mapped_response.get("action") == ResponseAction.RETRY

    def backoff_time(self, response: requests.Response) -> Optional[float]:
        """
        Override this method to dynamically determine backoff time, e.g. by reading the X-Retry-After header.

        This method is called only if should_retry() returns True for the input response.

        :param response:
        :return: how long to back off in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff
        to the default backoff behavior (e.g. using an exponential algorithm).
        """
        return None

    def error_message(self, response: requests.Response) -> str:
        """
        Override this method to specify a custom error message which can incorporate the HTTP response received.

        :param response: The incoming HTTP response from the partner API
        :return:
        """
        return ""

    @property
    def raise_on_http_errors(self) -> bool:
        """
        Override if needed. If set to False, allows opting out of raising an exception for HTTP error codes.
        """
        return True
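The `backoff_time` docstring suggests reading a retry header but leaves the parsing to the override. Below is a minimal sketch of one way to do that, using only the standard library. The helper name `retry_after_seconds` and its parameters are hypothetical and not part of this PR. It assumes the standard `Retry-After` header (the docstring mentions `X-Retry-After`, which can be passed as `header_name`) and accepts either a number of seconds or an HTTP date.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Mapping, Optional


def retry_after_seconds(
    headers: Mapping[str, str],
    header_name: str = "Retry-After",  # assumption: the API uses the standard header name
    now: Optional[datetime] = None,
) -> Optional[float]:
    """Return the requested wait in seconds, or None to defer to the default backoff."""
    value = headers.get(header_name)
    if value is None:
        return None
    value = value.strip()
    try:
        # Most APIs send a plain number of seconds.
        seconds = float(value)
    except ValueError:
        try:
            # Otherwise try an HTTP date such as "Wed, 21 Oct 2015 07:28:00 GMT".
            retry_at = parsedate_to_datetime(value)
        except (TypeError, ValueError):
            return None
        if retry_at.tzinfo is None:
            retry_at = retry_at.replace(tzinfo=timezone.utc)
        current = now or datetime.now(timezone.utc)
        seconds = (retry_at - current).total_seconds()
    # Non-positive waits are treated as "no custom backoff".
    return seconds if seconds > 0 else None


print(retry_after_seconds({"Retry-After": "2.5"}))
print(retry_after_seconds({"Retry-After": "not-a-number"}))
```

A subclass of `HttpErrorHandler` could return `retry_after_seconds(response.headers)` from its `backoff_time` override. Because `None` is returned whenever the header is missing or unparseable, the default backoff would still apply in those cases.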
http_request_sender.py
@@ -0,0 +1,183 @@


# _create_prepared_request
# _send_request(all parameters)
# request_kwargs
# _send(request)

import logging
import urllib.parse
from typing import Any, Mapping, Optional, Union, Tuple
import requests

from .exceptions import DefaultBackoffException, RequestBodyException, UserDefinedBackoffException
from .rate_limiting import default_backoff_handler, user_defined_backoff_handler
from .http_error_handler import HttpErrorHandler

BODY_REQUEST_METHODS = ("GET", "POST", "PUT", "PATCH")

class HttpRequestSender():

    def __init__(
        self,
        session: requests.Session,
        logger: logging.Logger,
        http_error_handler: Optional[HttpErrorHandler] = None
    ):
        self._session = session
        self._logger = logger
        # Fall back to the default error handler when none is injected.
        self._http_error_handler = http_error_handler or HttpErrorHandler(logger)

    def _dedupe_query_params(self, url: str, params: Optional[Mapping[str, str]]) -> Mapping[str, str]:
        """
        Remove query parameters from params mapping if they are already encoded in the URL.
        :param url: URL with
        :param params:
        :return:
        """
        if params is None:
            params = {}
        query_string = urllib.parse.urlparse(url).query
        # parse_qs returns a list per key; only the first value is compared.
        query_dict = {k: v[0] for k, v in urllib.parse.parse_qs(query_string).items()}

        duplicate_keys_with_same_value = {k for k in query_dict.keys() if str(params.get(k)) == str(query_dict[k])}
        return {k: v for k, v in params.items() if k not in duplicate_keys_with_same_value}

    def _create_prepared_request(
        self,
        http_method: str,
        url: str,
        dedupe_query_params: bool,
        headers: Optional[Mapping[str, str]] = None,
        params: Optional[Mapping[str, str]] = None,
        json: Optional[Mapping[str, Any]] = None,
        data: Optional[Union[str, Mapping[str, Any]]] = None,
    ) -> requests.PreparedRequest:
        # creates and returns a prepared request

        # Public method from HttpStream --> should it be re-implemented here? No guarantee that it's not overridden in existing connectors
        if dedupe_query_params:
            query_params = self._dedupe_query_params(url, params)
        else:
            query_params = params or {}
        args = {"method": http_method, "url": url, "headers": headers, "params": query_params}
        if http_method.upper() in BODY_REQUEST_METHODS:
            if json and data:
                raise RequestBodyException(
                    "At the same time only one of the 'request_body_data' and 'request_body_json' functions can return data"
                )
            elif json:
                args["json"] = json
            elif data:
                args["data"] = data
        prepared_request: requests.PreparedRequest = self._session.prepare_request(requests.Request(**args))

        return prepared_request

    def _send_with_retry(
        self,
        retry_factor: float,
        max_retries: Optional[int],
        max_time: Optional[int],
        request: requests.PreparedRequest,
        request_kwargs: Optional[Mapping[str, Any]] = None
    ) -> requests.Response:
        """
        The backoff package's max_tries parameter is the total number of
        tries before giving up, so if it is 0 no calls are expected to be made.
        This class uses max_REtries instead, assuming at least one attempt
        followed by some retry attempts. To comply with that logic, we add
        1 to the expected number of retries.
        """
        # Leave max_tries as None when max_retries is None so the name is always bound.
        max_tries: Optional[int] = None
        if max_retries is not None:
            max_tries = max(0, max_retries) + 1

        user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(self._send)
        backoff_handler = default_backoff_handler(max_tries=max_tries, max_time=max_time, factor=retry_factor)
        # backoff handlers wrap _send, so it will always return a response
        response = backoff_handler(user_backoff_handler)(request, request_kwargs)

        return response


    def _send(
        self,
        request: requests.PreparedRequest,
        request_kwargs: Optional[Mapping[str, Any]] = None
    ) -> requests.Response:
        # sends prepared request, returns response, invokes error handling on response

        self._logger.debug(
            "Making outbound API request",
            extra={"headers": request.headers, "url": request.url, "request_body": request.body}
        )

        # request_kwargs defaults to None, which cannot be unpacked with **
        response: requests.Response = self._session.send(request, **(request_kwargs or {}))

        # Evaluation of response.text can be heavy, for example, if streaming a large response
        # Do it only in debug mode
        if self._logger.isEnabledFor(logging.DEBUG):
            self._logger.debug(
                "Receiving response", extra={"headers": response.headers, "status": response.status_code, "body": response.text}
            )

        response = self._http_error_handler.validate_response(response)

        # !!! moves public methods from HttpStream to HttpErrorHandler --> when wired in to HttpStream, connectors will require changes => should section be wrapped in a method in HttpErrorHandler?

        if self._http_error_handler.should_retry(response):
            custom_backoff_time = self._http_error_handler.backoff_time(response)
            error_message = self._http_error_handler.error_message(response)
            if custom_backoff_time:
                raise UserDefinedBackoffException(
                    backoff=custom_backoff_time, request=request, response=response, error_message=error_message
                )
            else:
                raise DefaultBackoffException(request=request, response=response, error_message=error_message)
        elif self._http_error_handler.raise_on_http_errors:
            # Raise any HTTP exceptions that happened in case there were unexpected ones
            try:
                response.raise_for_status()
            except requests.HTTPError as exc:
                self._logger.error(response.text)
                raise exc

        return response

    def send_request(
        self,
        http_method: str,
        url: str,
        retry_factor: float,
        headers: Optional[Mapping[str, str]] = None,
        params: Optional[Mapping[str, str]] = None,
        json: Optional[Mapping[str, Any]] = None,
        data: Optional[Union[str, Mapping[str, Any]]] = None,
        max_retries: Optional[int] = None,
        max_time: Optional[int] = None,
        dedupe_query_params: bool = False,
        request_kwargs: Optional[Mapping[str, Any]] = None,
    ) -> Tuple[requests.PreparedRequest, requests.Response]:
        """
        Prepares and sends the request and returns the request and response objects.
        """

        request: requests.PreparedRequest = self._create_prepared_request(
            http_method=http_method,
            url=url,
            dedupe_query_params=dedupe_query_params,
            headers=headers,
            params=params,
            json=json,
            data=data
        )

        response: requests.Response = self._send_with_retry(
            retry_factor=retry_factor,
            max_retries=max_retries,
            max_time=max_time,
            request=request,
            request_kwargs=request_kwargs
        )

        return request, response
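The `_send_with_retry` docstring distinguishes "tries" (total attempts) from "retries" (attempts after the first). The sketch below illustrates that off-by-one conversion with a plain loop instead of the `backoff` package, and with no sleeping between attempts. The names `to_max_tries` and `call_with_tries` are hypothetical helpers written for this illustration only.

```python
from typing import Callable, Optional


def to_max_tries(max_retries: Optional[int]) -> Optional[int]:
    """Convert a retry count into a total attempt count; None (no limit) stays None."""
    if max_retries is None:
        return None
    # Negative values are clamped to 0 retries, which still allows one attempt.
    return max(0, max_retries) + 1


def call_with_tries(func: Callable[[], bool], max_tries: Optional[int]) -> int:
    """Call func until it returns True or max_tries attempts are used; return attempts made."""
    attempts = 0
    while max_tries is None or attempts < max_tries:
        attempts += 1
        if func():
            break
    return attempts


def always_fails() -> bool:
    return False


print(call_with_tries(always_fails, to_max_tries(0)))  # 1: the initial attempt only
print(call_with_tries(always_fails, to_max_tries(3)))  # 4: one attempt plus three retries
```

With `max_retries=0` the request is still sent once, which matches the "at least one attempt" assumption in the docstring.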