Add support for conflicting order sensitive aggregates in ARRAY_AGG
aggregate function
#8558
…ggregates # Conflicts: # datafusion/sqllogictest/test_files/groupby.slt
Thank you for this PR @mustafasrepo -- I have this one on my list to look at carefully over the next day or two
I am changing this to draft to avoid any misunderstanding that this is ready for merge. The code works, but we want to iterate on the design with the community to (1) avoid any performance regressions on ordinary single-aggregation cases, and (2) converge to a general/extensible design as early as possible.
I started looking at this PR but I haven't gone through the entire code in detail.
I am worried about the size of the change to the hash aggregate stream, given how performance sensitive that code is. At the very least we should benchmark the code and make sure this PR doesn't slow it down.
Goals
It would help to take a step back and review what we are trying to accomplish in the context of what is currently implemented. Here is my understanding (please correct me if I got it wrong)
As of today, on main
- `create_aggregate_expr` supports exactly three aggregate functions that are order sensitive: `ARRAY_AGG`, `FIRST_VALUE` and `LAST_VALUE`. Order sensitive means they can have an `ORDER BY` specified in their arguments.
- The way order sensitive aggregates work today is to ensure the input to the group by is sorted by group and then by the aggregate `ORDER BY` argument. So `ARRAY_AGG(x ORDER BY x) GROUP BY y` would result in sorting the input to the group by to be `(x, y)`.
- The goal is to allow multiple instances of these functions to appear in a single query, which means the single re-sort doesn't work if there are conflicting requirements.
Potential solutions
- Teach the HashAggregate stream about the order required for the expressions and then feed data in that order to the aggregates.
- Update the aggregates themselves to handle the knowledge of ordering internally.
In this case, rather than resorting the input to the hash aggregate, we could update the aggregator implementations to consider the input sort order directly, rather than relying on another operator to re-sort the data. We could use pre-sorted data as an optimization, but fall back to tracking the input internally.
So for a query like
SELECT FIRST_VALUE(a ORDER BY z) FROM t GROUP BY y
the `FirstValueAccumulator` could internally know to find the last value of `z` and then get the corresponding value of `a`. Note this doesn't actually require a sort, it just requires finding the max.
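To make the "no sort needed" idea concrete, here is a minimal sketch of an accumulator that tracks only the winning ordering key and its value. It is not DataFusion's `FirstValueAccumulator`: the type `FirstValueByKey`, its methods, and the `(i64, &str)` row layout are all hypothetical, and it assumes an ascending `ORDER BY z`, for which `FIRST_VALUE` corresponds to the smallest `z` seen (a descending order would flip the comparison).

```rust
/// Hypothetical, simplified stand-in for a FIRST_VALUE(a ORDER BY z ASC)
/// accumulator for a single group. Not a DataFusion API.
#[derive(Debug, Default)]
pub struct FirstValueByKey {
    /// (ordering key z, value a) of the current winner, if any row was seen.
    best: Option<(i64, String)>,
}

impl FirstValueByKey {
    /// Feed one batch of (z, a) rows. Rows may arrive in any order,
    /// so no sort of the input is required.
    pub fn update_batch(&mut self, rows: &[(i64, &str)]) {
        for &(z, a) in rows {
            let replace = match &self.best {
                None => true,
                // Strict comparison keeps the earliest row on ties.
                Some((best_z, _)) => z < *best_z,
            };
            if replace {
                self.best = Some((z, a.to_string()));
            }
        }
    }

    /// Value of `a` on the row with the smallest `z`, or None if no rows were seen.
    pub fn evaluate(&self) -> Option<&str> {
        self.best.as_ref().map(|(_, a)| a.as_str())
    }
}
```

Each batch is scanned once, so the cost is linear in the number of rows and the state per group is a single key/value pair.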
This PR
This PR seems to take a hybrid approach where the hash aggregate sorts each input batch prior to passing to each order preserving accumulator, and each accumulator must then handle multiple batches that are sorted.
I think the benefit of this design is that if there are many aggregates in a query with the same sort order
FIRST_VALUE(a ORDER BY z),
FIRST_VALUE(b ORDER BY z),
FIRST_VALUE(c ORDER BY z),
...
the data only needs to be sorted by `z` once (per batch).
However, one potential downside is that the implementation is complicated, and another downside is that sorting is not necessary for `FIRST_VALUE` and `LAST_VALUE` -- only the position of the max value of `z` is really needed.
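The "sort once per shared ordering" benefit can be sketched as a bucketing step: aggregates with the same ordering requirement land in the same group, and each group needs at most one sort of the batch. Everything here is an illustrative assumption (the `AggSpec` type, the `group_by_requirement` function, and the use of a single column name as the requirement); it is not the code in this PR.

```rust
use std::collections::BTreeMap;

/// Hypothetical description of one aggregate: an output name plus the column
/// it must be ordered by (None means no ordering requirement).
pub struct AggSpec {
    pub name: &'static str,
    pub order_by: Option<&'static str>,
}

/// Bucket aggregates by ordering requirement, so that each distinct
/// requirement triggers at most one sort per input batch.
pub fn group_by_requirement(
    aggs: &[AggSpec],
) -> BTreeMap<Option<&'static str>, Vec<&'static str>> {
    let mut groups: BTreeMap<Option<&'static str>, Vec<&'static str>> = BTreeMap::new();
    for agg in aggs {
        groups.entry(agg.order_by).or_default().push(agg.name);
    }
    groups
}
```

With three `FIRST_VALUE(... ORDER BY z)` aggregates and one `ARRAY_AGG(... ORDER BY x)`, this yields two ordered groups, so a batch would be sorted twice rather than four times; aggregates without a requirement share the unsorted batch.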
@@ -2297,8 +2296,7 @@ Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales
physical_plan
ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)]
----SortExec: expr=[amount@1 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]
----MemoryExec: partitions=1, partition_sizes=[1]
I don't fully understand this plan change. Now there is no sort, but the comments above say there should be one:
# test_ordering_sensitive_aggregation
# ordering sensitive requirement should add a SortExec in the final plan. To satisfy amount ASC
# in the aggregation
@@ -994,6 +831,132 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
    Arc::new(Schema::new(group_fields))
}

/// Determines the lexical ordering requirement for an aggregate expression.
was this code just moved, or was it also changed?
let batch = if aggregate_group.requirement.is_empty() {
    batch.clone()
} else {
    sort_batch(&batch, &aggregate_group.requirement, None)?
Does this sort the entire input to the group, or just the rows in the current batch? I think it just sorts the current batch, but it would actually need to sort the entire input for that group, right?
It sorts the current batch. By changing the implementation of the order-sensitive aggregators, we no longer depend on the property that the entire input for the group is ordered.
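One way an accumulator can cope with batches that are sorted only locally is to merge each incoming batch into state that is kept sorted. The sketch below illustrates that idea for a hypothetical `ARRAY_AGG(v ORDER BY v ASC)` over `i64` values; the `SortedArrayAgg` type and its methods are assumptions for illustration and are not the accumulator code in this PR.

```rust
/// Hypothetical ARRAY_AGG(v ORDER BY v ASC) state for one group. Each incoming
/// batch is sorted, but batches are not ordered relative to each other.
#[derive(Debug, Default)]
pub struct SortedArrayAgg {
    /// All values seen so far, kept in ascending order.
    values: Vec<i64>,
}

impl SortedArrayAgg {
    /// Merge one locally sorted batch into the already sorted state.
    pub fn update_batch(&mut self, sorted_batch: &[i64]) {
        // The caller is assumed to have sorted the batch beforehand.
        debug_assert!(sorted_batch.windows(2).all(|w| w[0] <= w[1]));
        let mut merged = Vec::with_capacity(self.values.len() + sorted_batch.len());
        let (mut i, mut j) = (0, 0);
        while i < self.values.len() && j < sorted_batch.len() {
            // `<=` keeps previously seen rows first on ties (stable merge).
            if self.values[i] <= sorted_batch[j] {
                merged.push(self.values[i]);
                i += 1;
            } else {
                merged.push(sorted_batch[j]);
                j += 1;
            }
        }
        // At most one of these two tails is non-empty.
        merged.extend_from_slice(&self.values[i..]);
        merged.extend_from_slice(&sorted_batch[j..]);
        self.values = merged;
    }

    /// The globally ordered result for this group.
    pub fn evaluate(&self) -> &[i64] {
        &self.values
    }
}
```

Feeding the batches `[1, 4, 9]` and then `[2, 3, 10]` leaves the state globally ordered even though the second batch starts below the end of the first, which is the relaxation described above.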
@mustafasrepo, @ozankabak and I had a brief meeting this morning. Here are some notes I took.
Goal:
The longer term goal is to support many more order aware normal (as opposed to window) aggregates such as various N'th value, rank functions, etc. These would be both for built in functions and user defined aggregates. In other words, this is a much larger feature than just for the current three functions.
Possible Design Discussions
We discussed several possible designs that had different tradeoffs (largely where the complexity was):
Next steps:
I ran the benchmark. It seems that there is no noticeable difference between main and this branch.
Thanks @mustafasrepo -- can you also run the clickbench benchmark (which is very heavy on the aggregation code)?
Sure
By the way, I wrote a design document for possible approaches. It can be found in document. It is missing one approach for now; I will add the third approach as well.
Thank you -- I will check it out tomorrow!
I ran a benchmark for the clickbench queries. Results can be found below
…ggregates # Conflicts: # datafusion/physical-plan/src/aggregates/mod.rs # datafusion/sqllogictest/test_files/groupby.slt
I will try and review this carefully later today
I am hesitant to make the Hash Aggregate stream any more complex than it already is, so I am worried about this PR. I think @mustafasrepo has shown the performance does not change, which is good. However, can you please review the contents of #8582 before proceeding with this PR? I feel like we can likely make most queries faster with less complicated code.
I agree
We will think about this over the weekend and come back to you. My preliminary thinking is that Approach 3 could be a good stepping stone to Approach 2 if we feel like going directly to 2 would take too long.
Thank you
I am also happy to help implement Approach 3 (as we already have some version of the code to do first_value / last_value downstream in IOx).
I agree. In terms of getting to Approach 2, I think supporting shared CTEs (that is, not actually re-computing a CTE each time it is referenced in the query) is required for Approach 2 and would be a valuable feature in its own right for many DataFusion users, so perhaps we could plan out that project.
To avoid any confusion, we will resume this work in a different branch/repo (we are just moving code around between repos).
Which issue does this PR close?
Closes #.
Related to #8582
Rationale for this change
Currently, order sensitive aggregation cannot work when there are incompatible ordering requirements, such as the following:
This PR adds this functionality.
What changes are included in this PR?
With this PR we can support conflicting (i.e., incompatible) ordering requirements among aggregate expressions. To support this functionality, the existing design is changed a bit.
With this design it is guaranteed that each order sensitive aggregator will receive ordered batches (according to its requirement) at its `update_batch` method. However, it is not guaranteed that batches are ordered globally.
As an example, an order sensitive aggregator with requirement `a DESC` may receive the following batch
in its `update_batch` argument. Then it may receive the batch
in the next run. Hence we update the state at each `update_batch` call with new values, without any assumption of global ordering (this is the difference from the previous design). In the previous design it was guaranteed that received batches are ordered across different `update_batch` runs. For instance, the following batches received at `update_batch` would be valid for the previous design.
However, the following batches wouldn't be
With the new design both of these batches are valid. This relaxation enables us to support incompatible orderings by locally sorting `RecordBatch`es at the input of the aggregation, once for each distinct aggregate group that requires a different ordering.
Are these changes tested?
Yes, new tests are added.
Are there any user-facing changes?