Skip to content

Commit

Permalink
fix(source-intercom) raise config error on `Active subscription neede…
Browse files Browse the repository at this point in the history
…d` error, transient error for Companies stream (#42094)

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
darynaishchenko and octavia-squidington-iii authored Jul 29, 2024
1 parent 4628a2c commit 4dc4200
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
dockerImageTag: 0.6.15
dockerImageTag: 0.6.16
dockerRepository: airbyte/source-intercom
documentationUrl: https://docs.airbyte.com/integrations/sources/intercom
githubIssueLabel: source-intercom
Expand Down
218 changes: 151 additions & 67 deletions airbyte-integrations/connectors/source-intercom/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.6.15"
version = "0.6.16"
name = "source-intercom"
description = "Source implementation for Intercom Yaml."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -17,7 +17,7 @@ include = "source_intercom"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "0.86.3"
airbyte-cdk = "^3"

[tool.poetry.scripts]
source-intercom = "source_intercom.run:run"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,24 @@
from dataclasses import InitVar, dataclass, field
from functools import wraps
from time import sleep
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from typing import Any, Iterable, List, Mapping, Optional, Union

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.incremental.cursor import Cursor
from airbyte_cdk.sources.declarative.incremental import DeclarativeCursor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ParentStreamConfig
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_nested_request_input_provider import (
InterpolatedNestedRequestInputProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_input_provider import InterpolatedRequestInputProvider
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution

RequestInput = Union[str, Mapping[str, str]]


@dataclass
class IncrementalSingleSliceCursor(Cursor):
class IncrementalSingleSliceCursor(DeclarativeCursor):
cursor_field: Union[InterpolatedString, str]
config: Config
parameters: InitVar[Mapping[str, Any]]
Expand Down Expand Up @@ -82,6 +78,9 @@ def _get_request_option(self, option_type: RequestOptionType, stream_slice: Stre
def get_stream_state(self) -> StreamState:
return self._state

def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
return self.get_stream_state()

def set_initial_state(self, stream_state: StreamState):
cursor_field = self.cursor_field.eval(self.config)
cursor_value = stream_state.get(cursor_field)
Expand Down Expand Up @@ -360,69 +359,17 @@ def wrapper_balance_rate_limit(*args, **kwargs):
return decorator


@dataclass(eq=False)
class HttpRequesterWithRateLimiter(HttpRequester):
class ErrorHandlerWithRateLimiter(DefaultErrorHandler):
"""
The difference between the built-in `HttpRequester` and this one is the custom decorator,
applied on top of `interpret_response_status` to preserve the api calls for a defined amount of time,
The difference between the built-in `DefaultErrorHandler` and this one is the custom decorator,
applied on top of `interpret_response` to preserve the api calls for a defined amount of time,
calculated using the rate limit headers and not use the custom backoff strategy,
since we deal with Response.status_code == 200,
the default requester's logic doesn't allow to handle the status of 200 with `should_retry()`.
"""

request_body_json: Optional[RequestInput] = None
request_headers: Optional[RequestInput] = None
request_parameters: Optional[RequestInput] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
super().__post_init__(parameters)

self.request_parameters = self.request_parameters if self.request_parameters else {}
self.request_headers = self.request_headers if self.request_headers else {}
self.request_body_json = self.request_body_json if self.request_body_json else {}

self._parameter_interpolator = InterpolatedRequestInputProvider(
config=self.config, request_inputs=self.request_parameters, parameters=parameters
)
self._headers_interpolator = InterpolatedRequestInputProvider(
config=self.config, request_inputs=self.request_headers, parameters=parameters
)
self._body_json_interpolator = InterpolatedNestedRequestInputProvider(
config=self.config, request_inputs=self.request_body_json, parameters=parameters
)

# The RateLimiter is applied to balance the api requests.
@IntercomRateLimiter.balance_rate_limit()
def interpret_response_status(self, response: requests.Response) -> ResponseStatus:
def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
# Check for response.headers to define the backoff time before the next api call
return super().interpret_response_status(response)

def get_request_params(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
interpolated_value = self._parameter_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)
if isinstance(interpolated_value, dict):
return interpolated_value
return {}

def get_request_headers(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._headers_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)

def get_request_body_json(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
return self._body_json_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)
return super().interpret_response(response_or_exception)
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ definitions:
field_path: ["{{ parameters.get('data_field', 'data')}}"]
requester:
description: "Base Requester for Full Refresh streams"
type: CustomRequester
class_name: source_intercom.components.HttpRequesterWithRateLimiter
type: HttpRequester
url_base: "https://api.intercom.io/"
http_method: "GET"
authenticator:
Expand All @@ -22,7 +21,14 @@ definitions:
Intercom-Version: "'2.10'"
Accept: "application/json"
error_handler:
type: "DefaultErrorHandler"
type: CustomErrorHandler
class_name: source_intercom.components.ErrorHandlerWithRateLimiter
response_filters:
- type: HttpResponseFilter
error_message_contains: "Active subscription needed"
action: FAIL
failure_type: config_error
error_message: "Failed to perform request. Error: Active subscription needed. Please, validate your current Intercom plan to continue using API."
retriever:
description: "Base Retriever for Full Refresh streams"
record_selector:
Expand Down Expand Up @@ -546,8 +552,13 @@ definitions:
to continue and retry 500 - server-side error, should retry after 60
sec. "
response_filters:
- http_codes: [400, 500]
- http_codes: [400]
action: RETRY
failure_type: transient_error
error_message: "A scroll (export) job is already in progress for this Intercom account, causing the request to fail. Only one active scroll per Intercom account is allowed; ensure no overlap by limiting active connections or scheduling jobs appropriately."
- http_codes: [500]
action: RETRY
failure_type: transient_error
backoff_strategies:
- type: ConstantBackoffStrategy
backoff_time_in_seconds: 60
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,9 @@

import pytest
import requests
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ParentStreamConfig
from airbyte_cdk.sources.streams import Stream
from source_intercom.components import (
HttpRequesterWithRateLimiter,
IncrementalSingleSliceCursor,
IncrementalSubstreamSlicerCursor,
IntercomRateLimiter,
)


def get_requester():
request_options_provider = MagicMock()
request_params = {"param": "value"}
request_body_data = "body_key_1=value_1&body_key_2=value2"
request_body_json = {"body_field": "body_value"}
request_options_provider.get_request_params.return_value = request_params
request_options_provider.get_request_body_data.return_value = request_body_data
request_options_provider.get_request_body_json.return_value = request_body_json

error_handler = MagicMock()
max_retries = 10
backoff_time = 1000
response_status = MagicMock()
response_status.retry_in.return_value = 10
error_handler.max_retries = max_retries
error_handler.interpret_response.return_value = response_status
error_handler.backoff_time.return_value = backoff_time

config = {"url": "https://airbyte.io"}

return HttpRequesterWithRateLimiter(
name="stream_name",
url_base=InterpolatedString.create("{{ config['url'] }}", parameters={}),
path=InterpolatedString.create("v1/{{ stream_slice['id'] }}", parameters={}),
http_method="GET",
request_options_provider=request_options_provider,
authenticator=MagicMock(),
error_handler=error_handler,
config=config,
parameters={},
)
from source_intercom.components import IncrementalSingleSliceCursor, IncrementalSubstreamSlicerCursor, IntercomRateLimiter


def test_slicer():
Expand Down Expand Up @@ -133,18 +94,3 @@ def interpret_response_status(self, response: requests.Response):

# Call a decorated method
requester.interpret_response_status(response)


def test_requester_get_request_params():
requester = get_requester()
assert {} == requester.get_request_params()


def test_requester_get_request_body_json():
requester = get_requester()
assert {} == requester.get_request_body_json()


def test_requester_get_request_headers():
requester = get_requester()
assert {} == requester.get_request_headers()
Loading

0 comments on commit 4dc4200

Please sign in to comment.