Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚡️ Speed up method GlobalSubstreamCursor.set_initial_state by 65% in PR #45415 (tolik0/airbyte-cdk/perpartition-with-global-regression-tests) #45418

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from __future__ import annotations
import datetime
from dataclasses import InitVar, dataclass, field
from datetime import timedelta
Expand Down Expand Up @@ -109,12 +110,38 @@ def get_stream_state(self) -> StreamState:

def set_initial_state(self, stream_state: StreamState) -> None:
"""
Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
before calling anything else

:param stream_state: The state of the stream as returned by get_stream_state
Set the initial state for the cursors.
Args:
stream_state (StreamState): The state of the streams to be set.
Format of the stream state should be:
{
"state": {
"last_updated": "2023-05-27T00:00:00Z"
},
"parent_state": {
"parent_stream_name": {
"last_updated": "2023-05-27T00:00:00Z"
}
},
"lookback_window": 132
}
"""
self._cursor = stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
if not stream_state:
return

lookback_window = stream_state.get("lookback_window")
if lookback_window is not None:
self._lookback_window = lookback_window
self._inject_lookback_into_stream_cursor(lookback_window)

state = stream_state.get("state")
if state:
self._stream_cursor.set_initial_state(state)

# Set parent state for partition routers based on parent streams
parent_state = stream_state.get("parent_state")
if parent_state:
self._partition_router.set_initial_state(parent_state)

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from __future__ import annotations
import threading
import time
from typing import Any, Iterable, Mapping, Optional, Union
Expand Down Expand Up @@ -114,8 +115,7 @@ def stream_slices(self) -> Iterable[StreamSlice]:

def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
slice_generator = (
StreamSlice(partition=partition, cursor_slice=cursor_slice)
for cursor_slice in self._stream_cursor.stream_slices()
StreamSlice(partition=partition, cursor_slice=cursor_slice) for cursor_slice in self._stream_cursor.stream_slices()
)

yield from slice_generator
Expand All @@ -137,49 +137,43 @@ def register_slice(self, last: bool) -> None:
def set_initial_state(self, stream_state: StreamState) -> None:
"""
Set the initial state for the cursors.

This method initializes the state for the global cursor using the provided stream state.

Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.

Args:
stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
{
"state": {
"last_updated": "2023-05-27T00:00:00Z"
},
"parent_state": {
"parent_stream_name": {
"last_updated": "2023-05-27T00:00:00Z"
}
},
"lookback_window": 132
stream_state (StreamState): The state of the streams to be set.
Format of the stream state should be:
{
"state": {
"last_updated": "2023-05-27T00:00:00Z"
},
"parent_state": {
"parent_stream_name": {
"last_updated": "2023-05-27T00:00:00Z"
}
},
"lookback_window": 132
}
"""
if not stream_state:
return

if "lookback_window" in stream_state:
self._lookback_window = stream_state["lookback_window"]
self._inject_lookback_into_stream_cursor(stream_state["lookback_window"])
lookback_window = stream_state.get("lookback_window")
if lookback_window is not None:
self._lookback_window = lookback_window
self._inject_lookback_into_stream_cursor(lookback_window)

if "state" in stream_state:
self._stream_cursor.set_initial_state(stream_state["state"])
state = stream_state.get("state")
if state:
self._stream_cursor.set_initial_state(state)

# Set parent state for partition routers based on parent streams
self._partition_router.set_initial_state(stream_state)
parent_state = stream_state.get("parent_state")
if parent_state:
self._partition_router.set_initial_state(parent_state)

def _inject_lookback_into_stream_cursor(self, lookback_window: int) -> None:
"""
Modifies the stream cursor's lookback window based on the duration of the previous sync.
This adjustment ensures the cursor is set to the minimal lookback window necessary for
avoiding missing data.

Parameters:
lookback_window (int): The lookback duration in seconds to be set, derived from
the previous sync.

lookback_window (int): The lookback duration in seconds to be set, derived from the previous sync.
Raises:
ValueError: If the cursor does not support dynamic lookback window adjustments.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass
from typing import Mapping, Optional
Expand All @@ -19,26 +20,40 @@ class PartitionRouter(StreamSlicer):
get_parent_state(): Get the state of the parent streams.
"""

@abstractmethod
def set_initial_state(self, stream_state: StreamState) -> None:
"""
Set the state of the parent streams.

This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
incrementally using the state.

Set the initial state for the cursors.
Args:
stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes
'parent_state' which is a dictionary of parent state names to their corresponding state.
Example:
{
"parent_state": {
"parent_stream_name_1": { ... },
"parent_stream_name_2": { ... },
...
}
stream_state (StreamState): The state of the streams to be set.
Format of the stream state should be:
{
"state": {
"last_updated": "2023-05-27T00:00:00Z"
},
"parent_state": {
"parent_stream_name": {
"last_updated": "2023-05-27T00:00:00Z"
}
},
"lookback_window": 132
}
"""
if not stream_state:
return

lookback_window = stream_state.get("lookback_window")
if lookback_window is not None:
self._lookback_window = lookback_window
self._inject_lookback_into_stream_cursor(lookback_window)

state = stream_state.get("state")
if state:
self._stream_cursor.set_initial_state(state)

# Set parent state for partition routers based on parent streams
parent_state = stream_state.get("parent_state")
if parent_state:
self._partition_router.set_initial_state(parent_state)

@abstractmethod
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
Expand Down
Loading