Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Stripe: fix multiple BankAccounts issues #32146

Merged
merged 12 commits into from
Nov 13, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 4.5.1
dockerImageTag: 4.5.2
dockerRepository: airbyte/source-stripe
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
githubIssueLabel: source-stripe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import os
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple

import pendulum
Expand All @@ -21,7 +22,6 @@
CreatedCursorIncrementalStripeStream,
CustomerBalanceTransactions,
Events,
FilteringRecordExtractor,
IncrementalStripeStream,
Persons,
SetupAttempts,
Expand All @@ -33,6 +33,8 @@
)

_MAX_CONCURRENCY = 3
_CACHE_DISABLED = os.environ.get("CACHE_DISABLED")
USE_CACHE = not _CACHE_DISABLED


class SourceStripe(AbstractSource):
Expand Down Expand Up @@ -97,6 +99,18 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
return False, str(e)
return True, None

@staticmethod
def customers(**args):
# The Customers stream is instantiated in a dedicated method to allow parametrization and avoid duplicated code.
# It can be used with and without expanded items (as an independent stream or as a parent stream for other streams).
return IncrementalStripeStream(
name="customers",
path="customers",
use_cache=USE_CACHE,
event_types=["customer.created", "customer.updated", "customer.deleted"],
**args,
)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
config = self.validate_and_fill_with_defaults(config)
authenticator = TokenAuthenticator(config["client_secret"])
Expand All @@ -110,7 +124,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
subscriptions = IncrementalStripeStream(
name="subscriptions",
path="subscriptions",
use_cache=True,
use_cache=USE_CACHE,
extra_request_params={"status": "all"},
event_types=[
"customer.subscription.created",
Expand All @@ -127,38 +141,31 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
subscription_items = StripeLazySubStream(
name="subscription_items",
path="subscription_items",
extra_request_params=lambda self, *args, stream_slice, **kwargs: {"subscription": stream_slice[self.parent_id]},
extra_request_params=lambda self, stream_slice, *args, **kwargs: {"subscription": stream_slice[self.parent_id]},
parent=subscriptions,
use_cache=True,
use_cache=USE_CACHE,
parent_id="subscription_id",
sub_items_attr="items",
**args,
)
transfers = IncrementalStripeStream(
name="transfers",
path="transfers",
use_cache=True,
use_cache=USE_CACHE,
event_types=["transfer.created", "transfer.reversed", "transfer.updated"],
**args,
)
application_fees = IncrementalStripeStream(
name="application_fees",
path="application_fees",
use_cache=True,
use_cache=USE_CACHE,
event_types=["application_fee.created", "application_fee.refunded"],
**args,
)
customers = IncrementalStripeStream(
name="customers",
path="customers",
use_cache=True,
event_types=["customer.created", "customer.updated", "customer.deleted"],
**args,
)
invoices = IncrementalStripeStream(
name="invoices",
path="invoices",
use_cache=True,
use_cache=USE_CACHE,
event_types=[
"invoice.created",
"invoice.finalization_failed",
Expand All @@ -185,7 +192,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
event_types=["account.external_account.created", "account.external_account.updated", "account.external_account.deleted"],
legacy_cursor_field=None,
extra_request_params={"object": "card"},
record_extractor=FilteringRecordExtractor("updated", None, "card"),
response_filter=lambda record: record["object"] == "card",
**args,
),
UpdatedCursorIncrementalStripeStream(
Expand All @@ -194,20 +201,20 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
event_types=["account.external_account.created", "account.external_account.updated", "account.external_account.deleted"],
legacy_cursor_field=None,
extra_request_params={"object": "bank_account"},
record_extractor=FilteringRecordExtractor("updated", None, "bank_account"),
response_filter=lambda record: record["object"] == "bank_account",
**args,
),
Persons(**args),
SetupAttempts(**incremental_args),
StripeStream(name="accounts", path="accounts", use_cache=True, **args),
StripeStream(name="accounts", path="accounts", use_cache=USE_CACHE, **args),
CreatedCursorIncrementalStripeStream(name="shipping_rates", path="shipping_rates", **incremental_args),
CreatedCursorIncrementalStripeStream(name="balance_transactions", path="balance_transactions", **incremental_args),
CreatedCursorIncrementalStripeStream(name="files", path="files", **incremental_args),
CreatedCursorIncrementalStripeStream(name="file_links", path="file_links", **incremental_args),
UpdatedCursorIncrementalStripeStream(
name="checkout_sessions",
path="checkout/sessions",
use_cache=True,
use_cache=USE_CACHE,
legacy_cursor_field="expires_at",
event_types=[
"checkout.session.async_payment_failed",
Expand Down Expand Up @@ -246,7 +253,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
event_types=["issuing_authorization.created", "issuing_authorization.request", "issuing_authorization.updated"],
**args,
),
customers,
self.customers(**args),
IncrementalStripeStream(
name="cardholders",
path="issuing/cardholders",
Expand Down Expand Up @@ -334,7 +341,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
),
transfers,
IncrementalStripeStream(
name="refunds", path="refunds", use_cache=True, event_types=["refund.created", "refund.updated"], **args
name="refunds", path="refunds", use_cache=USE_CACHE, event_types=["refund.created", "refund.updated"], **args
),
IncrementalStripeStream(
name="payment_intents",
Expand Down Expand Up @@ -397,19 +404,18 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
UpdatedCursorIncrementalStripeLazySubStream(
name="bank_accounts",
path=lambda self, stream_slice, *args, **kwargs: f"customers/{stream_slice[self.parent_id]}/sources",
parent=customers,
parent=self.customers(expand_items=["data.sources"], **args),
event_types=["customer.source.created", "customer.source.expiring", "customer.source.updated", "customer.source.deleted"],
legacy_cursor_field=None,
parent_id="customer_id",
sub_items_attr="sources",
response_filter={"attr": "object", "value": "bank_account"},
extra_request_params={"object": "bank_account"},
record_extractor=FilteringRecordExtractor("updated", None, "bank_account"),
response_filter=lambda record: record["object"] == "bank_account",
**args,
),
StripeLazySubStream(
name="invoice_line_items",
path=lambda self, *args, stream_slice, **kwargs: f"invoices/{stream_slice[self.parent_id]}/lines",
path=lambda self, stream_slice, *args, **kwargs: f"invoices/{stream_slice[self.parent_id]}/lines",
parent=invoices,
parent_id="invoice_id",
sub_items_attr="lines",
Expand Down
Loading
Loading