🎉 Source Hubspot: Update Tickets, fix missing properties and change how state is updated. (#16214)

* fix: missing properties

* update: change Tickets stream to use SearchStream

* WIP: change how state is updated

* fix unit tests

* update docs and dockerfile

* update: docs and add comments to code to explain the new state strategy

* update hubspot.md

* update: redo how state is saved

* fix: unit tests

* remove trivial log

* fix: add safe way to save last_slice

* fix: unit tests

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
vladimir-remar and octavia-squidington-iii authored Sep 9, 2022
1 parent 108b9d3 commit 30ac38e
Showing 6 changed files with 49 additions and 14 deletions.
@@ -449,7 +449,7 @@
- name: HubSpot
sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
dockerRepository: airbyte/source-hubspot
dockerImageTag: 0.1.82
dockerImageTag: 0.1.83
documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot
icon: hubspot.svg
sourceType: api
@@ -4315,7 +4315,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-hubspot:0.1.82"
- dockerImage: "airbyte/source-hubspot:0.1.83"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot"
connectionSpecification:
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
@@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.82
LABEL io.airbyte.version=0.1.83
LABEL io.airbyte.name=airbyte/source-hubspot
@@ -482,6 +482,13 @@ def _cast_record_fields_if_needed(self, record: Mapping, properties: Mapping[str
properties = properties or self.properties

for field_name, field_value in record["properties"].items():
if field_name not in properties:
self.logger.info(
"Property discarded: not matching the properties schema: record id: {}, property_name: {}".format(
record.get("id"), field_name
)
)
continue
declared_field_types = properties[field_name].get("type", [])
if not isinstance(declared_field_types, Iterable):
declared_field_types = [declared_field_types]
@@ -702,6 +709,7 @@ class IncrementalStream(Stream, ABC):
# False -> chunk size is max (only one slice), True -> chunk_size is 30 days
need_chunk = True
state_checkpoint_interval = 500
last_slice = None

@property
def cursor_field(self) -> Union[str, List[str]]:
@@ -725,7 +733,11 @@ def read_records(
cursor = self._field_to_datetime(record[self.updated_at_field])
latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor
yield record
self._update_state(latest_cursor=latest_cursor)

is_last_slice = False
if self.last_slice:
is_last_slice = stream_slice == self.last_slice
self._update_state(latest_cursor=latest_cursor, is_last_record=is_last_slice)

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
return self.state
@@ -756,14 +768,24 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._state = None
self._sync_mode = None
self._init_sync = pendulum.now("utc")

def _update_state(self, latest_cursor):
def _update_state(self, latest_cursor, is_last_record=False):
"""
The first run uses an endpoint that is sorted by id rather than by
updated_at. Because of this, instead of advancing the state from the
latest cursor read, the state is set at the end of the sync to the time
the sync started. With this `state strategy`, the next incremental sync
captures every entity that could have been updated while the sync ran.
"""
if latest_cursor:
new_state = max(latest_cursor, self._state) if self._state else latest_cursor
if new_state != self._state:
logger.info(f"Advancing bookmark for {self.name} stream from {self._state} to {latest_cursor}")
self._state = new_state
self._start_date = self._state
if is_last_record:
self._state = self._init_sync

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
@@ -785,6 +807,9 @@ def stream_slices(
"endTimestamp": end_ts,
}
)
# Remember the last slice so read_records can save the sync start time as the final state
if len(slices) > 0:
self.last_slice = slices[-1]

return slices

@@ -921,7 +946,9 @@ def read_records(
self._update_state(latest_cursor=latest_cursor)
next_page_token = None

self._update_state(latest_cursor=latest_cursor)
# Since the Search stream does not have slices, it is safe to save the
# sync start time as the final state
self._update_state(latest_cursor=latest_cursor, is_last_record=True)
# Always return an empty generator just in case no records were ever yielded
yield from []

@@ -1265,7 +1292,7 @@ def read_records(
# Always return an empty generator just in case no records were ever yielded
yield from []

self._update_state(latest_cursor=latest_cursor)
self._update_state(latest_cursor=latest_cursor, is_last_record=True)


class Forms(Stream):
@@ -1520,11 +1547,12 @@ class Products(CRMObjectIncrementalStream):
scopes = {"e-commerce"}


class Tickets(CRMObjectIncrementalStream):
class Tickets(CRMSearchStream):
entity = "ticket"
associations = ["contacts", "deals", "companies"]
primary_key = "id"
scopes = {"tickets"}
last_modified_field = "hs_lastmodifieddate"


class Quotes(CRMObjectIncrementalStream):
@@ -399,6 +399,7 @@ def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(req

# Create test_stream instance with some state
test_stream = Companies(**common_params)
test_stream._init_sync = pendulum.parse("2022-02-24T16:43:11Z")
test_stream.state = {"updatedAt": "2022-02-24T16:43:11Z"}

# Mocking Request
@@ -412,7 +413,7 @@
# The stream should not attempt to get more than 10K records.
# Instead, it should use the new state to start a new search query.
assert len(records) == 11000
assert test_stream.state["updatedAt"] == "2022-03-01T00:00:00Z"
assert test_stream.state["updatedAt"] == test_stream._init_sync.to_iso8601_string()


def test_engagements_stream_pagination_works(requests_mock, common_params):
@@ -488,12 +489,13 @@ def test_engagements_stream_pagination_works(requests_mock, common_params):
records = read_full_refresh(test_stream)
# The stream should handle pagination correctly and output 600 records.
assert len(records) == 600
assert test_stream.state["lastUpdated"] == 1641234595251
assert test_stream.state["lastUpdated"] == int(test_stream._init_sync.timestamp() * 1000)

test_stream = Engagements(**common_params)
records, _ = read_incremental(test_stream, {})
# The stream should handle pagination correctly and output 600 records.
# The stream should handle pagination correctly and output 250 records.
assert len(records) == 250
assert test_stream.state["lastUpdated"] == 1641234595252
assert test_stream.state["lastUpdated"] == int(test_stream._init_sync.timestamp() * 1000)


def test_incremental_engagements_stream_stops_at_10K_records(requests_mock, common_params, fake_properties_list):
@@ -517,10 +519,9 @@ def test_incremental_engagements_stream_stops_at_10K_records(requests_mock, comm
# Create test_stream instance with some state
test_stream = Engagements(**common_params)
test_stream.state = {"lastUpdated": 1641234595251}

# Mocking Request
requests_mock.register_uri("GET", "/engagements/v1/engagements/recent/modified?hapikey=test_api_key&count=100", responses)
records, _ = read_incremental(test_stream, {})
# The stream should not attempt to get more than 10K records.
assert len(records) == 10000
assert test_stream.state["lastUpdated"] == +1641234595252
assert test_stream.state["lastUpdated"] == int(test_stream._init_sync.timestamp() * 1000)
6 changes: 6 additions & 0 deletions docs/integrations/sources/hubspot.md
@@ -99,6 +99,11 @@ Objects in the `engagements` stream can have one of the following types: `note`,
HubSpot API currently only supports `quotes` endpoint using API Key, using OAuth it is impossible to access this stream (as reported by [community.hubspot.com](https://community.hubspot.com/t5/APIs-Integrations/Help-with-using-Feedback-CRM-API-and-Quotes-CRM-API/m-p/449104/highlight/true#M44411)).
:::

### New state strategy on Incremental streams

Entities updated while a sync was running could be missed, because the state was advanced to the latest record's cursor. To avoid this data loss, the state is now saved as the time the sync started, so the next incremental sync captures every entity that could have been updated during the previous one.
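The strategy can be sketched in a few lines of plain Python (class and method names here are illustrative, not the connector's actual code): the bookmark advances normally while record slices are read, but once the last slice finishes, the state is overwritten with the sync start time.

```python
from datetime import datetime, timezone


class IncrementalSketch:
    """Toy model of the new state strategy (all names are hypothetical)."""

    def __init__(self, initial_state=None):
        self._state = initial_state                    # bookmark kept between syncs
        self._init_sync = datetime.now(timezone.utc)   # moment this sync started
        self.last_slice = None

    def stream_slices(self, slices):
        # Remember the final slice so read_slice can detect when it is done.
        if slices:
            self.last_slice = slices[-1]
        return slices

    def _update_state(self, latest_cursor, is_last_record=False):
        # Normal bookmark advancement during the sync.
        if latest_cursor and (self._state is None or latest_cursor > self._state):
            self._state = latest_cursor
        if is_last_record:
            # New strategy: finish with the sync start time, not the newest
            # record cursor, so mid-sync updates are re-read next time.
            self._state = self._init_sync

    def read_slice(self, stream_slice, record_cursors):
        latest = None
        for cursor in record_cursors:
            latest = max(cursor, latest) if latest else cursor
        self._update_state(latest, is_last_record=stream_slice == self.last_slice)
```

On the last slice the bookmark jumps to the sync start time, so any entity updated between the sync's start and finish falls after the saved state and is picked up by the next run.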


## Performance considerations

The connector is restricted by normal HubSpot [rate limitations](https://legacydocs.hubspot.com/apps/api_guidelines).
@@ -129,6 +134,7 @@ Now that you have set up the Hubspot source connector, check out the following H

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------|
| 0.1.83 | 2022-09-01 | [16214](https://github.com/airbytehq/airbyte/pull/16214) | Update Tickets, fix missing properties and change how state is updated. |
| 0.1.82 | 2022-08-18 | [15110](https://github.com/airbytehq/airbyte/pull/15110) | Check if it has a state on search streams before first sync |
| 0.1.81 | 2022-08-05 | [15354](https://github.com/airbytehq/airbyte/pull/15354) | Fix `Deals` stream schema |
| 0.1.80 | 2022-08-01 | [15156](https://github.com/airbytehq/airbyte/pull/15156) | Fix 401 error while retrieving associations using OAuth |
