Skip to content

Commit

Permalink
✨ Source Mailchimp: Implement SegmentMembers stream (#32782)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristoGrab authored and git-phu committed Nov 28, 2023
1 parent a0d17dd commit aeb2e63
Show file tree
Hide file tree
Showing 12 changed files with 372 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@
"primary_key": [["id"]],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "segment_members",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["last_changed"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"cursor_field": ["last_changed"],
"primary_key": [["id"]],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "segments",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@
"primary_key": [["id"]],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "segment_members",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["last_changed"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"cursor_field": ["last_changed"],
"primary_key": [["id"]],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "segments",
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"streams": [
{
"stream": {
"name": "segment_members",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["last_changed"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "incremental",
"cursor_field": ["last_changed"],
"primary_key": [["id"]],
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@
"stream_descriptor": { "name": "reports" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": {
"16d6ec4ffc": { "last_changed": "2230-02-26T05:42:10+00:00" }
},
"stream_descriptor": { "name": "segment_members" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": {
"13506120": {"last_changed": "2222-12-27T08:34:39+00:00"},
"13506136": {"last_changed": "2222-12-27T08:34:39+00:00"},
"14351124": {"last_changed": "2222-12-27T08:34:39+00:00"},
"14351504": {"last_changed": "2222-12-27T07:56:47+00:00"},
"14351128": {"last_changed": "2222-12-27T08:34:39+00:00"},
"13506132": {"last_changed": "2222-12-27T08:34:39+00:00"}
},
"stream_descriptor": { "name": "segment_members" }
}
},
{
"type": "STREAM",
"stream": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b03a9f3e-22a5-11eb-adc1-0242ac120002
dockerImageTag: 0.9.0
dockerImageTag: 0.10.0
dockerRepository: airbyte/source-mailchimp
documentationUrl: https://docs.airbyte.com/integrations/sources/mailchimp
githubIssueLabel: source-mailchimp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": true,
"properties": {
"id": {
"type": ["null", "string"]
},
"email_address": {
"type": ["null", "string"]
},
"unique_email_id": {
"type": ["null", "string"]
},
"email_type": {
"type": ["null", "string"]
},
"status": {
"type": ["null", "string"]
},
"merge_fields": {
"type": ["null", "object"],
"additionalProperties": true
},
"interests": {
"type": ["null", "object"],
"additionalProperties": true
},
"stats": {
"type": ["null", "object"],
"properties": {
"avg_open_rate": {
"type": ["null", "number"]
},
"avg_click_rate": {
"type": ["null", "number"]
}
}
},
"ip_signup": {
"type": ["null", "string"]
},
"timestamp_signup": {
"type": ["null", "string"],
"format": "date-time"
},
"ip_opt": {
"type": ["null", "string"]
},
"timestamp_opt": {
"type": ["null", "string"]
},
"member_rating": {
"type": ["null", "integer"]
},
"last_changed": {
"type": ["null", "string"],
"format": "date-time"
},
"language": {
"type": ["null", "string"]
},
"vip": {
"type": ["null", "boolean"]
},
"email_client": {
"type": ["null", "string"]
},
"location": {
"type": ["null", "object"],
"properties": {
"latitude": {
"type": ["null", "number"]
},
"longitude": {
"type": ["null", "number"]
},
"gmtoff": {
"type": ["null", "integer"]
},
"dstoff": {
"type": ["null", "integer"]
},
"country_code": {
"type": ["null", "string"]
},
"timezone": {
"type": ["null", "string"]
}
}
},
"last_note": {
"type": ["null", "object"],
"properties": {
"note_id": {
"type": ["null", "integer"]
},
"created_at": {
"type": ["null", "string"],
"format": "date-time"
},
"created_by": {
"type": ["null", "string"]
},
"note": {
"type": ["null", "string"]
}
}
},
"list_id": {
"type": ["null", "string"]
},
"segment_id": {
"type": ["null", "integer"]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
ListMembers,
Lists,
Reports,
SegmentMembers,
Segments,
Tags,
Unsubscribes,
Expand Down Expand Up @@ -114,6 +115,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
lists,
ListMembers(authenticator=authenticator),
Reports(authenticator=authenticator),
SegmentMembers(authenticator=authenticator),
Segments(authenticator=authenticator),
Tags(authenticator=authenticator, parent=lists),
Unsubscribes(authenticator=authenticator, campaign_id=campaign_id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,80 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
yield self.remove_empty_datetime_fields(record)


class SegmentMembers(MailChimpListSubStream):
    """
    Get information about members in a specific segment.
    Docs link: https://mailchimp.com/developer/marketing/api/list-segment-members/list-members-in-segment/

    Stream state is tracked per segment, keyed by the stringified segment id:
    {"<segment_id>": {"last_changed": "<cursor value>"}}
    """

    cursor_field = "last_changed"
    data_field = "members"

    def nullify_empty_string_fields(self, element: Any) -> Any:
        """
        SegmentMember records may contain multiple fields that are returned as empty strings, which causes
        validation issues for fields with declared "datetime" formats.
        Since all fields are nullable, replacing any empty-string dictionary value with None is a safe way
        to handle these edge cases.

        :param element: A SegmentMember record (dictionary), a list, or any nested value of one
        :return: A new dictionary/list with empty-string dictionary values replaced by None;
                 any other value is returned unchanged
        """

        if isinstance(element, dict):
            # Recurse into every value; a value of "" is replaced with None.
            return {key: None if value == "" else self.nullify_empty_string_fields(value) for key, value in element.items()}

        if isinstance(element, list):
            # Recurse into every item so dictionaries nested inside lists are cleaned as well.
            # Empty strings that are direct list items are left as they are.
            return [self.nullify_empty_string_fields(item) for item in element]

        return element

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Each slice consists of a list_id and segment_id pair.
        The pairs are obtained by reading every record of the Segments stream in full refresh mode.
        """
        # One Segments instance is enough for both slicing and reading.
        segments_stream = Segments(authenticator=self.authenticator)

        for segments_slice in segments_stream.stream_slices(sync_mode=SyncMode.full_refresh):
            segment_records = segments_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=segments_slice)

            for segment in segment_records:
                yield {"list_id": segment["list_id"], "segment_id": segment["id"]}

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        """
        Build the request path for the list/segment pair described by the slice.

        :param stream_slice: A slice produced by stream_slices, holding "list_id" and "segment_id"
        :return: The relative path of the segment members endpoint
        """
        list_id = stream_slice.get("list_id")
        segment_id = stream_slice.get("segment_id")
        return f"lists/{list_id}/segments/{segment_id}/members"

    def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice, **kwargs) -> Iterable[Mapping]:
        """
        SegmentMembers endpoint does not support sorting, so we need to filter out records that are older than the current state.

        :param response: The HTTP response to parse
        :param stream_state: Per-segment state, keyed by stringified segment id
        :param stream_slice: The slice being read; its "segment_id" is added to every record
        :return: Records whose cursor value is at or after the segment's saved cursor value
                 (all records when the segment has no saved cursor value), with empty strings nullified
        """
        segment_id = stream_slice.get("segment_id")

        # The saved cursor for this segment is looked up once, before any record is emitted,
        # so every record of the response is compared against the same threshold.
        segment_state = (stream_state or {}).get(str(segment_id), {})
        state_cursor_value = segment_state.get(self.cursor_field)

        for record in super().parse_response(response, **kwargs):
            # Add the segment_id foreign_key to each record
            record["segment_id"] = segment_id

            if state_cursor_value is not None:
                record_cursor_value = record.get(self.cursor_field)
                # A record without a cursor value cannot be shown to be newer than the state,
                # so it is skipped instead of raising a TypeError on the comparison.
                if record_cursor_value is None or record_cursor_value < state_cursor_value:
                    continue

            yield self.nullify_empty_string_fields(record)

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Return the stream state with the cursor of the record's segment advanced to the latest value seen.

        :param current_stream_state: Per-segment state, keyed by stringified segment id (may be empty or None)
        :param latest_record: The most recently emitted record; must carry "segment_id"
        :return: A new state mapping; the mapping passed in is not modified
        """
        # Work on a copy so the caller's mapping is never mutated.
        # NOTE(review): parse_response reads the state while records are still being produced; if the
        # framework hands it this same mapping, in-place updates would tighten the filter mid-response
        # and drop older (unsorted) records — confirm against the CDK read loop.
        updated_state = dict(current_stream_state or {})

        latest_cursor_value = latest_record.get(self.cursor_field)
        if latest_cursor_value is None:
            # Nothing to compare (e.g. the cursor was an empty string and got nullified): keep the state as is.
            return updated_state

        segment_id = str(latest_record.get("segment_id"))

        # Get the current state value for this segment, if it exists
        current_cursor_value = updated_state.get(segment_id, {}).get(self.cursor_field)

        # Update the cursor value and set it in state
        if current_cursor_value is None:
            updated_cursor_value = latest_cursor_value
        else:
            updated_cursor_value = max(current_cursor_value, latest_cursor_value)
        updated_state[segment_id] = {self.cursor_field: updated_cursor_value}
        return updated_state


class Segments(MailChimpListSubStream):
"""
Get information about all available segments for a specific list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,4 @@ def test_wrong_config(wrong_config):

def test_streams_count(config):
    """The source must expose exactly 12 streams for a valid config."""
    streams = SourceMailchimp().streams(config)
    # The previous expectation of 11 is stale: it cannot hold together with the check below.
    assert len(streams) == 12
Loading

0 comments on commit aeb2e63

Please sign in to comment.