diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index 795857b10ef5..86a8cdb7b3d4 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -397,7 +397,6 @@ pub(crate) mod tests { PhysicalGroupBy::default(), vec![agg.count_expr()], vec![None], - vec![None], source, Arc::clone(&schema), )?; @@ -407,7 +406,6 @@ pub(crate) mod tests { PhysicalGroupBy::default(), vec![agg.count_expr()], vec![None], - vec![None], Arc::new(partial_agg), Arc::clone(&schema), )?; @@ -429,7 +427,6 @@ pub(crate) mod tests { PhysicalGroupBy::default(), vec![agg.count_expr()], vec![None], - vec![None], source, Arc::clone(&schema), )?; @@ -439,7 +436,6 @@ pub(crate) mod tests { PhysicalGroupBy::default(), vec![agg.count_expr()], vec![None], - vec![None], Arc::new(partial_agg), Arc::clone(&schema), )?; @@ -460,7 +456,6 @@ pub(crate) mod tests { PhysicalGroupBy::default(), vec![agg.count_expr()], vec![None], - vec![None], source, Arc::clone(&schema), )?; @@ -473,7 +468,6 @@ pub(crate) mod tests { PhysicalGroupBy::default(), vec![agg.count_expr()], vec![None], - vec![None], Arc::new(coalesce), Arc::clone(&schema), )?; @@ -494,7 +488,6 @@ pub(crate) mod tests { PhysicalGroupBy::default(), vec![agg.count_expr()], vec![None], - vec![None], source, Arc::clone(&schema), )?; @@ -507,7 +500,6 @@ pub(crate) mod tests { PhysicalGroupBy::default(), vec![agg.count_expr()], vec![None], - vec![None], Arc::new(coalesce), Arc::clone(&schema), )?; @@ -539,7 +531,6 @@ pub(crate) mod tests { PhysicalGroupBy::default(), vec![agg.count_expr()], vec![None], - vec![None], filter, Arc::clone(&schema), )?; @@ -549,7 +540,6 @@ pub(crate) mod tests { PhysicalGroupBy::default(), vec![agg.count_expr()], vec![None], - vec![None], Arc::new(partial_agg), Arc::clone(&schema), )?; @@ -586,7 +576,6 @@ pub(crate) mod tests { PhysicalGroupBy::default(), vec![agg.count_expr()], vec![None], - vec![None], filter, Arc::clone(&schema), )?; @@ -596,7 +585,6 @@ pub(crate) mod tests { PhysicalGroupBy::default(), vec![agg.count_expr()], vec![None], - vec![None], Arc::new(partial_agg), Arc::clone(&schema), )?; diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 0948445de20d..c50ea36b68ec 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -91,7 +91,6 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { input_agg_exec.group_by().clone(), input_agg_exec.aggr_expr().to_vec(), input_agg_exec.filter_expr().to_vec(), - input_agg_exec.order_by_expr().to_vec(), input_agg_exec.input().clone(), input_agg_exec.input_schema(), ) @@ -277,7 +276,6 @@ mod tests { group_by, aggr_expr, vec![], - vec![], input, schema, ) @@ -297,7 +295,6 @@ mod tests { group_by, aggr_expr, vec![], - vec![], input, schema, ) @@ -458,7 +455,6 @@ mod tests { final_group_by, aggr_expr, vec![], - vec![], partial_agg, schema, ) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 93cdbf858367..f2e04989ef66 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -521,7 +521,6 @@ fn reorder_aggregate_keys( new_partial_group_by, agg_exec.aggr_expr().to_vec(), agg_exec.filter_expr().to_vec(), - agg_exec.order_by_expr().to_vec(), agg_exec.input().clone(), agg_exec.input_schema.clone(), )?)) @@ -548,7 +547,6 @@ fn reorder_aggregate_keys( new_group_by, agg_exec.aggr_expr().to_vec(), agg_exec.filter_expr().to_vec(), - agg_exec.order_by_expr().to_vec(), partial_agg, agg_exec.input_schema(), )?); @@ -1909,14 +1907,12 @@ pub(crate) mod tests { final_grouping, vec![], vec![], - vec![], Arc::new( AggregateExec::try_new( AggregateMode::Partial, group_by, vec![], vec![], - vec![], input, schema.clone(), ) diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs index 8f5dbc2e9214..540f9a6a132b 100644 --- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs @@ -55,7 +55,6 @@ impl LimitedDistinctAggregation { aggr.group_by().clone(), aggr.aggr_expr().to_vec(), aggr.filter_expr().to_vec(), - aggr.order_by_expr().to_vec(), aggr.input().clone(), aggr.input_schema(), ) @@ -307,7 +306,6 @@ mod tests { build_group_by(&schema.clone(), vec!["a".to_string()]), vec![], /* aggr_expr */ vec![None], /* filter_expr */ - vec![None], /* order_by_expr */ source, /* input */ schema.clone(), /* input_schema */ )?; @@ -316,7 +314,6 @@ mod tests { build_group_by(&schema.clone(), vec!["a".to_string()]), vec![], /* aggr_expr */ vec![None], /* filter_expr */ - vec![None], /* order_by_expr */ Arc::new(partial_agg), /* input */ schema.clone(), /* input_schema */ )?; @@ -359,7 +356,6 @@ mod tests { build_group_by(&schema.clone(), vec!["a".to_string()]), vec![], /* aggr_expr */ vec![None], /* filter_expr */ - vec![None], /* order_by_expr */ source, /* input */ schema.clone(), /* input_schema */ )?; @@ -401,7 +397,6 @@ mod tests { build_group_by(&schema.clone(), vec!["a".to_string()]), vec![], /* aggr_expr */ vec![None], /* filter_expr */ - vec![None], /* order_by_expr */ source, /* input */ schema.clone(), /* input_schema */ )?; @@ -443,7 +438,6 @@ mod tests { build_group_by(&schema.clone(), vec!["a".to_string(), "b".to_string()]), vec![], /* aggr_expr */ vec![None], /* filter_expr */ - vec![None], /* order_by_expr */ source, /* input */ schema.clone(), /* input_schema */ )?; @@ -452,7 +446,6 @@ mod tests { build_group_by(&schema.clone(), vec!["a".to_string()]), vec![], /* aggr_expr */ vec![None], /* filter_expr */ - vec![None], /* order_by_expr */ Arc::new(group_by_agg), /* input */ schema.clone(), /* input_schema */ )?; @@ -495,7 +488,6 @@ mod tests { build_group_by(&schema.clone(), vec![]), vec![], /* aggr_expr */ vec![None], /* filter_expr */ - vec![None], /* order_by_expr */ source, /* input */ schema.clone(), /* input_schema */ )?; @@ -526,7 +518,6 @@ mod tests { build_group_by(&schema.clone(), vec!["a".to_string()]), vec![agg.count_expr()], /* aggr_expr */ vec![None], /* filter_expr */ - vec![None], /* order_by_expr */ source, /* input */ schema.clone(), /* input_schema */ )?; @@ -563,7 +554,6 @@ mod tests { build_group_by(&schema.clone(), vec!["a".to_string()]), vec![], /* aggr_expr */ vec![filter_expr], /* filter_expr */ - vec![None], /* order_by_expr */ source, /* input */ schema.clone(), /* input_schema */ )?; @@ -592,22 +582,15 @@ mod tests { let source = parquet_exec_with_sort(vec![sort_key]); let schema = source.schema(); - // `SELECT a FROM MemoryExec GROUP BY a ORDER BY a LIMIT 10;`, Single AggregateExec - let order_by_expr = Some(vec![PhysicalSortExpr { - expr: expressions::col("a", &schema.clone()).unwrap(), - options: SortOptions::default(), - }]); - // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec // the `a > 1` filter is applied in the AggregateExec let single_agg = AggregateExec::try_new( AggregateMode::Single, build_group_by(&schema.clone(), vec!["a".to_string()]), - vec![], /* aggr_expr */ - vec![None], /* filter_expr */ - vec![order_by_expr], /* order_by_expr */ - source, /* input */ - schema.clone(), /* input_schema */ + vec![], /* aggr_expr */ + vec![None], /* filter_expr */ + source, /* input */ + schema.clone(), /* input_schema */ )?; let limit_exec = LocalLimitExec::new( Arc::new(single_agg), diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 37a76eff1ee2..678dc1f373e3 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -339,7 +339,6 @@ pub fn aggregate_exec(input: Arc) -> Arc { PhysicalGroupBy::default(), vec![], vec![], - vec![], input, schema, ) diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs index 52d34d4f8198..dd0261420304 100644 --- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs @@ -73,7 +73,6 @@ impl TopKAggregation { aggr.group_by().clone(), aggr.aggr_expr().to_vec(), aggr.filter_expr().to_vec(), - aggr.order_by_expr().to_vec(), aggr.input().clone(), aggr.input_schema(), ) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index ab38b3ec6d2f..364fea376add 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -799,14 +799,13 @@ impl DefaultPhysicalPlanner { }) .collect::>>()?; - let (aggregates, filters, order_bys) : (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter); + let (aggregates, filters, _order_bys) : (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter); let initial_aggr = Arc::new(AggregateExec::try_new( AggregateMode::Partial, groups.clone(), aggregates.clone(), filters.clone(), - order_bys, input_exec, physical_input_schema.clone(), )?); @@ -824,7 +823,6 @@ impl DefaultPhysicalPlanner { // To reflect such changes to subsequent stages, use the updated // `AggregateExpr`/`PhysicalSortExpr` objects. let updated_aggregates = initial_aggr.aggr_expr().to_vec(); - let updated_order_bys = initial_aggr.order_by_expr().to_vec(); let next_partition_mode = if can_repartition { // construct a second aggregation with 'AggregateMode::FinalPartitioned' @@ -848,7 +846,6 @@ impl DefaultPhysicalPlanner { final_grouping_set, updated_aggregates, filters, - updated_order_bys, initial_aggr, physical_input_schema.clone(), )?)) diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 821f236af87b..9069dbbd5850 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -109,7 +109,6 @@ async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str group_by.clone(), aggregate_expr.clone(), vec![None], - vec![None], running_source, schema.clone(), ) @@ -122,7 +121,6 @@ async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str group_by.clone(), aggregate_expr.clone(), vec![None], - vec![None], usual_source, schema.clone(), ) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 2f69ed061ce1..c74c4ac0f821 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -279,8 +279,6 @@ pub struct AggregateExec { aggr_expr: Vec>, /// FILTER (WHERE clause) expression for each aggregate expression filter_expr: Vec>>, - /// (ORDER BY clause) expression for each aggregate expression - order_by_expr: Vec>, /// Set if the output of this aggregation is truncated by a upstream sort/limit clause limit: Option, /// Input plan, could be a partial aggregate or the input to the aggregate @@ -468,8 +466,6 @@ impl AggregateExec { group_by: PhysicalGroupBy, mut aggr_expr: Vec>, filter_expr: Vec>>, - // Ordering requirement of each aggregate expression - mut order_by_expr: Vec>, input: Arc, input_schema: SchemaRef, ) -> Result { @@ -487,10 +483,10 @@ impl AggregateExec { )); let original_schema = Arc::new(original_schema); // Reset ordering requirement to `None` if aggregator is not order-sensitive - order_by_expr = aggr_expr + let mut order_by_expr = aggr_expr .iter() - .zip(order_by_expr) - .map(|(aggr_expr, fn_reqs)| { + .map(|aggr_expr| { + let fn_reqs = aggr_expr.order_bys().map(|ordering| ordering.to_vec()); // If // - aggregation function is order-sensitive and // - aggregation is performing a "first stage" calculation, and @@ -558,7 +554,6 @@ impl AggregateExec { group_by, aggr_expr, filter_expr, - order_by_expr, input, original_schema, schema, @@ -602,11 +597,6 @@ impl AggregateExec { &self.filter_expr } - /// ORDER BY clause expression for each aggregate expression - pub fn order_by_expr(&self) -> &[Option] { - &self.order_by_expr - } - /// Input plan pub fn input(&self) -> &Arc { &self.input @@ -684,7 +674,7 @@ impl AggregateExec { return false; } // ensure there are no order by expressions - if self.order_by_expr().iter().any(|e| e.is_some()) { + if self.aggr_expr().iter().any(|e| e.order_bys().is_some()) { return false; } // ensure there is no output ordering; can this rule be relaxed? @@ -873,7 +863,6 @@ impl ExecutionPlan for AggregateExec { self.group_by.clone(), self.aggr_expr.clone(), self.filter_expr.clone(), - self.order_by_expr.clone(), children[0].clone(), self.input_schema.clone(), )?; @@ -1395,7 +1384,6 @@ mod tests { grouping_set.clone(), aggregates.clone(), vec![None], - vec![None], input, input_schema.clone(), )?); @@ -1474,7 +1462,6 @@ mod tests { final_grouping_set, aggregates, vec![None], - vec![None], merge, input_schema, )?); @@ -1540,7 +1527,6 @@ mod tests { grouping_set.clone(), aggregates.clone(), vec![None], - vec![None], input, input_schema.clone(), )?); @@ -1588,7 +1574,6 @@ mod tests { final_grouping_set, aggregates, vec![None], - vec![None], merge, input_schema, )?); @@ -1855,7 +1840,6 @@ mod tests { groups, aggregates, vec![None; 3], - vec![None; 3], input.clone(), input_schema.clone(), )?); @@ -1911,7 +1895,6 @@ mod tests { groups.clone(), aggregates.clone(), vec![None], - vec![None], blocking_exec, schema, )?); @@ -1950,7 +1933,6 @@ mod tests { groups, aggregates.clone(), vec![None], - vec![None], blocking_exec, schema, )?); @@ -2052,7 +2034,6 @@ mod tests { groups.clone(), aggregates.clone(), vec![None], - vec![Some(ordering_req.clone())], memory_exec, schema.clone(), )?); @@ -2068,7 +2049,6 @@ mod tests { groups, aggregates.clone(), vec![None], - vec![Some(ordering_req)], coalesce, schema, )?) as Arc; diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 355561c36f35..37e8ffd76159 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -878,7 +878,6 @@ mod tests { build_group_by(&csv.schema().clone(), vec!["i".to_string()]), vec![], vec![None], - vec![None], csv.clone(), csv.schema().clone(), )?; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index f391592dfe76..bd8053c817e7 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1553,7 +1553,6 @@ message AggregateExecNode { repeated PhysicalExprNode null_expr = 8; repeated bool groups = 9; repeated MaybeFilter filter_expr = 10; - repeated MaybePhysicalSortExprs order_by_expr = 11; } message GlobalLimitExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index d506b5dcce53..88310be0318a 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -36,9 +36,6 @@ impl serde::Serialize for AggregateExecNode { if !self.filter_expr.is_empty() { len += 1; } - if !self.order_by_expr.is_empty() { - len += 1; - } let mut struct_ser = serializer.serialize_struct("datafusion.AggregateExecNode", len)?; if !self.group_expr.is_empty() { struct_ser.serialize_field("groupExpr", &self.group_expr)?; @@ -72,9 +69,6 @@ impl serde::Serialize for AggregateExecNode { if !self.filter_expr.is_empty() { struct_ser.serialize_field("filterExpr", &self.filter_expr)?; } - if !self.order_by_expr.is_empty() { - struct_ser.serialize_field("orderByExpr", &self.order_by_expr)?; - } struct_ser.end() } } @@ -102,8 +96,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { "groups", "filter_expr", "filterExpr", - "order_by_expr", - "orderByExpr", ]; #[allow(clippy::enum_variant_names)] @@ -118,7 +110,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { NullExpr, Groups, FilterExpr, - OrderByExpr, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -150,7 +141,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { "nullExpr" | "null_expr" => Ok(GeneratedField::NullExpr), "groups" => Ok(GeneratedField::Groups), "filterExpr" | "filter_expr" => Ok(GeneratedField::FilterExpr), - "orderByExpr" | "order_by_expr" => Ok(GeneratedField::OrderByExpr), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -180,7 +170,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { let mut null_expr__ = None; let mut groups__ = None; let mut filter_expr__ = None; - let mut order_by_expr__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::GroupExpr => { @@ -243,12 +232,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { } filter_expr__ = Some(map_.next_value()?); } - GeneratedField::OrderByExpr => { - if order_by_expr__.is_some() { - return Err(serde::de::Error::duplicate_field("orderByExpr")); - } - order_by_expr__ = Some(map_.next_value()?); - } } } Ok(AggregateExecNode { @@ -262,7 +245,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { null_expr: null_expr__.unwrap_or_default(), groups: groups__.unwrap_or_default(), filter_expr: filter_expr__.unwrap_or_default(), - order_by_expr: order_by_expr__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 8aadc96349ca..3dfd3938615f 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2193,8 +2193,6 @@ pub struct AggregateExecNode { pub groups: ::prost::alloc::vec::Vec, #[prost(message, repeated, tag = "10")] pub filter_expr: ::prost::alloc::vec::Vec, - #[prost(message, repeated, tag = "11")] - pub order_by_expr: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 73091a6fced9..df01097cfa78 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -427,19 +427,6 @@ impl AsExecutionPlan for PhysicalPlanNode { .transpose() }) .collect::, _>>()?; - let physical_order_by_expr = hash_agg - .order_by_expr - .iter() - .map(|expr| { - expr.sort_expr - .iter() - .map(|e| { - parse_physical_sort_expr(e, registry, &physical_schema) - }) - .collect::>>() - .map(|exprs| (!exprs.is_empty()).then_some(exprs)) - }) - .collect::>>()?; let physical_aggr_expr: Vec> = hash_agg .aggr_expr @@ -498,7 +485,6 @@ impl AsExecutionPlan for PhysicalPlanNode { PhysicalGroupBy::new(group_expr, null_expr, groups), physical_aggr_expr, physical_filter_expr, - physical_order_by_expr, input, Arc::new(input_schema.try_into()?), )?)) @@ -1237,12 +1223,6 @@ impl AsExecutionPlan for PhysicalPlanNode { .map(|expr| expr.to_owned().try_into()) .collect::>>()?; - let order_by = exec - .order_by_expr() - .iter() - .map(|expr| expr.to_owned().try_into()) - .collect::>>()?; - let agg = exec .aggr_expr() .iter() @@ -1295,7 +1275,6 @@ impl AsExecutionPlan for PhysicalPlanNode { group_expr_name: group_names, aggr_expr: agg, filter_expr: filter, - order_by_expr: order_by, aggr_expr_name: agg_names, mode: agg_mode as i32, input: Some(Box::new(input)), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index da76209dbb49..4a512413e73e 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -311,7 +311,6 @@ fn rountrip_aggregate() -> Result<()> { PhysicalGroupBy::new_single(groups.clone()), aggregates.clone(), vec![None], - vec![None], Arc::new(EmptyExec::new(schema.clone())), schema, )?)) @@ -379,7 +378,6 @@ fn roundtrip_aggregate_udaf() -> Result<()> { PhysicalGroupBy::new_single(groups.clone()), aggregates.clone(), vec![None], - vec![None], Arc::new(EmptyExec::new(schema.clone())), schema, )?), @@ -594,7 +592,6 @@ fn roundtrip_distinct_count() -> Result<()> { PhysicalGroupBy::new_single(groups), aggregates.clone(), vec![None], - vec![None], Arc::new(EmptyExec::new(schema.clone())), schema, )?))