Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Source HubSpot: New stream for contacts merged audit #27091

Merged
Merged
Show file tree
Hide file tree
Changes from 61 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
e3a83ed
feat: add ContactsMergedAudit stream
aballiet Jun 6, 2023
ac82f04
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jun 6, 2023
9847b4f
Update hubspot.md
aballiet Jun 6, 2023
b3a2f0d
apply formatting
aballiet Jun 6, 2023
c90a2ab
Add to source
aballiet Jun 7, 2023
e0a6eef
Update catalog
aballiet Jun 7, 2023
d55d416
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jun 7, 2023
12cd3c6
bump version in metadata.yaml
sh4sh Jun 7, 2023
2aaf7c9
Merge branch 'master' into source-hubspot-capture-merge-contacts
sh4sh Jun 7, 2023
0b962ed
use vid-to-merge as primary key
aballiet Jun 8, 2023
6ff8c57
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jun 8, 2023
c642f26
fix format
aballiet Jun 8, 2023
4757644
add additionalProperties prop in schema
aballiet Jun 8, 2023
05391d1
refactor by creating functions
aballiet Jun 8, 2023
a8ff359
More details about API version choices
aballiet Jun 8, 2023
9cf2a11
Add docstring
aballiet Jun 8, 2023
a3f1548
more explicit properties definition
aballiet Jun 9, 2023
89e78bf
unit testing
aballiet Jun 9, 2023
f5b7807
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jun 9, 2023
d1bdc77
Merge branch 'master' into source-hubspot-capture-merge-contacts
sajarin Jun 9, 2023
3b689fa
fix: modify expected_records for contacts_merged_audit stream
sajarin Jun 9, 2023
b2fe95b
Add abnormal state for contacts_merged_audit stream
aballiet Jun 9, 2023
35d8d56
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jun 13, 2023
689ae51
Update README.md
aballiet Jun 13, 2023
6c0e271
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jun 16, 2023
d439323
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jun 20, 2023
1634cc0
WIP substream
aballiet Jun 21, 2023
a44825f
Apply formatting
aballiet Jun 21, 2023
773c9d7
remove
aballiet Jun 21, 2023
0e6a748
Merge branch 'airbytehq:master' into source-hubspot-capture-merge-con…
aballiet Jun 26, 2023
7b2652c
fix parent child stream
aballiet Jun 28, 2023
09cca0b
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jun 28, 2023
9c0b656
remove diff title bar changelog
aballiet Jun 28, 2023
80ef479
fix unit tests
aballiet Jun 28, 2023
16fe870
improve naming
aballiet Jun 28, 2023
16a9286
Remove unused import
aballiet Jun 28, 2023
5a83fde
updated schema
aballiet Jun 28, 2023
778e2de
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jun 29, 2023
00b2e56
simplify batch creation
aballiet Jun 29, 2023
3cd2f8e
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jun 30, 2023
6a320d0
Merge branch 'master' into source-hubspot-capture-merge-contacts
marcosmarxm Jul 7, 2023
a0ae86b
fix stream which doesnt allow incremental
marcosmarxm Jul 7, 2023
fa14106
dockerfile
marcosmarxm Jul 7, 2023
9421918
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jul 10, 2023
0d05b9c
Merge branch 'master' into source-hubspot-capture-merge-contacts
sajarin Jul 10, 2023
4b891ba
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jul 11, 2023
19e0c06
Merge branch 'master' into source-hubspot-capture-merge-contacts
sajarin Jul 11, 2023
d00b775
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jul 19, 2023
b3f460d
Merge remote-tracking branch 'upstream/master' into source-hubspot-ca…
aballiet Jul 20, 2023
0bd70da
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jul 20, 2023
4cb220c
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jul 20, 2023
c04c7ba
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jul 20, 2023
4f46ea7
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jul 21, 2023
78ca516
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jul 24, 2023
deaa85e
Merge branch 'master' into source-hubspot-capture-merge-contacts
sajarin Jul 24, 2023
eb89c68
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jul 25, 2023
6c53c5d
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jul 26, 2023
81fb2d5
Merge branch 'master' into source-hubspot-capture-merge-contacts
aballiet Jul 26, 2023
7d16fbc
Merge branch 'master' into source-hubspot-capture-merge-contacts
sajarin Jul 26, 2023
a122059
fix: updated expected_records for contacts stream
sajarin Jul 26, 2023
c23fd43
Merge branch 'master' into source-hubspot-capture-merge-contacts
sajarin Jul 26, 2023
6043cda
docs: bump metadata.yml file and update connector changelog
sajarin Jul 27, 2023
5302152
Merge branch 'master' into source-hubspot-capture-merge-contacts
sajarin Jul 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions airbyte-integrations/connectors/source-hubspot/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ The primary key for the following streams is `pipelineId`:

- deal_pipelines

The primary key for the following streams is `vid-to-merge`:

- contacts_merged_audit

The following streams do not have a primary key:

- contact_lists (The primary key could potentially be a composite key (portalId, listId) - https://legacydocs.hubspot.com/docs/methods/lists/get_lists)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@
}
}
},
{
"type": "STREAM",
"stream": {
"stream_descriptor": {
"name": "contacts_merged_audit"
},
"stream_state": {
"updatedAt": "2221-10-12T13:37:56.412000+00:00"
}
}
},
{
"type": "STREAM",
"stream": {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,15 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "contacts_merged_audit",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "contacts_merged_audit",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "contacts_merged_audit",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "deal_pipelines",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,15 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "contacts_merged_audit",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"canonical-vid": {
"type": ["null", "integer"]
},
"vid-to-merge": {
"type": ["null", "integer"]
},
"timestamp": {
"type": ["null", "integer"]
},
"entity-id": {
"type": ["null", "string"]
},
"user-id": {
"type": ["null", "integer"]
},
"num-properties-moved": {
"type": ["null", "integer"]
},
"merged_from_email": {
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"source-vids": {
"type": ["null", "array"],
"items": {
"type": ["null", "integer"]
}
},
"updated-by-user-id": {
"type": ["null", "integer"]
},
"source-label": {
"type": ["null", "string"]
},
"source-type": {
"type": ["null", "string"]
},
"value": {
"type": ["null", "string"]
},
"source-id": {
"type": ["null", "string"]
},
"selected": {
"type": ["null", "boolean"]
},
"timestamp": {
"type": ["null", "integer"]
}
}
},
"merged_to_email": {
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"updated-by-user-id": {
"type": ["null", "integer"]
},
"source-label": {
"type": ["null", "string"]
},
"source-type": {
"type": ["null", "string"]
},
"value": {
"type": ["null", "string"]
},
"source-id": {
"type": ["null", "string"]
},
"selected": {
"type": ["null", "boolean"]
},
"timestamp": {
"type": ["null", "integer"]
}
}
},
"first-name": {
"type": ["null", "string"]
},
"last-name": {
"type": ["null", "string"]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
ContactLists,
Contacts,
ContactsListMemberships,
ContactsMergedAudit,
CustomObject,
DealPipelines,
Deals,
Expand Down Expand Up @@ -93,6 +94,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
ContactLists(**common_params),
Contacts(**common_params),
ContactsListMemberships(**common_params),
ContactsMergedAudit(**common_params),
DealPipelines(**common_params),
Deals(**common_params),
DealsArchived(**common_params),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import pendulum as pendulum
import requests
from airbyte_cdk.entrypoint import logger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.models.airbyte_protocol import SyncMode
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
Expand Down Expand Up @@ -1775,6 +1775,74 @@ class Contacts(CRMSearchStream):
scopes = {"crm.objects.contacts.read"}


class ContactsMergedAudit(Stream):

url = "/contacts/v1/contact/vids/batch/"
updated_at_field = "timestamp"
scopes = {"crm.objects.contacts.read"}

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.config = kwargs

def get_json_schema(self) -> Mapping[str, Any]:
"""Override get_json_schema defined in Stream class
Final object does not have properties field
We return JSON schema as defined in :
source_hubspot/schemas/contacts_merged_audit.json
"""
return super(Stream, self).get_json_schema()

def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None, **kwargs
) -> Iterable[Mapping[str, Any]]:

slices = []

# we can query a max of 100 contacts at a time
max_contacts = 100
slices = []
contact_batch = []

contacts = Contacts(**self.config)
contacts._sync_mode = SyncMode.full_refresh
contacts.filter_old_records = False

for contact in contacts.read_records(sync_mode=SyncMode.full_refresh):
if contact["properties"].get("hs_merged_object_ids"):
contact_batch.append(contact["id"])

if len(contact_batch) == max_contacts:
slices.append({"vid": contact_batch})
contact_batch = []

if contact_batch:
slices.append({"vid": contact_batch})

return slices

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return {"vid": stream_slice["vid"]}

def parse_response(
self,
response: requests.Response,
*,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
response = self._parse_response(response)
if response.get("status", None) == "error":
self.logger.warning(f"Stream `{self.name}` cannot be procced. {response.get('message')}")
return

for contact_id in list(response.keys()):
yield from response[contact_id]["merge-audits"]


class EngagementsCalls(CRMSearchStream):
entity = "calls"
last_modified_field = "hs_lastmodifieddate"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_streams(requests_mock, config):

streams = SourceHubspot().streams(config)

assert len(streams) == 28
assert len(streams) == 29


def test_check_credential_title_exception(config):
Expand Down
Loading