From a28aab900c9a2d395f4a64f0271f60278e489e6b Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com> Date: Tue, 6 Feb 2024 08:18:17 -0500 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Source=20Stripe:=20Events=20stream?= =?UTF-8?q?=20concurrent=20on=20incremental=20syncs=20(#34619)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../source-stripe/acceptance-test-config.yml | 1 + .../integration_tests/expected_records.jsonl | 11 +-- .../connectors/source-stripe/metadata.yaml | 2 +- .../connectors/source-stripe/setup.py | 2 +- .../source-stripe/source_stripe/run.py | 2 + .../source-stripe/source_stripe/source.py | 79 +++++++++++++------ .../source-stripe/unit_tests/conftest.py | 8 +- .../integration/test_application_fees.py | 16 ++-- .../test_application_fees_refunds.py | 7 +- .../integration/test_authorizations.py | 8 +- .../integration/test_bank_accounts.py | 7 +- .../unit_tests/integration/test_cards.py | 8 +- .../integration/test_early_fraud_warnings.py | 8 +- .../unit_tests/integration/test_events.py | 14 ++-- .../test_external_account_bank_accounts.py | 8 +- .../test_external_account_cards.py | 8 +- .../integration/test_payment_methods.py | 8 +- .../unit_tests/integration/test_persons.py | 54 +++++++------ .../unit_tests/integration/test_reviews.py | 8 +- .../integration/test_transactions.py | 8 +- .../source-stripe/unit_tests/test_source.py | 23 +++--- .../source-stripe/unit_tests/test_streams.py | 2 +- docs/integrations/sources/stripe.md | 1 + 23 files changed, 176 insertions(+), 117 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/acceptance-test-config.yml b/airbyte-integrations/connectors/source-stripe/acceptance-test-config.yml index c3002b6d31f3..af2bc7f25dfe 100644 --- a/airbyte-integrations/connectors/source-stripe/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-stripe/acceptance-test-config.yml @@ -18,6 +18,7 @@ acceptance_tests: basic_read: tests: - config_path: "secrets/config.json" + fail_on_extra_columns: false # CATs are failing since https://github.com/airbytehq/airbyte/commit/dccb2fa7165f031fa1233d695897b07f9aacb39c, API Source team to fix this timeout_seconds: 3600 empty_streams: - name: "application_fees" diff --git a/airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl b/airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl index 25517e84945e..c6602a1a5df7 100644 --- a/airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl +++ b/airbyte-integrations/connectors/source-stripe/integration_tests/expected_records.jsonl @@ -4,9 +4,10 @@ {"stream": "setup_attempts", "data": {"id": "setatt_1KnfIjEcXtiJtvvhqDfSlpM4", "object": "setup_attempt", "application": null, "created": 1649752937, "customer": null, "flow_directions": null, "livemode": false, "on_behalf_of": null, "payment_method": "pm_1KnfIj2eZvKYlo2CAlv2Vhqc", "payment_method_details": {"acss_debit": {}, "type": "acss_debit"}, "setup_error": null, "setup_intent": "seti_1KnfIjEcXtiJtvvhPw5znVKY", "status": "succeeded", "usage": "off_session"}, "emitted_at": 1697627241471} {"stream": "setup_attempts", "data": {"id": "setatt_1KnfIdEcXtiJtvvhpDrYVlRP", "object": "setup_attempt", "application": null, "created": 1649752931, "customer": null, "flow_directions": null, "livemode": false, "on_behalf_of": null, "payment_method": "pm_1KnfIc2eZvKYlo2Civ7snSPy", "payment_method_details": {"acss_debit": {}, "type": "acss_debit"}, "setup_error": null, "setup_intent": "seti_1KnfIcEcXtiJtvvh61qlCaDf", "status": "succeeded", "usage": "off_session"}, "emitted_at": 1697627242509} {"stream": "setup_attempts", "data": {"id": "setatt_1KnfIVEcXtiJtvvhqouWGuhD", "object": "setup_attempt", "application": null, "created": 1649752923, "customer": null, "flow_directions": null, "livemode": false, "on_behalf_of": null, "payment_method": "pm_1KnfIV2eZvKYlo2CaOLGBF00", "payment_method_details": {"acss_debit": {}, "type": "acss_debit"}, "setup_error": null, "setup_intent": "seti_1KnfIVEcXtiJtvvhWiIbMkpH", "status": "succeeded", "usage": "off_session"}, "emitted_at": 1697627243547} -{"stream": "accounts", "data": {"id": "acct_1NGp6SD04fX0Aizk", "object": "account", "capabilities": {"acss_debit_payments": "active", "affirm_payments": "active", "afterpay_clearpay_payments": "active", "bancontact_payments": "active", "card_payments": "active", "cartes_bancaires_payments": "pending", "cashapp_payments": "active", "eps_payments": "active", "giropay_payments": "active", "ideal_payments": "active", "klarna_payments": "active", "link_payments": "active", "p24_payments": "active", "sepa_debit_payments": "active", "sofort_payments": "active", "transfers": "active", "us_bank_account_ach_payments": "active"}, "charges_enabled": true, "country": "US", "default_currency": "usd", "details_submitted": true, "future_requirements": {"alternatives": [], "current_deadline": null, "currently_due": [], "disabled_reason": null, "errors": [], "eventually_due": [], "past_due": [], "pending_verification": []}, "payouts_enabled": true, "requirements": {"alternatives": [], "current_deadline": null, "currently_due": [], "disabled_reason": null, "errors": [], "eventually_due": [], "past_due": [], "pending_verification": []}, "settings": {"bacs_debit_payments": {"display_name": null, "service_user_number": null}, "branding": {"icon": null, "logo": null, "primary_color": null, "secondary_color": null}, "card_issuing": {"tos_acceptance": {"date": null, "ip": null}}, "card_payments": {"statement_descriptor_prefix": "AIRBYTE", "statement_descriptor_prefix_kana": null, "statement_descriptor_prefix_kanji": null}, "dashboard": {"display_name": "Airbyte", "timezone": "Asia/Tbilisi"}, "payments": {"statement_descriptor": "WWW.AIRBYTE.COM", "statement_descriptor_kana": null, "statement_descriptor_kanji": null}, "sepa_debit_payments": {}}, "type": "standard"}, "emitted_at": 1697627267880} -{"stream": "accounts", "data": {"id": "acct_1MwD6tIyVv44cUB4", "object": "account", "business_profile": {"annual_revenue": null,"estimated_worker_count": null,"mcc": null, "name": null, "product_description": null, "support_address": null, "support_email": null, "support_phone": null, "support_url": null, "url": null}, "business_type": null, "capabilities": {"card_payments": "inactive", "transfers": "inactive"}, "charges_enabled": false, "country": "US", "created": 1681342196, "default_currency": "usd", "details_submitted": false, "email": "jenny.rosen@example.com", "external_accounts": {"object": "list", "data": [], "has_more": false, "total_count": 0, "url": "/v1/accounts/acct_1MwD6tIyVv44cUB4/external_accounts"}, "future_requirements": {"alternatives": [], "current_deadline": null, "currently_due": [], "disabled_reason": null, "errors": [], "eventually_due": [], "past_due": [], "pending_verification": []}, "metadata": {}, "payouts_enabled": false, "requirements": {"alternatives": [], "current_deadline": null, "currently_due": ["business_profile.mcc", "business_profile.url", "business_type", "external_account", "representative.first_name", "representative.last_name", "tos_acceptance.date", "tos_acceptance.ip"], "disabled_reason": "requirements.past_due", "errors": [], "eventually_due": ["business_profile.mcc", "business_profile.url", "business_type", "external_account", "representative.first_name", "representative.last_name", "tos_acceptance.date", "tos_acceptance.ip"], "past_due": ["business_profile.mcc", "business_profile.url", "business_type", "external_account", "representative.first_name", "representative.last_name", "tos_acceptance.date", "tos_acceptance.ip"], "pending_verification": []}, "settings": {"bacs_debit_payments": {"display_name": null, "service_user_number": null}, "branding": {"icon": null, "logo": null, "primary_color": null, "secondary_color": null}, "card_issuing": {"tos_acceptance": {"date": null, "ip": null}}, "card_payments": {"decline_on": {"avs_failure": false, "cvc_failure": false}, "statement_descriptor_prefix": null, "statement_descriptor_prefix_kana": null, "statement_descriptor_prefix_kanji": null}, "dashboard": {"display_name": null, "timezone": "Etc/UTC"}, "payments": {"statement_descriptor": null, "statement_descriptor_kana": null, "statement_descriptor_kanji": null}, "payouts": {"debit_negative_balances": false, "schedule": {"delay_days": 2, "interval": "daily"}, "statement_descriptor": null}, "sepa_debit_payments": {}}, "tos_acceptance": {"date": null, "ip": null, "user_agent": null}, "type": "custom"}, "emitted_at": 1697627267882} -{"stream": "accounts", "data": {"id": "acct_1Jx8unEYmRTj5on1", "object": "account", "business_profile": {"annual_revenue": null,"estimated_worker_count": null,"mcc": null, "name": "Airbyte", "support_address": null, "support_email": null, "support_phone": null, "support_url": null, "url": null}, "capabilities": {}, "charges_enabled": false, "controller": {"type": "account"}, "country": "US", "default_currency": "usd", "details_submitted": false, "email": null, "future_requirements": {"alternatives": [], "current_deadline": null, "currently_due": [], "disabled_reason": null, "errors": [], "eventually_due": [], "past_due": [], "pending_verification": []}, "metadata": {}, "payouts_enabled": false, "requirements": {"alternatives": [], "current_deadline": null, "currently_due": ["business_profile.product_description", "business_profile.support_phone", "business_profile.url", "external_account", "tos_acceptance.date", "tos_acceptance.ip"], "disabled_reason": "requirements.past_due", "errors": [], "eventually_due": ["business_profile.product_description", "business_profile.support_phone", "business_profile.url", "external_account", "tos_acceptance.date", "tos_acceptance.ip"], "past_due": [], "pending_verification": []}, "settings": {"bacs_debit_payments": {"display_name": null, "service_user_number": null}, "branding": {"icon": null, "logo": null, "primary_color": null, "secondary_color": null}, "card_issuing": {"tos_acceptance": {"date": null, "ip": null}}, "card_payments": {"statement_descriptor_prefix": null, "statement_descriptor_prefix_kana": null, "statement_descriptor_prefix_kanji": null}, "dashboard": {"display_name": null, "timezone": "Etc/UTC"}, "payments": {"statement_descriptor": null, "statement_descriptor_kana": null, "statement_descriptor_kanji": null}, "sepa_debit_payments": {}}, "type": "standard"}, "emitted_at": 1697627267884} +{"stream": "accounts", "data": {"id": "acct_1NGp6SD04fX0Aizk", "object": "account", "capabilities": {"acss_debit_payments": "active", "affirm_payments": "active", "afterpay_clearpay_payments": "active", "bancontact_payments": "active", "card_payments": "active", "cartes_bancaires_payments": "pending", "cashapp_payments": "active", "eps_payments": "active", "giropay_payments": "active", "ideal_payments": "active", "klarna_payments": "active", "link_payments": "active", "p24_payments": "active", "sepa_debit_payments": "active", "sofort_payments": "active", "transfers": "active", "us_bank_account_ach_payments": "active"}, "charges_enabled": true, "country": "US", "default_currency": "usd", "details_submitted": true, "future_requirements": {"alternatives": [], "current_deadline": null, "currently_due": [], "disabled_reason": null, "errors": [], "eventually_due": [], "past_due": [], "pending_verification": []}, "payouts_enabled": true, "requirements": {"alternatives": [], "current_deadline": null, "currently_due": [], "disabled_reason": null, "errors": [], "eventually_due": [], "past_due": [], "pending_verification": []}, "settings": {"bacs_debit_payments": {"display_name": null, "service_user_number": null}, "branding": {"icon": null, "logo": null, "primary_color": null, "secondary_color": null}, "card_issuing": {"tos_acceptance": {"date": null, "ip": null}}, "card_payments": {"statement_descriptor_prefix": "AIRBYTE", "statement_descriptor_prefix_kana": null, "statement_descriptor_prefix_kanji": null}, "dashboard": {"display_name": "Airbyte", "timezone": "Asia/Tbilisi"}, "invoices": {"default_account_tax_ids": null}, "payments": {"statement_descriptor": "WWW.AIRBYTE.COM", "statement_descriptor_kana": null, "statement_descriptor_kanji": null}, "sepa_debit_payments": {}}, "type": "standard"}, "emitted_at": 1697627267880} +{"stream": "accounts", "data": {"id": "acct_1MwD6tIyVv44cUB4", "object": "account", "business_profile": {"annual_revenue": null,"estimated_worker_count": null,"mcc": null, "name": null, "product_description": null, "support_address": null, "support_email": null, "support_phone": null, "support_url": null, "url": null}, "business_type": null, "capabilities": {"card_payments": "inactive", "transfers": "inactive"}, "charges_enabled": false, "country": "US", "created": 1681342196, "default_currency": "usd", "details_submitted": false, "email": "jenny.rosen@example.com", "external_accounts": {"object": "list", "data": [], "has_more": false, "total_count": 0, "url": "/v1/accounts/acct_1MwD6tIyVv44cUB4/external_accounts"}, "future_requirements": {"alternatives": [], "current_deadline": null, "currently_due": [], "disabled_reason": null, "errors": [], "eventually_due": [], "past_due": [], "pending_verification": []}, "metadata": {}, "payouts_enabled": false, "requirements": {"alternatives": [], "current_deadline": null, "currently_due": ["business_profile.mcc", "business_profile.url", "business_type", "external_account", "representative.first_name", "representative.last_name", "tos_acceptance.date", "tos_acceptance.ip"], "disabled_reason": "requirements.past_due", "errors": [], "eventually_due": ["business_profile.mcc", "business_profile.url", "business_type", "external_account", "representative.first_name", "representative.last_name", "tos_acceptance.date", "tos_acceptance.ip"], "past_due": ["business_profile.mcc", "business_profile.url", "business_type", "external_account", "representative.first_name", "representative.last_name", "tos_acceptance.date", "tos_acceptance.ip"], "pending_verification": []}, "settings": {"bacs_debit_payments": {"display_name": null, "service_user_number": null}, "branding": {"icon": null, "logo": null, "primary_color": null, "secondary_color": null}, "card_issuing": {"tos_acceptance": {"date": null, "ip": null}}, "card_payments": {"decline_on": {"avs_failure": false, "cvc_failure": false}, "statement_descriptor_prefix": null, "statement_descriptor_prefix_kana": null, "statement_descriptor_prefix_kanji": null}, "dashboard": {"display_name": null, "timezone": "Etc/UTC"}, "invoices": {"default_account_tax_ids": null}, "payments": {"statement_descriptor": null, "statement_descriptor_kana": null, "statement_descriptor_kanji": null}, "payouts": {"debit_negative_balances": false, "schedule": {"delay_days": 2, "interval": "daily"}, "statement_descriptor": null}, "sepa_debit_payments": {}}, "tos_acceptance": {"date": null, "ip": null, "user_agent": null}, "type": "custom"}, "emitted_at": 1697627267882} +{"stream": "accounts", "data": {"id": "acct_1Jx8unEYmRTj5on1", "object": "account", "business_profile": {"annual_revenue": null,"estimated_worker_count": null,"mcc": null, "name": "Airbyte", "support_address": null, "support_email": null, "support_phone": null, "support_url": null, "url": null}, "capabilities": {}, "charges_enabled": false, "controller": {"type": "account"}, "country": "US", "default_currency": "usd", "details_submitted": false, "email": null, "future_requirements": {"alternatives": [], "current_deadline": null, "currently_due": [], "disabled_reason": null, "errors": [], "eventually_due": [], "past_due": [], "pending_verification": []}, "metadata": {}, "payouts_enabled": false, "requirements": {"alternatives": [], "current_deadline": null, "currently_due": ["business_profile.product_description", "business_profile.support_phone", "business_profile.url", "external_account", "tos_acceptance.date", "tos_acceptance.ip"], "disabled_reason": "requirements.past_due", "errors": [], "eventually_due": ["business_profile.product_description", "business_profile.support_phone", "business_profile.url", "external_account", "tos_acceptance.date", "tos_acceptance.ip"], "past_due": [], "pending_verification": []}, "settings": {"bacs_debit_payments": {"display_name": null, "service_user_number": null}, "branding": {"icon": null, "logo": null, "primary_color": null, "secondary_color": null}, "card_issuing": {"tos_acceptance": {"date": null, "ip": null}}, "card_payments": {"statement_descriptor_prefix": null, "statement_descriptor_prefix_kana": null, "statement_descriptor_prefix_kanji": null}, "dashboard": {"display_name": null, "timezone": "Etc/UTC"}, "invoices": {"default_account_tax_ids": null}, "payments": {"statement_descriptor": null, "statement_descriptor_kana": null, "statement_descriptor_kanji": null}, "sepa_debit_payments": {}}, "type": "standard"}, "emitted_at": 1697627267884} +{"stream": "accounts", "data": {"id": "acct_1HRPLyCpK2Z3jTFF", "object": "account", "capabilities": {"acss_debit_payments": "inactive", "afterpay_clearpay_payments": "inactive", "bancontact_payments": "inactive", "card_payments": "inactive", "eps_payments": "inactive", "giropay_payments": "inactive", "ideal_payments": "inactive", "p24_payments": "inactive", "sepa_debit_payments": "inactive", "sofort_payments": "inactive", "transfers": "inactive"}, "charges_enabled": false, "country": "US", "default_currency": "usd", "details_submitted": false, "future_requirements": {"alternatives": [], "current_deadline": null, "currently_due": [], "disabled_reason": null, "errors": [], "eventually_due": [], "past_due": [], "pending_verification": []}, "payouts_enabled": false, "requirements": {"alternatives": [], "current_deadline": null, "currently_due": ["business_profile.mcc", "business_profile.product_description", "business_profile.support_phone", "business_profile.url", "external_account", "individual.dob.day", "individual.dob.month", "individual.dob.year", "individual.email", "individual.first_name", "individual.last_name", "individual.phone", "individual.ssn_last_4", "tos_acceptance.date", "tos_acceptance.ip"], "disabled_reason": "requirements.past_due", "errors": [], "eventually_due": ["business_profile.mcc", "business_profile.product_description", "business_profile.support_phone", "business_profile.url", "external_account", "individual.dob.day", "individual.dob.month", "individual.dob.year", "individual.email", "individual.first_name", "individual.last_name", "individual.phone", "individual.ssn_last_4", "tos_acceptance.date", "tos_acceptance.ip"], "past_due": ["business_profile.mcc", "business_profile.product_description", "business_profile.support_phone", "business_profile.url", "external_account", "individual.dob.day", "individual.dob.month", "individual.dob.year", "individual.email", "individual.first_name", "individual.last_name", "individual.phone", "individual.ssn_last_4", "tos_acceptance.date", "tos_acceptance.ip"], "pending_verification": []}, "settings": {"bacs_debit_payments": {"display_name": null, "service_user_number": null}, "branding": {"icon": null, "logo": null, "primary_color": null, "secondary_color": null}, "card_issuing": {"tos_acceptance": {"date": null, "ip": null}}, "card_payments": {"statement_descriptor_prefix": null, "statement_descriptor_prefix_kana": null, "statement_descriptor_prefix_kanji": null}, "dashboard": {"display_name": null, "timezone": "America/Los_Angeles"}, "invoices": {"default_account_tax_ids": null}, "payments": {"statement_descriptor": null, "statement_descriptor_kana": null, "statement_descriptor_kanji": null}, "sepa_debit_payments": {}}, "type": "standard"}, "emitted_at": 1707141120832} {"stream": "shipping_rates", "data": {"id": "shr_1NXgplEcXtiJtvvhA1ntV782", "object": "shipping_rate", "active": true, "created": 1690274589, "delivery_estimate": "{'maximum': {'unit': 'business_day', 'value': 14}, 'minimum': {'unit': 'business_day', 'value': 10}}", "display_name": "Test Ground Shipping", "fixed_amount": {"amount": 999, "currency": "usd"}, "livemode": false, "metadata": {}, "tax_behavior": "inclusive", "tax_code": "txcd_92010001", "type": "fixed_amount"}, "emitted_at": 1697627269309} {"stream": "balance_transactions", "data": {"id": "txn_1KVQhfEcXtiJtvvhF7ox3YEm", "object": "balance_transaction", "amount": -9164, "available_on": 1645488000, "created": 1645406919, "currency": "usd", "description": "STRIPE PAYOUT", "exchange_rate": null, "fee": 0, "fee_details": [], "net": -9164, "reporting_category": "payout", "source": "po_1KVQhfEcXtiJtvvhZlUkl08U", "status": "available", "type": "payout"}, "emitted_at": 1697627270253} {"stream": "balance_transactions", "data": {"id": "txn_3K9FSOEcXtiJtvvh0KoS5mx7", "object": "balance_transaction", "amount": 5300, "available_on": 1640649600, "created": 1640120473, "currency": "usd", "description": null, "exchange_rate": null, "fee": 184, "fee_details": [{"amount": 184, "application": null, "currency": "usd", "description": "Stripe processing fees", "type": "stripe_fee"}], "net": 5116, "reporting_category": "charge", "source": "ch_3K9FSOEcXtiJtvvh0zxb7clc", "status": "available", "type": "charge"}, "emitted_at": 1697627270254} @@ -46,8 +47,8 @@ {"stream": "products", "data": {"id": "prod_KouQ5ez86yREmB", "object": "product", "active": true, "attributes": [], "created": 1640124902, "default_price": "price_1K9GbqEcXtiJtvvhJ3lZe4i5", "description": null, "features": [], "images": [], "livemode": false, "metadata": {}, "name": "edgao-test-product", "package_dimensions": null, "shippable": null, "statement_descriptor": null, "tax_code": "txcd_10000000", "type": "service", "unit_label": null, "updated": 1696839715, "url": null}, "emitted_at": 1697627307635} {"stream": "products", "data": {"id": "prod_NHcKselSHfKdfc", "object": "product", "active": true, "attributes": [], "created": 1675345504, "default_price": "price_1MX364EcXtiJtvvhE3WgTl4O", "description": "Test Product 1 description", "features": [], "images": ["https://files.stripe.com/links/MDB8YWNjdF8xSndub2lFY1h0aUp0dnZofGZsX3Rlc3RfdjBOT09UaHRiNVl2WmJ6clNYRUlmcFFD00cCBRNHnV"], "livemode": false, "metadata": {}, "name": "Test Product 1", "package_dimensions": null, "shippable": null, "statement_descriptor": null, "tax_code": "txcd_10301000", "type": "service", "unit_label": null, "updated": 1696839789, "url": null}, "emitted_at": 1697627307877} {"stream": "products", "data": {"id": "prod_NCgx1XP2IFQyKF", "object": "product", "active": true, "attributes": [], "created": 1674209524, "default_price": null, "description": null, "features": [], "images": [], "livemode": false, "metadata": {}, "name": "tu", "package_dimensions": null, "shippable": null, "statement_descriptor": null, "tax_code": "txcd_10000000", "type": "service", "unit_label": null, "updated": 1696839225, "url": null}, "emitted_at": 1697627307879} -{"stream": "subscriptions", "data": {"id": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "object": "subscription", "application": null, "application_fee_percent": null, "automatic_tax": {"enabled": true, "liability": {"type": "self"}}, "billing_cycle_anchor": 1697550676.0, "billing_cycle_anchor_config": null, "billing_thresholds": null, "cancel_at": null, "cancel_at_period_end": false, "canceled_at": 1697550676.0, "cancellation_details": {"comment": null, "feedback": null, "reason": "cancellation_requested"}, "collection_method": "charge_automatically", "created": 1697550676, "currency": "usd", "current_period_end": 1705499476.0, "current_period_start": 1702821076, "customer": "cus_NGoTFiJFVbSsvZ", "days_until_due": null, "default_payment_method": null, "default_source": null, "default_tax_rates": [], "description": null, "discount": null, "ended_at": 1705329724.0, "invoice_settings": {"issuer": {"type": "self"}}, "items": {"object": "list", "data": [{"id": "si_OptSP2o3XZUBpx", "object": "subscription_item", "billing_thresholds": null, "created": 1697550677, "metadata": {}, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "price": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "price", "active": true, "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "recurring": {"aggregate_usage": null, "interval": "month", "interval_count": 1, "trial_period_days": null, "usage_type": "licensed"}, "tax_behavior": "exclusive", "tiers_mode": null, "transform_quantity": null, "type": "recurring", "unit_amount": 600, "unit_amount_decimal": "600"}, "quantity": 1, "subscription": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "tax_rates": []}], "has_more": false, "total_count": 1.0, "url": "/v1/subscription_items?subscription=sub_1O2Dg0EcXtiJtvvhz7Q4zS0n"}, "latest_invoice": "in_1OOKkUEcXtiJtvvheUUavyuB", "livemode": false, "metadata": {}, "next_pending_invoice_item_invoice": null, "on_behalf_of": null, "pause_collection": null, "payment_settings": {"payment_method_options": null, "payment_method_types": null, "save_default_payment_method": null}, "pending_invoice_item_interval": null, "pending_setup_intent": null, "pending_update": null, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "quantity": 1, "schedule": "sub_sched_1O2Dg0EcXtiJtvvh7GtbtIhP", "start_date": 1697550676, "status": "canceled", "test_clock": null, "transfer_data": null, "trial_end": null, "trial_settings": {"end_behavior": {"missing_payment_method": "create_invoice"}}, "trial_start": null, "updated": 1697550676}, "emitted_at": 1705636378387} -{"stream":"subscription_schedule","data":{"id":"sub_sched_1O2Dg0EcXtiJtvvh7GtbtIhP","object":"subscription_schedule","application":null,"canceled_at":"1705329724","completed_at":null,"created":1697550676,"current_phase":null,"customer":"cus_NGoTFiJFVbSsvZ","default_settings":{"application_fee_percent":null,"automatic_tax":{"enabled":false, "liability": null},"billing_cycle_anchor":"automatic","billing_thresholds":null,"collection_method":"charge_automatically","default_payment_method":null,"default_source":null,"description":"Test Test","invoice_settings":"{'days_until_due': None, 'issuer': {'type': 'self'}}","on_behalf_of":null,"transfer_data":null},"end_behavior":"cancel","livemode":false,"metadata":{},"phases":[{"add_invoice_items":[],"application_fee_percent":null,"automatic_tax":{"enabled":true, "liability": {"type": "self"}},"billing_cycle_anchor":null,"billing_thresholds":null,"collection_method":"charge_automatically","coupon":null,"currency":"usd","default_payment_method":null,"default_tax_rates":[],"description":"Test Test","end_date":1705499476,"invoice_settings":"{'days_until_due': None, 'issuer': None}","items":[{"billing_thresholds":null,"metadata":{},"plan":"price_1MSHZoEcXtiJtvvh6O8TYD8T","price":"price_1MSHZoEcXtiJtvvh6O8TYD8T","quantity":1,"tax_rates":[]}],"metadata":{},"on_behalf_of":null,"proration_behavior":"create_prorations","start_date":1697550676,"transfer_data":null,"trial_end":null}],"released_at":null,"released_subscription":null,"renewal_interval":null,"status":"canceled","subscription":"sub_1O2Dg0EcXtiJtvvhz7Q4zS0n","test_clock":null,"updated":1697550676},"emitted_at":1705636378620} +{"stream": "subscriptions", "data": {"id": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "object": "subscription", "application": null, "application_fee_percent": null, "automatic_tax": {"enabled": true, "liability": {"type": "self"}}, "billing_cycle_anchor": 1697550676.0, "billing_cycle_anchor_config": null, "billing_thresholds": null, "cancel_at": null, "cancel_at_period_end": false, "canceled_at": 1697550676.0, "cancellation_details": {"comment": null, "feedback": null, "reason": "cancellation_requested"}, "collection_method": "charge_automatically", "created": 1697550676, "currency": "usd", "current_period_end": 1705499476.0, "current_period_start": 1702821076, "customer": "cus_NGoTFiJFVbSsvZ", "days_until_due": null, "default_payment_method": null, "default_source": null, "default_tax_rates": [], "description": null, "discount": null, "ended_at": 1705329724.0, "invoice_settings": {"account_tax_ids": null, "issuer": {"type": "self"}}, "items": {"object": "list", "data": [{"id": "si_OptSP2o3XZUBpx", "object": "subscription_item", "billing_thresholds": null, "created": 1697550677, "metadata": {}, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "price": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "price", "active": true, "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "custom_unit_amount": null, "livemode": false, "lookup_key": null, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "recurring": {"aggregate_usage": null, "interval": "month", "interval_count": 1, "trial_period_days": null, "usage_type": "licensed"}, "tax_behavior": "exclusive", "tiers_mode": null, "transform_quantity": null, "type": "recurring", "unit_amount": 600, "unit_amount_decimal": "600"}, "quantity": 1, "subscription": "sub_1O2Dg0EcXtiJtvvhz7Q4zS0n", "tax_rates": []}], "has_more": false, "total_count": 1.0, "url": "/v1/subscription_items?subscription=sub_1O2Dg0EcXtiJtvvhz7Q4zS0n"}, "latest_invoice": "in_1OOKkUEcXtiJtvvheUUavyuB", "livemode": false, "metadata": {}, "next_pending_invoice_item_invoice": null, "on_behalf_of": null, "pause_collection": null, "payment_settings": {"payment_method_options": null, "payment_method_types": null, "save_default_payment_method": null}, "pending_invoice_item_interval": null, "pending_setup_intent": null, "pending_update": null, "plan": {"id": "price_1MSHZoEcXtiJtvvh6O8TYD8T", "object": "plan", "active": true, "aggregate_usage": null, "amount": 600, "amount_decimal": "600", "billing_scheme": "per_unit", "created": 1674209524, "currency": "usd", "interval": "month", "interval_count": 1, "livemode": false, "metadata": {}, "nickname": null, "product": "prod_NCgx1XP2IFQyKF", "tiers_mode": null, "transform_usage": null, "trial_period_days": null, "usage_type": "licensed"}, "quantity": 1, "schedule": "sub_sched_1O2Dg0EcXtiJtvvh7GtbtIhP", "start_date": 1697550676, "status": "canceled", "test_clock": null, "transfer_data": null, "trial_end": null, "trial_settings": {"end_behavior": {"missing_payment_method": "create_invoice"}}, "trial_start": null, "updated": 1697550676}, "emitted_at": 1707158969393} +{"stream":"subscription_schedule","data":{"id":"sub_sched_1O2Dg0EcXtiJtvvh7GtbtIhP","object":"subscription_schedule","application":null,"canceled_at":"1705329724","completed_at":null,"created":1697550676,"current_phase":null,"customer":"cus_NGoTFiJFVbSsvZ","default_settings":{"application_fee_percent":null,"automatic_tax":{"enabled":false, "liability": null},"billing_cycle_anchor":"automatic","billing_thresholds":null,"collection_method":"charge_automatically","default_payment_method":null,"default_source":null,"description":"Test Test","invoice_settings":"{'account_tax_ids': None, 'days_until_due': None, 'issuer': {'type': 'self'}}","on_behalf_of":null,"transfer_data":null},"end_behavior":"cancel","livemode":false,"metadata":{},"phases":[{"add_invoice_items":[],"application_fee_percent":null,"automatic_tax":{"enabled":true, "liability": {"type": "self"}},"billing_cycle_anchor":null,"billing_thresholds":null,"collection_method":"charge_automatically","coupon":null,"currency":"usd","default_payment_method":null,"default_tax_rates":[],"description":"Test Test","end_date":1705499476,"invoice_settings":"{'account_tax_ids': None, 'days_until_due': None, 'issuer': None}","items":[{"billing_thresholds":null,"metadata":{},"plan":"price_1MSHZoEcXtiJtvvh6O8TYD8T","price":"price_1MSHZoEcXtiJtvvh6O8TYD8T","quantity":1,"tax_rates":[]}],"metadata":{},"on_behalf_of":null,"proration_behavior":"create_prorations","start_date":1697550676,"transfer_data":null,"trial_end":null}],"released_at":null,"released_subscription":null,"renewal_interval":null,"status":"canceled","subscription":"sub_1O2Dg0EcXtiJtvvhz7Q4zS0n","test_clock":null,"updated":1697550676},"emitted_at":1705636378620} {"stream": "transfers", "data": {"id": "tr_1NH18zEcXtiJtvvhnd827cNO", "object": "transfer", "amount": 10000, "amount_reversed": 0, "balance_transaction": "txn_1NH190EcXtiJtvvhBO3PeR7p", "created": 1686301085, "currency": "usd", "description": null, "destination": "acct_1Jx8unEYmRTj5on1", "destination_payment": "py_1NH18zEYmRTj5on1GkCCsqLK", "livemode": false, "metadata": {}, "reversals": {"object": "list", "data": [], "has_more": false, "total_count": 0.0, "url": "/v1/transfers/tr_1NH18zEcXtiJtvvhnd827cNO/reversals"}, "reversed": false, "source_transaction": null, "source_type": "card", "transfer_group": null, "updated": 1686301085}, "emitted_at": 1697627313262} {"stream": "transfers", "data": {"id": "tr_1NGoaCEcXtiJtvvhjmHtOGOm", "object": "transfer", "amount": 100, "amount_reversed": 100, "balance_transaction": "txn_1NGoaDEcXtiJtvvhsZrNMsdJ", "created": 1686252800, "currency": "usd", "description": null, "destination": "acct_1Jx8unEYmRTj5on1", "destination_payment": "py_1NGoaCEYmRTj5on1LAlAIG3a", "livemode": false, "metadata": {}, "reversals": {"object": "list", "data": [{"id": "trr_1NGolCEcXtiJtvvhOYPck3CP", "object": "transfer_reversal", "amount": 100, "balance_transaction": "txn_1NGolCEcXtiJtvvhZRy4Kd5S", "created": 1686253482, "currency": "usd", "destination_payment_refund": "pyr_1NGolBEYmRTj5on1STal3rmp", "metadata": {}, "source_refund": null, "transfer": "tr_1NGoaCEcXtiJtvvhjmHtOGOm"}], "has_more": false, "total_count": 1.0, "url": "/v1/transfers/tr_1NGoaCEcXtiJtvvhjmHtOGOm/reversals"}, "reversed": true, "source_transaction": null, "source_type": "card", "transfer_group": "ORDER10", "updated": 1686252800}, "emitted_at": 1697627313264} {"stream": "refunds", "data": {"id": "re_3MVuZyEcXtiJtvvh0A6rSbeJ", "object": "refund", "amount": 200000, "balance_transaction": "txn_3MVuZyEcXtiJtvvh0v0QyAMx", "charge": "ch_3MVuZyEcXtiJtvvh0tiVC7DI", "created": 1675074488, "currency": "usd", "destination_details": {"card": {"reference": "5871771120000631", "reference_status": "available", "reference_type": "acquirer_reference_number", "type": "refund"}, "type": "card"}, "metadata": {}, "payment_intent": "pi_3MVuZyEcXtiJtvvh07Ehi4cx", "reason": "fraudulent", "receipt_number": "3278-5368", "source_transfer_reversal": null, "status": "succeeded", "transfer_reversal": null}, "emitted_at": 1701882752716} diff --git a/airbyte-integrations/connectors/source-stripe/metadata.yaml b/airbyte-integrations/connectors/source-stripe/metadata.yaml index 5353dde9a5a0..ea46aef64023 100644 --- a/airbyte-integrations/connectors/source-stripe/metadata.yaml +++ b/airbyte-integrations/connectors/source-stripe/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: e094cb9a-26de-4645-8761-65c0c425d1de - dockerImageTag: 5.2.1 + dockerImageTag: 5.2.2 dockerRepository: airbyte/source-stripe documentationUrl: https://docs.airbyte.com/integrations/sources/stripe githubIssueLabel: source-stripe diff --git a/airbyte-integrations/connectors/source-stripe/setup.py b/airbyte-integrations/connectors/source-stripe/setup.py index e0203e616ad7..25a139e0b1ba 100644 --- a/airbyte-integrations/connectors/source-stripe/setup.py +++ b/airbyte-integrations/connectors/source-stripe/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk==0.59.1", "stripe==2.56.0", "pendulum==2.1.2"] +MAIN_REQUIREMENTS = ["airbyte-cdk==0.60.1", "stripe==2.56.0", "pendulum==2.1.2"] # we set `requests-mock~=1.11.0` to ensure concurrency is supported TEST_REQUIREMENTS = ["pytest-mock~=3.6.1", "pytest~=6.1", "requests-mock~=1.11.0", "requests_mock~=1.8", "freezegun==1.2.2"] diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/run.py b/airbyte-integrations/connectors/source-stripe/source_stripe/run.py index b72025a2c726..b5a321986359 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/run.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/run.py @@ -16,10 +16,12 @@ def _get_source(args: List[str]): catalog_path = AirbyteEntrypoint.extract_catalog(args) config_path = AirbyteEntrypoint.extract_config(args) + state_path = AirbyteEntrypoint.extract_state(args) try: return SourceStripe( SourceStripe.read_catalog(catalog_path) if catalog_path else None, SourceStripe.read_config(config_path) if config_path else None, + SourceStripe.read_state(state_path) if state_path else None, ) except Exception as error: print( diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index ebb5dd7a1a00..49479c5cc78e 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -14,11 +14,14 @@ from airbyte_cdk.models import ConfiguredAirbyteCatalog, FailureType from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource from airbyte_cdk.sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter +from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.message.repository import InMemoryMessageRepository +from airbyte_cdk.sources.source import TState from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.call_rate import AbstractAPIBudget, HttpAPIBudget, HttpRequestMatcher, MovingWindowCallRatePolicy, Rate from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade -from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor +from airbyte_cdk.sources.streams.concurrent.cursor import Comparable, ConcurrentCursor, CursorField, NoopCursor +from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import EpochValueConcurrentStreamStateConverter from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator from airbyte_cdk.utils.traced_exception import AirbyteTracedException from airbyte_protocol.models import SyncMode @@ -42,6 +45,10 @@ _MAX_CONCURRENCY = 20 _DEFAULT_CONCURRENCY = 10 _CACHE_DISABLED = os.environ.get("CACHE_DISABLED") +_REFUND_STREAM_NAME = "refunds" +_INCREMENTAL_CONCURRENCY_EXCLUSION = { + _REFUND_STREAM_NAME, # excluded because of the upcoming changes in terms of cursor https://github.com/airbytehq/airbyte/issues/34332 +} USE_CACHE = not _CACHE_DISABLED STRIPE_TEST_ACCOUNT_PREFIX = "sk_test_" @@ -49,8 +56,12 @@ class SourceStripe(ConcurrentSourceAdapter): message_repository = InMemoryMessageRepository(entrypoint_logger.level) + _SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION = { + Events: ("created[gte]", "created[lte]"), + CreatedCursorIncrementalStripeStream: ("created[gte]", "created[lte]"), + } - def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], **kwargs): + def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs): if config: concurrency_level = min(config.get("num_workers", _DEFAULT_CONCURRENCY), _MAX_CONCURRENCY) else: @@ -60,6 +71,7 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional concurrency_level, concurrency_level // 2, logger, self._slice_logger, self.message_repository ) super().__init__(concurrent_source) + self._state = state if catalog: self._streams_configured_as_full_refresh = { configured_stream.stream.name @@ -71,9 +83,8 @@ def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional self._streams_configured_as_full_refresh = set() @staticmethod - def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping: - start_date, lookback_window_days, slice_range = ( - config.get("start_date"), + def validate_and_fill_with_defaults(config: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + lookback_window_days, slice_range = ( config.get("lookback_window_days"), config.get("slice_range"), ) @@ -86,9 +97,9 @@ def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping: internal_message=message, failure_type=FailureType.config_error, ) - if start_date: - # verifies the start_date is parseable - SourceStripe._start_date_to_timestamp(start_date) + + # verifies the start_date in the config is valid + SourceStripe._start_date_to_timestamp(config) if slice_range is None: config["slice_range"] = 365 elif not isinstance(slice_range, int) or slice_range < 1: @@ -100,7 +111,7 @@ def validate_and_fill_with_defaults(config: MutableMapping) -> MutableMapping: ) return config - def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]: + def check_connection(self, logger: AirbyteLogger, config: MutableMapping[str, Any]) -> Tuple[bool, Any]: self.validate_and_fill_with_defaults(config) stripe.api_key = config["client_secret"] try: @@ -167,14 +178,11 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget: return HttpAPIBudget(policies=policies) - def streams(self, config: Mapping[str, Any]) -> List[Stream]: + def streams(self, config: MutableMapping[str, Any]) -> List[Stream]: config = self.validate_and_fill_with_defaults(config) authenticator = TokenAuthenticator(config["client_secret"]) - if "start_date" in config: - start_timestamp = self._start_date_to_timestamp(config["start_date"]) - else: - start_timestamp = pendulum.datetime(2017, 1, 25).int_timestamp + start_timestamp = self._start_date_to_timestamp(config) args = { "authenticator": authenticator, "account_id": config["account_id"], @@ -289,7 +297,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: # The Refunds stream does not utilize the Events API as it created issues with data loss during the incremental syncs. # Therefore, we're using the regular API with the `created` cursor field. A bug has been filed with Stripe. # See more at https://github.com/airbytehq/oncall/issues/3090, https://github.com/airbytehq/oncall/issues/3428 - CreatedCursorIncrementalStripeStream(name="refunds", path="refunds", **incremental_args), + CreatedCursorIncrementalStripeStream(name=_REFUND_STREAM_NAME, path="refunds", **incremental_args), UpdatedCursorIncrementalStripeStream( name="payment_methods", path="payment_methods", @@ -511,21 +519,44 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: ), ] - return [ - StreamFacade.create_from_stream(stream, self, entrypoint_logger, self._create_empty_state(), NoopCursor()) - if stream.name in self._streams_configured_as_full_refresh - else stream - for stream in streams - ] + state_manager = ConnectorStateManager(stream_instance_map={s.name: s for s in streams}, state=self._state) + return [self._to_concurrent(stream, self._start_date_to_timestamp(config), state_manager) for stream in streams] + + def _to_concurrent(self, stream: Stream, fallback_start, state_manager: ConnectorStateManager) -> Stream: + if stream.name in self._streams_configured_as_full_refresh: + return StreamFacade.create_from_stream(stream, self, entrypoint_logger, self._create_empty_state(), NoopCursor()) + + state = state_manager.get_stream_state(stream.name, stream.namespace) + slice_boundary_fields = self._SLICE_BOUNDARY_FIELDS_BY_IMPLEMENTATION.get(type(stream)) + if slice_boundary_fields and stream.name not in _INCREMENTAL_CONCURRENCY_EXCLUSION: + cursor_field = CursorField(stream.cursor_field) if isinstance(stream.cursor_field, str) else CursorField(stream.cursor_field[0]) + converter = EpochValueConcurrentStreamStateConverter() + cursor = ConcurrentCursor( + stream.name, + stream.namespace, + state_manager.get_stream_state(stream.name, stream.namespace), + self.message_repository, + state_manager, + converter, + cursor_field, + slice_boundary_fields, + fallback_start, + ) + return StreamFacade.create_from_stream(stream, self, entrypoint_logger, state, cursor) + + return stream def _create_empty_state(self) -> MutableMapping[str, Any]: - # The state is known to be empty because concurrent CDK is currently only used for full refresh return {} @staticmethod - def _start_date_to_timestamp(start_date: str) -> int: + def _start_date_to_timestamp(config: Mapping[str, Any]) -> int: + if "start_date" not in config: + return pendulum.datetime(2017, 1, 25).int_timestamp # type: ignore # pendulum not typed + + start_date = config["start_date"] try: - return pendulum.parse(start_date).int_timestamp + return pendulum.parse(start_date).int_timestamp # type: ignore # pendulum not typed except pendulum.parsing.exceptions.ParserError as e: message = f"Invalid start date {start_date}. Please use YYYY-MM-DDTHH:MM:SSZ format." raise AirbyteTracedException( diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py index 8e81ce968306..0463e204fdb0 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/conftest.py @@ -5,7 +5,9 @@ import os import pytest +from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator +from airbyte_cdk.test.state_builder import StateBuilder os.environ["CACHE_DISABLED"] = "true" os.environ["DEPLOYMENT_MODE"] = "testing" @@ -40,10 +42,14 @@ def stream_by_name(config): from source_stripe.source import SourceStripe def mocker(stream_name, source_config=config): - source = SourceStripe(None, source_config) + source = SourceStripe(None, source_config, StateBuilder().build()) streams = source.streams(source_config) for stream in streams: if stream.name == stream_name: + if isinstance(stream, StreamFacade): + # to avoid breaking changes for tests, we will return the legacy test. Tests that would be affected by not having this + # would probably need to be moved to integration tests or unit tests + return stream._legacy_stream return stream return mocker diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees.py index 9619010e4c24..b7028590f446 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees.py @@ -1,11 +1,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -import json from datetime import datetime, timedelta, timezone -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from unittest import TestCase import freezegun +from airbyte_cdk.sources.source import TState from airbyte_cdk.test.catalog_builder import CatalogBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -19,7 +19,7 @@ find_template, ) from airbyte_cdk.test.state_builder import StateBuilder -from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode +from airbyte_protocol.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, FailureType, SyncMode from integration.config import ConfigBuilder from integration.pagination import StripePaginationStrategy from integration.request_builder import StripeRequestBuilder @@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() -def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: - return SourceStripe(catalog, config) +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[List[AirbyteStateMessage]]) -> SourceStripe: + return SourceStripe(catalog, config, state) def _an_event() -> RecordBuilder: @@ -110,12 +110,12 @@ def _given_events_availability_check(http_mocker: HttpMocker) -> None: def _read( config_builder: ConfigBuilder, sync_mode: SyncMode, - state: Optional[Dict[str, Any]] = None, + state: Optional[List[AirbyteStateMessage]] = None, expecting_exception: bool = False ) -> EntrypointOutput: catalog = _catalog(sync_mode) config = config_builder.build() - return read(_source(catalog, config), config, catalog, state, expecting_exception) + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) @freezegun.freeze_time(_NOW.isoformat()) @@ -372,5 +372,5 @@ def test_given_state_earlier_than_30_days_when_read_then_query_events_using_type def _an_application_fee_event(self) -> RecordBuilder: return _an_event().with_field(_DATA_FIELD, _an_application_fee().build()) - def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput: + def _read(self, config: ConfigBuilder, state: Optional[List[AirbyteStateMessage]], expecting_exception: bool = False) -> EntrypointOutput: return _read(config, SyncMode.incremental, state, expecting_exception) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py index f01c6da9689e..bfde5e409d11 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py @@ -7,6 +7,7 @@ from unittest import TestCase import freezegun +from airbyte_cdk.sources.source import TState from airbyte_cdk.test.catalog_builder import CatalogBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -62,8 +63,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() -def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: - return SourceStripe(catalog, config) +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: + return SourceStripe(catalog, config, state) def _an_event() -> RecordBuilder: @@ -143,7 +144,7 @@ def _read( ) -> EntrypointOutput: catalog = _catalog(sync_mode) config = config_builder.build() - return read(_source(catalog, config), config, catalog, state, expecting_exception) + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) def _assert_not_available(output: EntrypointOutput) -> None: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_authorizations.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_authorizations.py index ab140559840f..90e61aad3166 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_authorizations.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_authorizations.py @@ -1,11 +1,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -import json from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional from unittest import TestCase import freezegun +from airbyte_cdk.sources.source import TState from airbyte_cdk.test.catalog_builder import CatalogBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() -def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: - return SourceStripe(catalog, config) +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: + return SourceStripe(catalog, config, state) def _an_event() -> RecordBuilder: @@ -115,7 +115,7 @@ def _read( ) -> EntrypointOutput: catalog = _catalog(sync_mode) config = config_builder.build() - return read(_source(catalog, config), config, catalog, state, expecting_exception) + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) @freezegun.freeze_time(_NOW.isoformat()) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_bank_accounts.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_bank_accounts.py index c0995a6c11b3..db5c32d5d9a3 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_bank_accounts.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_bank_accounts.py @@ -7,6 +7,7 @@ from unittest import TestCase import freezegun +from airbyte_cdk.sources.source import TState from airbyte_cdk.test.catalog_builder import CatalogBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -66,8 +67,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() -def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: - return SourceStripe(catalog, config) +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: + return SourceStripe(catalog, config, state) def _an_event() -> RecordBuilder: @@ -146,7 +147,7 @@ def _read( ) -> EntrypointOutput: catalog = _catalog(sync_mode) config = config_builder.build() - return read(_source(catalog, config), config, catalog, state, expecting_exception) + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) def _assert_not_available(output: EntrypointOutput) -> None: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_cards.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_cards.py index 573ba8824a9e..413c1e15d2a9 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_cards.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_cards.py @@ -1,11 +1,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -import json from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional from unittest import TestCase import freezegun +from airbyte_cdk.sources.source import TState from airbyte_cdk.test.catalog_builder import CatalogBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() -def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: - return SourceStripe(catalog, config) +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: + return SourceStripe(catalog, config, state) def _an_event() -> RecordBuilder: @@ -115,7 +115,7 @@ def _read( ) -> EntrypointOutput: catalog = _catalog(sync_mode) config = config_builder.build() - return read(_source(catalog, config), config, catalog, state, expecting_exception) + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) @freezegun.freeze_time(_NOW.isoformat()) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py index f4c2165c582a..7f8a0800b97e 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py @@ -1,11 +1,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -import json from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional from unittest import TestCase import freezegun +from airbyte_cdk.sources.source import TState from airbyte_cdk.test.catalog_builder import CatalogBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() -def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: - return SourceStripe(catalog, config) +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: + return SourceStripe(catalog, config, state) def _an_event() -> RecordBuilder: @@ -115,7 +115,7 @@ def _read( ) -> EntrypointOutput: catalog = _catalog(sync_mode) config = config_builder.build() - return read(_source(catalog, config), config, catalog, state, expecting_exception) + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) @freezegun.freeze_time(_NOW.isoformat()) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_events.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_events.py index c93bc691a96e..14942b03c54c 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_events.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_events.py @@ -1,11 +1,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -import json from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional from unittest import TestCase import freezegun +from airbyte_cdk.sources.source import TState from airbyte_cdk.test.catalog_builder import CatalogBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read from airbyte_cdk.test.mock_http import HttpMocker, HttpResponse @@ -48,8 +48,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() -def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: - return SourceStripe(catalog, config) +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: + return SourceStripe(catalog, config, state) def _a_record() -> RecordBuilder: @@ -73,7 +73,7 @@ def _read( ) -> EntrypointOutput: catalog = _catalog(sync_mode) config = config_builder.build() - return read(_source(catalog, config), config, catalog, state, expecting_exception) + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) @freezegun.freeze_time(_NOW.isoformat()) @@ -230,7 +230,7 @@ def test_given_no_initial_state_when_read_then_return_state_based_on_cursor_fiel _a_response().with_record(_a_record().with_cursor(cursor_value)).build(), ) output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE) - assert output.most_recent_state == {"events": {"created": cursor_value}} + assert output.most_recent_state == {"events": {"created": int(_NOW.timestamp())}} @HttpMocker() def test_given_state_when_read_then_use_state_for_query_params(self, http_mocker: HttpMocker) -> None: @@ -254,6 +254,10 @@ def test_given_state_when_read_then_use_state_for_query_params(self, http_mocker @HttpMocker() def test_given_state_more_recent_than_cursor_when_read_then_return_state_based_on_cursor_field(self, http_mocker: HttpMocker) -> None: + """ + We do not see exactly how this case can happen in a real life scenario but it is used to see if at least one state message + would be populated given that no partitions were created. + """ cursor_value = int(_A_START_DATE.timestamp()) + 1 more_recent_than_record_cursor = int(_NOW.timestamp()) - 1 http_mocker.get( diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py index f1c5804173c1..cbd08bce1a5c 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py @@ -1,11 +1,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -import json from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional from unittest import TestCase import freezegun +from airbyte_cdk.sources.source import TState from airbyte_cdk.test.catalog_builder import CatalogBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -56,8 +56,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() -def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: - return SourceStripe(catalog, config) +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: + return SourceStripe(catalog, config, state) def _an_event() -> RecordBuilder: @@ -115,7 +115,7 @@ def _read( ) -> EntrypointOutput: catalog = _catalog(sync_mode) config = config_builder.build() - return read(_source(catalog, config), config, catalog, state, expecting_exception) + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) @freezegun.freeze_time(_NOW.isoformat()) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py index 705faf8530a0..3635f7bd6d6d 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py @@ -1,11 +1,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -import json from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional from unittest import TestCase import freezegun +from airbyte_cdk.sources.source import TState from airbyte_cdk.test.catalog_builder import CatalogBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -56,8 +56,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() -def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: - return SourceStripe(catalog, config) +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: + return SourceStripe(catalog, config, state) def _an_event() -> RecordBuilder: @@ -120,7 +120,7 @@ def _read( ) -> EntrypointOutput: catalog = _catalog(sync_mode) config = config_builder.build() - return read(_source(catalog, config), config, catalog, state, expecting_exception) + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) @freezegun.freeze_time(_NOW.isoformat()) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py index d8e9f1450c66..10785b6c4777 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py @@ -1,11 +1,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -import json from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional from unittest import TestCase import freezegun +from airbyte_cdk.sources.source import TState from airbyte_cdk.test.catalog_builder import CatalogBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -60,8 +60,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() -def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: - return SourceStripe(catalog, config) +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: + return SourceStripe(catalog, config, state) def _an_event() -> RecordBuilder: @@ -120,7 +120,7 @@ def _read( ) -> EntrypointOutput: catalog = _catalog(sync_mode) config = config_builder.build() - return read(_source(catalog, config), config, catalog, state, expecting_exception) + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) @freezegun.freeze_time(_NOW.isoformat()) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_persons.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_persons.py index db000211be08..7df5857b0000 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_persons.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_persons.py @@ -34,6 +34,7 @@ "client_secret": _CLIENT_SECRET, "account_id": _ACCOUNT_ID, } +_NO_STATE = StateBuilder().build() _AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1) @@ -116,7 +117,7 @@ def test_full_refresh(self, http_mocker): _create_response().with_record(record=_create_record("events")).with_record(record=_create_record("events")).build(), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog()) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses(_STREAM_NAME)) @@ -149,7 +150,7 @@ def test_parent_pagination(self, http_mocker): _create_response().with_record(record=_create_record("events")).with_record(record=_create_record("events")).build(), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog()) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses(_STREAM_NAME)) @@ -183,7 +184,7 @@ def test_substream_pagination(self, http_mocker): _create_response().with_record(record=_create_record("events")).with_record(record=_create_record("events")).build(), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog()) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses(_STREAM_NAME)) @@ -196,7 +197,7 @@ def test_accounts_400_error(self, http_mocker: HttpMocker): a_response_with_status(400), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog()) error_log_messages = [message for message in actual_messages.logs if message.log.level == Level.ERROR] @@ -218,7 +219,7 @@ def test_persons_400_error(self, http_mocker: HttpMocker): a_response_with_status(400), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog()) error_log_messages = [message for message in actual_messages.logs if message.log.level == Level.ERROR] @@ -234,7 +235,7 @@ def test_accounts_401_error(self, http_mocker: HttpMocker): a_response_with_status(401), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog(), expecting_exception=True) assert actual_messages.errors[-1].trace.error.failure_type == FailureType.system_error @@ -252,7 +253,7 @@ def test_persons_401_error(self, http_mocker: HttpMocker): a_response_with_status(401), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog(), expecting_exception=True) assert actual_messages.errors[-1].trace.error.failure_type == FailureType.system_error @@ -270,7 +271,7 @@ def test_persons_403_error(self, http_mocker: HttpMocker): a_response_with_status(403), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog(), expecting_exception=True) error_log_messages = [message for message in actual_messages.logs if message.log.level == Level.ERROR] @@ -305,12 +306,13 @@ def test_incremental_with_recent_state(self, http_mocker: HttpMocker): _create_response().with_record(record=_create_persons_event_record(event_type="person.created")).build(), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog(sync_mode=SyncMode.incremental)) + state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build() + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(sync_mode=SyncMode.incremental), state=state) actual_messages = read( source, config=_CONFIG, catalog=_create_catalog(sync_mode=SyncMode.incremental), - state=StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + state=state, ) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses(_STREAM_NAME)) @@ -343,12 +345,13 @@ def test_incremental_with_deleted_event(self, http_mocker: HttpMocker): _create_response().with_record(record=_create_persons_event_record(event_type="person.deleted")).build(), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog(sync_mode=SyncMode.incremental)) + state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build() + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(sync_mode=SyncMode.incremental), state=state) actual_messages = read( source, config=_CONFIG, catalog=_create_catalog(sync_mode=SyncMode.incremental), - state=StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + state=state, ) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses(_STREAM_NAME)) @@ -378,12 +381,13 @@ def test_incremental_with_newer_start_date(self, http_mocker): _create_response().with_record(record=_create_persons_event_record(event_type="person.created")).build(), ) - source = SourceStripe(config=config, catalog=_create_catalog(sync_mode=SyncMode.incremental)) + state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build() + source = SourceStripe(config=config, catalog=_create_catalog(sync_mode=SyncMode.incremental), state=state) actual_messages = read( source, config=config, catalog=_create_catalog(sync_mode=SyncMode.incremental), - state=StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + state=state, ) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses(_STREAM_NAME)) @@ -411,7 +415,7 @@ def test_rate_limited_parent_stream_accounts(self, http_mocker: HttpMocker) -> N _create_response().with_record(record=_create_record("events")).with_record(record=_create_record("events")).build(), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog()) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses(_STREAM_NAME)) @@ -438,7 +442,7 @@ def test_rate_limited_substream_persons(self, http_mocker: HttpMocker) -> None: _create_response().with_record(record=_create_record("events")).with_record(record=_create_record("events")).build(), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog()) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses(_STREAM_NAME)) @@ -476,12 +480,13 @@ def test_rate_limited_incremental_events(self, http_mocker: HttpMocker) -> None: ] ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog(sync_mode=SyncMode.incremental)) + state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build() + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(sync_mode=SyncMode.incremental), state=state) actual_messages = read( source, config=_CONFIG, catalog=_create_catalog(sync_mode=SyncMode.incremental), - state=StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + state=state, ) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses(_STREAM_NAME)) @@ -510,7 +515,7 @@ def test_rate_limit_max_attempts_exceeded(self, http_mocker: HttpMocker) -> None _create_response().with_record(record=_create_record("events")).with_record(record=_create_record("events")).build(), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog()) assert len(actual_messages.errors) == 1 @@ -544,12 +549,13 @@ def test_incremental_rate_limit_max_attempts_exceeded(self, http_mocker: HttpMoc a_response_with_status(429), # Returns 429 on all subsequent requests to test the maximum number of retries ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog(sync_mode=SyncMode.incremental)) + state = StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build() + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(sync_mode=SyncMode.incremental), state=state) actual_messages = read( source, config=_CONFIG, catalog=_create_catalog(sync_mode=SyncMode.incremental), - state=StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + state=state, ) assert len(actual_messages.errors) == 1 @@ -575,7 +581,7 @@ def test_server_error_parent_stream_accounts(self, http_mocker: HttpMocker) -> N _create_response().with_record(record=_create_record("events")).with_record(record=_create_record("events")).build(), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog()) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses(_STREAM_NAME)) @@ -602,7 +608,7 @@ def test_server_error_substream_persons(self, http_mocker: HttpMocker) -> None: _create_response().with_record(record=_create_record("events")).with_record(record=_create_record("events")).build(), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog()) assert emits_successful_sync_status_messages(actual_messages.get_stream_statuses(_STREAM_NAME)) @@ -630,7 +636,7 @@ def test_server_error_max_attempts_exceeded(self, http_mocker: HttpMocker) -> No _create_response().with_record(record=_create_record("events")).with_record(record=_create_record("events")).build(), ) - source = SourceStripe(config=_CONFIG, catalog=_create_catalog()) + source = SourceStripe(config=_CONFIG, catalog=_create_catalog(), state=_NO_STATE) actual_messages = read(source, config=_CONFIG, catalog=_create_catalog()) assert len(actual_messages.errors) == 1 diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_reviews.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_reviews.py index 45ee0219f8da..d454faec79e8 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_reviews.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_reviews.py @@ -1,11 +1,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -import json from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional from unittest import TestCase import freezegun +from airbyte_cdk.sources.source import TState from airbyte_cdk.test.catalog_builder import CatalogBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() -def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: - return SourceStripe(catalog, config) +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: + return SourceStripe(catalog, config, state) def _an_event() -> RecordBuilder: @@ -115,7 +115,7 @@ def _read( ) -> EntrypointOutput: catalog = _catalog(sync_mode) config = config_builder.build() - return read(_source(catalog, config), config, catalog, state, expecting_exception) + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) @freezegun.freeze_time(_NOW.isoformat()) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_transactions.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_transactions.py index 8c4db0697223..f0a04e093760 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_transactions.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_transactions.py @@ -1,11 +1,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. -import json from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional from unittest import TestCase import freezegun +from airbyte_cdk.sources.source import TState from airbyte_cdk.test.catalog_builder import CatalogBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -55,8 +55,8 @@ def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() -def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: - return SourceStripe(catalog, config) +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe: + return SourceStripe(catalog, config, state) def _an_event() -> RecordBuilder: @@ -115,7 +115,7 @@ def _read( ) -> EntrypointOutput: catalog = _catalog(sync_mode) config = config_builder.build() - return read(_source(catalog, config), config, catalog, state, expecting_exception) + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) @freezegun.freeze_time(_NOW.isoformat()) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py index 2f2a1c0acd1e..5a2f6e06a719 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py @@ -12,13 +12,14 @@ from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode from airbyte_cdk.sources.streams.call_rate import CachedLimiterSession, LimiterSession, Rate from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade -from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.test.state_builder import StateBuilder from airbyte_cdk.utils import AirbyteTracedException from source_stripe import SourceStripe logger = logging.getLogger("airbyte") _ANY_CATALOG = ConfiguredAirbyteCatalog.parse_obj({"streams": []}) _ANY_CONFIG = {} +_NO_STATE = StateBuilder().build() class CatalogBuilder: @@ -51,11 +52,11 @@ def _a_valid_config(): @patch.object(source_stripe.source, "stripe") def test_source_check_connection_ok(mocked_client, config): - assert SourceStripe(_ANY_CATALOG, _ANY_CONFIG).check_connection(logger, config=config) == (True, None) + assert SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).check_connection(logger, config=config) == (True, None) def test_streams_are_unique(config): - stream_names = [s.name for s in SourceStripe(_ANY_CATALOG, _ANY_CONFIG).streams(config=config)] + stream_names = [s.name for s in SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).streams(config=config)] assert len(stream_names) == len(set(stream_names)) == 46 @@ -72,7 +73,7 @@ def test_streams_are_unique(config): def test_config_validation(mocked_client, input_config, expected_error_msg): context = pytest.raises(AirbyteTracedException, match=expected_error_msg) if expected_error_msg else does_not_raise() with context: - SourceStripe(_ANY_CATALOG, _ANY_CONFIG).check_connection(logger, config=input_config) + SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).check_connection(logger, config=input_config) @pytest.mark.parametrize( @@ -85,7 +86,7 @@ def test_config_validation(mocked_client, input_config, expected_error_msg): @patch.object(source_stripe.source.stripe, "Account") def test_given_stripe_error_when_check_connection_then_connection_not_available(mocked_client, exception): mocked_client.retrieve.side_effect = exception - is_available, _ = SourceStripe(_ANY_CATALOG, _ANY_CONFIG).check_connection(logger, config=_a_valid_config()) + is_available, _ = SourceStripe(_ANY_CATALOG, _ANY_CONFIG, _NO_STATE).check_connection(logger, config=_a_valid_config()) assert not is_available @@ -93,9 +94,12 @@ def test_when_streams_return_full_refresh_as_concurrent(): streams = SourceStripe( CatalogBuilder().with_stream("bank_accounts", SyncMode.full_refresh).with_stream("customers", SyncMode.incremental).build(), _a_valid_config(), + _NO_STATE, ).streams(_a_valid_config()) - assert len(list(filter(lambda stream: isinstance(stream, StreamFacade), streams))) == 1 + # bank_accounts (as it is defined as full_refresh) + # balance_transactions, events, files, file_links and shipping_rates (as it is always concurrent now) + assert len(list(filter(lambda stream: isinstance(stream, StreamFacade), streams))) == 6 @pytest.mark.parametrize( @@ -114,7 +118,7 @@ def test_call_budget_creation(mocker, input_config, default_call_limit): policy_mock = mocker.patch("source_stripe.source.MovingWindowCallRatePolicy") matcher_mock = mocker.patch("source_stripe.source.HttpRequestMatcher") - source = SourceStripe(catalog=None, config=input_config) + source = SourceStripe(catalog=None, config=input_config, state=_NO_STATE) source.get_api_call_budget(input_config) @@ -137,7 +141,7 @@ def test_call_budget_passed_to_every_stream(mocker): """Test that each stream has call_budget passed and creates a proper session""" prod_config = {"account_id": 1, "client_secret": "secret"} - source = SourceStripe(catalog=None, config=prod_config) + source = SourceStripe(catalog=None, config=prod_config, state=_NO_STATE) get_api_call_budget_mock = mocker.patch.object(source, "get_api_call_budget") streams = source.streams(prod_config) @@ -146,7 +150,8 @@ def test_call_budget_passed_to_every_stream(mocker): get_api_call_budget_mock.assert_called_once() for stream in streams: - assert isinstance(stream, HttpStream) + if isinstance(stream, StreamFacade): + stream = stream._legacy_stream session = stream.request_session() assert isinstance(session, (CachedLimiterSession, LimiterSession)) assert session._api_budget == get_api_call_budget_mock.return_value diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py index 55da589609e7..3e6ee9b1be05 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_streams.py @@ -334,7 +334,7 @@ def test_created_cursor_incremental_stream( for url, response in requests_mock_map.items(): requests_mock.get(url, response) - slices = list(stream.stream_slices(sync_mode, stream_state=state)) + slices = list(stream.stream_slices(sync_mode=sync_mode, stream_state=state)) assert slices == expected_slices records = read_from_stream(stream, sync_mode, state) assert records == expected_records diff --git a/docs/integrations/sources/stripe.md b/docs/integrations/sources/stripe.md index 557100dca099..853574454104 100644 --- a/docs/integrations/sources/stripe.md +++ b/docs/integrations/sources/stripe.md @@ -223,6 +223,7 @@ Each record is marked with `is_deleted` flag when the appropriate event happens | Version | Date | Pull Request | Subject | |:--------|:-----------|:-------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 5.2.2 | 2024-01-31 | [34619](https://github.com/airbytehq/airbyte/pull/34619) | Events stream concurrent on incremental syncs | | 5.2.1 | 2024-01-18 | [34495](https://github.com/airbytehq/airbyte/pull/34495) | Fix deadlock issue | 5.2.0 | 2024-01-18 | [34347](https://github.com/airbytehq/airbyte/pull//34347) | Add new fields invoices and subscription streams. Upgrade the CDK for better memory usage. | | 5.1.3 | 2023-12-18 | [33306](https://github.com/airbytehq/airbyte/pull/33306/) | Adding integration tests |