Skip to content

Commit

Permalink
Improve SubstreamResumableFullRefreshCursor performance (#44428)
Browse files Browse the repository at this point in the history
Co-authored-by: Brian Lai <51336873+brianjlai@users.noreply.github.com>
  • Loading branch information
szubster and brianjlai authored Oct 10, 2024
1 parent 1d711a0 commit df80472
Showing 1 changed file with 7 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,7 @@ def __init__(self) -> None:
self._partition_serializer = PerPartitionKeySerializer()

def get_stream_state(self) -> StreamState:
states = []
for partition_tuple, partition_state in self._per_partition_state.items():
states.append(
{
"partition": self._to_dict(partition_tuple),
"cursor": partition_state,
}
)
state: dict[str, Any] = {"states": states}

return state
return {"states": list(self._per_partition_state.values())}

def set_initial_state(self, stream_state: StreamState) -> None:
"""
Expand Down Expand Up @@ -76,7 +66,7 @@ def set_initial_state(self, stream_state: StreamState) -> None:
)

for state in stream_state["states"]:
self._per_partition_state[self._to_partition_key(state["partition"])] = state["cursor"]
self._per_partition_state[self._to_partition_key(state["partition"])] = state

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
"""
Expand All @@ -85,7 +75,10 @@ def observe(self, stream_slice: StreamSlice, record: Record) -> None:
pass

def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
self._per_partition_state[self._to_partition_key(stream_slice.partition)] = FULL_REFRESH_COMPLETE_STATE
self._per_partition_state[self._to_partition_key(stream_slice.partition)] = {
"partition": stream_slice.partition,
"cursor": FULL_REFRESH_COMPLETE_STATE,
}

def should_be_synced(self, record: Record) -> bool:
"""
Expand All @@ -104,7 +97,7 @@ def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[S
if not stream_slice:
raise ValueError("A partition needs to be provided in order to extract a state")

return self._per_partition_state.get(self._to_partition_key(stream_slice.partition))
return self._per_partition_state.get(self._to_partition_key(stream_slice.partition), {}).get("cursor")

def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
return self._partition_serializer.to_partition_key(partition)
Expand Down

0 comments on commit df80472

Please sign in to comment.