From d7c7c8460cfc01dbd26f064e576e60f2edf61faf Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 30 Sep 2025 16:50:22 +0800 Subject: [PATCH 1/5] impl SimpleStringAggAccumulator for performance --- .../functions-aggregate/src/string_agg.rs | 165 ++++++++++++++++-- 1 file changed, 148 insertions(+), 17 deletions(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 3986984b2630..90283a8dbd22 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -25,9 +25,14 @@ use crate::array_agg::ArrayAgg; use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, FieldRef}; -use datafusion_common::cast::{as_generic_string_array, as_string_view_array}; -use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue}; +use datafusion_common::cast::{ + as_generic_string_array, as_string_array, as_string_view_array, +}; +use datafusion_common::{ + internal_datafusion_err, internal_err, not_impl_err, Result, ScalarValue, +}; use datafusion_expr::function::AccumulatorArgs; +use datafusion_expr::utils::format_state_name; use datafusion_expr::{ Accumulator, AggregateUDFImpl, Documentation, Signature, TypeSignature, Volatility, }; @@ -120,6 +125,8 @@ impl Default for StringAgg { } } +/// If there is no `distinct` and `order by` required by the `string_agg` call, a +/// more efficient accumulator `SimpleStringAggAccumulator` will be used. impl AggregateUDFImpl for StringAgg { fn as_any(&self) -> &dyn Any { self @@ -138,7 +145,21 @@ impl AggregateUDFImpl for StringAgg { } fn state_fields(&self, args: StateFieldsArgs) -> Result> { - self.array_agg.state_fields(args) + // See comments in `impl AggregateUDFImpl ...` for more detail + let no_order_no_distinct = + (args.ordering_fields.is_empty()) && (!args.is_distinct); + if no_order_no_distinct { + // Case `SimpleStringAggAccumulator` + Ok(vec![Field::new( + format_state_name(args.name, "string_agg"), + DataType::LargeUtf8, + true, + ) + .into()]) + } else { + // Case `StringAggAccumulator` + self.array_agg.state_fields(args) + } } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { @@ -161,21 +182,31 @@ impl AggregateUDFImpl for StringAgg { ); }; - let array_agg_acc = self.array_agg.accumulator(AccumulatorArgs { - return_field: Field::new( - "f", - DataType::new_list(acc_args.return_field.data_type().clone(), true), - true, - ) - .into(), - exprs: &filter_index(acc_args.exprs, 1), - ..acc_args - })?; + // See comments in `impl AggregateUDFImpl ...` for more detail + let no_order_no_distinct = + acc_args.order_bys.is_empty() && (!acc_args.is_distinct); - Ok(Box::new(StringAggAccumulator::new( - array_agg_acc, - delimiter, - ))) + if no_order_no_distinct { + // simple case (more efficient) + Ok(Box::new(SimpleStringAggAccumulator::new(delimiter))) + } else { + // general case + let array_agg_acc = self.array_agg.accumulator(AccumulatorArgs { + return_field: Field::new( + "f", + DataType::new_list(acc_args.return_field.data_type().clone(), true), + true, + ) + .into(), + exprs: &filter_index(acc_args.exprs, 1), + ..acc_args + })?; + + Ok(Box::new(StringAggAccumulator::new( + array_agg_acc, + delimiter, + ))) + } } fn reverse_expr(&self) -> datafusion_expr::ReversedUDAF { @@ -187,6 +218,7 @@ impl AggregateUDFImpl for StringAgg { } } +/// StringAgg accumulator for the general case (with order or distinct specified) #[derive(Debug)] pub(crate) struct StringAggAccumulator { array_agg_acc: Box, @@ -269,6 +301,105 @@ fn filter_index(values: &[T], index: usize) -> Vec { .collect::>() } +/// StringAgg accumulator for the simple case (no order or distinct specified) +/// This accumulator is more efficient than `StringAggAccumulator` +#[derive(Debug)] +pub(crate) struct SimpleStringAggAccumulator { + delimiter: String, + // Updating during `update_batch()`. e.g. "foo,bar" + in_progress_string: String, + has_value: bool, +} + +impl SimpleStringAggAccumulator { + pub fn new(delimiter: &str) -> Self { + Self { + delimiter: delimiter.to_string(), + in_progress_string: "".to_string(), + has_value: false, + } + } + + #[inline] + fn append_strings<'a, I>(&mut self, iter: I) + where + I: Iterator>, + { + for value in iter { + if let Some(value) = value { + if self.has_value { + self.in_progress_string.push_str(&self.delimiter); + } + + self.in_progress_string.push_str(value); + self.has_value = true; + } + } + } +} + +impl Accumulator for SimpleStringAggAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let string_arr = values.get(0).ok_or_else(|| { + internal_datafusion_err!( + "Planner should ensure its first arg is Utf8/Utf8View" + ) + })?; + + match string_arr.data_type() { + DataType::Utf8 => { + let array = as_string_array(string_arr)?; + self.append_strings(array.iter()); + } + DataType::LargeUtf8 => { + let array = as_generic_string_array::(string_arr)?; + self.append_strings(array.iter()); + } + DataType::Utf8View => { + let array = as_string_view_array(string_arr)?; + self.append_strings(array.iter()); + } + other => { + return internal_err!( + "Planner should ensure string_agg first argument is Utf8-like, found {other}" + ); + } + } + + Ok(()) + } + + fn evaluate(&mut self) -> Result { + let result = if self.has_value { + ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.in_progress_string))) + } else { + ScalarValue::LargeUtf8(None) + }; + + self.has_value = false; + Ok(result) + } + + fn size(&self) -> usize { + size_of_val(self) + self.delimiter.capacity() + self.in_progress_string.capacity() + } + + fn state(&mut self) -> Result> { + let result = if self.has_value { + ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.in_progress_string))) + } else { + ScalarValue::LargeUtf8(None) + }; + self.has_value = false; + + Ok(vec![result]) + } + + fn merge_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + self.update_batch(values) + } +} + #[cfg(test)] mod tests { use super::*; From 2ee945b645a7c944843f536790814a5c7aef7a5a Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Tue, 30 Sep 2025 17:42:31 +0800 Subject: [PATCH 2/5] lint --- datafusion/functions-aggregate/src/string_agg.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 90283a8dbd22..a80292e8bf7d 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -325,22 +325,20 @@ impl SimpleStringAggAccumulator { where I: Iterator>, { - for value in iter { - if let Some(value) = value { - if self.has_value { - self.in_progress_string.push_str(&self.delimiter); - } - - self.in_progress_string.push_str(value); - self.has_value = true; + for value in iter.flatten() { + if self.has_value { + self.in_progress_string.push_str(&self.delimiter); } + + self.in_progress_string.push_str(value); + self.has_value = true; } } } impl Accumulator for SimpleStringAggAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let string_arr = values.get(0).ok_or_else(|| { + let string_arr = values.first().ok_or_else(|| { internal_datafusion_err!( "Planner should ensure its first arg is Utf8/Utf8View" ) From 574ef18df75c36d9663c003744b7a330749d399b Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Wed, 1 Oct 2025 19:14:42 +0800 Subject: [PATCH 3/5] review: rename in_progress_string --> accumualted_string --- datafusion/functions-aggregate/src/string_agg.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index a80292e8bf7d..3f497c4f1ebd 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -307,7 +307,7 @@ fn filter_index(values: &[T], index: usize) -> Vec { pub(crate) struct SimpleStringAggAccumulator { delimiter: String, // Updating during `update_batch()`. e.g. "foo,bar" - in_progress_string: String, + accumulated_string: String, has_value: bool, } @@ -315,7 +315,7 @@ impl SimpleStringAggAccumulator { pub fn new(delimiter: &str) -> Self { Self { delimiter: delimiter.to_string(), - in_progress_string: "".to_string(), + accumulated_string: "".to_string(), has_value: false, } } @@ -327,10 +327,10 @@ impl SimpleStringAggAccumulator { { for value in iter.flatten() { if self.has_value { - self.in_progress_string.push_str(&self.delimiter); + self.accumulated_string.push_str(&self.delimiter); } - self.in_progress_string.push_str(value); + self.accumulated_string.push_str(value); self.has_value = true; } } @@ -369,7 +369,7 @@ impl Accumulator for SimpleStringAggAccumulator { fn evaluate(&mut self) -> Result { let result = if self.has_value { - ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.in_progress_string))) + ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.accumulated_string))) } else { ScalarValue::LargeUtf8(None) }; @@ -379,12 +379,12 @@ impl Accumulator for SimpleStringAggAccumulator { } fn size(&self) -> usize { - size_of_val(self) + self.delimiter.capacity() + self.in_progress_string.capacity() + size_of_val(self) + self.delimiter.capacity() + self.accumulated_string.capacity() } fn state(&mut self) -> Result> { let result = if self.has_value { - ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.in_progress_string))) + ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.accumulated_string))) } else { ScalarValue::LargeUtf8(None) }; From a041219ad1bd5a289429d75ca3fac5b130ffcdf4 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Wed, 1 Oct 2025 19:17:56 +0800 Subject: [PATCH 4/5] Update datafusion/functions-aggregate/src/string_agg.rs Co-authored-by: Vegard Stikbakke --- datafusion/functions-aggregate/src/string_agg.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 3f497c4f1ebd..3be3145417a2 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -303,6 +303,8 @@ fn filter_index(values: &[T], index: usize) -> Vec { /// StringAgg accumulator for the simple case (no order or distinct specified) /// This accumulator is more efficient than `StringAggAccumulator` +/// because it accumulates the string directly, +/// whereas `StringAggAccumulator` uses `ArrayAggAccumulator`. #[derive(Debug)] pub(crate) struct SimpleStringAggAccumulator { delimiter: String, From 2bc95c19154dd867965bcb2eaf730849a3fb5878 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Wed, 1 Oct 2025 19:18:10 +0800 Subject: [PATCH 5/5] Update datafusion/functions-aggregate/src/string_agg.rs Co-authored-by: Vegard Stikbakke --- datafusion/functions-aggregate/src/string_agg.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 3be3145417a2..a091ed34da70 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -308,7 +308,7 @@ fn filter_index(values: &[T], index: usize) -> Vec { #[derive(Debug)] pub(crate) struct SimpleStringAggAccumulator { delimiter: String, - // Updating during `update_batch()`. e.g. "foo,bar" + /// Updated during `update_batch()`. e.g. "foo,bar" accumulated_string: String, has_value: bool, }