Skip to content

Commit

Permalink
windows: Implement per-partition window expiration for tumbling and h…
Browse files Browse the repository at this point in the history
…opping windows (#747)
  • Loading branch information
quentin-quix authored Mar 4, 2025
1 parent 397c2e2 commit bbfd643
Show file tree
Hide file tree
Showing 17 changed files with 1,522 additions and 700 deletions.
66 changes: 63 additions & 3 deletions docs/windowing.md
Original file line number Diff line number Diff line change
Expand Up @@ -990,10 +990,70 @@ Also, specifying a grace period using `grace_ms` will increase the latency, beca

You can use `final()` mode when some latency is allowed, but the emitted results must be complete and unique.

>***NOTE:*** Windows can be closed only by the records with the **same** message key.
> If some message keys appear irregularly in the stream, the latest windows
> can remain unprocessed until the message with the same key is received.
## Closing strategies

By default, windows use the **key** closing strategy.
In this strategy, messages advance time and close only windows with the **same** message key.

If some message keys appear irregularly in the stream, the latest windows can remain unprocessed until the message with the same key is received.

```python
from datetime import timedelta
from quixstreams import Application

app = Application(...)
sdf = app.dataframe(...)

# Calculate a sum of values over a window of 10 seconds
# and use .final() to emit results only when the window is complete
sdf = sdf.tumbling_window(timedelta(seconds=10)).sum().final(closing_strategy="key")

# Details:
# -> Timestamp=100, Key="A", value=1 -> emit nothing (the window is not closed yet)
# -> Timestamp=101, Key="B", value=2 -> emit nothing (the window is not closed yet)
# -> Timestamp=105, Key="C", value=3 -> emit nothing (the window is not closed yet)
# -> Timestamp=10100, Key="B", value=2 -> emit one message with key "B" and value {"start": 0, "end": 10000, "value": 2}, the time has progressed beyond the window end for the "B" key only.
# -> Timestamp=8000, Key="A", value=1 -> emit nothing (the window is not closed yet)
# -> Timestamp=10001, Key="A", value=1 -> emit one message with key "A" and value {"start": 0, "end": 10000, "value": 2}, the time has progressed beyond the window end for the "A" key.

# Results:
# (key="B", value={"start": 0, "end": 10000, "value": 2})
# (key="A", value={"start": 0, "end": 10000, "value": 2})
# No message for key "C" as the window is never closed since no messages with key "C" and a timestamp later than 10000 was received
```

An alternative is to use the **partition** closing strategy.
In this strategy, messages advance time and close windows for the whole partition to which this key belongs.

If messages aren't ordered accross keys some message can be skipped if the windows are already closed.

```python
from datetime import timedelta
from quixstreams import Application

app = Application(...)
sdf = app.dataframe(...)

# Calculate a sum of values over a window of 10 seconds
# and use .final() to emit results only when the window is complete
sdf = sdf.tumbling_window(timedelta(seconds=10)).sum().final(closing_strategy="partition")

# Details:
# -> Timestamp=100, Key="A", value=1 -> emit nothing (the window is not closed yet)
# -> Timestamp=101, Key="B", value=2 -> emit nothing (the window is not closed yet)
# -> Timestamp=105, Key="C", value=3 -> emit nothing (the window is not closed yet)
# -> Timestamp=10100, Key="B", value=1 -> emit three messages, the time has progressed beyond the window end for all the keys in the partition
# 1. first one with key "A" and value {"start": 0, "end": 10000, "value": 1}
# 2. second one with key "B" and value {"start": 0, "end": 10000, "value": 2}
# 3. third one with key "C" and value {"start": 0, "end": 10000, "value": 3}
# -> Timestamp=8000, Key="A", value=1 -> emit nothing and value isn't part of the sum (the window is already closed)
# -> Timestamp=10001, Key="A", value=1 -> emit nothing (the window is not closed yet)

# Results:
# (key="A", value={"start": 0, "end": 10000, "value": 1})
# (key="B", value={"start": 0, "end": 10000, "value": 2})
# (key="C", value={"start": 0, "end": 10000, "value": 3})
```

## Implementation Details

Expand Down
2 changes: 0 additions & 2 deletions quixstreams/dataframe/windows/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from .base import WindowResult
from .definitions import (
HoppingCountWindowDefinition,
HoppingTimeWindowDefinition,
Expand All @@ -15,5 +14,4 @@
"HoppingTimeWindowDefinition",
"SlidingTimeWindowDefinition",
"TumblingTimeWindowDefinition",
"WindowResult",
]
73 changes: 53 additions & 20 deletions quixstreams/dataframe/windows/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,24 @@
import logging
from abc import abstractmethod
from collections import deque
from typing import Any, Callable, Deque, Iterable, Optional, Protocol, cast

from typing_extensions import TYPE_CHECKING, TypedDict
from typing import (
TYPE_CHECKING,
Any,
Callable,
Deque,
Iterable,
Optional,
Protocol,
TypedDict,
cast,
)

from typing_extensions import TypeAlias

from quixstreams.context import message_context
from quixstreams.core.stream import TransformExpandedCallback
from quixstreams.processing import ProcessingContext
from quixstreams.state import WindowedPartitionTransaction, WindowedState
from quixstreams.state import WindowedPartitionTransaction

if TYPE_CHECKING:
from quixstreams.dataframe.dataframe import StreamingDataFrame
Expand All @@ -24,11 +34,15 @@ class WindowResult(TypedDict):
value: Any


WindowKeyResult: TypeAlias = tuple[Any, WindowResult]
Message: TypeAlias = tuple[WindowResult, Any, int, Any]

WindowAggregateFunc = Callable[[Any, Any], Any]
WindowMergeFunc = Callable[[Any], Any]

TransformRecordCallbackExpandedWindowed = Callable[
[Any, Any, int, Any, WindowedState], list[tuple[WindowResult, Any, int, Any]]
[Any, Any, int, Any, WindowedPartitionTransaction],
Iterable[Message],
]


Expand All @@ -54,11 +68,11 @@ def process_window(
value: Any,
key: Any,
timestamp_ms: int,
state: WindowedState,
) -> tuple[Iterable[WindowResult], Iterable[WindowResult]]:
transaction: WindowedPartitionTransaction,
) -> tuple[Iterable[WindowKeyResult], Iterable[WindowKeyResult]]:
pass

def register_store(self):
def register_store(self) -> None:
self._dataframe.processing_context.state_manager.register_windowed_store(
topic_name=self._dataframe.topic.name, store_name=self._name
)
Expand Down Expand Up @@ -107,13 +121,21 @@ def final(self) -> "StreamingDataFrame":
"""

def window_callback(
value: Any, key: Any, timestamp_ms: int, _headers: Any, state: WindowedState
) -> list[tuple[WindowResult, Any, int, Any]]:
value: Any,
key: Any,
timestamp_ms: int,
_headers: Any,
transaction: WindowedPartitionTransaction,
) -> Iterable[Message]:
_, expired_windows = self.process_window(
value=value, key=key, timestamp_ms=timestamp_ms, state=state
value=value,
key=key,
timestamp_ms=timestamp_ms,
transaction=transaction,
)
# Use window start timestamp as a new record timestamp
return [(window, key, window["start"], None) for window in expired_windows]
for key, window in expired_windows:
yield (window, key, window["start"], None)

return self._apply_window(
func=window_callback,
Expand All @@ -139,12 +161,24 @@ def current(self) -> "StreamingDataFrame":
"""

def window_callback(
value: Any, key: Any, timestamp_ms: int, _headers: Any, state: WindowedState
) -> list[tuple[WindowResult, Any, int, Any]]:
updated_windows, _ = self.process_window(
value=value, key=key, timestamp_ms=timestamp_ms, state=state
value: Any,
key: Any,
timestamp_ms: int,
_headers: Any,
transaction: WindowedPartitionTransaction,
) -> Iterable[Message]:
updated_windows, expired_windows = self.process_window(
value=value, key=key, timestamp_ms=timestamp_ms, transaction=transaction
)
return [(window, key, window["start"], None) for window in updated_windows]

# loop over the expired_windows generator to ensure the windows
# are expired
for key, window in expired_windows:
pass

# Use window start timestamp as a new record timestamp
for key, window in updated_windows:
yield (window, key, window["start"], None)

return self._apply_window(func=window_callback, name=self._name)

Expand All @@ -167,7 +201,7 @@ def _as_windowed(
@functools.wraps(func)
def wrapper(
value: Any, key: Any, timestamp: int, headers: Any
) -> list[tuple[WindowResult, Any, int, Any]]:
) -> Iterable[Message]:
ctx = message_context()
transaction = cast(
WindowedPartitionTransaction,
Expand All @@ -181,8 +215,7 @@ def wrapper(
f"partition='{ctx.topic}[{ctx.partition}]' offset='{ctx.offset}'."
)
return _noop()
state = transaction.as_state(prefix=key)
return func(value, key, timestamp, headers, state)
return func(value, key, timestamp, headers, transaction)

return wrapper

Expand Down
31 changes: 20 additions & 11 deletions quixstreams/dataframe/windows/count_based.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
from typing import TYPE_CHECKING, Any, Iterable, Optional, TypedDict

from quixstreams.state import WindowedState
from quixstreams.state import WindowedPartitionTransaction

from .base import (
Window,
WindowAggregateFunc,
WindowKeyResult,
WindowMergeFunc,
WindowResult,
default_merge_func,
Expand Down Expand Up @@ -57,8 +58,8 @@ def process_window(
value: Any,
key: Any,
timestamp_ms: int,
state: WindowedState,
) -> tuple[Iterable[WindowResult], Iterable[WindowResult]]:
transaction: WindowedPartitionTransaction,
) -> tuple[Iterable[WindowKeyResult], Iterable[WindowKeyResult]]:
"""
Count based windows are different from time based windows as we don't
have a clear indicator on when a window starts, it depends on the
Expand All @@ -79,6 +80,7 @@ def process_window(
For tumbling windows there is no window overlap so we can't rely on that
optimisation. Instead the msg id reset to 0 on every new window.
"""
state = transaction.as_state(prefix=key)
data = state.get(key=self.STATE_KEY)
if data is None:
data = CountWindowsData(windows=[])
Expand Down Expand Up @@ -136,10 +138,13 @@ def process_window(
)

expired_windows.append(
WindowResult(
start=window["start"],
end=window["end"],
value=self._merge_func(values),
(
key,
WindowResult(
start=window["start"],
end=window["end"],
value=self._merge_func(values),
),
)
)
to_remove.append(index)
Expand All @@ -156,11 +161,15 @@ def process_window(
else:
window["value"] = self._aggregate_func(window["value"], value)

result = WindowResult(
start=window["start"],
end=window["end"],
value=self._merge_func(window["value"]),
result = (
key,
WindowResult(
start=window["start"],
end=window["end"],
value=self._merge_func(window["value"]),
),
)

updated_windows.append(result)

if window["count"] >= self._max_count:
Expand Down
Loading

0 comments on commit bbfd643

Please sign in to comment.