From 6a0aa644176c48afa98c0a8080f1b0dbe3f52ab7 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Tue, 30 Jul 2024 17:57:22 +0800 Subject: [PATCH] interim commit: add local approx percentile --- .../src/executor/approx_percentile/global.rs | 0 .../src/executor/approx_percentile/local.rs | 94 +++++++++++++++++++ .../src/executor/approx_percentile/mod.rs | 15 +++ src/stream/src/executor/keyed_merge.rs | 0 src/stream/src/executor/mod.rs | 2 + 5 files changed, 111 insertions(+) create mode 100644 src/stream/src/executor/approx_percentile/global.rs create mode 100644 src/stream/src/executor/approx_percentile/local.rs create mode 100644 src/stream/src/executor/approx_percentile/mod.rs create mode 100644 src/stream/src/executor/keyed_merge.rs diff --git a/src/stream/src/executor/approx_percentile/global.rs b/src/stream/src/executor/approx_percentile/global.rs new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/src/stream/src/executor/approx_percentile/local.rs b/src/stream/src/executor/approx_percentile/local.rs new file mode 100644 index 0000000000000..c54b8cd42b0b9 --- /dev/null +++ b/src/stream/src/executor/approx_percentile/local.rs @@ -0,0 +1,94 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use risingwave_common::array::Op; +use risingwave_common::util::chunk_coalesce::DataChunkBuilder; +use risingwave_pb::expr::InputRef; + +use crate::executor::prelude::*; + +pub struct LocalApproxPercentile { + _ctx: ActorContextRef, + pub input: Executor, + pub base: f64, + pub percentile_col: InputRef, + pub schema: Schema, +} + +impl LocalApproxPercentile { + pub fn new( + _ctx: ActorContextRef, + input: Executor, + base: f64, + percentile_col: InputRef, + schema: Schema, + ) -> Self { + Self { + _ctx, + input, + base, + percentile_col, + schema, + } + } + + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn execute_inner(mut self) { + let index = self.percentile_col.index as usize; + #[for_await] + for message in self.input.execute() { + match message { + Message::Chunk(chunk) => { + yield calculate_count_per_bucket(percentile_index, chunk); + } + _ => yield message, + } + } + } + + // TODO(kwannoel): Perhaps we can batch and only flush the count per bucket downstream + // once some threshold is reached / once per epoch. + // FIXME(kwannoel): handle negative values. + fn calculate_count_per_bucket(base: f64, percentile_index: usize, chunk: StreamChunk) -> Message { + let chunk = chunk.project(&[percentile_index]); + let mut counts = HashMap::new(); + for (op, row) in chunk.rows() { + let value = row.datum_at(0).unwrap(); + let value: f64 = value.as_f64(); + let bucket = value.log(base).floor() as i32; + let count = counts.entry(bucket).or_insert(0); + match op { + Op::Insert | Op::UpdateInsert => *count += 1, + Op::Delete | Op::UpdateDelete => *count -= 1, + } + } + // NOTE(kwannoel): The op here is simply ignored. + // The downstream global_approx_percentile will always just update its bucket counts. + let op = vec![Op::Insert; counts.len()]; + let mut bucket_col: Datum = Vec::with_capacity(counts.len()); + let mut count_col: Datum = Vec::with_capacity(counts.len()); + for (bucket, count) in counts { + bucket_col.push(Some(ScalarImpl::Int32(bucket))); + count_col.push(Some(ScalarImpl::Int32(count))); + } + StreamChunk::from_columns(vec![bucket_col, count_col], op) + } +} + +impl Execute for LocalApproxPercentile { + fn execute(self: Box) -> BoxedMessageStream { + self.execute_inner().boxed() + } +} diff --git a/src/stream/src/executor/approx_percentile/mod.rs b/src/stream/src/executor/approx_percentile/mod.rs new file mode 100644 index 0000000000000..38076e72177a3 --- /dev/null +++ b/src/stream/src/executor/approx_percentile/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod local; diff --git a/src/stream/src/executor/keyed_merge.rs b/src/stream/src/executor/keyed_merge.rs new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index a1ef0691d14ef..6cd4ceff2e2ae 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -103,6 +103,8 @@ mod watermark; mod watermark_filter; mod wrapper; +mod approx_percentile; + #[cfg(test)] mod integration_tests; pub mod test_utils;