Skip to content

Commit

Permalink
✨ Source Hubspot: handle connection errors (#28909)
Browse files Browse the repository at this point in the history
* Source Hubspot: handle connection errors

* Update changelog

* Fix format errors

* Fix imports

* Update custom errors, fix unit tests

* Remove unused import

* Update expected records, fix docs

* Update deals expected records
  • Loading branch information
arsenlosenko authored Aug 7, 2023
1 parent eec86e9 commit 7944410
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 31 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.2.0
LABEL io.airbyte.version=1.3.0
LABEL io.airbyte.name=airbyte/source-hubspot

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
dockerImageTag: 1.2.0
dockerImageTag: 1.3.0
dockerRepository: airbyte/source-hubspot
githubIssueLabel: source-hubspot
icon: hubspot.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,31 @@

from typing import Any

import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.utils import AirbyteTracedException
from requests import HTTPError


class HubspotError(HTTPError):
class HubspotError(AirbyteTracedException):
    """
    Base error class for HubSpot API failures raised by this connector.

    Derives from AirbyteTracedException and therefore is NOT a ``requests.HTTPError``:
    an ``except HTTPError`` clause will not catch it. To keep handlers that inspect
    ``error.response`` working, the HTTP response that triggered the error is stored
    on the ``response`` attribute.

    :param internal_message: forwarded to AirbyteTracedException as ``internal_message``
    :param message: forwarded to AirbyteTracedException as ``message``
    :param failure_type: forwarded to AirbyteTracedException; defaults to ``FailureType.system_error``
    :param exception: forwarded to AirbyteTracedException as ``exception``
    :param response: HTTP response associated with the failure, or None when there is none
    """

    def __init__(
        self,
        internal_message: str = None,
        message: str = None,
        failure_type: FailureType = FailureType.system_error,
        exception: BaseException = None,
        response: requests.Response = None,
    ):
        # Forward by keyword so the mapping onto the parent's parameters stays explicit.
        super().__init__(
            internal_message=internal_message,
            message=message,
            failure_type=failure_type,
            exception=exception,
        )
        # The parent class does not receive the response; keep it here for callers that read it.
        self.response = response

class HubspotTimeout(HubspotError):

class HubspotTimeout(HTTPError):
"""502/504 HubSpot has processing limits in place to prevent a single client from causing degraded performance,
and these responses indicate that those limits have been hit. You'll normally only see these timeout responses
when making a large number of requests over a sustained period. If you get one of these responses,
Expand All @@ -35,6 +49,10 @@ class HubspotRateLimited(HubspotError):
"""429 Rate Limit Reached"""


class HubspotBadRequest(HubspotError):
    """Raised for HTTP 400 (Bad Request) responses from the HubSpot API."""


class InvalidStartDateConfigError(Exception):
"""Raised when the user provides a wrong or invalid `start_date` in the input configuration"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
try:
contacts = Contacts(**common_params)
_ = contacts.properties
except HubspotInvalidAuth:
alive = False
error_msg = "Authentication failed: Please check if provided credentials are valid and try again."
except HTTPError as error:
alive = False
error_msg = repr(error)
if error.response.status_code == HTTPStatus.BAD_REQUEST:
response_json = error.response.json()
error_msg = f"400 Bad Request: {response_json['message']}, please check if provided credentials are valid."
except HubspotInvalidAuth as e:
alive = False
error_msg = repr(e)
return alive, error_msg

def get_granted_scopes(self, authenticator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pendulum as pendulum
import requests
from airbyte_cdk.entrypoint import logger
from airbyte_cdk.models import FailureType
from airbyte_cdk.models.airbyte_protocol import SyncMode
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
Expand All @@ -25,9 +26,17 @@
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from airbyte_cdk.utils import AirbyteTracedException
from requests import HTTPError, codes
from source_hubspot.constants import OAUTH_CREDENTIALS, PRIVATE_APP_CREDENTIALS
from source_hubspot.errors import HubspotAccessDenied, HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout, InvalidStartDateConfigError
from source_hubspot.errors import (
HubspotAccessDenied,
HubspotBadRequest,
HubspotInvalidAuth,
HubspotRateLimited,
HubspotTimeout,
InvalidStartDateConfigError,
)
from source_hubspot.helpers import APIv1Property, APIv3Property, GroupByKey, IRecordPostProcessor, IURLPropertyRepresentation, StoreAsIs

# we got this when provided API Token has incorrect format
Expand Down Expand Up @@ -183,20 +192,27 @@ def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[M
if response.headers.get("content-type") == "application/json;charset=utf-8" and response.status_code != HTTPStatus.OK:
message = response.json().get("message")

if response.status_code == HTTPStatus.FORBIDDEN:
"""Once hit the forbidden endpoint, we return the error message from response."""
pass
if response.status_code == HTTPStatus.BAD_REQUEST:
message = f"Request to {response.url} didn't succeed. Please verify your credentials and try again.\nError message from Hubspot API: {message}"
raise HubspotBadRequest(internal_message=message, failure_type=FailureType.config_error, response=response)
elif response.status_code == HTTPStatus.FORBIDDEN:
message = f"The authenticated user does not have permissions to access the URL {response.url}. Verify your permissions to access this endpoint."
raise HubspotAccessDenied(internal_message=message, failure_type=FailureType.config_error, response=response)
elif response.status_code in (HTTPStatus.UNAUTHORIZED, CLOUDFLARE_ORIGIN_DNS_ERROR):
raise HubspotInvalidAuth(message, response=response)
message = (
"The user cannot be authorized with provided credentials. Please verify that your credentails are valid and try again."
)
raise HubspotInvalidAuth(internal_message=message, failure_type=FailureType.config_error, response=response)
elif response.status_code == HTTPStatus.TOO_MANY_REQUESTS:
retry_after = response.headers.get("Retry-After")
raise HubspotRateLimited(
f"429 Rate Limit Exceeded: API rate-limit has been reached until {retry_after} seconds."
internal_message=f"You have reached your Hubspot API limit. We will resume replication once after {retry_after} seconds."
" See https://developers.hubspot.com/docs/api/usage-details",
failure_type=FailureType.config_error,
response=response,
)
elif response.status_code in (HTTPStatus.BAD_GATEWAY, HTTPStatus.SERVICE_UNAVAILABLE):
raise HubspotTimeout(message, response=response)
raise HubspotTimeout(message, response)
else:
response.raise_for_status()

Expand Down Expand Up @@ -462,7 +478,11 @@ def read_records(
# Always return an empty generator just in case no records were ever yielded
yield from []
except requests.exceptions.HTTPError as e:
raise e
response = e.response
if response.status_code == HTTPStatus.UNAUTHORIZED:
raise AirbyteTracedException("The authentication to HubSpot has expired. Re-authenticate to restore access to HubSpot.")
else:
raise e

def parse_response_error_message(self, response: requests.Response) -> Optional[str]:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import pendulum
import pytest
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type
from source_hubspot.errors import HubspotRateLimited, InvalidStartDateConfigError
from source_hubspot.errors import HubspotAccessDenied, HubspotRateLimited, InvalidStartDateConfigError
from source_hubspot.helpers import APIv3Property
from source_hubspot.source import SourceHubspot
from source_hubspot.streams import API, Companies, Deals, Engagements, MarketingEmails, Products, Stream
Expand Down Expand Up @@ -179,10 +179,11 @@ def test_stream_forbidden(requests_mock, config, caplog):
}
)

records = list(SourceHubspot().read(logger, config, catalog, {}))
assert json["message"] in caplog.text
records = [r for r in records if r.type == Type.RECORD]
assert not records
with pytest.raises(HubspotAccessDenied):
records = list(SourceHubspot().read(logger, config, catalog, {}))
assert json["message"] in caplog.text
records = [r for r in records if r.type == Type.RECORD]
assert not records


def test_parent_stream_forbidden(requests_mock, config, caplog, fake_properties_list):
Expand Down Expand Up @@ -219,10 +220,11 @@ def test_parent_stream_forbidden(requests_mock, config, caplog, fake_properties_
}
)

records = list(SourceHubspot().read(logger, config, catalog, {}))
assert json["message"] in caplog.text
records = [r for r in records if r.type == Type.RECORD]
assert not records
with pytest.raises(HubspotAccessDenied):
records = list(SourceHubspot().read(logger, config, catalog, {}))
assert json["message"] in caplog.text
records = [r for r in records if r.type == Type.RECORD]
assert not records


class TestSplittingPropertiesFunctionality:
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/hubspot.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ There are two types of incremental sync:

1. Incremental (standard server-side, where API returns only the data updated or generated since the last sync)
2. Client-Side Incremental (API returns all available data and connector filters out only new records)
:::
:::

## Supported streams

Expand Down Expand Up @@ -204,6 +204,7 @@ Now that you have set up the Hubspot source connector, check out the following H

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1.3.0 | 2023-08-01 | [28909](https://github.com/airbytehq/airbyte/pull/28909) | Add handling of source connection errors |
| 1.2.0 | 2023-07-27 | [27091](https://github.com/airbytehq/airbyte/pull/27091) | Add new stream `ContactsMergedAudit` |
| 1.1.2 | 2023-07-27 | [28558](https://github.com/airbytehq/airbyte/pull/28558) | Improve error messages during connector setup |
| 1.1.1 | 2023-07-25 | [28705](https://github.com/airbytehq/airbyte/pull/28705) | Fix retry handler for token expired error |
Expand Down

0 comments on commit 7944410

Please sign in to comment.