From a85401e7f6f5da0d2a8979e7b86bd6912686715a Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Fri, 3 Nov 2023 17:42:00 +0200 Subject: [PATCH 1/9] fix multiple stripe issues --- .../connectors/source-stripe/metadata.yaml | 2 +- .../source-stripe/source_stripe/source.py | 35 +- .../source-stripe/source_stripe/streams.py | 78 ++- .../source-stripe/unit_tests/conftest.py | 163 +++++- .../source-stripe/unit_tests/test_streams.py | 466 ++++++++++++------ docs/integrations/sources/stripe.md | 3 +- 6 files changed, 538 insertions(+), 209 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/metadata.yaml b/airbyte-integrations/connectors/source-stripe/metadata.yaml index 8f40faed76db..bc5f90db6dea 100644 --- a/airbyte-integrations/connectors/source-stripe/metadata.yaml +++ b/airbyte-integrations/connectors/source-stripe/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: e094cb9a-26de-4645-8761-65c0c425d1de - dockerImageTag: 4.5.1 + dockerImageTag: 4.5.2 dockerRepository: airbyte/source-stripe documentationUrl: https://docs.airbyte.com/integrations/sources/stripe githubIssueLabel: source-stripe diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index d8193c1796f9..398811089503 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -21,7 +21,6 @@ CreatedCursorIncrementalStripeStream, CustomerBalanceTransactions, Events, - FilteringRecordExtractor, IncrementalStripeStream, Persons, SetupAttempts, @@ -97,6 +96,18 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> return False, str(e) return True, None + @staticmethod + def customers(**args): + # The Customers stream is instantiated in a dedicated method to allow parametrization and avoid duplicated code. + # It can be used with and without expanded items (as an independent stream or as a parent stream for other streams). + return IncrementalStripeStream( + name="customers", + path="customers", + use_cache=True, + event_types=["customer.created", "customer.updated", "customer.deleted"], + **args, + ) + def streams(self, config: Mapping[str, Any]) -> List[Stream]: config = self.validate_and_fill_with_defaults(config) authenticator = TokenAuthenticator(config["client_secret"]) @@ -127,7 +138,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: subscription_items = StripeLazySubStream( name="subscription_items", path="subscription_items", - extra_request_params=lambda self, *args, stream_slice, **kwargs: {"subscription": stream_slice[self.parent_id]}, + extra_request_params=lambda self, stream_slice, *args, **kwargs: {"subscription": stream_slice[self.parent_id]}, parent=subscriptions, use_cache=True, parent_id="subscription_id", @@ -148,13 +159,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: event_types=["application_fee.created", "application_fee.refunded"], **args, ) - customers = IncrementalStripeStream( - name="customers", - path="customers", - use_cache=True, - event_types=["customer.created", "customer.updated", "customer.deleted"], - **args, - ) invoices = IncrementalStripeStream( name="invoices", path="invoices", @@ -185,7 +189,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: event_types=["account.external_account.created", "account.external_account.updated", "account.external_account.deleted"], legacy_cursor_field=None, extra_request_params={"object": "card"}, - record_extractor=FilteringRecordExtractor("updated", None, "card"), + response_filter=lambda record: record["object"] == "card", **args, ), UpdatedCursorIncrementalStripeStream( @@ -194,7 +198,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: event_types=["account.external_account.created", "account.external_account.updated", "account.external_account.deleted"], legacy_cursor_field=None, extra_request_params={"object": "bank_account"}, - record_extractor=FilteringRecordExtractor("updated", None, "bank_account"), + response_filter=lambda record: record["object"] == "bank_account", **args, ), Persons(**args), @@ -246,7 +250,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: event_types=["issuing_authorization.created", "issuing_authorization.request", "issuing_authorization.updated"], **args, ), - customers, + self.customers(**args), IncrementalStripeStream( name="cardholders", path="issuing/cardholders", @@ -397,19 +401,18 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: UpdatedCursorIncrementalStripeLazySubStream( name="bank_accounts", path=lambda self, stream_slice, *args, **kwargs: f"customers/{stream_slice[self.parent_id]}/sources", - parent=customers, + parent=self.customers(expand_items=["data.sources"], **args), event_types=["customer.source.created", "customer.source.expiring", "customer.source.updated", "customer.source.deleted"], legacy_cursor_field=None, parent_id="customer_id", sub_items_attr="sources", - response_filter={"attr": "object", "value": "bank_account"}, extra_request_params={"object": "bank_account"}, - record_extractor=FilteringRecordExtractor("updated", None, "bank_account"), + response_filter=lambda record: record["object"] == "bank_account", **args, ), StripeLazySubStream( name="invoice_line_items", - path=lambda self, *args, stream_slice, **kwargs: f"invoices/{stream_slice[self.parent_id]}/lines", + path=lambda self, stream_slice, *args, **kwargs: f"invoices/{stream_slice[self.parent_id]}/lines", parent=invoices, parent_id="invoice_id", sub_items_attr="lines", diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py b/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py index ad7357658a34..c500290c7b07 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py @@ -22,38 +22,41 @@ class IRecordExtractor(ABC): @abstractmethod - def extract_records(self, response: requests.Response) -> Iterable[Mapping]: + def extract_records(self, records: Iterable[MutableMapping]) -> Iterable[Mapping]: pass class DefaultRecordExtractor(IRecordExtractor): - def extract_records(self, response: requests.Response) -> Iterable[MutableMapping]: - response_json = response.json() - yield from response_json.get("data", []) + def __init__(self, response_filter: Optional[Callable] = None): + self._response_filter = response_filter or (lambda x: x) + + def extract_records(self, records: Iterable[MutableMapping]) -> Iterable[MutableMapping]: + yield from filter(self._response_filter, records) class EventRecordExtractor(DefaultRecordExtractor): - def __init__(self, cursor_field: str): + def __init__(self, cursor_field: str, response_filter: Optional[Callable] = None): + super().__init__(response_filter) self.cursor_field = cursor_field - def extract_records(self, response: requests.Response) -> Iterable[MutableMapping]: - records = super().extract_records(response) - # set the record updated date = date of event creation + def extract_records(self, records: Iterable[MutableMapping]) -> Iterable[MutableMapping]: for record in records: item = record["data"]["object"] item[self.cursor_field] = record["created"] if record["type"].endswith(".deleted"): item["is_deleted"] = True - yield item + if self._response_filter(item): + yield item class UpdatedCursorIncrementalRecordExtractor(DefaultRecordExtractor): - def __init__(self, cursor_field: str, legacy_cursor_field: Optional[str]): + def __init__(self, cursor_field: str, legacy_cursor_field: Optional[str], response_filter: Optional[Callable] = None): + super().__init__(response_filter) self.cursor_field = cursor_field self.legacy_cursor_field = legacy_cursor_field - def extract_records(self, response: requests.Response) -> Iterable[MutableMapping]: - records = super().extract_records(response) + def extract_records(self, records: Iterable[MutableMapping]) -> Iterable[MutableMapping]: + records = super().extract_records(records) for record in records: if self.cursor_field in record: yield record @@ -66,18 +69,6 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin yield record | {self.cursor_field: current_cursor_value} -class FilteringRecordExtractor(UpdatedCursorIncrementalRecordExtractor): - def __init__(self, cursor_field: str, legacy_cursor_field: Optional[str], object_type: str): - super().__init__(cursor_field, legacy_cursor_field) - self.object_type = object_type - - def extract_records(self, response: requests.Response) -> Iterable[MutableMapping]: - records = super().extract_records(response) - for record in records: - if record["object"] == self.object_type: - yield record - - class StripeStream(HttpStream, ABC): url_base = "https://api.stripe.com/v1/" DEFAULT_SLICE_RANGE = 365 @@ -131,13 +122,14 @@ def __init__( use_cache: bool = False, expand_items: Optional[List[str]] = None, extra_request_params: Optional[Union[Mapping[str, Any], Callable]] = None, + response_filter: Optional[Callable] = None, primary_key: Optional[str] = "id", **kwargs, ): self.account_id = account_id self.start_date = start_date self.slice_range = slice_range or self.DEFAULT_SLICE_RANGE - self._record_extractor = record_extractor or DefaultRecordExtractor() + self._record_extractor = record_extractor or DefaultRecordExtractor(response_filter) self._name = name self._path = path self._use_cache = use_cache @@ -159,7 +151,10 @@ def request_params( next_page_token: Mapping[str, Any] = None, ) -> MutableMapping[str, Any]: # Stripe default pagination is 10, max is 100 - params = {"limit": 100, **self.extra_request_params(stream_state, stream_slice, next_page_token)} + params = { + "limit": 100, + **self.extra_request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + } if self.expand_items: params["expand[]"] = self.expand_items # Handle pagination by inserting the next page's token in the request parameters @@ -176,7 +171,7 @@ def parse_response( stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Iterable[Mapping[str, Any]]: - yield from self.record_extractor.extract_records(response) + yield from self.record_extractor.extract_records(response.json().get("data", [])) def request_headers(self, **kwargs) -> Mapping[str, Any]: headers = {"Stripe-Version": STRIPE_API_VERSION} @@ -321,12 +316,15 @@ def __init__( legacy_cursor_field: Optional[str] = "created", event_types: Optional[List[str]] = None, record_extractor: Optional[IRecordExtractor] = None, + response_filter: Optional[Callable] = None, **kwargs, ): self._event_types = event_types self._cursor_field = cursor_field self._legacy_cursor_field = legacy_cursor_field - record_extractor = record_extractor or UpdatedCursorIncrementalRecordExtractor(self.cursor_field, self.legacy_cursor_field) + record_extractor = record_extractor or UpdatedCursorIncrementalRecordExtractor( + self.cursor_field, self.legacy_cursor_field, response_filter + ) super().__init__(*args, record_extractor=record_extractor, **kwargs) # `lookback_window_days` is hardcoded as it does not make any sense to re-export events, # as each event holds the latest value of a record. @@ -340,7 +338,7 @@ def __init__( slice_range=self.slice_range, event_types=self.event_types, cursor_field=self.cursor_field, - record_extractor=EventRecordExtractor(cursor_field=self.cursor_field), + record_extractor=EventRecordExtractor(cursor_field=self.cursor_field, response_filter=response_filter), ) def update_cursor_field(self, stream_state: MutableMapping[str, Any]) -> MutableMapping[str, Any]: @@ -518,7 +516,10 @@ def request_params( next_page_token: Mapping[str, Any] = None, ) -> MutableMapping[str, Any]: # override to not refer to slice values - params = {"limit": 100, **self.extra_request_params(stream_state, stream_slice, next_page_token)} + params = { + "limit": 100, + **self.extra_request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + } if self.expand_items: params["expand[]"] = self.expand_items if next_page_token: @@ -717,10 +718,6 @@ class StripeLazySubStream(StripeStream, HttpSubStream): } """ - @property - def filter(self) -> Optional[Mapping[str, Any]]: - return self._filter - @property def add_parent_id(self) -> bool: return self._add_parent_id @@ -743,14 +740,12 @@ def sub_items_attr(self) -> str: def __init__( self, *args, - response_filter: Optional[Mapping[str, Any]] = None, add_parent_id: bool = False, parent_id: Optional[str] = None, sub_items_attr: Optional[str] = None, **kwargs, ): super().__init__(*args, **kwargs) - self._filter = response_filter self._add_parent_id = add_parent_id self._parent_id = parent_id self._sub_items_attr = sub_items_attr @@ -774,9 +769,7 @@ def read_records(self, sync_mode: SyncMode, stream_slice: Optional[Mapping[str, if not items_obj: return - items = items_obj.get("data", []) - if self.filter: - items = [i for i in items if i.get(self.filter["attr"]) == self.filter["value"]] + items = list(self.record_extractor.extract_records(items_obj.get("data", []))) # get next pages items_next_pages = [] @@ -811,7 +804,7 @@ def __init__( parent_id: Optional[str] = None, add_parent_id: bool = False, sub_items_attr: Optional[str] = None, - response_filter: Optional[Mapping[str, Any]] = None, + response_filter: Optional[Callable] = None, **kwargs, ): super().__init__(*args, **kwargs) @@ -821,6 +814,7 @@ def __init__( cursor_field=cursor_field, legacy_cursor_field=legacy_cursor_field, event_types=event_types, + response_filter=response_filter, **kwargs, ) self.lazy_substream = StripeLazySubStream( @@ -829,7 +823,9 @@ def __init__( parent_id=parent_id, add_parent_id=add_parent_id, sub_items_attr=sub_items_attr, - response_filter=response_filter, + record_extractor=UpdatedCursorIncrementalRecordExtractor( + cursor_field=cursor_field, legacy_cursor_field=legacy_cursor_field, response_filter=response_filter + ), **kwargs, ) self._parent_stream = None diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py index 9877fb85efc8..fb21a40fe3d8 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py @@ -6,7 +6,14 @@ import pytest from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator -from source_stripe.streams import IncrementalStripeStream, StripeLazySubStream +from source_stripe.streams import ( + CreatedCursorIncrementalStripeStream, + IncrementalStripeStream, + StripeLazySubStream, + StripeStream, + UpdatedCursorIncrementalStripeLazySubStream, + UpdatedCursorIncrementalStripeStream, +) os.environ["REQUEST_CACHE_PATH"] = "REQUEST_CACHE_PATH" @@ -77,3 +84,157 @@ def mocker(args=stream_args, parent_stream=parent_stream): ) return mocker + + +@pytest.fixture() +def accounts(stream_args): + def mocker(args=stream_args): + return StripeStream(name="accounts", path="accounts", **args) + + return mocker + + +@pytest.fixture() +def balance_transactions(incremental_stream_args): + def mocker(args=incremental_stream_args): + return CreatedCursorIncrementalStripeStream(name="balance_transactions", path="balance_transactions", **args) + + return mocker + + +@pytest.fixture() +def credit_notes(stream_args): + def mocker(args=stream_args): + return UpdatedCursorIncrementalStripeStream( + name="credit_notes", + path="credit_notes", + event_types=["credit_note.created", "credit_note.updated", "credit_note.voided"], + **args, + ) + + return mocker + + +@pytest.fixture() +def customers(stream_args): + def mocker(args=stream_args): + return IncrementalStripeStream( + name="customers", + path="customers", + use_cache=False, + expand_items=["data.sources"], + event_types=["customer.created", "customer.updated"], + **args, + ) + + return mocker + + +@pytest.fixture() +def bank_accounts(customers, stream_args): + parent_stream = customers() + + def mocker(args=stream_args, parent_stream=parent_stream): + return UpdatedCursorIncrementalStripeLazySubStream( + name="bank_accounts", + path=lambda self, stream_slice, *args, **kwargs: f"customers/{stream_slice[self.parent_id]}/sources", + parent=parent_stream, + event_types=["customer.source.created", "customer.source.expiring", "customer.source.updated", "customer.source.deleted"], + legacy_cursor_field=None, + parent_id="customer_id", + sub_items_attr="sources", + extra_request_params={"object": "bank_account"}, + response_filter=lambda record: record["object"] == "bank_account", + **args, + ) + + return mocker + + +@pytest.fixture() +def external_bank_accounts(stream_args): + def mocker(args=stream_args): + return UpdatedCursorIncrementalStripeStream( + name="external_account_bank_accounts", + path=lambda self, *args, **kwargs: f"accounts/{self.account_id}/external_accounts", + event_types=["account.external_account.created", "account.external_account.updated", "account.external_account.deleted"], + legacy_cursor_field=None, + extra_request_params={"object": "bank_account"}, + response_filter=lambda record: record["object"] == "bank_account", + **args, + ) + + return mocker + + +@pytest.fixture(name="subscriptions") +def subscription_fixture(stream_args): + def mocker(args=stream_args): + return IncrementalStripeStream( + name="subscriptions", + path="subscriptions", + use_cache=False, + extra_request_params={"status": "all"}, + event_types=[ + "customer.subscription.created", + "customer.subscription.paused", + "customer.subscription.pending_update_applied", + "customer.subscription.pending_update_expired", + "customer.subscription.resumed", + "customer.subscription.trial_will_end", + "customer.subscription.updated", + "customer.subscription.deleted", + ], + **args, + ) + return mocker + + +@pytest.fixture(name="subscription_items") +def subscription_items_fixture(subscriptions, stream_args): + parent_stream = subscriptions() + + def mocker(args=stream_args, parent_stream=parent_stream): + return StripeLazySubStream( + name="subscription_items", + path="subscription_items", + extra_request_params=lambda self, stream_slice, *args, **kwargs: {"subscription": stream_slice[self.parent_id]}, + parent=parent_stream, + use_cache=False, + parent_id="subscription_id", + sub_items_attr="items", + **args, + ) + + return mocker + + +@pytest.fixture(name="application_fees") +def application_fees_fixture(stream_args): + def mocker(args=stream_args): + return IncrementalStripeStream( + name="application_fees", + path="application_fees", + use_cache=False, + event_types=["application_fee.created", "application_fee.refunded"], + **args, + ) + return mocker + + +@pytest.fixture(name="application_fees_refunds") +def application_fees_refunds_fixture(application_fees, stream_args): + parent_stream = application_fees() + + def mocker(args=stream_args, parent_stream=parent_stream): + return UpdatedCursorIncrementalStripeLazySubStream( + name="application_fees_refunds", + path=lambda self, stream_slice, *args, **kwargs: f"application_fees/{stream_slice[self.parent_id]}/refunds", + parent=parent_stream, + event_types=["application_fee.refund.updated"], + parent_id="refund_id", + sub_items_attr="refunds", + add_parent_id=True, + **args, + ) + return mocker diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py index 55cba5aa9aa5..6b8620e23a00 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py @@ -5,97 +5,7 @@ import freezegun import pendulum import pytest -from source_stripe.streams import ( - CheckoutSessionsLineItems, - CreatedCursorIncrementalStripeStream, - CustomerBalanceTransactions, - FilteringRecordExtractor, - IncrementalStripeStream, - Persons, - SetupAttempts, - StripeStream, - UpdatedCursorIncrementalStripeLazySubStream, - UpdatedCursorIncrementalStripeStream, -) - - -@pytest.fixture() -def accounts(stream_args): - def mocker(args=stream_args): - return StripeStream(name="accounts", path="accounts", **args) - - return mocker - - -@pytest.fixture() -def balance_transactions(incremental_stream_args): - def mocker(args=incremental_stream_args): - return CreatedCursorIncrementalStripeStream(name="balance_transactions", path="balance_transactions", **args) - - return mocker - - -@pytest.fixture() -def credit_notes(stream_args): - def mocker(args=stream_args): - return UpdatedCursorIncrementalStripeStream( - name="credit_notes", - path="credit_notes", - event_types=["credit_note.created", "credit_note.updated", "credit_note.voided"], - **args, - ) - - return mocker - - -@pytest.fixture() -def customers(stream_args): - def mocker(args=stream_args): - return IncrementalStripeStream( - name="customers", - path="customers", - use_cache=False, - event_types=["customer.created", "customer.updated"], - **args, - ) - - return mocker - - -@pytest.fixture() -def bank_accounts(customers, stream_args): - def mocker(args=stream_args): - return UpdatedCursorIncrementalStripeLazySubStream( - name="bank_accounts", - path=lambda self, stream_slice, *args, **kwargs: f"customers/{stream_slice[self.parent_id]}/sources", - parent=customers(), - event_types=["customer.source.created", "customer.source.expiring", "customer.source.updated"], - legacy_cursor_field=None, - parent_id="customer_id", - sub_items_attr="sources", - response_filter={"attr": "object", "value": "bank_account"}, - extra_request_params={"object": "bank_account"}, - record_extractor=FilteringRecordExtractor("updated", None, "bank_account"), - **args, - ) - - return mocker - - -@pytest.fixture() -def external_bank_accounts(stream_args): - def mocker(args=stream_args): - return UpdatedCursorIncrementalStripeStream( - name="external_account_bank_accounts", - path=lambda self, *args, **kwargs: f"accounts/{self.account_id}/external_accounts", - event_types=["account.external_account.created", "account.external_account.updated"], - legacy_cursor_field=None, - extra_request_params={"object": "bank_account"}, - record_extractor=FilteringRecordExtractor("updated", None, "bank_account"), - **args, - ) - - return mocker +from source_stripe.streams import CheckoutSessionsLineItems, CustomerBalanceTransactions, Persons, SetupAttempts def test_request_headers(accounts): @@ -104,72 +14,330 @@ def test_request_headers(accounts): assert headers["Stripe-Version"] == "2022-11-15" -def test_lazy_sub_stream(requests_mock, invoice_line_items, invoices, stream_args): - # First initial request to parent stream - requests_mock.get( - "https://api.stripe.com/v1/invoices", - json={ - "has_more": False, - "object": "list", - "url": "/v1/checkout/sessions", - "data": [ - { - "created": 1641038947, - "customer": "cus_HezytZRkaQJC8W", - "id": "in_1KD6OVIEn5WyEQxn9xuASHsD", - "object": "invoice", - "total": 1, - "lines": { - "data": [ - { - "id": "il_1", - "object": "line_item", - }, - { - "id": "il_2", - "object": "line_item", - }, - ], - "has_more": True, - "object": "list", - "total_count": 3, - "url": "/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines", +lazy_substream_test_suite = ( + ( + { + "https://api.stripe.com/v1/invoices": { + "has_more": False, + "object": "list", + "url": "/v1/invoices", + "data": [ + { + "created": 1641038947, + "customer": "cus_HezytZRkaQJC8W", + "id": "in_1KD6OVIEn5WyEQxn9xuASHsD", + "object": "invoice", + "total": 1, + "lines": { + "data": [ + { + "id": "il_1", + "object": "line_item", + }, + { + "id": "il_2", + "object": "line_item", + }, + ], + "has_more": True, + "object": "list", + "total_count": 3, + "url": "/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines", + }, + } + ], + }, + "https://api.stripe.com/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines?starting_after=il_2": { + "data": [ + { + "id": "il_3", + "object": "line_item", }, - } - ], + ], + "has_more": False, + "object": "list", + "total_count": 3, + "url": "/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines", + }, }, - ) - - # Second pagination request to main stream - requests_mock.get( - "https://api.stripe.com/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines", - json={ - "data": [ - { - "id": "il_3", - "object": "line_item", - }, - ], - "has_more": False, - "object": "list", - "total_count": 3, - "url": "/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines", + "invoices", + "invoice_line_items", + [ + {"id": "il_1", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, + {"id": "il_2", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, + {"id": "il_3", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, + ], + "full_refresh", + {} + ), + ( + { + "https://api.stripe.com/v1/subscriptions": { + "has_more": False, + "object": "list", + "url": "/v1/subscriptions", + "data": [ + { + "created": 1641038947, + "customer": "cus_HezytZRkaQJC8W", + "id": "si_OptSP2o3XZUBpx", + "object": "subscription", + "total": 1, + "items": { + "data": [ + { + "id": "si_1", + "object": "subscription_item", + }, + { + "id": "si_2", + "object": "subscription_item", + }, + ], + "has_more": True, + "object": "list", + "total_count": 3, + "url": "/v1/subscription_items", + }, + } + ], + }, + "https://api.stripe.com/v1/subscription_items?subscription=si_OptSP2o3XZUBpx&starting_after=si_2": { + "data": [ + { + "id": "si_3", + "object": "subscription_item", + }, + ], + "has_more": False, + "object": "list", + "total_count": 3, + "url": "/v1/subscription_items", + }, }, - ) + "subscriptions", + "subscription_items", + [ + {"id": "si_1", "object": "subscription_item"}, + {"id": "si_2", "object": "subscription_item"}, + {"id": "si_3", "object": "subscription_item"}, + ], + "full_refresh", + {} + ), + ( + { + "https://api.stripe.com/v1/customers?expand%5B%5D=data.sources": { + "has_more": False, + "object": "list", + "url": "/v1/customers", + "data": [ + { + "created": 1641038947, + "id": "cus_HezytZRkaQJC8W", + "object": "customer", + "total": 1, + "sources": { + "data": [ + { + "id": "cs_1", + "object": "card", + }, + { + "id": "cs_2", + "object": "bank_account", + }, + ], + "has_more": True, + "object": "list", + "total_count": 4, + "url": "/v1/customers/cus_HezytZRkaQJC8W/sources", + }, + } + ], + }, + "https://api.stripe.com/v1/customers/cus_HezytZRkaQJC8W/sources": { + "data": [ + { + "id": "cs_3", + "object": "card", + }, + { + "id": "cs_4", + "object": "bank_account", + }, + ], + "has_more": False, + "object": "list", + "total_count": 4, + "url": "/v1/customers/cus_HezytZRkaQJC8W/sources", + }, + }, + "customers", + "bank_accounts", + [ + {"id": "cs_2", "object": "bank_account", "updated": 1692802815}, + {"id": "cs_4", "object": "bank_account", "updated": 1692802815}, + ], + "full_refresh", + {} + ), + ( + { + "https://api.stripe.com/v1/application_fees": { + "has_more": False, + "object": "list", + "url": "/v1/application_fees", + "data": [ + { + "created": 1641038947, + "customer": "cus_HezytZRkaQJC8W", + "id": "af_OptSP2o3XZUBpx", + "object": "application_fee", + "total": 1, + "refunds": { + "data": [ + { + "id": "fr_1", + "object": "application_fee_refund", + }, + { + "id": "fr_2", + "object": "application_fee_refund", + }, + ], + "has_more": True, + "object": "list", + "total_count": 3, + "url": "/v1/application_fees/af_OptSP2o3XZUBpx/refunds", + }, + } + ], + }, + "https://api.stripe.com/v1/application_fees/af_OptSP2o3XZUBpx/refunds": { + "data": [ + { + "id": "fr_3", + "object": "application_fee_refund", + } + ], + "has_more": False, + "object": "list", + "total_count": 3, + "url": "/v1/application_fees/af_OptSP2o3XZUBpx/refunds", + }, + }, + "application_fees", + "application_fees_refunds", + [ + {"id": "fr_1", "object": "application_fee_refund", "refund_id": "af_OptSP2o3XZUBpx", "updated": 1692802815}, + {"id": "fr_2", "object": "application_fee_refund", "refund_id": "af_OptSP2o3XZUBpx", "updated": 1692802815}, + {"id": "fr_3", "object": "application_fee_refund", "refund_id": "af_OptSP2o3XZUBpx", "updated": 1692802815}, + ], + "full_refresh", + {} + ), + ( + { + "https://api.stripe.com/v1/events?types%5B%5D=customer.source.created&types%5B%5D=customer.source.expiring&types" + "%5B%5D=customer.source.updated&types%5B%5D=customer.source.deleted": { + "data": [ + { + "id": "evt_1NdNFoEcXtiJtvvhBP5mxQmL", + "object": "event", + "api_version": "2020-08-27", + "created": 1692802016, + "data": {"object": {"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716}}, + "type": "customer.source.created", + }, + { + "id": "evt_1NdNFoEcXtiJtvvhBP5mxQmL", + "object": "event", + "api_version": "2020-08-27", + "created": 1692802017, + "data": {"object": {"object": "card", "card": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716}}, + "type": "customer.source.updated", + } + ], + "has_more": False, + } + }, + "customers", + "bank_accounts", + [ + {"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716, "updated": 1692802016} + ], + "incremental", + {"updated": 1692802015} + ), + ( + { + "https://api.stripe.com/v1/events?types%5B%5D=application_fee.refund.updated": { + "data": [ + { + "id": "evt_1NdNFoEcXtiJtvvhBP5mxQmL", + "object": "event", + "api_version": "2020-08-27", + "created": 1692802016, + "data": { + "object": { + "object": "application_fee_refund", + "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", + "created": 1653341716 + } + }, + "type": "application_fee.refund.updated", + }, + { + "id": "evt_1NdNFoEcXtiJtvvhBP5mxQmL", + "object": "event", + "api_version": "2020-08-27", + "created": 1692802017, + "data": { + "object": { + "object": "application_fee_refund", + "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", + "created": 1653341716 + } + }, + "type": "application_fee.refund.updated", + } + ], + "has_more": False, + } + }, + "application_fees", + "application_fees_refunds", + [ + {"object": "application_fee_refund", "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716, "updated": 1692802016}, + {"object": "application_fee_refund", "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716, "updated": 1692802017} + ], + "incremental", + {"updated": 1692802015} + ), +) + + +@pytest.mark.parametrize("requests_mock_map, parent_stream_cls, stream_cls, expected_records, sync_mode, state", lazy_substream_test_suite) +@freezegun.freeze_time("2023-08-23T15:00:15Z") +def test_lazy_sub_streams( + request, requests_mock, requests_mock_map, parent_stream_cls, stream_cls, expected_records, stream_args, sync_mode, state +): + parent_stream_cls = request.getfixturevalue(parent_stream_cls) + stream_cls = request.getfixturevalue(stream_cls) + + for url, body in requests_mock_map.items(): + requests_mock.get(url, json=body) # make start date a recent date so there's just one slice in a parent stream stream_args["start_date"] = pendulum.today().subtract(days=3).int_timestamp - parent_stream = invoices(stream_args) - stream = invoice_line_items(stream_args, parent_stream=parent_stream) + parent_stream = parent_stream_cls(stream_args) + stream = stream_cls(stream_args, parent_stream=parent_stream) records = [] - for slice_ in stream.stream_slices(sync_mode="full_refresh"): - records.extend(stream.read_records(sync_mode="full_refresh", stream_slice=slice_)) - assert list(records) == [ - {"id": "il_1", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, - {"id": "il_2", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, - {"id": "il_3", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, - ] + for slice_ in stream.stream_slices(sync_mode=sync_mode, stream_state=state): + records.extend(stream.read_records(sync_mode=sync_mode, stream_slice=slice_, stream_state=state)) + assert list(records) == expected_records @freezegun.freeze_time("2023-08-23T15:00:15Z") diff --git a/docs/integrations/sources/stripe.md b/docs/integrations/sources/stripe.md index 0e04310595b7..7812bffca780 100644 --- a/docs/integrations/sources/stripe.md +++ b/docs/integrations/sources/stripe.md @@ -192,7 +192,8 @@ The Stripe connector should not run into Stripe API limitations under normal usa | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 4.5.1 | 2023-11-01 | [32056](https://github.com/airbytehq/airbyte/pull/32056/) | Use CDK version 0.52.8 | +| 4.5.2 | 2023-11-03 | [00000](https://github.com/airbytehq/airbyte/pull/00000/) | Fix multiple BankAccount issues | +| 4.5.1 | 2023-11-01 | [32056](https://github.com/airbytehq/airbyte/pull/32056/) | Use CDK version 0.52.8 | | 4.5.0 | 2023-10-25 | [31327](https://github.com/airbytehq/airbyte/pull/31327/) | Use concurrent CDK when running in full-refresh | | 4.4.2 | 2023-10-24 | [31764](https://github.com/airbytehq/airbyte/pull/31764) | Base image migration: remove Dockerfile and use the python-connector-base image | | 4.4.1 | 2023-10-18 | [31553](https://github.com/airbytehq/airbyte/pull/31553) | Adjusted `Setup Attempts` and extended `Checkout Sessions` stream schemas | From 088e0d75905b9ddba6c3c0f9762c82535cf5bfc9 Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Fri, 3 Nov 2023 17:46:56 +0200 Subject: [PATCH 2/9] update changelog --- docs/integrations/sources/stripe.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/sources/stripe.md b/docs/integrations/sources/stripe.md index 7812bffca780..30c101fac0d8 100644 --- a/docs/integrations/sources/stripe.md +++ b/docs/integrations/sources/stripe.md @@ -192,7 +192,7 @@ The Stripe connector should not run into Stripe API limitations under normal usa | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 4.5.2 | 2023-11-03 | [00000](https://github.com/airbytehq/airbyte/pull/00000/) | Fix multiple BankAccount issues | +| 4.5.2 | 2023-11-03 | [32146](https://github.com/airbytehq/airbyte/pull/32146/) | Fix multiple BankAccount issues | | 4.5.1 | 2023-11-01 | [32056](https://github.com/airbytehq/airbyte/pull/32056/) | Use CDK version 0.52.8 | | 4.5.0 | 2023-10-25 | [31327](https://github.com/airbytehq/airbyte/pull/31327/) | Use concurrent CDK when running in full-refresh | | 4.4.2 | 2023-10-24 | [31764](https://github.com/airbytehq/airbyte/pull/31764) | Base image migration: remove Dockerfile and use the python-connector-base image | From 22202371f28fabe7357cae4495e2f73a4cb52e45 Mon Sep 17 00:00:00 2001 From: davydov-d Date: Fri, 3 Nov 2023 16:04:13 +0000 Subject: [PATCH 3/9] Automated Commit - Formatting Changes --- .../source-stripe/unit_tests/conftest.py | 3 ++ .../source-stripe/unit_tests/test_streams.py | 42 ++++++++++++------- docs/integrations/sources/stripe.md | 2 +- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py index fb21a40fe3d8..80c00f6fb42b 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py @@ -187,6 +187,7 @@ def mocker(args=stream_args): ], **args, ) + return mocker @@ -219,6 +220,7 @@ def mocker(args=stream_args): event_types=["application_fee.created", "application_fee.refunded"], **args, ) + return mocker @@ -237,4 +239,5 @@ def mocker(args=stream_args, parent_stream=parent_stream): add_parent_id=True, **args, ) + return mocker diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py index 6b8620e23a00..6a9d5dfd72f7 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py @@ -68,7 +68,7 @@ def test_request_headers(accounts): {"id": "il_3", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, ], "full_refresh", - {} + {}, ), ( { @@ -123,7 +123,7 @@ def test_request_headers(accounts): {"id": "si_3", "object": "subscription_item"}, ], "full_refresh", - {} + {}, ), ( { @@ -180,7 +180,7 @@ def test_request_headers(accounts): {"id": "cs_4", "object": "bank_account", "updated": 1692802815}, ], "full_refresh", - {} + {}, ), ( { @@ -235,7 +235,7 @@ def test_request_headers(accounts): {"id": "fr_3", "object": "application_fee_refund", "refund_id": "af_OptSP2o3XZUBpx", "updated": 1692802815}, ], "full_refresh", - {} + {}, ), ( { @@ -247,7 +247,9 @@ def test_request_headers(accounts): "object": "event", "api_version": "2020-08-27", "created": 1692802016, - "data": {"object": {"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716}}, + "data": { + "object": {"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716} + }, "type": "customer.source.created", }, { @@ -257,18 +259,16 @@ def test_request_headers(accounts): "created": 1692802017, "data": {"object": {"object": "card", "card": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716}}, "type": "customer.source.updated", - } + }, ], "has_more": False, } }, "customers", "bank_accounts", - [ - {"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716, "updated": 1692802016} - ], + [{"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716, "updated": 1692802016}], "incremental", - {"updated": 1692802015} + {"updated": 1692802015}, ), ( { @@ -283,7 +283,7 @@ def test_request_headers(accounts): "object": { "object": "application_fee_refund", "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", - "created": 1653341716 + "created": 1653341716, } }, "type": "application_fee.refund.updated", @@ -297,11 +297,11 @@ def test_request_headers(accounts): "object": { "object": "application_fee_refund", "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", - "created": 1653341716 + "created": 1653341716, } }, "type": "application_fee.refund.updated", - } + }, ], "has_more": False, } @@ -309,11 +309,21 @@ def test_request_headers(accounts): "application_fees", "application_fees_refunds", [ - {"object": "application_fee_refund", "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716, "updated": 1692802016}, - {"object": "application_fee_refund", "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716, "updated": 1692802017} + { + "object": "application_fee_refund", + "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", + "created": 1653341716, + "updated": 1692802016, + }, + { + "object": "application_fee_refund", + "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", + "created": 1653341716, + "updated": 1692802017, + }, ], "incremental", - {"updated": 1692802015} + {"updated": 1692802015}, ), ) diff --git a/docs/integrations/sources/stripe.md b/docs/integrations/sources/stripe.md index 30c101fac0d8..7812bffca780 100644 --- a/docs/integrations/sources/stripe.md +++ b/docs/integrations/sources/stripe.md @@ -192,7 +192,7 @@ The Stripe connector should not run into Stripe API limitations under normal usa | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 4.5.2 | 2023-11-03 | [32146](https://github.com/airbytehq/airbyte/pull/32146/) | Fix multiple BankAccount issues | +| 4.5.2 | 2023-11-03 | [00000](https://github.com/airbytehq/airbyte/pull/00000/) | Fix multiple BankAccount issues | | 4.5.1 | 2023-11-01 | [32056](https://github.com/airbytehq/airbyte/pull/32056/) | Use CDK version 0.52.8 | | 4.5.0 | 2023-10-25 | [31327](https://github.com/airbytehq/airbyte/pull/31327/) | Use concurrent CDK when running in full-refresh | | 4.4.2 | 2023-10-24 | [31764](https://github.com/airbytehq/airbyte/pull/31764) | Base image migration: remove Dockerfile and use the python-connector-base image | From 5b19277264c897ae7ac9c628a9c26d1486ffb73c Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Wed, 8 Nov 2023 19:02:18 +0200 Subject: [PATCH 4/9] test connector streams instead of classes --- .../source-stripe/unit_tests/conftest.py | 217 +----------------- .../unit_tests/test_availability_strategy.py | 12 +- .../source-stripe/unit_tests/test_streams.py | 70 +++--- docs/integrations/sources/stripe.md | 2 +- 4 files changed, 45 insertions(+), 256 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py index 80c00f6fb42b..40beb1a895c8 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py @@ -2,20 +2,9 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -import os - import pytest from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator -from source_stripe.streams import ( - CreatedCursorIncrementalStripeStream, - IncrementalStripeStream, - StripeLazySubStream, - StripeStream, - UpdatedCursorIncrementalStripeLazySubStream, - UpdatedCursorIncrementalStripeStream, -) - -os.environ["REQUEST_CACHE_PATH"] = "REQUEST_CACHE_PATH" +from source_stripe.source import SourceStripe @pytest.fixture(name="config") @@ -41,203 +30,13 @@ def incremental_args_fixture(stream_args): return {"lookback_window_days": 14, **stream_args} -@pytest.fixture(name="invoices") -def invoices_fixture(stream_args): - def mocker(args=stream_args): - return IncrementalStripeStream( - name="invoices", - path="invoices", - use_cache=False, - event_types=[ - "invoice.created", - "invoice.finalization_failed", - "invoice.finalized", - "invoice.marked_uncollectible", - "invoice.paid", - "invoice.payment_action_required", - "invoice.payment_failed", - "invoice.payment_succeeded", - "invoice.sent", - "invoice.upcoming", - "invoice.updated", - "invoice.voided", - ], - **args, - ) - - return mocker - - -@pytest.fixture(name="invoice_line_items") -def invoice_line_items_fixture(invoices, stream_args): - parent_stream = invoices() - - def mocker(args=stream_args, parent_stream=parent_stream): - return StripeLazySubStream( - name="invoice_line_items", - path=lambda self, *args, stream_slice, **kwargs: f"invoices/{stream_slice[self.parent_id]}/lines", - parent=parent_stream, - parent_id="invoice_id", - sub_items_attr="lines", - add_parent_id=True, - **args, - ) - - return mocker - - -@pytest.fixture() -def accounts(stream_args): - def mocker(args=stream_args): - return StripeStream(name="accounts", path="accounts", **args) - - return mocker - - -@pytest.fixture() -def balance_transactions(incremental_stream_args): - def mocker(args=incremental_stream_args): - return CreatedCursorIncrementalStripeStream(name="balance_transactions", path="balance_transactions", **args) - - return mocker - - @pytest.fixture() -def credit_notes(stream_args): - def mocker(args=stream_args): - return UpdatedCursorIncrementalStripeStream( - name="credit_notes", - path="credit_notes", - event_types=["credit_note.created", "credit_note.updated", "credit_note.voided"], - **args, - ) - - return mocker - - -@pytest.fixture() -def customers(stream_args): - def mocker(args=stream_args): - return IncrementalStripeStream( - name="customers", - path="customers", - use_cache=False, - expand_items=["data.sources"], - event_types=["customer.created", "customer.updated"], - **args, - ) - - return mocker - - -@pytest.fixture() -def bank_accounts(customers, stream_args): - parent_stream = customers() - - def mocker(args=stream_args, parent_stream=parent_stream): - return UpdatedCursorIncrementalStripeLazySubStream( - name="bank_accounts", - path=lambda self, stream_slice, *args, **kwargs: f"customers/{stream_slice[self.parent_id]}/sources", - parent=parent_stream, - event_types=["customer.source.created", "customer.source.expiring", "customer.source.updated", "customer.source.deleted"], - legacy_cursor_field=None, - parent_id="customer_id", - sub_items_attr="sources", - extra_request_params={"object": "bank_account"}, - response_filter=lambda record: record["object"] == "bank_account", - **args, - ) - - return mocker - - -@pytest.fixture() -def external_bank_accounts(stream_args): - def mocker(args=stream_args): - return UpdatedCursorIncrementalStripeStream( - name="external_account_bank_accounts", - path=lambda self, *args, **kwargs: f"accounts/{self.account_id}/external_accounts", - event_types=["account.external_account.created", "account.external_account.updated", "account.external_account.deleted"], - legacy_cursor_field=None, - extra_request_params={"object": "bank_account"}, - response_filter=lambda record: record["object"] == "bank_account", - **args, - ) - - return mocker - - -@pytest.fixture(name="subscriptions") -def subscription_fixture(stream_args): - def mocker(args=stream_args): - return IncrementalStripeStream( - name="subscriptions", - path="subscriptions", - use_cache=False, - extra_request_params={"status": "all"}, - event_types=[ - "customer.subscription.created", - "customer.subscription.paused", - "customer.subscription.pending_update_applied", - "customer.subscription.pending_update_expired", - "customer.subscription.resumed", - "customer.subscription.trial_will_end", - "customer.subscription.updated", - "customer.subscription.deleted", - ], - **args, - ) - - return mocker - - -@pytest.fixture(name="subscription_items") -def subscription_items_fixture(subscriptions, stream_args): - parent_stream = subscriptions() - - def mocker(args=stream_args, parent_stream=parent_stream): - return StripeLazySubStream( - name="subscription_items", - path="subscription_items", - extra_request_params=lambda self, stream_slice, *args, **kwargs: {"subscription": stream_slice[self.parent_id]}, - parent=parent_stream, - use_cache=False, - parent_id="subscription_id", - sub_items_attr="items", - **args, - ) - - return mocker - - -@pytest.fixture(name="application_fees") -def application_fees_fixture(stream_args): - def mocker(args=stream_args): - return IncrementalStripeStream( - name="application_fees", - path="application_fees", - use_cache=False, - event_types=["application_fee.created", "application_fee.refunded"], - **args, - ) - - return mocker - - -@pytest.fixture(name="application_fees_refunds") -def application_fees_refunds_fixture(application_fees, stream_args): - parent_stream = application_fees() - - def mocker(args=stream_args, parent_stream=parent_stream): - return UpdatedCursorIncrementalStripeLazySubStream( - name="application_fees_refunds", - path=lambda self, stream_slice, *args, **kwargs: f"application_fees/{stream_slice[self.parent_id]}/refunds", - parent=parent_stream, - event_types=["application_fee.refund.updated"], - parent_id="refund_id", - sub_items_attr="refunds", - add_parent_id=True, - **args, - ) +def stream_by_name(config): + def mocker(stream_name, source_config=config): + source = SourceStripe() + streams = source.streams(source_config) + for stream in streams: + if stream.name == stream_name: + return stream return mocker diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py index d9382de2be1f..0f747acac434 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py @@ -87,13 +87,13 @@ def test_traverse_over_substreams_failure(mocker): assert id(check_availability_mock.call_args_list[1].args[0]) == id(child_1) -def test_substream_availability(mocker, invoice_line_items): +def test_substream_availability(mocker, stream_by_name): check_availability_mock = mocker.MagicMock() check_availability_mock.return_value = (True, None) mocker.patch( "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability", check_availability_mock ) - stream = invoice_line_items() + stream = stream_by_name("invoice_line_items") is_available, reason = stream.availability_strategy.check_availability(stream, mocker.Mock(), mocker.Mock()) assert is_available and reason is None @@ -102,13 +102,13 @@ def test_substream_availability(mocker, invoice_line_items): assert isinstance(check_availability_mock.call_args_list[1].args[0], StripeLazySubStream) -def test_substream_availability_no_parent(mocker, invoice_line_items): +def test_substream_availability_no_parent(mocker, stream_by_name): check_availability_mock = mocker.MagicMock() check_availability_mock.return_value = (True, None) mocker.patch( "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability", check_availability_mock ) - stream = invoice_line_items() + stream = stream_by_name("invoice_line_items") stream.parent = None stream.availability_strategy.check_availability(stream, mocker.Mock(), mocker.Mock()) @@ -117,8 +117,8 @@ def test_substream_availability_no_parent(mocker, invoice_line_items): assert isinstance(check_availability_mock.call_args_list[0].args[0], StripeLazySubStream) -def test_403_error_handling(invoices, requests_mock): - stream = invoices() +def test_403_error_handling(stream_by_name, requests_mock): + stream = stream_by_name("invoices") logger = logging.getLogger("airbyte") for error_code in STRIPE_ERROR_CODES: requests_mock.get(f"{stream.url_base}{stream.path()}", status_code=403, json={"error": {"code": f"{error_code}"}}) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py index 6a9d5dfd72f7..e3cf5ab15f1d 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py @@ -8,8 +8,8 @@ from source_stripe.streams import CheckoutSessionsLineItems, CustomerBalanceTransactions, Persons, SetupAttempts -def test_request_headers(accounts): - stream = accounts() +def test_request_headers(stream_by_name): + stream = stream_by_name("accounts") headers = stream.request_headers() assert headers["Stripe-Version"] == "2022-11-15" @@ -60,7 +60,6 @@ def test_request_headers(accounts): "url": "/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines", }, }, - "invoices", "invoice_line_items", [ {"id": "il_1", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, @@ -115,7 +114,6 @@ def test_request_headers(accounts): "url": "/v1/subscription_items", }, }, - "subscriptions", "subscription_items", [ {"id": "si_1", "object": "subscription_item"}, @@ -173,7 +171,6 @@ def test_request_headers(accounts): "url": "/v1/customers/cus_HezytZRkaQJC8W/sources", }, }, - "customers", "bank_accounts", [ {"id": "cs_2", "object": "bank_account", "updated": 1692802815}, @@ -227,7 +224,6 @@ def test_request_headers(accounts): "url": "/v1/application_fees/af_OptSP2o3XZUBpx/refunds", }, }, - "application_fees", "application_fees_refunds", [ {"id": "fr_1", "object": "application_fee_refund", "refund_id": "af_OptSP2o3XZUBpx", "updated": 1692802815}, @@ -264,7 +260,6 @@ def test_request_headers(accounts): "has_more": False, } }, - "customers", "bank_accounts", [{"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716, "updated": 1692802016}], "incremental", @@ -306,7 +301,6 @@ def test_request_headers(accounts): "has_more": False, } }, - "application_fees", "application_fees_refunds", [ { @@ -328,32 +322,28 @@ def test_request_headers(accounts): ) -@pytest.mark.parametrize("requests_mock_map, parent_stream_cls, stream_cls, expected_records, sync_mode, state", lazy_substream_test_suite) +@pytest.mark.parametrize("requests_mock_map, stream_cls, expected_records, sync_mode, state", lazy_substream_test_suite) @freezegun.freeze_time("2023-08-23T15:00:15Z") def test_lazy_sub_streams( - request, requests_mock, requests_mock_map, parent_stream_cls, stream_cls, expected_records, stream_args, sync_mode, state + stream_by_name, requests_mock, requests_mock_map, stream_cls, expected_records, config, sync_mode, state ): - parent_stream_cls = request.getfixturevalue(parent_stream_cls) - stream_cls = request.getfixturevalue(stream_cls) + # make start date a recent date so there's just one slice in a parent stream + config["start_date"] = str(pendulum.today().subtract(days=3)) + stream = stream_by_name(stream_cls, config) for url, body in requests_mock_map.items(): requests_mock.get(url, json=body) - # make start date a recent date so there's just one slice in a parent stream - stream_args["start_date"] = pendulum.today().subtract(days=3).int_timestamp - parent_stream = parent_stream_cls(stream_args) - stream = stream_cls(stream_args, parent_stream=parent_stream) records = [] - for slice_ in stream.stream_slices(sync_mode=sync_mode, stream_state=state): records.extend(stream.read_records(sync_mode=sync_mode, stream_slice=slice_, stream_state=state)) assert list(records) == expected_records @freezegun.freeze_time("2023-08-23T15:00:15Z") -def test_created_cursor_incremental_stream(requests_mock, balance_transactions, incremental_stream_args): - incremental_stream_args["start_date"] = pendulum.now().subtract(months=23).int_timestamp - stream = balance_transactions(incremental_stream_args) +def test_created_cursor_incremental_stream(requests_mock, stream_by_name, config): + config["start_date"] = str(pendulum.now().subtract(months=23)) + stream = stream_by_name("balance_transactions", {"lookback_window_days": 14, **config}) requests_mock.get( "/v1/balance_transactions", [ @@ -401,18 +391,18 @@ def test_created_cursor_incremental_stream(requests_mock, balance_transactions, ) @freezegun.freeze_time("2023-08-23T15:00:15Z") def test_get_start_timestamp( - balance_transactions, incremental_stream_args, start_date, lookback_window, max_days_from_now, stream_state, expected_start_timestamp + stream_by_name, config, start_date, lookback_window, max_days_from_now, stream_state, expected_start_timestamp ): - incremental_stream_args["start_date"] = pendulum.parse(start_date).int_timestamp - incremental_stream_args["lookback_window_days"] = lookback_window - incremental_stream_args["start_date_max_days_from_now"] = max_days_from_now - stream = balance_transactions(incremental_stream_args) + config["start_date"] = start_date + config["lookback_window_days"] = lookback_window + stream = stream_by_name("balance_transactions", config) + stream.start_date_max_days_from_now = max_days_from_now assert stream.get_start_timestamp(stream_state) == pendulum.parse(expected_start_timestamp).int_timestamp @pytest.mark.parametrize("sync_mode", ("full_refresh", "incremental")) -def test_updated_cursor_incremental_stream_slices(credit_notes, sync_mode): - stream = credit_notes() +def test_updated_cursor_incremental_stream_slices(stream_by_name, sync_mode): + stream = stream_by_name("credit_notes") assert list(stream.stream_slices(sync_mode)) == [{}] @@ -420,13 +410,13 @@ def test_updated_cursor_incremental_stream_slices(credit_notes, sync_mode): "last_record, stream_state, expected_state", (({"updated": 110}, {"updated": 111}, {"updated": 111}), ({"created": 110}, {"updated": 111}, {"updated": 111})), ) -def test_updated_cursor_incremental_stream_get_updated_state(credit_notes, last_record, stream_state, expected_state): - stream = credit_notes() +def test_updated_cursor_incremental_stream_get_updated_state(stream_by_name, last_record, stream_state, expected_state): + stream = stream_by_name("credit_notes") assert stream.get_updated_state(last_record, stream_state) == expected_state @pytest.mark.parametrize("sync_mode", ("full_refresh", "incremental")) -def test_updated_cursor_incremental_stream_read_wo_state(requests_mock, sync_mode, credit_notes): +def test_updated_cursor_incremental_stream_read_wo_state(requests_mock, sync_mode, stream_by_name): requests_mock.get( "/v1/credit_notes", [ @@ -453,7 +443,7 @@ def test_updated_cursor_incremental_stream_read_wo_state(requests_mock, sync_mod } ], ) - stream = credit_notes() + stream = stream_by_name("credit_notes") records = [record for record in stream.read_records(sync_mode)] assert records == [ { @@ -476,7 +466,7 @@ def test_updated_cursor_incremental_stream_read_wo_state(requests_mock, sync_mod @freezegun.freeze_time("2023-08-23T00:00:00") -def test_updated_cursor_incremental_stream_read_w_state(requests_mock, credit_notes): +def test_updated_cursor_incremental_stream_read_w_state(requests_mock, stream_by_name): requests_mock.get( "/v1/events", [ @@ -498,7 +488,7 @@ def test_updated_cursor_incremental_stream_read_w_state(requests_mock, credit_no ], ) - stream = credit_notes() + stream = stream_by_name("credit_notes") records = [ record for record in stream.read_records("incremental", stream_state={"updated": pendulum.parse("2023-01-01T15:00:15Z").int_timestamp}) @@ -668,11 +658,11 @@ def test_persons_w_state(requests_mock, stream_args): @pytest.mark.parametrize("sync_mode, stream_state", (("full_refresh", {}), ("incremental", {}), ("incremental", {"updated": 1693987430}))) -def test_cursorless_incremental_stream(requests_mock, external_bank_accounts, sync_mode, stream_state): +def test_cursorless_incremental_stream(requests_mock, stream_by_name, sync_mode, stream_state): # Testing streams that *only* have the cursor field value in incremental mode because of API discrepancies, # e.g. /bank_accounts does not return created/updated date, however /events?type=bank_account.updated returns the update date. # Key condition here is that the underlying stream has legacy cursor field set to None. - stream = external_bank_accounts() + stream = stream_by_name("external_account_bank_accounts") requests_mock.get( "/v1/accounts//external_accounts", json={ @@ -718,9 +708,9 @@ def test_cursorless_incremental_stream(requests_mock, external_bank_accounts, sy @pytest.mark.parametrize("sync_mode, stream_state", (("full_refresh", {}), ("incremental", {}), ("incremental", {"updated": 1693987430}))) -def test_cursorless_incremental_substream(requests_mock, bank_accounts, sync_mode, stream_state): +def test_cursorless_incremental_substream(requests_mock, stream_by_name, sync_mode, stream_state): # same for substreams - stream = bank_accounts() + stream = stream_by_name("bank_accounts") requests_mock.get( "/v1/customers", json={ @@ -759,9 +749,9 @@ def test_cursorless_incremental_substream(requests_mock, bank_accounts, sync_mod stream.get_updated_state(stream_state, record) -@pytest.mark.parametrize("stream", ("bank_accounts",)) -def test_get_updated_state(stream, request, requests_mock): - stream = request.getfixturevalue(stream)() +@pytest.mark.parametrize("stream_name", ("bank_accounts",)) +def test_get_updated_state(stream_name, stream_by_name, requests_mock): + stream = stream_by_name(stream_name) response = {"data": [{"id": 1, stream.cursor_field: 1695292083}]} requests_mock.get("/v1/credit_notes", json=response) requests_mock.get("/v1/balance_transactions", json=response) diff --git a/docs/integrations/sources/stripe.md b/docs/integrations/sources/stripe.md index 7812bffca780..30c101fac0d8 100644 --- a/docs/integrations/sources/stripe.md +++ b/docs/integrations/sources/stripe.md @@ -192,7 +192,7 @@ The Stripe connector should not run into Stripe API limitations under normal usa | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 4.5.2 | 2023-11-03 | [00000](https://github.com/airbytehq/airbyte/pull/00000/) | Fix multiple BankAccount issues | +| 4.5.2 | 2023-11-03 | [32146](https://github.com/airbytehq/airbyte/pull/32146/) | Fix multiple BankAccount issues | | 4.5.1 | 2023-11-01 | [32056](https://github.com/airbytehq/airbyte/pull/32056/) | Use CDK version 0.52.8 | | 4.5.0 | 2023-10-25 | [31327](https://github.com/airbytehq/airbyte/pull/31327/) | Use concurrent CDK when running in full-refresh | | 4.4.2 | 2023-10-24 | [31764](https://github.com/airbytehq/airbyte/pull/31764) | Base image migration: remove Dockerfile and use the python-connector-base image | From 2f0b20479f954aeefff6f401da780c108afe5fc9 Mon Sep 17 00:00:00 2001 From: davydov-d Date: Wed, 8 Nov 2023 17:13:44 +0000 Subject: [PATCH 5/9] Automated Commit - Formatting Changes --- .../connectors/source-stripe/unit_tests/test_streams.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py index e3cf5ab15f1d..ef615db3efcf 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py @@ -324,9 +324,7 @@ def test_request_headers(stream_by_name): @pytest.mark.parametrize("requests_mock_map, stream_cls, expected_records, sync_mode, state", lazy_substream_test_suite) @freezegun.freeze_time("2023-08-23T15:00:15Z") -def test_lazy_sub_streams( - stream_by_name, requests_mock, requests_mock_map, stream_cls, expected_records, config, sync_mode, state -): +def test_lazy_sub_streams(stream_by_name, requests_mock, requests_mock_map, stream_cls, expected_records, config, sync_mode, state): # make start date a recent date so there's just one slice in a parent stream config["start_date"] = str(pendulum.today().subtract(days=3)) From 7c200801eb6babf1b75dcc5553621aad1b08fed5 Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Wed, 8 Nov 2023 22:22:21 +0200 Subject: [PATCH 6/9] update tests --- .../source-stripe/source_stripe/source.py | 21 +- .../source-stripe/source_stripe/streams.py | 9 +- .../source-stripe/unit_tests/conftest.py | 4 + .../source-stripe/unit_tests/test_streams.py | 429 ++++++------------ 4 files changed, 156 insertions(+), 307 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index 398811089503..9ebd06df8026 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import os from typing import Any, List, Mapping, MutableMapping, Optional, Tuple import pendulum @@ -32,6 +33,8 @@ ) _MAX_CONCURRENCY = 3 +_CACHE_DISABLED = os.environ.get("CACHE_DISABLED") +USE_CACHE = not _CACHE_DISABLED class SourceStripe(AbstractSource): @@ -103,7 +106,7 @@ def customers(**args): return IncrementalStripeStream( name="customers", path="customers", - use_cache=True, + use_cache=USE_CACHE, event_types=["customer.created", "customer.updated", "customer.deleted"], **args, ) @@ -121,7 +124,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: subscriptions = IncrementalStripeStream( name="subscriptions", path="subscriptions", - use_cache=True, + use_cache=USE_CACHE, extra_request_params={"status": "all"}, event_types=[ "customer.subscription.created", @@ -140,7 +143,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: path="subscription_items", extra_request_params=lambda self, stream_slice, *args, **kwargs: {"subscription": stream_slice[self.parent_id]}, parent=subscriptions, - use_cache=True, + use_cache=USE_CACHE, parent_id="subscription_id", sub_items_attr="items", **args, @@ -148,21 +151,21 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: transfers = IncrementalStripeStream( name="transfers", path="transfers", - use_cache=True, + use_cache=USE_CACHE, event_types=["transfer.created", "transfer.reversed", "transfer.updated"], **args, ) application_fees = IncrementalStripeStream( name="application_fees", path="application_fees", - use_cache=True, + use_cache=USE_CACHE, event_types=["application_fee.created", "application_fee.refunded"], **args, ) invoices = IncrementalStripeStream( name="invoices", path="invoices", - use_cache=True, + use_cache=USE_CACHE, event_types=[ "invoice.created", "invoice.finalization_failed", @@ -203,7 +206,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: ), Persons(**args), SetupAttempts(**incremental_args), - StripeStream(name="accounts", path="accounts", use_cache=True, **args), + StripeStream(name="accounts", path="accounts", use_cache=USE_CACHE, **args), CreatedCursorIncrementalStripeStream(name="shipping_rates", path="shipping_rates", **incremental_args), CreatedCursorIncrementalStripeStream(name="balance_transactions", path="balance_transactions", **incremental_args), CreatedCursorIncrementalStripeStream(name="files", path="files", **incremental_args), @@ -211,7 +214,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: UpdatedCursorIncrementalStripeStream( name="checkout_sessions", path="checkout/sessions", - use_cache=True, + use_cache=USE_CACHE, legacy_cursor_field="expires_at", event_types=[ "checkout.session.async_payment_failed", @@ -338,7 +341,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: ), transfers, IncrementalStripeStream( - name="refunds", path="refunds", use_cache=True, event_types=["refund.created", "refund.updated"], **args + name="refunds", path="refunds", use_cache=USE_CACHE, event_types=["refund.created", "refund.updated"], **args ), IncrementalStripeStream( name="payment_intents", diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py b/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py index c500290c7b07..460059716a75 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py @@ -4,6 +4,7 @@ import copy import math +import os from abc import ABC, abstractmethod from itertools import chain from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union @@ -18,6 +19,8 @@ from source_stripe.availability_strategy import StripeAvailabilityStrategy, StripeSubStreamAvailabilityStrategy STRIPE_API_VERSION = "2022-11-15" +CACHE_DISABLED = os.environ.get("CACHE_DISABLED") +USE_CACHE = not CACHE_DISABLED class IRecordExtractor(ABC): @@ -484,7 +487,7 @@ def checkout_session(self): return UpdatedCursorIncrementalStripeStream( name="checkout_sessions", path="checkout/sessions", - use_cache=True, + use_cache=USE_CACHE, legacy_cursor_field="expires_at", event_types=[ "checkout.session.async_payment_failed", @@ -575,7 +578,7 @@ def customers(self) -> IncrementalStripeStream: return IncrementalStripeStream( name="customers", path="customers", - use_cache=True, + use_cache=USE_CACHE, event_types=["customer.created", "customer.updated", "customer.deleted"], authenticator=self.authenticator, account_id=self.account_id, @@ -659,7 +662,7 @@ class Persons(UpdatedCursorIncrementalStripeStream, HttpSubStream): event_types = ["person.created", "person.updated", "person.deleted"] def __init__(self, *args, **kwargs): - parent = StripeStream(*args, name="accounts", path="accounts", use_cache=True, **kwargs) + parent = StripeStream(*args, name="accounts", path="accounts", use_cache=USE_CACHE, **kwargs) super().__init__(*args, parent=parent, **kwargs) def path(self, stream_slice: Mapping[str, Any] = None, **kwargs): diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py index 40beb1a895c8..c74a3386f464 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py @@ -2,10 +2,14 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import os + import pytest from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator from source_stripe.source import SourceStripe +os.environ["CACHE_DISABLED"] = "true" + @pytest.fixture(name="config") def config_fixture(): diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py index e3cf5ab15f1d..194c542c4b7a 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py @@ -2,159 +2,61 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from urllib.parse import urlencode + import freezegun import pendulum import pytest from source_stripe.streams import CheckoutSessionsLineItems, CustomerBalanceTransactions, Persons, SetupAttempts +def read_from_stream(stream, sync_mode, state): + records = [] + for slice_ in stream.stream_slices(sync_mode=sync_mode, stream_state=state): + for record in stream.read_records(sync_mode=sync_mode, stream_slice=slice_, stream_state=state): + records.append(record) + return records + + def test_request_headers(stream_by_name): stream = stream_by_name("accounts") headers = stream.request_headers() assert headers["Stripe-Version"] == "2022-11-15" -lazy_substream_test_suite = ( - ( - { - "https://api.stripe.com/v1/invoices": { - "has_more": False, - "object": "list", - "url": "/v1/invoices", - "data": [ - { - "created": 1641038947, - "customer": "cus_HezytZRkaQJC8W", - "id": "in_1KD6OVIEn5WyEQxn9xuASHsD", - "object": "invoice", - "total": 1, - "lines": { - "data": [ - { - "id": "il_1", - "object": "line_item", - }, - { - "id": "il_2", - "object": "line_item", - }, - ], - "has_more": True, - "object": "list", - "total_count": 3, - "url": "/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines", - }, - } - ], - }, - "https://api.stripe.com/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines?starting_after=il_2": { - "data": [ - { - "id": "il_3", - "object": "line_item", - }, - ], - "has_more": False, - "object": "list", - "total_count": 3, - "url": "/v1/invoices/in_1KD6OVIEn5WyEQxn9xuASHsD/lines", - }, - }, - "invoice_line_items", - [ - {"id": "il_1", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, - {"id": "il_2", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, - {"id": "il_3", "invoice_id": "in_1KD6OVIEn5WyEQxn9xuASHsD", "object": "line_item"}, - ], - "full_refresh", - {}, - ), - ( - { - "https://api.stripe.com/v1/subscriptions": { - "has_more": False, - "object": "list", - "url": "/v1/subscriptions", - "data": [ - { - "created": 1641038947, - "customer": "cus_HezytZRkaQJC8W", - "id": "si_OptSP2o3XZUBpx", - "object": "subscription", - "total": 1, - "items": { - "data": [ - { - "id": "si_1", - "object": "subscription_item", - }, - { - "id": "si_2", - "object": "subscription_item", - }, - ], - "has_more": True, - "object": "list", - "total_count": 3, - "url": "/v1/subscription_items", - }, - } - ], - }, - "https://api.stripe.com/v1/subscription_items?subscription=si_OptSP2o3XZUBpx&starting_after=si_2": { - "data": [ - { - "id": "si_3", - "object": "subscription_item", +bank_accounts_full_refresh_test_case = ( + { + "https://api.stripe.com/v1/customers?expand%5B%5D=data.sources": { + "has_more": False, + "object": "list", + "url": "/v1/customers", + "data": [ + { + "created": 1641038947, + "id": "cus_HezytZRkaQJC8W", + "object": "customer", + "total": 1, + "sources": { + "data": [ + { + "id": "cs_1", + "object": "card", + }, + { + "id": "cs_2", + "object": "bank_account", + }, + ], + "has_more": True, + "object": "list", + "total_count": 4, + "url": "/v1/customers/cus_HezytZRkaQJC8W/sources", }, - ], - "has_more": False, - "object": "list", - "total_count": 3, - "url": "/v1/subscription_items", - }, + } + ], }, - "subscription_items", - [ - {"id": "si_1", "object": "subscription_item"}, - {"id": "si_2", "object": "subscription_item"}, - {"id": "si_3", "object": "subscription_item"}, - ], - "full_refresh", - {}, - ), - ( - { - "https://api.stripe.com/v1/customers?expand%5B%5D=data.sources": { - "has_more": False, - "object": "list", - "url": "/v1/customers", - "data": [ - { - "created": 1641038947, - "id": "cus_HezytZRkaQJC8W", - "object": "customer", - "total": 1, - "sources": { - "data": [ - { - "id": "cs_1", - "object": "card", - }, - { - "id": "cs_2", - "object": "bank_account", - }, - ], - "has_more": True, - "object": "list", - "total_count": 4, - "url": "/v1/customers/cus_HezytZRkaQJC8W/sources", - }, - } - ], - }, - "https://api.stripe.com/v1/customers/cus_HezytZRkaQJC8W/sources": { + "https://api.stripe.com/v1/customers/cus_HezytZRkaQJC8W/sources?object=bank_account&starting_after=cs_2": + { "data": [ { "id": "cs_3", @@ -169,175 +71,112 @@ def test_request_headers(stream_by_name): "object": "list", "total_count": 4, "url": "/v1/customers/cus_HezytZRkaQJC8W/sources", - }, - }, - "bank_accounts", - [ - {"id": "cs_2", "object": "bank_account", "updated": 1692802815}, - {"id": "cs_4", "object": "bank_account", "updated": 1692802815}, - ], - "full_refresh", - {}, - ), - ( - { - "https://api.stripe.com/v1/application_fees": { - "has_more": False, - "object": "list", - "url": "/v1/application_fees", - "data": [ - { - "created": 1641038947, - "customer": "cus_HezytZRkaQJC8W", - "id": "af_OptSP2o3XZUBpx", - "object": "application_fee", - "total": 1, - "refunds": { - "data": [ - { - "id": "fr_1", - "object": "application_fee_refund", - }, - { - "id": "fr_2", - "object": "application_fee_refund", - }, - ], - "has_more": True, - "object": "list", - "total_count": 3, - "url": "/v1/application_fees/af_OptSP2o3XZUBpx/refunds", - }, - } - ], - }, - "https://api.stripe.com/v1/application_fees/af_OptSP2o3XZUBpx/refunds": { - "data": [ - { - "id": "fr_3", - "object": "application_fee_refund", - } - ], - "has_more": False, - "object": "list", - "total_count": 3, - "url": "/v1/application_fees/af_OptSP2o3XZUBpx/refunds", - }, - }, - "application_fees_refunds", - [ - {"id": "fr_1", "object": "application_fee_refund", "refund_id": "af_OptSP2o3XZUBpx", "updated": 1692802815}, - {"id": "fr_2", "object": "application_fee_refund", "refund_id": "af_OptSP2o3XZUBpx", "updated": 1692802815}, - {"id": "fr_3", "object": "application_fee_refund", "refund_id": "af_OptSP2o3XZUBpx", "updated": 1692802815}, - ], - "full_refresh", - {}, - ), - ( - { - "https://api.stripe.com/v1/events?types%5B%5D=customer.source.created&types%5B%5D=customer.source.expiring&types" - "%5B%5D=customer.source.updated&types%5B%5D=customer.source.deleted": { - "data": [ - { - "id": "evt_1NdNFoEcXtiJtvvhBP5mxQmL", - "object": "event", - "api_version": "2020-08-27", - "created": 1692802016, - "data": { - "object": {"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716} - }, - "type": "customer.source.created", - }, - { - "id": "evt_1NdNFoEcXtiJtvvhBP5mxQmL", - "object": "event", - "api_version": "2020-08-27", - "created": 1692802017, - "data": {"object": {"object": "card", "card": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716}}, - "type": "customer.source.updated", - }, - ], - "has_more": False, } - }, - "bank_accounts", - [{"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716, "updated": 1692802016}], - "incremental", - {"updated": 1692802015}, - ), - ( - { - "https://api.stripe.com/v1/events?types%5B%5D=application_fee.refund.updated": { - "data": [ - { - "id": "evt_1NdNFoEcXtiJtvvhBP5mxQmL", - "object": "event", - "api_version": "2020-08-27", - "created": 1692802016, - "data": { - "object": { - "object": "application_fee_refund", - "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", - "created": 1653341716, - } - }, - "type": "application_fee.refund.updated", - }, - { - "id": "evt_1NdNFoEcXtiJtvvhBP5mxQmL", - "object": "event", - "api_version": "2020-08-27", - "created": 1692802017, - "data": { - "object": { - "object": "application_fee_refund", - "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", - "created": 1653341716, - } - }, - "type": "application_fee.refund.updated", + }, + "bank_accounts", + [ + {"id": "cs_2", "object": "bank_account", "updated": 1692802815}, + {"id": "cs_4", "object": "bank_account", "updated": 1692802815}, + ], + "full_refresh", + {} +) + + +bank_accounts_incremental_test_case = ( + { + "https://api.stripe.com/v1/events?types%5B%5D=customer.source.created&types%5B%5D=customer.source.expiring&types" + "%5B%5D=customer.source.updated&types%5B%5D=customer.source.deleted": { + "data": [ + { + "id": "evt_1NdNFoEcXtiJtvvhBP5mxQmL", + "object": "event", + "api_version": "2020-08-27", + "created": 1692802016, + "data": { + "object": {"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716} }, - ], - "has_more": False, - } - }, - "application_fees_refunds", - [ - { - "object": "application_fee_refund", - "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", - "created": 1653341716, - "updated": 1692802016, - }, - { - "object": "application_fee_refund", - "application_fee_refund": "afr_1K9GK0EcXtiJtvvhSo2LvGqT", - "created": 1653341716, - "updated": 1692802017, - }, - ], - "incremental", - {"updated": 1692802015}, - ), + "type": "customer.source.created", + }, + { + "id": "evt_1NdNFoEcXtiJtvvhBP5mxQmL", + "object": "event", + "api_version": "2020-08-27", + "created": 1692802017, + "data": {"object": {"object": "card", "card": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716}}, + "type": "customer.source.updated", + }, + ], + "has_more": False, + } + }, + "bank_accounts", + [{"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716, "updated": 1692802016}], + "incremental", + {"updated": 1692802015}, ) -@pytest.mark.parametrize("requests_mock_map, stream_cls, expected_records, sync_mode, state", lazy_substream_test_suite) +@pytest.mark.parametrize( + "requests_mock_map, stream_cls, expected_records, sync_mode, state", + (bank_accounts_incremental_test_case, bank_accounts_full_refresh_test_case) +) @freezegun.freeze_time("2023-08-23T15:00:15Z") -def test_lazy_sub_streams( - stream_by_name, requests_mock, requests_mock_map, stream_cls, expected_records, config, sync_mode, state +def test_lazy_substream_data_cursor_value_is_populated( + requests_mock, stream_by_name, config, requests_mock_map, stream_cls, expected_records, sync_mode, state ): - # make start date a recent date so there's just one slice in a parent stream config["start_date"] = str(pendulum.today().subtract(days=3)) - stream = stream_by_name(stream_cls, config) for url, body in requests_mock_map.items(): requests_mock.get(url, json=body) - records = [] - for slice_ in stream.stream_slices(sync_mode=sync_mode, stream_state=state): - records.extend(stream.read_records(sync_mode=sync_mode, stream_slice=slice_, stream_state=state)) + records = read_from_stream(stream, sync_mode, state) + assert records == expected_records + for record in records: + assert bool(record[stream.cursor_field]) + + +@pytest.mark.parametrize( + "requests_mock_map, stream_cls, expected_records, sync_mode, state", + (bank_accounts_full_refresh_test_case,) +) +@freezegun.freeze_time("2023-08-23T15:00:15Z") +def test_lazy_substream_data_is_expanded( + requests_mock, stream_by_name, config, requests_mock_map, stream_cls, expected_records, sync_mode, state +): + + config["start_date"] = str(pendulum.today().subtract(days=3)) + stream = stream_by_name("bank_accounts", config) + for url, body in requests_mock_map.items(): + requests_mock.get(url, json=body) + + records = read_from_stream(stream, sync_mode, state) + assert list(records) == expected_records + assert len(requests_mock.request_history) == 2 + assert urlencode({"expand[]": "data.sources"}) in requests_mock.request_history[0].url + + +@pytest.mark.parametrize( + "requests_mock_map, stream_cls, expected_records, sync_mode, state, expected_object", + ( + (*bank_accounts_full_refresh_test_case, "bank_account"), + (*bank_accounts_incremental_test_case, "bank_account") + ) +) +@freezegun.freeze_time("2023-08-23T15:00:15Z") +def test_lazy_substream_data_is_filtered( + requests_mock, stream_by_name, config, requests_mock_map, stream_cls, expected_records, sync_mode, state, expected_object +): + config["start_date"] = str(pendulum.today().subtract(days=3)) + stream = stream_by_name(stream_cls, config) + for url, body in requests_mock_map.items(): + requests_mock.get(url, json=body) + + records = read_from_stream(stream, sync_mode, state) + assert records == expected_records + for record in records: + assert record["object"] == expected_object @freezegun.freeze_time("2023-08-23T15:00:15Z") From 2697f7bd7babce53fb07ed388f00b8c0796b0f40 Mon Sep 17 00:00:00 2001 From: davydov-d Date: Wed, 8 Nov 2023 20:40:18 +0000 Subject: [PATCH 7/9] Automated Commit - Formatting Changes --- .../source-stripe/unit_tests/test_streams.py | 51 ++++++++----------- 1 file changed, 21 insertions(+), 30 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py index 194c542c4b7a..9af65e19a85b 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py @@ -55,23 +55,22 @@ def test_request_headers(stream_by_name): } ], }, - "https://api.stripe.com/v1/customers/cus_HezytZRkaQJC8W/sources?object=bank_account&starting_after=cs_2": - { - "data": [ - { - "id": "cs_3", - "object": "card", - }, - { - "id": "cs_4", - "object": "bank_account", - }, - ], - "has_more": False, - "object": "list", - "total_count": 4, - "url": "/v1/customers/cus_HezytZRkaQJC8W/sources", - } + "https://api.stripe.com/v1/customers/cus_HezytZRkaQJC8W/sources?object=bank_account&starting_after=cs_2": { + "data": [ + { + "id": "cs_3", + "object": "card", + }, + { + "id": "cs_4", + "object": "bank_account", + }, + ], + "has_more": False, + "object": "list", + "total_count": 4, + "url": "/v1/customers/cus_HezytZRkaQJC8W/sources", + }, }, "bank_accounts", [ @@ -79,7 +78,7 @@ def test_request_headers(stream_by_name): {"id": "cs_4", "object": "bank_account", "updated": 1692802815}, ], "full_refresh", - {} + {}, ) @@ -93,9 +92,7 @@ def test_request_headers(stream_by_name): "object": "event", "api_version": "2020-08-27", "created": 1692802016, - "data": { - "object": {"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716} - }, + "data": {"object": {"object": "bank_account", "bank_account": "cs_1K9GK0EcXtiJtvvhSo2LvGqT", "created": 1653341716}}, "type": "customer.source.created", }, { @@ -119,7 +116,7 @@ def test_request_headers(stream_by_name): @pytest.mark.parametrize( "requests_mock_map, stream_cls, expected_records, sync_mode, state", - (bank_accounts_incremental_test_case, bank_accounts_full_refresh_test_case) + (bank_accounts_incremental_test_case, bank_accounts_full_refresh_test_case), ) @freezegun.freeze_time("2023-08-23T15:00:15Z") def test_lazy_substream_data_cursor_value_is_populated( @@ -136,10 +133,7 @@ def test_lazy_substream_data_cursor_value_is_populated( assert bool(record[stream.cursor_field]) -@pytest.mark.parametrize( - "requests_mock_map, stream_cls, expected_records, sync_mode, state", - (bank_accounts_full_refresh_test_case,) -) +@pytest.mark.parametrize("requests_mock_map, stream_cls, expected_records, sync_mode, state", (bank_accounts_full_refresh_test_case,)) @freezegun.freeze_time("2023-08-23T15:00:15Z") def test_lazy_substream_data_is_expanded( requests_mock, stream_by_name, config, requests_mock_map, stream_cls, expected_records, sync_mode, state @@ -159,10 +153,7 @@ def test_lazy_substream_data_is_expanded( @pytest.mark.parametrize( "requests_mock_map, stream_cls, expected_records, sync_mode, state, expected_object", - ( - (*bank_accounts_full_refresh_test_case, "bank_account"), - (*bank_accounts_incremental_test_case, "bank_account") - ) + ((*bank_accounts_full_refresh_test_case, "bank_account"), (*bank_accounts_incremental_test_case, "bank_account")), ) @freezegun.freeze_time("2023-08-23T15:00:15Z") def test_lazy_substream_data_is_filtered( From 8a7571afea85feb599c7e589667b7864b386df38 Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Fri, 10 Nov 2023 10:31:06 +0200 Subject: [PATCH 8/9] update tests --- .../source-stripe/unit_tests/conftest.py | 4 +- .../source-stripe/unit_tests/test_streams.py | 73 +++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py index c74a3386f464..5eb8e515068a 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py @@ -6,7 +6,6 @@ import pytest from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator -from source_stripe.source import SourceStripe os.environ["CACHE_DISABLED"] = "true" @@ -36,6 +35,9 @@ def incremental_args_fixture(stream_args): @pytest.fixture() def stream_by_name(config): + # use local import in favour of global because we need to make imports after setting the env variables + from source_stripe.source import SourceStripe + def mocker(stream_name, source_config=config): source = SourceStripe() streams = source.streams(source_config) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py index 194c542c4b7a..ae35feb0ccdb 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py @@ -604,3 +604,76 @@ def test_get_updated_state(stream_name, stream_by_name, requests_mock): for record in stream.read_records(sync_mode="incremental", stream_slice=slice_, stream_state=state): state = stream.get_updated_state(state, record) assert state + + +@freezegun.freeze_time("2023-08-23T15:00:15Z") +def test_subscription_items_extra_request_params(requests_mock, stream_by_name, config): + requests_mock.get( + "/v1/subscriptions", + json={ + "object": "list", + "url": "/v1/subscriptions", + "has_more": False, + "data": [ + { + "id": "sub_1OApco2eZvKYlo2CEDCzwLrE", + "object": "subscription", + "created": 1699603174, + "items": { + "object": "list", + "data": [ + { + "id": "si_OynDmET1kQPTbI", + "object": "subscription_item", + "created": 1699603175, + "quantity": 1, + "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE" + } + ], + "has_more": True, + }, + "latest_invoice": None, + "livemode": False + } + ], + "has_more": False + } + ) + requests_mock.get( + "/v1/subscription_items?subscription=sub_1OApco2eZvKYlo2CEDCzwLrE", + json={ + "object": "list", + "url": "/v1/subscription_items", + "has_more": False, + "data": [ + { + "id": "si_OynPdzMZykmCWm", + "object": "subscription_item", + "created": 1699603884, + "quantity": 2, + "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE" + } + ] + } + ) + config["start_date"] = str(pendulum.now().subtract(days=3)) + stream = stream_by_name("subscription_items", config) + records = read_from_stream(stream, "full_refresh", {}) + assert records == [ + { + "id": "si_OynDmET1kQPTbI", + "object": "subscription_item", + "created": 1699603175, + "quantity": 1, + "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE" + }, + { + "id": "si_OynPdzMZykmCWm", + "object": "subscription_item", + "created": 1699603884, + "quantity": 2, + "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE" + } + ] + assert len(requests_mock.request_history) == 2 + assert "subscription=sub_1OApco2eZvKYlo2CEDCzwLrE" in requests_mock.request_history[-1].url From 32190b339680acc839ae1b9a9faa6e7fa8718c2b Mon Sep 17 00:00:00 2001 From: davydov-d Date: Fri, 10 Nov 2023 08:42:39 +0000 Subject: [PATCH 9/9] Automated Commit - Formatting Changes --- .../source-stripe/unit_tests/test_streams.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py index 69ba6b837910..143331e06992 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py @@ -618,17 +618,17 @@ def test_subscription_items_extra_request_params(requests_mock, stream_by_name, "object": "subscription_item", "created": 1699603175, "quantity": 1, - "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE" + "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE", } ], "has_more": True, }, "latest_invoice": None, - "livemode": False + "livemode": False, } ], - "has_more": False - } + "has_more": False, + }, ) requests_mock.get( "/v1/subscription_items?subscription=sub_1OApco2eZvKYlo2CEDCzwLrE", @@ -642,10 +642,10 @@ def test_subscription_items_extra_request_params(requests_mock, stream_by_name, "object": "subscription_item", "created": 1699603884, "quantity": 2, - "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE" + "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE", } - ] - } + ], + }, ) config["start_date"] = str(pendulum.now().subtract(days=3)) stream = stream_by_name("subscription_items", config) @@ -656,15 +656,15 @@ def test_subscription_items_extra_request_params(requests_mock, stream_by_name, "object": "subscription_item", "created": 1699603175, "quantity": 1, - "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE" + "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE", }, { "id": "si_OynPdzMZykmCWm", "object": "subscription_item", "created": 1699603884, "quantity": 2, - "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE" - } + "subscription": "sub_1OApco2eZvKYlo2CEDCzwLrE", + }, ] assert len(requests_mock.request_history) == 2 assert "subscription=sub_1OApco2eZvKYlo2CEDCzwLrE" in requests_mock.request_history[-1].url