Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hubspot -> Postgres Not Syncing Incrementally #2028

Closed
staufman opened this issue Feb 10, 2021 · 6 comments
Closed

Hubspot -> Postgres Not Syncing Incrementally #2028

staufman opened this issue Feb 10, 2021 · 6 comments

Comments

@staufman
Copy link

Expected Behavior

I perform a sync from Hubspot -> Postgres. After it completes, I sync again without changing anything in Hubspot. In this case, I expect nothing new to be synced over.

Current Behavior

Instead, I am seeing duplicate Contact and Company records synced over.

Steps to Reproduce

  1. Setup a source for Hubspot
  2. Setup a destination for Postgres
  3. Manually sync
  4. Manually sync again
  5. Notice how in your Contacts or Company table, you will have entries with a duplicate vid or companyid, respectively.

image

Severity of the bug for you

High

Airbyte Version

VERSION=0.14.1-alpha

Additional context

N/A

@staufman staufman added the type/bug Something isn't working label Feb 10, 2021
@sherifnada
Copy link
Contributor

sherifnada commented Feb 10, 2021

@staufman thanks for reporting the issue! I believe this is correct albeit confusing behavior from the Hubspot source.

The Hubspot source uses a cursor value (like updated_at when replicating contacts) to checkpoint what it replicated in the past. So when a sync runs again, the source reads records whose cursor value is greater-than-or-equal to this value. This way, when a record is updated, its updated_at value is incremented to something greater than the stored cursor, and the next incremental sync will know to replicate this record.

There are two potentially confusing parts here:

  1. The cursor field updated_at is not the primary key for the entity (vid). This is because updated_at can tell us when a record has been changed and needs to be synced again, whereas vid cannot. Because Airbyte's incremental sync is append-only, records are appended to the destination instead of upserted.
  2. We replicate records whose cursor value is greater-than- or-equal to the last state. The connector does this to guard against eventual consistency in the underlying data source. For example, if airbyte syncs data from Hubspot at 5pm (let's say the sync takes 1 minute), but Hubspot's underlying DB updates at 5:02pm, changing the updated_at field on a record to be 5pm, then we want the next sync to pick that up.

If you want a deduplicated view of your records based on their primary key, you'll need to create a view that dedupes on the primary key and maxes by _airbyte_emitted_at. LMK if there is anything else I can clarify. I'll close this issue otherwise.

@staufman
Copy link
Author

I see. That makes sense. Given the data in an eventual consistency model can grow unbounded, do people typically write cleanup jobs to discard unneeded data?

Also, I'm very much looking forward to this PR (#1491) being released. As it stands, to get the details of a Contact, it looks like we would need to join a vid with the latest _airbyte_emitted_at in contacts with a row in the _airbyte_raw_contacts table with the latest _airbyte_emitted_at. Does that sound right?

@sherifnada
Copy link
Contributor

sherifnada commented Feb 10, 2021

Does that sound right?

That is correct -- we are actively working on #1491 and hope to deliver it soon so you don't have to do this rigamarole :)

do people typically write cleanup jobs to discard unneeded data?

yes, having a "vacuum" query that is run periodically is recommended. It might make sense for us to offer this at the end of each sync (or each N syncs) or as a separate "job" type in Airbyte, but it requires knowledge of primary key, which is something we'll be working on in the near-ish future.

@sherifnada sherifnada added type/question Further information is requested and removed type/bug Something isn't working labels Feb 10, 2021
@sherifnada
Copy link
Contributor

sherifnada commented Feb 11, 2021

@staufman I added an example deduplication query in this doc under the "Getting the latest snapshot" section https://docs.airbyte.io/architecture/incremental#getting-the-latest-snapshot-of-data

@staufman
Copy link
Author

Awesome! Just a heads up that I had to write the query as follows for Postgres (also, I adapted it for my particular situation with contacts). Note: it has been a long time since I last wrote some SQL :)

CREATE VIEW contacts_view AS (SELECT * FROM (SELECT *, MAX(_airbyte_emitted_at) OVER (PARTITION BY vid) as max_emitted_at FROM contacts) AS contacts_paritioned WHERE _airbyte_emitted_at=max_emitted_at);

@michel-tricot
Copy link
Contributor

Super nice!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants