Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[prism] Support OnWindowExpiry #32211

Closed
Tracked by #29650
lostluck opened this issue Aug 15, 2024 · 2 comments · Fixed by #33337
Closed
Tracked by #29650

[prism] Support OnWindowExpiry #32211

lostluck opened this issue Aug 15, 2024 · 2 comments · Fixed by #33337
Assignees

Comments

@lostluck
Copy link
Contributor

lostluck commented Aug 15, 2024

The callback for on window expiry is a special timer based callback that the runner triggers prior to garbage collection of a window's state and side inputs. This allows user code to clean up use or clean up state instead of losing it.

The feature is notably used in the GroupIntoBatches transforms for the Java and Python SDKs.

Garbage collection currently happens in a single phase, when watermarks are being updated.

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1560

We need 1: to indicate that a WindowExpiry timer needs to be sent for the stage. That'll probably be similar to the existing stateful and aggregate bools on stageState, and setter methods for the
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1052 Though we'll need the SDK given "name" for the callback, so it'll be a string field in stageState.

  1. We need to know when we may send the WindowExpiries timer messages, and for which windows. In principle this would be a timer per user key, but we only want to set/up/send those per user key at the last moment to avoid unnecessary duplication.

In principle this means that should be in startEventTimeBundle, after indicating the state should be garbage Collected. But we need to indicate windows which are ready for

So sort of like a mark and sweep algorithm for general garbage collection.

The current place we garbage collect is probably fine for both. We must "mark" the windows that have expired if they have OnWindowExpiry, and then afterwards do the current garbage collection "sweep" with the set of collectable windows. But then we'd need the sweeps to be aware of both the window and the key which is a bit heavier on memory.

Ideally we want to be able to clean up in PersistBundle, where the cleanup is just done by not storing the state back for the window of the stage in question.

But then how do we communicate that intent back and avoid too much lock contention?

What I've got so far:

  1. Indicate in stageState that it has an onwindowexpiry callback.
  2. Mark in StageState that window X needs to fire an expiry timer on watermark refresh garbage collection. Otherwise garbage collect the state as we do now.
  3. In startEventTimeBundle (and technically, readyBundle), we produce the synthetic timer elements for that, using the stageState field name. Remove the state from StageState. Remove the mark. This prevents the mark from re-appearing.
  4. Include a map of "state (window+user key) to GC by bundle"
  5. On PersistBundle do not persist the stage state for GC'd.

That's not too bad.

This ties nicely to the cleanup of state to be directly part of the bundle instead of being requested after the fact. That avoids certain read/write contentions too.

@lostluck lostluck self-assigned this Aug 15, 2024
@lostluck lostluck changed the title OnWindowExpiry [prism] Support OnWindowExpiry Aug 15, 2024
@lostluck
Copy link
Contributor Author

lostluck commented Oct 4, 2024

The initial re-write attempt required so many adjustments to how processing was happening. (~1000 lines of changes). I wonder if there isn't a simpler way?

What if we simply add a "GC timer" field to the timer element, and always sort a GC timer after non-GC timers, with the timer adding an output hold for end of window.

This should be fine until we add AllowedLateness.

It seems intuitive this isn't affected by AllowedLateness as that would only affect the inputs arriving from upstream and not within the window, which always closes the window at the end of the window from the transform's perspective.

The trick remains that we want these timers to only fire at the end of the window, right before the window state is GC'd. We don't want to leave a "gap" where we have prematurely fired the OnWindowExpiry timer, and then late data comes in setting new timers, and state.

@lostluck
Copy link
Contributor Author

New idea:

Add a side channel into the bundle generation loop, and have the garbage collection mechanism push the expiry bundles timers directly into the loop. State is move to be owned by that bundle, and there's a marker saying that this bundle is an expiry bundle, so the state doesn't need to be returned to the main store.

Avoids a gap. Does have special case plumbing. Keeps the state clean, and avoids a 2nd check through the GC handler.

lostluck added a commit to lostluck/beam that referenced this issue Dec 5, 2024
lostluck added a commit that referenced this issue Dec 5, 2024
Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
lostluck added a commit to lostluck/beam that referenced this issue Dec 9, 2024
@github-actions github-actions bot added this to the 2.62.0 Release milestone Dec 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant