Skip to content

Commit

Permalink
Source-stripe: Revert 3.17.2 and apply atm_fee changes (#29309)
Browse files Browse the repository at this point in the history
* Revert ":bug: Source Stripe: add availability strategy (#28911)"

This reverts commit cd51881.

* Run git checkout cd51881 -- airbyte-integrations/connectors/source-stripe/source_stripe/schemas/transactions.json

* Bump version to 3.17.3

* Update change log

* update test config

* Remove skip
  • Loading branch information
bnchrch authored Aug 11, 2023
1 parent 45c1de3 commit 1219e66
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 197 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-stripe/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=3.17.2
LABEL io.airbyte.version=3.17.3
LABEL io.airbyte.name=airbyte/source-stripe
3 changes: 1 addition & 2 deletions airbyte-integrations/connectors/source-stripe/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 3.17.2
dockerImageTag: 3.17.3
dockerRepository: airbyte/source-stripe
githubIssueLabel: source-stripe
icon: stripe.svg
Expand All @@ -14,7 +14,6 @@ data:
registries:
cloud:
enabled: true
dockerImageTag: 3.17.1 # p0-stripe-object-nodes-23-08-02
oss:
enabled: true
releaseStage: generally_available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,6 @@
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from requests import HTTPError

STRIPE_ERROR_CODES = {
"more_permissions_required": "This is most likely due to insufficient permissions on the credentials in use. "
"Try to grant required permissions/scopes or re-authenticate",
"account_invalid": "The card, or account the card is connected to, is invalid. You need to contact your card issuer "
"to check that the card is working correctly.",
"oauth_not_supported": "Please use a different authentication method.",
}


class StripeAvailabilityStrategy(HttpAvailabilityStrategy):
def handle_http_error(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
) -> Tuple[bool, Optional[str]]:
status_code = error.response.status_code
if status_code not in [400, 403]:
raise error
parsed_error = error.response.json()
error_code = parsed_error.get("error", {}).get("code")
error_message = STRIPE_ERROR_CODES.get(error_code, parsed_error.get("error", {}).get("message"))
if not error_message:
raise error
doc_ref = self._visit_docs_message(logger, source)
reason = f"The endpoint {error.response.url} returned {status_code}: {error.response.reason}. {error_message}. {doc_ref} "
response_error_message = stream.parse_response_error_message(error.response)
if response_error_message:
reason += response_error_message
return False, reason


class StripeSubStreamAvailabilityStrategy(HttpAvailabilityStrategy):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,27 @@
"type": ["null", "string"]
},
"shipping_address": {
"$ref": "address.json"
"type": ["null", "object"],
"properties": {
"city": {
"type": ["null", "string"]
},
"country": {
"type": ["null", "string"]
},
"line1": {
"type": ["null", "string"]
},
"line2": {
"type": ["null", "string"]
},
"postal_code": {
"type": ["null", "string"]
},
"state": {
"type": ["null", "string"]
}
}
}
}
},
Expand All @@ -681,7 +701,27 @@
"type": ["null", "object"],
"properties": {
"billing_address": {
"$ref": "address.json"
"type": ["null", "object"],
"properties": {
"city": {
"type": ["null", "string"]
},
"country": {
"type": ["null", "string"]
},
"line1": {
"type": ["null", "string"]
},
"line2": {
"type": ["null", "string"]
},
"postal_code": {
"type": ["null", "string"]
},
"state": {
"type": ["null", "string"]
}
}
},
"email": {
"type": ["null", "string"]
Expand All @@ -690,7 +730,27 @@
"type": ["null", "string"]
},
"shipping_address": {
"$ref": "address.json"
"type": ["null", "object"],
"properties": {
"city": {
"type": ["null", "string"]
},
"country": {
"type": ["null", "string"]
},
"line1": {
"type": ["null", "string"]
},
"line2": {
"type": ["null", "string"]
},
"postal_code": {
"type": ["null", "string"]
},
"state": {
"type": ["null", "string"]
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from source_stripe.availability_strategy import StripeAvailabilityStrategy, StripeSubStreamAvailabilityStrategy

from source_stripe.availability_strategy import StripeSubStreamAvailabilityStrategy

STRIPE_ERROR_CODES: List = [
# stream requires additional permissions
"more_permissions_required",
# account_id doesn't have the access to the stream
"account_invalid",
]
STRIPE_API_VERSION = "2022-11-15"


Expand All @@ -24,10 +30,6 @@ class StripeStream(HttpStream, ABC):
DEFAULT_SLICE_RANGE = 365
transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

@property
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
return StripeAvailabilityStrategy()

def __init__(self, start_date: int, account_id: str, slice_range: int = DEFAULT_SLICE_RANGE, **kwargs):
super().__init__(**kwargs)
self.account_id = account_id
Expand Down Expand Up @@ -64,6 +66,27 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
response_json = response.json()
yield from response_json.get("data", []) # Stripe puts records in a container array "data"

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
try:
yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
except requests.exceptions.HTTPError as e:
status_code = e.response.status_code
parsed_error = e.response.json()
error_code = parsed_error.get("error", {}).get("code")
error_message = parsed_error.get("message")
# if the API Key doesn't have required permissions to particular stream, this stream will be skipped
if status_code == 403 and error_code in STRIPE_ERROR_CODES:
self.logger.warn(f"Stream {self.name} is skipped, due to {error_code}. Full message: {error_message}")
pass
else:
self.logger.error(f"Syncing stream {self.name} is failed, due to {error_code}. Full message: {error_message}")


class BasePaginationStripeStream(StripeStream, ABC):
def request_params(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,30 @@ def test_source_streams():
assert len(streams) == 46


@pytest.fixture(name="config")
def config_fixture():
config = {"client_secret": "sk_test(live)_<secret>",
"account_id": "<account_id>", "start_date": "2020-05-01T00:00:00Z"}
return config


@pytest.fixture(name="logger_mock")
def logger_mock_fixture():
return patch("source_tiktok_marketing.source.logger")


@patch.object(source_stripe.source, "stripe")
def test_source_check_connection_ok(mocked_client, config):
assert SourceStripe().check_connection(None, config=config) == (True, None)
def test_source_check_connection_ok(mocked_client, config, logger_mock):
assert SourceStripe().check_connection(
logger_mock, config=config) == (True, None)


@patch.object(source_stripe.source, "stripe")
def test_source_check_connection_failure(mocked_client, config):
def test_source_check_connection_failure(mocked_client, config, logger_mock):
exception = Exception("Test")
mocked_client.Account.retrieve = Mock(side_effect=exception)
assert SourceStripe().check_connection(None, config=config) == (False, exception)
assert SourceStripe().check_connection(
logger_mock, config=config) == (False, exception)


@patch.object(source_stripe.source, "stripe")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging

import pendulum
import pytest
from airbyte_cdk.models import SyncMode
from source_stripe.availability_strategy import STRIPE_ERROR_CODES
from source_stripe.streams import (
ApplicationFees,
ApplicationFeesRefunds,
Expand Down Expand Up @@ -155,6 +152,12 @@ def test_sub_stream(requests_mock):
]


@pytest.fixture(name="config")
def config_fixture():
config = {"authenticator": "authenticator", "account_id": "<account_id>", "start_date": 1596466368}
return config


@pytest.mark.parametrize(
"stream_cls, kwargs, expected",
[
Expand Down Expand Up @@ -195,9 +198,9 @@ def test_path_and_headers(
stream_cls,
kwargs,
expected,
stream_args,
config,
):
stream = stream_cls(**stream_args)
stream = stream_cls(**config)
assert stream.path(**kwargs) == expected
headers = stream.request_headers(**kwargs)
assert headers["Stripe-Version"] == "2022-11-15"
Expand Down Expand Up @@ -238,44 +241,6 @@ def test_request_params(
stream,
kwargs,
expected,
stream_args,
config,
):
assert stream(**stream_args).request_params(**kwargs) == expected


@pytest.mark.parametrize(
"stream_cls",
(
ApplicationFees,
Customers,
BalanceTransactions,
Charges,
Coupons,
Disputes,
Events,
Invoices,
InvoiceItems,
Payouts,
Plans,
Prices,
Products,
Subscriptions,
SubscriptionSchedule,
Transfers,
Refunds,
PaymentIntents,
CheckoutSessions,
PromotionCodes,
ExternalAccount,
SetupIntents,
ShippingRates
)
)
def test_403_error_handling(stream_args, stream_cls, requests_mock):
stream = stream_cls(**stream_args)
logger = logging.getLogger("airbyte")
for error_code in STRIPE_ERROR_CODES:
requests_mock.get(f"{stream.url_base}{stream.path()}", status_code=403, json={"error": {"code": f"{error_code}"}})
available, message = stream.check_availability(logger)
assert not available
assert STRIPE_ERROR_CODES[error_code] in message
assert stream(**config).request_params(**kwargs) == expected
Loading

0 comments on commit 1219e66

Please sign in to comment.