diff --git a/airbyte-integrations/connectors/source-stripe/metadata.yaml b/airbyte-integrations/connectors/source-stripe/metadata.yaml index 0c34e7b7c27d..1aaaa9ee97c4 100644 --- a/airbyte-integrations/connectors/source-stripe/metadata.yaml +++ b/airbyte-integrations/connectors/source-stripe/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: e094cb9a-26de-4645-8761-65c0c425d1de - dockerImageTag: 4.5.3 + dockerImageTag: 4.5.4 dockerRepository: airbyte/source-stripe documentationUrl: https://docs.airbyte.com/integrations/sources/stripe githubIssueLabel: source-stripe diff --git a/airbyte-integrations/connectors/source-stripe/setup.py b/airbyte-integrations/connectors/source-stripe/setup.py index 8ce3d6936bdd..55bb256393b6 100644 --- a/airbyte-integrations/connectors/source-stripe/setup.py +++ b/airbyte-integrations/connectors/source-stripe/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk==0.52.8", "stripe==2.56.0", "pendulum==2.1.2"] +MAIN_REQUIREMENTS = ["airbyte-cdk==0.53.6", "stripe==2.56.0", "pendulum==2.1.2"] TEST_REQUIREMENTS = ["pytest-mock~=3.6.1", "pytest~=6.1", "requests-mock", "requests_mock~=1.8", "freezegun==1.2.2"] diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index e72e4dd5398b..5683c875497e 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -2,7 +2,9 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
# +import logging import os +from datetime import timedelta from typing import Any, List, Mapping, MutableMapping, Optional, Tuple import pendulum @@ -13,6 +15,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.message.repository import InMemoryMessageRepository from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.call_rate import AbstractAPIBudget, HttpAPIBudget, HttpRequestMatcher, MovingWindowCallRatePolicy, Rate from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator @@ -33,9 +36,12 @@ UpdatedCursorIncrementalStripeStream, ) -_MAX_CONCURRENCY = 3 +logger = logging.getLogger("airbyte") + +_MAX_CONCURRENCY = 20 _CACHE_DISABLED = os.environ.get("CACHE_DISABLED") USE_CACHE = not _CACHE_DISABLED +STRIPE_TEST_ACCOUNT_PREFIX = "sk_test_" class SourceStripe(AbstractSource): @@ -114,6 +120,52 @@ def customers(**args): **args, ) + @staticmethod + def is_test_account(config: Mapping[str, Any]) -> bool: + """Check if configuration uses Stripe test account (https://stripe.com/docs/keys#obtain-api-keys) + + :param config: + :return: True if configured to use a test account, False - otherwise + """ + + return str(config["client_secret"]).startswith(STRIPE_TEST_ACCOUNT_PREFIX) + + def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget: + """Get API call budget which connector is allowed to use. 
+ + :param config: + :return: + """ + + max_call_rate = 25 if self.is_test_account(config) else 100 + if config.get("call_rate_limit"): + call_limit = config["call_rate_limit"] + if call_limit > max_call_rate: + logger.warning( + "call_rate_limit is larger than maximum allowed %s, fallback to default %s.", + max_call_rate, + max_call_rate, + ) + call_limit = max_call_rate + else: + call_limit = max_call_rate + + policies = [ + MovingWindowCallRatePolicy( + rates=[Rate(limit=20, interval=timedelta(seconds=1))], + matchers=[ + HttpRequestMatcher(url="https://api.stripe.com/v1/files"), + HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"), + ], + ), + MovingWindowCallRatePolicy( + rates=[Rate(limit=call_limit, interval=timedelta(seconds=1))], + matchers=[], + ), + ] + + return HttpAPIBudget(policies=policies) + def streams(self, config: Mapping[str, Any]) -> List[Stream]: config = self.validate_and_fill_with_defaults(config) authenticator = TokenAuthenticator(config["client_secret"]) @@ -122,6 +174,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: "account_id": config["account_id"], "start_date": config["start_date"], "slice_range": config["slice_range"], + "api_budget": self.get_api_call_budget(config), } incremental_args = {**args, "lookback_window_days": config["lookback_window_days"]} subscriptions = IncrementalStripeStream( @@ -441,9 +494,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: ), ] - # We cap the number of workers to avoid hitting the Stripe rate limit - # The limit can be removed or increased once we have proper rate limiting - concurrency_level = min(config.get("num_workers", 2), _MAX_CONCURRENCY) + concurrency_level = min(config.get("num_workers", 10), _MAX_CONCURRENCY) streams[0].logger.info(f"Using concurrent cdk with concurrency level {concurrency_level}") return [ diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml 
b/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml index f65886c41298..5a31b610cd27 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml @@ -61,10 +61,18 @@ connectionSpecification: type: integer title: Number of concurrent workers minimum: 1 - maximum: 3 - default: 2 + maximum: 20 + default: 10 examples: [1, 2, 3] description: >- - The number of worker thread to use for the sync. The bigger the value is, the faster the sync will be. - Be careful as rate limiting is not implemented. + The number of worker threads to use for the sync. + The upper bound on performance depends on the call_rate_limit setting and the type of account. order: 5 + call_rate_limit: + type: integer + title: Max number of API calls per second + examples: [25, 100] + description: >- + The number of API calls per second that you allow the connector to make. This value cannot be bigger than the real + API call rate limit (https://stripe.com/docs/rate-limits). If not specified, the default maximum is 25 and 100 + calls per second for test and production tokens, respectively. diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py index 61b226b5da83..476dbd38a689 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py @@ -1,7 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
# - +import datetime import logging from contextlib import nullcontext as does_not_raise from unittest.mock import patch @@ -10,7 +10,9 @@ import source_stripe import stripe from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode +from airbyte_cdk.sources.streams.call_rate import CachedLimiterSession, LimiterSession, Rate from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade +from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.utils import AirbyteTracedException from source_stripe import SourceStripe @@ -92,3 +94,57 @@ def test_when_streams_return_full_refresh_as_concurrent(): ).streams(_a_valid_config()) assert len(list(filter(lambda stream: isinstance(stream, StreamFacade), streams))) == 1 + + +@pytest.mark.parametrize( + "input_config, default_call_limit", + ( + ({"account_id": 1, "client_secret": "secret"}, 100), + ({"account_id": 1, "client_secret": "secret", "call_rate_limit": 10}, 10), + ({"account_id": 1, "client_secret": "secret", "call_rate_limit": 110}, 100), + ({"account_id": 1, "client_secret": "sk_test_some_secret"}, 25), + ({"account_id": 1, "client_secret": "sk_test_some_secret", "call_rate_limit": 10}, 10), + ({"account_id": 1, "client_secret": "sk_test_some_secret", "call_rate_limit": 30}, 25), + ), +) +def test_call_budget_creation(mocker, input_config, default_call_limit): + """Test that call_budget was created with specific config i.e., that first policy has specific matchers.""" + + policy_mock = mocker.patch("source_stripe.source.MovingWindowCallRatePolicy") + matcher_mock = mocker.patch("source_stripe.source.HttpRequestMatcher") + source = SourceStripe(catalog=None) + + source.get_api_call_budget(input_config) + + policy_mock.assert_has_calls( + calls=[ + mocker.call(matchers=[mocker.ANY, mocker.ANY], rates=[Rate(limit=20, interval=datetime.timedelta(seconds=1))]), + mocker.call(matchers=[], rates=[Rate(limit=default_call_limit, interval=datetime.timedelta(seconds=1))]), + ], + ) + + 
matcher_mock.assert_has_calls( + calls=[ + mocker.call(url="https://api.stripe.com/v1/files"), + mocker.call(url="https://api.stripe.com/v1/file_links"), + ] + ) + + +def test_call_budget_passed_to_every_stream(mocker): + """Test that each stream has call_budget passed and creates a proper session""" + + prod_config = {"account_id": 1, "client_secret": "secret"} + source = SourceStripe(catalog=None) + get_api_call_budget_mock = mocker.patch.object(source, "get_api_call_budget") + + streams = source.streams(prod_config) + + assert streams + get_api_call_budget_mock.assert_called_once() + + for stream in streams: + assert isinstance(stream, HttpStream) + session = stream.request_session() + assert isinstance(session, (CachedLimiterSession, LimiterSession)) + assert session._api_budget == get_api_call_budget_mock.return_value diff --git a/docs/integrations/sources/stripe.md b/docs/integrations/sources/stripe.md index 06bea065cf1f..bde9f9d6d0e6 100644 --- a/docs/integrations/sources/stripe.md +++ b/docs/integrations/sources/stripe.md @@ -216,6 +216,7 @@ Each record is marked with `is_deleted` flag when the appropriate event happens | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 4.5.4 | 2023-11-16 | [32284](https://github.com/airbytehq/airbyte/pull/32284/) | Enable client-side rate limiting | | 4.5.3 | 2023-11-14 | [32473](https://github.com/airbytehq/airbyte/pull/32473/) | Have all full_refresh stream syncs be concurrent | | 4.5.2 | 2023-11-03 | [32146](https://github.com/airbytehq/airbyte/pull/32146/) | Fix multiple BankAccount issues | | 4.5.1 | 2023-11-01 | [32056](https://github.com/airbytehq/airbyte/pull/32056/) | Use CDK version 0.52.8 | @@ -299,4 +300,4 @@ Each record is marked with `is_deleted` flag when the appropriate event 
happens | 0.1.9 | 2021-05-13 | [3367](https://github.com/airbytehq/airbyte/pull/3367) | Add acceptance tests for connected accounts | | 0.1.8 | 2021-05-11 | [3566](https://github.com/airbytehq/airbyte/pull/3368) | Bump CDK connectors | - \ No newline at end of file +