diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/12928b32-bf0a-4f1e-964f-07e12e37153a.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/12928b32-bf0a-4f1e-964f-07e12e37153a.json index e3042f0e1be5..cd73c479261a 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/12928b32-bf0a-4f1e-964f-07e12e37153a.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/12928b32-bf0a-4f1e-964f-07e12e37153a.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "12928b32-bf0a-4f1e-964f-07e12e37153a", "name": "Mixpanel", "dockerRepository": "airbyte/source-mixpanel", - "dockerImageTag": "0.1.0", + "dockerImageTag": "0.1.1", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mixpanel", "icon": "mixpanel.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 509359fb228c..2d154b912462 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -212,7 +212,7 @@ - sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a name: Mixpanel dockerRepository: airbyte/source-mixpanel - dockerImageTag: 0.1.0 + dockerImageTag: 0.1.1 documentationUrl: https://docs.airbyte.io/integrations/sources/mixpanel icon: mixpanel.svg - sourceDefinitionId: aea2fd0d-377d-465e-86c0-4fdc4f688e51 diff --git a/airbyte-integrations/connectors/source-mixpanel/Dockerfile b/airbyte-integrations/connectors/source-mixpanel/Dockerfile index c7f197999c7b..d77882fdd02d 100644 --- a/airbyte-integrations/connectors/source-mixpanel/Dockerfile +++ b/airbyte-integrations/connectors/source-mixpanel/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/source-mixpanel diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index 905e8270c0e5..6d226a9d03de 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -56,7 +56,10 @@ class MixpanelStream(HttpStream, ABC): send requests with planned delay: 3600/reqs_per_hour_limit seconds """ - url_base = "https://mixpanel.com/api/2.0/" + @property + def url_base(self): + prefix = "eu." if self.region == "EU" else "" + return f"https://{prefix}mixpanel.com/api/2.0/" # https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-Export-API-Endpoints#api-export-endpoint-rate-limits reqs_per_hour_limit = 400 # 1 req in 9 secs @@ -64,6 +67,7 @@ class MixpanelStream(HttpStream, ABC): def __init__( self, authenticator: HttpAuthenticator, + region: str = None, start_date: Union[date, str] = None, end_date: Union[date, str] = None, date_window_size: int = 30, # in days @@ -76,6 +80,7 @@ def __init__( self.date_window_size = date_window_size self.attribution_window = attribution_window self.additional_properties = select_properties_by_default + self.region = region if region else "US" super().__init__(authenticator=authenticator) @@ -112,6 +117,12 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp # wait for X seconds to match API limitations time.sleep(3600 / self.reqs_per_hour_limit) + def get_stream_params(self) -> Mapping[str, Any]: + """ + Fetch required parameters in a given stream. Used to create sub-streams + """ + return {"authenticator": self.authenticator, "region": self.region} + class IncrementalMixpanelStream(MixpanelStream, ABC): def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, any]: @@ -229,7 +240,7 @@ def path(self, **kwargs) -> str: return "funnels" def funnel_slices(self, sync_mode) -> List[dict]: - funnel_slices = FunnelsList(authenticator=self.authenticator).read_records(sync_mode=sync_mode) + funnel_slices = FunnelsList(**self.get_stream_params()).read_records(sync_mode=sync_mode) funnel_slices = list(funnel_slices) # [{'funnel_id': , 'name': }, {...}] # save all funnels in dict(:, ...) @@ -523,7 +534,7 @@ def get_json_schema(self) -> Mapping[str, Any]: } # read existing Engage schema from API - schema_properties = EngageSchema(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh) + schema_properties = EngageSchema(**self.get_stream_params()).read_records(sync_mode=SyncMode.full_refresh) for property_entry in schema_properties: property_name: str = property_entry["name"] property_type: str = property_entry["type"] @@ -553,7 +564,7 @@ def stream_slices( self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[Mapping[str, Any]]]: stream_slices = [] - cohorts = Cohorts(authenticator=self.authenticator).read_records(sync_mode=sync_mode) + cohorts = Cohorts(**self.get_stream_params()).read_records(sync_mode=sync_mode) for cohort in cohorts: stream_slices.append({"id": cohort["id"]}) @@ -692,7 +703,10 @@ class Export(DateSlicesMixin, IncrementalMixpanelStream): cursor_field = "time" reqs_per_hour_limit = 60 # 1 query per minute - url_base = "https://data.mixpanel.com/api/2.0/" + @property + def url_base(self): + prefix = "-eu" if self.region == "EU" else "" + return f"https://data{prefix}.mixpanel.com/api/2.0/" def path(self, **kwargs) -> str: return "export" @@ -716,6 +730,10 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp } } """ + if response.text == "terminated early\n": + # no data available + self.logger.warn(f"Couldn't fetch data from Export API. Response: {response.text}") + return [] for record_line in response.text.splitlines(): record = json.loads(record_line) @@ -758,7 +776,7 @@ def get_json_schema(self) -> Mapping[str, Any]: schema["additionalProperties"] = self.additional_properties # read existing Export schema from API - schema_properties = ExportSchema(authenticator=self.authenticator).read_records(sync_mode=SyncMode.full_refresh) + schema_properties = ExportSchema(**self.get_stream_params()).read_records(sync_mode=SyncMode.full_refresh) for property_entry in schema_properties: property_name: str = property_entry if property_name.startswith("$"): @@ -781,7 +799,7 @@ def __init__(self, token: str, auth_method: str = "Basic", **kwargs): class SourceMixpanel(AbstractSource): - def check_connection(self, logger, config) -> Tuple[bool, any]: + def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]: """ See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232 for an example. @@ -790,14 +808,15 @@ def check_connection(self, logger, config) -> Tuple[bool, any]: :param logger: logger object :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise. """ - authenticator = TokenAuthenticatorBase64(token=config["api_secret"]) + auth = TokenAuthenticatorBase64(token=config["api_secret"]) + funnels = FunnelsList(authenticator=auth, **config) try: response = requests.request( "GET", - url="https://mixpanel.com/api/2.0/funnels/list", + url=funnels.url_base + funnels.path(), headers={ "Accept": "application/json", - **authenticator.get_auth_header(), + **auth.get_auth_header(), }, ) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json index cead1e425fc1..d04cf04aeedf 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json @@ -38,6 +38,11 @@ "description": "The default value to use if no bookmark exists for an endpoint. Default is 1 year ago.", "examples": ["2021-11-16"], "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?$" + }, + "region": { + "type": "string", + "enum": ["US", "EU"], + "default": "US" } } } diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/unit_test.py index eccba1bcf421..7f49206280cb 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/unit_test.py @@ -32,12 +32,14 @@ def test_date_slices(): now = date.today() # Test with start_date now range - stream_slices = Annotations(authenticator=NoAuth(), start_date=now, end_date=now, date_window_size=1).stream_slices(sync_mode="any") - assert 1 == len(stream_slices) - - stream_slices = Annotations(authenticator=NoAuth(), start_date=now - timedelta(days=1), end_date=now, date_window_size=1).stream_slices( + stream_slices = Annotations(authenticator=NoAuth(), start_date=now, end_date=now, date_window_size=1, region="EU").stream_slices( sync_mode="any" ) + assert 1 == len(stream_slices) + + stream_slices = Annotations( + authenticator=NoAuth(), start_date=now - timedelta(days=1), end_date=now, date_window_size=1, region="US" + ).stream_slices(sync_mode="any") assert 2 == len(stream_slices) stream_slices = Annotations(authenticator=NoAuth(), start_date=now - timedelta(days=2), end_date=now, date_window_size=1).stream_slices( @@ -52,23 +54,40 @@ def test_date_slices(): # test with attribution_window stream_slices = Annotations( - authenticator=NoAuth(), start_date=now - timedelta(days=2), end_date=now, date_window_size=1, attribution_window=5 + authenticator=NoAuth(), + start_date=now - timedelta(days=2), + end_date=now, + date_window_size=1, + attribution_window=5, + region="US", ).stream_slices(sync_mode="any") assert 8 == len(stream_slices) # Test with start_date end_date range stream_slices = Annotations( - authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-01"), date_window_size=1 + authenticator=NoAuth(), + start_date=date.fromisoformat("2021-07-01"), + end_date=date.fromisoformat("2021-07-01"), + date_window_size=1, + region="US", ).stream_slices(sync_mode="any") assert [{"start_date": "2021-07-01", "end_date": "2021-07-01"}] == stream_slices stream_slices = Annotations( - authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-02"), date_window_size=1 + authenticator=NoAuth(), + start_date=date.fromisoformat("2021-07-01"), + end_date=date.fromisoformat("2021-07-02"), + date_window_size=1, + region="EU", ).stream_slices(sync_mode="any") assert [{"start_date": "2021-07-01", "end_date": "2021-07-01"}, {"start_date": "2021-07-02", "end_date": "2021-07-02"}] == stream_slices stream_slices = Annotations( - authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-03"), date_window_size=1 + authenticator=NoAuth(), + start_date=date.fromisoformat("2021-07-01"), + end_date=date.fromisoformat("2021-07-03"), + date_window_size=1, + region="US", ).stream_slices(sync_mode="any") assert [ {"start_date": "2021-07-01", "end_date": "2021-07-01"}, @@ -77,12 +96,19 @@ def test_date_slices(): ] == stream_slices stream_slices = Annotations( - authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-03"), date_window_size=2 + authenticator=NoAuth(), + start_date=date.fromisoformat("2021-07-01"), + end_date=date.fromisoformat("2021-07-03"), + date_window_size=2, + region="US", ).stream_slices(sync_mode="any") assert [{"start_date": "2021-07-01", "end_date": "2021-07-02"}, {"start_date": "2021-07-03", "end_date": "2021-07-03"}] == stream_slices # test with stream_state stream_slices = Annotations( - authenticator=NoAuth(), start_date=date.fromisoformat("2021-07-01"), end_date=date.fromisoformat("2021-07-03"), date_window_size=1 + authenticator=NoAuth(), + start_date=date.fromisoformat("2021-07-01"), + end_date=date.fromisoformat("2021-07-03"), + date_window_size=1, ).stream_slices(sync_mode="any", stream_state={"date": "2021-07-02"}) assert [{"start_date": "2021-07-02", "end_date": "2021-07-02"}, {"start_date": "2021-07-03", "end_date": "2021-07-03"}] == stream_slices diff --git a/docs/integrations/sources/mixpanel.md b/docs/integrations/sources/mixpanel.md index 14f30d49dc14..3a9ca1006b18 100644 --- a/docs/integrations/sources/mixpanel.md +++ b/docs/integrations/sources/mixpanel.md @@ -43,14 +43,18 @@ The Mixpanel connector should not run into Mixpanel API limitations under normal * Mixpanel API Secret +* Project region `US` or `EU` + ### Setup guide Please read [Find API Secret](https://help.mixpanel.com/hc/en-us/articles/115004502806-Find-Project-Token-). +Select the correct region (EU or US) for your Mixpanel project. See detail [here](https://help.mixpanel.com/hc/en-us/articles/360039135652-Data-Residency-in-EU) ## CHANGELOG | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| `0.1.1` | 2021-09-16 | [6075](https://github.com/airbytehq/airbyte/issues/6075) | Added option to select project region | | `0.1.0` | 2021-07-06 | [3698](https://github.com/airbytehq/airbyte/issues/3698) | created CDK native mixpanel connector |