Skip to content

Commit

Permalink
🚨 🚨 Source Stripe: fix multiple issues regarding Refunds, CheckoutSes…
Browse files Browse the repository at this point in the history
…sions and CheckoutSessionsLineItems + fix stream schemas (#32286)

Co-authored-by: davydov-d <davydov-d@users.noreply.github.com>
  • Loading branch information
davydov-d and davydov-d authored Nov 16, 2023
1 parent 136535d commit f5cbe29
Show file tree
Hide file tree
Showing 15 changed files with 788 additions and 375 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ acceptance_tests:
discovery:
tests:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
disable_for_version: 4.4.2
basic_read:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,14 @@
{
"type": "STREAM",
"stream": {
"stream_state": { "expires_at": 10000000000 },
"stream_state": { "updated": 10000000000 },
"stream_descriptor": { "name": "checkout_sessions" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "checkout_session_expires_at": 10000000000 },
"stream_state": { "checkout_session_updated": 10000000000 },
"stream_descriptor": { "name": "checkout_sessions_line_items" }
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["checkout_session_expires_at"],
"default_cursor_field": ["checkout_session_updated"],
"source_defined_primary_key": [["id"]]
},
"primary_key": [["id"]],
"cursor_field": ["checkout_session_expires_at"],
"cursor_field": ["checkout_session_updated"],
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
Expand Down Expand Up @@ -459,11 +459,11 @@
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated"],
"default_cursor_field": ["created"],
"source_defined_primary_key": [["id"]]
},
"primary_key": [["id"]],
"cursor_field": ["updated"],
"cursor_field": ["created"],
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
Expand Down

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion airbyte-integrations/connectors/source-stripe/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 4.5.4
dockerImageTag: 5.0.0
dockerRepository: airbyte/source-stripe
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
githubIssueLabel: source-stripe
Expand All @@ -33,6 +33,11 @@ data:
schema refresh of all effected streams is required to use the new cursor
format.
upgradeDeadline: "2023-09-14"
5.0.0:
message:
Version 5.0.0 introduces fixes for the `CheckoutSessions`, `CheckoutSessionsLineItems` and `Refunds` streams. The cursor field is changed for the `CheckoutSessionsLineItems` and `Refunds` streams. This will prevent data loss during incremental syncs.
Also, the `Invoices`, `Subscriptions` and `SubscriptionSchedule` stream schemas have been updated.
upgradeDeadline: "2023-11-30"
suggestedStreams:
streams:
- customers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"id": { "type": ["null", "string"] },
"checkout_session_id": { "type": ["null", "string"] },
"checkout_session_expires_at": { "type": ["null", "integer"] },
"checkout_session_created": { "type": ["null", "integer"] },
"checkout_session_updated": { "type": ["null", "integer"] },
"object": { "type": ["null", "string"] },
"amount_subtotal": { "type": ["null", "integer"] },
"amount_tax": { "type": ["null", "integer"] },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,10 +499,7 @@
"type": ["null", "integer"]
},
"default_tax_rates": {
"type": ["null", "array"],
"items": {
"$ref": "tax_rates.json"
}
"$ref": "tax_rates.json"
},
"total_excluding_tax": {
"type": ["null", "integer"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,7 @@
"type": ["null", "string"]
},
"default_tax_rates": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"additionalProperties": true
}
"$ref": "tax_rates.json"
},
"description": {
"type": ["null", "string"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,7 @@
}
},
"default_tax_rates": {
"type": ["null", "array"],
"items": {
"$ref": "tax_rates.json"
}
"$ref": "tax_rates.json"
},
"pause_collection": {
"type": ["null", "object"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from airbyte_protocol.models import SyncMode
from source_stripe.streams import (
CheckoutSessionsLineItems,
CreatedCursorIncrementalStripeStream,
CustomerBalanceTransactions,
Events,
IncrementalStripeStream,
ParentIncrementalStipeSubStream,
Persons,
SetupAttempts,
StripeLazySubStream,
Expand Down Expand Up @@ -197,10 +197,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
subscription_items = StripeLazySubStream(
name="subscription_items",
path="subscription_items",
extra_request_params=lambda self, stream_slice, *args, **kwargs: {"subscription": stream_slice[self.parent_id]},
extra_request_params=lambda self, stream_slice, *args, **kwargs: {"subscription": stream_slice["parent"]["id"]},
parent=subscriptions,
use_cache=USE_CACHE,
parent_id="subscription_id",
sub_items_attr="items",
**args,
)
Expand Down Expand Up @@ -238,8 +237,22 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
],
**args,
)
checkout_sessions = UpdatedCursorIncrementalStripeStream(
name="checkout_sessions",
path="checkout/sessions",
use_cache=USE_CACHE,
legacy_cursor_field="created",
event_types=[
"checkout.session.async_payment_failed",
"checkout.session.async_payment_succeeded",
"checkout.session.completed",
"checkout.session.expired",
],
**args,
)

streams = [
CheckoutSessionsLineItems(**incremental_args),
checkout_sessions,
CustomerBalanceTransactions(**args),
Events(**incremental_args),
UpdatedCursorIncrementalStripeStream(
Expand Down Expand Up @@ -267,19 +280,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
CreatedCursorIncrementalStripeStream(name="balance_transactions", path="balance_transactions", **incremental_args),
CreatedCursorIncrementalStripeStream(name="files", path="files", **incremental_args),
CreatedCursorIncrementalStripeStream(name="file_links", path="file_links", **incremental_args),
UpdatedCursorIncrementalStripeStream(
name="checkout_sessions",
path="checkout/sessions",
use_cache=USE_CACHE,
legacy_cursor_field="expires_at",
event_types=[
"checkout.session.async_payment_failed",
"checkout.session.async_payment_succeeded",
"checkout.session.completed",
"checkout.session.expired",
],
**args,
),
# The Refunds stream does not utilize the Events API as it created issues with data loss during the incremental syncs.
# Therefore, we're using the regular API with the `created` cursor field. A bug has been filed with Stripe.
# See more at https://github.com/airbytehq/oncall/issues/3090, https://github.com/airbytehq/oncall/issues/3428
CreatedCursorIncrementalStripeStream(name="refunds", path="refunds", **incremental_args),
UpdatedCursorIncrementalStripeStream(
name="payment_methods",
path="payment_methods",
Expand Down Expand Up @@ -396,9 +400,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
**args,
),
transfers,
IncrementalStripeStream(
name="refunds", path="refunds", use_cache=USE_CACHE, event_types=["refund.created", "refund.updated"], **args
),
IncrementalStripeStream(
name="payment_intents",
path="payment_intents",
Expand Down Expand Up @@ -449,45 +450,56 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
),
UpdatedCursorIncrementalStripeLazySubStream(
name="application_fees_refunds",
path=lambda self, stream_slice, *args, **kwargs: f"application_fees/{stream_slice[self.parent_id]}/refunds",
path=lambda self, stream_slice, *args, **kwargs: f"application_fees/{stream_slice['parent']['id']}/refunds",
parent=application_fees,
event_types=["application_fee.refund.updated"],
parent_id="refund_id",
sub_items_attr="refunds",
add_parent_id=True,
**args,
),
UpdatedCursorIncrementalStripeLazySubStream(
name="bank_accounts",
path=lambda self, stream_slice, *args, **kwargs: f"customers/{stream_slice[self.parent_id]}/sources",
path=lambda self, stream_slice, *args, **kwargs: f"customers/{stream_slice['parent']['id']}/sources",
parent=self.customers(expand_items=["data.sources"], **args),
event_types=["customer.source.created", "customer.source.expiring", "customer.source.updated", "customer.source.deleted"],
legacy_cursor_field=None,
parent_id="customer_id",
sub_items_attr="sources",
extra_request_params={"object": "bank_account"},
response_filter=lambda record: record["object"] == "bank_account",
**args,
),
ParentIncrementalStipeSubStream(
name="checkout_sessions_line_items",
path=lambda self, stream_slice, *args, **kwargs: f"checkout/sessions/{stream_slice['parent']['id']}/line_items",
parent=checkout_sessions,
expand_items=["data.discounts", "data.taxes"],
cursor_field="checkout_session_updated",
slice_data_retriever=lambda record, stream_slice: {
"checkout_session_id": stream_slice["parent"]["id"],
"checkout_session_expires_at": stream_slice["parent"]["expires_at"],
"checkout_session_created": stream_slice["parent"]["created"],
"checkout_session_updated": stream_slice["parent"]["updated"],
**record,
},
**args,
),
StripeLazySubStream(
name="invoice_line_items",
path=lambda self, stream_slice, *args, **kwargs: f"invoices/{stream_slice[self.parent_id]}/lines",
path=lambda self, stream_slice, *args, **kwargs: f"invoices/{stream_slice['parent']['id']}/lines",
parent=invoices,
parent_id="invoice_id",
sub_items_attr="lines",
add_parent_id=True,
slice_data_retriever=lambda record, stream_slice: {"invoice_id": stream_slice["parent"]["id"], **record},
**args,
),
subscription_items,
StripeSubStream(
name="transfer_reversals",
path=lambda self, stream_slice, *args, **kwargs: f"transfers/{stream_slice.get('parent', {}).get('id')}/reversals",
path=lambda self, stream_slice, *args, **kwargs: f"transfers/{stream_slice['parent']['id']}/reversals",
parent=transfers,
**args,
),
StripeSubStream(
name="usage_records",
path=lambda self, stream_slice, *args, **kwargs: f"subscription_items/{stream_slice.get('parent', {}).get('id')}/usage_record_summaries",
path=lambda self, stream_slice, *args, **kwargs: f"subscription_items/{stream_slice['parent']['id']}/usage_record_summaries",
parent=subscription_items,
primary_key=None,
**args,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ connectionSpecification:
description: >-
When set, the connector will always re-export data from the past N days,
where N is the value set here. This is useful if your data is frequently updated
after creation. Applies only to streams that do not support event-based incremental syncs: CheckoutSessionLineItems,
Events, SetupAttempts, ShippingRates, BalanceTransactions, Files, FileLinks. More info <a
after creation. The Lookback Window only applies to streams that do not support event-based incremental syncs: Events,
SetupAttempts, ShippingRates, BalanceTransactions, Files, FileLinks, Refunds. More info <a
href="https://docs.airbyte.com/integrations/sources/stripe#requirements">here</a>
order: 3
slice_range:
Expand Down
Loading

0 comments on commit f5cbe29

Please sign in to comment.