Skip to content

Commit

Permalink
Add support for ordering-sensitive aggregation (#6332)
Browse files Browse the repository at this point in the history
* Naive test pass

i

* Add new tests and simplifications

* move tests to the .slt file

* update requirement

* update tests

* Add support for partially ordered, ordering-sensitive aggregation.

* Resolve linter errors

* update comments

* minor changes

* retract changes in generated

* update proto files

* Simplifications

* Make types consistent in schema, and data

* Update todos

* Convert API to vector

* Convert get_finest to handle Vector inputs

* simplifications, update comment

* Minor code simplifications

* Update comment

* Update documents

* fix projection push down failure bug

* Simplifications, Address reviews

* Update comment

* Resolve linter errors

---------

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
  • Loading branch information
mustafasrepo and ozankabak authored May 15, 2023
1 parent 7563cdb commit d8a92be
Show file tree
Hide file tree
Showing 35 changed files with 977 additions and 108 deletions.
12 changes: 12 additions & 0 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
source,
Arc::clone(&schema),
)?;
Expand All @@ -421,6 +422,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
Expand All @@ -442,6 +444,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
source,
Arc::clone(&schema),
)?;
Expand All @@ -451,6 +454,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
Expand All @@ -471,6 +475,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
source,
Arc::clone(&schema),
)?;
Expand All @@ -483,6 +488,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
)?;
Expand All @@ -503,6 +509,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
source,
Arc::clone(&schema),
)?;
Expand All @@ -515,6 +522,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
)?;
Expand Down Expand Up @@ -546,6 +554,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
filter,
Arc::clone(&schema),
)?;
Expand All @@ -555,6 +564,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
Expand Down Expand Up @@ -591,6 +601,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
filter,
Arc::clone(&schema),
)?;
Expand All @@ -600,6 +611,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
Expand Down
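4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
Original file line number Diff line number Diff line change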
Expand Up @@ -73,6 +73,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
group_by: input_group_by,
aggr_expr: input_aggr_expr,
filter_expr: input_filter_expr,
order_by_expr: input_order_by_expr,
input_schema,
..
}| {
Expand All @@ -95,6 +96,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
input_group_by.clone(),
input_aggr_expr.to_vec(),
input_filter_expr.to_vec(),
input_order_by_expr.to_vec(),
partial_input.clone(),
input_schema.clone(),
)
Expand Down Expand Up @@ -279,6 +281,7 @@ mod tests {
group_by,
aggr_expr,
vec![],
vec![],
input,
schema,
)
Expand All @@ -298,6 +301,7 @@ mod tests {
group_by,
aggr_expr,
vec![],
vec![],
input,
schema,
)
Expand Down
11 changes: 10 additions & 1 deletion datafusion/core/src/physical_optimizer/dist_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use datafusion_physical_expr::expressions::NoOp;
use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, AggregateExpr,
PhysicalExpr,
PhysicalExpr, PhysicalSortExpr,
};
use std::sync::Arc;

Expand Down Expand Up @@ -254,6 +254,7 @@ fn adjust_input_keys_ordering(
group_by,
aggr_expr,
filter_expr,
order_by_expr,
input,
input_schema,
..
Expand All @@ -267,6 +268,7 @@ fn adjust_input_keys_ordering(
group_by,
aggr_expr,
filter_expr,
order_by_expr,
input.clone(),
input_schema,
)?),
Expand Down Expand Up @@ -367,12 +369,14 @@ where
}
}

#[allow(clippy::too_many_arguments)]
fn reorder_aggregate_keys(
agg_plan: Arc<dyn ExecutionPlan>,
parent_required: &[Arc<dyn PhysicalExpr>],
group_by: &PhysicalGroupBy,
aggr_expr: &[Arc<dyn AggregateExpr>],
filter_expr: &[Option<Arc<dyn PhysicalExpr>>],
order_by_expr: &[Option<Vec<PhysicalSortExpr>>],
agg_input: Arc<dyn ExecutionPlan>,
input_schema: &SchemaRef,
) -> Result<PlanWithKeyRequirements> {
Expand Down Expand Up @@ -403,6 +407,7 @@ fn reorder_aggregate_keys(
group_by,
aggr_expr,
filter_expr,
order_by_expr,
input,
input_schema,
..
Expand All @@ -422,6 +427,7 @@ fn reorder_aggregate_keys(
new_partial_group_by,
aggr_expr.clone(),
filter_expr.clone(),
order_by_expr.clone(),
input.clone(),
input_schema.clone(),
)?))
Expand Down Expand Up @@ -453,6 +459,7 @@ fn reorder_aggregate_keys(
new_group_by,
aggr_expr.to_vec(),
filter_expr.to_vec(),
order_by_expr.to_vec(),
partial_agg,
input_schema.clone(),
)?);
Expand Down Expand Up @@ -1104,12 +1111,14 @@ mod tests {
final_grouping,
vec![],
vec![],
vec![],
Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
group_by,
vec![],
vec![],
vec![],
input,
schema.clone(),
)
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,14 @@ mod tests {
PhysicalGroupBy::default(),
vec![],
vec![],
vec![],
Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![],
vec![],
vec![],
input,
schema.clone(),
)
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2867,6 +2867,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![],
vec![],
vec![],
input,
schema,
)
Expand Down
Loading

0 comments on commit d8a92be

Please sign in to comment.