Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source-stripe: Revert 3.17.2 and apply atm_fee changes #29309

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-stripe/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=3.17.2
LABEL io.airbyte.version=3.17.3
LABEL io.airbyte.name=airbyte/source-stripe
3 changes: 1 addition & 2 deletions airbyte-integrations/connectors/source-stripe/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 3.17.2
dockerImageTag: 3.17.3
dockerRepository: airbyte/source-stripe
githubIssueLabel: source-stripe
icon: stripe.svg
Expand All @@ -14,7 +14,6 @@ data:
registries:
cloud:
enabled: true
dockerImageTag: 3.17.1 # p0-stripe-object-nodes-23-08-02
oss:
enabled: true
releaseStage: generally_available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,6 @@
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from requests import HTTPError

STRIPE_ERROR_CODES = {
"more_permissions_required": "This is most likely due to insufficient permissions on the credentials in use. "
"Try to grant required permissions/scopes or re-authenticate",
"account_invalid": "The card, or account the card is connected to, is invalid. You need to contact your card issuer "
"to check that the card is working correctly.",
"oauth_not_supported": "Please use a different authentication method.",
}


class StripeAvailabilityStrategy(HttpAvailabilityStrategy):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need leave availability strategy and test with it.

def handle_http_error(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
) -> Tuple[bool, Optional[str]]:
status_code = error.response.status_code
if status_code not in [400, 403]:
raise error
parsed_error = error.response.json()
error_code = parsed_error.get("error", {}).get("code")
error_message = STRIPE_ERROR_CODES.get(error_code, parsed_error.get("error", {}).get("message"))
if not error_message:
raise error
doc_ref = self._visit_docs_message(logger, source)
reason = f"The endpoint {error.response.url} returned {status_code}: {error.response.reason}. {error_message}. {doc_ref} "
response_error_message = stream.parse_response_error_message(error.response)
if response_error_message:
reason += response_error_message
return False, reason


class StripeSubStreamAvailabilityStrategy(HttpAvailabilityStrategy):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,27 @@
"type": ["null", "string"]
},
"shipping_address": {
"$ref": "address.json"
"type": ["null", "object"],
"properties": {
"city": {
"type": ["null", "string"]
},
"country": {
"type": ["null", "string"]
},
"line1": {
"type": ["null", "string"]
},
"line2": {
"type": ["null", "string"]
},
"postal_code": {
"type": ["null", "string"]
},
"state": {
"type": ["null", "string"]
}
}
}
}
},
Expand All @@ -681,7 +701,27 @@
"type": ["null", "object"],
"properties": {
"billing_address": {
"$ref": "address.json"
"type": ["null", "object"],
"properties": {
"city": {
"type": ["null", "string"]
},
"country": {
"type": ["null", "string"]
},
"line1": {
"type": ["null", "string"]
},
"line2": {
"type": ["null", "string"]
},
"postal_code": {
"type": ["null", "string"]
},
"state": {
"type": ["null", "string"]
}
}
},
"email": {
"type": ["null", "string"]
Expand All @@ -690,7 +730,27 @@
"type": ["null", "string"]
},
"shipping_address": {
"$ref": "address.json"
"type": ["null", "object"],
"properties": {
"city": {
"type": ["null", "string"]
},
"country": {
"type": ["null", "string"]
},
"line1": {
"type": ["null", "string"]
},
"line2": {
"type": ["null", "string"]
},
"postal_code": {
"type": ["null", "string"]
},
"state": {
"type": ["null", "string"]
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from source_stripe.availability_strategy import StripeAvailabilityStrategy, StripeSubStreamAvailabilityStrategy

from source_stripe.availability_strategy import StripeSubStreamAvailabilityStrategy

STRIPE_ERROR_CODES: List = [
# stream requires additional permissions
"more_permissions_required",
# account_id doesn't have the access to the stream
"account_invalid",
]
STRIPE_API_VERSION = "2022-11-15"


Expand All @@ -24,10 +30,6 @@ class StripeStream(HttpStream, ABC):
DEFAULT_SLICE_RANGE = 365
transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

@property
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
return StripeAvailabilityStrategy()

def __init__(self, start_date: int, account_id: str, slice_range: int = DEFAULT_SLICE_RANGE, **kwargs):
super().__init__(**kwargs)
self.account_id = account_id
Expand Down Expand Up @@ -64,6 +66,27 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
response_json = response.json()
yield from response_json.get("data", []) # Stripe puts records in a container array "data"

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
try:
yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
except requests.exceptions.HTTPError as e:
status_code = e.response.status_code
parsed_error = e.response.json()
error_code = parsed_error.get("error", {}).get("code")
error_message = parsed_error.get("message")
# if the API Key doesn't have required permissions to particular stream, this stream will be skipped
if status_code == 403 and error_code in STRIPE_ERROR_CODES:
self.logger.warn(f"Stream {self.name} is skipped, due to {error_code}. Full message: {error_message}")
pass
else:
self.logger.error(f"Syncing stream {self.name} is failed, due to {error_code}. Full message: {error_message}")


class BasePaginationStripeStream(StripeStream, ABC):
def request_params(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,30 @@ def test_source_streams():
assert len(streams) == 46


@pytest.fixture(name="config")
def config_fixture():
config = {"client_secret": "sk_test(live)_<secret>",
"account_id": "<account_id>", "start_date": "2020-05-01T00:00:00Z"}
return config


@pytest.fixture(name="logger_mock")
def logger_mock_fixture():
return patch("source_tiktok_marketing.source.logger")


@patch.object(source_stripe.source, "stripe")
def test_source_check_connection_ok(mocked_client, config):
assert SourceStripe().check_connection(None, config=config) == (True, None)
def test_source_check_connection_ok(mocked_client, config, logger_mock):
assert SourceStripe().check_connection(
logger_mock, config=config) == (True, None)


@patch.object(source_stripe.source, "stripe")
def test_source_check_connection_failure(mocked_client, config):
def test_source_check_connection_failure(mocked_client, config, logger_mock):
exception = Exception("Test")
mocked_client.Account.retrieve = Mock(side_effect=exception)
assert SourceStripe().check_connection(None, config=config) == (False, exception)
assert SourceStripe().check_connection(
logger_mock, config=config) == (False, exception)


@patch.object(source_stripe.source, "stripe")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging

import pendulum
import pytest
from airbyte_cdk.models import SyncMode
from source_stripe.availability_strategy import STRIPE_ERROR_CODES
from source_stripe.streams import (
ApplicationFees,
ApplicationFeesRefunds,
Expand Down Expand Up @@ -155,6 +152,12 @@ def test_sub_stream(requests_mock):
]


@pytest.fixture(name="config")
def config_fixture():
config = {"authenticator": "authenticator", "account_id": "<account_id>", "start_date": 1596466368}
return config


@pytest.mark.parametrize(
"stream_cls, kwargs, expected",
[
Expand Down Expand Up @@ -195,9 +198,9 @@ def test_path_and_headers(
stream_cls,
kwargs,
expected,
stream_args,
config,
):
stream = stream_cls(**stream_args)
stream = stream_cls(**config)
assert stream.path(**kwargs) == expected
headers = stream.request_headers(**kwargs)
assert headers["Stripe-Version"] == "2022-11-15"
Expand Down Expand Up @@ -238,44 +241,6 @@ def test_request_params(
stream,
kwargs,
expected,
stream_args,
config,
):
assert stream(**stream_args).request_params(**kwargs) == expected


@pytest.mark.parametrize(
"stream_cls",
(
ApplicationFees,
Customers,
BalanceTransactions,
Charges,
Coupons,
Disputes,
Events,
Invoices,
InvoiceItems,
Payouts,
Plans,
Prices,
Products,
Subscriptions,
SubscriptionSchedule,
Transfers,
Refunds,
PaymentIntents,
CheckoutSessions,
PromotionCodes,
ExternalAccount,
SetupIntents,
ShippingRates
)
)
def test_403_error_handling(stream_args, stream_cls, requests_mock):
stream = stream_cls(**stream_args)
logger = logging.getLogger("airbyte")
for error_code in STRIPE_ERROR_CODES:
requests_mock.get(f"{stream.url_base}{stream.path()}", status_code=403, json={"error": {"code": f"{error_code}"}})
available, message = stream.check_availability(logger)
assert not available
assert STRIPE_ERROR_CODES[error_code] in message
assert stream(**config).request_params(**kwargs) == expected
Loading