Skip to content

Conversation

2010YOUY01
Copy link
Contributor

Which issue does this PR close?

part of #17789

Rationale for this change

string_agg is slow, see the tracking issue for details.

This PR added a new Accumulator implementation for the simple case: if there is no DISTINCT and ORDER BY like string_agg(distinct str, ',' ORDER BY str), use SimpleStringAggAccumulator instead. The original StringAggAccumulator is used for the general case with potential DISTINCT and ORDER BYs.

While @vegarsti is working on a GroupsAccumulator solution that can further speed it up potentially, I think this PR is still necessary because the single group case like select string_agg(str, ',') from t1 is still using the Accumulator interface instead of GroupsAccumulator

I haven't checked the original implementation yet, and I don't know why is it so slow. It's using array_agg internally, and memory bloat can be observed. I guess the reason is redundant transcoding, and incorrect operations on StringView buffers.
There are several ongoing work to improve arrray_agg: #17829

Benchmark

It's around 1000x faster. See the original issue for data setup, scaling factor 0.1 is used for the table.

DataFusion CLI v50.0.0
> CREATE EXTERNAL TABLE partsupp
STORED AS PARQUET
LOCATION '/Users/yongting/Code/datafusion-sqlstorm/data/partsupp.parquet';

> select ps_partkey, string_agg(ps_comment, ';')
from partsupp
group by ps_partkey;

Before: ~50s
PR: 0.05s

What changes are included in this PR?

  1. Implemented a SimpleStringAggAccumulator for string_agg
  2. In the aggregate function implementation, opt for the new accumulator if there is no DISTINCT and ORDER BY in the string_agg() aggregate function.

Are these changes tested?

Existing tests

Are there any user-facing changes?

@vegarsti
Copy link

Amazing!

Copy link

@vegarsti vegarsti left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really nice!!

pub(crate) struct SimpleStringAggAccumulator {
delimiter: String,
// Updating during `update_batch()`. e.g. "foo,bar"
in_progress_string: String,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud about the name here: I think acc or accumulated would also be conventional. But this name is fine!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, updated.

size_of_val(self) + self.delimiter.capacity() + self.in_progress_string.capacity()
}

fn state(&mut self) -> Result<Vec<ScalarValue>> {
Copy link

@vegarsti vegarsti Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just asking to understand the Accumulator trait: I see that this and evaluate are the same except for what they return - what is the difference between the two and when they are used, do you know?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state is for per-partition intermediate result, and evaluate() is the final result.
e.g. for group key1, it's getting executed in 2 partitions.
partition 1:
-- [INPUT] (foo, bar) --state()--> "foo, bar"
partition 2:
-- [input] (baz) --state--> "baz"

and evaluate() is called after merge_batch to combine the above intermediates from all partitions, and get the final result "foo, bar, baz"

I think there is a detailed doc in the Accumulator interface

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation! And oh yeah, I should read the doc comments on the trait!

Comment on lines +340 to +341
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let string_arr = values.first().ok_or_else(|| {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there only one element in values? That was surprising

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's an array of arg1. The arg is validated during the planning time, and we can also assume it's the right type here (values[0] is a string array)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

2010YOUY01 and others added 3 commits October 1, 2025 19:14
Co-authored-by: Vegard Stikbakke <vegard.stikbakke@gmail.com>
Co-authored-by: Vegard Stikbakke <vegard.stikbakke@gmail.com>
@2010YOUY01
Copy link
Contributor Author

CI should pass after #17855 is merged, we can re-run afterwards

@2010YOUY01 2010YOUY01 closed this Oct 1, 2025
@2010YOUY01 2010YOUY01 reopened this Oct 1, 2025
@alamb alamb added the performance Make DataFusion faster label Oct 1, 2025
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @2010YOUY01 -- I agree this is great . I left a few suggestions.

I tested doing this:

# created data
tpchgen-cli -v --tables=partsupp --format=parquet --parts=2 -s 0.1
# run query: 
time datafusion-cli -c "select ps_partkey, string_agg(ps_comment, ';') from 'partsupp' group by ps_partkey;"

main:

  • real 0m50.296s

This branch 😮

  • real 0m0.706s

Comment on lines +152 to +158
// Case `SimpleStringAggAccumulator`
Ok(vec![Field::new(
format_state_name(args.name, "string_agg"),
DataType::LargeUtf8,
true,
)
.into()])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to put this as part of SimpleStringAggAccumulator, something like

  SimpleStringAggAccumulator::state_fields(args)

Ok(Box::new(SimpleStringAggAccumulator::new(delimiter)))
} else {
// general case
let array_agg_acc = self.array_agg.accumulator(AccumulatorArgs {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto here for encapsulating this

pub(crate) struct SimpleStringAggAccumulator {
delimiter: String,
/// Updated during `update_batch()`. e.g. "foo,bar"
accumulated_string: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rater than has_value perhaps using an option would be better / more rust idomatic and harder to misuse

    accumulated_string: Option<String>,

/// because it accumulates the string directly,
/// whereas `StringAggAccumulator` uses `ArrayAggAccumulator`.
#[derive(Debug)]
pub(crate) struct SimpleStringAggAccumulator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is likely much better than what we have. We can probably do better still with a GroupsAccumulator as well

Comment on lines +331 to +336
if self.has_value {
self.accumulated_string.push_str(&self.delimiter);
}

self.accumulated_string.push_str(value);
self.has_value = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you used an option, this could be like

Suggested change
if self.has_value {
self.accumulated_string.push_str(&self.delimiter);
}
self.accumulated_string.push_str(value);
self.has_value = true;
if let Some(accumulated_value) = self.accumulated_value.as_mut() {
accumulated_string.push_str(&self.delimiter);
} else {
self.accumulated_valie = Some(String::from(&value))
}

self.accumulated_string.push_str(&self.delimiter);
}

self.accumulated_string.push_str(value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

push_str is 💪

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
functions Changes to functions implementation performance Make DataFusion faster
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants