From cbe519b26c434f1428d33bb459d78d192d8b599c Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Wed, 8 Nov 2023 02:32:33 +0200 Subject: [PATCH 01/15] add call_rate_limit parameter --- .../source-stripe/source_stripe/source.py | 44 ++++++++++++++++++- .../source-stripe/source_stripe/spec.yaml | 10 ++++- 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index d8193c1796f9..51c8696cf87f 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -1,7 +1,8 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +import logging +from datetime import datetime, timedelta from typing import Any, List, Mapping, MutableMapping, Optional, Tuple import pendulum @@ -12,6 +13,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.message.repository import InMemoryMessageRepository from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.call_rate import APIBudget, FixedWindowCallRatePolicy from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator @@ -32,7 +34,10 @@ UpdatedCursorIncrementalStripeStream, ) +logger = logging.getLogger("airbyte") + _MAX_CONCURRENCY = 3 +STRIPE_TEST_ACCOUNT_PREFIX = "sk_test_" class SourceStripe(AbstractSource): @@ -97,6 +102,42 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> return False, str(e) return True, None + @staticmethod + def is_test_account(config: Mapping[str, Any]) -> bool: + """Check if configuration uses Stripe test account (https://stripe.com/docs/keys#obtain-api-keys) + + :param config: + :return: True if configured to use a test account, False - otherwise + """ + + return str(config["client_secret"]).startswith(STRIPE_TEST_ACCOUNT_PREFIX) + + def get_api_call_budget(self, config: Mapping[str, Any]) -> APIBudget: + """Get API call budget which connector is allowed to use. + + :param config: + :return: + """ + + max_call_rate = 25 if self.is_test_account(config) else 100 + if config.get("call_rate_limit"): + call_limit = config["call_rate_limit"] + if call_limit > max_call_rate: + logger.warning( + "call_rate_limit is larger than maximum allowed %s, fallback to default %s.", + max_call_rate, + max_call_rate, + ) + else: + call_limit = max_call_rate + + call_budget = APIBudget( + policies=[ + FixedWindowCallRatePolicy(next_reset_ts=datetime.now(), period=timedelta(seconds=1), call_limit=call_limit, matchers=[]), + ] + ) + return call_budget + def streams(self, config: Mapping[str, Any]) -> List[Stream]: config = self.validate_and_fill_with_defaults(config) authenticator = TokenAuthenticator(config["client_secret"]) @@ -105,6 +146,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: "account_id": config["account_id"], "start_date": config["start_date"], "slice_range": config["slice_range"], + "call_budget": self.get_api_call_budget(config), } incremental_args = {**args, "lookback_window_days": config["lookback_window_days"]} subscriptions = IncrementalStripeStream( diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml b/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml index f65886c41298..f1d95b1e8e57 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml @@ -66,5 +66,13 @@ connectionSpecification: examples: [1, 2, 3] description: >- The number of worker thread to use for the sync. The bigger the value is, the faster the sync will be. - Be careful as rate limiting is not implemented. + The performance upper boundary depends on call_rate_limit setting and type of account. order: 5 + call_rate_limit: + type: integer + title: Max number of API calls per second + examples: [25, 100] + description: >- + The number of API calls per second that you allow connector to make. This value can not be bigger than real + API call rate limit (https://stripe.com/docs/rate-limits). If not specified the default maximum is 25 and 100 + calls per second for test and production tokens respectively. From 8be446249231fdbe4f474071dc0ae934e9c56cd2 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Wed, 8 Nov 2023 02:48:35 +0200 Subject: [PATCH 02/15] customize call rate limit for files API --- .../source-stripe/source_stripe/source.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index 51c8696cf87f..1a56c0c38cea 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -13,7 +13,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.message.repository import InMemoryMessageRepository from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.call_rate import APIBudget, FixedWindowCallRatePolicy +from airbyte_cdk.sources.streams.call_rate import APIBudget, FixedWindowCallRatePolicy, HttpRequestMatcher from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator @@ -133,7 +133,18 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> APIBudget: call_budget = APIBudget( policies=[ - FixedWindowCallRatePolicy(next_reset_ts=datetime.now(), period=timedelta(seconds=1), call_limit=call_limit, matchers=[]), + FixedWindowCallRatePolicy( + next_reset_ts=datetime.now(), + period=timedelta(seconds=1), + call_limit=20, + matchers=[HttpRequestMatcher(url="https://api.stripe.com/v1/files")], + ), + FixedWindowCallRatePolicy( + next_reset_ts=datetime.now(), + period=timedelta(seconds=1), + call_limit=call_limit, + matchers=[], + ), ] ) return call_budget From c39d5ef386c0d5698117081893229b09c63bad86 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Wed, 8 Nov 2023 17:28:24 +0200 Subject: [PATCH 03/15] add file_links call rate checks --- airbyte-integrations/connectors/source-stripe/setup.py | 4 ++-- .../connectors/source-stripe/source_stripe/source.py | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/setup.py b/airbyte-integrations/connectors/source-stripe/setup.py index 8ce3d6936bdd..b9833f37f709 100644 --- a/airbyte-integrations/connectors/source-stripe/setup.py +++ b/airbyte-integrations/connectors/source-stripe/setup.py @@ -5,9 +5,9 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk==0.52.8", "stripe==2.56.0", "pendulum==2.1.2"] +MAIN_REQUIREMENTS = ["airbyte-cdk==0.53.2", "stripe==2.56.0", "pendulum==2.1.2"] -TEST_REQUIREMENTS = ["pytest-mock~=3.6.1", "pytest~=6.1", "requests-mock", "requests_mock~=1.8", "freezegun==1.2.2"] +TEST_REQUIREMENTS = ["pytest-mock~=3.6.1", "pytest~=6.1.0", "requests-mock", "requests_mock~=1.8", "freezegun==1.2.2"] setup( name="source_stripe", diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index 1a56c0c38cea..f1dd11507648 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -137,7 +137,10 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> APIBudget: next_reset_ts=datetime.now(), period=timedelta(seconds=1), call_limit=20, - matchers=[HttpRequestMatcher(url="https://api.stripe.com/v1/files")], + matchers=[ + HttpRequestMatcher(url="https://api.stripe.com/v1/files"), + HttpRequestMatcher(url="https://api.stripe.com/v1/file_links") + ], ), FixedWindowCallRatePolicy( next_reset_ts=datetime.now(), From c503ed943b04cf11e4500c935e3a959e50f8f444 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Tue, 14 Nov 2023 02:48:18 +0200 Subject: [PATCH 04/15] unit tests --- .../connectors/source-stripe/setup.py | 2 +- .../source-stripe/source_stripe/source.py | 11 +-- .../source-stripe/unit_tests/test_source.py | 70 ++++++++++++++++++- 3 files changed, 76 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/setup.py b/airbyte-integrations/connectors/source-stripe/setup.py index b9833f37f709..ea2b6ece5e5c 100644 --- a/airbyte-integrations/connectors/source-stripe/setup.py +++ b/airbyte-integrations/connectors/source-stripe/setup.py @@ -7,7 +7,7 @@ MAIN_REQUIREMENTS = ["airbyte-cdk==0.53.2", "stripe==2.56.0", "pendulum==2.1.2"] -TEST_REQUIREMENTS = ["pytest-mock~=3.6.1", "pytest~=6.1.0", "requests-mock", "requests_mock~=1.8", "freezegun==1.2.2"] +TEST_REQUIREMENTS = ["pytest-mock~=3.6.1", "pytest~=6.1", "requests-mock", "requests_mock~=1.8", "freezegun==1.2.2"] setup( name="source_stripe", diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index f1dd11507648..12ff710c5967 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -13,7 +13,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.message.repository import InMemoryMessageRepository from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.call_rate import APIBudget, FixedWindowCallRatePolicy, HttpRequestMatcher +from airbyte_cdk.sources.streams.call_rate import AbstractAPIBudget, FixedWindowCallRatePolicy, HttpAPIBudget, HttpRequestMatcher from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator @@ -112,7 +112,7 @@ def is_test_account(config: Mapping[str, Any]) -> bool: return str(config["client_secret"]).startswith(STRIPE_TEST_ACCOUNT_PREFIX) - def get_api_call_budget(self, config: Mapping[str, Any]) -> APIBudget: + def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget: """Get API call budget which connector is allowed to use. :param config: @@ -128,10 +128,11 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> APIBudget: max_call_rate, max_call_rate, ) + call_limit = max_call_rate else: call_limit = max_call_rate - call_budget = APIBudget( + call_budget = HttpAPIBudget( policies=[ FixedWindowCallRatePolicy( next_reset_ts=datetime.now(), @@ -139,7 +140,7 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> APIBudget: call_limit=20, matchers=[ HttpRequestMatcher(url="https://api.stripe.com/v1/files"), - HttpRequestMatcher(url="https://api.stripe.com/v1/file_links") + HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"), ], ), FixedWindowCallRatePolicy( @@ -160,7 +161,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: "account_id": config["account_id"], "start_date": config["start_date"], "slice_range": config["slice_range"], - "call_budget": self.get_api_call_budget(config), + "api_budget": self.get_api_call_budget(config), } incremental_args = {**args, "lookback_window_days": config["lookback_window_days"]} subscriptions = IncrementalStripeStream( diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py index 13d7e15f9fc4..c39b56f8ddcc 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py @@ -1,14 +1,17 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +import datetime import logging from contextlib import nullcontext as does_not_raise from unittest.mock import patch +import freezegun import pytest import source_stripe import stripe +from airbyte_cdk.sources.streams.call_rate import CachedLimiterSession, LimiterSession +from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.utils import AirbyteTracedException from source_stripe import SourceStripe @@ -57,3 +60,68 @@ def test_given_stripe_error_when_check_connection_then_connection_not_available( mocked_client.retrieve.side_effect = exception is_available, _ = SourceStripe().check_connection(logger, config=_a_valid_config()) assert not is_available + + +@pytest.mark.parametrize( + "input_config, default_call_limit", + ( + ({"account_id": 1, "client_secret": "secret"}, 100), + ({"account_id": 1, "client_secret": "secret", "call_rate_limit": 10}, 10), + ({"account_id": 1, "client_secret": "secret", "call_rate_limit": 110}, 100), + ({"account_id": 1, "client_secret": "sk_test_some_secret"}, 25), + ({"account_id": 1, "client_secret": "sk_test_some_secret", "call_rate_limit": 10}, 10), + ({"account_id": 1, "client_secret": "sk_test_some_secret", "call_rate_limit": 30}, 25), + ), +) +@freezegun.freeze_time("2021-01-01") +def test_call_budget_creation(mocker, input_config, default_call_limit): + """Test that call_budget was created with specific config i.e., that first policy has specific matchers.""" + + fixed_window_mock = mocker.patch("source_stripe.source.FixedWindowCallRatePolicy") + matcher_mock = mocker.patch("source_stripe.source.HttpRequestMatcher") + source = SourceStripe() + + source.get_api_call_budget(input_config) + + fixed_window_mock.assert_has_calls( + calls=[ + mocker.call( + matchers=[mocker.ANY, mocker.ANY], + call_limit=20, + next_reset_ts=datetime.datetime.now(), + period=datetime.timedelta(seconds=1), + ), + mocker.call( + matchers=[], + call_limit=default_call_limit, + next_reset_ts=datetime.datetime.now(), + period=datetime.timedelta(seconds=1), + ), + ], + ) + + matcher_mock.assert_has_calls( + calls=[ + mocker.call(url="https://api.stripe.com/v1/files"), + mocker.call(url="https://api.stripe.com/v1/file_links"), + ] + ) + + +def test_call_budget_passed_to_every_stream(mocker): + """Test that each stream has call_budget passed and creates a proper session""" + + prod_config = {"account_id": 1, "client_secret": "secret"} + source = SourceStripe() + get_api_call_budget_mock = mocker.patch.object(source, "get_api_call_budget") + + streams = source.streams(prod_config) + + assert streams + get_api_call_budget_mock.assert_called_once() + + for stream in streams: + assert isinstance(stream, HttpStream) + session = stream.request_session() + assert isinstance(session, (CachedLimiterSession, LimiterSession)) + assert session._api_budget == get_api_call_budget_mock.return_value From 4dfafc4d348dca7e68a1ddd8dbbddc7f13d83530 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Tue, 14 Nov 2023 03:54:13 +0200 Subject: [PATCH 05/15] change call rate policy to use MovingWindow it showed better performance and almost never hit the API limit in local testing --- .../source-stripe/source_stripe/source.py | 39 ++++++++----------- .../source-stripe/unit_tests/test_source.py | 22 +++-------- 2 files changed, 22 insertions(+), 39 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index afd99a932561..c5f24d752f61 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -4,7 +4,7 @@ import logging import os -from datetime import datetime, timedelta +from datetime import timedelta from typing import Any, List, Mapping, MutableMapping, Optional, Tuple import pendulum @@ -15,7 +15,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.message.repository import InMemoryMessageRepository from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.call_rate import AbstractAPIBudget, FixedWindowCallRatePolicy, HttpAPIBudget, HttpRequestMatcher +from airbyte_cdk.sources.streams.call_rate import AbstractAPIBudget, HttpAPIBudget, HttpRequestMatcher, MovingWindowCallRatePolicy, Rate from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator @@ -147,26 +147,21 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget: else: call_limit = max_call_rate - call_budget = HttpAPIBudget( - policies=[ - FixedWindowCallRatePolicy( - next_reset_ts=datetime.now(), - period=timedelta(seconds=1), - call_limit=20, - matchers=[ - HttpRequestMatcher(url="https://api.stripe.com/v1/files"), - HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"), - ], - ), - FixedWindowCallRatePolicy( - next_reset_ts=datetime.now(), - period=timedelta(seconds=1), - call_limit=call_limit, - matchers=[], - ), - ] - ) - return call_budget + policies = [ + MovingWindowCallRatePolicy( + rates=[Rate(limit=20, interval=timedelta(seconds=1))], + matchers=[ + HttpRequestMatcher(url="https://api.stripe.com/v1/files"), + HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"), + ], + ), + MovingWindowCallRatePolicy( + rates=[Rate(limit=call_limit, interval=timedelta(seconds=1))], + matchers=[], + ), + ] + + return HttpAPIBudget(policies=policies) def streams(self, config: Mapping[str, Any]) -> List[Stream]: config = self.validate_and_fill_with_defaults(config) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py index c39b56f8ddcc..ff70b2c1c572 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py @@ -6,11 +6,10 @@ from contextlib import nullcontext as does_not_raise from unittest.mock import patch -import freezegun import pytest import source_stripe import stripe -from airbyte_cdk.sources.streams.call_rate import CachedLimiterSession, LimiterSession +from airbyte_cdk.sources.streams.call_rate import CachedLimiterSession, LimiterSession, Rate from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.utils import AirbyteTracedException from source_stripe import SourceStripe @@ -73,30 +72,19 @@ def test_given_stripe_error_when_check_connection_then_connection_not_available( ({"account_id": 1, "client_secret": "sk_test_some_secret", "call_rate_limit": 30}, 25), ), ) -@freezegun.freeze_time("2021-01-01") def test_call_budget_creation(mocker, input_config, default_call_limit): """Test that call_budget was created with specific config i.e., that first policy has specific matchers.""" - fixed_window_mock = mocker.patch("source_stripe.source.FixedWindowCallRatePolicy") + policy_mock = mocker.patch("source_stripe.source.MovingWindowCallRatePolicy") matcher_mock = mocker.patch("source_stripe.source.HttpRequestMatcher") source = SourceStripe() source.get_api_call_budget(input_config) - fixed_window_mock.assert_has_calls( + policy_mock.assert_has_calls( calls=[ - mocker.call( - matchers=[mocker.ANY, mocker.ANY], - call_limit=20, - next_reset_ts=datetime.datetime.now(), - period=datetime.timedelta(seconds=1), - ), - mocker.call( - matchers=[], - call_limit=default_call_limit, - next_reset_ts=datetime.datetime.now(), - period=datetime.timedelta(seconds=1), - ), + mocker.call(matchers=[mocker.ANY, mocker.ANY], rates=[Rate(limit=20, interval=datetime.timedelta(seconds=1))]), + mocker.call(matchers=[], rates=[Rate(limit=default_call_limit, interval=datetime.timedelta(seconds=1))]), ], ) From 830c89d2c4e76109a3baea7f7fe8dd350acfc8e6 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Tue, 14 Nov 2023 19:10:50 +0200 Subject: [PATCH 06/15] add call_ratee_policy parameter (for testing) --- .../source-stripe/source_stripe/source.py | 56 +++++++++++++------ .../source-stripe/source_stripe/spec.yaml | 6 ++ 2 files changed, 45 insertions(+), 17 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index c5f24d752f61..d052705eae8c 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -4,7 +4,7 @@ import logging import os -from datetime import timedelta +from datetime import timedelta, datetime from typing import Any, List, Mapping, MutableMapping, Optional, Tuple import pendulum @@ -15,7 +15,8 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.message.repository import InMemoryMessageRepository from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.call_rate import AbstractAPIBudget, HttpAPIBudget, HttpRequestMatcher, MovingWindowCallRatePolicy, Rate +from airbyte_cdk.sources.streams.call_rate import AbstractAPIBudget, HttpAPIBudget, HttpRequestMatcher, MovingWindowCallRatePolicy, Rate, \ + FixedWindowCallRatePolicy from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator @@ -37,7 +38,7 @@ logger = logging.getLogger("airbyte") -_MAX_CONCURRENCY = 3 +_MAX_CONCURRENCY = 20 _CACHE_DISABLED = os.environ.get("CACHE_DISABLED") USE_CACHE = not _CACHE_DISABLED STRIPE_TEST_ACCOUNT_PREFIX = "sk_test_" @@ -147,19 +148,40 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget: else: call_limit = max_call_rate - policies = [ - MovingWindowCallRatePolicy( - rates=[Rate(limit=20, interval=timedelta(seconds=1))], - matchers=[ - HttpRequestMatcher(url="https://api.stripe.com/v1/files"), - HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"), - ], - ), - MovingWindowCallRatePolicy( - rates=[Rate(limit=call_limit, interval=timedelta(seconds=1))], - matchers=[], - ), - ] + if config.get("call_rate_policy") == "fixed_window": + policies = [ + FixedWindowCallRatePolicy( + next_reset_ts=datetime.now(), + period=timedelta(seconds=1), + call_limit=20, + matchers=[ + HttpRequestMatcher(url="https://api.stripe.com/v1/files"), + HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"), + ], + ), + FixedWindowCallRatePolicy( + next_reset_ts=datetime.now(), + period=timedelta(seconds=1), + call_limit=call_limit, + matchers=[], + ), + ] + elif config.get("call_rate_policy") == "moving_window": + policies = [ + MovingWindowCallRatePolicy( + rates=[Rate(limit=20, interval=timedelta(seconds=1))], + matchers=[ + HttpRequestMatcher(url="https://api.stripe.com/v1/files"), + HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"), + ], + ), + MovingWindowCallRatePolicy( + rates=[Rate(limit=call_limit, interval=timedelta(seconds=1))], + matchers=[], + ), + ] + else: + policies = [] return HttpAPIBudget(policies=policies) @@ -493,7 +515,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: if self._use_concurrent_cdk: # We cap the number of workers to avoid hitting the Stripe rate limit # The limit can be removed or increased once we have proper rate limiting - concurrency_level = min(config.get("num_workers", 2), _MAX_CONCURRENCY) + concurrency_level = min(config.get("num_workers", 3), _MAX_CONCURRENCY) streams[0].logger.info(f"Using concurrent cdk with concurrency level {concurrency_level}") # The state is known to be empty because concurrent CDK is currently only used for full refresh diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml b/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml index f1d95b1e8e57..3630f50bf8f2 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml @@ -76,3 +76,9 @@ connectionSpecification: The number of API calls per second that you allow connector to make. This value can not be bigger than real API call rate limit (https://stripe.com/docs/rate-limits). If not specified the default maximum is 25 and 100 calls per second for test and production tokens respectively. + call_rate_policy: + type: string + title: Call rate policy to use + examples: [fixed_window, moving_window, none] + description: >- + The call rate algorithm to use for tracking and limiting calls. From aa3d3ed3c28e5c91762e56cb9d6fea083e37c0c8 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Tue, 14 Nov 2023 19:22:17 +0200 Subject: [PATCH 07/15] increase maximum_attempts_to_acquire to never hit it --- .../connectors/source-stripe/source_stripe/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index d052705eae8c..74729f3d3833 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -183,7 +183,7 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget: else: policies = [] - return HttpAPIBudget(policies=policies) + return HttpAPIBudget(policies=policies, maximum_attempts_to_acquire=10000) def streams(self, config: Mapping[str, Any]) -> List[Stream]: config = self.validate_and_fill_with_defaults(config) From f85b6d780bb5b9ad5bb3e7ed9b5be161b39c497a Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Tue, 14 Nov 2023 22:35:02 +0200 Subject: [PATCH 08/15] update CDK version --- airbyte-integrations/connectors/source-stripe/setup.py | 2 +- .../connectors/source-stripe/source_stripe/spec.yaml | 8 +------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/setup.py b/airbyte-integrations/connectors/source-stripe/setup.py index ea2b6ece5e5c..55bb256393b6 100644 --- a/airbyte-integrations/connectors/source-stripe/setup.py +++ b/airbyte-integrations/connectors/source-stripe/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk==0.53.2", "stripe==2.56.0", "pendulum==2.1.2"] +MAIN_REQUIREMENTS = ["airbyte-cdk==0.53.6", "stripe==2.56.0", "pendulum==2.1.2"] TEST_REQUIREMENTS = ["pytest-mock~=3.6.1", "pytest~=6.1", "requests-mock", "requests_mock~=1.8", "freezegun==1.2.2"] diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml b/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml index 3630f50bf8f2..41781fdb39a7 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml @@ -62,7 +62,7 @@ connectionSpecification: title: Number of concurrent workers minimum: 1 maximum: 3 - default: 2 + default: 20 examples: [1, 2, 3] description: >- The number of worker thread to use for the sync. The bigger the value is, the faster the sync will be. @@ -76,9 +76,3 @@ connectionSpecification: The number of API calls per second that you allow connector to make. This value can not be bigger than real API call rate limit (https://stripe.com/docs/rate-limits). If not specified the default maximum is 25 and 100 calls per second for test and production tokens respectively. - call_rate_policy: - type: string - title: Call rate policy to use - examples: [fixed_window, moving_window, none] - description: >- - The call rate algorithm to use for tracking and limiting calls. From 48504b1b8e441d58ac195a1e1f3e89d60a4ef092 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Wed, 15 Nov 2023 16:23:45 +0200 Subject: [PATCH 09/15] update version and changelog --- airbyte-integrations/connectors/source-stripe/metadata.yaml | 2 +- docs/integrations/sources/stripe.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-stripe/metadata.yaml b/airbyte-integrations/connectors/source-stripe/metadata.yaml index 0c34e7b7c27d..1aaaa9ee97c4 100644 --- a/airbyte-integrations/connectors/source-stripe/metadata.yaml +++ b/airbyte-integrations/connectors/source-stripe/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: e094cb9a-26de-4645-8761-65c0c425d1de - dockerImageTag: 4.5.3 + dockerImageTag: 4.5.4 dockerRepository: airbyte/source-stripe documentationUrl: https://docs.airbyte.com/integrations/sources/stripe githubIssueLabel: source-stripe diff --git a/docs/integrations/sources/stripe.md b/docs/integrations/sources/stripe.md index 30e1967bf371..15723be7a9f6 100644 --- a/docs/integrations/sources/stripe.md +++ b/docs/integrations/sources/stripe.md @@ -192,6 +192,7 @@ The Stripe connector should not run into Stripe API limitations under normal usa | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 4.5.4 | 2023-11-15 | [32284](https://github.com/airbytehq/airbyte/pull/32284/) | Enable client-side rate limiting | | 4.5.3 | 2023-11-14 | [32473](https://github.com/airbytehq/airbyte/pull/32473/) | Have all full_refresh stream syncs be concurrent | | 4.5.2 | 2023-11-03 | [32146](https://github.com/airbytehq/airbyte/pull/32146/) | Fix multiple BankAccount issues | | 4.5.1 | 2023-11-01 | [32056](https://github.com/airbytehq/airbyte/pull/32056/) | Use CDK version 0.52.8 | From efb11dd4e980d2222815bb52431a5b4ca8a615e4 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Thu, 16 Nov 2023 00:42:56 +0200 Subject: [PATCH 10/15] fix unit tests after merge --- .../connectors/source-stripe/unit_tests/test_source.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py b/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py index be2cf24425a9..476dbd38a689 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/test_source.py @@ -112,7 +112,7 @@ def test_call_budget_creation(mocker, input_config, default_call_limit): policy_mock = mocker.patch("source_stripe.source.MovingWindowCallRatePolicy") matcher_mock = mocker.patch("source_stripe.source.HttpRequestMatcher") - source = SourceStripe() + source = SourceStripe(catalog=None) source.get_api_call_budget(input_config) @@ -135,7 +135,7 @@ def test_call_budget_passed_to_every_stream(mocker): """Test that each stream has call_budget passed and creates a proper session""" prod_config = {"account_id": 1, "client_secret": "secret"} - source = SourceStripe() + source = SourceStripe(catalog=None) get_api_call_budget_mock = mocker.patch.object(source, "get_api_call_budget") streams = source.streams(prod_config) From 7a440f4348aac2ad858821615dc0498e4e7e9c08 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Thu, 16 Nov 2023 00:45:57 +0200 Subject: [PATCH 11/15] remove fixed window policy --- .../source-stripe/source_stripe/source.py | 52 ++++++------------- 1 file changed, 15 insertions(+), 37 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index 041f84c3be0a..dff4bb20751c 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -4,7 +4,7 @@ import logging import os -from datetime import datetime, timedelta +from datetime import timedelta from typing import Any, List, Mapping, MutableMapping, Optional, Tuple import pendulum @@ -17,7 +17,6 @@ from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.call_rate import ( AbstractAPIBudget, - FixedWindowCallRatePolicy, HttpAPIBudget, HttpRequestMatcher, MovingWindowCallRatePolicy, @@ -157,42 +156,21 @@ def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget: else: call_limit = max_call_rate - if config.get("call_rate_policy") == "fixed_window": - policies = [ - FixedWindowCallRatePolicy( - next_reset_ts=datetime.now(), - period=timedelta(seconds=1), - call_limit=20, - matchers=[ - HttpRequestMatcher(url="https://api.stripe.com/v1/files"), - HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"), - ], - ), - FixedWindowCallRatePolicy( - next_reset_ts=datetime.now(), - period=timedelta(seconds=1), - call_limit=call_limit, - matchers=[], - ), - ] - elif config.get("call_rate_policy") == "moving_window": - policies = [ - MovingWindowCallRatePolicy( - rates=[Rate(limit=20, interval=timedelta(seconds=1))], - matchers=[ - HttpRequestMatcher(url="https://api.stripe.com/v1/files"), - HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"), - ], - ), - MovingWindowCallRatePolicy( - rates=[Rate(limit=call_limit, interval=timedelta(seconds=1))], - matchers=[], - ), - ] - else: - policies = [] + policies = [ + MovingWindowCallRatePolicy( + rates=[Rate(limit=20, interval=timedelta(seconds=1))], + matchers=[ + HttpRequestMatcher(url="https://api.stripe.com/v1/files"), + HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"), + ], + ), + MovingWindowCallRatePolicy( + rates=[Rate(limit=call_limit, interval=timedelta(seconds=1))], + matchers=[], + ), + ] - return HttpAPIBudget(policies=policies, maximum_attempts_to_acquire=10000) + return HttpAPIBudget(policies=policies) def streams(self, config: Mapping[str, Any]) -> List[Stream]: config = self.validate_and_fill_with_defaults(config) From e0db4653bc7c3d64897f3c92cb50c7254dc7ccb1 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Thu, 16 Nov 2023 00:49:03 +0200 Subject: [PATCH 12/15] fix default value for num_workers --- .../connectors/source-stripe/source_stripe/source.py | 4 +--- .../connectors/source-stripe/source_stripe/spec.yaml | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index dff4bb20751c..934e15cc339b 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -500,9 +500,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: ), ] - # We cap the number of workers to avoid hitting the Stripe rate limit - # The limit can be removed or increased once we have proper rate limiting - concurrency_level = min(config.get("num_workers", 2), _MAX_CONCURRENCY) + concurrency_level = min(config.get("num_workers", 3), _MAX_CONCURRENCY) streams[0].logger.info(f"Using concurrent cdk with concurrency level {concurrency_level}") return [ diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml b/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml index 41781fdb39a7..1a5fb5af1bb7 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml @@ -61,8 +61,8 @@ connectionSpecification: type: integer title: Number of concurrent workers minimum: 1 - maximum: 3 - default: 20 + maximum: 20 + default: 3 examples: [1, 2, 3] description: >- The number of worker thread to use for the sync. The bigger the value is, the faster the sync will be. From 109d6ab80069231f2d904174b47c83781844dd96 Mon Sep 17 00:00:00 2001 From: keu Date: Wed, 15 Nov 2023 22:52:23 +0000 Subject: [PATCH 13/15] Automated Commit - Formatting Changes --- .../connectors/source-stripe/source_stripe/source.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index 934e15cc339b..31d5d0654be1 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -15,13 +15,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.message.repository import InMemoryMessageRepository from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.call_rate import ( - AbstractAPIBudget, - HttpAPIBudget, - HttpRequestMatcher, - MovingWindowCallRatePolicy, - Rate, -) +from airbyte_cdk.sources.streams.call_rate import AbstractAPIBudget, HttpAPIBudget, HttpRequestMatcher, MovingWindowCallRatePolicy, Rate from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator From 5ad58799050429f250b42c8bdf36c174a8ab2dde Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Thu, 16 Nov 2023 00:54:29 +0200 Subject: [PATCH 14/15] fix default number of workers --- .../connectors/source-stripe/source_stripe/source.py | 2 +- .../connectors/source-stripe/source_stripe/spec.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py index 934e15cc339b..5c847258cfd0 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/source.py +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/source.py @@ -500,7 +500,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: ), ] - concurrency_level = min(config.get("num_workers", 3), _MAX_CONCURRENCY) + concurrency_level = min(config.get("num_workers", 10), _MAX_CONCURRENCY) streams[0].logger.info(f"Using concurrent cdk with concurrency level {concurrency_level}") return [ diff --git a/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml b/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml index 1a5fb5af1bb7..5a31b610cd27 100644 --- a/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml +++ b/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml @@ -62,10 +62,10 @@ connectionSpecification: title: Number of concurrent workers minimum: 1 maximum: 20 - default: 3 + default: 10 examples: [1, 2, 3] description: >- - The number of worker thread to use for the sync. The bigger the value is, the faster the sync will be. + The number of worker thread to use for the sync. The performance upper boundary depends on call_rate_limit setting and type of account. order: 5 call_rate_limit: From 98ee96f94e2684a5e7b97dcc21c3ac935e8211e7 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Thu, 16 Nov 2023 03:48:48 +0200 Subject: [PATCH 15/15] stripe: update date in changelog --- docs/integrations/sources/stripe.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/integrations/sources/stripe.md b/docs/integrations/sources/stripe.md index 8a96717a866f..bde9f9d6d0e6 100644 --- a/docs/integrations/sources/stripe.md +++ b/docs/integrations/sources/stripe.md @@ -216,7 +216,7 @@ Each record is marked with `is_deleted` flag when the appropriate event happens | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 4.5.4 | 2023-11-15 | [32284](https://github.com/airbytehq/airbyte/pull/32284/) | Enable client-side rate limiting | +| 4.5.4 | 2023-11-16 | [32284](https://github.com/airbytehq/airbyte/pull/32284/) | Enable client-side rate limiting | | 4.5.3 | 2023-11-14 | [32473](https://github.com/airbytehq/airbyte/pull/32473/) | Have all full_refresh stream syncs be concurrent | | 4.5.2 | 2023-11-03 | [32146](https://github.com/airbytehq/airbyte/pull/32146/) | Fix multiple BankAccount issues | | 4.5.1 | 2023-11-01 | [32056](https://github.com/airbytehq/airbyte/pull/32056/) | Use CDK version 0.52.8 | @@ -300,4 +300,4 @@ Each record is marked with `is_deleted` flag when the appropriate event happens | 0.1.9 | 2021-05-13 | [3367](https://github.com/airbytehq/airbyte/pull/3367) | Add acceptance tests for connected accounts | | 0.1.8 | 2021-05-11 | [3566](https://github.com/airbytehq/airbyte/pull/3368) | Bump CDK connectors | - \ No newline at end of file +