diff --git a/airbyte-integrations/connectors/source-stripe/metadata.yaml b/airbyte-integrations/connectors/source-stripe/metadata.yaml index 8f40faed76db..bc5f90db6dea 100644 --- a/airbyte-integrations/connectors/source-stripe/metadata.yaml +++ b/airbyte-integrations/connectors/source-stripe/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: e094cb9a-26de-4645-8761-65c0c425d1de - dockerImageTag: 4.5.1 + dockerImageTag: 4.5.2 dockerRepository: airbyte/source-stripe documentationUrl: https://docs.airbyte.com/integrations/sources/stripe githubIssueLabel: source-stripe diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index d8193c1796f9..9ebd06df8026 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import os from typing import Any, List, Mapping, MutableMapping, Optional, Tuple import pendulum @@ -21,7 +22,6 @@ CreatedCursorIncrementalStripeStream, CustomerBalanceTransactions, Events, - FilteringRecordExtractor, IncrementalStripeStream, Persons, SetupAttempts, @@ -33,6 +33,8 @@ ) _MAX_CONCURRENCY = 3 +_CACHE_DISABLED = os.environ.get("CACHE_DISABLED") +USE_CACHE = not _CACHE_DISABLED class SourceStripe(AbstractSource): @@ -97,6 +99,18 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> return False, str(e) return True, None + @staticmethod + def customers(**args): + # The Customers stream is instantiated in a dedicated method to allow parametrization and avoid duplicated code. + # It can be used with and without expanded items (as an independent stream or as a parent stream for other streams). 
+ return IncrementalStripeStream( + name="customers", + path="customers", + use_cache=USE_CACHE, + event_types=["customer.created", "customer.updated", "customer.deleted"], + **args, + ) + def streams(self, config: Mapping[str, Any]) -> List[Stream]: config = self.validate_and_fill_with_defaults(config) authenticator = TokenAuthenticator(config["client_secret"]) @@ -110,7 +124,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: subscriptions = IncrementalStripeStream( name="subscriptions", path="subscriptions", - use_cache=True, + use_cache=USE_CACHE, extra_request_params={"status": "all"}, event_types=[ "customer.subscription.created", @@ -127,9 +141,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: subscription_items = StripeLazySubStream( name="subscription_items", path="subscription_items", - extra_request_params=lambda self, *args, stream_slice, **kwargs: {"subscription": stream_slice[self.parent_id]}, + extra_request_params=lambda self, stream_slice, *args, **kwargs: {"subscription": stream_slice[self.parent_id]}, parent=subscriptions, - use_cache=True, + use_cache=USE_CACHE, parent_id="subscription_id", sub_items_attr="items", **args, @@ -137,28 +151,21 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: transfers = IncrementalStripeStream( name="transfers", path="transfers", - use_cache=True, + use_cache=USE_CACHE, event_types=["transfer.created", "transfer.reversed", "transfer.updated"], **args, ) application_fees = IncrementalStripeStream( name="application_fees", path="application_fees", - use_cache=True, + use_cache=USE_CACHE, event_types=["application_fee.created", "application_fee.refunded"], **args, ) - customers = IncrementalStripeStream( - name="customers", - path="customers", - use_cache=True, - event_types=["customer.created", "customer.updated", "customer.deleted"], - **args, - ) invoices = IncrementalStripeStream( name="invoices", path="invoices", - use_cache=True, + use_cache=USE_CACHE, 
event_types=[ "invoice.created", "invoice.finalization_failed", @@ -185,7 +192,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: event_types=["account.external_account.created", "account.external_account.updated", "account.external_account.deleted"], legacy_cursor_field=None, extra_request_params={"object": "card"}, - record_extractor=FilteringRecordExtractor("updated", None, "card"), + response_filter=lambda record: record["object"] == "card", **args, ), UpdatedCursorIncrementalStripeStream( @@ -194,12 +201,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: event_types=["account.external_account.created", "account.external_account.updated", "account.external_account.deleted"], legacy_cursor_field=None, extra_request_params={"object": "bank_account"}, - record_extractor=FilteringRecordExtractor("updated", None, "bank_account"), + response_filter=lambda record: record["object"] == "bank_account", **args, ), Persons(**args), SetupAttempts(**incremental_args), - StripeStream(name="accounts", path="accounts", use_cache=True, **args), + StripeStream(name="accounts", path="accounts", use_cache=USE_CACHE, **args), CreatedCursorIncrementalStripeStream(name="shipping_rates", path="shipping_rates", **incremental_args), CreatedCursorIncrementalStripeStream(name="balance_transactions", path="balance_transactions", **incremental_args), CreatedCursorIncrementalStripeStream(name="files", path="files", **incremental_args), @@ -207,7 +214,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: UpdatedCursorIncrementalStripeStream( name="checkout_sessions", path="checkout/sessions", - use_cache=True, + use_cache=USE_CACHE, legacy_cursor_field="expires_at", event_types=[ "checkout.session.async_payment_failed", @@ -246,7 +253,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: event_types=["issuing_authorization.created", "issuing_authorization.request", "issuing_authorization.updated"], **args, ), - customers, + 
self.customers(**args), IncrementalStripeStream( name="cardholders", path="issuing/cardholders", @@ -334,7 +341,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: ), transfers, IncrementalStripeStream( - name="refunds", path="refunds", use_cache=True, event_types=["refund.created", "refund.updated"], **args + name="refunds", path="refunds", use_cache=USE_CACHE, event_types=["refund.created", "refund.updated"], **args ), IncrementalStripeStream( name="payment_intents", @@ -397,19 +404,18 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: UpdatedCursorIncrementalStripeLazySubStream( name="bank_accounts", path=lambda self, stream_slice, *args, **kwargs: f"customers/{stream_slice[self.parent_id]}/sources", - parent=customers, + parent=self.customers(expand_items=["data.sources"], **args), event_types=["customer.source.created", "customer.source.expiring", "customer.source.updated", "customer.source.deleted"], legacy_cursor_field=None, parent_id="customer_id", sub_items_attr="sources", - response_filter={"attr": "object", "value": "bank_account"}, extra_request_params={"object": "bank_account"}, - record_extractor=FilteringRecordExtractor("updated", None, "bank_account"), + response_filter=lambda record: record["object"] == "bank_account", **args, ), StripeLazySubStream( name="invoice_line_items", - path=lambda self, *args, stream_slice, **kwargs: f"invoices/{stream_slice[self.parent_id]}/lines", + path=lambda self, stream_slice, *args, **kwargs: f"invoices/{stream_slice[self.parent_id]}/lines", parent=invoices, parent_id="invoice_id", sub_items_attr="lines", diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py b/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py index ad7357658a34..460059716a75 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py @@ -4,6 +4,7 @@ import copy import 
math +import os from abc import ABC, abstractmethod from itertools import chain from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union @@ -18,42 +19,47 @@ from source_stripe.availability_strategy import StripeAvailabilityStrategy, StripeSubStreamAvailabilityStrategy STRIPE_API_VERSION = "2022-11-15" +CACHE_DISABLED = os.environ.get("CACHE_DISABLED") +USE_CACHE = not CACHE_DISABLED class IRecordExtractor(ABC): @abstractmethod - def extract_records(self, response: requests.Response) -> Iterable[Mapping]: + def extract_records(self, records: Iterable[MutableMapping]) -> Iterable[Mapping]: pass class DefaultRecordExtractor(IRecordExtractor): - def extract_records(self, response: requests.Response) -> Iterable[MutableMapping]: - response_json = response.json() - yield from response_json.get("data", []) + def __init__(self, response_filter: Optional[Callable] = None): + self._response_filter = response_filter or (lambda x: x) + + def extract_records(self, records: Iterable[MutableMapping]) -> Iterable[MutableMapping]: + yield from filter(self._response_filter, records) class EventRecordExtractor(DefaultRecordExtractor): - def __init__(self, cursor_field: str): + def __init__(self, cursor_field: str, response_filter: Optional[Callable] = None): + super().__init__(response_filter) self.cursor_field = cursor_field - def extract_records(self, response: requests.Response) -> Iterable[MutableMapping]: - records = super().extract_records(response) - # set the record updated date = date of event creation + def extract_records(self, records: Iterable[MutableMapping]) -> Iterable[MutableMapping]: for record in records: item = record["data"]["object"] item[self.cursor_field] = record["created"] if record["type"].endswith(".deleted"): item["is_deleted"] = True - yield item + if self._response_filter(item): + yield item class UpdatedCursorIncrementalRecordExtractor(DefaultRecordExtractor): - def __init__(self, cursor_field: str, 
legacy_cursor_field: Optional[str]): + def __init__(self, cursor_field: str, legacy_cursor_field: Optional[str], response_filter: Optional[Callable] = None): + super().__init__(response_filter) self.cursor_field = cursor_field self.legacy_cursor_field = legacy_cursor_field - def extract_records(self, response: requests.Response) -> Iterable[MutableMapping]: - records = super().extract_records(response) + def extract_records(self, records: Iterable[MutableMapping]) -> Iterable[MutableMapping]: + records = super().extract_records(records) for record in records: if self.cursor_field in record: yield record @@ -66,18 +72,6 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin yield record | {self.cursor_field: current_cursor_value} -class FilteringRecordExtractor(UpdatedCursorIncrementalRecordExtractor): - def __init__(self, cursor_field: str, legacy_cursor_field: Optional[str], object_type: str): - super().__init__(cursor_field, legacy_cursor_field) - self.object_type = object_type - - def extract_records(self, response: requests.Response) -> Iterable[MutableMapping]: - records = super().extract_records(response) - for record in records: - if record["object"] == self.object_type: - yield record - - class StripeStream(HttpStream, ABC): url_base = "https://api.stripe.com/v1/" DEFAULT_SLICE_RANGE = 365 @@ -131,13 +125,14 @@ def __init__( use_cache: bool = False, expand_items: Optional[List[str]] = None, extra_request_params: Optional[Union[Mapping[str, Any], Callable]] = None, + response_filter: Optional[Callable] = None, primary_key: Optional[str] = "id", **kwargs, ): self.account_id = account_id self.start_date = start_date self.slice_range = slice_range or self.DEFAULT_SLICE_RANGE - self._record_extractor = record_extractor or DefaultRecordExtractor() + self._record_extractor = record_extractor or DefaultRecordExtractor(response_filter) self._name = name self._path = path self._use_cache = use_cache @@ -159,7 +154,10 @@ def 
request_params( next_page_token: Mapping[str, Any] = None, ) -> MutableMapping[str, Any]: # Stripe default pagination is 10, max is 100 - params = {"limit": 100, **self.extra_request_params(stream_state, stream_slice, next_page_token)} + params = { + "limit": 100, + **self.extra_request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + } if self.expand_items: params["expand[]"] = self.expand_items # Handle pagination by inserting the next page's token in the request parameters @@ -176,7 +174,7 @@ def parse_response( stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Iterable[Mapping[str, Any]]: - yield from self.record_extractor.extract_records(response) + yield from self.record_extractor.extract_records(response.json().get("data", [])) def request_headers(self, **kwargs) -> Mapping[str, Any]: headers = {"Stripe-Version": STRIPE_API_VERSION} @@ -321,12 +319,15 @@ def __init__( legacy_cursor_field: Optional[str] = "created", event_types: Optional[List[str]] = None, record_extractor: Optional[IRecordExtractor] = None, + response_filter: Optional[Callable] = None, **kwargs, ): self._event_types = event_types self._cursor_field = cursor_field self._legacy_cursor_field = legacy_cursor_field - record_extractor = record_extractor or UpdatedCursorIncrementalRecordExtractor(self.cursor_field, self.legacy_cursor_field) + record_extractor = record_extractor or UpdatedCursorIncrementalRecordExtractor( + self.cursor_field, self.legacy_cursor_field, response_filter + ) super().__init__(*args, record_extractor=record_extractor, **kwargs) # `lookback_window_days` is hardcoded as it does not make any sense to re-export events, # as each event holds the latest value of a record. 
@@ -340,7 +341,7 @@ def __init__( slice_range=self.slice_range, event_types=self.event_types, cursor_field=self.cursor_field, - record_extractor=EventRecordExtractor(cursor_field=self.cursor_field), + record_extractor=EventRecordExtractor(cursor_field=self.cursor_field, response_filter=response_filter), ) def update_cursor_field(self, stream_state: MutableMapping[str, Any]) -> MutableMapping[str, Any]: @@ -486,7 +487,7 @@ def checkout_session(self): return UpdatedCursorIncrementalStripeStream( name="checkout_sessions", path="checkout/sessions", - use_cache=True, + use_cache=USE_CACHE, legacy_cursor_field="expires_at", event_types=[ "checkout.session.async_payment_failed", @@ -518,7 +519,10 @@ def request_params( next_page_token: Mapping[str, Any] = None, ) -> MutableMapping[str, Any]: # override to not refer to slice values - params = {"limit": 100, **self.extra_request_params(stream_state, stream_slice, next_page_token)} + params = { + "limit": 100, + **self.extra_request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + } if self.expand_items: params["expand[]"] = self.expand_items if next_page_token: @@ -574,7 +578,7 @@ def customers(self) -> IncrementalStripeStream: return IncrementalStripeStream( name="customers", path="customers", - use_cache=True, + use_cache=USE_CACHE, event_types=["customer.created", "customer.updated", "customer.deleted"], authenticator=self.authenticator, account_id=self.account_id, @@ -658,7 +662,7 @@ class Persons(UpdatedCursorIncrementalStripeStream, HttpSubStream): event_types = ["person.created", "person.updated", "person.deleted"] def __init__(self, *args, **kwargs): - parent = StripeStream(*args, name="accounts", path="accounts", use_cache=True, **kwargs) + parent = StripeStream(*args, name="accounts", path="accounts", use_cache=USE_CACHE, **kwargs) super().__init__(*args, parent=parent, **kwargs) def path(self, stream_slice: Mapping[str, Any] = None, **kwargs): @@ -717,10 +721,6 @@ 
class StripeLazySubStream(StripeStream, HttpSubStream): } """ - @property - def filter(self) -> Optional[Mapping[str, Any]]: - return self._filter - @property def add_parent_id(self) -> bool: return self._add_parent_id @@ -743,14 +743,12 @@ def sub_items_attr(self) -> str: def __init__( self, *args, - response_filter: Optional[Mapping[str, Any]] = None, add_parent_id: bool = False, parent_id: Optional[str] = None, sub_items_attr: Optional[str] = None, **kwargs, ): super().__init__(*args, **kwargs) - self._filter = response_filter self._add_parent_id = add_parent_id self._parent_id = parent_id self._sub_items_attr = sub_items_attr @@ -774,9 +772,7 @@ def read_records(self, sync_mode: SyncMode, stream_slice: Optional[Mapping[str, if not items_obj: return - items = items_obj.get("data", []) - if self.filter: - items = [i for i in items if i.get(self.filter["attr"]) == self.filter["value"]] + items = list(self.record_extractor.extract_records(items_obj.get("data", []))) # get next pages items_next_pages = [] @@ -811,7 +807,7 @@ def __init__( parent_id: Optional[str] = None, add_parent_id: bool = False, sub_items_attr: Optional[str] = None, - response_filter: Optional[Mapping[str, Any]] = None, + response_filter: Optional[Callable] = None, **kwargs, ): super().__init__(*args, **kwargs) @@ -821,6 +817,7 @@ def __init__( cursor_field=cursor_field, legacy_cursor_field=legacy_cursor_field, event_types=event_types, + response_filter=response_filter, **kwargs, ) self.lazy_substream = StripeLazySubStream( @@ -829,7 +826,9 @@ def __init__( parent_id=parent_id, add_parent_id=add_parent_id, sub_items_attr=sub_items_attr, - response_filter=response_filter, + record_extractor=UpdatedCursorIncrementalRecordExtractor( + cursor_field=cursor_field, legacy_cursor_field=legacy_cursor_field, response_filter=response_filter + ), **kwargs, ) self._parent_stream = None diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py 
b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py index 9877fb85efc8..5eb8e515068a 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py @@ -6,9 +6,8 @@ import pytest from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator -from source_stripe.streams import IncrementalStripeStream, StripeLazySubStream -os.environ["REQUEST_CACHE_PATH"] = "REQUEST_CACHE_PATH" +os.environ["CACHE_DISABLED"] = "true" @pytest.fixture(name="config") @@ -34,46 +33,16 @@ def incremental_args_fixture(stream_args): return {"lookback_window_days": 14, **stream_args} -@pytest.fixture(name="invoices") -def invoices_fixture(stream_args): - def mocker(args=stream_args): - return IncrementalStripeStream( - name="invoices", - path="invoices", - use_cache=False, - event_types=[ - "invoice.created", - "invoice.finalization_failed", - "invoice.finalized", - "invoice.marked_uncollectible", - "invoice.paid", - "invoice.payment_action_required", - "invoice.payment_failed", - "invoice.payment_succeeded", - "invoice.sent", - "invoice.upcoming", - "invoice.updated", - "invoice.voided", - ], - **args, - ) +@pytest.fixture() +def stream_by_name(config): + # use local import in favour of global because we need to make imports after setting the env variables + from source_stripe.source import SourceStripe - return mocker - - -@pytest.fixture(name="invoice_line_items") -def invoice_line_items_fixture(invoices, stream_args): - parent_stream = invoices() - - def mocker(args=stream_args, parent_stream=parent_stream): - return StripeLazySubStream( - name="invoice_line_items", - path=lambda self, *args, stream_slice, **kwargs: f"invoices/{stream_slice[self.parent_id]}/lines", - parent=parent_stream, - parent_id="invoice_id", - sub_items_attr="lines", - add_parent_id=True, - **args, - ) + def mocker(stream_name, source_config=config): + source = SourceStripe() + streams = 
source.streams(source_config) + for stream in streams: + if stream.name == stream_name: + return stream return mocker diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py index d9382de2be1f..0f747acac434 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py @@ -87,13 +87,13 @@ def test_traverse_over_substreams_failure(mocker): assert id(check_availability_mock.call_args_list[1].args[0]) == id(child_1) -def test_substream_availability(mocker, invoice_line_items): +def test_substream_availability(mocker, stream_by_name): check_availability_mock = mocker.MagicMock() check_availability_mock.return_value = (True, None) mocker.patch( "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability", check_availability_mock ) - stream = invoice_line_items() + stream = stream_by_name("invoice_line_items") is_available, reason = stream.availability_strategy.check_availability(stream, mocker.Mock(), mocker.Mock()) assert is_available and reason is None @@ -102,13 +102,13 @@ def test_substream_availability(mocker, invoice_line_items): assert isinstance(check_availability_mock.call_args_list[1].args[0], StripeLazySubStream) -def test_substream_availability_no_parent(mocker, invoice_line_items): +def test_substream_availability_no_parent(mocker, stream_by_name): check_availability_mock = mocker.MagicMock() check_availability_mock.return_value = (True, None) mocker.patch( "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability", check_availability_mock ) - stream = invoice_line_items() + stream = stream_by_name("invoice_line_items") stream.parent = None stream.availability_strategy.check_availability(stream, mocker.Mock(), 
mocker.Mock()) @@ -117,8 +117,8 @@ def test_substream_availability_no_parent(mocker, invoice_line_items): assert isinstance(check_availability_mock.call_args_list[0].args[0], StripeLazySubStream) -def test_403_error_handling(invoices, requests_mock): - stream = invoices() +def test_403_error_handling(stream_by_name, requests_mock): + stream = stream_by_name("invoices") logger = logging.getLogger("airbyte") for error_code in STRIPE_ERROR_CODES: requests_mock.get(f"{stream.url_base}{stream.path()}", status_code=403, json={"error": {"code": f"{error_code}"}}) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py index 55cba5aa9aa5..143331e06992 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py @@ -2,180 +2,178 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from urllib.parse import urlencode + import freezegun import pendulum import pytest -from source_stripe.streams import ( - CheckoutSessionsLineItems, - CreatedCursorIncrementalStripeStream, - CustomerBalanceTransactions, - FilteringRecordExtractor, - IncrementalStripeStream, - Persons, - SetupAttempts, - StripeStream, - UpdatedCursorIncrementalStripeLazySubStream, - UpdatedCursorIncrementalStripeStream, -) - - -@pytest.fixture() -def accounts(stream_args): - def mocker(args=stream_args): - return StripeStream(name="accounts", path="accounts", **args) - - return mocker - - -@pytest.fixture() -def balance_transactions(incremental_stream_args): - def mocker(args=incremental_stream_args): - return CreatedCursorIncrementalStripeStream(name="balance_transactions", path="balance_transactions", **args) - - return mocker - - -@pytest.fixture() -def credit_notes(stream_args): - def mocker(args=stream_args): - return UpdatedCursorIncrementalStripeStream( - name="credit_notes", - 
path="credit_notes", - event_types=["credit_note.created", "credit_note.updated", "credit_note.voided"], - **args, - ) - - return mocker - - -@pytest.fixture() -def customers(stream_args): - def mocker(args=stream_args): - return IncrementalStripeStream( - name="customers", - path="customers", - use_cache=False, - event_types=["customer.created", "customer.updated"], - **args, - ) - - return mocker +from source_stripe.streams import CheckoutSessionsLineItems, CustomerBalanceTransactions, Persons, SetupAttempts -@pytest.fixture() -def bank_accounts(customers, stream_args): - def mocker(args=stream_args): - return UpdatedCursorIncrementalStripeLazySubStream( - name="bank_accounts", - path=lambda self, stream_slice, *args, **kwargs: f"customers/{stream_slice[self.parent_id]}/sources", - parent=customers(), - event_types=["customer.source.created", "customer.source.expiring", "customer.source.updated"], - legacy_cursor_field=None, - parent_id="customer_id", - sub_items_attr="sources", - response_filter={"attr": "object", "value": "bank_account"}, - extra_request_params={"object": "bank_account"}, - record_extractor=FilteringRecordExtractor("updated", None, "bank_account"), - **args, - ) - - return mocker - - -@pytest.fixture() -def external_bank_accounts(stream_args): - def mocker(args=stream_args): - return UpdatedCursorIncrementalStripeStream( - name="external_account_bank_accounts", - path=lambda self, *args, **kwargs: f"accounts/{self.account_id}/external_accounts", - event_types=["account.external_account.created", "account.external_account.updated"], - legacy_cursor_field=None, - extra_request_params={"object": "bank_account"}, - record_extractor=FilteringRecordExtractor("updated", None, "bank_account"), - **args, - ) - - return mocker +def read_from_stream(stream, sync_mode, state): + records = [] + for slice_ in stream.stream_slices(sync_mode=sync_mode, stream_state=state): + for record in stream.read_records(sync_mode=sync_mode, stream_slice=slice_, 
stream_state=state): + records.append(record) + return records -def test_request_headers(accounts): - stream = accounts() +def test_request_headers(stream_by_name): + stream = stream_by_name("accounts") headers = stream.request_headers() assert headers["Stripe-Version"] == "2022-11-15" -def test_lazy_sub_stream(requests_mock, invoice_line_items, invoices, stream_args): - # First initial request to parent stream - requests_mock.get( - "https://api.stripe.com/v1/invoices", - json={ +bank_accounts_full_refresh_test_case = ( + { + "https://api.stripe.com/v1/customers?expand%5B%5D=data.sources": { "has_more": False, "object": "list", - "url": "/v1/checkout/sessions", + "url": "/v1/customers", "data": [ { "created": 1641038947, - "customer": "cus_HezytZRkaQJC8W", - "id": "in_1KD6OVIEn5WyEQxn9xuASHsD", - "object": "invoice", + "id": "cus_HezytZRkaQJC8W", + "object": "customer", "total": 1, - "lines": { + "sources": { "data": [ { - "id": "il_1", - "object": "line_item", + "id": "cs_1", + "object": "card", }, { - "id": "il_2", - "object": "line_item", + "id": "cs_2", + "object": "bank_account", }, ], "has_more": True, "object": "list", - "total_count": 3, - "url": "/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines", + "total_count": 4, + "url": "/v1/customers/cus_HezytZRkaQJC8W/sources", }, } ], }, - ) - - # Second pagination request to main stream - requests_mock.get( - "https://api.stripe.com/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines", - json={ + "https://api.stripe.com/v1/customers/cus_HezytZRkaQJC8W/sources?object=bank_account&starting_after=cs_2": { "data": [ { - "id": "il_3", - "object": "line_item", + "id": "cs_3", + "object": "card", + }, + { + "id": "cs_4", + "object": "bank_account", }, ], "has_more": False, "object": "list", - "total_count": 3, - "url": "/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines", + "total_count": 4, + "url": "/v1/customers/cus_HezytZRkaQJC8W/sources", }, - ) + }, + "bank_accounts", + [ + {"id": "cs_2", "object": "bank_account", "updated": 
1692802815}, + {"id": "cs_4", "object": "bank_account", "updated": 1692802815}, + ], + "full_refresh", + {}, +) - # make start date a recent date so there's just one slice in a parent stream - stream_args["start_date"] = pendulum.today().subtract(days=3).int_timestamp - parent_stream = invoices(stream_args) - stream = invoice_line_items(stream_args, parent_stream=parent_stream) - records = [] - for slice_ in stream.stream_slices(sync_mode="full_refresh"): - records.extend(stream.read_records(sync_mode="full_refresh", stream_slice=slice_)) - assert list(records) == [ - {"id": "il_1", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, - {"id": "il_2", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, - {"id": "il_3", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, - ] +bank_accounts_incremental_test_case = ( + { + "https://api.stripe.com/v1/events?types%5B%5D=customer.source.created&types%5B%5D=customer.source.expiring&types" + "%5B%5D=customer.source.updated&types%5B%5D=customer.source.deleted": { + "data": [ + { + "id": "evt_1NdNFoEcXtiJtvvhBP5mxQmL", + "object": "event", + "api_version": "2020-08-27", + "created": 1692802016, + "data": {"object": {"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716}}, + "type": "customer.source.created", + }, + { + "id": "evt_1NdNFoEcXtiJtvvhBP5mxQmL", + "object": "event", + "api_version": "2020-08-27", + "created": 1692802017, + "data": {"object": {"object": "card", "card": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716}}, + "type": "customer.source.updated", + }, + ], + "has_more": False, + } + }, + "bank_accounts", + [{"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716, "updated": 1692802016}], + "incremental", + {"updated": 1692802015}, +) +@pytest.mark.parametrize( + "requests_mock_map, stream_cls, expected_records, sync_mode, state", + (bank_accounts_incremental_test_case, 
bank_accounts_full_refresh_test_case), +) @freezegun.freeze_time("2023-08-23T15:00:15Z") -def test_created_cursor_incremental_stream(requests_mock, balance_transactions, incremental_stream_args): - incremental_stream_args["start_date"] = pendulum.now().subtract(months=23).int_timestamp - stream = balance_transactions(incremental_stream_args) +def test_lazy_substream_data_cursor_value_is_populated( + requests_mock, stream_by_name, config, requests_mock_map, stream_cls, expected_records, sync_mode, state +): + config["start_date"] = str(pendulum.today().subtract(days=3)) + stream = stream_by_name(stream_cls, config) + for url, body in requests_mock_map.items(): + requests_mock.get(url, json=body) + + records = read_from_stream(stream, sync_mode, state) + assert records == expected_records + for record in records: + assert bool(record[stream.cursor_field]) + + +@pytest.mark.parametrize("requests_mock_map, stream_cls, expected_records, sync_mode, state", (bank_accounts_full_refresh_test_case,)) +@freezegun.freeze_time("2023-08-23T15:00:15Z") +def test_lazy_substream_data_is_expanded( + requests_mock, stream_by_name, config, requests_mock_map, stream_cls, expected_records, sync_mode, state +): + + config["start_date"] = str(pendulum.today().subtract(days=3)) + stream = stream_by_name("bank_accounts", config) + for url, body in requests_mock_map.items(): + requests_mock.get(url, json=body) + + records = read_from_stream(stream, sync_mode, state) + + assert list(records) == expected_records + assert len(requests_mock.request_history) == 2 + assert urlencode({"expand[]": "data.sources"}) in requests_mock.request_history[0].url + + +@pytest.mark.parametrize( + "requests_mock_map, stream_cls, expected_records, sync_mode, state, expected_object", + ((*bank_accounts_full_refresh_test_case, "bank_account"), (*bank_accounts_incremental_test_case, "bank_account")), +) +@freezegun.freeze_time("2023-08-23T15:00:15Z") +def test_lazy_substream_data_is_filtered( + requests_mock, 
stream_by_name, config, requests_mock_map, stream_cls, expected_records, sync_mode, state, expected_object +): + config["start_date"] = str(pendulum.today().subtract(days=3)) + stream = stream_by_name(stream_cls, config) + for url, body in requests_mock_map.items(): + requests_mock.get(url, json=body) + + records = read_from_stream(stream, sync_mode, state) + assert records == expected_records + for record in records: + assert record["object"] == expected_object + + +@freezegun.freeze_time("2023-08-23T15:00:15Z") +def test_created_cursor_incremental_stream(requests_mock, stream_by_name, config): + config["start_date"] = str(pendulum.now().subtract(months=23)) + stream = stream_by_name("balance_transactions", {"lookback_window_days": 14, **config}) requests_mock.get( "/v1/balance_transactions", [ @@ -223,18 +221,18 @@ def test_created_cursor_incremental_stream(requests_mock, balance_transactions, ) @freezegun.freeze_time("2023-08-23T15:00:15Z") def test_get_start_timestamp( - balance_transactions, incremental_stream_args, start_date, lookback_window, max_days_from_now, stream_state, expected_start_timestamp + stream_by_name, config, start_date, lookback_window, max_days_from_now, stream_state, expected_start_timestamp ): - incremental_stream_args["start_date"] = pendulum.parse(start_date).int_timestamp - incremental_stream_args["lookback_window_days"] = lookback_window - incremental_stream_args["start_date_max_days_from_now"] = max_days_from_now - stream = balance_transactions(incremental_stream_args) + config["start_date"] = start_date + config["lookback_window_days"] = lookback_window + stream = stream_by_name("balance_transactions", config) + stream.start_date_max_days_from_now = max_days_from_now assert stream.get_start_timestamp(stream_state) == pendulum.parse(expected_start_timestamp).int_timestamp @pytest.mark.parametrize("sync_mode", ("full_refresh", "incremental")) -def test_updated_cursor_incremental_stream_slices(credit_notes, sync_mode): - stream = 
credit_notes() +def test_updated_cursor_incremental_stream_slices(stream_by_name, sync_mode): + stream = stream_by_name("credit_notes") assert list(stream.stream_slices(sync_mode)) == [{}] @@ -242,13 +240,13 @@ def test_updated_cursor_incremental_stream_slices(credit_notes, sync_mode): "last_record, stream_state, expected_state", (({"updated": 110}, {"updated": 111}, {"updated": 111}), ({"created": 110}, {"updated": 111}, {"updated": 111})), ) -def test_updated_cursor_incremental_stream_get_updated_state(credit_notes, last_record, stream_state, expected_state): - stream = credit_notes() +def test_updated_cursor_incremental_stream_get_updated_state(stream_by_name, last_record, stream_state, expected_state): + stream = stream_by_name("credit_notes") assert stream.get_updated_state(last_record, stream_state) == expected_state @pytest.mark.parametrize("sync_mode", ("full_refresh", "incremental")) -def test_updated_cursor_incremental_stream_read_wo_state(requests_mock, sync_mode, credit_notes): +def test_updated_cursor_incremental_stream_read_wo_state(requests_mock, sync_mode, stream_by_name): requests_mock.get( "/v1/credit_notes", [ @@ -275,7 +273,7 @@ def test_updated_cursor_incremental_stream_read_wo_state(requests_mock, sync_mod } ], ) - stream = credit_notes() + stream = stream_by_name("credit_notes") records = [record for record in stream.read_records(sync_mode)] assert records == [ { @@ -298,7 +296,7 @@ def test_updated_cursor_incremental_stream_read_wo_state(requests_mock, sync_mod @freezegun.freeze_time("2023-08-23T00:00:00") -def test_updated_cursor_incremental_stream_read_w_state(requests_mock, credit_notes): +def test_updated_cursor_incremental_stream_read_w_state(requests_mock, stream_by_name): requests_mock.get( "/v1/events", [ @@ -320,7 +318,7 @@ def test_updated_cursor_incremental_stream_read_w_state(requests_mock, credit_no ], ) - stream = credit_notes() + stream = stream_by_name("credit_notes") records = [ record for record in 
stream.read_records("incremental", stream_state={"updated": pendulum.parse("2023-01-01T15:00:15Z").int_timestamp}) @@ -490,11 +488,11 @@ def test_persons_w_state(requests_mock, stream_args): @pytest.mark.parametrize("sync_mode, stream_state", (("full_refresh", {}), ("incremental", {}), ("incremental", {"updated": 1693987430}))) -def test_cursorless_incremental_stream(requests_mock, external_bank_accounts, sync_mode, stream_state): +def test_cursorless_incremental_stream(requests_mock, stream_by_name, sync_mode, stream_state): # Testing streams that *only* have the cursor field value in incremental mode because of API discrepancies, # e.g. /bank_accounts does not return created/updated date, however /events?type=bank_account.updated returns the update date. # Key condition here is that the underlying stream has legacy cursor field set to None. - stream = external_bank_accounts() + stream = stream_by_name("external_account_bank_accounts") requests_mock.get( "/v1/accounts//external_accounts", json={ @@ -540,9 +538,9 @@ def test_cursorless_incremental_stream(requests_mock, external_bank_accounts, sy @pytest.mark.parametrize("sync_mode, stream_state", (("full_refresh", {}), ("incremental", {}), ("incremental", {"updated": 1693987430}))) -def test_cursorless_incremental_substream(requests_mock, bank_accounts, sync_mode, stream_state): +def test_cursorless_incremental_substream(requests_mock, stream_by_name, sync_mode, stream_state): # same for substreams - stream = bank_accounts() + stream = stream_by_name("bank_accounts") requests_mock.get( "/v1/customers", json={ @@ -581,9 +579,9 @@ def test_cursorless_incremental_substream(requests_mock, bank_accounts, sync_mod stream.get_updated_state(stream_state, record) -@pytest.mark.parametrize("stream", ("bank_accounts",)) -def test_get_updated_state(stream, request, requests_mock): - stream = request.getfixturevalue(stream)() +@pytest.mark.parametrize("stream_name", ("bank_accounts",)) +def test_get_updated_state(stream_name, 
stream_by_name, requests_mock): + stream = stream_by_name(stream_name) response = {"data": [{"id": 1, stream.cursor_field: 1695292083}]} requests_mock.get("/v1/credit_notes", json=response) requests_mock.get("/v1/balance_transactions", json=response) @@ -597,3 +595,76 @@ def test_get_updated_state(stream, request, requests_mock): for record in stream.read_records(sync_mode="incremental", stream_slice=slice_, stream_state=state): state = stream.get_updated_state(state, record) assert state + + +@freezegun.freeze_time("2023-08-23T15:00:15Z") +def test_subscription_items_extra_request_params(requests_mock, stream_by_name, config): + requests_mock.get( + "/v1/subscriptions", + json={ + "object": "list", + "url": "/v1/subscriptions", + "has_more": False, + "data": [ + { + "id": "sub_1OApco2eZvKYlo2CEDCzwLrE", + "object": "subscription", + "created": 1699603174, + "items": { + "object": "list", + "data": [ + { + "id": "si_OynDmET1kQPTbI", + "object": "subscription_item", + "created": 1699603175, + "quantity": 1, + "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE", + } + ], + "has_more": True, + }, + "latest_invoice": None, + "livemode": False, + } + ], + "has_more": False, + }, + ) + requests_mock.get( + "/v1/subscription_items?subscription=sub_1OApco2eZvKYlo2CEDCzwLrE", + json={ + "object": "list", + "url": "/v1/subscription_items", + "has_more": False, + "data": [ + { + "id": "si_OynPdzMZykmCWm", + "object": "subscription_item", + "created": 1699603884, + "quantity": 2, + "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE", + } + ], + }, + ) + config["start_date"] = str(pendulum.now().subtract(days=3)) + stream = stream_by_name("subscription_items", config) + records = read_from_stream(stream, "full_refresh", {}) + assert records == [ + { + "id": "si_OynDmET1kQPTbI", + "object": "subscription_item", + "created": 1699603175, + "quantity": 1, + "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE", + }, + { + "id": "si_OynPdzMZykmCWm", + "object": "subscription_item", + "created": 
1699603884, + "quantity": 2, + "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE", + }, + ] + assert len(requests_mock.request_history) == 2 + assert "subscription=sub_1OApco2eZvKYlo2CEDCzwLrE" in requests_mock.request_history[-1].url diff --git a/docs/integrations/sources/stripe.md b/docs/integrations/sources/stripe.md index 0e04310595b7..30c101fac0d8 100644 --- a/docs/integrations/sources/stripe.md +++ b/docs/integrations/sources/stripe.md @@ -192,7 +192,8 @@ The Stripe connector should not run into Stripe API limitations under normal usa | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 4.5.1 | 2023-11-01 | [32056](https://github.com/airbytehq/airbyte/pull/32056/) | Use CDK version 0.52.8 | +| 4.5.2 | 2023-11-03 | [32146](https://github.com/airbytehq/airbyte/pull/32146/) | Fix multiple issues with the `bank_accounts` stream (expansion, filtering, cursor population) | +| 4.5.1 | 2023-11-01 | [32056](https://github.com/airbytehq/airbyte/pull/32056/) | Use CDK version 0.52.8 | | 4.5.0 | 2023-10-25 | [31327](https://github.com/airbytehq/airbyte/pull/31327/) | Use concurrent CDK when running in full-refresh | | 4.4.2 | 2023-10-24 | [31764](https://github.com/airbytehq/airbyte/pull/31764) | Base image migration: remove Dockerfile and use the python-connector-base image | | 4.4.1 | 2023-10-18 | [31553](https://github.com/airbytehq/airbyte/pull/31553) | Adjusted `Setup Attempts` and extended `Checkout Sessions` stream schemas |