Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Hubspot: implement new stream to read associations in incremental mode #15099

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.77
LABEL io.airbyte.version=0.1.78
LABEL io.airbyte.name=airbyte/source-hubspot
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json

import pytest


@pytest.fixture(scope="session", name="config")
def config_fixture():
    """Load the connector configuration from the local secrets file once per test session."""
    with open("secrets/config.json", "r") as secrets_file:
        parsed_config = json.load(secrets_file)
    return parsed_config
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging

import pytest
from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type
from source_hubspot.source import SourceHubspot


@pytest.fixture
def source():
    """Return a fresh SourceHubspot connector instance for each test."""
    return SourceHubspot()


@pytest.fixture
def associations(config, source):
    """Map every stream name to its declared association names (empty list when the stream has none)."""
    stream_associations = {}
    for stream in source.streams(config):
        stream_associations[stream.name] = getattr(stream, "associations", [])
    return stream_associations


@pytest.fixture
def configured_catalog(config, source):
    """Build a raw configured-catalog dict covering only the incremental streams
    that declare associations, each synced in incremental/append mode."""
    entries = []
    for stream in source.streams(config):
        if not stream.supports_incremental:
            continue
        if not getattr(stream, "associations", []):
            continue
        entries.append(
            {
                "stream": stream.as_airbyte_stream(),
                "sync_mode": "incremental",
                "cursor_field": [stream.cursor_field],
                "destination_sync_mode": "append",
            }
        )
    return {"streams": entries}


def test_incremental_read_fetches_associations(config, configured_catalog, source, associations):
    """An incremental read over association-enabled streams must emit at least one
    record carrying a non-empty association field.

    Bug fix: the original guard `if message and message.type != Type.RECORD`
    let a falsy message fall through to `message.record` and crash with
    AttributeError; skip non-record and falsy messages explicitly instead.
    """
    messages = source.read(logging.getLogger("airbyte"), config, ConfiguredAirbyteCatalog.parse_obj(configured_catalog), {})

    association_found = False
    for message in messages:
        # Skip anything that is not a record (state, log, trace, or empty messages).
        if not message or message.type != Type.RECORD:
            continue
        record = message.record
        stream, data = record.stream, record.data
        # assume at least one association id is present
        for association in associations[stream]:
            if data.get(association):
                association_found = True
                break
        if association_found:
            # One populated association proves the feature works; stop reading early.
            break
    assert association_found
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def get_api(config: Mapping[str, Any]) -> API:
return API(credentials=credentials)

def get_common_params(self, config) -> Mapping[str, Any]:
start_date = config.get("start_date")
start_date = config["start_date"]
credentials = config["credentials"]
api = self.get_api(config=config)
common_params = dict(api=api, start_date=start_date, credentials=credentials)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,15 @@ def _property_wrapper(self) -> IURLPropertyRepresentation:
return APIv1Property(properties)
return APIv3Property(properties)

def __init__(self, api: API, start_date: str = None, credentials: Mapping[str, Any] = None, **kwargs):
def __init__(self, api: API, start_date: Union[str, pendulum.datetime], credentials: Mapping[str, Any] = None, **kwargs):
super().__init__(**kwargs)
self._api: API = api
self._start_date = pendulum.parse(start_date)
self._credentials = credentials

if credentials["credentials_title"] == API_KEY_CREDENTIALS:
self._start_date = start_date
if isinstance(self._start_date, str):
self._start_date = pendulum.parse(self._start_date)
if self._credentials["credentials_title"] == API_KEY_CREDENTIALS:
self._session.params["hapikey"] = credentials.get("api_key")

def backoff_time(self, response: requests.Response) -> Optional[float]:
Expand Down Expand Up @@ -642,6 +645,51 @@ def _flat_associations(self, records: Iterable[MutableMapping]) -> Iterable[Muta
yield record


class AssociationsStream(Stream):
    """
    Designed to read associations of CRM objects during incremental syncs, since Search API does not support
    retrieving associations.

    One stream slice is produced per association name declared on the parent stream; for each slice a single
    POST batch-read request fetches the associated object ids for all supplied record identifiers.
    """

    http_method = "POST"
    filter_old_records = False

    def __init__(self, parent_stream: Stream, identifiers: Iterable[Union[int, str]], *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parent_stream = parent_stream
        # Materialize up-front: `identifiers` may be a one-shot iterator, and
        # `request_body_json` re-reads it once per association slice — without
        # this, every slice after the first would send an empty batch.
        self.identifiers = list(identifiers)

    @property
    def url(self):
        """
        Although it is not used, it needs to be implemented because it is an abstract property.
        """
        return ""

    def path(
        self,
        *,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        # `stream_slice` is an association name (e.g. "contacts"); the v4
        # batch-read endpoint returns associations for every requested input id.
        return f"/crm/v4/associations/{self.parent_stream.entity}/{stream_slice}/batch/read"

    def scopes(self) -> Set[str]:
        # Reading associations requires the same scopes as the parent CRM stream.
        return self.parent_stream.scopes

    def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[str]:
        # One slice per association type declared on the parent stream.
        return self.parent_stream.associations

    def request_body_json(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Optional[Mapping]:
        # The batch-read API expects string ids.
        return {"inputs": [{"id": str(id_)} for id_ in self.identifiers]}


class IncrementalStream(Stream, ABC):
"""Stream that supports state and incremental read"""

Expand Down Expand Up @@ -807,6 +855,24 @@ def _process_search(

return list(stream_records.values()), raw_response

def _read_associations(self, records: Iterable, identifiers: Iterable[Union[int, str]]) -> Iterable[Mapping[str, Any]]:
    """Enrich `records` with their CRM associations and return the enriched records.

    The Search API used for incremental reads does not return associations, so
    they are fetched separately via `AssociationsStream` (one batch-read request
    per association type) and merged into each record under the association name.
    """
    records_by_pk = {record[self.primary_key]: record for record in records}
    # Materialize once: the identifiers are reused for every association slice.
    identifiers = list(identifiers)
    associations_stream = AssociationsStream(
        api=self._api, start_date=self._start_date, credentials=self._credentials, parent_stream=self, identifiers=identifiers
    )
    slices = associations_stream.stream_slices(sync_mode=SyncMode.full_refresh)

    for _slice in slices:
        logger.debug(f"Reading {_slice} associations of {self.entity}")
        associations = associations_stream.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh)
        for group in associations:
            # NOTE(review): assumes group["from"]["id"] matches the record's
            # primary-key value (including its type) — a mismatch raises KeyError;
            # confirm the API echoes ids in the same form they were sent.
            current_record = records_by_pk[group["from"]["id"]]
            associations_list = current_record.get(_slice, [])
            associations_list.extend(association["toObjectId"] for association in group["to"])
            current_record[_slice] = associations_list
    return records_by_pk.values()

def read_records(
self,
sync_mode: SyncMode,
Expand All @@ -826,15 +892,16 @@ def read_records(
stream_state=stream_state,
stream_slice=stream_slice,
)

identifiers = map(lambda x: x[self.primary_key], records)
records = self._read_associations(records, identifiers)
else:
records, raw_response = self._read_stream_records(
stream_slice=stream_slice,
stream_state=stream_state,
next_page_token=next_page_token,
)
records = self._flat_associations(records)
records = self._filter_old_records(records)
records = self._flat_associations(records)

for record in records:
cursor = self._field_to_datetime(record[self.updated_at_field])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_check_connection_empty_config(config):
def test_check_connection_invalid_config(config):
    """A config missing the required `start_date` key must raise KeyError from check_connection."""
    config.pop("start_date")

    # get_common_params reads config["start_date"] directly, so a missing key now
    # raises KeyError (previously TypeError from pendulum.parse(None)).
    with pytest.raises(KeyError):
        SourceHubspot().check_connection(logger, config=config)


Expand Down Expand Up @@ -406,6 +406,8 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req
requests_mock.register_uri("POST", test_stream.url, responses)
test_stream._sync_mode = None
requests_mock.register_uri("GET", "/properties/v2/company/properties", properties_response)
requests_mock.register_uri("POST", "/crm/v4/associations/company/contacts/batch/read", [{"status_code": 200, "json": {"results": []}}])

records, _ = read_incremental(test_stream, {})
# The stream should not attempt to get more than 10K records.
# Instead, it should use the new state to start a new search query.
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/hubspot.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ Now that you have set up the HubSpot source connector, check out the following H

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------|
| 0.1.78 | 2022-07-28 | [15099](https://github.com/airbytehq/airbyte/pull/15099) | Fix to fetch associations when using incremental mode |
| 0.1.77 | 2022-07-26 | [15035](https://github.com/airbytehq/airbyte/pull/15035) | Make PropertyHistory stream read historic data not limited to 30 days |
| 0.1.76 | 2022-07-25 | [14999](https://github.com/airbytehq/airbyte/pull/14999) | Partially revert changes made in v0.1.75 |
| 0.1.75 | 2022-07-18 | [14744](https://github.com/airbytehq/airbyte/pull/14744) | Remove override of private CDK method |
Expand Down