Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for ordering sensitive aggregation #6332

Merged
merged 30 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7411d6b
Naive test pass
mustafasrepo May 3, 2023
87c397d
Add new tests and simplifications
mustafasrepo May 3, 2023
ebd0e5f
move tests to the .slt file
mustafasrepo May 4, 2023
ecc5b0d
update requirement
mustafasrepo May 4, 2023
7093fa7
update tests
mustafasrepo May 5, 2023
e377ea9
Add support for partiallyOrdered aggregation sensitive.
mustafasrepo May 5, 2023
e0ce989
Resolve linter errors
mustafasrepo May 5, 2023
e3edf7c
Merge branch 'main' into feature/ordering_sensitive_aggregation
mustafasrepo May 5, 2023
550416e
update comments
mustafasrepo May 5, 2023
d72cc8f
minor changes
mustafasrepo May 5, 2023
713b56b
Merge branch 'main' into feature/ordering_sensitive_aggregation
mustafasrepo May 5, 2023
3dcf079
retract changes in generated
mustafasrepo May 5, 2023
a817887
update proto files
mustafasrepo May 5, 2023
740a605
Simplifications
ozankabak May 6, 2023
8209c46
Make types consistent in schema, and data
mustafasrepo May 8, 2023
d93b187
Update todos
mustafasrepo May 8, 2023
5bb8dde
Convert API to vector
mustafasrepo May 8, 2023
72985f8
Convert get_finest to handle Vector inputs
mustafasrepo May 9, 2023
4ae5c25
simplifications, update comment
mustafasrepo May 9, 2023
0a52150
Merge branch 'main' into feature/ordering_sensitive_aggregation
mustafasrepo May 10, 2023
c59ce9b
Minor code simplifications
ozankabak May 11, 2023
90aa195
Update comment
mustafasrepo May 11, 2023
9abfe65
Update documents
mustafasrepo May 11, 2023
8456a8d
Merge branch 'apache:main' into feature/ordering_sensitive_aggregation
mustafasrepo May 12, 2023
344e184
fix projection push down failure bug
mustafasrepo May 12, 2023
1d8d4ba
Simplifications, Address reviews
mustafasrepo May 15, 2023
01c8da8
Update comment
mustafasrepo May 15, 2023
c5be504
Merge branch 'main' into feature/ordering_sensitive_aggregation
mustafasrepo May 15, 2023
c23945d
Resolve linter errors
mustafasrepo May 15, 2023
7c955eb
Merge branch 'main' into feature/ordering_sensitive_aggregation
mustafasrepo May 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
source,
Arc::clone(&schema),
)?;
Expand All @@ -421,6 +422,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
Expand All @@ -442,6 +444,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
source,
Arc::clone(&schema),
)?;
Expand All @@ -451,6 +454,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
Expand All @@ -471,6 +475,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
source,
Arc::clone(&schema),
)?;
Expand All @@ -483,6 +488,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
)?;
Expand All @@ -503,6 +509,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
source,
Arc::clone(&schema),
)?;
Expand All @@ -515,6 +522,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
)?;
Expand Down Expand Up @@ -546,6 +554,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
filter,
Arc::clone(&schema),
)?;
Expand All @@ -555,6 +564,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
Expand Down Expand Up @@ -591,6 +601,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
filter,
Arc::clone(&schema),
)?;
Expand All @@ -600,6 +611,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
group_by: input_group_by,
aggr_expr: input_aggr_expr,
filter_expr: input_filter_expr,
order_by_expr: input_order_by_expr,
input_schema,
..
}| {
Expand All @@ -95,6 +96,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
input_group_by.clone(),
input_aggr_expr.to_vec(),
input_filter_expr.to_vec(),
input_order_by_expr.to_vec(),
partial_input.clone(),
input_schema.clone(),
)
Expand Down Expand Up @@ -279,6 +281,7 @@ mod tests {
group_by,
aggr_expr,
vec![],
vec![],
input,
schema,
)
Expand All @@ -298,6 +301,7 @@ mod tests {
group_by,
aggr_expr,
vec![],
vec![],
input,
schema,
)
Expand Down
11 changes: 10 additions & 1 deletion datafusion/core/src/physical_optimizer/dist_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use datafusion_physical_expr::expressions::NoOp;
use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, AggregateExpr,
PhysicalExpr,
PhysicalExpr, PhysicalSortExpr,
};
use std::sync::Arc;

Expand Down Expand Up @@ -254,6 +254,7 @@ fn adjust_input_keys_ordering(
group_by,
aggr_expr,
filter_expr,
order_by_expr,
input,
input_schema,
..
Expand All @@ -267,6 +268,7 @@ fn adjust_input_keys_ordering(
group_by,
aggr_expr,
filter_expr,
order_by_expr,
input.clone(),
input_schema,
)?),
Expand Down Expand Up @@ -367,12 +369,14 @@ where
}
}

#[allow(clippy::too_many_arguments)]
fn reorder_aggregate_keys(
agg_plan: Arc<dyn ExecutionPlan>,
parent_required: &[Arc<dyn PhysicalExpr>],
group_by: &PhysicalGroupBy,
aggr_expr: &[Arc<dyn AggregateExpr>],
filter_expr: &[Option<Arc<dyn PhysicalExpr>>],
order_by_expr: &[Option<Vec<PhysicalSortExpr>>],
agg_input: Arc<dyn ExecutionPlan>,
input_schema: &SchemaRef,
) -> Result<PlanWithKeyRequirements> {
Expand Down Expand Up @@ -403,6 +407,7 @@ fn reorder_aggregate_keys(
group_by,
aggr_expr,
filter_expr,
order_by_expr,
input,
input_schema,
..
Expand All @@ -422,6 +427,7 @@ fn reorder_aggregate_keys(
new_partial_group_by,
aggr_expr.clone(),
filter_expr.clone(),
order_by_expr.clone(),
input.clone(),
input_schema.clone(),
)?))
Expand Down Expand Up @@ -453,6 +459,7 @@ fn reorder_aggregate_keys(
new_group_by,
aggr_expr.to_vec(),
filter_expr.to_vec(),
order_by_expr.to_vec(),
partial_agg,
input_schema.clone(),
)?);
Expand Down Expand Up @@ -1104,12 +1111,14 @@ mod tests {
final_grouping,
vec![],
vec![],
vec![],
Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
group_by,
vec![],
vec![],
vec![],
input,
schema.clone(),
)
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,14 @@ mod tests {
PhysicalGroupBy::default(),
vec![],
vec![],
vec![],
Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![],
vec![],
vec![],
input,
schema.clone(),
)
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2867,6 +2867,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![],
vec![],
vec![],
input,
schema,
)
Expand Down
Loading