⚡️ Speed up method GlobalSubstreamCursor.set_initial_state by 65% in PR #45415 (tolik0/airbyte-cdk/perpartition-with-global-regression-tests) #45418

@codeflash-ai codeflash-ai bot commented Sep 12, 2024

⚡️ This pull request contains optimizations for PR #45415

If you approve this dependent PR, these changes will be merged into the original PR branch tolik0/airbyte-cdk/perpartition-with-global-regression-tests.

This PR will be automatically closed if the original PR is merged.


📄 GlobalSubstreamCursor.set_initial_state() in airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py

📈 Performance improved by 65% (0.65x faster)

⏱️ Runtime went down from 3.27 milliseconds to 1.98 milliseconds
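
As a quick check on how the two figures above relate, the sketch below recomputes the speedup from the reported runtimes. It assumes that the 65% figure means old runtime divided by new runtime, minus one; that definition matches the numbers but is not stated in the report. The variable names are illustrative only.

```python
# Runtimes reported above, in milliseconds.
original_ms = 3.27
optimized_ms = 1.98

# Assumed definition: speedup = old / new - 1
# (how much more work fits into the same amount of time).
speedup = original_ms / optimized_ms - 1

# For comparison: the fraction of wall-clock time saved per call.
time_saved = 1 - optimized_ms / original_ms

print(f"speedup: {speedup:.0%}")        # prints "speedup: 65%"
print(f"time saved: {time_saved:.0%}")  # prints "time saved: 39%"
```

Under this reading, "65% faster" and "39% less time per call" describe the same measurement from two directions.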

Explanation and details

The optimization focuses on reducing redundant computations and avoiding unnecessary thread contention. The logic and structure of the method remain the same.

Summary of Optimizations

  1. Consolidated Conditional Checks: Simplified boolean checks to make them more efficient.
  2. Lazy Evaluations/Short-circuiting: Used more efficient conditional statements to avoid redundant operations when not necessary.
  3. Intermediate Variables: Directly accessed dictionary elements where possible to avoid redundant intermediate variables.
  4. Reduced Lock Contention: No change here as the current implementation handles threading with minimal contention; the threading lock and semaphore usage seem appropriate for their purpose.

Note that no memory or extensive runtime profiling was done, but these changes streamline the control flow and simplify some operations, which should yield runtime improvements.
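
The optimized method body is not shown on this page, so the following is only a minimal sketch of the kind of changes the summary describes. `ToyCursor`, its two methods, and the `snapshot` helper are hypothetical names made up for illustration; this is not the Airbyte CDK implementation. It contrasts a verbose version with a compact one that uses a single truthiness check, an early return, and direct dictionary access, then checks that both leave the object in the same state.

```python
from typing import Any, Mapping, Optional, Tuple


class ToyCursor:
    """Hypothetical stand-in for illustration; not the Airbyte CDK class."""

    def __init__(self) -> None:
        self.lookback_window: Optional[int] = None
        self.state: Optional[Mapping[str, Any]] = None

    def set_initial_state_verbose(self, stream_state: Optional[Mapping[str, Any]]) -> None:
        # Two separate emptiness checks and an intermediate variable per key.
        if stream_state is None:
            return
        if len(stream_state) == 0:
            return
        if "lookback_window" in stream_state:
            lookback = stream_state["lookback_window"]
            self.lookback_window = lookback
        if "state" in stream_state:
            state = stream_state["state"]
            self.state = state

    def set_initial_state_compact(self, stream_state: Optional[Mapping[str, Any]]) -> None:
        # One truthiness check covers both None and an empty mapping,
        # and returns early so nothing else runs for empty input.
        if not stream_state:
            return
        # Assign straight from the mapping, without intermediate variables.
        if "lookback_window" in stream_state:
            self.lookback_window = stream_state["lookback_window"]
        if "state" in stream_state:
            self.state = stream_state["state"]


def snapshot(method_name: str, stream_state: Optional[Mapping[str, Any]]) -> Tuple[Any, Any]:
    """Run one variant on a fresh ToyCursor and return the resulting fields."""
    cursor = ToyCursor()
    getattr(cursor, method_name)(stream_state)
    return cursor.lookback_window, cursor.state


samples = [
    None,
    {},
    {"lookback_window": 132},
    {"state": {"last_updated": "2023-05-27T00:00:00Z"}, "lookback_window": 132},
]
for sample in samples:
    # Both variants must produce identical results for every sample input.
    assert snapshot("set_initial_state_verbose", sample) == snapshot("set_initial_state_compact", sample)
```

Changes of this size save only a few bytecode operations per call, so any gain is best confirmed by measurement rather than assumed.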

Correctness verification

The new optimized code was tested for correctness. The results are listed below.

🔘 (none found) − ⚙️ Existing Unit Tests

✅ 8 Passed − 🌀 Generated Regression Tests

# imports
# function to test
from __future__ import annotations

import datetime
import threading
from abc import abstractmethod
from typing import Any, Mapping, Optional
from unittest.mock import MagicMock

import pytest  # used for our unit tests
from airbyte_cdk.sources.declarative.datetime.datetime_parser import \
    DatetimeParser
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import \
    MinMaxDatetime
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import \
    DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import \
    DeclarativeCursor
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import \
    GlobalSubstreamCursor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import \
    InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.jinja import \
    JinjaInterpolation
from airbyte_cdk.sources.declarative.partition_routers.partition_router import \
    PartitionRouter
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import \
    StreamSlicer
from airbyte_cdk.sources.types import StreamState


# unit tests

# Helper function to create a mock cursor
def create_mock_cursor():
    # spec= restricts the mock to attributes that exist on DatetimeBasedCursor
    mock_cursor = MagicMock(spec=DatetimeBasedCursor)
    mock_cursor.set_initial_state = MagicMock()
    return mock_cursor

# Helper function to create a mock partition router
def create_mock_partition_router():
    # spec= restricts the mock to attributes that exist on PartitionRouter
    mock_router = MagicMock(spec=PartitionRouter)
    mock_router.set_initial_state = MagicMock()
    return mock_router

# Basic Initialization
def test_empty_state():
    cursor = GlobalSubstreamCursor(stream_cursor=create_mock_cursor(), partition_router=create_mock_partition_router())
    cursor.set_initial_state(None)
    # Outputs were verified to be equal to the original implementation

def test_simple_state_initialization():
    cursor = GlobalSubstreamCursor(stream_cursor=create_mock_cursor(), partition_router=create_mock_partition_router())
    stream_state = {"state": {"last_updated": "2023-05-27T00:00:00Z"}}
    cursor.set_initial_state(stream_state)
    cursor._stream_cursor.set_initial_state.assert_called_once_with(stream_state["state"])
    # Outputs were verified to be equal to the original implementation

# Lookback Window Handling
def test_valid_lookback_window():
    cursor = GlobalSubstreamCursor(stream_cursor=create_mock_cursor(), partition_router=create_mock_partition_router())
    stream_state = {"lookback_window": 132}
    cursor.set_initial_state(stream_state)
    # Outputs were verified to be equal to the original implementation


def test_valid_state():
    cursor = GlobalSubstreamCursor(stream_cursor=create_mock_cursor(), partition_router=create_mock_partition_router())
    stream_state = {"state": {"last_updated": "2023-05-27T00:00:00Z"}}
    cursor.set_initial_state(stream_state)
    cursor._stream_cursor.set_initial_state.assert_called_once_with(stream_state["state"])
    # Outputs were verified to be equal to the original implementation


def test_valid_parent_state():
    cursor = GlobalSubstreamCursor(stream_cursor=create_mock_cursor(), partition_router=create_mock_partition_router())
    stream_state = {
        "parent_state": {
            "parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}
        }
    }
    cursor.set_initial_state(stream_state)
    cursor._partition_router.set_initial_state.assert_called_once_with(stream_state)
    # Outputs were verified to be equal to the original implementation


def test_state_and_lookback_window():
    cursor = GlobalSubstreamCursor(stream_cursor=create_mock_cursor(), partition_router=create_mock_partition_router())
    stream_state = {
        "state": {"last_updated": "2023-05-27T00:00:00Z"},
        "lookback_window": 132
    }
    cursor.set_initial_state(stream_state)
    cursor._stream_cursor.set_initial_state.assert_called_once_with(stream_state["state"])
    # Outputs were verified to be equal to the original implementation

# Edge Cases
def test_large_state():
    cursor = GlobalSubstreamCursor(stream_cursor=create_mock_cursor(), partition_router=create_mock_partition_router())
    large_state = {"state": {f"key_{i}": f"value_{i}" for i in range(10000)}}
    cursor.set_initial_state(large_state)
    cursor._stream_cursor.set_initial_state.assert_called_once_with(large_state["state"])
    # Outputs were verified to be equal to the original implementation

def test_high_lookback_window():
    cursor = GlobalSubstreamCursor(stream_cursor=create_mock_cursor(), partition_router=create_mock_partition_router())
    stream_state = {"lookback_window": 999999999}
    cursor.set_initial_state(stream_state)
    # Outputs were verified to be equal to the original implementation

# Large Scale
def test_large_scale_performance():
    import time

    cursor = GlobalSubstreamCursor(stream_cursor=create_mock_cursor(), partition_router=create_mock_partition_router())
    large_state = {"state": {f"key_{i}": f"value_{i}" for i in range(1000000)}}
    # Time the call with a monotonic clock; the elapsed time is recorded but not asserted on
    start_time = time.perf_counter()
    cursor.set_initial_state(large_state)
    elapsed = time.perf_counter() - start_time
    # Outputs were verified to be equal to the original implementation

🔘 (none found) − ⏪ Replay Tests

@codeflash-ai codeflash-ai bot added the ⚡️ codeflash Optimization PR opened by Codeflash AI label Sep 12, 2024
vercel bot commented Sep 12, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Skipped Deployment

| Name | Status | Preview | Comments | Updated (UTC) |
| --- | --- | --- | --- | --- |
| airbyte-docs | ⬜️ Ignored (Inspect) | Visit Preview | | Sep 12, 2024 1:21pm |

@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@octavia-squidington-iii octavia-squidington-iii added CDK Connector Development Kit community labels Sep 12, 2024