Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

airbyte-cdk: Improve Error Handling in Legacy CDK #37576

Merged
merged 51 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
05002a9
Add initial http_request_sender and http_error_handler files, structure
pnilan Apr 24, 2024
d0493c0
Moves error_mapping to http_error_handler, adds error_message and bac…
pnilan Apr 24, 2024
d098074
Update for correct usage of max_tries/max_retries
pnilan Apr 24, 2024
ce3cef7
Updates class initialization and imports to __init__
pnilan Apr 24, 2024
72f9c49
Adds new `ErrorHandler` and `RetryStrategy` ABCs, adds new default `D…
pnilan Apr 26, 2024
93c3d0b
Revert `http.py`
pnilan Apr 26, 2024
e8ad170
chore: format code
pnilan Apr 26, 2024
3e28627
Update exports
pnilan Apr 26, 2024
0c0522b
Rename `RetryStrategy` to `BackoffStrategy`, add `Decoder` and `Error…
pnilan Apr 30, 2024
8559b7d
Update HttpClient & DefaultBackoffStrategy for correctness
pnilan Apr 30, 2024
369bfe0
Update abstractmethods access and error handling
pnilan Apr 30, 2024
b9bd8d1
Moves `raise_on_http_errors` to `HttpClient`
pnilan Apr 30, 2024
e15f59d
Adds unit tests for json decoder
pnilan Apr 30, 2024
efa8c4f
Add tests for default backoff strategy, http status error handler, an…
pnilan May 1, 2024
4ec752a
Update `HttpClient` to accept `authenticator` parameter, update Decod…
pnilan May 1, 2024
f11a4a0
Update `JsonDecoder` and related tests to handle invalid responses
pnilan May 3, 2024
5017a5a
Added testing for error handler and HttpClient
pnilan May 3, 2024
56f6c40
Added caching tests and updated clear_cache in `HttpClient`
pnilan May 4, 2024
91f53dc
Updates HttpStatusErrorHandler for clarity and improved default error…
pnilan May 4, 2024
c94829f
Updated test to remove explicit session setting
pnilan May 4, 2024
96a2e45
chore: format code
pnilan May 4, 2024
7a5e5c2
Moves inner function to private class method
pnilan May 7, 2024
1407d6d
Update per PR comments
pnilan May 8, 2024
1d1578b
Remove decoder
pnilan May 8, 2024
a0cc4f5
chore: format code
pnilan May 8, 2024
bafb8ed
Adds additional test for session.send raising exception.
pnilan May 8, 2024
44b67ca
chore: format code
pnilan May 8, 2024
0800251
Merge branch 'master' into pnilan/airbyte-cdk-improved-errors-and-sends
pnilan May 8, 2024
56003f9
Update test_http_client for explicit error mapping`
pnilan May 10, 2024
b8b52d0
Update response/exc evaluation
pnilan May 13, 2024
7aa4c22
Update test_http_client to explicitly set error mapping.
pnilan May 13, 2024
a0f6035
update HttpClient and rate limiting to include http-client specific b…
pnilan May 13, 2024
c57b6f3
chore: format code
pnilan May 13, 2024
79f12cf
Update rate_limiting for mypy type checking
pnilan May 13, 2024
c8ed520
chore: format code
pnilan May 13, 2024
3676867
Merge branch 'master' into pnilan/airbyte-cdk-improved-errors-and-sends
pnilan May 13, 2024
8a4256a
Merge branch 'master' into pnilan/airbyte-cdk-improved-errors-and-sends
pnilan May 13, 2024
b78dd1c
Update poetry lock
pnilan May 13, 2024
a3aa7f9
Update to ignore type for backoff handler decorators
pnilan May 13, 2024
d68b6fa
chore: format code
pnilan May 13, 2024
023931b
update mypy ignore
pnilan May 13, 2024
c40f1fd
chore: lint
pnilan May 13, 2024
94bd7a4
Update BackoffExceptions for HttpStream compatibility
pnilan May 14, 2024
0854220
Update HttpClient for updated BackoffException
pnilan May 14, 2024
d1278e0
Update `test_using_cache` for http stream to clear cache
pnilan May 14, 2024
1166a12
Update tests to mock injected objects
pnilan May 14, 2024
a17828f
Update for mypy type checking
pnilan May 14, 2024
7f006d5
fix backoff_strategy initialization in http client
pnilan May 14, 2024
e0e5cfb
Update variable instantiation
pnilan May 14, 2024
cd945ce
chore: lint
pnilan May 14, 2024
ce8b8a2
Updates to resolve PR comments
pnilan May 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
#

# Initialize Streams Package
from .exceptions import UserDefinedBackoffException
from .http_client import HttpClient
from .http import HttpStream, HttpSubStream
from .exceptions import UserDefinedBackoffException

__all__ = ["HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
__all__ = ["HttpClient", "HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from .backoff_strategy import BackoffStrategy
from .default_backoff_strategy import DefaultBackoffStrategy
from .error_handler import ErrorHandler
from .error_message_parser import ErrorMessageParser
from .http_status_error_handler import HttpStatusErrorHandler
from .json_error_message_parser import JsonErrorMessageParser
from .response_models import ResponseAction, ErrorResolution

__all__ = [
"BackoffStrategy",
"DefaultBackoffStrategy",
"ErrorHandler",
"ErrorMessageParser",
"HttpStatusErrorHandler",
"JsonErrorMessageParser",
"ResponseAction",
"ErrorResolution"
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Optional, Union

import requests


class BackoffStrategy(ABC):
@abstractmethod
def backoff_time(self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]]) -> Optional[float]:
"""
Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.

This method is called only if should_backoff() returns True for the input request.

:param response:
:return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff
to the default backoff behavior (e.g using an exponential algorithm).
"""
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.


from datetime import timedelta
from typing import Optional, Union

import requests

from .backoff_strategy import BackoffStrategy


class DefaultBackoffStrategy(BackoffStrategy):
def __init__(
self,
max_retries: int = 5,
max_time: timedelta = timedelta(seconds=600),
):
self.max_retries = max_retries
self.max_time = max_time.total_seconds()

def backoff_time(self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]]) -> Optional[float]:
"""
Override this method to dynamically determine backoff time e.g: by reading the X-Retry-After header.
pnilan marked this conversation as resolved.
Show resolved Hide resolved

:param response:
:return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff
to the default backoff behavior (e.g using an exponential algorithm).
"""
return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from abc import ABC, abstractmethod
from typing import Optional, Union

import requests

from .response_models import ErrorResolution


class ErrorHandler(ABC):
"""
Abstract base class to determine how to handle a failed HTTP request.
"""

@abstractmethod
def interpret_response(self, response: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
"""
Interpret the response or exception and return the corresponding response action, failure type, and error message.

:param response: The HTTP response object or exception raised during the request.
:return: A tuple containing the response action, failure type, and error message.
"""
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Optional

import requests


class ErrorMessageParser(ABC):
@abstractmethod
def parse_response_error_message(self, response: requests.Response) -> Optional[str]:
"""
Parse error message from response.
:param response: response received for the request
:return: error message
"""
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import logging
from typing import Mapping, Optional, Type, Union

import requests
from airbyte_cdk.models import FailureType
from requests import RequestException

from .error_handler import ErrorHandler
from .response_models import ErrorResolution, ResponseAction


class HttpStatusErrorHandler(ErrorHandler):

_DEFAULT_ERROR_MAPPING: Mapping[Union[int, str, Type[Exception]], ErrorResolution] = {
RequestException: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="An exception occurred when making the request.",
),
400: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message="Bad request. Please check your request parameters.",
),
401: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="Unauthorized. Please ensure you are authenticated correctly.",
),
403: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="Forbidden. You don't have permission to access this resource.",
),
404: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message="Not found. The requested resource was not found on the server.",
),
405: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message="Method not allowed. Please check your request method.",
),
408: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Request timeout.",
),
429: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Too many requests.",
),
500: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Internal server error.",
),
502: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Bad gateway.",
),
503: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Service unavailable.",
),
pnilan marked this conversation as resolved.
Show resolved Hide resolved
504: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Gateway timeout.",
),
}

def __init__(
self,
logger: logging.Logger,
error_mapping: Optional[Mapping[Union[int, str, type[Exception]], ErrorResolution]] = None,
) -> None:
"""
Initialize the HttpStatusErrorHandler.

:param error_mapping: Custom error mappings to extend or override the default mappings.
"""
self._logger = logger
self._error_mapping = error_mapping or self._DEFAULT_ERROR_MAPPING

def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
"""
Interpret the response and return the corresponding response action, failure type, and error message.

:param response: The HTTP response object.
:return: A tuple containing the response action, failure type, and error message.
"""

if isinstance(response_or_exception, Exception):
mapped_error: Optional[ErrorResolution] = self._error_mapping.get(response_or_exception.__class__)

if mapped_error is not None:
return mapped_error
else:
self._logger.error(f"Unexpected exception in error handler: {response_or_exception}")
return ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.system_error,
error_message=f"Unexpected exception in error handler: {response_or_exception}",
)

elif isinstance(response_or_exception, requests.Response):
if response_or_exception.status_code is None:
self._logger.error("Response does not include an HTTP status code.")
return ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Response does not include an HTTP status code.",
)

if response_or_exception.ok:
return ErrorResolution(
response_action=ResponseAction.SUCCESS,
failure_type=None,
error_message=None,
)

error_key = response_or_exception.status_code

mapped_error = self._error_mapping.get(error_key)

if mapped_error is not None:
return mapped_error
else:
self._logger.warning(f"Unexpected HTTP Status Code in error handler: '{error_key}'")
return ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.system_error,
error_message=f"Unexpected HTTP Status Code in error handler: {error_key}",
)
else:
self._logger.error(f"Received unexpected response type: {type(response_or_exception)}")
return ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message=f"Received unexpected response type: {type(response_or_exception)}",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Optional

import requests
from airbyte_cdk.sources.utils.types import JsonType

from .error_message_parser import ErrorMessageParser


class JsonErrorMessageParser(ErrorMessageParser):
def _try_get_error(self, value: Optional[JsonType]) -> Optional[str]:
if isinstance(value, str):
return value
elif isinstance(value, list):
errors_in_value = [self._try_get_error(v) for v in value]
return ", ".join(v for v in errors_in_value if v is not None)
elif isinstance(value, dict):
new_value = (
value.get("message")
or value.get("messages")
or value.get("error")
or value.get("errors")
or value.get("failures")
or value.get("failure")
or value.get("detail")
)
return self._try_get_error(new_value)
return None

def parse_response_error_message(self, response: requests.Response) -> Optional[str]:
"""
Parses the raw response object from a failed request into a user-friendly error message.

:param response:
:return: A user-friendly message that indicates the cause of the error
"""
try:
body = response.json()
return self._try_get_error(body)
except requests.exceptions.JSONDecodeError:
return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from dataclasses import dataclass
from enum import Enum
from typing import Optional

from airbyte_cdk.models import FailureType


class ResponseAction(Enum):
SUCCESS = "SUCCESS"
RETRY = "RETRY"
FAIL = "FAIL"
IGNORE = "IGNORE"


@dataclass
class ErrorResolution:
response_action: Optional[ResponseAction] = None
failure_type: Optional[FailureType] = None
error_message: Optional[str] = None
30 changes: 23 additions & 7 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,27 @@
#


from typing import Union
from typing import Optional, Union

import requests


class BaseBackoffException(requests.exceptions.HTTPError):
def __init__(self, request: requests.PreparedRequest, response: requests.Response, error_message: str = ""):
error_message = (
error_message or f"Request URL: {request.url}, Response Code: {response.status_code}, Response Text: {response.text}"
)
super().__init__(error_message, request=request, response=response)
def __init__(
self,
request: requests.PreparedRequest,
response: Optional[Union[requests.Response, Exception]],
error_message: str = "",
):

if isinstance(response, requests.Response):
error_message = (
error_message or f"Request URL: {request.url}, Response Code: {response.status_code}, Response Text: {response.text}"
)
super().__init__(error_message, request=request, response=response)
else:
error_message = error_message or f"Request URL: {request.url}, Exception: {response}"
pnilan marked this conversation as resolved.
Show resolved Hide resolved
super().__init__(error_message, request=request, response=None)


class RequestBodyException(Exception):
Expand All @@ -27,7 +37,13 @@ class UserDefinedBackoffException(BaseBackoffException):
An exception that exposes how long it attempted to backoff
"""

def __init__(self, backoff: Union[int, float], request: requests.PreparedRequest, response: requests.Response, error_message: str = ""):
def __init__(
self,
backoff: Union[int, float],
request: requests.PreparedRequest,
response: Optional[Union[requests.Response, Exception]],
error_message: str = "",
):
"""
:param backoff: how long to backoff in seconds
:param request: the request that triggered this backoff exception
Expand Down
Loading
Loading