✨ Source Mixpanel: add page size to configuration to increase sync speed #41976

Merged

descampsk
Contributor

What

The engage stream retrieves 1,000 records per request and waits 60 s between requests.
Synchronising 1 million records therefore takes 1,000 requests, or about 16.7 hours.

Increasing the page size to 40,000 cuts this to 25 requests, or about 25 minutes.
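
The arithmetic behind these two figures can be checked with a short sketch. It assumes each page request costs exactly one 60 s delay and ignores the request time itself; the function name `engage_sync_seconds` is made up for illustration and is not part of the connector.

```python
import math


def engage_sync_seconds(total_records: int, page_size: int, delay_seconds: float = 60.0) -> float:
    """Estimate sync time, assuming one fixed delay per page request."""
    # Number of page requests needed to cover every record.
    requests_needed = math.ceil(total_records / page_size)
    return requests_needed * delay_seconds


for page_size in (1_000, 40_000):
    seconds = engage_sync_seconds(1_000_000, page_size)
    print(f"page_size={page_size}: {seconds / 3600:.1f} h ({seconds / 60:.0f} min)")
```

Under these assumptions the estimate scales inversely with the page size, which is why a 40x larger page gives a 40x shorter sync.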

How

This PR only adds a user-defined page size to the configuration, so that a larger value can be set to speed up the sync.

Review guide

See comments.

User Impact

Large performance gain for the engage stream when a larger page size is configured.

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌

vercel bot commented Jul 16, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Skipped Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Jul 25, 2024 2:47pm

@@ -291,7 +291,7 @@ def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Op
if total:
self._total = total

- if self._total and page_number is not None and self._total > self.page_size * (page_number + 1):
+ if self._total and page_number is not None and self._total > self._page_size * (page_number + 1):
Contributor Author

Use _page_size to get the resolved page size from PageIncrement instead of the raw '{{ config.page_size }}' string.
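
A minimal sketch of why the resolved value matters, assuming the manifest value arrives as the raw template string. The class `EngagePaginationSketch` and its config lookup are hypothetical stand-ins; the real interpolation is done by the CDK and is not reproduced here.

```python
class EngagePaginationSketch:
    """Hypothetical stand-in for the connector's pagination strategy."""

    def __init__(self, page_size, config: dict):
        # Raw manifest value: an int, or a template string such as "{{ config.page_size }}".
        self.page_size = page_size
        # Resolved integer value; this dict lookup only mimics template interpolation.
        self._page_size = page_size if isinstance(page_size, int) else int(config["page_size"])

    def has_next_page(self, total: int, page_number: int) -> bool:
        # Same comparison as in the diff above, using the resolved integer.
        return total > self._page_size * (page_number + 1)


strategy = EngagePaginationSketch("{{ config.page_size }}", {"page_size": 40_000})
print(strategy.has_next_page(total=100_000, page_number=0))

try:
    # Comparing against the raw template string fails: int > str is not defined.
    100_000 > strategy.page_size * 1
except TypeError as exc:
    print(f"raw page_size cannot be compared: {exc}")
```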

@@ -150,7 +153,7 @@ definitions:
type: CustomPaginationStrategy
class_name: "source_mixpanel.components.EngagePaginationStrategy"
start_from_page: 1
- page_size: 1000
+ page_size: "{{ config.page_size }}"
Contributor Author

Allow the configuration of the page size

@@ -72,7 +72,7 @@ def validate_date(name: str, date_str: str, default: pendulum.date) -> pendulum.

@adapt_validate_if_testing
def _validate_and_transform(self, config: MutableMapping[str, Any]):
- project_timezone, start_date, end_date, attribution_window, select_properties_by_default, region, date_window_size, project_id = (
+ project_timezone, start_date, end_date, attribution_window, select_properties_by_default, region, date_window_size, project_id, page_size = (
Contributor Author

Not sure if this is needed

@@ -119,6 +119,14 @@
"type": "integer",
"minimum": 1,
"default": 30
},
Contributor Author

Add the page size to the user-defined configuration

@@ -94,37 +94,37 @@ def test_streams_string_date(requests_mock, config_raw):
"config, success, expected_error_message",
(
(
- {"credentials": {"api_secret": "secret"}, "project_timezone": "Miami"},
+ {"credentials": {"api_secret": "secret"}, "project_timezone": "Miami", "page_size": 1000},
Contributor Author

Needed, otherwise the tests fail

@descampsk
Contributor Author

@natikgadzhi @ChristoGrab
Hello,
Here is the PR we discussed to greatly improve the sync speed of the engage stream :)

Co-authored-by: Christo Grabowski <108154848+ChristoGrab@users.noreply.github.com>
@marcosmarxm
Member

Running CI Tests to validate changes.

@descampsk
Contributor Author

descampsk commented Jul 23, 2024

@ChristoGrab @marcosmarxm
What can I do to make this PR ready to merge?

The tests seem to be failing, but I cannot see the logs.

@marcosmarxm
Member

[1.29s] LOG, level FATAL: " is of type <class 'str'>. Expected <class 'int'>"
Traceback (most recent call last):
  File "/airbyte/integration_code/main.py", line 8, in <module>
    run()
  File "/airbyte/integration_code/source_mixpanel/run.py", line 16, in run
    launch(source, sys.argv[1:])
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py", line 235, in launch
    for message in source_entrypoint.run(parsed_args):
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py", line 115, in run
    yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.check(source_spec, config))
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py", line 139, in check
    check_result = self.source.check(self.logger, config)
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/manifest_declarative_source.py", line 157, in check
    return super().check(logger, config)
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py", line 87, in check
    check_succeeded, error = self.check_connection(logger, config)
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/declarative_source.py", line 33, in check_connection
    return self.connection_checker.check_connection(self, logger, config)
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/checks/check_stream.py", line 31, in check_connection
    streams = source.streams(config)
  File "/airbyte/integration_code/source_mixpanel/source.py", line 45, in streams
    streams = super().streams(config=config)
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/manifest_declarative_source.py", line 97, in streams
    source_streams = [
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/manifest_declarative_source.py", line 98, in <listcomp>
    self._constructor.create_component(
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py", line 240, in create_component
    return self._create_component_from_model(model=declarative_component_model, config=config, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py", line 248, in _create_component_from_model
    return component_constructor(model=model, config=config, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py", line 607, in create_declarative_stream
    retriever = self._create_component_from_model(
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py", line 248, in _create_component_from_model
    return component_constructor(model=model, config=config, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py", line 1009, in create_simple_retriever
    self._create_component_from_model(
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py", line 248, in _create_component_from_model
    return component_constructor(model=model, config=config, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py", line 712, in create_default_paginator
    pagination_strategy = self._create_component_from_model(model=model.pagination_strategy, config=config)
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py", line 248, in _create_component_from_model
    return component_constructor(model=model, config=config, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py", line 462, in create_custom_component
    return custom_component_class(**kwargs)
  File "<string>", line 7, in __init__
  File "/usr/local/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py", line 37, in __post_init__
    raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
Exception: is of type <class 'str'>. Expected <class 'int'>
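
The message in this log starts with a blank where the page size should be, which suggests the template rendered to an empty string. The sketch below reproduces that message under this assumption. `render_page_size` is a hypothetical stand-in for the template rendering, and `check_page_size` only mirrors the `raise` line visible in the traceback; neither is the CDK's actual code.

```python
def render_page_size(config: dict) -> str:
    """Hypothetical stand-in for template rendering: a missing key renders as ""."""
    return str(config.get("page_size", ""))


def check_page_size(page_size) -> int:
    """Mirror the guard from the traceback: anything that is not an int is rejected."""
    if not isinstance(page_size, int):
        raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
    return page_size


try:
    # Config without a page_size key, as when the spec default is not filled in.
    check_page_size(render_page_size({}))
except Exception as exc:
    message = str(exc)
    print(repr(message))
```

If that assumption holds, the failure comes from the missing config key rather than from the value the user typed.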

@descampsk
Contributor Author

Thanks, will investigate that

@descampsk
Contributor Author

Do you know why the default value from spec.json

      "page_size": {
        "order": 9,
        "title": "Page Size",
        "description": "The number of records to fetch per request. Default is 1000.",
        "type": "integer",
        "minimum": 1,
        "default": 1000
      }

is not applied?

I can reproduce this in the UI
image
With the default value I get the error; if I enter a value explicitly, there is no error.
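
One way to guard against a missing key is to fill in spec defaults before the config reaches the manifest. This is only a sketch of that idea, not necessarily how the PR resolved it; `with_spec_defaults` and `SPEC_PROPERTIES` are hypothetical names, and the property definition is trimmed from the spec.json snippet above.

```python
from typing import Any, Dict

# Trimmed copy of the page_size property from the spec.json snippet above.
SPEC_PROPERTIES: Dict[str, Dict[str, Any]] = {
    "page_size": {"type": "integer", "minimum": 1, "default": 1000},
}


def with_spec_defaults(config: Dict[str, Any], properties: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Return a copy of config with missing or None values replaced by spec defaults."""
    merged = dict(config)
    for name, schema in properties.items():
        if "default" in schema and merged.get(name) is None:
            merged[name] = schema["default"]
    return merged


print(with_spec_defaults({"project_timezone": "Miami"}, SPEC_PROPERTIES))
print(with_spec_defaults({"page_size": 40_000}, SPEC_PROPERTIES))
```

A user-supplied value is left untouched; only absent keys fall back to the default.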

@descampsk
Contributor Author

@marcosmarxm I fixed the issue, could you run the tests again ? :)

Contributor

@ChristoGrab ChristoGrab left a comment

@descampsk Thanks for yet another solid contribution! Regression tests just came back green so we're all good to merge, nicely done 🙌

@ChristoGrab ChristoGrab merged commit 0a304a9 into airbytehq:master Jul 25, 2024
35 checks passed
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation community connectors/source/mixpanel
4 participants