Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable client-side rate limiting on source-stripe #31512 #32284

Merged
merged 22 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
cbe519b
add call_rate_limit parameter
eugene-kulak Nov 8, 2023
8be4462
customize call rate limit for files API
eugene-kulak Nov 8, 2023
5b95449
Merge remote-tracking branch 'origin/master' into keu/source-stripe/c…
eugene-kulak Nov 8, 2023
c39d5ef
add file_links call rate checks
eugene-kulak Nov 8, 2023
c503ed9
unit tests
eugene-kulak Nov 14, 2023
6f0608e
Merge remote-tracking branch 'origin/master' into keu/source-stripe/c…
eugene-kulak Nov 14, 2023
4dfafc4
change call rate policy to use MovingWindow
eugene-kulak Nov 14, 2023
830c89d
add call_ratee_policy parameter (for testing)
eugene-kulak Nov 14, 2023
aa3d3ed
increase maximum_attempts_to_acquire to never hit it
eugene-kulak Nov 14, 2023
2bba7f7
Merge remote-tracking branch 'origin/master' into keu/source-stripe/c…
eugene-kulak Nov 14, 2023
404e41e
Merge remote-tracking branch 'origin/master' into keu/source-stripe/c…
eugene-kulak Nov 14, 2023
f85b6d7
update CDK version
eugene-kulak Nov 14, 2023
48504b1
update version and changelog
eugene-kulak Nov 15, 2023
b22c7be
Merge remote-tracking branch 'origin/master' into keu/source-stripe/c…
eugene-kulak Nov 15, 2023
efb11dd
fix unit tests after merge
eugene-kulak Nov 15, 2023
7a440f4
remove fixed window policy
eugene-kulak Nov 15, 2023
cafa299
Merge remote-tracking branch 'origin/master' into keu/source-stripe/c…
eugene-kulak Nov 15, 2023
e0db465
fix default value for num_workers
eugene-kulak Nov 15, 2023
109d6ab
Automated Commit - Formatting Changes
keu Nov 15, 2023
5ad5879
fix default number of workers
eugene-kulak Nov 15, 2023
5836114
Merge remote-tracking branch 'origin/keu/source-stripe/call_rate' int…
eugene-kulak Nov 15, 2023
98ee96f
stripe: update date in changelog
eugene-kulak Nov 16, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-stripe/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk==0.52.8", "stripe==2.56.0", "pendulum==2.1.2"]
MAIN_REQUIREMENTS = ["airbyte-cdk==0.53.2", "stripe==2.56.0", "pendulum==2.1.2"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: we'll need to bump this to the latest version with the higher retry count


TEST_REQUIREMENTS = ["pytest-mock~=3.6.1", "pytest~=6.1", "requests-mock", "requests_mock~=1.8", "freezegun==1.2.2"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
import os
from datetime import datetime, timedelta
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple

import pendulum
Expand All @@ -13,6 +15,14 @@
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.call_rate import (
AbstractAPIBudget,
FixedWindowCallRatePolicy,
HttpAPIBudget,
HttpRequestMatcher,
MovingWindowCallRatePolicy,
Rate,
)
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import NoopCursor
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
Expand All @@ -33,9 +43,12 @@
UpdatedCursorIncrementalStripeStream,
)

_MAX_CONCURRENCY = 3
logger = logging.getLogger("airbyte")

_MAX_CONCURRENCY = 20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the limit in the spec should be aligned with this limit

_CACHE_DISABLED = os.environ.get("CACHE_DISABLED")
USE_CACHE = not _CACHE_DISABLED
STRIPE_TEST_ACCOUNT_PREFIX = "sk_test_"


class SourceStripe(AbstractSource):
Expand Down Expand Up @@ -114,6 +127,73 @@ def customers(**args):
**args,
)

@staticmethod
def is_test_account(config: Mapping[str, Any]) -> bool:
"""Check if configuration uses Stripe test account (https://stripe.com/docs/keys#obtain-api-keys)

:param config:
:return: True if configured to use a test account, False - otherwise
"""

return str(config["client_secret"]).startswith(STRIPE_TEST_ACCOUNT_PREFIX)

def get_api_call_budget(self, config: Mapping[str, Any]) -> AbstractAPIBudget:
"""Get API call budget which connector is allowed to use.

:param config:
:return:
"""

max_call_rate = 25 if self.is_test_account(config) else 100
if config.get("call_rate_limit"):
call_limit = config["call_rate_limit"]
if call_limit > max_call_rate:
keu marked this conversation as resolved.
Show resolved Hide resolved
logger.warning(
"call_rate_limit is larger than maximum allowed %s, fallback to default %s.",
max_call_rate,
max_call_rate,
)
call_limit = max_call_rate
else:
call_limit = max_call_rate

if config.get("call_rate_policy") == "fixed_window":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call_rate_policy isn't in the pec so this will never be true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think something went wrong with merge, so this was an old code, updated

policies = [
FixedWindowCallRatePolicy(
next_reset_ts=datetime.now(),
period=timedelta(seconds=1),
call_limit=20,
matchers=[
HttpRequestMatcher(url="https://api.stripe.com/v1/files"),
HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"),
],
),
FixedWindowCallRatePolicy(
next_reset_ts=datetime.now(),
period=timedelta(seconds=1),
call_limit=call_limit,
matchers=[],
),
]
elif config.get("call_rate_policy") == "moving_window":
policies = [
MovingWindowCallRatePolicy(
rates=[Rate(limit=20, interval=timedelta(seconds=1))],
matchers=[
HttpRequestMatcher(url="https://api.stripe.com/v1/files"),
HttpRequestMatcher(url="https://api.stripe.com/v1/file_links"),
],
),
MovingWindowCallRatePolicy(
rates=[Rate(limit=call_limit, interval=timedelta(seconds=1))],
matchers=[],
),
]
else:
policies = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the default should use a policy


return HttpAPIBudget(policies=policies, maximum_attempts_to_acquire=10000)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
config = self.validate_and_fill_with_defaults(config)
authenticator = TokenAuthenticator(config["client_secret"])
Expand All @@ -122,6 +202,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"account_id": config["account_id"],
"start_date": config["start_date"],
"slice_range": config["slice_range"],
"api_budget": self.get_api_call_budget(config),
}
incremental_args = {**args, "lookback_window_days": config["lookback_window_days"]}
subscriptions = IncrementalStripeStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,19 @@ connectionSpecification:
examples: [1, 2, 3]
description: >-
The number of worker thread to use for the sync. The bigger the value is, the faster the sync will be.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
The number of worker thread to use for the sync. The bigger the value is, the faster the sync will be.
The number of worker thread to use for the sync.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this statement isn't true

Be careful as rate limiting is not implemented.
The performance upper boundary depends on call_rate_limit setting and type of account.
order: 5
call_rate_limit:
type: integer
title: Max number of API calls per second
examples: [25, 100]
description: >-
The number of API calls per second that you allow connector to make. This value can not be bigger than real
API call rate limit (https://stripe.com/docs/rate-limits). If not specified the default maximum is 25 and 100
calls per second for test and production tokens respectively.
call_rate_policy:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this field only for testing purposes? This looks like an implementation detail to me. a simpler option for the user would be to configure the rate limit in terms of requests per seconds

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added this just for your convenience to test with different policies :)

type: string
title: Call rate policy to use
examples: [fixed_window, moving_window, none]
description: >-
The call rate algorithm to use for tracking and limiting calls.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
import logging
from contextlib import nullcontext as does_not_raise
from unittest.mock import patch
Expand All @@ -10,7 +10,9 @@
import source_stripe
import stripe
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.sources.streams.call_rate import CachedLimiterSession, LimiterSession, Rate
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.utils import AirbyteTracedException
from source_stripe import SourceStripe

Expand Down Expand Up @@ -92,3 +94,57 @@ def test_when_streams_return_full_refresh_as_concurrent():
).streams(_a_valid_config())

assert len(list(filter(lambda stream: isinstance(stream, StreamFacade), streams))) == 1


@pytest.mark.parametrize(
"input_config, default_call_limit",
(
({"account_id": 1, "client_secret": "secret"}, 100),
({"account_id": 1, "client_secret": "secret", "call_rate_limit": 10}, 10),
({"account_id": 1, "client_secret": "secret", "call_rate_limit": 110}, 100),
({"account_id": 1, "client_secret": "sk_test_some_secret"}, 25),
({"account_id": 1, "client_secret": "sk_test_some_secret", "call_rate_limit": 10}, 10),
({"account_id": 1, "client_secret": "sk_test_some_secret", "call_rate_limit": 30}, 25),
),
)
def test_call_budget_creation(mocker, input_config, default_call_limit):
"""Test that call_budget was created with specific config i.e., that first policy has specific matchers."""

policy_mock = mocker.patch("source_stripe.source.MovingWindowCallRatePolicy")
matcher_mock = mocker.patch("source_stripe.source.HttpRequestMatcher")
source = SourceStripe()

source.get_api_call_budget(input_config)

policy_mock.assert_has_calls(
calls=[
mocker.call(matchers=[mocker.ANY, mocker.ANY], rates=[Rate(limit=20, interval=datetime.timedelta(seconds=1))]),
mocker.call(matchers=[], rates=[Rate(limit=default_call_limit, interval=datetime.timedelta(seconds=1))]),
],
)

matcher_mock.assert_has_calls(
calls=[
mocker.call(url="https://api.stripe.com/v1/files"),
mocker.call(url="https://api.stripe.com/v1/file_links"),
]
)


def test_call_budget_passed_to_every_stream(mocker):
"""Test that each stream has call_budget passed and creates a proper session"""

prod_config = {"account_id": 1, "client_secret": "secret"}
source = SourceStripe()
get_api_call_budget_mock = mocker.patch.object(source, "get_api_call_budget")

streams = source.streams(prod_config)

assert streams
get_api_call_budget_mock.assert_called_once()

for stream in streams:
assert isinstance(stream, HttpStream)
session = stream.request_session()
assert isinstance(session, (CachedLimiterSession, LimiterSession))
assert session._api_budget == get_api_call_budget_mock.return_value
Loading