-
Notifications
You must be signed in to change notification settings - Fork 587
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(stream): add two-phase stateless simple approx percentile #17873
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
license-eye has totally checked 5253 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2253 | 2 | 2998 | 0 |
Click to see the invalid file list
- src/stream/src/executor/approx_percentile/global.rs
- src/stream/src/executor/keyed_merge.rs
6a0aa64
to
689ec0d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
license-eye has totally checked 5257 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2256 | 2 | 2999 | 0 |
Click to see the invalid file list
- src/stream/src/executor/approx_percentile/global.rs
- src/stream/src/executor/keyed_merge.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
license-eye has totally checked 5259 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2259 | 1 | 2999 | 0 |
Click to see the invalid file list
- src/stream/src/executor/keyed_merge.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
license-eye has totally checked 5259 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2259 | 1 | 2999 | 0 |
Click to see the invalid file list
- src/stream/src/executor/keyed_merge.rs
c5a8387
to
b15d957
Compare
… use reverse iterator
d0e8852
to
c134477
Compare
// Just iterate over the singleton vnode. | ||
// TODO(kwannoel): Should we just use separate state tables for | ||
// positive and negative counts? | ||
// Reverse iterator is not as efficient. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TBH I'm not totally sure about this design decision.
The alternative is to split into 3 tables:
- zeros
- neg
- pos
Then we don't have to use reverse iteration.
The current design is to group everyting in a single table, and add a sign
column.
Because everything is ascending, this means that when iterating negative values, the order will be:
- 1
- 10000
- 10000000
When we want this instead:
- 10000000
- 10000
- 1
So we will need to use a separate reverse iterator for the negative values. This approach is simpler, but may be less efficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just use an empty prefix to scan the state table? The order should start from neg to pos.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it doesn't work. Because we aren't just iterating on negative values.
Consider the following is in the state table (with the count
column elided)
sign | bucket_id |
---|---|
-1 | -3 |
-1 | -1 |
-1 | 1 |
-1 | 2 |
0 | 0 |
1 | 1 |
1 | 2 |
Our iteration order will be:
(-1, -3), (-1,-1), (-1, 1), (-1, 2), (0,0), (+1, 1), (+1, 2)
But we actually want
(-1, 2), (-1,1), (-1, -1), (-1, -3), (0,0), (+1, 1), (+1, 2)
Because as an example:
-(base^2) < -(base ^1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Actually, we can do a math trick here, for example, encoding bucket_id
as - bucket_id
for sign with -1
. Then we will get the order we want.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Actually, we can do a math trick here, for example, encoding bucket_id as - bucket_id for sign with -1. Then we will get the order we want.
That's a good idea for optimization, but I intend to keep this suggestion as a future optimization, rather than change it in this PR. Currently this is just a few lines in one location:
for keyed_row in bucket_state_table
.rev_iter_with_prefix(&[Datum::None; 0], &neg_bounds, PrefetchOptions::default())
.await?
Adding -1
sign, means at all areas where we encode / decode, we will need to add the logic to reverse it. And probably have to add comments there to explain why as well. Here we can just add some comments in a single spot.
Once we add caching, I think we won't do as much state table iter anymore, and so it will mitigate any performance hit from the reverse iteration of negative values.
If we add an abstraction over state_table and cache in the caching PR, we can consider adding this optimization there.
Bump, PTAL, the tests have passed. |
src/frontend/src/optimizer/plan_node/stream_global_approx_percentile.rs
Outdated
Show resolved
Hide resolved
@@ -95,8 +97,42 @@ impl PlanTreeNodeUnary for StreamGlobalApproxPercentile { | |||
impl_plan_tree_node_for_unary! {StreamGlobalApproxPercentile} | |||
|
|||
impl StreamNode for StreamGlobalApproxPercentile { | |||
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { | |||
todo!() | |||
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems we don't have any shuffle between the global approx percentile and the local approx percentile?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't quite get it. We do have shuffle between local -> global approx percentile. May take a look at the output inside agg.yaml
. There we have:
├─StreamGlobalApproxPercentile { quantile: 0.8:Float64, relative_error: 0.01:Float64 }
│ └─StreamExchange { dist: Single }
│ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.8:Float64, relative_error: 0.01:Float64 }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see it. My main branch is out of date.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, I think StreamMerge
inputs could be changed from binary inputs to multi inputs like StreamUnion
. Maybe the in next PR.
bc3a768
to
66ecdea
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
#17531
We add stateless simple approx percentile executors (local and global). This is the barebones version, without any caching, and which only works when its used alone without other aggregations.
Subsequent PRs will take care of:
LocalApproxPercentile
is stateless, and just construct the buckets and corresponding counts. For negative numbers we need to get the absolute value, and log that instead. Then we pass the sign in a separate column. The schema will look like:GlobalApproxPercentile
stores the bucket_ids, prefixed with a sign. On barrier, we will iterate over all buckets, and output the approximated percentile.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
This is an experimental feature. The interface may change in the future. Only Streaming Approx percentile is supported at the moment.
Approx Percentile is an aggregation with the following interface:
percentile
refers to the percentile to approximate. For example, 50th, 90th will be0.5, 0.9
respectively.relative_error
refers to how far the approximated percentile value can stray from the actual percentile value. If unspecified, it will default to0.01
(1%).percentile_column
refers to the column we will maintain the approx percentile for. It must be of a numeric type.