Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Mailchimp - fix the way request params are built #20765

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airbyte-integrations/connectors/source-mailchimp/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ FROM python:3.9-slim
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*

WORKDIR /airbyte/integration_code
COPY source_mailchimp ./source_mailchimp
COPY main.py ./
COPY setup.py ./
RUN pip install .
COPY source_mailchimp ./source_mailchimp
COPY main.py ./

ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.version=0.3.1
LABEL io.airbyte.name=airbyte/source-mailchimp
Original file line number Diff line number Diff line change
@@ -1,66 +1,57 @@
connector_image: airbyte/source-mailchimp:dev
tests:
test_strictness_level: high
acceptance_tests:
spec:
- spec_path: "source_mailchimp/spec.json"
timeout_seconds: 60
tests:
- spec_path: "source_mailchimp/spec.json"
connection:
# for old spec config (without oneOf)
- config_path: "secrets/config.json"
status: "succeed"
timeout_seconds: 180
# for auth with API token
- config_path: "secrets/config_apikey.json"
status: "succeed"
timeout_seconds: 180
# for auth with oauth2 token
- config_path: "secrets/config_oauth.json"
status: "succeed"
timeout_seconds: 180
- config_path: "integration_tests/invalid_config.json"
status: "failed"
timeout_seconds: 180
- config_path: "integration_tests/invalid_config_apikey.json"
status: "failed"
timeout_seconds: 180
- config_path: "integration_tests/invalid_config_oauth.json"
status: "failed"
timeout_seconds: 180
tests:
# for old spec config (without oneOf)
- config_path: "secrets/config.json"
status: "succeed"
# for auth with API token
- config_path: "secrets/config_apikey.json"
status: "succeed"
# for auth with oauth2 token
- config_path: "secrets/config_oauth.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
- config_path: "integration_tests/invalid_config_apikey.json"
status: "failed"
- config_path: "integration_tests/invalid_config_oauth.json"
status: "failed"
discovery:
# for old spec config (without oneOf)
- config_path: "secrets/config.json"
# for auth with API token
- config_path: "secrets/config_apikey.json"
# for auth with oauth2 token
- config_path: "secrets/config_oauth.json"
tests:
# for old spec config (without oneOf)
- config_path: "secrets/config.json"
# for auth with API token
- config_path: "secrets/config_apikey.json"
# for auth with oauth2 token
- config_path: "secrets/config_oauth.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 1800
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 1800
# THIS TEST IS COMMENTED OUT. Tests are supposed to accept
# `state = {cursor_field: value}`. When we have dependent endpoint path
# `path_begin/{some_id}/path_end` we need a complex state like below:
# `{"id1": {cursor_field: value}, "id2": {cursor_field: value}...}`
# The test currently is not supposed to accept this desired construction,
# so it is commented out

# incremental:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state_path: "integration_tests/state.json"
# cursor_paths:
# lists: [ "date_created" ]
# campaigns: [ "create_time" ]
# Email_activity: [ "timestamp" ]

tests:
- config_path: "secrets/config.json"
expect_records:
bypass_reason: "Risk to disclose internal data. Need to set up a sandbox account - https://github.com/airbytehq/airbyte/issues/20726"
- config_path: "secrets/config_oauth.json"
expect_records:
bypass_reason: "Risk to disclose internal data. Need to set up a sandbox account - https://github.com/airbytehq/airbyte/issues/20726"
incremental:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state:
future_state_path: "integration_tests/state.json"
cursor_paths:
lists: ["date_created"]
campaigns: ["create_time"]
email_activity: ["49d68626f3", "timestamp"]
# Email activities stream has working campaigns with email newsletters.
# Due to this sequential_reads test could be failed.
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_email_activities.json"
timeout_seconds: 1800
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog_without_email_activities.json"
timeout_seconds: 1800
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_email_activities.json"
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog_without_email_activities.json"
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
},
"sync_mode": "incremental",
"cursor_field": ["create_time"],
"primary_key": [["id"]],
"destination_sync_mode": "append"
},
{
Expand All @@ -22,20 +23,23 @@
"default_cursor_field": ["date_created"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"sync_mode": "incremental",
"cursor_field": ["date_created"],
"destination_sync_mode": "append"
"destination_sync_mode": "append",
"primary_key": [["id"]]
},
{
"stream": {
"name": "email_activity",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"],
"source_defined_cursor": true,
"default_cursor_field": ["timestamp"]
"default_cursor_field": ["timestamp"],
"source_defined_primary_key": [["timestamp"], ["email_id"], ["action"]]
},
"sync_mode": "incremental",
"cursor_field": ["timestamp"],
"primary_key": [["timestamp"], ["email_id"], ["action"]],
"destination_sync_mode": "append"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
},
"sync_mode": "incremental",
"cursor_field": ["create_time"],
"primary_key": [["id"]],
"destination_sync_mode": "append"
},
{
Expand All @@ -24,6 +25,7 @@
},
"sync_mode": "incremental",
"cursor_field": ["date_created"],
"primary_key": [["id"]],
"destination_sync_mode": "append"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@
{
"type": "STREAM",
"stream": {
"stream_state": { "create_time": "2020-11-23T05:42:11+00:00" },
"stream_state": { "create_time": "2220-11-23T05:42:11+00:00" },
"stream_descriptor": { "name": "campaigns" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "date_created": "2020-09-25T04:47:31+00:00" },
"stream_state": { "date_created": "2220-09-25T04:47:31+00:00" },
"stream_descriptor": { "name": "lists" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": {
"49d68626f3": { "timestamp": "2020-11-23T05:42:10+00:00" }
"49d68626f3": { "timestamp": "2220-11-23T05:42:10+00:00" }
},
"stream_descriptor": { "name": "email_activity" }
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mailchimp/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=[
"airbyte-cdk~=0.1",
"airbyte-cdk",
"pytest~=6.1",
],
package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]},
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,9 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = MailChimpAuthenticator().get_auth(config)
return [Lists(authenticator=authenticator), Campaigns(authenticator=authenticator), EmailActivity(authenticator=authenticator)]
campaign_id = config.get("campaign_id")
return [
Lists(authenticator=authenticator),
Campaigns(authenticator=authenticator),
EmailActivity(authenticator=authenticator, campaign_id=campaign_id)
]
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
}
}
]
},
"campaign_id": {
"type": "string",
"title": "ID of a campaign to sync email activities",
"airbyte_hidden": true
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import math
from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping, MutableMapping, Optional
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import requests
from airbyte_cdk.models import SyncMode
Expand All @@ -14,7 +14,7 @@

class MailChimpStream(HttpStream, ABC):
primary_key = "id"
page_size = 100
page_size = 1000

def __init__(self, **kwargs):
super().__init__(**kwargs)
Expand Down Expand Up @@ -72,6 +72,14 @@ def cursor_field(self) -> str:
"""
pass

@property
def filter_field(self):
return f"since_{self.cursor_field}"

@property
def sort_field(self):
return self.cursor_field

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object
Expand All @@ -81,13 +89,21 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
current_state = current_stream_state.get(self.cursor_field) or latest_state
return {self.cursor_field: max(latest_state, current_state)}

def request_params(self, stream_state=None, **kwargs):
def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
slice_ = {}
stream_state = stream_state or {}
params = super().request_params(stream_state=stream_state, **kwargs)
default_params = {"sort_field": self.cursor_field, "sort_dir": "ASC"}
since_value = stream_state.get(self.cursor_field)
if since_value:
default_params[f"since_{self.cursor_field}"] = since_value
cursor_value = stream_state.get(self.cursor_field)
if cursor_value:
slice_[self.filter_field] = cursor_value
yield slice_

def request_params(self, stream_state=None, stream_slice=None, **kwargs):
stream_state = stream_state or {}
stream_slice = stream_slice or {}
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, **kwargs)
default_params = {"sort_field": self.sort_field, "sort_dir": "ASC", **stream_slice}
params.update(default_params)
return params

Expand All @@ -110,13 +126,30 @@ def path(self, **kwargs) -> str:

class EmailActivity(IncrementalMailChimpStream):
cursor_field = "timestamp"
filter_field = "since"
sort_field = "create_time"
data_field = "emails"
primary_key = ["timestamp", "email_id", "action"]

def stream_slices(self, **kwargs):
campaign_stream = Campaigns(authenticator=self.authenticator)
for campaign in campaign_stream.read_records(sync_mode=SyncMode.full_refresh):
yield {"campaign_id": campaign["id"]}
def __init__(self, campaign_id: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
self.campaign_id = campaign_id

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
stream_state = stream_state or {}
if self.campaign_id:
# this is a workaround to speed up SATs and enable incremental tests
campaigns = [{"id": self.campaign_id}]
else:
campaigns = Campaigns(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh)
for campaign in campaigns:
slice_ = {"campaign_id": campaign["id"]}
cursor_value = stream_state.get(campaign["id"], {}).get(self.cursor_field)
Copy link

@murphpdx murphpdx Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like cursor_field is set to timestamp. I know you have a sort_field of create_time but Mailchimp does not allow you to change the sorting. That means that you're going to lose a lot of records. The timestamp should stay the same; the offset is what should be used to paginate. You will set the offset to offset = offset + pagesize. The since param should not change.
https://mailchimp.com/developer/marketing/docs/methods-parameters/#pagination
As you can see from the list-email-activity docs, there is no sort_field. I believe your sort variable is just getting ignored.
https://mailchimp.com/developer/marketing/api/email-activity-reports/list-email-activity/

Copy link
Collaborator Author

@davydov-d davydov-d Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@murphpdx as of now there's just a single slice per campaign; this means we request all the data after the since date from the API and don't actually care whether there is sorting or not. Can you confirm the bug still persists for you after upgrading to the latest version?

Copy link

@murphpdx murphpdx Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We decided not to use airbyte last summer due in part to this bug. So I don't have the time to test it out for you.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we are paginating and the since param remains the same throughout all the pages within one campaign. The thing is you are pointing to the stream_slices method which yields one slice per campaign and the pagination is made within one slice.

if cursor_value:
slice_[self.filter_field] = cursor_value
yield slice_

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
campaign_id = stream_slice["campaign_id"]
Expand All @@ -140,17 +173,6 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
current_stream_state[campaign_id] = new_value
return current_stream_state

def request_params(self, stream_state=None, stream_slice: Mapping[str, Any] = None, **kwargs):
stream_state = stream_state or {}
params = MailChimpStream.request_params(self, stream_state=stream_state, **kwargs)

since_value_camp = stream_state.get(stream_slice["campaign_id"])
if since_value_camp:
since_value = since_value_camp.get(self.cursor_field)
if since_value:
params["since"] = since_value
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
# transform before save
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ def test_next_page_token(auth):
expected_token = None
assert stream.next_page_token(**inputs) == expected_token

resp = {"lists": [{"id": i} for i in range(101)]}
resp = {"lists": [{"id": i} for i in range(1001)]}
inputs = {"response": MagicMock(json=MagicMock(return_value=resp))}
expected_token = {"offset": 100}
expected_token = {"offset": 1000}
assert stream.next_page_token(**inputs) == expected_token


Expand All @@ -52,11 +52,11 @@ def test_next_page_token(auth):
[
(
{"stream_slice": None, "stream_state": None, "next_page_token": None},
{"count": 100, "sort_dir": "ASC", "sort_field": "date_created"},
{"count": 1000, "sort_dir": "ASC", "sort_field": "date_created"},
),
(
{"stream_slice": None, "stream_state": None, "next_page_token": {"offset": 100}},
{"count": 100, "sort_dir": "ASC", "sort_field": "date_created", "offset": 100},
{"stream_slice": None, "stream_state": None, "next_page_token": {"offset": 1000}},
{"count": 1000, "sort_dir": "ASC", "sort_field": "date_created", "offset": 1000},
),
],
)
Expand Down
Loading