🐛Source Klaviyo: add state_checkpoint_interval #32291

Merged
merged 3 commits into from
Nov 9, 2023
@@ -8,7 +8,7 @@ data:
definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
- dockerImageTag: 2.0.0
+ dockerImageTag: 2.0.1
dockerRepository: airbyte/source-klaviyo
githubIssueLabel: source-klaviyo
icon: klaviyo.svg
@@ -169,7 +169,7 @@ def read_records(
stream_state: Optional[Mapping[str, Any]] = None,
) -> Iterable[StreamData]:
stream_state = stream_state or {}
- starting_point = stream_state.get(self.cursor_field) or self._start_ts
+ starting_point = stream_state.get(self.cursor_field, self._start_ts)
for record in super().read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
):
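
The one-line change to `starting_point` in this hunk swaps `stream_state.get(...) or default` for `stream_state.get(..., default)`. The two forms are not strictly equivalent, and the difference only shows up when the key is present with a falsy value. A minimal sketch, assuming a hypothetical `START_TS` constant in place of `self._start_ts` and free functions in place of the stream method:

```python
from typing import Any, Mapping, Optional

CURSOR_FIELD = "updated"          # mirrors Profiles.cursor_field in the diff
START_TS = "2023-01-01T00:00:00"  # hypothetical stand-in for self._start_ts


def starting_point_with_or(stream_state: Optional[Mapping[str, Any]]) -> Any:
    stream_state = stream_state or {}
    # Falls back whenever the stored value is missing OR falsy (None, "", 0).
    return stream_state.get(CURSOR_FIELD) or START_TS


def starting_point_with_default(stream_state: Optional[Mapping[str, Any]]) -> Any:
    stream_state = stream_state or {}
    # Falls back only when the key is absent from the mapping.
    return stream_state.get(CURSOR_FIELD, START_TS)
```

Both helpers agree when the key is absent or holds a real timestamp; they diverge only for a state such as `{"updated": None}`, where the second form returns `None` instead of the start timestamp.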
@@ -237,11 +237,12 @@ def read_records(


class Profiles(IncrementalKlaviyoStream):
- """Docs: https://developers.klaviyo.com/en/reference/get_profiles"""
+ """Docs: https://developers.klaviyo.com/en/v2023-02-22/reference/get_profiles"""

cursor_field = "updated"
api_revision = "2023-02-22"
page_size = 100
state_checkpoint_interval = 100 # API can return maximum 100 records per page
Contributor

I'm a little bit afraid of this change as we don't use sorting in the API endpoints. How can we validate that the records are emitted in the order of the cursor field? If they are not, this is dangerous as we could have:

  • emit record with cursor value 10
  • emit a state message with cursor value 10
  • emit record with cursor value 5
  • the sync crashes

In that case we wouldn't lose the record with cursor value 5. But if the crash happened just before that record was emitted, it would be lost: the saved state is already 10, so the next sync would skip it.
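
The ordering hazard described in this comment can be reproduced with a small simulation. This is only a sketch, not the Airbyte CDK: `sync_until_crash` and `resume` are hypothetical helpers, records are reduced to bare integer cursor values, the saved state is assumed to be the highest cursor value seen so far, and the resumed sync is assumed to request only records strictly greater than the saved state.

```python
from typing import List, Optional, Tuple


def sync_until_crash(
    records: List[int], checkpoint_interval: int, crash_after: int
) -> Tuple[List[int], Optional[int]]:
    """Emit records in API order, checkpointing every `checkpoint_interval`
    records, and stop (crash) once `crash_after` records have been emitted."""
    emitted: List[int] = []
    saved_state: Optional[int] = None
    highest_seen: Optional[int] = None
    for count, cursor_value in enumerate(records, start=1):
        emitted.append(cursor_value)
        highest_seen = cursor_value if highest_seen is None else max(highest_seen, cursor_value)
        if count % checkpoint_interval == 0:
            saved_state = highest_seen  # state message emitted here
        if count == crash_after:
            break  # simulated crash
    return emitted, saved_state


def resume(records: List[int], saved_state: Optional[int]) -> List[int]:
    """Second attempt: only records newer than the saved state are read."""
    return [r for r in records if saved_state is None or r > saved_state]


# Unsorted API response: the record with cursor value 5 is never delivered.
unsorted_emitted, unsorted_state = sync_until_crash([10, 5], checkpoint_interval=1, crash_after=1)
unsorted_resumed = resume([10, 5], unsorted_state)

# Sorted API response: the resumed sync picks up the remaining record.
sorted_emitted, sorted_state = sync_until_crash([5, 10], checkpoint_interval=1, crash_after=1)
sorted_resumed = resume([5, 10], sorted_state)
```

With the unsorted input, the first attempt delivers only `10` and the resumed attempt delivers nothing, so `5` is lost. With the sorted input, the two attempts together deliver both records.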

Contributor Author

I believe this is only applied to the streams which have sorting by cursor field at the API level - we add sorting param here: https://github.com/airbytehq/airbyte/pull/32291/files#diff-eb5f2206f720a1d96317e5a8b62e21de2eaa12b401666efccb798362b337eaf3R148

Contributor

Given that all the state_checkpoint_interval values you added are on IncrementalKlaviyoStream subclasses, everything looks good then.

One non-blocking question though: why the comment next to the checkpoint interval? How is the page size linked to the state_checkpoint_interval?

Contributor Author

The connector emits its state after every page it reads. If the page is not full, the state is emitted at the end of the read; otherwise, since pages are sorted by cursor, we can checkpoint after each page.

Contributor

This logic was changed somewhat recently: we now emit a state message after X records overall, not after X records within a slice. Does that change the way you look at this problem?

Contributor Author

No, since none of these streams override stream_slices.
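
The relationship between page size and checkpoint interval discussed in this thread can be sketched as a record counter. This is a simplified model rather than the CDK's implementation: `read_with_checkpoints` is a hypothetical function, records are bare cursor values already sorted by the API, and a final state message is assumed to be emitted at the end of the read if the last record was not already checkpointed.

```python
from typing import Iterator, List, Optional, Tuple

Message = Tuple[str, int]  # ("RECORD", cursor_value) or ("STATE", cursor_value)


def read_with_checkpoints(pages: List[List[int]], checkpoint_interval: int) -> Iterator[Message]:
    """Yield records page by page, emitting a state message every
    `checkpoint_interval` records, counted across the whole read."""
    count = 0
    state: Optional[int] = None
    for page in pages:
        for cursor_value in page:
            count += 1
            state = cursor_value  # sorted input: the latest record has the highest cursor
            yield ("RECORD", cursor_value)
            if count % checkpoint_interval == 0:
                yield ("STATE", state)
    # Final state for a trailing partial page, if any records remain uncheckpointed.
    if state is not None and count % checkpoint_interval != 0:
        yield ("STATE", state)


# Page size 3 with checkpoint interval 3: one full page and one partial page.
messages = list(read_with_checkpoints([[1, 2, 3], [4, 5]], checkpoint_interval=3))
states = [value for kind, value in messages if kind == "STATE"]
```

When the interval equals the page size and there is a single slice, every intermediate state message lands exactly on a page boundary, which is the behavior the author describes.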


def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
return "profiles"
@@ -269,7 +270,7 @@ def path(self, **kwargs) -> str:

class GlobalExclusions(Profiles):
"""
- Docs: https://developers.klaviyo.com/en/reference/get_profiles
+ Docs: https://developers.klaviyo.com/en/v2023-02-22/reference/get_profiles
This stream takes data from 'profiles' endpoint, but suppressed records only
"""

@@ -293,6 +294,7 @@ class Events(IncrementalKlaviyoStream):
"""Docs: https://developers.klaviyo.com/en/reference/get_events"""

cursor_field = "datetime"
state_checkpoint_interval = 200 # API can return maximum 200 records per page

def path(self, **kwargs) -> str:
return "events"
@@ -302,6 +304,7 @@ class Flows(ArchivedRecordsMixin, IncrementalKlaviyoStream):
"""Docs: https://developers.klaviyo.com/en/reference/get_flows"""

cursor_field = "updated"
state_checkpoint_interval = 50 # API can return maximum 50 records per page

def path(self, **kwargs) -> str:
return "flows"
@@ -311,6 +314,7 @@ class EmailTemplates(IncrementalKlaviyoStream):
"""Docs: https://developers.klaviyo.com/en/reference/get_templates"""

cursor_field = "updated"
state_checkpoint_interval = 10 # API can return maximum 10 records per page

def path(self, **kwargs) -> str:
return "templates"
18 changes: 10 additions & 8 deletions docs/integrations/sources/klaviyo.md
@@ -12,12 +12,12 @@ This page contains the setup guide and reference information for the Klaviyo sou
### Step 1: Set up Klaviyo

1. Create a [Klaviyo account](https://www.klaviyo.com)
- 2. Create a [Private API key](https://help.klaviyo.com/hc/en-us/articles/115005062267-How-to-Manage-Your-Account-s-API-Keys#your-private-api-keys3). Make sure you selected all [scopes](https://help.klaviyo.com/hc/en-us/articles/7423954176283) corresponding to the streams you would like to replicate.
+ 2. Create a [Private API key](https://help.klaviyo.com/hc/en-us/articles/115005062267-How-to-Manage-Your-Account-s-API-Keys#your-private-api-keys3). Make sure you selected all [scopes](https://help.klaviyo.com/hc/en-us/articles/7423954176283) corresponding to the streams you would like to replicate. You can find which scope is required for a specific stream by navigating to the relevant API documentation for the streams Airbyte supports.

### Step 2: Set up the Klaviyo connector in Airbyte

1. [Log into your Airbyte Cloud](https://cloud.airbyte.io/workspaces) account.
- 2. Click **Sources** and then click **+ New source**.
+ 2. Click **Sources** and then click **+ new source**.
Contributor

Locally, I see different behavior than in prod, so I can't validate this fix.

Locally: [image]

Prod: [image]

How does removing the capital N remove the bullet list?

Contributor Author

I can't either, unfortunately. This is just my assumption, based on what I've seen on other connectors that don't have this issue.

Contributor

Ok! Keep me posted on the result of this change and we'll update the checklist based on that. Thanks!

3. On the Set up the source page, select **Klaviyo** from the **Source type** dropdown.
4. Enter a name for the Klaviyo connector.
5. For **Api Key**, enter the Klaviyo [Private API key](https://help.klaviyo.com/hc/en-us/articles/115005062267-How-to-Manage-Your-Account-s-API-Keys#your-private-api-keys3).
@@ -35,13 +35,14 @@ The Klaviyo source connector supports the following [sync modes](https://docs.ai

## Supported Streams

- [Campaigns](https://developers.klaviyo.com/en/v1-2/reference/get-campaigns#get-campaigns)
- [Events](https://developers.klaviyo.com/en/v1-2/reference/metrics-timeline)
- [GlobalExclusions](https://developers.klaviyo.com/en/v1-2/reference/get-global-exclusions)
- [Lists](https://developers.klaviyo.com/en/v1-2/reference/get-lists)
- [Metrics](https://developers.klaviyo.com/en/v1-2/reference/get-metrics)
- [Campaigns](https://developers.klaviyo.com/en/v2023-06-15/reference/get_campaigns)
- [Email Templates](https://developers.klaviyo.com/en/reference/get_templates)
- [Events](https://developers.klaviyo.com/en/reference/get_events)
- [Flows](https://developers.klaviyo.com/en/reference/get_flows)
- [Profiles](https://developers.klaviyo.com/en/reference/get_profiles)
- [GlobalExclusions](https://developers.klaviyo.com/en/v2023-02-22/reference/get_profiles)
- [Lists](https://developers.klaviyo.com/en/reference/get_lists)
- [Metrics](https://developers.klaviyo.com/en/reference/get_metrics)
- [Profiles](https://developers.klaviyo.com/en/v2023-02-22/reference/get_profiles)

## Performance considerations

@@ -62,6 +63,7 @@ The Klaviyo connector should not run into Klaviyo API limitations under normal u

| Version | Date | Pull Request | Subject |
|:---------|:-----------| :--------------------------------------------------------- |:------------------------------------------------------------------------------------------|
| `2.0.1` | 2023-11-08 | [32291](https://github.com/airbytehq/airbyte/pull/32291) | Add logic to have regular checkpointing schedule |
| `2.0.0` | 2023-11-03 | [32128](https://github.com/airbytehq/airbyte/pull/32128) | Use the latest API for streams `campaigns`, `email_templates`, `events`, `flows`, `global_exclusions`, `lists`, and `metrics`|
| `1.1.0` | 2023-10-23 | [31710](https://github.com/airbytehq/airbyte/pull/31710) | Make `start_date` config field optional |
| `1.0.0` | 2023-10-18 | [31565](https://github.com/airbytehq/airbyte/pull/31565) | added new known fields for 'events' stream |