Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Source Hubspot: handle connection errors #28909

Merged
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.2.0
LABEL io.airbyte.version=1.3.0
LABEL io.airbyte.name=airbyte/source-hubspot

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
dockerImageTag: 1.2.0
dockerImageTag: 1.3.0
dockerRepository: airbyte/source-hubspot
githubIssueLabel: source-hubspot
icon: hubspot.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,31 @@

from typing import Any

import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.utils import AirbyteTracedException
from requests import HTTPError


class HubspotError(HTTPError):
class HubspotError(AirbyteTracedException):
"""
Base error class.
Subclassing HTTPError to avoid breaking existing code that expects only HTTPErrors.
"""

def __init__(
self,
internal_message: str = None,
message: str = None,
failure_type: FailureType = FailureType.system_error,
exception: BaseException = None,
response: requests.Response = None,
):
super().__init__(internal_message, message, failure_type, exception)
self.response = response

class HubspotTimeout(HubspotError):

class HubspotTimeout(HTTPError):
"""502/504 HubSpot has processing limits in place to prevent a single client from causing degraded performance,
and these responses indicate that those limits have been hit. You'll normally only see these timeout responses
when making a large number of requests over a sustained period. If you get one of these responses,
Expand All @@ -35,6 +49,10 @@ class HubspotRateLimited(HubspotError):
"""429 Rate Limit Reached"""


class HubspotBadRequest(HubspotError):
"""400 Bad Request"""


class InvalidStartDateConfigError(Exception):
"""Raised when the user provides a wrong or invalid `start_date` in the input configuration."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
try:
contacts = Contacts(**common_params)
_ = contacts.properties
except HubspotInvalidAuth:
alive = False
error_msg = "Authentication failed: Please check if provided credentials are valid and try again."
except HTTPError as error:
alive = False
error_msg = repr(error)
if error.response.status_code == HTTPStatus.BAD_REQUEST:
response_json = error.response.json()
error_msg = f"400 Bad Request: {response_json['message']}, please check if provided credentials are valid."
except HubspotInvalidAuth as e:
alive = False
error_msg = repr(e)
return alive, error_msg

def get_granted_scopes(self, authenticator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pendulum as pendulum
import requests
from airbyte_cdk.entrypoint import logger
from airbyte_cdk.models import FailureType
from airbyte_cdk.models.airbyte_protocol import SyncMode
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
Expand All @@ -25,9 +26,17 @@
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from airbyte_cdk.utils import AirbyteTracedException
from requests import HTTPError, codes
from source_hubspot.constants import OAUTH_CREDENTIALS, PRIVATE_APP_CREDENTIALS
from source_hubspot.errors import HubspotAccessDenied, HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout, InvalidStartDateConfigError
from source_hubspot.errors import (
HubspotAccessDenied,
HubspotBadRequest,
HubspotInvalidAuth,
HubspotRateLimited,
HubspotTimeout,
InvalidStartDateConfigError,
)
from source_hubspot.helpers import APIv1Property, APIv3Property, GroupByKey, IRecordPostProcessor, IURLPropertyRepresentation, StoreAsIs

# we got this when provided API Token has incorrect format
Expand Down Expand Up @@ -183,20 +192,27 @@ def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[M
if response.headers.get("content-type") == "application/json;charset=utf-8" and response.status_code != HTTPStatus.OK:
message = response.json().get("message")

if response.status_code == HTTPStatus.FORBIDDEN:
"""Once hit the forbidden endpoint, we return the error message from response."""
pass
if response.status_code == HTTPStatus.BAD_REQUEST:
message = f"Request to {response.url} didn't succeed. Please verify your credentials and try again.\nError message from Hubspot API: {message}"
raise HubspotBadRequest(internal_message=message, failure_type=FailureType.config_error, response=response)
elif response.status_code == HTTPStatus.FORBIDDEN:
message = f"The authenticated user does not have permissions to access the URL {response.url}. Verify your permissions to access this endpoint."
raise HubspotAccessDenied(internal_message=message, failure_type=FailureType.config_error, response=response)
elif response.status_code in (HTTPStatus.UNAUTHORIZED, CLOUDFLARE_ORIGIN_DNS_ERROR):
raise HubspotInvalidAuth(message, response=response)
message = (
"The user cannot be authorized with provided credentials. Please verify that your credentails are valid and try again."
)
raise HubspotInvalidAuth(internal_message=message, failure_type=FailureType.config_error, response=response)
elif response.status_code == HTTPStatus.TOO_MANY_REQUESTS:
retry_after = response.headers.get("Retry-After")
raise HubspotRateLimited(
f"429 Rate Limit Exceeded: API rate-limit has been reached until {retry_after} seconds."
internal_message=f"You have reached your Hubspot API limit. We will resume replication once after {retry_after} seconds."
" See https://developers.hubspot.com/docs/api/usage-details",
failure_type=FailureType.config_error,
response=response,
)
elif response.status_code in (HTTPStatus.BAD_GATEWAY, HTTPStatus.SERVICE_UNAVAILABLE):
raise HubspotTimeout(message, response=response)
raise HubspotTimeout(message, response)
else:
response.raise_for_status()

Expand Down Expand Up @@ -462,7 +478,11 @@ def read_records(
# Always return an empty generator just in case no records were ever yielded
yield from []
except requests.exceptions.HTTPError as e:
raise e
response = e.response
if response.status_code == HTTPStatus.UNAUTHORIZED:
raise AirbyteTracedException("The authentication to HubSpot has expired. Re-authenticate to restore access to HubSpot.")
else:
raise e

def parse_response_error_message(self, response: requests.Response) -> Optional[str]:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import pendulum
import pytest
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type
from source_hubspot.errors import HubspotRateLimited, InvalidStartDateConfigError
from source_hubspot.errors import HubspotAccessDenied, HubspotRateLimited, InvalidStartDateConfigError
from source_hubspot.helpers import APIv3Property
from source_hubspot.source import SourceHubspot
from source_hubspot.streams import API, Companies, Deals, Engagements, MarketingEmails, Products, Stream
Expand Down Expand Up @@ -179,10 +179,11 @@ def test_stream_forbidden(requests_mock, config, caplog):
}
)

records = list(SourceHubspot().read(logger, config, catalog, {}))
assert json["message"] in caplog.text
records = [r for r in records if r.type == Type.RECORD]
assert not records
with pytest.raises(HubspotAccessDenied):
records = list(SourceHubspot().read(logger, config, catalog, {}))
assert json["message"] in caplog.text
records = [r for r in records if r.type == Type.RECORD]
assert not records


def test_parent_stream_forbidden(requests_mock, config, caplog, fake_properties_list):
Expand Down Expand Up @@ -219,10 +220,11 @@ def test_parent_stream_forbidden(requests_mock, config, caplog, fake_properties_
}
)

records = list(SourceHubspot().read(logger, config, catalog, {}))
assert json["message"] in caplog.text
records = [r for r in records if r.type == Type.RECORD]
assert not records
with pytest.raises(HubspotAccessDenied):
records = list(SourceHubspot().read(logger, config, catalog, {}))
assert json["message"] in caplog.text
records = [r for r in records if r.type == Type.RECORD]
assert not records


class TestSplittingPropertiesFunctionality:
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/hubspot.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ There are two types of incremental sync:

1. Incremental (standard server-side, where API returns only the data updated or generated since the last sync)
2. Client-Side Incremental (API returns all available data and connector filters out only new records)
:::
:::

## Supported streams

Expand Down Expand Up @@ -204,6 +204,7 @@ Now that you have set up the Hubspot source connector, check out the following H

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1.3.0 | 2023-08-01 | [28909](https://github.com/airbytehq/airbyte/pull/28909) | Add handling of source connection errors |
| 1.2.0 | 2023-07-27 | [27091](https://github.com/airbytehq/airbyte/pull/27091) | Add new stream `ContactsMergedAudit` |
| 1.1.2 | 2023-07-27 | [28558](https://github.com/airbytehq/airbyte/pull/28558) | Improve error messages during connector setup |
| 1.1.1 | 2023-07-25 | [28705](https://github.com/airbytehq/airbyte/pull/28705) | Fix retry handler for token expired error |
Expand Down