diff --git a/airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl b/airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl index a84e541f0ecf..da9e7ed4ea18 100644 --- a/airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl +++ b/airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl @@ -46,7 +46,7 @@ {"stream": "products", "data": {"id": "prod_KouQ5ez86yREmB", "object": "product", "active": true, "attributes": [], "created": 1640124902, "default_price": "price_1K9GbqEcXtiJtvvhJ3lZe4i5", "description": null, "features": [], "images": [], "livemode": false, "metadata": {}, "name": "edgao-test-product", "package_dimensions": null, "shippable": null, "statement_descriptor": null, "tax_code": "txcd_10000000", "type": "service", "unit_label": null, "updated": 1696839715, "url": null}, "emitted_at": 1697627307635} {"stream": "products", "data": {"id": "prod_NHcKselSHfKdfc", "object": "product", "active": true, "attributes": [], "created": 1675345504, "default_price": "price_1MX364EcXtiJtvvhE3WgTl4O", "description": "Test Product 1 description", "features": [], "images": ["https://files.stripe.com/links/MDB8YWNjdF8xSndub2lFY1h0aUp0dnZofGZsX3Rlc3RfdjBOT09UaHRiNVl2WmJ6clNYRUlmcFFD00cCBRNHnV"], "livemode": false, "metadata": {}, "name": "Test Product 1", "package_dimensions": null, "shippable": null, "statement_descriptor": null, "tax_code": "txcd_10301000", "type": "service", "unit_label": null, "updated": 1696839789, "url": null}, "emitted_at": 1697627307877} {"stream": "products", "data": {"id": "prod_NCgx1XP2IFQyKF", "object": "product", "active": true, "attributes": [], "created": 1674209524, "default_price": null, "description": null, "features": [], "images": [], "livemode": false, "metadata": {}, "name": "tu", "package_dimensions": null, "shippable": null, "statement_descriptor": null, "tax_code": "txcd_10000000", "type": "service", "unit_label": null, "updated": 1696839225, "url": null}, "emitted_at": 1697627307879} -{"stream": "subscriptions", "data": {"id": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "object": "subscription", "application": null, "application_fee_percent": null, "automatic_tax": {"enabled": true}, "billing_cycle_anchor": 1697550676.0, "billing_thresholds": null, "cancel_at": 1705499476.0, "cancel_at_period_end": false, "canceled_at": 1697550676.0, "cancellation_details": {"comment": null, "feedback": null, "reason": "cancellation_requested"}, "collection_method": "charge_automatically", "created": 1697550676, "currency": "usd", "current_period_end": 1700229076.0, "current_period_start": 1697550676, "customer": "cus_NGoTFiJFVbSsvZ", "days_until_due": null, "default_payment_method": null, "default_source": null, "default_tax_rates": [], "description": null, "discount": null, "ended_at": null, "items": {"object": "list", "data": [{"id": "si_OptSP2o3XZUBpx", "object": "subscription_item", "billing_thresholds": null, "created": 1697550677, "metadata": {}, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "price": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "price", "active": true, "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "recurring": {"aggregate_usage": null, "interval": "month", "interval_count": 1, "trial_period_days": null, "usage_type": "licensed"}, "tax_behavior": "exclusive", "tiers_mode": null, "transform_quantity": null, "type": "recurring", "unit_amount": 600, "unit_amount_decimal": "600"}, "quantity": 1, "subscription": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "tax_rates": []}], "has_more": false, "total_count": 1.0, "url": "/v1/subscription_items?subscription=sub_1O2Dg0EcXtiJtvvhz7Q4zS0n"}, "latest_invoice": "in_1O2Dg0EcXtiJtvvhLe87VaYL", "livemode": false, "metadata": {}, "next_pending_invoice_item_invoice": null, "on_behalf_of": null, "pause_collection": null, "payment_settings": {"payment_method_options": null, "payment_method_types": null, "save_default_payment_method": null}, "pending_invoice_item_interval": null, "pending_setup_intent": null, "pending_update": null, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "quantity": 1, "schedule": "sub_sched_1O2Dg0EcXtiJtvvh7GtbtIhP", "start_date": 1697550676, "status": "active", "test_clock": null, "transfer_data": null, "trial_end": null, "trial_settings": {"end_behavior": {"missing_payment_method": "create_invoice"}}, "trial_start": null, "updated": 1697550676}, "emitted_at": 1697627310741} +{"stream": "subscriptions", "data": {"id": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "object": "subscription", "application": null, "application_fee_percent": null, "automatic_tax": {"enabled": true}, "billing_cycle_anchor": 1697550676.0, "billing_thresholds": null, "cancel_at": 1705499476.0, "cancel_at_period_end": false, "canceled_at": 1697550676.0, "cancellation_details": {"comment": null, "feedback": null, "reason": "cancellation_requested"}, "collection_method": "charge_automatically", "created": 1697550676, "currency": "usd", "current_period_end": 1702821076.0, "current_period_start": 1700229076, "customer": "cus_NGoTFiJFVbSsvZ", "days_until_due": null, "default_payment_method": null, "default_source": null, "default_tax_rates": [], "description": null, "discount": null, "ended_at": null, "items": {"object": "list", "data": [{"id": "si_OptSP2o3XZUBpx", "object": "subscription_item", "billing_thresholds": null, "created": 1697550677, "metadata": {}, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "price": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "price", "active": true, "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "recurring": {"aggregate_usage": null, "interval": "month", "interval_count": 1, "trial_period_days": null, "usage_type": "licensed"}, "tax_behavior": "exclusive", "tiers_mode": null, "transform_quantity": null, "type": "recurring", "unit_amount": 600, "unit_amount_decimal": "600"}, "quantity": 1, "subscription": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "tax_rates": []}], "has_more": false, "total_count": 1.0, "url": "/v1/subscription_items?subscription=sub_1O2Dg0EcXtiJtvvhz7Q4zS0n"}, "latest_invoice": "in_1ODSSHEcXtiJtvvhW5LllxDH", "livemode": false, "metadata": {}, "next_pending_invoice_item_invoice": null, "on_behalf_of": null, "pause_collection": null, "payment_settings": {"payment_method_options": null, "payment_method_types": null, "save_default_payment_method": null}, "pending_invoice_item_interval": null, "pending_setup_intent": null, "pending_update": null, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "quantity": 1, "schedule": "sub_sched_1O2Dg0EcXtiJtvvh7GtbtIhP", "start_date": 1697550676, "status": "active", "test_clock": null, "transfer_data": null, "trial_end": null, "trial_settings": {"end_behavior": {"missing_payment_method": "create_invoice"}}, "trial_start": null, "updated": 1697550676}, "emitted_at": 1700232971060} {"stream": "subscription_schedule", "data": {"id": "sub_sched_1O2Dg0EcXtiJtvvh7GtbtIhP", "object": "subscription_schedule", "application": null, "canceled_at": null, "completed_at": null, "created": 1697550676, "current_phase": {"end_date": 1705499476, "start_date": 1697550676}, "customer": "cus_NGoTFiJFVbSsvZ", "default_settings": {"application_fee_percent": null, "automatic_tax": {"enabled": false}, "billing_cycle_anchor": "automatic", "billing_thresholds": null, "collection_method": "charge_automatically", "default_payment_method": null, "default_source": null, "description": "Test Test", "invoice_settings": "{'days_until_due': None}", "on_behalf_of": null, "transfer_data": null}, "end_behavior": "cancel", "livemode": false, "metadata": {}, "phases": [{"add_invoice_items": [], "application_fee_percent": null, "automatic_tax": {"enabled": true}, "billing_cycle_anchor": null, "billing_thresholds": null, "collection_method": "charge_automatically", "coupon": null, "currency": "usd", "default_payment_method": null, "default_tax_rates": [], "description": "Test Test", "end_date": 1705499476, "invoice_settings": "{'days_until_due': None}", "items": [{"billing_thresholds": null, "metadata": {}, "plan": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "price": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "quantity": 1, "tax_rates": []}], "metadata": {}, "on_behalf_of": null, "proration_behavior": "create_prorations", "start_date": 1697550676, "transfer_data": null, "trial_end": null}], "released_at": null, "released_subscription": null, "renewal_interval": null, "status": "active", "subscription": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "test_clock": null, "updated": 1697550676}, "emitted_at": 1697627312079} {"stream": "transfers", "data": {"id": "tr_1NH18zEcXtiJtvvhnd827cNO", "object": "transfer", "amount": 10000, "amount_reversed": 0, "balance_transaction": "txn_1NH190EcXtiJtvvhBO3PeR7p", "created": 1686301085, "currency": "usd", "description": null, "destination": "acct_1Jx8unEYmRTj5on1", "destination_payment": "py_1NH18zEYmRTj5on1GkCCsqLK", "livemode": false, "metadata": {}, "reversals": {"object": "list", "data": [], "has_more": false, "total_count": 0.0, "url": "/v1/transfers/tr_1NH18zEcXtiJtvvhnd827cNO/reversals"}, "reversed": false, "source_transaction": null, "source_type": "card", "transfer_group": null, "updated": 1686301085}, "emitted_at": 1697627313262} {"stream": "transfers", "data": {"id": "tr_1NGoaCEcXtiJtvvhjmHtOGOm", "object": "transfer", "amount": 100, "amount_reversed": 100, "balance_transaction": "txn_1NGoaDEcXtiJtvvhsZrNMsdJ", "created": 1686252800, "currency": "usd", "description": null, "destination": "acct_1Jx8unEYmRTj5on1", "destination_payment": "py_1NGoaCEYmRTj5on1LAlAIG3a", "livemode": false, "metadata": {}, "reversals": {"object": "list", "data": [{"id": "trr_1NGolCEcXtiJtvvhOYPck3CP", "object": "transfer_reversal", "amount": 100, "balance_transaction": "txn_1NGolCEcXtiJtvvhZRy4Kd5S", "created": 1686253482, "currency": "usd", "destination_payment_refund": "pyr_1NGolBEYmRTj5on1STal3rmp", "metadata": {}, "source_refund": null, "transfer": "tr_1NGoaCEcXtiJtvvhjmHtOGOm"}], "has_more": false, "total_count": 1.0, "url": "/v1/transfers/tr_1NGoaCEcXtiJtvvhjmHtOGOm/reversals"}, "reversed": true, "source_transaction": null, "source_type": "card", "transfer_group": "ORDER10", "updated": 1686252800}, "emitted_at": 1697627313264} @@ -69,4 +69,4 @@ {"stream": "invoice_line_items", "data": {"id": "il_1MX2yfEcXtiJtvvhiunY2j1x", "object": "line_item", "amount": 25200, "amount_excluding_tax": 25200, "currency": "usd", "description": "edgao-test-product", "discount_amounts": [{"amount": 2520, "discount": "di_1MX2ysEcXtiJtvvh8ORqRVKm"}], "discountable": true, "discounts": ["di_1MX2ysEcXtiJtvvh8ORqRVKm"], "invoice_item": "ii_1MX2yfEcXtiJtvvhfhyOG7SP", "livemode": false, "metadata": {}, "period": {"end": 1675345045, "start": 1675345045}, "plan": null, "price": {"id": "price_1K9GbqEcXtiJtvvhJ3lZe4i5", "object": "price", "active": true, "billing_scheme": "per_unit", "created": 1640124902, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_KouQ5ez86yREmB", "recurring": null, "tax_behavior": "inclusive", "tiers_mode": null, "transform_quantity": null, "type": "one_time", "unit_amount": 12600, "unit_amount_decimal": "12600"}, "proration": false, "proration_details": {"credited_items": null}, "quantity": 2, "subscription": null, "tax_amounts": [{"amount": 0, "inclusive": true, "tax_rate": "txr_1MX2yfEcXtiJtvvhVcMEMTRj", "taxability_reason": "not_collecting", "taxable_amount": 0}], "tax_rates": [], "type": "invoiceitem", "unit_amount_excluding_tax": "12600", "invoice_id": "in_1MX2yFEcXtiJtvvhMXhUCgKx"}, "emitted_at": 1697627336449} {"stream": "subscription_items", "data": {"id": "si_OptSP2o3XZUBpx", "object": "subscription_item", "billing_thresholds": null, "created": 1697550677, "metadata": {}, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "price": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "price", "active": true, "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "recurring": {"aggregate_usage": null, "interval": "month", "interval_count": 1, "trial_period_days": null, "usage_type": "licensed"}, "tax_behavior": "exclusive", "tiers_mode": null, "transform_quantity": null, "type": "recurring", "unit_amount": 600, "unit_amount_decimal": "600"}, "quantity": 1, "subscription": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "tax_rates": []}, "emitted_at": 1697627337431} {"stream": "transfer_reversals", "data": {"id": "trr_1NGolCEcXtiJtvvhOYPck3CP", "object": "transfer_reversal", "amount": 100, "balance_transaction": "txn_1NGolCEcXtiJtvvhZRy4Kd5S", "created": 1686253482, "currency": "usd", "destination_payment_refund": "pyr_1NGolBEYmRTj5on1STal3rmp", "metadata": {}, "source_refund": null, "transfer": "tr_1NGoaCEcXtiJtvvhjmHtOGOm"}, "emitted_at": 1697627338960} -{"stream": "usage_records", "data": {"id": "sis_1O4gIOEcXtiJtvvhmsoeBHkP", "object": "usage_record_summary", "invoice": null, "livemode": false, "period": {"end": null, "start": 1697550676}, "subscription_item": "si_OptSP2o3XZUBpx", "total_usage": 1}, "emitted_at": 1697627340175} +{"stream": "usage_records", "data": {"id": "sis_1ODTdwEcXtiJtvvhZChEVsbN", "object": "usage_record_summary", "invoice": null, "livemode": false, "period": {"end": null, "start": 1700229076}, "subscription_item": "si_OptSP2o3XZUBpx", "total_usage": 1}, "emitted_at": 1700233660884} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-stripe/metadata.yaml b/airbyte-integrations/connectors/source-stripe/metadata.yaml index f521fb172278..11fc73a40128 100644 --- a/airbyte-integrations/connectors/source-stripe/metadata.yaml +++ b/airbyte-integrations/connectors/source-stripe/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: e094cb9a-26de-4645-8761-65c0c425d1de - dockerImageTag: 5.0.0 + dockerImageTag: 5.0.1 dockerRepository: airbyte/source-stripe documentationUrl: https://docs.airbyte.com/integrations/sources/stripe githubIssueLabel: source-stripe diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/availability_strategy.py b/airbyte-integrations/connectors/source-stripe/source_stripe/availability_strategy.py index 9906c21a525a..6226ffc12ea9 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/availability_strategy.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/availability_strategy.py @@ -3,13 +3,16 @@ # import logging -from typing import Optional, Tuple +from typing import Any, Mapping, Optional, Tuple +from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import Source from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from requests import HTTPError +from .stream_helpers import get_first_record_for_slice, get_first_stream_slice + STRIPE_ERROR_CODES = { "more_permissions_required": "This is most likely due to insufficient permissions on the credentials in use. " "Try to grant required permissions/scopes or re-authenticate", @@ -20,6 +23,60 @@ class StripeAvailabilityStrategy(HttpAvailabilityStrategy): + def _check_availability_for_sync_mode( + self, + stream: Stream, + sync_mode: SyncMode, + logger: logging.Logger, + source: Optional["Source"], + stream_state: Optional[Mapping[str, Any]], + ) -> Tuple[bool, Optional[str]]: + try: + # Some streams need a stream slice to read records (e.g. if they have a SubstreamPartitionRouter) + # Streams that don't need a stream slice will return `None` as their first stream slice. + stream_slice = get_first_stream_slice(stream, sync_mode, stream_state) + except StopIteration: + # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!) + # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield ` + # without accounting for the case in which the parent stream is empty. + reason = f"Cannot attempt to connect to stream {stream.name} - no stream slices were found, likely because the parent stream is empty." + return False, reason + except HTTPError as error: + is_available, reason = self.handle_http_error(stream, logger, source, error) + if not is_available: + reason = f"Unable to get slices for {stream.name} stream, because of error in parent stream. {reason}" + return is_available, reason + + try: + get_first_record_for_slice(stream, sync_mode, stream_slice, stream_state) + return True, None + except StopIteration: + logger.info(f"Successfully connected to stream {stream.name}, but got 0 records.") + return True, None + except HTTPError as error: + is_available, reason = self.handle_http_error(stream, logger, source, error) + if not is_available: + reason = f"Unable to read {stream.name} stream. {reason}" + return is_available, reason + + def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]: + """ + Check stream availability by attempting to read the first record of the + stream. + + :param stream: stream + :param logger: source logger + :param source: (optional) source + :return: A tuple of (boolean, str). If boolean is true, then the stream + is available, and no str is required. Otherwise, the stream is unavailable + for some reason and the str should describe what went wrong and how to + resolve the unavailability, if possible. + """ + is_available, reason = self._check_availability_for_sync_mode(stream, SyncMode.full_refresh, logger, source, None) + if not is_available or not stream.supports_incremental: + return is_available, reason + return self._check_availability_for_sync_mode(stream, SyncMode.incremental, logger, source, {stream.cursor_field: 0}) + def handle_http_error( self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError ) -> Tuple[bool, Optional[str]]: diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/stream_helpers.py b/airbyte-integrations/connectors/source-stripe/source_stripe/stream_helpers.py new file mode 100644 index 000000000000..dad073ae485b --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/stream_helpers.py @@ -0,0 +1,41 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Mapping, Optional + +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams.core import Stream, StreamData + + +def get_first_stream_slice(stream, sync_mode, stream_state) -> Optional[Mapping[str, Any]]: + """ + Gets the first stream_slice from a given stream's stream_slices. + :param stream: stream + :param sync_mode: sync_mode + :param stream_state: stream_state + :raises StopIteration: if there is no first slice to return (the stream_slices generator is empty) + :return: first stream slice from 'stream_slices' generator (`None` is a valid stream slice) + """ + # We wrap the return output of stream_slices() because some implementations return types that are iterable, + # but not iterators such as lists or tuples + slices = iter(stream.stream_slices(sync_mode=sync_mode, cursor_field=stream.cursor_field, stream_state=stream_state)) + return next(slices) + + +def get_first_record_for_slice( + stream: Stream, sync_mode: SyncMode, stream_slice: Optional[Mapping[str, Any]], stream_state: Optional[Mapping[str, Any]] +) -> StreamData: + """ + Gets the first record for a stream_slice of a stream. + :param stream: stream + :param sync_mode: sync_mode + :param stream_slice: stream_slice + :param stream_state: stream_state + :raises StopIteration: if there is no first record to return (the read_records generator is empty) + :return: StreamData containing the first record in the slice + """ + # We wrap the return output of read_records() because some implementations return types that are iterable, + # but not iterators such as lists or tuples + records_for_slice = iter(stream.read_records(sync_mode=sync_mode, stream_slice=stream_slice, stream_state=stream_state)) + return next(records_for_slice) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py b/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py index a8b7feaef07f..f47f34d26bc6 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py @@ -15,6 +15,7 @@ from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream +from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from source_stripe.availability_strategy import StripeAvailabilityStrategy, StripeSubStreamAvailabilityStrategy @@ -491,12 +492,9 @@ class CustomerBalanceTransactions(StripeStream): API docs: https://stripe.com/docs/api/customer_balance_transactions/list """ - def path(self, stream_slice: Mapping[str, Any] = None, **kwargs): - return f"customers/{stream_slice['id']}/balance_transactions" - - @property - def customers(self) -> IncrementalStripeStream: - return IncrementalStripeStream( + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.parent = IncrementalStripeStream( name="customers", path="customers", use_cache=USE_CACHE, @@ -506,13 +504,19 @@ def customers(self) -> IncrementalStripeStream: start_date=self.start_date, ) + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs): + return f"customers/{stream_slice['id']}/balance_transactions" + + @property + def availability_strategy(self) -> Optional[AvailabilityStrategy]: + return StripeSubStreamAvailabilityStrategy() + def stream_slices( self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[Mapping[str, Any]]]: - parent_stream = self.customers - slices = parent_stream.stream_slices(sync_mode=SyncMode.full_refresh) + slices = self.parent.stream_slices(sync_mode=SyncMode.full_refresh) for _slice in slices: - for customer in parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice): + for customer in self.parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice): # we use `get` here because some attributes may not be returned by some API versions if customer.get("next_invoice_sequence") == 1 and customer.get("balance") == 0: # We're making this check in order to speed up a sync. if a customer's balance is 0 and there are no @@ -547,6 +551,12 @@ def __init__(self, **kwargs): def path(self, **kwargs) -> str: return "setup_attempts" + @property + def availability_strategy(self) -> Optional[AvailabilityStrategy]: + # we use the default http availability strategy here because parent stream may lack data in the incremental stream mode + # and this stream would be marked inaccessible which is not actually true + return HttpAvailabilityStrategy() + def stream_slices( self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[Mapping[str, Any]]]: @@ -586,6 +596,10 @@ def __init__(self, *args, **kwargs): parent = StripeStream(*args, name="accounts", path="accounts", use_cache=USE_CACHE, **kwargs) super().__init__(*args, parent=parent, **kwargs) + @property + def availability_strategy(self) -> Optional[AvailabilityStrategy]: + return StripeSubStreamAvailabilityStrategy() + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs): return f"accounts/{stream_slice['parent']['id']}/persons" @@ -597,7 +611,9 @@ def stream_slices( class StripeSubStream(StripeStream, HttpSubStream): - pass + @property + def availability_strategy(self) -> Optional[AvailabilityStrategy]: + return StripeSubStreamAvailabilityStrategy() class StripeLazySubStream(StripeStream, HttpSubStream): @@ -800,10 +816,6 @@ def stream_slices( def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: return {self.cursor_field: max(current_stream_state.get(self.cursor_field, 0), latest_record[self.cursor_field])} - @property - def availability_strategy(self) -> Optional[AvailabilityStrategy]: - return StripeSubStreamAvailabilityStrategy() - @property def raise_on_http_errors(self) -> bool: return False @@ -822,3 +834,9 @@ def parse_response(self, response: requests.Response, *args, **kwargs) -> Iterab ) return [] response.raise_for_status() + + @property + def availability_strategy(self) -> Optional[AvailabilityStrategy]: + # we use the default http availability strategy here because parent stream may lack data in the incremental stream mode + # and this stream would be marked inaccessible which is not actually true + return HttpAvailabilityStrategy() diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py index 0f747acac434..ee41b71dd049 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_availability_strategy.py @@ -3,34 +3,45 @@ # import logging +import urllib.parse +import pytest from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from source_stripe.availability_strategy import STRIPE_ERROR_CODES, StripeSubStreamAvailabilityStrategy from source_stripe.streams import IncrementalStripeStream, StripeLazySubStream -def test_traverse_over_substreams(mocker): +@pytest.fixture() +def stream_mock(mocker): + def _mocker(): + return mocker.Mock(stream_slices=mocker.Mock(return_value=[{}]), read_records=mocker.Mock(return_value=[{}])) + return _mocker + + +def test_traverse_over_substreams(stream_mock, mocker): # Mock base HttpAvailabilityStrategy to capture all the check_availability method calls - check_availability_mock = mocker.MagicMock() - check_availability_mock.return_value = (True, None) + check_availability_mock = mocker.MagicMock(return_value=(True, None)) + cdk_check_availability_mock = mocker.MagicMock(return_value=(True, None)) mocker.patch( - "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability", check_availability_mock + "source_stripe.availability_strategy.StripeAvailabilityStrategy.check_availability", check_availability_mock + ) + mocker.patch( + "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability", cdk_check_availability_mock ) - # Prepare tree of nested objects - root = mocker.Mock() + root = stream_mock() root.availability_strategy = HttpAvailabilityStrategy() root.parent = None - child_1 = mocker.Mock() + child_1 = stream_mock() child_1.availability_strategy = StripeSubStreamAvailabilityStrategy() child_1.parent = root - child_1_1 = mocker.Mock() + child_1_1 = stream_mock() child_1_1.availability_strategy = StripeSubStreamAvailabilityStrategy() child_1_1.parent = child_1 - child_1_1_1 = mocker.Mock() + child_1_1_1 = stream_mock() child_1_1_1.availability_strategy = StripeSubStreamAvailabilityStrategy() child_1_1_1.parent = child_1_1 @@ -38,39 +49,38 @@ def test_traverse_over_substreams(mocker): is_available, reason = child_1_1_1.availability_strategy.check_availability(child_1_1_1, mocker.Mock(), mocker.Mock()) assert is_available and reason is None - # Check availability strategy was called once for every nested object - assert check_availability_mock.call_count == 4 + assert check_availability_mock.call_count == 3 + assert cdk_check_availability_mock.call_count == 1 # Check each availability strategy was called with proper instance argument - assert id(check_availability_mock.call_args_list[0].args[0]) == id(root) - assert id(check_availability_mock.call_args_list[1].args[0]) == id(child_1) - assert id(check_availability_mock.call_args_list[2].args[0]) == id(child_1_1) - assert id(check_availability_mock.call_args_list[3].args[0]) == id(child_1_1_1) + assert id(cdk_check_availability_mock.call_args_list[0].args[0]) == id(root) + assert id(check_availability_mock.call_args_list[0].args[0]) == id(child_1) + assert id(check_availability_mock.call_args_list[1].args[0]) == id(child_1_1) + assert id(check_availability_mock.call_args_list[2].args[0]) == id(child_1_1_1) -def test_traverse_over_substreams_failure(mocker): +def test_traverse_over_substreams_failure(stream_mock, mocker): # Mock base HttpAvailabilityStrategy to capture all the check_availability method calls - check_availability_mock = mocker.MagicMock() - check_availability_mock.side_effect = [(True, None), (False, "child_1")] + check_availability_mock = mocker.MagicMock(side_effect=[(True, None), (False, "child_1")]) mocker.patch( - "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability", check_availability_mock + "source_stripe.availability_strategy.StripeAvailabilityStrategy.check_availability", check_availability_mock ) # Prepare tree of nested objects - root = mocker.Mock() + root = stream_mock() root.availability_strategy = HttpAvailabilityStrategy() root.parent = None - child_1 = mocker.Mock() + child_1 = stream_mock() child_1.availability_strategy = StripeSubStreamAvailabilityStrategy() child_1.parent = root - child_1_1 = mocker.Mock() + child_1_1 = stream_mock() child_1_1.availability_strategy = StripeSubStreamAvailabilityStrategy() child_1_1.parent = child_1 - child_1_1_1 = mocker.Mock() + child_1_1_1 = stream_mock() child_1_1_1.availability_strategy = StripeSubStreamAvailabilityStrategy() child_1_1_1.parent = child_1_1 @@ -83,15 +93,15 @@ def test_traverse_over_substreams_failure(mocker): assert check_availability_mock.call_count == 2 # Check each availability strategy was called with proper instance argument - assert id(check_availability_mock.call_args_list[0].args[0]) == id(root) - assert id(check_availability_mock.call_args_list[1].args[0]) == id(child_1) + assert id(check_availability_mock.call_args_list[0].args[0]) == id(child_1) + assert id(check_availability_mock.call_args_list[1].args[0]) == id(child_1_1) def test_substream_availability(mocker, stream_by_name): check_availability_mock = mocker.MagicMock() check_availability_mock.return_value = (True, None) mocker.patch( - "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability", check_availability_mock + "source_stripe.availability_strategy.StripeAvailabilityStrategy.check_availability", check_availability_mock ) stream = stream_by_name("invoice_line_items") is_available, reason = stream.availability_strategy.check_availability(stream, mocker.Mock(), mocker.Mock()) @@ -106,7 +116,7 @@ def test_substream_availability_no_parent(mocker, stream_by_name): check_availability_mock = mocker.MagicMock() check_availability_mock.return_value = (True, None) mocker.patch( - "airbyte_cdk.sources.streams.http.availability_strategy.HttpAvailabilityStrategy.check_availability", check_availability_mock + "source_stripe.availability_strategy.StripeAvailabilityStrategy.check_availability", check_availability_mock ) stream = stream_by_name("invoice_line_items") stream.parent = None @@ -125,3 +135,91 @@ def test_403_error_handling(stream_by_name, requests_mock): available, message = stream.check_availability(logger) assert not available assert STRIPE_ERROR_CODES[error_code] in message + + +@pytest.mark.parametrize( + "stream_name, endpoints, expected_calls", + ( + ( + "accounts", + { + "/v1/accounts": {"data": []} + }, + 1 + ), + ( + "refunds", + { + "/v1/refunds": {"data": []} + }, + 2 + ), + ( + "credit_notes", + { + "/v1/credit_notes": {"data": []}, "/v1/events": {"data": []} + }, + 2 + ), + ( + "charges", + { + "/v1/charges": {"data": []}, "/v1/events": {"data": []} + }, + 2 + ), + ( + "subscription_items", + { + "/v1/subscriptions": {"data": [{"id": 1}]}, + "/v1/events": {"data": []} + }, + 3 + ), + ( + "bank_accounts", + { + "/v1/customers": {"data": [{"id": 1}]}, + "/v1/events": {"data": []} + }, + 2 + ), + ( + "customer_balance_transactions", + { + "/v1/events": {"data": [{"data":{"object": {"id": 1}}, "created": 1, "type": "customer.updated"}]}, + "/v1/customers": {"data": [{"id": 1}]}, + "/v1/customers/1/balance_transactions": {"data": []} + }, + 4 + ), + ( + "transfer_reversals", + { + "/v1/transfers": {"data": [{"id": 1}]}, + "/v1/events": {"data": [{"data":{"object": {"id": 1}}, "created": 1, "type": "transfer.updated"}]}, + "/v1/transfers/1/reversals": {"data": []} + }, + 4 + ), + ( + "persons", + { + "/v1/accounts": {"data": [{"id": 1}]}, + "/v1/events": {"data": []}, + "/v1/accounts/1/persons": {"data": []} + }, + 4 + ) + ) +) +def test_availability_strategy_visits_endpoints(stream_by_name, stream_name, endpoints, expected_calls, requests_mock, mocker, config): + for endpoint, data in endpoints.items(): + requests_mock.get(endpoint, json=data) + stream = stream_by_name(stream_name, config) + is_available, reason = stream.check_availability(mocker.Mock(), mocker.Mock()) + assert (is_available, reason) == (True, None) + assert len(requests_mock.request_history) == expected_calls + + for call in requests_mock.request_history: + assert urllib.parse.urlparse(call.url).path in endpoints.keys() diff --git a/docs/integrations/sources/stripe.md b/docs/integrations/sources/stripe.md index 3f949c6847e6..c873eae04833 100644 --- a/docs/integrations/sources/stripe.md +++ b/docs/integrations/sources/stripe.md @@ -216,7 +216,8 @@ Each record is marked with `is_deleted` flag when the appropriate event happens | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 5.0.0 | 2023-11-14 | [32286](https://github.com/airbytehq/airbyte/pull/32286/) | Fix multiple issues regarding usage of the incremental sync mode for the `Refunds`, `CheckoutSessions`, `CheckoutSessionsLineItems` streams. Fix schemas for the streams: `Invoices`, `Subscriptions`, `SubscriptionSchedule` | +| 5.0.1 | 2023-11-17 | [32638](https://github.com/airbytehq/airbyte/pull/32638/) | Availability stretegy: check availability of both endpoints (if applicable) - common API + events API | +| 5.0.0 | 2023-11-16 | [32286](https://github.com/airbytehq/airbyte/pull/32286/) | Fix multiple issues regarding usage of the incremental sync mode for the `Refunds`, `CheckoutSessions`, `CheckoutSessionsLineItems` streams. Fix schemas for the streams: `Invoices`, `Subscriptions`, `SubscriptionSchedule` | | 4.5.4 | 2023-11-16 | [32284](https://github.com/airbytehq/airbyte/pull/32284/) | Enable client-side rate limiting | | 4.5.3 | 2023-11-14 | [32473](https://github.com/airbytehq/airbyte/pull/32473/) | Have all full_refresh stream syncs be concurrent | | 4.5.2 | 2023-11-03 | [32146](https://github.com/airbytehq/airbyte/pull/32146/) | Fix multiple BankAccount issues |