Skip to content

Commit

Permalink
🐛 Source MixPanel: allow pulling data from different regions (#6075)
Browse files Browse the repository at this point in the history
added region option
  • Loading branch information
yaroslav-dudar authored Sep 18, 2021
1 parent 5e4dddd commit 33a1981
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "12928b32-bf0a-4f1e-964f-07e12e37153a",
"name": "Mixpanel",
"dockerRepository": "airbyte/source-mixpanel",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mixpanel",
"icon": "mixpanel.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@
- sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
name: Mixpanel
dockerRepository: airbyte/source-mixpanel
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/sources/mixpanel
icon: mixpanel.svg
- sourceDefinitionId: aea2fd0d-377d-465e-86c0-4fdc4f688e51
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-mixpanel
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,18 @@ class MixpanelStream(HttpStream, ABC):
send requests with planned delay: 3600/reqs_per_hour_limit seconds
"""

url_base = "https://mixpanel.com/api/2.0/"
@property
def url_base(self):
prefix = "eu." if self.region == "EU" else ""
return f"https://{prefix}mixpanel.com/api/2.0/"

# https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-Export-API-Endpoints#api-export-endpoint-rate-limits
reqs_per_hour_limit = 400 # 1 req in 9 secs

def __init__(
self,
authenticator: HttpAuthenticator,
region: str = None,
start_date: Union[date, str] = None,
end_date: Union[date, str] = None,
date_window_size: int = 30, # in days
Expand All @@ -76,6 +80,7 @@ def __init__(
self.date_window_size = date_window_size
self.attribution_window = attribution_window
self.additional_properties = select_properties_by_default
self.region = region if region else "US"

super().__init__(authenticator=authenticator)

Expand Down Expand Up @@ -112,6 +117,12 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
# wait for X seconds to match API limitations
time.sleep(3600 / self.reqs_per_hour_limit)

def get_stream_params(self) -> Mapping[str, Any]:
"""
Fetch required parameters in a given stream. Used to create sub-streams
"""
return {"authenticator": self.authenticator, "region": self.region}


class IncrementalMixpanelStream(MixpanelStream, ABC):
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, any]:
Expand Down Expand Up @@ -229,7 +240,7 @@ def path(self, **kwargs) -> str:
return "funnels"

def funnel_slices(self, sync_mode) -> List[dict]:
funnel_slices = FunnelsList(authenticator=self.authenticator).read_records(sync_mode=sync_mode)
funnel_slices = FunnelsList(**self.get_stream_params()).read_records(sync_mode=sync_mode)
funnel_slices = list(funnel_slices) # [{'funnel_id': <funnel_id1>, 'name': <name1>}, {...}]

# save all funnels in dict(<funnel_id1>:<name1>, ...)
Expand Down Expand Up @@ -523,7 +534,7 @@ def get_json_schema(self) -> Mapping[str, Any]:
}

# read existing Engage schema from API
schema_properties = EngageSchema(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh)
schema_properties = EngageSchema(**self.get_stream_params()).read_records(sync_mode=SyncMode.full_refresh)
for property_entry in schema_properties:
property_name: str = property_entry["name"]
property_type: str = property_entry["type"]
Expand Down Expand Up @@ -553,7 +564,7 @@ def stream_slices(
self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
stream_slices = []
cohorts = Cohorts(authenticator=self.authenticator).read_records(sync_mode=sync_mode)
cohorts = Cohorts(**self.get_stream_params()).read_records(sync_mode=sync_mode)
for cohort in cohorts:
stream_slices.append({"id": cohort["id"]})

Expand Down Expand Up @@ -692,7 +703,10 @@ class Export(DateSlicesMixin, IncrementalMixpanelStream):
cursor_field = "time"
reqs_per_hour_limit = 60 # 1 query per minute

url_base = "https://data.mixpanel.com/api/2.0/"
@property
def url_base(self):
prefix = "-eu" if self.region == "EU" else ""
return f"https://data{prefix}.mixpanel.com/api/2.0/"

def path(self, **kwargs) -> str:
return "export"
Expand All @@ -716,6 +730,10 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
}
}
"""
if response.text == "terminated early\n":
# no data available
self.logger.warn(f"Couldn't fetch data from Export API. Response: {response.text}")
return []

for record_line in response.text.splitlines():
record = json.loads(record_line)
Expand Down Expand Up @@ -758,7 +776,7 @@ def get_json_schema(self) -> Mapping[str, Any]:
schema["additionalProperties"] = self.additional_properties

# read existing Export schema from API
schema_properties = ExportSchema(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh)
schema_properties = ExportSchema(**self.get_stream_params()).read_records(sync_mode=SyncMode.full_refresh)
for property_entry in schema_properties:
property_name: str = property_entry
if property_name.startswith("$"):
Expand All @@ -781,7 +799,7 @@ def __init__(self, token: str, auth_method: str = "Basic", **kwargs):


class SourceMixpanel(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]:
"""
See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
for an example.
Expand All @@ -790,14 +808,15 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
:param logger: logger object
:return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
"""
authenticator = TokenAuthenticatorBase64(token=config["api_secret"])
auth = TokenAuthenticatorBase64(token=config["api_secret"])
funnels = FunnelsList(authenticator=auth, **config)
try:
response = requests.request(
"GET",
url="https://mixpanel.com/api/2.0/funnels/list",
url=funnels.url_base + funnels.path(),
headers={
"Accept": "application/json",
**authenticator.get_auth_header(),
**auth.get_auth_header(),
},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
"description": "The default value to use if no bookmark exists for an endpoint. Default is 1 year ago.",
"examples": ["2021-11-16"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?$"
},
"region": {
"type": "string",
"enum": ["US", "EU"],
"default": "US"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ def test_date_slices():

now = date.today()
# Test with start_date now range
stream_slices = Annotations(authenticator=NoAuth(), start_date=now, end_date=now, date_window_size=1).stream_slices(sync_mode="any")
assert 1 == len(stream_slices)

stream_slices = Annotations(authenticator=NoAuth(), start_date=now - timedelta(days=1), end_date=now, date_window_size=1).stream_slices(
stream_slices = Annotations(authenticator=NoAuth(), start_date=now, end_date=now, date_window_size=1, region="EU").stream_slices(
sync_mode="any"
)
assert 1 == len(stream_slices)

stream_slices = Annotations(
authenticator=NoAuth(), start_date=now - timedelta(days=1), end_date=now, date_window_size=1, region="US"
).stream_slices(sync_mode="any")
assert 2 == len(stream_slices)

stream_slices = Annotations(authenticator=NoAuth(), start_date=now - timedelta(days=2), end_date=now, date_window_size=1).stream_slices(
Expand All @@ -52,23 +54,40 @@ def test_date_slices():

# test with attribution_window
stream_slices = Annotations(
authenticator=NoAuth(), start_date=now - timedelta(days=2), end_date=now, date_window_size=1, attribution_window=5
authenticator=NoAuth(),
start_date=now - timedelta(days=2),
end_date=now,
date_window_size=1,
attribution_window=5,
region="US",
).stream_slices(sync_mode="any")
assert 8 == len(stream_slices)

# Test with start_date end_date range
stream_slices = Annotations(
authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-01"), date_window_size=1
authenticator=NoAuth(),
start_date=date.fromisoformat("2021-07-01"),
end_date=date.fromisoformat("2021-07-01"),
date_window_size=1,
region="US",
).stream_slices(sync_mode="any")
assert [{"start_date": "2021-07-01", "end_date": "2021-07-01"}] == stream_slices

stream_slices = Annotations(
authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-02"), date_window_size=1
authenticator=NoAuth(),
start_date=date.fromisoformat("2021-07-01"),
end_date=date.fromisoformat("2021-07-02"),
date_window_size=1,
region="EU",
).stream_slices(sync_mode="any")
assert [{"start_date": "2021-07-01", "end_date": "2021-07-01"}, {"start_date": "2021-07-02", "end_date": "2021-07-02"}] == stream_slices

stream_slices = Annotations(
authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-03"), date_window_size=1
authenticator=NoAuth(),
start_date=date.fromisoformat("2021-07-01"),
end_date=date.fromisoformat("2021-07-03"),
date_window_size=1,
region="US",
).stream_slices(sync_mode="any")
assert [
{"start_date": "2021-07-01", "end_date": "2021-07-01"},
Expand All @@ -77,12 +96,19 @@ def test_date_slices():
] == stream_slices

stream_slices = Annotations(
authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-03"), date_window_size=2
authenticator=NoAuth(),
start_date=date.fromisoformat("2021-07-01"),
end_date=date.fromisoformat("2021-07-03"),
date_window_size=2,
region="US",
).stream_slices(sync_mode="any")
assert [{"start_date": "2021-07-01", "end_date": "2021-07-02"}, {"start_date": "2021-07-03", "end_date": "2021-07-03"}] == stream_slices

# test with stream_state
stream_slices = Annotations(
authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-03"), date_window_size=1
authenticator=NoAuth(),
start_date=date.fromisoformat("2021-07-01"),
end_date=date.fromisoformat("2021-07-03"),
date_window_size=1,
).stream_slices(sync_mode="any", stream_state={"date": "2021-07-02"})
assert [{"start_date": "2021-07-02", "end_date": "2021-07-02"}, {"start_date": "2021-07-03", "end_date": "2021-07-03"}] == stream_slices
4 changes: 4 additions & 0 deletions docs/integrations/sources/mixpanel.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,18 @@ The Mixpanel connector should not run into Mixpanel API limitations under normal

* Mixpanel API Secret

* Project region `US` or `EU`

### Setup guide

Please read [Find API Secret](https://help.mixpanel.com/hc/en-us/articles/115004502806-Find-Project-Token-).

Select the correct region (EU or US) for your Mixpanel project. See detail [here](https://help.mixpanel.com/hc/en-us/articles/360039135652-Data-Residency-in-EU)


## CHANGELOG

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| `0.1.1` | 2021-09-16 | [6075](https://github.com/airbytehq/airbyte/issues/6075) | Added option to select project region |
| `0.1.0` | 2021-07-06 | [3698](https://github.com/airbytehq/airbyte/issues/3698) | created CDK native mixpanel connector |

0 comments on commit 33a1981

Please sign in to comment.