Skip to content

Commit

Permalink
✨ Source Stripe: Events stream concurrent on incremental syncs (#34619)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 authored Feb 6, 2024
1 parent 20bc4b4 commit a28aab9
Show file tree
Hide file tree
Showing 23 changed files with 176 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ acceptance_tests:
basic_read:
tests:
- config_path: "secrets/config.json"
fail_on_extra_columns: false # CATs are failing since https://github.com/airbytehq/airbyte/commit/dccb2fa7165f031fa1233d695897b07f9aacb39c, API Source team to fix this
timeout_seconds: 3600
empty_streams:
- name: "application_fees"
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 5.2.1
dockerImageTag: 5.2.2
dockerRepository: airbyte/source-stripe
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
githubIssueLabel: source-stripe
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-stripe/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk==0.59.1", "stripe==2.56.0", "pendulum==2.1.2"]
MAIN_REQUIREMENTS = ["airbyte-cdk==0.60.1", "stripe==2.56.0", "pendulum==2.1.2"]

# we set `requests-mock~=1.11.0` to ensure concurrency is supported
TEST_REQUIREMENTS = ["pytest-mock~=3.6.1", "pytest~=6.1", "requests-mock~=1.11.0", "requests_mock~=1.8", "freezegun==1.2.2"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
def _get_source(args: List[str]):
catalog_path = AirbyteEntrypoint.extract_catalog(args)
config_path = AirbyteEntrypoint.extract_config(args)
state_path = AirbyteEntrypoint.extract_state(args)
try:
return SourceStripe(
SourceStripe.read_catalog(catalog_path) if catalog_path else None,
SourceStripe.read_config(config_path) if config_path else None,
SourceStripe.read_state(state_path) if state_path else None,
)
except Exception as error:
print(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.call_rate import AbstractAPIBudget, HttpAPIBudget, HttpRequestMatcher, MovingWindowCallRatePolicy, Rate
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor
from airbyte_cdk.sources.streams.concurrent.cursor import Comparable, ConcurrentCursor, CursorField, NoopCursor
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import EpochValueConcurrentStreamStateConverter
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from airbyte_protocol.models import SyncMode
Expand All @@ -42,15 +45,23 @@
_MAX_CONCURRENCY = 20
_DEFAULT_CONCURRENCY = 10
_CACHE_DISABLED = os.environ.get("CACHE_DISABLED")
_REFUND_STREAM_NAME = "refunds"
_INCREMENTAL_CONCURRENCY_EXCLUSION = {
_REFUND_STREAM_NAME, # excluded because of the upcoming changes in terms of cursor https://github.com/airbytehq/airbyte/issues/34332
}
USE_CACHE = not _CACHE_DISABLED
STRIPE_TEST_ACCOUNT_PREFIX = "sk_test_"


class SourceStripe(ConcurrentSourceAdapter):

message_repository = InMemoryMessageRepository(entrypoint_logger.level)
_SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION = {
Events: ("created[gte]", "created[lte]"),
CreatedCursorIncrementalStripeStream: ("created[gte]", "created[lte]"),
}

def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], **kwargs):
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs):
if config:
concurrency_level = min(config.get("num_workers", _DEFAULT_CONCURRENCY), _MAX_CONCURRENCY)
else:
Expand All @@ -60,6 +71,7 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional
concurrency_level, concurrency_level // 2, logger, self._slice_logger, self.message_repository
)
super().__init__(concurrent_source)
self._state = state
if catalog:
self._streams_configured_as_full_refresh = {
configured_stream.stream.name
Expand All @@ -71,9 +83,8 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional
self._streams_configured_as_full_refresh = set()

@staticmethod
def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping:
start_date, lookback_window_days, slice_range = (
config.get("start_date"),
def validate_and_fill_with_defaults(config: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
lookback_window_days, slice_range = (
config.get("lookback_window_days"),
config.get("slice_range"),
)
Expand All @@ -86,9 +97,9 @@ def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping:
internal_message=message,
failure_type=FailureType.config_error,
)
if start_date:
# verifies the start_date is parseable
SourceStripe._start_date_to_timestamp(start_date)

# verifies the start_date in the config is valid
SourceStripe._start_date_to_timestamp(config)
if slice_range is None:
config["slice_range"] = 365
elif not isinstance(slice_range, int) or slice_range < 1:
Expand All @@ -100,7 +111,7 @@ def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping:
)
return config

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
def check_connection(self, logger: AirbyteLogger, config: MutableMapping[str, Any]) -> Tuple[bool, Any]:
self.validate_and_fill_with_defaults(config)
stripe.api_key = config["client_secret"]
try:
Expand Down Expand Up @@ -167,14 +178,11 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget:

return HttpAPIBudget(policies=policies)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
config = self.validate_and_fill_with_defaults(config)
authenticator = TokenAuthenticator(config["client_secret"])

if "start_date" in config:
start_timestamp = self._start_date_to_timestamp(config["start_date"])
else:
start_timestamp = pendulum.datetime(2017, 1, 25).int_timestamp
start_timestamp = self._start_date_to_timestamp(config)
args = {
"authenticator": authenticator,
"account_id": config["account_id"],
Expand Down Expand Up @@ -289,7 +297,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# The Refunds stream does not utilize the Events API as it created issues with data loss during the incremental syncs.
# Therefore, we're using the regular API with the `created` cursor field. A bug has been filed with Stripe.
# See more at https://github.com/airbytehq/oncall/issues/3090, https://github.com/airbytehq/oncall/issues/3428
CreatedCursorIncrementalStripeStream(name="refunds", path="refunds", **incremental_args),
CreatedCursorIncrementalStripeStream(name=_REFUND_STREAM_NAME, path="refunds", **incremental_args),
UpdatedCursorIncrementalStripeStream(
name="payment_methods",
path="payment_methods",
Expand Down Expand Up @@ -511,21 +519,44 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
),
]

return [
StreamFacade.create_from_stream(stream, self, entrypoint_logger, self._create_empty_state(), NoopCursor())
if stream.name in self._streams_configured_as_full_refresh
else stream
for stream in streams
]
state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in streams}, state=self._state)
return [self._to_concurrent(stream, self._start_date_to_timestamp(config), state_manager) for stream in streams]

def _to_concurrent(self, stream: Stream, fallback_start, state_manager: ConnectorStateManager) -> Stream:
if stream.name in self._streams_configured_as_full_refresh:
return StreamFacade.create_from_stream(stream, self, entrypoint_logger, self._create_empty_state(), NoopCursor())

state = state_manager.get_stream_state(stream.name, stream.namespace)
slice_boundary_fields = self._SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION.get(type(stream))
if slice_boundary_fields and stream.name not in _INCREMENTAL_CONCURRENCY_EXCLUSION:
cursor_field = CursorField(stream.cursor_field) if isinstance(stream.cursor_field, str) else CursorField(stream.cursor_field[0])
converter = EpochValueConcurrentStreamStateConverter()
cursor = ConcurrentCursor(
stream.name,
stream.namespace,
state_manager.get_stream_state(stream.name, stream.namespace),
self.message_repository,
state_manager,
converter,
cursor_field,
slice_boundary_fields,
fallback_start,
)
return StreamFacade.create_from_stream(stream, self, entrypoint_logger, state, cursor)

return stream

def _create_empty_state(self) -> MutableMapping[str, Any]:
# The state is known to be empty because concurrent CDK is currently only used for full refresh
return {}

@staticmethod
def _start_date_to_timestamp(start_date: str) -> int:
def _start_date_to_timestamp(config: Mapping[str, Any]) -> int:
if "start_date" not in config:
return pendulum.datetime(2017, 1, 25).int_timestamp # type: ignore # pendulum not typed

start_date = config["start_date"]
try:
return pendulum.parse(start_date).int_timestamp
return pendulum.parse(start_date).int_timestamp # type: ignore # pendulum not typed
except pendulum.parsing.exceptions.ParserError as e:
message = f"Invalid start date {start_date}. Please use YYYY-MM-DDTHH:MM:SSZ format."
raise AirbyteTracedException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import os

import pytest
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.test.state_builder import StateBuilder

os.environ["CACHE_DISABLED"] = "true"
os.environ["DEPLOYMENT_MODE"] = "testing"
Expand Down Expand Up @@ -40,10 +42,14 @@ def stream_by_name(config):
from source_stripe.source import SourceStripe

def mocker(stream_name, source_config=config):
source = SourceStripe(None, source_config)
source = SourceStripe(None, source_config, StateBuilder().build())
streams = source.streams(source_config)
for stream in streams:
if stream.name == stream_name:
if isinstance(stream, StreamFacade):
# to avoid breaking changes for tests, we will return the legacy test. Tests that would be affected by not having this
# would probably need to be moved to integration tests or unit tests
return stream._legacy_stream
return stream

return mocker
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from unittest import TestCase

import freezegun
from airbyte_cdk.sources.source import TState
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
Expand All @@ -19,7 +19,7 @@
find_template,
)
from airbyte_cdk.test.state_builder import StateBuilder
from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode
from airbyte_protocol.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, FailureType, SyncMode
from integration.config import ConfigBuilder
from integration.pagination import StripePaginationStrategy
from integration.request_builder import StripeRequestBuilder
Expand Down Expand Up @@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()


def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe:
return SourceStripe(catalog, config)
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[List[AirbyteStateMessage]]) -> SourceStripe:
return SourceStripe(catalog, config, state)


def _an_event() -> RecordBuilder:
Expand Down Expand Up @@ -110,12 +110,12 @@ def _given_events_availability_check(http_mocker: HttpMocker) -> None:
def _read(
config_builder: ConfigBuilder,
sync_mode: SyncMode,
state: Optional[Dict[str, Any]] = None,
state: Optional[List[AirbyteStateMessage]] = None,
expecting_exception: bool = False
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build()
return read(_source(catalog, config), config, catalog, state, expecting_exception)
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)


@freezegun.freeze_time(_NOW.isoformat())
Expand Down Expand Up @@ -372,5 +372,5 @@ def test_given_state_earlier_than_30_days_when_read_then_query_events_using_type
def _an_application_fee_event(self) -> RecordBuilder:
return _an_event().with_field(_DATA_FIELD, _an_application_fee().build())

def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput:
def _read(self, config: ConfigBuilder, state: Optional[List[AirbyteStateMessage]], expecting_exception: bool = False) -> EntrypointOutput:
return _read(config, SyncMode.incremental, state, expecting_exception)
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from unittest import TestCase

import freezegun
from airbyte_cdk.sources.source import TState
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
Expand Down Expand Up @@ -62,8 +63,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()


def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe:
return SourceStripe(catalog, config)
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe:
return SourceStripe(catalog, config, state)


def _an_event() -> RecordBuilder:
Expand Down Expand Up @@ -143,7 +144,7 @@ def _read(
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build()
return read(_source(catalog, config), config, catalog, state, expecting_exception)
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)


def _assert_not_available(output: EntrypointOutput) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
from unittest import TestCase

import freezegun
from airbyte_cdk.sources.source import TState
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
Expand Down Expand Up @@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()


def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe:
return SourceStripe(catalog, config)
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe:
return SourceStripe(catalog, config, state)


def _an_event() -> RecordBuilder:
Expand Down Expand Up @@ -115,7 +115,7 @@ def _read(
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build()
return read(_source(catalog, config), config, catalog, state, expecting_exception)
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)


@freezegun.freeze_time(_NOW.isoformat())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from unittest import TestCase

import freezegun
from airbyte_cdk.sources.source import TState
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
Expand Down Expand Up @@ -66,8 +67,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()


def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe:
return SourceStripe(catalog, config)
def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe:
return SourceStripe(catalog, config, state)


def _an_event() -> RecordBuilder:
Expand Down Expand Up @@ -146,7 +147,7 @@ def _read(
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build()
return read(_source(catalog, config), config, catalog, state, expecting_exception)
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)


def _assert_not_available(output: EntrypointOutput) -> None:
Expand Down
Loading

0 comments on commit a28aab9

Please sign in to comment.