Skip to content

Commit

Permalink
1058 source mailchimp - fix the way request params are built
Browse files Browse the repository at this point in the history
  • Loading branch information
davydov-d committed Dec 21, 2022
1 parent c0838f8 commit 5f493bd
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 139 deletions.
6 changes: 3 additions & 3 deletions airbyte-integrations/connectors/source-mailchimp/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ FROM python:3.9-slim
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*

WORKDIR /airbyte/integration_code
COPY source_mailchimp ./source_mailchimp
COPY main.py ./
COPY setup.py ./
RUN pip install .
COPY source_mailchimp ./source_mailchimp
COPY main.py ./

ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.version=0.3.1
LABEL io.airbyte.name=airbyte/source-mailchimp
Original file line number Diff line number Diff line change
@@ -1,66 +1,57 @@
connector_image: airbyte/source-mailchimp:dev
tests:
test_strictness_level: high
acceptance_tests:
spec:
- spec_path: "source_mailchimp/spec.json"
timeout_seconds: 60
tests:
- spec_path: "source_mailchimp/spec.json"
connection:
# for old spec config (without oneOf)
- config_path: "secrets/config.json"
status: "succeed"
timeout_seconds: 180
# for auth with API token
- config_path: "secrets/config_apikey.json"
status: "succeed"
timeout_seconds: 180
# for auth with oauth2 token
- config_path: "secrets/config_oauth.json"
status: "succeed"
timeout_seconds: 180
- config_path: "integration_tests/invalid_config.json"
status: "failed"
timeout_seconds: 180
- config_path: "integration_tests/invalid_config_apikey.json"
status: "failed"
timeout_seconds: 180
- config_path: "integration_tests/invalid_config_oauth.json"
status: "failed"
timeout_seconds: 180
tests:
# for old spec config (without oneOf)
- config_path: "secrets/config.json"
status: "succeed"
# for auth with API token
- config_path: "secrets/config_apikey.json"
status: "succeed"
# for auth with oauth2 token
- config_path: "secrets/config_oauth.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
- config_path: "integration_tests/invalid_config_apikey.json"
status: "failed"
- config_path: "integration_tests/invalid_config_oauth.json"
status: "failed"
discovery:
# for old spec config (without oneOf)
- config_path: "secrets/config.json"
# for auth with API token
- config_path: "secrets/config_apikey.json"
# for auth with oauth2 token
- config_path: "secrets/config_oauth.json"
tests:
# for old spec config (without oneOf)
- config_path: "secrets/config.json"
# for auth with API token
- config_path: "secrets/config_apikey.json"
# for auth with oauth2 token
- config_path: "secrets/config_oauth.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 1800
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 1800
# THIS TEST IS COMMENTED OUT. The test expects a flat state of the form
# `state = {cursor_field: value}`. For a dependent endpoint path such as
# `path_begin/{some_id}/path_end` we need a nested state keyed by ID instead:
# `{"id1": {cursor_field: value}, "id2": {cursor_field: value}, ...}`
# The test does not currently accept this nested structure,
# so it is commented out.

# incremental:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state_path: "integration_tests/state.json"
# cursor_paths:
# lists: [ "date_created" ]
# campaigns: [ "create_time" ]
# Email_activity: [ "timestamp" ]

tests:
- config_path: "secrets/config.json"
expect_records:
bypass_reason: "Risk to disclose internal data. Need to set up a sandbox account - https://github.com/airbytehq/airbyte/issues/20726"
- config_path: "secrets/config_oauth.json"
expect_records:
bypass_reason: "Risk to disclose internal data. Need to set up a sandbox account - https://github.com/airbytehq/airbyte/issues/20726"
incremental:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state:
future_state_path: "integration_tests/state.json"
cursor_paths:
lists: ["date_created"]
campaigns: ["create_time"]
email_activity: ["49d68626f3", "timestamp"]
# The email activities stream includes active campaigns that are still sending email newsletters.
# Because of this, the sequential_reads test could fail.
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_email_activities.json"
timeout_seconds: 1800
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog_without_email_activities.json"
timeout_seconds: 1800
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_email_activities.json"
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "integration_tests/configured_catalog_without_email_activities.json"
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
},
"sync_mode": "incremental",
"cursor_field": ["create_time"],
"primary_key": [["id"]],
"destination_sync_mode": "append"
},
{
Expand All @@ -22,20 +23,23 @@
"default_cursor_field": ["date_created"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"sync_mode": "incremental",
"cursor_field": ["date_created"],
"destination_sync_mode": "append"
"destination_sync_mode": "append",
"primary_key": [["id"]]
},
{
"stream": {
"name": "email_activity",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"],
"source_defined_cursor": true,
"default_cursor_field": ["timestamp"]
"default_cursor_field": ["timestamp"],
"source_defined_primary_key": [["timestamp"], ["email_id"], ["action"]]
},
"sync_mode": "incremental",
"cursor_field": ["timestamp"],
"primary_key": [["timestamp"], ["email_id"], ["action"]],
"destination_sync_mode": "append"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
},
"sync_mode": "incremental",
"cursor_field": ["create_time"],
"primary_key": [["id"]],
"destination_sync_mode": "append"
},
{
Expand All @@ -24,6 +25,7 @@
},
"sync_mode": "incremental",
"cursor_field": ["date_created"],
"primary_key": [["id"]],
"destination_sync_mode": "append"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@
{
"type": "STREAM",
"stream": {
"stream_state": { "create_time": "2020-11-23T05:42:11+00:00" },
"stream_state": { "create_time": "2220-11-23T05:42:11+00:00" },
"stream_descriptor": { "name": "campaigns" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "date_created": "2020-09-25T04:47:31+00:00" },
"stream_state": { "date_created": "2220-09-25T04:47:31+00:00" },
"stream_descriptor": { "name": "lists" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": {
"49d68626f3": { "timestamp": "2020-11-23T05:42:10+00:00" }
"49d68626f3": { "timestamp": "2220-11-23T05:42:10+00:00" }
},
"stream_descriptor": { "name": "email_activity" }
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mailchimp/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=[
"airbyte-cdk~=0.1",
"airbyte-cdk",
"pytest~=6.1",
],
package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]},
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,9 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = MailChimpAuthenticator().get_auth(config)
return [Lists(authenticator=authenticator), Campaigns(authenticator=authenticator), EmailActivity(authenticator=authenticator)]
campaign_id = config.get("campaign_id")
return [
Lists(authenticator=authenticator),
Campaigns(authenticator=authenticator),
EmailActivity(authenticator=authenticator, campaign_id=campaign_id)
]
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
}
}
]
},
"campaign_id": {
"type": "string",
"title": "ID of a campaign to sync email activities",
"airbyte_hidden": true
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import math
from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping, MutableMapping, Optional
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import requests
from airbyte_cdk.models import SyncMode
Expand All @@ -14,7 +14,7 @@

class MailChimpStream(HttpStream, ABC):
primary_key = "id"
page_size = 100
page_size = 1000

def __init__(self, **kwargs):
super().__init__(**kwargs)
Expand Down Expand Up @@ -72,6 +72,14 @@ def cursor_field(self) -> str:
"""
pass

@property
def filter_field(self):
return f"since_{self.cursor_field}"

@property
def sort_field(self):
return self.cursor_field

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object
Expand All @@ -81,13 +89,21 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
current_state = current_stream_state.get(self.cursor_field) or latest_state
return {self.cursor_field: max(latest_state, current_state)}

def request_params(self, stream_state=None, **kwargs):
def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
slice_ = {}
stream_state = stream_state or {}
params = super().request_params(stream_state=stream_state, **kwargs)
default_params = {"sort_field": self.cursor_field, "sort_dir": "ASC"}
since_value = stream_state.get(self.cursor_field)
if since_value:
default_params[f"since_{self.cursor_field}"] = since_value
cursor_value = stream_state.get(self.cursor_field)
if cursor_value:
slice_[self.filter_field] = cursor_value
yield slice_

def request_params(self, stream_state=None, stream_slice=None, **kwargs):
stream_state = stream_state or {}
stream_slice = stream_slice or {}
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, **kwargs)
default_params = {"sort_field": self.sort_field, "sort_dir": "ASC", **stream_slice}
params.update(default_params)
return params

Expand All @@ -110,13 +126,30 @@ def path(self, **kwargs) -> str:

class EmailActivity(IncrementalMailChimpStream):
cursor_field = "timestamp"
filter_field = "since"
sort_field = "create_time"
data_field = "emails"
primary_key = ["timestamp", "email_id", "action"]

def stream_slices(self, **kwargs):
campaign_stream = Campaigns(authenticator=self.authenticator)
for campaign in campaign_stream.read_records(sync_mode=SyncMode.full_refresh):
yield {"campaign_id": campaign["id"]}
def __init__(self, campaign_id: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
self.campaign_id = campaign_id

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
stream_state = stream_state or {}
if self.campaign_id:
# Workaround: when a campaign_id is configured, read only that campaign to speed up SATs (source acceptance tests) and enable incremental tests
campaigns = [{"id": self.campaign_id}]
else:
campaigns = Campaigns(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh)
for campaign in campaigns:
slice_ = {"campaign_id": campaign["id"]}
cursor_value = stream_state.get(campaign["id"], {}).get(self.cursor_field)
if cursor_value:
slice_[self.filter_field] = cursor_value
yield slice_

def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
campaign_id = stream_slice["campaign_id"]
Expand All @@ -140,17 +173,6 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
current_stream_state[campaign_id] = new_value
return current_stream_state

def request_params(self, stream_state=None, stream_slice: Mapping[str, Any] = None, **kwargs):
stream_state = stream_state or {}
params = MailChimpStream.request_params(self, stream_state=stream_state, **kwargs)

since_value_camp = stream_state.get(stream_slice["campaign_id"])
if since_value_camp:
since_value = since_value_camp.get(self.cursor_field)
if since_value:
params["since"] = since_value
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
# transform before save
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ def test_next_page_token(auth):
expected_token = None
assert stream.next_page_token(**inputs) == expected_token

resp = {"lists": [{"id": i} for i in range(101)]}
resp = {"lists": [{"id": i} for i in range(1001)]}
inputs = {"response": MagicMock(json=MagicMock(return_value=resp))}
expected_token = {"offset": 100}
expected_token = {"offset": 1000}
assert stream.next_page_token(**inputs) == expected_token


Expand All @@ -52,11 +52,11 @@ def test_next_page_token(auth):
[
(
{"stream_slice": None, "stream_state": None, "next_page_token": None},
{"count": 100, "sort_dir": "ASC", "sort_field": "date_created"},
{"count": 1000, "sort_dir": "ASC", "sort_field": "date_created"},
),
(
{"stream_slice": None, "stream_state": None, "next_page_token": {"offset": 100}},
{"count": 100, "sort_dir": "ASC", "sort_field": "date_created", "offset": 100},
{"stream_slice": None, "stream_state": None, "next_page_token": {"offset": 1000}},
{"count": 1000, "sort_dir": "ASC", "sort_field": "date_created", "offset": 1000},
),
],
)
Expand Down
Loading

0 comments on commit 5f493bd

Please sign in to comment.