diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index c057b60721c4..a1f4e9d97c9e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -615,7 +615,7 @@ - name: Mixpanel sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a dockerRepository: airbyte/source-mixpanel - dockerImageTag: 0.1.19 + dockerImageTag: 0.1.20 documentationUrl: https://docs.airbyte.io/integrations/sources/mixpanel icon: mixpanel.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index ddb2ea8bbb56..c50e4ef8bcbe 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5685,7 +5685,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-mixpanel:0.1.19" +- dockerImage: "airbyte/source-mixpanel:0.1.20" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mixpanel" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-mixpanel/Dockerfile b/airbyte-integrations/connectors/source-mixpanel/Dockerfile index 51152d5dda22..1d81d3f9b885 100644 --- a/airbyte-integrations/connectors/source-mixpanel/Dockerfile +++ b/airbyte-integrations/connectors/source-mixpanel/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.19 +LABEL io.airbyte.version=0.1.20 LABEL io.airbyte.name=airbyte/source-mixpanel diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json index 8c40297a83d6..667b6a67d26d 100644 --- a/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json @@ -7,6 +7,6 @@ "date": "2021-07-01" }, "export": { - "date": "2021-06-16" + "date": "2021-06-16T12:00:00" } } diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py index 97e46a9e02f1..44abe6d75628 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py @@ -79,10 +79,15 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma for record in data: yield record - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + def parse_response( + self, + response: requests.Response, + stream_state: Mapping[str, Any], + **kwargs, + ) -> Iterable[Mapping]: # parse the whole response - yield from self.process_response(response, **kwargs) + yield from self.process_response(response, stream_state=stream_state, **kwargs) if self.reqs_per_hour_limit > 0: # we skip this block, if self.reqs_per_hour_limit = 0, diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py index bac592bdc8fd..10b693a7d38f 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py @@ -4,7 +4,7 @@ import json from datetime import datetime -from typing import Any, Iterable, Mapping +from typing import Any, Iterable, Mapping, MutableMapping import requests from airbyte_cdk.models import SyncMode @@ -149,3 +149,11 @@ def get_json_schema(self) -> Mapping[str, Any]: schema["properties"][result.transformed_name] = {"type": ["null", "string"]} return schema + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + mapping = {"from_date": stream_slice["start_date"], "to_date": stream_slice["end_date"]} + if stream_state and "date" in stream_state: + mapping["where"] = f"properties[\"$time\"]>=datetime({int(datetime.fromisoformat(stream_state['date']).timestamp())})" + return mapping diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py index 0f91b0424ee2..dc8ebc808c8c 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py @@ -2,6 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import datetime from datetime import timedelta from unittest.mock import MagicMock @@ -407,7 +408,7 @@ def export_response(): { "event": "Viewed E-commerce Page", "properties": { - "time": 1623860880, + "time": 1623860880, # 2021-06-16T16:28:00 "distinct_id": "1d694fd9-31a5-4b99-9eef-ae63112063ed", "$browser": "Chrome", "$browser_version": "91.0.4472.101", @@ -426,10 +427,26 @@ def test_export_stream(requests_mock, export_response): stream = Export(authenticator=MagicMock()) requests_mock.register_uri("GET", get_url_to_mock(stream), export_response) - stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"} # read records for single slice records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice) records_length = sum(1 for _ in records) assert records_length == 1 + + +def test_export_stream_request_params(): + stream = Export(authenticator=MagicMock()) + stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"} + stream_state = {"date": "2021-06-16T17:00:00"} + + request_params = stream.request_params(stream_state=None, stream_slice=stream_slice) + assert "where" not in request_params + + request_params = stream.request_params(stream_state={}, stream_slice=stream_slice) + assert "where" not in request_params + + request_params = stream.request_params(stream_state=stream_state, stream_slice=stream_slice) + assert "where" in request_params + timestamp = int(datetime.datetime.fromisoformat("2021-06-16T17:00:00").timestamp()) + assert request_params.get("where") == f'properties["$time"]>=datetime({timestamp})' diff --git a/docs/integrations/sources/mixpanel.md b/docs/integrations/sources/mixpanel.md index 1f0e0f0ba1a4..c075223be5e4 100644 --- a/docs/integrations/sources/mixpanel.md +++ b/docs/integrations/sources/mixpanel.md @@ -61,6 +61,7 @@ Please note, that incremental sync could return duplicated \(old records\) for t | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------| +| 0.1.20 | 2022-08-22 | [15091](https://github.com/airbytehq/airbyte/pull/15091) | Improve `export` stream cursor support | | 0.1.19 | 2022-08-18 | [15739](https://github.com/airbytehq/airbyte/pull/15739) | Update `titile` and `description` for `Project Secret` field | | 0.1.18 | 2022-07-21 | [14924](https://github.com/airbytehq/airbyte/pull/14924) | Remove `additionalProperties` field from schemas and specs | | 0.1.17 | 2022-06-01 | [12801](https://github.com/airbytehq/airbyte/pull/13372) | Acceptance tests fix, fixing some bugs for beta release |