Enable client-side rate limiting on source-stripe #31512 #32284

Merged
merged 22 commits into from
Nov 16, 2023
cbe519b
add call_rate_limit parameter
eugene-kulak Nov 8, 2023
8be4462
customize call rate limit for files API
eugene-kulak Nov 8, 2023
5b95449
Merge remote-tracking branch 'origin/master' into keu/source-stripe/c…
eugene-kulak Nov 8, 2023
c39d5ef
add file_links call rate checks
eugene-kulak Nov 8, 2023
c503ed9
unit tests
eugene-kulak Nov 14, 2023
6f0608e
Merge remote-tracking branch 'origin/master' into keu/source-stripe/c…
eugene-kulak Nov 14, 2023
4dfafc4
change call rate policy to use MovingWindow
eugene-kulak Nov 14, 2023
830c89d
add call_ratee_policy parameter (for testing)
eugene-kulak Nov 14, 2023
aa3d3ed
increase maximum_attempts_to_acquire to never hit it
eugene-kulak Nov 14, 2023
2bba7f7
Merge remote-tracking branch 'origin/master' into keu/source-stripe/c…
eugene-kulak Nov 14, 2023
404e41e
Merge remote-tracking branch 'origin/master' into keu/source-stripe/c…
eugene-kulak Nov 14, 2023
f85b6d7
update CDK version
eugene-kulak Nov 14, 2023
48504b1
update version and changelog
eugene-kulak Nov 15, 2023
b22c7be
Merge remote-tracking branch 'origin/master' into keu/source-stripe/c…
eugene-kulak Nov 15, 2023
efb11dd
fix unit tests after merge
eugene-kulak Nov 15, 2023
7a440f4
remove fixed window policy
eugene-kulak Nov 15, 2023
cafa299
Merge remote-tracking branch 'origin/master' into keu/source-stripe/c…
eugene-kulak Nov 15, 2023
e0db465
fix default value for num_workers
eugene-kulak Nov 15, 2023
109d6ab
Automated Commit - Formatting Changes
keu Nov 15, 2023
5ad5879
fix default number of workers
eugene-kulak Nov 15, 2023
5836114
Merge remote-tracking branch 'origin/keu/source-stripe/call_rate' int…
eugene-kulak Nov 15, 2023
98ee96f
stripe: update date in changelog
eugene-kulak Nov 16, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
-dockerImageTag: 4.5.3
+dockerImageTag: 4.5.4
dockerRepository: airbyte/source-stripe
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
githubIssueLabel: source-stripe
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-stripe/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

-MAIN_REQUIREMENTS = ["airbyte-cdk==0.52.8", "stripe==2.56.0", "pendulum==2.1.2"]
+MAIN_REQUIREMENTS = ["airbyte-cdk==0.53.6", "stripe==2.56.0", "pendulum==2.1.2"]

TEST_REQUIREMENTS = ["pytest-mock~=3.6.1", "pytest~=6.1", "requests-mock", "requests_mock~=1.8", "freezegun==1.2.2"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
import os
from datetime import timedelta
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple

import pendulum
Expand All @@ -13,6 +15,7 @@
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.call_rate import AbstractAPIBudget, HttpAPIBudget, HttpRequestMatcher, MovingWindowCallRatePolicy, Rate
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
Expand All @@ -33,9 +36,12 @@
UpdatedCursorIncrementalStripeStream,
)

-_MAX_CONCURRENCY = 3
+logger = logging.getLogger("airbyte")
+
+_MAX_CONCURRENCY = 20
Review comment (Contributor): the limit in the spec should be aligned with this limit.

_CACHE_DISABLED = os.environ.get("CACHE_DISABLED")
USE_CACHE = not _CACHE_DISABLED
STRIPE_TEST_ACCOUNT_PREFIX = "sk_test_"


class SourceStripe(AbstractSource):
Expand Down Expand Up @@ -114,6 +120,52 @@ def customers(**args):
**args,
)

    @staticmethod
    def is_test_account(config: Mapping[str, Any]) -> bool:
        """Check if configuration uses Stripe test account (https://stripe.com/docs/keys#obtain-api-keys)

        :param config:
        :return: True if configured to use a test account, False - otherwise
        """

        return str(config["client_secret"]).startswith(STRIPE_TEST_ACCOUNT_PREFIX)

    def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget:
        """Get API call budget which connector is allowed to use.

        :param config:
        :return:
        """

        max_call_rate = 25 if self.is_test_account(config) else 100
        if config.get("call_rate_limit"):
            call_limit = config["call_rate_limit"]
            if call_limit > max_call_rate:
                logger.warning(
                    "call_rate_limit is larger than maximum allowed %s, fallback to default %s.",
                    max_call_rate,
                    max_call_rate,
                )
                call_limit = max_call_rate
        else:
            call_limit = max_call_rate

        policies = [
            MovingWindowCallRatePolicy(
                rates=[Rate(limit=20, interval=timedelta(seconds=1))],
                matchers=[
                    HttpRequestMatcher(url="https://api.stripe.com/v1/files"),
                    HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"),
                ],
            ),
            MovingWindowCallRatePolicy(
                rates=[Rate(limit=call_limit, interval=timedelta(seconds=1))],
                matchers=[],
            ),
        ]

        return HttpAPIBudget(policies=policies)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
config = self.validate_and_fill_with_defaults(config)
authenticator = TokenAuthenticator(config["client_secret"])
Expand All @@ -122,6 +174,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"account_id": config["account_id"],
"start_date": config["start_date"],
"slice_range": config["slice_range"],
"api_budget": self.get_api_call_budget(config),
}
incremental_args = {**args, "lookback_window_days": config["lookback_window_days"]}
subscriptions = IncrementalStripeStream(
Expand Down Expand Up @@ -441,9 +494,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
),
]

-# We cap the number of workers to avoid hitting the Stripe rate limit
-# The limit can be removed or increased once we have proper rate limiting
-concurrency_level = min(config.get("num_workers", 2), _MAX_CONCURRENCY)
+concurrency_level = min(config.get("num_workers", 10), _MAX_CONCURRENCY)
streams[0].logger.info(f"Using concurrent cdk with concurrency level {concurrency_level}")

return [
Expand Down
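As a rough illustration of what `Rate(limit=20, interval=timedelta(seconds=1))` means under a moving-window policy, here is a minimal standalone sketch. The class `MovingWindowLimiter` and its `try_acquire` method are hypothetical and written only for this example; it assumes a sliding-log interpretation and is not the CDK's `MovingWindowCallRatePolicy`, whose internals may differ.

```python
from collections import deque
from datetime import timedelta


class MovingWindowLimiter:
    """Hypothetical stand-in for a moving-window rate policy (not the CDK class)."""

    def __init__(self, limit: int, interval: timedelta) -> None:
        self.limit = limit
        self.interval = interval.total_seconds()
        self._calls = deque()  # timestamps of accepted calls, oldest first

    def try_acquire(self, now: float) -> bool:
        # Drop timestamps that have slid out of the window ending at `now`.
        while self._calls and now - self._calls[0] >= self.interval:
            self._calls.popleft()
        # Reject the call if the window is already full.
        if len(self._calls) >= self.limit:
            return False
        self._calls.append(now)
        return True


# Same numbers as the files/file_links policy in the diff: 20 calls per second.
limiter = MovingWindowLimiter(limit=20, interval=timedelta(seconds=1))
accepted = sum(limiter.try_acquire(now=0.0) for _ in range(25))
```

In this sketch only the first 20 of 25 simultaneous calls are accepted, and capacity returns once a full interval has passed since the oldest accepted call. Unlike a fixed window, there is no reset boundary at which a burst of twice the limit could slip through.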
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,18 @@ connectionSpecification:
type: integer
title: Number of concurrent workers
minimum: 1
-maximum: 3
-default: 2
+maximum: 20
+default: 10
examples: [1, 2, 3]
description: >-
-The number of worker thread to use for the sync. The bigger the value is, the faster the sync will be.
-Be careful as rate limiting is not implemented.
+The number of worker thread to use for the sync.
+The performance upper boundary depends on call_rate_limit setting and type of account.
order: 5
call_rate_limit:
type: integer
title: Max number of API calls per second
examples: [25, 100]
description: >-
The number of API calls per second that you allow connector to make. This value can not be bigger than real
API call rate limit (https://stripe.com/docs/rate-limits). If not specified the default maximum is 25 and 100
calls per second for test and production tokens respectively.
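The effect of `call_rate_limit` described above can be sketched as a small pure function. `resolve_call_limit` and `TEST_KEY_PREFIX` are hypothetical names used only here; the logic is meant to mirror the clamp in `get_api_call_budget` from this PR (without the warning log), not to replace it.

```python
from typing import Any, Mapping

TEST_KEY_PREFIX = "sk_test_"  # same value as STRIPE_TEST_ACCOUNT_PREFIX in the diff


def resolve_call_limit(config: Mapping[str, Any]) -> int:
    """Hypothetical helper: effective calls per second for a given config."""
    is_test = str(config["client_secret"]).startswith(TEST_KEY_PREFIX)
    max_call_rate = 25 if is_test else 100
    requested = config.get("call_rate_limit")
    if not requested:
        # Unset (or zero) falls back to the account-type maximum.
        return max_call_rate
    # A value above the maximum is clamped down to it.
    return min(requested, max_call_rate)


prod_limit = resolve_call_limit({"client_secret": "secret", "call_rate_limit": 110})
test_limit = resolve_call_limit({"client_secret": "sk_test_some_secret"})
```

With these inputs a production key asking for 110 calls per second ends up at 100, and a test key with no setting ends up at 25, matching the parametrized cases in the unit test added by this PR.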
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
import logging
from contextlib import nullcontext as does_not_raise
from unittest.mock import patch
Expand All @@ -10,7 +10,9 @@
import source_stripe
import stripe
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.sources.streams.call_rate import CachedLimiterSession, LimiterSession, Rate
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.utils import AirbyteTracedException
from source_stripe import SourceStripe

Expand Down Expand Up @@ -92,3 +94,57 @@ def test_when_streams_return_full_refresh_as_concurrent():
).streams(_a_valid_config())

assert len(list(filter(lambda stream: isinstance(stream, StreamFacade), streams))) == 1


@pytest.mark.parametrize(
    "input_config, default_call_limit",
    (
        ({"account_id": 1, "client_secret": "secret"}, 100),
        ({"account_id": 1, "client_secret": "secret", "call_rate_limit": 10}, 10),
        ({"account_id": 1, "client_secret": "secret", "call_rate_limit": 110}, 100),
        ({"account_id": 1, "client_secret": "sk_test_some_secret"}, 25),
        ({"account_id": 1, "client_secret": "sk_test_some_secret", "call_rate_limit": 10}, 10),
        ({"account_id": 1, "client_secret": "sk_test_some_secret", "call_rate_limit": 30}, 25),
    ),
)
def test_call_budget_creation(mocker, input_config, default_call_limit):
    """Test that call_budget was created with specific config i.e., that first policy has specific matchers."""

    policy_mock = mocker.patch("source_stripe.source.MovingWindowCallRatePolicy")
    matcher_mock = mocker.patch("source_stripe.source.HttpRequestMatcher")
    source = SourceStripe(catalog=None)

    source.get_api_call_budget(input_config)

    policy_mock.assert_has_calls(
        calls=[
            mocker.call(matchers=[mocker.ANY, mocker.ANY], rates=[Rate(limit=20, interval=datetime.timedelta(seconds=1))]),
            mocker.call(matchers=[], rates=[Rate(limit=default_call_limit, interval=datetime.timedelta(seconds=1))]),
        ],
    )

    matcher_mock.assert_has_calls(
        calls=[
            mocker.call(url="https://api.stripe.com/v1/files"),
            mocker.call(url="https://api.stripe.com/v1/file_links"),
        ]
    )


def test_call_budget_passed_to_every_stream(mocker):
    """Test that each stream has call_budget passed and creates a proper session"""

    prod_config = {"account_id": 1, "client_secret": "secret"}
    source = SourceStripe(catalog=None)
    get_api_call_budget_mock = mocker.patch.object(source, "get_api_call_budget")

    streams = source.streams(prod_config)

    assert streams
    get_api_call_budget_mock.assert_called_once()

    for stream in streams:
        assert isinstance(stream, HttpStream)
        session = stream.request_session()
        assert isinstance(session, (CachedLimiterSession, LimiterSession))
        assert session._api_budget == get_api_call_budget_mock.return_value
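The first test above checks that the budget holds two policies in a fixed order: one scoped to the files endpoints and one with an empty matcher list. A possible reading of why the order matters is sketched below. It rests on two assumptions that the diff itself does not confirm: that the budget applies the first policy whose matchers accept the request, and that an empty matcher list accepts every request. `Policy` and `pick_policy` are hypothetical names, and the URL comparison (exact match with the query string dropped) is likewise an assumption.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Policy:
    """Hypothetical policy: a per-second limit plus the URLs it applies to."""

    limit_per_second: int
    urls: List[str] = field(default_factory=list)

    def matches(self, url: str) -> bool:
        # Assumption: no matchers means the policy is a catch-all.
        if not self.urls:
            return True
        # Assumption: compare the URL without its query string.
        return url.split("?")[0] in self.urls


def pick_policy(policies: List[Policy], url: str) -> Optional[Policy]:
    # Assumption: the first matching policy wins, so order is significant.
    for policy in policies:
        if policy.matches(url):
            return policy
    return None


policies = [
    Policy(20, ["https://api.stripe.com/v1/files", "https://api.stripe.com/v1/file_links"]),
    Policy(100),  # catch-all for every other endpoint
]
files_policy = pick_policy(policies, "https://api.stripe.com/v1/files?limit=100")
default_policy = pick_policy(policies, "https://api.stripe.com/v1/customers")
```

Under these assumptions, placing the catch-all policy first would shadow the stricter files limit, which is why the specific policy is listed ahead of it.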
3 changes: 2 additions & 1 deletion docs/integrations/sources/stripe.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ Each record is marked with `is_deleted` flag when the appropriate event happens

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.5.4 | 2023-11-16 | [32284](https://github.com/airbytehq/airbyte/pull/32284/) | Enable client-side rate limiting |
| 4.5.3 | 2023-11-14 | [32473](https://github.com/airbytehq/airbyte/pull/32473/) | Have all full_refresh stream syncs be concurrent |
| 4.5.2 | 2023-11-03 | [32146](https://github.com/airbytehq/airbyte/pull/32146/) | Fix multiple BankAccount issues |
| 4.5.1 | 2023-11-01 | [32056](https://github.com/airbytehq/airbyte/pull/32056/) | Use CDK version 0.52.8 |
Expand Down Expand Up @@ -299,4 +300,4 @@ Each record is marked with `is_deleted` flag when the appropriate event happens
| 0.1.9 | 2021-05-13 | [3367](https://github.com/airbytehq/airbyte/pull/3367) | Add acceptance tests for connected accounts |
| 0.1.8 | 2021-05-11 | [3566](https://github.com/airbytehq/airbyte/pull/3368) | Bump CDK connectors |

</HideInUI>
</HideInUI>