Skip to content

Commit

Permalink
feat(stream): add two-phase stateless simple approx percentile (#17873)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Aug 8, 2024
1 parent e2d18c1 commit c315942
Show file tree
Hide file tree
Showing 16 changed files with 660 additions and 9 deletions.
78 changes: 78 additions & 0 deletions e2e_test/streaming/aggregate/two_phase_approx_percentile.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Single phase approx percentile
statement ok
create table t(p_col double, grp_col int);

statement ok
insert into t select a, 1 from generate_series(0, 10) t(a);

statement ok
insert into t values(0, 1);

statement ok
flush;

statement ok
create materialized view m1 as select
approx_percentile(0.01, 0.01) within group (order by p_col) as p01
from t;

statement ok
flush;

query I
select * from m1;
----
0

query I
select percentile_cont(0.01) within group (order by p_col) from t;
----
0

statement ok
insert into t select a, 1 from generate_series(11, 1000) t(a);

statement ok
flush;

query I
select * from m1;
----
8.93541864376352

query I
select percentile_cont(0.01) within group (order by p_col) from t;
----
9.01

query I
select approx_percentile(0.01, 0.01) within group (order by p_col) from t group by grp_col;
----
8.93541864376352

statement ok
insert into t select a, 1 from generate_series(-1000, -1) t(a);

statement ok
flush;

query I
select * from m1;
----
-982.5779489474152

query I
select approx_percentile(0.01, 0.01) within group (order by p_col) from t group by grp_col;
----
-982.5779489474152

query I
select percentile_cont(0.01) within group (order by p_col) from t;
----
-979.99

statement ok
drop materialized view m1;

statement ok
drop table t;
14 changes: 14 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,18 @@ message OverWindowNode {
OverWindowCachePolicy cache_policy = 5;
}

message LocalApproxPercentileNode {
double base = 1;
uint32 percentile_index = 2;
}

message GlobalApproxPercentileNode {
double base = 1;
double quantile = 2;
catalog.Table bucket_state_table = 3;
catalog.Table count_state_table = 4;
}

message StreamNode {
oneof node_body {
SourceNode source = 100;
Expand Down Expand Up @@ -833,6 +845,8 @@ message StreamNode {
CdcFilterNode cdc_filter = 140;
SourceBackfillNode source_backfill = 142;
ChangeLogNode changelog = 143;
LocalApproxPercentileNode local_approx_percentile = 144;
GlobalApproxPercentileNode global_approx_percentile = 145;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ pub fn visit_stream_node_tables_inner<F>(
NodeBody::Materialize(node) if !internal_tables_only => {
always!(node.table, "Materialize")
}

NodeBody::GlobalApproxPercentile(node) => {
always!(node.bucket_state_table, "GlobalApproxPercentileBucketState");
always!(node.count_state_table, "GlobalApproxPercentileCountState");
}
_ => {}
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/expr/impl/src/aggregate/approx_percentile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl AggregateFunction for ApproxPercentile {
// approximate quantile bucket on the fly.
async fn get_result(&self, state: &AggregateState) -> Result<Datum> {
let state = state.downcast_ref::<State>();
let quantile_count = (state.count as f64 * self.quantile) as u64;
let quantile_count = (state.count as f64 * self.quantile).floor() as u64;
let mut acc_count = 0;
for (bucket_id, count) in state.neg_buckets.iter().rev() {
acc_count += count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ use fixedbitset::FixedBitSet;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::GlobalApproxPercentileNode;

use crate::expr::{ExprRewriter, ExprVisitor, Literal};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::{childless_record, Distill};
use crate::optimizer::plan_node::utils::{childless_record, Distill, TableCatalogBuilder};
use crate::optimizer::plan_node::{
ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamNode,
};
Expand Down Expand Up @@ -95,8 +97,42 @@ impl PlanTreeNodeUnary for StreamGlobalApproxPercentile {
impl_plan_tree_node_for_unary! {StreamGlobalApproxPercentile}

impl StreamNode for StreamGlobalApproxPercentile {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
todo!()
fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
let relative_error = self.relative_error.get_data().as_ref().unwrap();
let relative_error = relative_error.as_float64().into_inner();
let base = (1.0 + relative_error) / (1.0 - relative_error);
let quantile = self.quantile.get_data().as_ref().unwrap();
let quantile = quantile.as_float64().into_inner();

// setup table: bucket_id->count
let mut bucket_table_builder = TableCatalogBuilder::default();
bucket_table_builder.add_column(&Field::with_name(DataType::Int16, "sign"));
bucket_table_builder.add_column(&Field::with_name(DataType::Int32, "bucket_id"));
bucket_table_builder.add_column(&Field::with_name(DataType::Int64, "count"));
bucket_table_builder.add_order_column(0, OrderType::ascending()); // sign
bucket_table_builder.add_order_column(1, OrderType::ascending()); // bucket_id

// setup table: total_count
let mut count_table_builder = TableCatalogBuilder::default();
count_table_builder.add_column(&Field::with_name(DataType::Int64, "total_count"));

let body = GlobalApproxPercentileNode {
base,
quantile,
bucket_state_table: Some(
bucket_table_builder
.build(vec![], 0)
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost(),
),
count_state_table: Some(
count_table_builder
.build(vec![], 0)
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost(),
),
};
PbNodeBody::GlobalApproxPercentile(body)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::LocalApproxPercentileNode;

use crate::expr::{ExprRewriter, ExprVisitor, InputRef, InputRefDisplay, Literal};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
Expand All @@ -43,11 +44,12 @@ pub struct StreamLocalApproxPercentile {
impl StreamLocalApproxPercentile {
pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
let schema = Schema::new(vec![
Field::with_name(DataType::Int64, "bucket_id"),
Field::with_name(DataType::Int64, "count"),
Field::with_name(DataType::Int16, "sign"),
Field::with_name(DataType::Int32, "bucket_id"),
Field::with_name(DataType::Int32, "count"),
]);
// FIXME(kwannoel): How does watermark work with FixedBitSet
let watermark_columns = FixedBitSet::with_capacity(2);
let watermark_columns = FixedBitSet::with_capacity(3);
let base = PlanBase::new_stream(
input.ctx(),
schema,
Expand Down Expand Up @@ -108,7 +110,15 @@ impl_plan_tree_node_for_unary! {StreamLocalApproxPercentile}

impl StreamNode for StreamLocalApproxPercentile {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
todo!()
let relative_error = self.relative_error.get_data().as_ref().unwrap();
let relative_error = relative_error.as_float64().into_inner();
let base = (1.0 + relative_error) / (1.0 - relative_error);
let percentile_index = self.percentile_col.index() as u32;
let body = LocalApproxPercentileNode {
base,
percentile_index,
};
PbNodeBody::LocalApproxPercentile(body)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct MemTable {

#[derive(Error, Debug)]
pub enum MemTableError {
#[error("Inconsistent operation")]
#[error("Inconsistent operation {key:?}, prev: {prev:?}, new: {new:?}")]
InconsistentOperation {
key: TableKey<Bytes>,
prev: KeyOp,
Expand Down
Loading

0 comments on commit c315942

Please sign in to comment.