Skip to content

Commit

Permalink
✨ Source Shopify: add resiliency on some transient errors using the H…
Browse files Browse the repository at this point in the history
…ttpClient (#38084)

Co-authored-by: Oleksandr Bazarnov <oleksandr.bazarnov@globallogic.com>
Co-authored-by: maxi297 <maxime@airbyte.io>
Co-authored-by: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com>
  • Loading branch information
4 people authored Jun 6, 2024
1 parent df39bf8 commit 64d39cb
Show file tree
Hide file tree
Showing 19 changed files with 976 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
dockerImageTag: 2.2.2
dockerImageTag: 2.2.3
dockerRepository: airbyte/source-shopify
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
githubIssueLabel: source-shopify
Expand Down
340 changes: 324 additions & 16 deletions airbyte-integrations/connectors/source-shopify/poetry.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions airbyte-integrations/connectors/source-shopify/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.2.2"
version = "2.2.3"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -17,7 +17,7 @@ include = "source_shopify"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "0.81.4"
airbyte-cdk = "0.90.0"
sgqlc = "==16.3"
graphql-query = "^1.1.1"

Expand All @@ -28,3 +28,4 @@ source-shopify = "source_shopify.run:run"
requests-mock = "^1.11.0"
pytest-mock = "^3.12.0"
pytest = "^8.0.0"
freezegun = "^1.4.0"
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from typing import Optional, Union

import requests
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler, ErrorResolution, ResponseAction
from airbyte_protocol.models import FailureType
from requests import exceptions

# Exceptions grouped under "response consumption": going by their `requests` names, these
# cover a broken chunked transfer and a body that is not valid JSON.
RESPONSE_CONSUMPTION_EXCEPTIONS = (
    exceptions.ChunkedEncodingError,
    exceptions.JSONDecodeError,
)

# Exception types that `ShopifyErrorHandler.interpret_response` resolves to a RETRY action.
# The response-consumption exceptions above are appended to this tuple as well.
TRANSIENT_EXCEPTIONS = (
    exceptions.ConnectTimeout,
    exceptions.ConnectionError,
    exceptions.HTTPError,
    exceptions.ReadTimeout,
    # This error was added as part of the migration from REST to bulk
    # (https://github.com/airbytehq/airbyte/commit/f5094041bebb80cd6602a98829c19a7515276ed3),
    # but it is unclear in which cases it occurs and why it is transient.
    exceptions.SSLError,
) + RESPONSE_CONSUMPTION_EXCEPTIONS

# Shared resolution returned whenever this module has nothing to report about a response:
# a SUCCESS action with no failure type and no error message.
_NO_ERROR_RESOLUTION = ErrorResolution(ResponseAction.SUCCESS, None, None)


class ShopifyErrorHandler(ErrorHandler):
    """Error handler that flags transient Shopify request failures for retry.

    Only transient conditions are classified here; every other outcome is
    reported as "no error" so that the calling code can apply its own handling.
    """

    def __init__(self, stream_name: str = "<no specified stream>") -> None:
        """Remember the name of the stream this handler is attached to.

        Args:
            stream_name: Label of the owning stream. It is stored on the
                instance; nothing in this class reads it back.
        """
        self._stream_name = stream_name

    def interpret_response(self, response: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
        """Map a request outcome to an `ErrorResolution`.

        Args:
            response: Either the HTTP response that was received, the exception
                raised while performing the request, or `None`.

        Returns:
            A RETRY resolution tagged as a transient error when `response` is one
            of `TRANSIENT_EXCEPTIONS`, or a non-OK HTTP response whose status is
            429 or at least 500. In every other case the shared
            `_NO_ERROR_RESOLUTION` is returned.
        """
        # Exceptions known to be transient are retried regardless of anything else.
        if isinstance(response, TRANSIENT_EXCEPTIONS):
            return ErrorResolution(
                ResponseAction.RETRY,
                FailureType.transient_error,
                f"Error of type {type(response)} is considered transient. Try again later. (full error message is {response})",
            )

        # Anything that is not an HTTP response (None, other exceptions) and any
        # successful response is left untouched by this handler.
        if not isinstance(response, requests.Response) or response.ok:
            return _NO_ERROR_RESOLUTION

        code = response.status_code
        is_retryable_status = code == 429 or code >= 500
        if is_retryable_status:
            return ErrorResolution(
                ResponseAction.RETRY,
                FailureType.transient_error,
                f"Status code `{response.status_code}` is considered transient. Try again later. (full error message is {response.content})",
            )

        # Remaining HTTP errors are deliberately not classified here: the code
        # that issued the request is expected to deal with them.
        return _NO_ERROR_RESOLUTION
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
from typing import Any, Iterable, List, Mapping, Optional

import requests
from airbyte_cdk.sources.streams.http import HttpClient
from requests.exceptions import ConnectionError, InvalidURL, JSONDecodeError, SSLError

from .http_request import ShopifyErrorHandler
from .utils import ShopifyAccessScopesError, ShopifyBadJsonError, ShopifyConnectionError, ShopifyWrongShopNameError

SCOPES_MAPPING: Mapping[str, set[str]] = {
Expand Down Expand Up @@ -83,35 +85,32 @@ class ShopifyScopes:
logger = logging.getLogger("airbyte")

def __init__(self, config: Mapping[str, Any]) -> None:
self.permitted_streams: List[str] = list(ALWAYS_PERMITTED_STREAMS)
self.not_permitted_streams: List[set[str, str]] = []
self._error_handler = ShopifyErrorHandler()
self._http_client = HttpClient("ShopifyScopes", self.logger, self._error_handler, session=requests.Session())

self.user_scopes = self.get_user_scopes(config)
# for each stream check the authenticated user has all scopes required
self.get_streams_from_user_scopes()
# log if there are streams missing scopes and should be omitted
self.emit_missing_scopes()

# the list of validated streams
permitted_streams: List[str] = ALWAYS_PERMITTED_STREAMS
# the list of not permitted streams
not_permitted_streams: List[set[str, str]] = []
# template for the log message
missing_scope_message: str = (
"The stream `{stream}` could not be synced without the `{scope}` scope. Please check the `{scope}` is granted."
)

@staticmethod
def get_user_scopes(config) -> list[Any]:
session = requests.Session()
def get_user_scopes(self, config) -> list[Any]:
url = f"https://{config['shop']}.myshopify.com/admin/oauth/access_scopes.json"
headers = config["authenticator"].get_auth_header()
try:
response = session.get(url, headers=headers).json()
access_scopes = [scope.get("handle") for scope in response.get("access_scopes")]
_, response = self._http_client.send_request("GET", url, headers=headers, request_kwargs={})
access_scopes = [scope.get("handle") for scope in response.json().get("access_scopes")]
except InvalidURL:
raise ShopifyWrongShopNameError(url)
except JSONDecodeError as json_error:
raise ShopifyBadJsonError(json_error)
except (SSLError, ConnectionError) as con_error:
raise ShopifyConnectionError(con_error)

if access_scopes:
return access_scopes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,12 @@ class BulkJobTimout(BaseBulkException):
class BulkJobAccessDenied(BaseBulkException):
    """Raised when a BULK job has the `ACCESS_DENIED` status."""

class BulkJobCreationFailedConcurrentError(BaseBulkException):
    """Raised when an attempt to create a job has failed because of concurrency limits."""

    # Reported as a transient error rather than a configuration or system error.
    failure_type: FailureType = FailureType.transient_error

class BulkJobConcurrentError(BaseBulkException):
"""Raised when BULK Job could not be created, since the 1 Bulk job / shop quota is exceeded."""
"""Raised when failing the job after hitting too many BulkJobCreationFailedConcurrentError."""

failure_type: FailureType = FailureType.transient_error
Loading

0 comments on commit 64d39cb

Please sign in to comment.