Skip to content

Commit

Permalink
Source Hubspot: turn on default HttpAvailabilityStrategy (#22479)
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr authored Feb 9, 2023
1 parent 17c77fc commit e2100c4
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@
- name: HubSpot
sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
dockerRepository: airbyte/source-hubspot
dockerImageTag: 0.3.1
dockerImageTag: 0.3.2
documentationUrl: https://docs.airbyte.com/integrations/sources/hubspot
icon: hubspot.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6521,7 +6521,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-hubspot:0.3.1"
- dockerImage: "airbyte/source-hubspot:0.3.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/hubspot"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.1
LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.name=airbyte/source-hubspot

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@

import logging
from itertools import chain
from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
from typing import Any, List, Mapping, Optional, Tuple

import requests
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.utils.schema_helpers import split_config
from airbyte_cdk.utils.event_timing import create_timer
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from requests import HTTPError
from source_hubspot.streams import (
API,
Expand Down Expand Up @@ -136,51 +131,3 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
available_streams = streams

return available_streams

def read(
self,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
) -> Iterator[AirbyteMessage]:
"""
This method is overridden to check whether the stream `quotes` exists in the source, if not skip reading that stream.
"""
logger.info(f"Starting syncing {self.name}")
config, internal_config = split_config(config)
# TODO assert all streams exist in the connector
# get the streams once in case the connector needs to make any queries to generate them
stream_instances = {s.name: s for s in self.streams(config)}
state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state)
self._stream_to_instance_map = stream_instances
with create_timer(self.name) as timer:
for configured_stream in catalog.streams:
stream_instance = stream_instances.get(configured_stream.stream.name)
if not stream_instance and configured_stream.stream.name == "quotes":
logger.warning("Stream `quotes` does not exist in the source. Skip reading `quotes` stream.")
continue
if not stream_instance:
raise KeyError(
f"The requested stream {configured_stream.stream.name} was not found in the source. Available streams: {stream_instances.keys()}"
)

try:
yield from self._read_stream(
logger=logger,
stream_instance=stream_instance,
configured_stream=configured_stream,
state_manager=state_manager,
internal_config=internal_config,
)
except Exception as e:
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
display_message = stream_instance.get_error_display_message(e)
if display_message:
raise AirbyteTracedException.from_exception(e, message=display_message) from e
raise e
finally:
logger.info(f"Finished syncing {self.name}")
logger.info(timer.report())

logger.info(f"Finished syncing {self.name}")
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import requests
from airbyte_cdk.entrypoint import logger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
Expand Down Expand Up @@ -206,14 +205,9 @@ class Stream(HttpStream, ABC):
primary_key = None
filter_old_records: bool = True
denormalize_records: bool = False # one record from API response can result in multiple records emitted
raise_on_http_errors: bool = True
granted_scopes: Set = None
properties_scopes: Set = None

@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None

@property
@abstractmethod
def scopes(self) -> Set[str]:
Expand Down Expand Up @@ -263,12 +257,6 @@ def __init__(self, api: API, start_date: Union[str, pendulum.datetime], credenti
if creds_title in (OAUTH_CREDENTIALS, PRIVATE_APP_CREDENTIALS):
self._authenticator = api.get_authenticator()

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == HTTPStatus.FORBIDDEN:
setattr(self, "raise_on_http_errors", False)
logger.warning("You have not permission to API for this stream. " "Please check your scopes for Hubspot account.")
return super().should_retry(response)

def backoff_time(self, response: requests.Response) -> Optional[float]:
if response.status_code == codes.too_many_requests:
return float(response.headers.get("Retry-After", 3))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ def some_credentials_fixture():
return {"credentials_title": "Private App Credentials", "access_token": "wrong token"}


@pytest.fixture(name="creds_with_wrong_permissions")
def creds_with_wrong_permissions():
return {"credentials_title": "Private App Credentials", "access_token": "THIS-IS-THE-ACCESS_TOKEN"}


@pytest.fixture(name="fake_properties_list")
def fake_properties_list():
return [f"property_number_{i}" for i in range(NUMBER_OF_PROPERTIES)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from source_hubspot.errors import HubspotRateLimited
from source_hubspot.helpers import APIv3Property
from source_hubspot.source import SourceHubspot
from source_hubspot.streams import API, Companies, Deals, Engagements, Products, Stream, Workflows
from source_hubspot.streams import API, Companies, Deals, Engagements, Products, Stream

from .utils import read_full_refresh, read_incremental

Expand Down Expand Up @@ -134,38 +134,30 @@ def test_check_connection_backoff_on_server_error(requests_mock, config):
assert not error


def test_wrong_permissions_api_key(requests_mock, creds_with_wrong_permissions, common_params, caplog):
"""
Error with API Key Permissions to particular stream,
typically this issue raises along with calling `workflows` stream with API Key
that doesn't have required permissions to read the stream.
"""

# Mapping tipical response for mocker
def test_stream_forbidden(requests_mock, config, caplog):
json = {
"status": "error",
"message": f'This hapikey ({creds_with_wrong_permissions.get("api_key")}) does not have proper permissions! (requires any of [automation-access])',
"correlationId": "2fe0a9af-3609-45c9-a4d7-83a1774121aa",
}

# We expect something like this
expected_warining_message = {
"type": "LOG",
"log": {
"level": "WARN",
"message": f'Stream `workflows` cannot be procced. This hapikey ({creds_with_wrong_permissions.get("api_key")}) does not have proper permissions! (requires any of [automation-access])',
},
"message": "This access_token does not have proper permissions!",
}
requests_mock.get("https://api.hubapi.com/automation/v3/workflows", json=json, status_code=403)

# Create test_stream instance
test_stream = Workflows(**common_params)

# Mocking Request
requests_mock.register_uri("GET", test_stream.url, json=json, status_code=403)
records = list(test_stream.read_records(sync_mode=SyncMode.full_refresh))
catalog = ConfiguredAirbyteCatalog.parse_obj({
"streams": [
{
"stream": {
"name": "workflows",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
})

# match logged expected logged warning message with output given from preudo-output
assert expected_warining_message["log"]["message"] in caplog.text
records = list(SourceHubspot().read(logger, config, catalog, {}))
assert json["message"] in caplog.text
records = [r for r in records if r.type == Type.RECORD]
assert not records


Expand Down Expand Up @@ -328,17 +320,6 @@ def configured_catalog_fixture():
return ConfiguredAirbyteCatalog.parse_obj(configured_catalog)


def test_it_should_not_read_quotes_stream_if_it_does_not_exist_in_client(oauth_config, configured_catalog):
"""
If 'quotes' stream is not in the client, it should skip it.
"""
source = SourceHubspot()

all_records = list(source.read(logger, config=oauth_config, catalog=configured_catalog, state=None))
records = [record for record in all_records if record.type == Type.RECORD]
assert not records


def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(requests_mock, common_params, fake_properties_list):
"""
If there are more than 10,000 records that would be returned by the Hubspot search endpoint,
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/sources/hubspot.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ Now that you have set up the Hubspot source connector, check out the following H

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------|
| 0.3.1 | 2023-01-27 | [22009](https://github.com/airbytehq/airbyte/pull/22009) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.3.0 | 2022-10-27 | [18546](https://github.com/airbytehq/airbyte/pull/18546) | Sunsetting API Key authentication. `Quotes` stream is no longer available |
| 0.3.2 | 2023-02-07 | [22479](https://github.com/airbytehq/airbyte/pull/22479) | Turn on default HttpAvailabilityStrategy |
| 0.3.1 | 2023-01-27 | [22009](https://github.com/airbytehq/airbyte/pull/22009) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.3.0 | 2022-10-27 | [18546](https://github.com/airbytehq/airbyte/pull/18546) | Sunsetting API Key authentication. `Quotes` stream is no longer available |
| 0.2.2 | 2022-10-03 | [16914](https://github.com/airbytehq/airbyte/pull/16914) | Fix 403 forbidden error validation |
| 0.2.1 | 2022-09-26 | [17120](https://github.com/airbytehq/airbyte/pull/17120) | Migrate to per-stream state. |
| 0.2.0 | 2022-09-13 | [16632](https://github.com/airbytehq/airbyte/pull/16632) | Remove Feedback Submissions stream as the one using unstable (beta) API. |
Expand Down

0 comments on commit e2100c4

Please sign in to comment.