Skip to content

Commit

Permalink
Mixpanel: Filtering out individual items based on datetime in state (#…
Browse files Browse the repository at this point in the history
…15091)

* Mixpanel: Filtering out individual items based on datetime in state

* Mixpanel: Filtering out individual items based on datetime in state

* Mixpanel: Use where API option to further filter out events

* Fixing unit tests

* Fixing unit test timezone issues

* Version bump + docs

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
Kris Sikora and octavia-squidington-iii authored Aug 23, 2022
1 parent 5819733 commit a4916a4
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@
- name: Mixpanel
sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerRepository: airbyte/source-mixpanel
dockerImageTag: 0.1.19
dockerImageTag: 0.1.20
documentationUrl: https://docs.airbyte.io/integrations/sources/mixpanel
icon: mixpanel.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5685,7 +5685,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-mixpanel:0.1.19"
- dockerImage: "airbyte/source-mixpanel:0.1.20"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mixpanel"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.name=airbyte/source-mixpanel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
"date": "2021-07-01"
},
"export": {
"date": "2021-06-16"
"date": "2021-06-16T12:00:00"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,15 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma
for record in data:
yield record

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
def parse_response(
self,
response: requests.Response,
stream_state: Mapping[str, Any],
**kwargs,
) -> Iterable[Mapping]:

# parse the whole response
yield from self.process_response(response, **kwargs)
yield from self.process_response(response, stream_state=stream_state, **kwargs)

if self.reqs_per_hour_limit > 0:
# we skip this block, if self.reqs_per_hour_limit = 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import json
from datetime import datetime
from typing import Any, Iterable, Mapping
from typing import Any, Iterable, Mapping, MutableMapping

import requests
from airbyte_cdk.models import SyncMode
Expand Down Expand Up @@ -149,3 +149,11 @@ def get_json_schema(self) -> Mapping[str, Any]:
schema["properties"][result.transformed_name] = {"type": ["null", "string"]}

return schema

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
mapping = {"from_date": stream_slice["start_date"], "to_date": stream_slice["end_date"]}
if stream_state and "date" in stream_state:
mapping["where"] = f"properties[\"$time\"]>=datetime({int(datetime.fromisoformat(stream_state['date']).timestamp())})"
return mapping
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import datetime
from datetime import timedelta
from unittest.mock import MagicMock

Expand Down Expand Up @@ -407,7 +408,7 @@ def export_response():
{
"event": "Viewed E-commerce Page",
"properties": {
"time": 1623860880,
"time": 1623860880, # 2021-06-16T16:28:00
"distinct_id": "1d694fd9-31a5-4b99-9eef-ae63112063ed",
"$browser": "Chrome",
"$browser_version": "91.0.4472.101",
Expand All @@ -426,10 +427,26 @@ def test_export_stream(requests_mock, export_response):

stream = Export(authenticator=MagicMock())
requests_mock.register_uri("GET", get_url_to_mock(stream), export_response)

stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"}
# read records for single slice
records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice)

records_length = sum(1 for _ in records)
assert records_length == 1


def test_export_stream_request_params():
stream = Export(authenticator=MagicMock())
stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"}
stream_state = {"date": "2021-06-16T17:00:00"}

request_params = stream.request_params(stream_state=None, stream_slice=stream_slice)
assert "where" not in request_params

request_params = stream.request_params(stream_state={}, stream_slice=stream_slice)
assert "where" not in request_params

request_params = stream.request_params(stream_state=stream_state, stream_slice=stream_slice)
assert "where" in request_params
timestamp = int(datetime.datetime.fromisoformat("2021-06-16T17:00:00").timestamp())
assert request_params.get("where") == f'properties["$time"]>=datetime({timestamp})'
1 change: 1 addition & 0 deletions docs/integrations/sources/mixpanel.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Please note, that incremental sync could return duplicated \(old records\) for t

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------|
| 0.1.20 | 2022-08-22 | [15091](https://github.com/airbytehq/airbyte/pull/15091) | Improve `export` stream cursor support |
| 0.1.19 | 2022-08-18 | [15739](https://github.com/airbytehq/airbyte/pull/15739) | Update `titile` and `description` for `Project Secret` field |
| 0.1.18 | 2022-07-21 | [14924](https://github.com/airbytehq/airbyte/pull/14924) | Remove `additionalProperties` field from schemas and specs |
| 0.1.17 | 2022-06-01 | [12801](https://github.com/airbytehq/airbyte/pull/13372) | Acceptance tests fix, fixing some bugs for beta release |
Expand Down

0 comments on commit a4916a4

Please sign in to comment.