Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session windows #243

Merged
merged 11 commits into from
Aug 14, 2023
Merged

Session windows #243

merged 11 commits into from
Aug 14, 2023

Conversation

mwylde
Copy link
Member

@mwylde mwylde commented Aug 11, 2023

This PR adds support for session windows, addressing #239.

Session windows divide time up by a minimum gap size and are useful for sessionization. For example, a session window could be defined on clickstream data to determine metrics for a particular usage period.

Session windows can be used like our existing time window functions (hop and tumble). For example:

select bid.auction, count(*), avg(bid.price), session(interval '30 seconds')
from nexmark
where bid is not null
group by bid.auction, session(interval '30 seconds')

This PR introduces only an unoptimized implementation of session windows without support for partial aggregates.


This PR also includes a large rework of how windows are handled in SQL. This was necessary because the existing implementation relied on windows being fixed width in order to determine the window output.

Previously, we would compile a window-groupby into two nodes: a window aggregation, and a merge node. The window aggregation was responsible for outputting all of the aggregate fields (in the above query, that would be the count(*)), while the subsequent merge was responsible for combining the aggregates with the group by fields. Key fields (like bid.auction) would be pulled from the key, while window fields (session(interval '30 seconds')) would be computed statically from the timestamp and the window size. However, this doesn't work with dynamically-sized windows.

Now instead each aggregation closure takes both a key and the window, and is responsible for outputting the full struct. In addition to allowing us to support non-fixed sized windows like session windows, it also saves a (non-fused) node graph and simplifies the code significantly.

Copy link
Contributor

@jacksonrnewhouse jacksonrnewhouse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timestamp used for session windows needs to be fixed. I described the behavior of that field in the comments.

pub field_names: Vec<Column>,
pub field_computations: Vec<AggregationExpression>,
pub aggregates: Vec<(Column, AggregationExpression)>,
pub group_bys: Vec<(Column, syn::Expr, TypeDef)>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love how this leaks generating code into this phase of the planning, but everything is complex enough right now I don't think it is worth trying to fix this right now. Will be a goal for reworking how code is generated.

arroyo-worker/src/operators/windows.rs Outdated Show resolved Hide resolved
.get_key_state('s')
.await
// I have no idea if this timestamp is correct -- this datastructure does not make sense to me
.insert(timestamp, key, windows.unwrap())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is wrong. Given that events can arrive out of order, imagine you have a 24 hour window in the future, with gaps of 30 minutes, it all fills in at 00:00, but the last event you process is at 00:10. This'd get written with that 00:10 timestamp. Then, if you restarted at 1:00, it would expire the record of your window. This should be the max of all timestamps you've seen for the key, I believe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can compute this by maxing over the windows, or storing it along with the vec of Windows.

Copy link
Member Author

@mwylde mwylde Aug 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, this works for hop/tumble windows because they're fixed width, so the watermark + the safe_retention_micros is enough to ensure we don't delete data we might need in the future.

However I'm not sure your approach would work either, since at insertion time we can't know how long we need to store it—every time the window gets extended we'd have to rewrite all of the events for that key.

Instead what do you think about just opting out of the optimized delete behavior and relying on the clear_time_range calls in handle_timer?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My comments were for the 's' table, which gets rewritten every time we change the windows. For the window state table 's', you are correct that we can't use the expiration, so should be relying on clear_time_range.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From offline conversation:

  • For the window table 'w' we can get the correct behavior by using the max endtime of all of the windows, which ensures that we do not clear up the windows vec until all windows are finished
  • The state table 's' is more problematic; since we can't know at insertion time when the window that contains that record will close (since it can be arbitrarily extended) we can't use any watermark-based cleanup—currently the only type of deletion currently supported by the state backend. For now, we will introduce a max session window size of 24 hours; any sessions longer than that will get broken up. All state will be retained for 24 hours then deleted. As a follow on, we will add support for tombstone-based deletion and compaction in the s3 state backend.

@mwylde
Copy link
Member Author

mwylde commented Aug 14, 2023

Updated with tests for the window handling logic, fixed timestamps for the window table, and a max session size to correctly handling deletion of the state data.

@mwylde mwylde enabled auto-merge (rebase) August 14, 2023 22:06
@mwylde mwylde merged commit 944ecfb into master Aug 14, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants