Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support custom error messaging for error response + retryable errors #18204

Merged
merged 8 commits into from
Oct 26, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ def __post_init__(self, options: Mapping[str, Any]):
def max_retries(self) -> Union[int, None]:
return self.error_handlers[0].max_retries

def should_retry(self, response: requests.Response) -> ResponseStatus:
def interpret_response(self, response: requests.Response) -> ResponseStatus:
should_retry = None
for retrier in self.error_handlers:
should_retry = retrier.should_retry(response)
should_retry = retrier.interpret_response(response)
if should_retry.action == ResponseAction.SUCCESS:
return response_status.SUCCESS
if should_retry == response_status.IGNORE or should_retry.action == ResponseAction.RETRY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin


Expand Down Expand Up @@ -91,9 +92,10 @@ class DefaultErrorHandler(ErrorHandler, JsonSchemaMixin):

DEFAULT_BACKOFF_STRATEGY = ExponentialBackoffStrategy

config: Config
options: InitVar[Mapping[str, Any]]
response_filters: Optional[List[HttpResponseFilter]] = None
max_retries: Optional[int] = 5
max_retries: Optional[int] = 1
_max_retries: int = field(init=False, repr=False, default=5)
backoff_strategies: Optional[List[BackoffStrategy]] = None

Expand All @@ -102,9 +104,11 @@ def __post_init__(self, options: Mapping[str, Any]):

if not self.response_filters:
self.response_filters.append(
HttpResponseFilter(ResponseAction.RETRY, http_codes=HttpResponseFilter.DEFAULT_RETRIABLE_ERRORS, options={})
HttpResponseFilter(
ResponseAction.RETRY, http_codes=HttpResponseFilter.DEFAULT_RETRIABLE_ERRORS, config=self.config, options={}
)
)
self.response_filters.append(HttpResponseFilter(ResponseAction.IGNORE, options={}))
self.response_filters.append(HttpResponseFilter(ResponseAction.IGNORE, config={}, options={}))

if not self.backoff_strategies:
self.backoff_strategies = [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY()]
Expand All @@ -122,7 +126,7 @@ def max_retries(self, value: Union[int, None]):
if not isinstance(value, property):
self._max_retries = value

def should_retry(self, response: requests.Response) -> ResponseStatus:
def interpret_response(self, response: requests.Response) -> ResponseStatus:
request = response.request

if request not in self._last_request_to_attempt_count:
Expand All @@ -132,10 +136,15 @@ def should_retry(self, response: requests.Response) -> ResponseStatus:
for response_filter in self.response_filters:
filter_action = response_filter.matches(response)
if filter_action is not None:
error_message = response_filter.create_error_message(response)
if filter_action == ResponseAction.RETRY:
return ResponseStatus(ResponseAction.RETRY, self._backoff_time(response, self._last_request_to_attempt_count[request]))
return ResponseStatus(
ResponseAction.RETRY,
self._backoff_time(response, self._last_request_to_attempt_count[request]),
error_message=error_message,
)
else:
return ResponseStatus(filter_action)
return ResponseStatus(filter_action, error_message=error_message)
if response.ok:
return response_status.SUCCESS
# Fail if the response matches no filters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def max_retries(self) -> Union[int, None]:
pass

@abstractmethod
def should_retry(self, response: requests.Response) -> ResponseStatus:
def interpret_response(self, response: requests.Response) -> ResponseStatus:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed this method because in simple_retriever.parse_response() we're not just getting the retryability, we're interpreting what the response status which is probably a more accurate name

"""
Evaluate response status describing whether a failing request should be retried or ignored.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
from typing import Any, Mapping, Optional, Set, Union

import requests
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.streams.http.http import HttpStream
from dataclasses_jsonschema import JsonSchemaMixin

Expand All @@ -22,23 +24,27 @@ class HttpResponseFilter(JsonSchemaMixin):
http_codes (Set[int]): http code of matching requests
error_message_contains (str): error substring of matching requests
predicate (str): predicate to apply to determine if a request is matching
error_message (Union[InterpolatedString, str): error message to display if the response matches the filter
"""

TOO_MANY_REQUESTS_ERRORS = {429}
DEFAULT_RETRIABLE_ERRORS = set([x for x in range(500, 600)]).union(TOO_MANY_REQUESTS_ERRORS)

action: Union[ResponseAction, str]
config: Config
options: InitVar[Mapping[str, Any]]
http_codes: Set[int] = None
error_message_contains: str = None
predicate: Union[InterpolatedBoolean, str] = ""
error_message: Union[InterpolatedString, str] = ""

def __post_init__(self, options: Mapping[str, Any]):
if isinstance(self.action, str):
self.action = ResponseAction[self.action]
self.http_codes = self.http_codes or set()
if isinstance(self.predicate, str):
self.predicate = InterpolatedBoolean(condition=self.predicate, options=options)
self.error_message = InterpolatedString.create(string_or_interpolated=self.error_message, options=options)

def matches(self, response: requests.Response) -> Optional[ResponseAction]:
"""
Expand All @@ -55,6 +61,16 @@ def matches(self, response: requests.Response) -> Optional[ResponseAction]:
else:
return None

def create_error_message(self, response: requests.Response) -> str:
"""
Construct an error message based on the specified message template of the filter.
:param response: The HTTP response which can be used during interpolation
:return: The evaluated error message string to be emitted
"""
if self.error_message:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice yeah this can basically just get condensed into a one liner

return self.error_message.eval(self.config, response=response.json(), headers=response.headers)
return ""

def _response_matches_predicate(self, response: requests.Response) -> bool:
return self.predicate and self.predicate.eval(None, response=response.json(), headers=response.headers)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@

class ResponseStatus:
"""
ResponseAction amended with backoff time if a action is RETRY
ResponseAction amended with backoff time if an action is RETRY
"""

def __init__(self, response_action: Union[ResponseAction, str], retry_in: Optional[float] = None):
def __init__(self, response_action: Union[ResponseAction, str], retry_in: Optional[float] = None, error_message: str = ""):
"""
:param response_action: response action to execute
:param retry_in: backoff time (if action is RETRY)
:param error_message: the error to be displayed back to the customer
"""
if isinstance(response_action, str):
response_action = ResponseAction[response_action]
if retry_in and response_action != ResponseAction.RETRY:
raise ValueError(f"Unexpected backoff time ({retry_in} for non-retryable response action {response_action}")
self._retry_in = retry_in
self._action = response_action
self._error_message = error_message

@property
def action(self):
Expand All @@ -34,6 +36,11 @@ def retry_in(self) -> Optional[float]:
"""How long to backoff before retrying a response. None if no wait required."""
return self._retry_in

@property
def error_message(self) -> Optional[str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this always return a str since error_message defaults to ""

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yeah good point will just make this a string

"""The message to be displayed when an error response is received"""
return self._error_message

@classmethod
def retry(cls, retry_in: Optional[float]) -> "ResponseStatus":
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __post_init__(self, options: Mapping[str, Any]):
if type(self.http_method) == str:
self.http_method = HttpMethod[self.http_method]
self._method = self.http_method
self.error_handler = self.error_handler or DefaultErrorHandler(options=options)
self.error_handler = self.error_handler or DefaultErrorHandler(options=options, config=self.config)
self._options = options

# We are using an LRU cache in should_retry() method which requires all incoming arguments (including self) to be hashable.
Expand Down Expand Up @@ -88,9 +88,9 @@ def get_method(self):
# use a tiny cache to limit the memory footprint. It doesn't have to be large because we mostly
# only care about the status of the last response received
@lru_cache(maxsize=10)
def should_retry(self, response: requests.Response) -> ResponseStatus:
def interpret_response_status(self, response: requests.Response) -> ResponseStatus:
# Cache the result because the HttpStream first checks if we should retry before looking at the backoff time
return self.error_handler.should_retry(response)
return self.error_handler.interpret_response(response)

def get_request_params(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ def get_request_params(
"""

@abstractmethod
def should_retry(self, response: requests.Response) -> ResponseStatus:
def interpret_response_status(self, response: requests.Response) -> ResponseStatus:
"""
Specifies conditions for backoff based on the response from the server.
Specifies conditions for backoff, error handling and reporting based on the response from the server.

By default, back off on the following HTTP response statuses:
- 429 (Too Many Requests) indicating rate limiting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def should_retry(self, response: requests.Response) -> bool:

Unexpected but transient exceptions (connection timeout, DNS resolution failed, etc..) are retried by default.
"""
return self.requester.should_retry(response).action == ResponseAction.RETRY
return self.requester.interpret_response_status(response).action == ResponseAction.RETRY

def backoff_time(self, response: requests.Response) -> Optional[float]:
"""
Expand All @@ -107,12 +107,21 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:
:return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff
to the default backoff behavior (e.g using an exponential algorithm).
"""
should_retry = self.requester.should_retry(response)
should_retry = self.requester.interpret_response_status(response)
if should_retry.action != ResponseAction.RETRY:
raise ValueError(f"backoff_time can only be applied on retriable response action. Got {should_retry.action}")
assert should_retry.action == ResponseAction.RETRY
return should_retry.retry_in

def error_message(self, response: requests.Response) -> str:
"""
Constructs an error message which can incorporate the HTTP response received from the partner API.

:param response: The incoming HTTP response from the partner API
:return The error message string to be emitted
"""
return self.requester.interpret_response_status(response).error_message

def _get_request_options(
self,
stream_slice: Optional[StreamSlice],
Expand Down Expand Up @@ -299,9 +308,10 @@ def parse_response(
# if fail -> raise exception
# if ignore -> ignore response and return no records
# else -> delegate to record selector
response_status = self.requester.should_retry(response)
response_status = self.requester.interpret_response_status(response)
if response_status.action == ResponseAction.FAIL:
raise ReadException(f"Request {response.request} failed with response {response}")
error_message = response_status.error_message or f"Request {response.request} failed with response {response}"
raise ReadException(error_message)
elif response_status.action == ResponseAction.IGNORE:
self.logger.info(f"Ignoring response for failed request with error message {HttpStream.parse_response_error_message(response)}")
return []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@


class BaseBackoffException(requests.exceptions.HTTPError):
def __init__(self, request: requests.PreparedRequest, response: requests.Response):
error_message = f"Request URL: {request.url}, Response Code: {response.status_code}, Response Text: {response.text}"
def __init__(self, request: requests.PreparedRequest, response: requests.Response, error_message: str = ""):
error_message = (
error_message or f"Request URL: {request.url}, Response Code: {response.status_code}, Response Text: {response.text}"
)
super().__init__(error_message, request=request, response=response)


Expand All @@ -25,14 +27,14 @@ class UserDefinedBackoffException(BaseBackoffException):
An exception that exposes how long it attempted to backoff
"""

def __init__(self, backoff: Union[int, float], request: requests.PreparedRequest, response: requests.Response):
def __init__(self, backoff: Union[int, float], request: requests.PreparedRequest, response: requests.Response, error_message: str = ""):
"""
:param backoff: how long to backoff in seconds
:param request: the request that triggered this backoff exception
:param response: the response that triggered the backoff exception
"""
self.backoff = backoff
super().__init__(request=request, response=response)
super().__init__(request=request, response=response, error_message=error_message)


class DefaultBackoffException(BaseBackoffException):
Expand Down
16 changes: 14 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:
"""
return None

def error_message(self, response: requests.Response) -> str:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love the idea of a new method to the CDK to support a low-code use case, but it could still be useful for other connectors that need to override retryable errors

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interfacing with HttpStream is always tricky.

I'm in favor of those refactors so long as the default behavior does not change so we don't impact existing connectors.

"""
Override this method to specify a custom error message which can incorporate the HTTP response received

:param response: The incoming HTTP response from the partner API
:return:
"""
return ""

def _create_prepared_request(
self,
path: str,
Expand Down Expand Up @@ -296,10 +305,13 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,
)
if self.should_retry(response):
custom_backoff_time = self.backoff_time(response)
error_message = self.error_message(response)
if custom_backoff_time:
raise UserDefinedBackoffException(backoff=custom_backoff_time, request=request, response=response)
raise UserDefinedBackoffException(
backoff=custom_backoff_time, request=request, response=response, error_message=error_message
)
else:
raise DefaultBackoffException(request=request, response=response)
raise DefaultBackoffException(request=request, response=response, error_message=error_message)
elif self.raise_on_http_errors:
# Raise any HTTP exceptions that happened in case there were unexpected ones
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@
)
def test_composite_error_handler(test_name, first_handler_behavior, second_handler_behavior, expected_behavior):
first_error_handler = MagicMock()
first_error_handler.should_retry.return_value = first_handler_behavior
first_error_handler.interpret_response.return_value = first_handler_behavior
second_error_handler = MagicMock()
second_error_handler.should_retry.return_value = second_handler_behavior
second_error_handler.should_retry.return_value = second_handler_behavior
second_error_handler.interpret_response.return_value = second_handler_behavior
second_error_handler.interpret_response.return_value = second_handler_behavior
retriers = [first_error_handler, second_error_handler]
retrier = CompositeErrorHandler(error_handlers=retriers, options={})
response_mock = MagicMock()
response_mock.ok = first_handler_behavior == response_status.SUCCESS or second_handler_behavior == response_status.SUCCESS
assert retrier.should_retry(response_mock) == expected_behavior
assert retrier.interpret_response(response_mock) == expected_behavior


def test_composite_error_handler_no_handlers():
Expand Down
Loading