Support array aggregate sum function #7242

Closed
wants to merge 4 commits into from

Conversation

jayzhan211
Contributor

@jayzhan211 jayzhan211 commented Aug 9, 2023

Which issue does this PR close?

Ref #7213 .
Ref #7214 .

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

Note

@github-actions github-actions bot added logical-expr Logical plan and expressions physical-expr Physical Expressions core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Aug 9, 2023

# TODO: Support nulls in array.
# query error DataFusion error: This feature is not implemented: Arrays with different types are not supported: \{Int64, Null\}
# select array_sum([1, null, 3, null]);
Contributor Author

I would like to fix this in the next PR.

Contributor Author

Might be related to #7142

Contributor

Yes, I agree -- I think if we add a coercion pass that tries to coerce array elements into the same type this will magically start working
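The coercion pass mentioned above could look roughly like the sketch below. It is a simplified model and not DataFusion code: `ElemType` and `common_type` are hypothetical names standing in for Arrow's `DataType` and the real coercion rules. The idea is that `Null` coerces to any other type, so `[1, null, 3, null]` resolves to a single `Int64` element type.

```rust
/// Simplified element types; a hypothetical stand-in for Arrow's `DataType`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ElemType {
    Null,
    Int64,
    Float64,
    Utf8,
}

/// Pick one type that every element can be coerced to, or `None` if no such
/// type exists. `Null` coerces to anything; `Int64` widens to `Float64`.
fn common_type(types: &[ElemType]) -> Option<ElemType> {
    types
        .iter()
        .copied()
        .try_fold(ElemType::Null, |acc, t| match (acc, t) {
            // Identical types need no coercion.
            (a, b) if a == b => Some(a),
            // A null element adopts the other element's type.
            (ElemType::Null, other) | (other, ElemType::Null) => Some(other),
            // Mixed integer / float widens to float.
            (ElemType::Int64, ElemType::Float64)
            | (ElemType::Float64, ElemType::Int64) => Some(ElemType::Float64),
            // Anything else (for example Int64 with Utf8) is not coercible here.
            _ => None,
        })
}
```

With such a pass, the `{Int64, Null}` combination from the error message would be reduced to `Int64` before the array is built.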

@jayzhan211 jayzhan211 marked this pull request as ready for review August 10, 2023 01:48
@jayzhan211
Contributor Author

@izveigor @alamb Ready for review, thanks 👍

Contributor

@alamb alamb left a comment

Thank you @jayzhan211 -- I didn't even know this (array aggregate functions) was a thing!

I think we should consider using the existing aggregate / accumulators rather than reimplementing aggregates for arrays and left some detailed suggestions. Let me know what you think

@@ -1495,6 +1496,26 @@ from_unixtime(expression)
- [make_list](#make_list)
- [trim_array](#trim_array)

### `array_aggregate`

Allows the execution of arbitrary existing aggregate functions on the elements of a list.
Contributor

Suggested change
Allows the execution of arbitrary existing aggregate functions on the elements of a list.
Allows the execution of arbitrary existing aggregate function `name` on the elements of a list.

# array aggregate function
## array aggregate
query I
select array_aggregate([1, 3, 5, 7], 'sum');
Contributor

wow -- this is pretty neat


let func_name = args[1].as_string::<i32>().value(0);
let args = &args[0..1];
match func_name {
"sum" => array_sum(args),
Contributor

If you implement this in terms of an AggregateExpr and then Accumulator you could use the existing aggregate implementations. This would have several benefits:

  1. We would avoid code duplication and get all existing aggregates "for free"
  2. As new aggregates were added they would also be usable as an array aggregate
  3. They would work for non primitive types (e.g. DecimalArray, strings, etc)

The basic idea would be to look up the aggregate expr during the analysis / parsing phase, create a physical version, and then use it here to instantiate the accumulator

Maybe you could prototype this approach by simply hard coding the mapping sum --> [AggregateFunction::sum] here and using that to create an Accumulator 🤔

Contributor

Agree with @alamb. Only I want to say that array_aggregate function must only accept aggregate functions with one argument.
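The suggestion above (look up an existing single-argument aggregate by name, create its accumulator, and run it over the elements of each list) can be sketched as follows. This is a simplified model under stated assumptions: the `Accumulator` trait here works on `Option<i64>` slices instead of Arrow arrays, and `create_accumulator` and `array_aggregate` are hypothetical helpers, not the functions in the PR.

```rust
/// Minimal stand-in for an accumulator trait (simplified: the real DataFusion
/// trait works on Arrow arrays and returns `Result`).
trait Accumulator {
    fn update_batch(&mut self, values: &[Option<i64>]);
    fn evaluate(&self) -> Option<i64>;
}

#[derive(Default)]
struct SumAccumulator {
    sum: Option<i64>,
}

impl Accumulator for SumAccumulator {
    fn update_batch(&mut self, values: &[Option<i64>]) {
        // Nulls are skipped; the sum stays `None` until a value is seen.
        for v in values.iter().flatten() {
            let s = self.sum.get_or_insert(0);
            *s = s.wrapping_add(*v);
        }
    }
    fn evaluate(&self) -> Option<i64> {
        self.sum
    }
}

#[derive(Default)]
struct MaxAccumulator {
    max: Option<i64>,
}

impl Accumulator for MaxAccumulator {
    fn update_batch(&mut self, values: &[Option<i64>]) {
        for v in values.iter().flatten() {
            self.max = Some(self.max.map_or(*v, |m| m.max(*v)));
        }
    }
    fn evaluate(&self) -> Option<i64> {
        self.max
    }
}

/// Hypothetical lookup from an aggregate name to a one-argument accumulator.
fn create_accumulator(name: &str) -> Option<Box<dyn Accumulator>> {
    match name {
        "sum" => Some(Box::new(SumAccumulator::default())),
        "max" => Some(Box::new(MaxAccumulator::default())),
        _ => None,
    }
}

/// Run a fresh accumulator over each list row; `None` if the name is unknown.
fn array_aggregate(lists: &[Vec<Option<i64>>], name: &str) -> Option<Vec<Option<i64>>> {
    lists
        .iter()
        .map(|row| {
            let mut acc = create_accumulator(name)?;
            acc.update_batch(row);
            Some(acc.evaluate())
        })
        .collect()
}
```

In this model, adding a new aggregate only requires a new arm in `create_accumulator`, which is the "for free" benefit described in the review comment.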

@jayzhan211 jayzhan211 marked this pull request as draft August 11, 2023 13:35
@jayzhan211
Contributor Author

Thank you @jayzhan211 -- I didn't even know this (array aggregate functions) was a thing!

I think we should consider using the existing aggregate / accumulators rather than reimplementing aggregates for arrays and left some detailed suggestions. Let me know what you think

I also prefer to reuse existing functions, this might not be trivial but I will try to figure it out.

@alamb
Contributor

alamb commented Aug 12, 2023

I also prefer to reuse existing functions, this might not be trivial but I will try to figure it out.

I can probably find time to help figure out how this might be able to work next week if that would help

@jayzhan211
Contributor Author

jayzhan211 commented Aug 12, 2023

One problem I ran into: I get a NullArray from the batch (RecordBatch) unless I set the target partition count to 1.
https://github.com/apache/arrow-datafusion/blob/00627785718d9d98998021bf44585f32c33af3ea/datafusion/core/src/physical_plan/aggregates/no_grouping.rs#L112-L114

CoalescePartitionsExec is the one that sends the batch with the NullArray. The default partition count seems to be 4, so I would need to add set datafusion.execution.target_partitions = 1; to array.slt. I don't yet fully understand why this happens, and I'm not confident whether this is the correct fix or just a workaround.

Exact batch I get with default partition count

  1. loop_batch: RecordBatch { schema: Schema { fields: [Field { name: "column1", data_type: List(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [ListArray
    [
    PrimitiveArray
    [
    1,
    2,
    3,
    4,
    5,
    6,
    7,
    8,
    9,
    10,
    ],
    PrimitiveArray
    [
    11,
    12,
    13,
    14,
    15,
    16,
    17,
    18,
    19,
    20,
    ],
    PrimitiveArray
    [
    21,
    22,
    23,
    24,
    25,
    26,
    27,
    28,
    29,
    30,
    ],
    PrimitiveArray
    [
    31,
    32,
    33,
    34,
    35,
    26,
    37,
    38,
    39,
    40,
    ],
    ]], row_count: 4 }
  2. loop_batch: RecordBatch { schema: Schema { fields: [Field { name: "SUM(arrays_values_without_nulls.column1)[sum]", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray
    [
    null,
    ]], row_count: 1 }
  3. loop_batch: RecordBatch { schema: Schema { fields: [Field { name: "SUM(arrays_values_without_nulls.column1)[sum]", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray
    [
    null,
    ]], row_count: 1 }
  4. loop_batch: RecordBatch { schema: Schema { fields: [Field { name: "SUM(arrays_values_without_nulls.column1)[sum]", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray
    [
    null,
    ]], row_count: 1 }

Only the first one is expected, the other 3 are null.

The query I was testing with select array_aggregate(column1, 'sum') from arrays_values_without_nulls;
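One possible reading of the output above (an assumption; the thread does not confirm it) is that with four target partitions, three of them receive no rows, so their partial SUM state is null, and the final stage has to merge those partial states instead of treating them as list input. The sketch below models that with plain integers; `partial_sum` and `merge_partials` are hypothetical names.

```rust
/// Partial SUM state for one partition; `None` means the partition saw no rows.
fn partial_sum(rows: &[i64]) -> Option<i64> {
    if rows.is_empty() {
        None
    } else {
        Some(rows.iter().sum())
    }
}

/// Final step: merge the partial states, skipping the empty (null) ones.
/// The result is `None` only if every partition was empty.
fn merge_partials(partials: &[Option<i64>]) -> Option<i64> {
    partials.iter().flatten().copied().reduce(|a, b| a + b)
}
```

Under this reading, the three null batches are valid partial states, and setting `target_partitions = 1` only hides them by removing the empty partitions.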

@jayzhan211
Contributor Author

jayzhan211 commented Aug 13, 2023

Another big issue: we might need Array support in the Sum aggregate function so that it can process an array and return an ArrayRef.
I tried to process and return a ScalarValue, but ScalarValue::List is not a good replacement for ArrayRef. We might need a ColumnarValue that can hold either a ScalarValue or an ArrayRef for fn sum_batch and SumAccumulator.

https://github.com/apache/arrow-datafusion/blob/00627785718d9d98998021bf44585f32c33af3ea/datafusion/physical-expr/src/aggregate/sum.rs#L269-L300

Unfortunately, we might need to change the signature in Accumulator from ScalarValue to ColumnarValue. Do you think this signature change is a good decision for accepting the Aggregate function to process Array?

https://github.com/apache/arrow-datafusion/blob/00627785718d9d98998021bf44585f32c33af3ea/datafusion/expr/src/accumulator.rs#L49-L63

@github-actions github-actions bot added the sql SQL Planner label Aug 13, 2023
@jayzhan211
Contributor Author

It seems that ColumnarValue for Accumulator works! Just need to confirm this is a reasonable change.

I still have not found an approach that works without set datafusion.execution.target_partitions = 1;.

@alamb
Contributor

alamb commented Aug 13, 2023

Unfortunately, we might need to change the signature in Accumulator from ScalarValue to ColumnarValue. Do you think this signature change is a good decision for accepting the Aggregate function to process Array?

I wonder if you can call Accumulator::update_batch instead? It is already possible to go from ColumnarValue --> Array via https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.ColumnarValue.html#method.into_array
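The idea above, converting a `ColumnarValue` into an array and passing it to `update_batch` so the accumulator signature stays unchanged, can be modeled as below. This is a simplified sketch: `Vec<i64>` stands in for an Arrow array, the `ColumnarValue` and `SumAccumulator` types are local stand-ins and not the DataFusion ones, and `feed` is a hypothetical helper.

```rust
/// Simplified model of a columnar value: a single scalar or a whole array.
#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Scalar(i64),
    Array(Vec<i64>),
}

impl ColumnarValue {
    /// Expand to an array of `num_rows` values; a scalar is repeated per row.
    fn into_array(self, num_rows: usize) -> Vec<i64> {
        match self {
            ColumnarValue::Scalar(v) => vec![v; num_rows],
            ColumnarValue::Array(a) => a,
        }
    }
}

/// Accumulator that only ever sees arrays, so its signature does not change.
struct SumAccumulator {
    sum: i64,
}

impl SumAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        self.sum = values.iter().fold(self.sum, |acc, v| acc.wrapping_add(*v));
    }
}

/// Hypothetical helper: normalize the input to an array, then update.
fn feed(acc: &mut SumAccumulator, value: ColumnarValue, num_rows: usize) {
    let array = value.into_array(num_rows);
    acc.update_batch(&array);
}
```

The design point is that the scalar/array distinction is resolved by the caller, so the accumulator itself needs no `ColumnarValue` variant.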

.map(|accumulator|
{
println!("mode: {:?}", mode);
let res = accumulator.evaluate_v2();
Contributor

I wonder if you could do

Suggested change
let res = accumulator.evaluate_v2();
let res = accumulator.evaluate()?;
res.to_array()

Contributor Author

The problem is that I need a way to represent an Array (column support) with ScalarValue, so that I can get the ArrayRef from ScalarValue's to_array. ScalarValue::List does not fit at all.

@jayzhan211
Contributor Author

I missed the latest comment; let me try that out.

}
}
}
let arr = ScalarValue::iter_to_array(scalars.into_iter())?;
Contributor Author

This is where I construct Array.

Contributor

so in this case, I was thinking sum would be called on the values of the list, not the list itself 🤔 -- could we call evaluate on each scalar 🤔

.map(|accumulator| accumulator.evaluate().map(|v| v.to_array()))
.collect::<Result<Vec<ArrayRef>>>()
.map(|accumulator| {
let column_value = accumulator.evaluate_v2();
Contributor Author

@jayzhan211 jayzhan211 Aug 14, 2023

@alamb
If we want accumulator.evaluate() here, we might need to introduce something like ScalarValue::Array(Option<Vec<ScalarValue>>), the Vec here is to represent values of rows.

i.e. ScalarValue::Array(vec![1, 2, 3, 4])
-> Int64Array(1,2,3,4), which is a four-row value with (1,2,3,4).

With ScalarValue::List(vec![1,2,3,4])
I would get ListArray(Int64Array(1,2,3,4)), which is a one-row value holding the list [1,2,3,4].
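The difference between the two shapes described above can be shown with a small model. `Column`, `array_to_column` and `list_to_column` are hypothetical names; `ScalarValue::Array` is only the variant proposed in this comment, not an existing one.

```rust
/// Simplified column: either one Int64 per row, or one list of Int64 per row.
#[derive(Debug, PartialEq)]
enum Column {
    Int64(Vec<i64>),
    List(Vec<Vec<i64>>),
}

impl Column {
    fn num_rows(&self) -> usize {
        match self {
            Column::Int64(values) => values.len(),
            Column::List(rows) => rows.len(),
        }
    }
}

/// What the proposed `ScalarValue::Array` would produce: each value is a row.
fn array_to_column(values: Vec<i64>) -> Column {
    Column::Int64(values)
}

/// What `ScalarValue::List` produces: the whole list is a single row.
fn list_to_column(values: Vec<i64>) -> Column {
    Column::List(vec![values])
}
```

The same four values yield four rows in the first case and one row in the second, which is why `ScalarValue::List` cannot stand in for a column of results.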

std::mem::size_of_val(self) - std::mem::size_of_val(sv) + sv.size()
}
// TODO: Return Correct value
ColumnarValue::Array(ref array) => 0,
Contributor Author

Not sure how to get the size of array: ArrayRef
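For the size question above, the usual accounting is the fixed-size header plus the heap buffers the value owns. The sketch below shows that for a plain `Vec<i64>`; `vec_memory_size` is a hypothetical helper. For Arrow arrays, the arrow-rs `Array` trait exposes `get_array_memory_size`, which may be what is needed here, though whether it fits this accumulator is an assumption.

```rust
use std::mem::size_of;

/// Approximate memory held by a `Vec<i64>`: the Vec header on the stack plus
/// the heap buffer it owns (capacity, not length, since spare capacity is
/// still allocated).
fn vec_memory_size(values: &Vec<i64>) -> usize {
    size_of::<Vec<i64>>() + values.capacity() * size_of::<i64>()
}
```

Returning 0 for the array case, as in the TODO above, would under-report memory to whatever tracks accumulator sizes.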

@jayzhan211 jayzhan211 requested a review from alamb August 15, 2023 01:33
@alamb
Contributor

alamb commented Aug 16, 2023

I am sorry @jayzhan211 -- I don't think I will have time to help work on this PR for the next few days. I will try to find time this weekend or next week but I am too busy now with TopK and copy related work to help here too. Sorry I am spread too thin 😢

@github-actions github-actions bot added the optimizer Optimizer rules label Oct 15, 2023
@jayzhan211 jayzhan211 changed the title Support array aggregate function Support array aggregate sum function Oct 15, 2023
@jayzhan211 jayzhan211 marked this pull request as ready for review October 18, 2023 11:35
@jayzhan211
Contributor Author

It is significantly simpler after #7352 :)

@alamb
Contributor

alamb commented Oct 18, 2023

I have this on my review list, and I hope to start working that list down tomorrow

Contributor

@alamb alamb left a comment

Thank you for the contribution @jayzhan211 and I apologize for the time it has taken to do a review.

I am concerned about the changes to sum in this PR -- I can see why you did so (to avoid code duplication) but in this case the performance is so critical I think it would be better to have a special implementation

datafusion/common/src/scalar.rs (outdated, resolved)
let v = self.sum.get_or_insert(T::Native::usize_as(0));
*v = v.add_wrapping(x);
// Wrap single-row input into multiple-rows input and use the same logic as multiple-rows input
let list_values = match as_list_array(&values[0]) {
Contributor

Can we please not ignore the error here? Each error requires a string allocation and this is the performance critical inner loop

Contributor Author

I don't think I ignored the "error" here. I use the "error" of as_list_array to know whether it is List or non-List.

By the way, can you elaborate on why each error requires a string allocation?
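The allocation concern can be illustrated with a small model. When a checked cast reports failure through an error that carries a formatted message, every failed cast builds a `String` on the heap, even if the caller only uses the failure as a type test. Matching on the type up front avoids that. `Column`, `as_list_checked` and `total` are hypothetical names in this sketch; the assumption is that the real `as_list_array` behaves like `as_list_checked` in this respect.

```rust
/// Simplified column: either plain Int64 values or lists of Int64.
#[derive(Debug)]
enum Column {
    Int64(Vec<i64>),
    List(Vec<Vec<i64>>),
}

impl Column {
    fn type_name(&self) -> &'static str {
        match self {
            Column::Int64(_) => "Int64",
            Column::List(_) => "List",
        }
    }
}

/// Result-based cast: the failure path builds its message with `format!`,
/// which heap-allocates a `String` on every non-list input.
fn as_list_checked(col: &Column) -> Result<&Vec<Vec<i64>>, String> {
    match col {
        Column::List(rows) => Ok(rows),
        other => Err(format!("could not cast {} to List", other.type_name())),
    }
}

/// Branch on the type directly: no error value is constructed on either path.
fn total(col: &Column) -> i64 {
    match col {
        Column::List(rows) => rows.iter().flatten().sum(),
        Column::Int64(values) => values.iter().sum(),
    }
}
```

In an inner loop that mostly sees non-list input, `total` does no allocation at all, while using `as_list_checked(..).is_err()` as the type test would allocate once per call.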

@@ -59,6 +60,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if let Ok(fun) = BuiltinScalarFunction::from_str(&name) {
let args =
self.function_args_to_expr(function.args, schema, planner_context)?;

// Translate array_aggregate to aggregate function with array argument.
if fun == BuiltinScalarFunction::ArrayAggregate {
Contributor

If we do the rewrite here, it will only apply to SQL (so array_aggregate will not work if it is constructed via the dataframe API or as an Expr directly)

Contributor Author

@jayzhan211 jayzhan211 Oct 24, 2023

If I understand correctly, DataFrame uses
https://github.com/apache/arrow-datafusion/blob/eee790f695a58a99e880957d50a33c1f075c8edc/datafusion/expr/src/expr_fn.rs#L153-L161

to call the aggregate function. If we introduce AggregateFunction::ArraySum, we would just need another function that uses ArraySum, so it would take a different path than SQL, right?

Contributor Author

Let's raise another PR to have a common rewriting rule for SQL, Dataframe, and Expr. I would need more time to deep dive into them.
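A common rewriting rule of the kind mentioned above would operate on the expression tree itself, so it applies no matter whether the expression came from SQL, the DataFrame API, or a hand-built Expr. The sketch below uses a toy `Expr` enum and a hypothetical `rewrite_array_aggregate` function; it is not the DataFusion `Expr` type or an existing rule.

```rust
/// Toy expression tree; a hypothetical stand-in for a logical `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(String),
    ScalarFunction { name: String, args: Vec<Expr> },
    AggregateFunction { name: String, arg: Box<Expr> },
}

/// Rewrite `array_aggregate(list, 'name')` into the aggregate `name(list)`.
/// Children are rewritten first so nested calls are handled as well.
fn rewrite_array_aggregate(expr: Expr) -> Expr {
    match expr {
        Expr::ScalarFunction { name, args } => {
            let mut args: Vec<Expr> =
                args.into_iter().map(rewrite_array_aggregate).collect();
            // Only a two-argument call with a literal aggregate name matches.
            let agg_name = match args.get(1) {
                Some(Expr::Literal(s)) if name == "array_aggregate" && args.len() == 2 => {
                    Some(s.clone())
                }
                _ => None,
            };
            if let Some(agg_name) = agg_name {
                let list = args.swap_remove(0);
                return Expr::AggregateFunction { name: agg_name, arg: Box::new(list) };
            }
            Expr::ScalarFunction { name, args }
        }
        Expr::AggregateFunction { name, arg } => Expr::AggregateFunction {
            name,
            arg: Box::new(rewrite_array_aggregate(*arg)),
        },
        other => other,
    }
}
```

Because the rule takes and returns an expression, it could run as an analyzer or optimizer step after any front end has produced the plan.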

datafusion/physical-expr/src/aggregate/sum.rs (outdated, resolved)
@@ -167,7 +172,7 @@ impl PartialEq<dyn Any> for Sum {

/// This accumulator computes SUM incrementally
Contributor

I am very worried about changing how the Sum accumulator works, as the Sum accumulator is one of the performance critical aggregators.

While it would result in duplicated code, I think in this case it would make sense to have a separate accumulator implementation for array_sum given this concern

Contributor Author

@jayzhan211 jayzhan211 Oct 24, 2023

I'm thinking about introducing AggregateFunction::ArraySum for array_aggregate('sum') and creating an accumulator SumAccumulator that is specialized for array cases, instead of introducing ArraySumAccumulator under the existing AggregateFunction::Sum.

We also need to do the same to other functions, e.g. avg, min, max

Contributor Author

An alternative approach is to go with SumAccumulator or ArraySumAccumulator based on the data_type aka ListArray.
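The alternative above, choosing the accumulator from the input data type, can be sketched as below. The types are simplified stand-ins, and the semantics given to `ArraySumAccumulator` (summing all list elements across rows) are an assumption made for illustration; the thread does not pin them down.

```rust
/// Simplified data types; a stand-in for Arrow's `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    List(Box<DataType>),
}

/// One input value per row: a plain integer or a list of integers.
#[derive(Debug, PartialEq)]
enum Value {
    Int64(i64),
    List(Vec<i64>),
}

trait Accumulator {
    fn update(&mut self, value: &Value);
    fn evaluate(&self) -> i64;
}

/// Scalar path: kept free of any list handling so the hot loop is unchanged.
struct SumAccumulator {
    sum: i64,
}

impl Accumulator for SumAccumulator {
    fn update(&mut self, value: &Value) {
        if let Value::Int64(v) = value {
            self.sum = self.sum.wrapping_add(*v);
        }
    }
    fn evaluate(&self) -> i64 {
        self.sum
    }
}

/// List path (assumed semantics: add up every element of every list row).
struct ArraySumAccumulator {
    sum: i64,
}

impl Accumulator for ArraySumAccumulator {
    fn update(&mut self, value: &Value) {
        if let Value::List(values) = value {
            for v in values {
                self.sum = self.sum.wrapping_add(*v);
            }
        }
    }
    fn evaluate(&self) -> i64 {
        self.sum
    }
}

/// The type check happens once, at creation time, not per row.
fn create_accumulator(data_type: &DataType) -> Box<dyn Accumulator> {
    match data_type {
        DataType::List(_) => Box::new(ArraySumAccumulator { sum: 0 }),
        DataType::Int64 => Box::new(SumAccumulator { sum: 0 }),
    }
}
```

Doing the dispatch in `create_accumulator` keeps the existing scalar accumulator untouched, which addresses the performance concern raised in the review.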

datafusion/common/src/scalar.rs (outdated, resolved)
datafusion/common/src/scalar.rs (outdated, resolved)
@jayzhan211 jayzhan211 marked this pull request as draft October 25, 2023 13:23
@jayzhan211 jayzhan211 marked this pull request as ready for review October 25, 2023 14:45
@jayzhan211 jayzhan211 requested a review from alamb October 25, 2023 14:45
@alamb
Contributor

alamb commented Oct 27, 2023

@jayzhan211 I wonder if you could start looking at the List / Array code and find ways to make it simpler before we begin implementing new features.

I feel like the code is at a place where getting a timely review of any change is challenging (because the code is so complicated). Also, when a change like #7629 results in several regressions, that is a sign to me that the existing code needs to be simplified / improved before we can add significant new features

@jayzhan211
Contributor Author

jayzhan211 commented Oct 27, 2023

@jayzhan211 I wonder if you could start looking at the List / Array code and find ways to make it simpler before we begin implementing new features.

I feel like the code is at a place where getting a timely review of any change is challenging (because the code is so complicated). Also, when a change like #7629 results in several regressions, that is a sign to me that the existing code needs to be simplified / improved before we can add significant new features

Ok

@jayzhan211 jayzhan211 marked this pull request as draft October 28, 2023 05:54
@edmondop
Contributor

Following up my discussion on #7214 what's the status of this? Are there blockers that need to be removed / other PR that need to be worked on before this one?

@jayzhan211
Contributor Author

Following up my discussion on #7214 what's the status of this? Are there blockers that need to be removed / other PR that need to be worked on before this one?

I plan to merge #8141 first, then maybe review #7242 (comment).

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
@jayzhan211 jayzhan211 marked this pull request as ready for review November 19, 2023 08:49
@jayzhan211
Contributor Author

jayzhan211 commented Nov 19, 2023

@jayzhan211 I wonder if you could start looking at the List / Array code and find ways to make it simpler before we begin implementing new features.

I feel like the code is at a place where getting a timely review of any change is challenging (because the code is so complicated). Also, when a change like #7629 results in several regressions, that is a sign to me that the existing code needs to be simplified / improved before we can add significant new features

@alamb I think ArrayAggregate is different enough that cleaning up the existing array functions would not help much. Maybe we can pick this up again? Let me know if there is any issue we should focus on before moving on with ArrayAggregate.

@alamb
Contributor

alamb commented Nov 20, 2023

@jayzhan211 I wonder if you could start looking at the List / Array code and find ways to make it simpler before we begin implementing new features.
I feel like the code is at a place where getting a timely review of any change is challenging (because the code is so complicated). Also, when a change like #7629 results in several regressions, that is a sign to me that the existing code needs to be simplified / improved before we can add significant new features

@alamb I think ArrayAggregate is different enough that cleaning up the existing array functions would not help much. Maybe we can pick this up again? Let me know if there is any issue we should focus on before moving on with ArrayAggregate.

My concern is that aggregate functions are some of the most important features in DataFusion: they are widely used and their performance is very important. I am very concerned that any change in how the aggregates operate will cause issues downstream (either functionality or performance) as well as make it harder to maintain.

For this case especially I am worried about adding features for their own sake without anyone who is waiting for it

So one question I have is if anyone is waiting on this feature, and if so perhaps they can help implement / test it.

@@ -101,6 +106,43 @@ impl AggregateExpr for Sum {
}

fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
if let DataType::List(field) = &self.data_type {
Contributor Author

@jayzhan211 jayzhan211 Nov 20, 2023

@alamb I agree that we should not introduce any regression, especially for this widely used function.

I think we can either run a comparison to ensure there is no performance regression for sum, or go back to the initial approach and build array_aggregate like the other array functions.

In the current approach, ArraySum is actually very different from Sum: there is a separate Accumulator for the Array version. I think this is the only place that might affect the performance of the current sum aggregate, since it is where we need to differentiate the Array and non-Array cases.

The initial reason for doing this in an Accumulator, reducing code duplication, no longer holds. Unless there is another reason to have an ArraySum Accumulator, maybe we should move this back to array_expressions.rs?

  1. Is comparing against the performance of Sum enough to ensure there is no regression? I am not sure what the functional impact would be.
  2. Is there another reason or feature that calls for an ArraySum Accumulator?
  3. If we implement the array aggregate function in array_expressions.rs, is there any concern for the overall design?

Contributor

If we implement the array aggregate function in array_expressions.rs, is there any concern for the overall design?

Maybe we could put it in array_aggregates.rs (array_expressions.rs is already quite substantial)

Upon further reflection I agree that it makes sense to keep ArraySum separate from the normal Sum aggregator

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
@github-actions github-actions bot added the core Core DataFusion crate label Nov 26, 2023
@jayzhan211 jayzhan211 mentioned this pull request Nov 27, 2023
@alamb
Contributor

alamb commented Feb 2, 2024

I am trying to go through old PRs and make sure we don't lose any -- this one has not had much activity and has accumulated conflicts. Marking as draft so it isn't on the review queue. Please feel free to reopen / mark as ready for review if it is

@alamb alamb marked this pull request as draft February 2, 2024 21:50

github-actions bot commented May 4, 2024

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale PR has not had any activity for some time label May 4, 2024
@github-actions github-actions bot closed this May 11, 2024
Labels
core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions sql SQL Planner sqllogictest SQL Logic Tests (.slt) Stale PR has not had any activity for some time
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement array_aggregate function
4 participants