Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(source-intercom) raise config error on Active subscription needed error, transient error for Companies stream #42094

Merged
merged 12 commits into from
Jul 29, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
dockerImageTag: 0.6.13
dockerImageTag: 0.6.14
dockerRepository: airbyte/source-intercom
documentationUrl: https://docs.airbyte.com/integrations/sources/intercom
githubIssueLabel: source-intercom
Expand Down
235 changes: 160 additions & 75 deletions airbyte-integrations/connectors/source-intercom/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.6.13"
version = "0.6.14"
name = "source-intercom"
description = "Source implementation for Intercom Yaml."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -17,7 +17,7 @@ include = "source_intercom"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "0.86.3"
airbyte-cdk = "^3"

[tool.poetry.scripts]
source-intercom = "source_intercom.run:run"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.incremental.cursor import Cursor
from airbyte_cdk.sources.declarative.incremental import DeclarativeCursor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ParentStreamConfig
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_nested_request_input_provider import (
Expand All @@ -21,12 +21,13 @@
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_input_provider import InterpolatedRequestInputProvider
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution

RequestInput = Union[str, Mapping[str, str]]


@dataclass
class IncrementalSingleSliceCursor(Cursor):
class IncrementalSingleSliceCursor(DeclarativeCursor):
cursor_field: Union[InterpolatedString, str]
config: Config
parameters: InitVar[Mapping[str, Any]]
Expand Down Expand Up @@ -82,6 +83,9 @@ def _get_request_option(self, option_type: RequestOptionType, stream_slice: Stre
def get_stream_state(self) -> StreamState:
return self._state

def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
return self.get_stream_state()

def set_initial_state(self, stream_state: StreamState):
cursor_field = self.cursor_field.eval(self.config)
cursor_value = stream_state.get(cursor_field)
Expand Down Expand Up @@ -361,14 +365,7 @@ def wrapper_balance_rate_limit(*args, **kwargs):


@dataclass(eq=False)
class HttpRequesterWithRateLimiter(HttpRequester):
"""
The difference between the built-in `HttpRequester` and this one is the custom decorator,
applied on top of `interpret_response_status` to preserve the api calls for a defined amount of time,
calculated using the rate limit headers and not use the custom backoff strategy,
since we deal with Response.status_code == 200,
the default requester's logic doesn't allow to handle the status of 200 with `should_retry()`.
"""
class IntercomHttpRequester(HttpRequester):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something that seems unclear to me is where this custom IntercomHttpRequester differs from the regular HttpRequester that is supplied by the low-code CDK, since we removed interpret_response_status into a custom error handler, it doesn't seem like this class is overriding anything meaningful.

I see that we instantiate a bunch of interpolators, but the overwritten get_request_params() and other methods seem to just do the same behavior as interpolated_request_options_provider.py which makes me think this class is not needed anymore.

What do you think @darynaishchenko ? If this is in fact needed, let's add a docstring explaining the need for this custom class. Otherwise we should just swap this out and use HttpRequester in the manifest

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right, here is no need to overwrite HttpRequester now. Changed it manifest file.
Run regression tests here: request urls looks good.


request_body_json: Optional[RequestInput] = None
request_headers: Optional[RequestInput] = None
Expand All @@ -391,12 +388,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
config=self.config, request_inputs=self.request_body_json, parameters=parameters
)

# The RateLimiter is applied to balance the api requests.
@IntercomRateLimiter.balance_rate_limit()
def interpret_response_status(self, response: requests.Response) -> ResponseStatus:
# Check for response.headers to define the backoff time before the next api call
return super().interpret_response_status(response)

def get_request_params(
self,
*,
Expand Down Expand Up @@ -426,3 +417,19 @@ def get_request_body_json(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
return self._body_json_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)


class ErrorHandlerWithRateLimiter(DefaultErrorHandler):
"""
The difference between the built-in `DefaultErrorHandler` and this one is the custom decorator,
applied on top of `interpret_response` to preserve the api calls for a defined amount of time,
calculated using the rate limit headers and not use the custom backoff strategy,
since we deal with Response.status_code == 200,
the default requester's logic doesn't allow to handle the status of 200 with `should_retry()`.
"""

# The RateLimiter is applied to balance the api requests.
@IntercomRateLimiter.balance_rate_limit()
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
# Check for response.headers to define the backoff time before the next api call
return super().interpret_response(response_or_exception)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ definitions:
requester:
description: "Base Requester for Full Refresh streams"
type: CustomRequester
class_name: source_intercom.components.HttpRequesterWithRateLimiter
class_name: source_intercom.components.IntercomHttpRequester
url_base: "https://api.intercom.io/"
http_method: "GET"
authenticator:
Expand All @@ -22,7 +22,14 @@ definitions:
Intercom-Version: "'2.10'"
Accept: "application/json"
error_handler:
type: "DefaultErrorHandler"
type: CustomErrorHandler
class_name: source_intercom.components.ErrorHandlerWithRateLimiter
response_filters:
- type: HttpResponseFilter
error_message_contains: "Active subscription needed"
action: FAIL
failure_type: config_error
error_message: "Failed to perform request. Error: Active subscription needed. Please, validate your current Intercom plan to continue using API."
retriever:
description: "Base Retriever for Full Refresh streams"
record_selector:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ParentStreamConfig
from airbyte_cdk.sources.streams import Stream
from source_intercom.components import (
HttpRequesterWithRateLimiter,
IncrementalSingleSliceCursor,
IncrementalSubstreamSlicerCursor,
IntercomHttpRequester,
IntercomRateLimiter,
)

Expand All @@ -37,7 +37,7 @@ def get_requester():

config = {"url": "https://airbyte.io"}

return HttpRequesterWithRateLimiter(
return IntercomHttpRequester(
name="stream_name",
url_base=InterpolatedString.create("{{ config['url'] }}", parameters={}),
path=InterpolatedString.create("v1/{{ stream_slice['id'] }}", parameters={}),
Expand Down
Loading
Loading