From 12120639f12caa2cb785b4ea05957e460ba8908d Mon Sep 17 00:00:00 2001 From: suibianwanwan Date: Tue, 3 Jun 2025 00:33:57 +0800 Subject: [PATCH 1/6] Improve performance of constant aggregate window expression --- .../physical-expr/src/window/aggregate.rs | 21 ++++++++++++++++++- .../src/window/sliding_aggregate.rs | 4 ++++ .../physical-expr/src/window/window_expr.rs | 11 +++++++++- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 9b959796136a..c8b270de03e7 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -34,7 +34,7 @@ use arrow::array::ArrayRef; use arrow::datatypes::FieldRef; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result, ScalarValue}; -use datafusion_expr::{Accumulator, WindowFrame}; +use datafusion_expr::{Accumulator, WindowFrame, WindowFrameBound, WindowFrameUnits}; use datafusion_physical_expr_common::sort_expr::LexOrdering; /// A window expr that takes the form of an aggregate function. @@ -46,6 +46,7 @@ pub struct PlainAggregateWindowExpr { partition_by: Vec>, order_by: LexOrdering, window_frame: Arc, + is_window_constant: bool, } impl PlainAggregateWindowExpr { @@ -56,11 +57,13 @@ impl PlainAggregateWindowExpr { order_by: &LexOrdering, window_frame: Arc, ) -> Self { + let is_window_constant = Self::is_window_constant(order_by, &window_frame); Self { aggregate, partition_by: partition_by.to_vec(), order_by: order_by.clone(), window_frame, + is_window_constant, } } @@ -85,6 +88,18 @@ impl PlainAggregateWindowExpr { ); } } + + fn is_window_constant(order_by: &LexOrdering, window_frame: &WindowFrame) -> bool { + let is_constant_bound = |bound: &WindowFrameBound| match bound { + WindowFrameBound::CurrentRow => { + window_frame.units == WindowFrameUnits::Range && order_by.is_empty() + } + _ => bound.is_unbounded(), + }; + + is_constant_bound(&window_frame.start_bound) + && is_constant_bound(&window_frame.end_bound) + } } /// peer based evaluation based on the fact that batch is pre-sorted given the sort columns @@ -213,4 +228,8 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr { accumulator.evaluate() } } + + fn is_constant(&self) -> bool { + self.is_window_constant + } } diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 2b22299f9386..8ac8e5b3d5da 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -210,4 +210,8 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr { accumulator.evaluate() } } + + fn is_constant(&self) -> bool { + false + } } diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index 8d72604a6af5..1daf3af66f2b 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -186,6 +186,10 @@ pub trait AggregateWindowExpr: WindowExpr { accumulator: &mut Box, ) -> Result; + /// Indicates whether this window function always produces the same result + /// for all rows in the partition. + fn is_constant(&self) -> bool; + /// Evaluates the window function against the batch. fn aggregate_evaluate(&self, batch: &RecordBatch) -> Result { let mut accumulator = self.get_accumulator()?; @@ -272,8 +276,13 @@ pub trait AggregateWindowExpr: WindowExpr { not_end: bool, ) -> Result { let values = self.evaluate_args(record_batch)?; - let order_bys = get_orderby_values(self.order_by_columns(record_batch)?); + if self.is_constant() { + accumulator.update_batch(&values)?; + let value = accumulator.evaluate()?; + return value.to_array_of_size(record_batch.num_rows()); + } + let order_bys = get_orderby_values(self.order_by_columns(record_batch)?); let most_recent_row_order_bys = most_recent_row .map(|batch| self.order_by_columns(batch)) .transpose()? From c4cc4ceb47b63172521bef2a6df434d0989b9ecb Mon Sep 17 00:00:00 2001 From: suibianwanwan <95014391+suibianwanwank@users.noreply.github.com> Date: Tue, 3 Jun 2025 14:21:15 +0800 Subject: [PATCH 2/6] Update datafusion/physical-expr/src/window/aggregate.rs Co-authored-by: Jonathan Chen --- datafusion/physical-expr/src/window/aggregate.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index c8b270de03e7..2f6daf6320de 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -89,6 +89,13 @@ impl PlainAggregateWindowExpr { } } + // Returns true if every row in the partition has the same window frame. This allows + // for preventing bound + function calculation for every row due to the values being the + // same. + // + // This occurs when both bounds fall under either condition below: + // 1. Bound is unbounded (`Preceding` or `Following`) + // 2. Bound is `CurrentRow` while using `Range` units with no order by clause fn is_window_constant(order_by: &LexOrdering, window_frame: &WindowFrame) -> bool { let is_constant_bound = |bound: &WindowFrameBound| match bound { WindowFrameBound::CurrentRow => { From cf3d1b05e50590c75cda563610d1e132a32d0acc Mon Sep 17 00:00:00 2001 From: suibianwanwan Date: Tue, 3 Jun 2025 17:21:33 +0800 Subject: [PATCH 3/6] fmt --- datafusion/physical-expr/src/window/aggregate.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 2f6daf6320de..e186366a64a4 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -89,8 +89,8 @@ impl PlainAggregateWindowExpr { } } - // Returns true if every row in the partition has the same window frame. This allows - // for preventing bound + function calculation for every row due to the values being the + // Returns true if every row in the partition has the same window frame. This allows + // for preventing bound + function calculation for every row due to the values being the // same. // // This occurs when both bounds fall under either condition below: From ff1bc94ebf3ebe38ae0990f2af2cbcc97e394917 Mon Sep 17 00:00:00 2001 From: suibianwanwan <95014391+suibianwanwank@users.noreply.github.com> Date: Wed, 4 Jun 2025 21:48:02 +0800 Subject: [PATCH 4/6] Update datafusion/physical-expr/src/window/aggregate.rs Co-authored-by: Yongting You <2010youy01@gmail.com> --- datafusion/physical-expr/src/window/aggregate.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index e186366a64a4..f6c8cb60d11e 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -96,6 +96,8 @@ impl PlainAggregateWindowExpr { // This occurs when both bounds fall under either condition below: // 1. Bound is unbounded (`Preceding` or `Following`) // 2. Bound is `CurrentRow` while using `Range` units with no order by clause + // This results in an invalid range specification. Following PostgreSQL’s convention, + // we interpret this as the entire partition being used for the current window frame. fn is_window_constant(order_by: &LexOrdering, window_frame: &WindowFrame) -> bool { let is_constant_bound = |bound: &WindowFrameBound| match bound { WindowFrameBound::CurrentRow => { From 2e24baddb137f21927c743735204250da84392d1 Mon Sep 17 00:00:00 2001 From: suibianwanwan Date: Wed, 4 Jun 2025 21:54:21 +0800 Subject: [PATCH 5/6] Rename --- datafusion/physical-expr/src/window/aggregate.rs | 16 ++++++++++------ .../src/window/sliding_aggregate.rs | 2 +- .../physical-expr/src/window/window_expr.rs | 4 ++-- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index f6c8cb60d11e..a82394140163 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -46,7 +46,7 @@ pub struct PlainAggregateWindowExpr { partition_by: Vec>, order_by: LexOrdering, window_frame: Arc, - is_window_constant: bool, + is_constant_in_partition: bool, } impl PlainAggregateWindowExpr { @@ -57,13 +57,14 @@ impl PlainAggregateWindowExpr { order_by: &LexOrdering, window_frame: Arc, ) -> Self { - let is_window_constant = Self::is_window_constant(order_by, &window_frame); + let is_constant_in_partition = + Self::is_window_constant_in_partition(order_by, &window_frame); Self { aggregate, partition_by: partition_by.to_vec(), order_by: order_by.clone(), window_frame, - is_window_constant, + is_constant_in_partition, } } @@ -98,7 +99,10 @@ impl PlainAggregateWindowExpr { // 2. Bound is `CurrentRow` while using `Range` units with no order by clause // This results in an invalid range specification. Following PostgreSQL’s convention, // we interpret this as the entire partition being used for the current window frame. - fn is_window_constant(order_by: &LexOrdering, window_frame: &WindowFrame) -> bool { + fn is_window_constant_in_partition( + order_by: &LexOrdering, + window_frame: &WindowFrame, + ) -> bool { let is_constant_bound = |bound: &WindowFrameBound| match bound { WindowFrameBound::CurrentRow => { window_frame.units == WindowFrameUnits::Range && order_by.is_empty() @@ -238,7 +242,7 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr { } } - fn is_constant(&self) -> bool { - self.is_window_constant + fn is_constant_in_partition(&self) -> bool { + self.is_constant_in_partition } } diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 8ac8e5b3d5da..09d6af748755 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -211,7 +211,7 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr { } } - fn is_constant(&self) -> bool { + fn is_constant_in_partition(&self) -> bool { false } } diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index 1daf3af66f2b..70a73c44ae9d 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -188,7 +188,7 @@ pub trait AggregateWindowExpr: WindowExpr { /// Indicates whether this window function always produces the same result /// for all rows in the partition. - fn is_constant(&self) -> bool; + fn is_constant_in_partition(&self) -> bool; /// Evaluates the window function against the batch. fn aggregate_evaluate(&self, batch: &RecordBatch) -> Result { @@ -277,7 +277,7 @@ pub trait AggregateWindowExpr: WindowExpr { ) -> Result { let values = self.evaluate_args(record_batch)?; - if self.is_constant() { + if self.is_constant_in_partition() { accumulator.update_batch(&values)?; let value = accumulator.evaluate()?; return value.to_array_of_size(record_batch.num_rows()); From f40020a4d4194275d0e149a2cd6b1bcc23934c72 Mon Sep 17 00:00:00 2001 From: suibianwanwan Date: Wed, 4 Jun 2025 22:00:36 +0800 Subject: [PATCH 6/6] fmt --- datafusion/physical-expr/src/window/aggregate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index a82394140163..dae0667afb25 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -57,7 +57,7 @@ impl PlainAggregateWindowExpr { order_by: &LexOrdering, window_frame: Arc, ) -> Self { - let is_constant_in_partition = + let is_constant_in_partition = Self::is_window_constant_in_partition(order_by, &window_frame); Self { aggregate,