Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): add two-phase stateless simple approx percentile #17873

Merged
merged 21 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions e2e_test/streaming/aggregate/two_phase_approx_percentile.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Single phase approx percentile
statement ok
create table t(p_col double, grp_col int);

statement ok
insert into t select a, 1 from generate_series(0, 10) t(a);

statement ok
insert into t values(0, 1);

statement ok
flush;

statement ok
create materialized view m1 as select
approx_percentile(0.01, 0.01) within group (order by p_col) as p01
from t;

statement ok
flush;

query I
select * from m1;
----
0

query I
select percentile_cont(0.01) within group (order by p_col) from t;
----
0

statement ok
insert into t select a, 1 from generate_series(11, 1000) t(a);

statement ok
flush;

query I
select * from m1;
----
8.93541864376352

query I
select percentile_cont(0.01) within group (order by p_col) from t;
----
9.01

query I
select approx_percentile(0.01, 0.01) within group (order by p_col) from t group by grp_col;
----
8.93541864376352

statement ok
insert into t select a, 1 from generate_series(-1000, -1) t(a);

statement ok
flush;

query I
select * from m1;
----
-982.5779489474152

query I
select approx_percentile(0.01, 0.01) within group (order by p_col) from t group by grp_col;
----
-982.5779489474152

query I
select percentile_cont(0.01) within group (order by p_col) from t;
----
-979.99

statement ok
drop materialized view m1;

statement ok
drop table t;
14 changes: 14 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,18 @@ message OverWindowNode {
OverWindowCachePolicy cache_policy = 5;
}

message LocalApproxPercentileNode {
double base = 1;
uint32 percentile_index = 2;
}

message GlobalApproxPercentileNode {
double base = 1;
double quantile = 2;
catalog.Table bucket_state_table = 3;
catalog.Table count_state_table = 4;
}

message StreamNode {
oneof node_body {
SourceNode source = 100;
Expand Down Expand Up @@ -835,6 +847,8 @@ message StreamNode {
CdcFilterNode cdc_filter = 140;
SourceBackfillNode source_backfill = 142;
ChangeLogNode changelog = 143;
LocalApproxPercentileNode local_approx_percentile = 144;
GlobalApproxPercentileNode global_approx_percentile = 145;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ pub fn visit_stream_node_tables_inner<F>(
NodeBody::Materialize(node) if !internal_tables_only => {
always!(node.table, "Materialize")
}

NodeBody::GlobalApproxPercentile(node) => {
always!(node.bucket_state_table, "GlobalApproxPercentileBucketState");
always!(node.count_state_table, "GlobalApproxPercentileCountState");
}
_ => {}
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/expr/impl/src/aggregate/approx_percentile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl AggregateFunction for ApproxPercentile {
// approximate quantile bucket on the fly.
async fn get_result(&self, state: &AggregateState) -> Result<Datum> {
let state = state.downcast_ref::<State>();
let quantile_count = (state.count as f64 * self.quantile) as u64;
let quantile_count = (state.count as f64 * self.quantile).floor() as u64;
let mut acc_count = 0;
for (bucket_id, count) in state.neg_buckets.iter().rev() {
acc_count += count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ use fixedbitset::FixedBitSet;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::GlobalApproxPercentileNode;

use crate::expr::{ExprRewriter, ExprVisitor, Literal};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::{childless_record, Distill};
use crate::optimizer::plan_node::utils::{childless_record, Distill, TableCatalogBuilder};
use crate::optimizer::plan_node::{
ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
};
Expand Down Expand Up @@ -95,8 +97,42 @@ impl PlanTreeNodeUnary for StreamGlobalApproxPercentile {
impl_plan_tree_node_for_unary! {StreamGlobalApproxPercentile}

impl StreamNode for StreamGlobalApproxPercentile {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
todo!()
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we don't have any shuffle between the global approx percentile and the local approx percentile?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't quite get it. We do have shuffle between local -> global approx percentile. May take a look at the output inside agg.yaml. There we have:

      ├─StreamGlobalApproxPercentile { quantile: 0.8:Float64, relative_error: 0.01:Float64 }
      │ └─StreamExchange { dist: Single }
      │   └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.8:Float64, relative_error: 0.01:Float64 }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see it. My main branch is out of date.

Copy link
Contributor

@chenzl25 chenzl25 Aug 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I think StreamMerge inputs could be changed from binary inputs to multi inputs like StreamUnion. Maybe the in next PR.

let relative_error = self.relative_error.get_data().as_ref().unwrap();
let relative_error = relative_error.as_float64().into_inner();
let base = (1.0 + relative_error) / (1.0 - relative_error);
let quantile = self.quantile.get_data().as_ref().unwrap();
let quantile = quantile.as_float64().into_inner();

// setup table: bucket_id->count
let mut bucket_table_builder = TableCatalogBuilder::default();
bucket_table_builder.add_column(&Field::with_name(DataType::Int16, "sign"));
bucket_table_builder.add_column(&Field::with_name(DataType::Int32, "bucket_id"));
bucket_table_builder.add_column(&Field::with_name(DataType::Int64, "count"));
bucket_table_builder.add_order_column(0, OrderType::ascending()); // sign
bucket_table_builder.add_order_column(1, OrderType::ascending()); // bucket_id

// setup table: total_count
let mut count_table_builder = TableCatalogBuilder::default();
count_table_builder.add_column(&Field::with_name(DataType::Int64, "total_count"));
stdrc marked this conversation as resolved.
Show resolved Hide resolved

let body = GlobalApproxPercentileNode {
base,
quantile,
bucket_state_table: Some(
bucket_table_builder
.build(vec![], 0)
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost(),
),
count_state_table: Some(
count_table_builder
.build(vec![], 0)
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost(),
),
};
PbNodeBody::GlobalApproxPercentile(body)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::LocalApproxPercentileNode;

use crate::expr::{ExprRewriter, ExprVisitor, InputRef, InputRefDisplay, Literal};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
Expand All @@ -43,11 +44,12 @@ pub struct StreamLocalApproxPercentile {
impl StreamLocalApproxPercentile {
pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
let schema = Schema::new(vec![
Field::with_name(DataType::Int64, "bucket_id"),
Field::with_name(DataType::Int64, "count"),
Field::with_name(DataType::Int16, "sign"),
Field::with_name(DataType::Int32, "bucket_id"),
Field::with_name(DataType::Int32, "count"),
]);
// FIXME(kwannoel): How does watermark work with FixedBitSet
let watermark_columns = FixedBitSet::with_capacity(2);
let watermark_columns = FixedBitSet::with_capacity(3);
let base = PlanBase::new_stream(
input.ctx(),
schema,
Expand Down Expand Up @@ -108,7 +110,15 @@ impl_plan_tree_node_for_unary! {StreamLocalApproxPercentile}

impl StreamNode for StreamLocalApproxPercentile {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
todo!()
let relative_error = self.relative_error.get_data().as_ref().unwrap();
let relative_error = relative_error.as_float64().into_inner();
let base = (1.0 + relative_error) / (1.0 - relative_error);
let percentile_index = self.percentile_col.index() as u32;
let body = LocalApproxPercentileNode {
base,
percentile_index,
};
PbNodeBody::LocalApproxPercentile(body)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct MemTable {

#[derive(Error, Debug)]
pub enum MemTableError {
#[error("Inconsistent operation")]
#[error("Inconsistent operation {key:?}, prev: {prev:?}, new: {new:?}")]
InconsistentOperation {
key: TableKey<Bytes>,
prev: KeyOp,
Expand Down
Loading
Loading