diff --git a/Cargo.lock b/Cargo.lock
index 5ab0b8c84a563..c2e9312a24723 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1225,9 +1225,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"

 [[package]]
 name = "bytes"
-version = "1.11.0"
+version = "1.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
+checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"

 [[package]]
 name = "bytes-utils"
@@ -4193,9 +4193,9 @@ dependencies = [

 [[package]]
 name = "num-conv"
-version = "0.1.0"
+version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
+checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"

 [[package]]
 name = "num-integer"
@@ -6044,30 +6044,30 @@ dependencies = [

 [[package]]
 name = "time"
-version = "0.3.44"
+version = "0.3.47"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d"
+checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c"
 dependencies = [
  "deranged",
  "itoa",
  "num-conv",
  "powerfmt",
- "serde",
+ "serde_core",
  "time-core",
  "time-macros",
 ]

 [[package]]
 name = "time-core"
-version = "0.1.6"
+version = "0.1.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b"
+checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"

 [[package]]
 name = "time-macros"
-version = "0.2.24"
+version = "0.2.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3"
+checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215"
 dependencies = [
  "num-conv",
  "time-core",
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index cc7d534776d7e..b006d8ae39a9c 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -802,10 +802,17 @@ impl DefaultPhysicalPlanner {
                     ));
                 }
             }
-            return internal_err!(
+            debug!(
                 "Physical input schema should be the same as the one converted from logical input schema. Differences: {}",
                 differences.iter().map(|s| format!("\n\t- {s}")).join("")
             );
+
+            // influx: temporarily remove the error and only log it so that
+            // we can find a reproducer in production
+            // return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
+            //     .iter()
+            //     .map(|s| format!("\n\t- {s}"))
+            //     .join(""));
         }

         let groups = self.create_grouping_physical_expr(
@@ -4207,6 +4214,8 @@ digraph {
     }

     #[tokio::test]
+    // Ignored because the physical schema check is disabled.
+    #[ignore]
     async fn test_aggregate_schema_mismatch_metadata() {
         let logical_schema =
             Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
@@ -4227,6 +4236,8 @@ digraph {
     }

     #[tokio::test]
+    // Ignored because the physical schema check is disabled.
+    #[ignore]
     async fn test_aggregate_schema_mismatch_field_count() {
         let logical_schema =
             Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
@@ -4247,6 +4258,8 @@ digraph {
     }

     #[tokio::test]
+    // Ignored because the physical schema check is disabled.
+    #[ignore]
     async fn test_aggregate_schema_mismatch_field_name() {
         let logical_schema =
             Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
@@ -4268,6 +4281,8 @@ digraph {
     }

     #[tokio::test]
+    // Ignored because the physical schema check is disabled.
+    #[ignore]
     async fn test_aggregate_schema_mismatch_field_type() {
         let logical_schema =
             Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
@@ -4286,6 +4301,8 @@ digraph {
     }

     #[tokio::test]
+    // Ignored because the physical schema check is disabled.
+    #[ignore]
     async fn test_aggregate_schema_mismatch_field_nullability() {
         let logical_schema =
             Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
@@ -4304,6 +4321,8 @@ digraph {
     }

     #[tokio::test]
+    // Ignored because the physical schema check is disabled.
+    #[ignore]
     async fn test_aggregate_schema_mismatch_field_metadata() {
         let logical_schema =
             Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
@@ -4324,6 +4343,8 @@ digraph {
     }

     #[tokio::test]
+    // Ignored because the physical schema check is disabled.
+    #[ignore]
     async fn test_aggregate_schema_mismatch_multiple() {
         let logical_schema = Arc::new(Schema::new(vec![
             Field::new("c1", DataType::Int32, false),
diff --git a/datafusion/physical-expr/src/equivalence/properties/union.rs b/datafusion/physical-expr/src/equivalence/properties/union.rs
index d77129472a8ba..702688011668d 100644
--- a/datafusion/physical-expr/src/equivalence/properties/union.rs
+++ b/datafusion/physical-expr/src/equivalence/properties/union.rs
@@ -67,16 +67,43 @@ fn calculate_union_binary(
         })
         .collect::<Vec<_>>();

+    // TEMP HACK WORKAROUND
+    // Revert code from https://github.com/apache/datafusion/pull/12562
+    // Context: https://github.com/apache/datafusion/issues/13748
+    // Context: https://github.com/influxdata/influxdb_iox/issues/13038
+
     // Next, calculate valid orderings for the union by searching for prefixes
     // in both sides.
-    let mut orderings = UnionEquivalentOrderingBuilder::new();
-    orderings.add_satisfied_orderings(&lhs, &rhs)?;
-    orderings.add_satisfied_orderings(&rhs, &lhs)?;
-    let orderings = orderings.build();
+    let mut orderings = vec![];
+    for ordering in lhs.normalized_oeq_class().into_iter() {
+        let mut ordering: Vec<PhysicalSortExpr> = ordering.into();
+
+        // Progressively shorten the ordering to search for a satisfied prefix:
+        while !rhs.ordering_satisfy(ordering.clone())? {
+            ordering.pop();
+        }
+        // There is a non-trivial satisfied prefix, add it as a valid ordering:
+        if !ordering.is_empty() {
+            orderings.push(ordering);
+        }
+    }
+    for ordering in rhs.normalized_oeq_class().into_iter() {
+        let mut ordering: Vec<PhysicalSortExpr> = ordering.into();
+
+        // Progressively shorten the ordering to search for a satisfied prefix:
+        while !lhs.ordering_satisfy(ordering.clone())? {
+            ordering.pop();
+        }
+        // There is a non-trivial satisfied prefix, add it as a valid ordering:
+        if !ordering.is_empty() {
+            orderings.push(ordering);
+        }
+    }

     let mut eq_properties = EquivalenceProperties::new(lhs.schema);
     eq_properties.add_constants(constants)?;
     eq_properties.add_orderings(orderings);
+
     Ok(eq_properties)
 }

@@ -122,6 +149,7 @@ struct UnionEquivalentOrderingBuilder {
     orderings: Vec<LexOrdering>,
 }

+#[expect(unused)]
 impl UnionEquivalentOrderingBuilder {
     fn new() -> Self {
         Self { orderings: vec![] }
@@ -504,6 +532,7 @@ mod tests {
     }

     #[test]
+    #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
     fn test_union_equivalence_properties_constants_fill_gaps() -> Result<()> {
         let schema = create_test_schema().unwrap();
         UnionEquivalenceTest::new(&schema)
@@ -579,6 +608,7 @@ mod tests {
     }

     #[test]
+    #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
     fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric()
     -> Result<()> {
         let schema = create_test_schema().unwrap();
@@ -607,6 +637,7 @@ mod tests {
     }

     #[test]
+    #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
     fn test_union_equivalence_properties_constants_gap_fill_symmetric() -> Result<()> {
         let schema = create_test_schema().unwrap();
         UnionEquivalenceTest::new(&schema)
@@ -658,6 +689,7 @@ mod tests {
     }

     #[test]
+    #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
     fn test_union_equivalence_properties_constants_middle_desc() -> Result<()> {
         let schema = create_test_schema().unwrap();
         UnionEquivalenceTest::new(&schema)
diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
index 698fdea8e766e..267faeda0c1bb 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs
@@ -35,6 +35,7 @@ use datafusion_physical_expr_common::sort_expr::{
     LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr,
     PhysicalSortRequirement,
 };
+use datafusion_physical_plan::aggregates::AggregateExec;
 use datafusion_physical_plan::execution_plan::CardinalityEffect;
 use datafusion_physical_plan::filter::FilterExec;
 use datafusion_physical_plan::joins::utils::{
@@ -353,6 +354,8 @@ fn pushdown_requirement_to_children(
             Ok(None)
         }
     }
+    } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
+        handle_aggregate_pushdown(aggregate_exec, parent_required)
     } else if maintains_input_order.is_empty()
         || !maintains_input_order.iter().any(|o| *o)
         || plan.as_any().is::()
@@ -388,6 +391,77 @@ fn pushdown_requirement_to_children(
     // TODO: Add support for Projection push down
 }

+/// Try to push sorting through [`AggregateExec`]
+///
+/// `AggregateExec` only preserves the input order of its group-by columns
+/// (not aggregate columns in general, which are computed from arbitrary
+/// expressions over the input).
+///
+/// This function rewrites the parent required ordering in terms of the
+/// aggregate input if possible. The rewritten requirement represents the
+/// ordering of the `AggregateExec`'s **input** that would also satisfy the
+/// **parent** ordering.
+///
+/// If no such mapping is possible (e.g. because the sort references aggregate
+/// columns), returns `None`.
+fn handle_aggregate_pushdown(
+    aggregate_exec: &AggregateExec,
+    parent_required: OrderingRequirements,
+) -> Result<Option<Vec<Option<OrderingRequirements>>>> {
+    if !aggregate_exec
+        .maintains_input_order()
+        .into_iter()
+        .any(|o| o)
+    {
+        return Ok(None);
+    }
+
+    let group_expr = aggregate_exec.group_expr();
+    // GROUPING SETS introduce additional output columns and NULL substitutions;
+    // skip pushdown until we can map those cases safely.
+    if group_expr.has_grouping_set() {
+        return Ok(None);
+    }
+
+    let group_input_exprs = group_expr.input_exprs();
+    let parent_requirement = parent_required.into_single();
+    let mut child_requirement = Vec::with_capacity(parent_requirement.len());
+
+    for req in parent_requirement {
+        // A sort above AggregateExec should reference its output columns. Map
+        // each output group-by column to its original input expression.
+        let Some(column) = req.expr.as_any().downcast_ref::<Column>() else {
+            return Ok(None);
+        };
+        if column.index() >= group_input_exprs.len() {
+            // AggregateExec does not produce output that is sorted on aggregate
+            // columns, so those cannot be pushed through.
+            return Ok(None);
+        }
+        child_requirement.push(PhysicalSortRequirement::new(
+            Arc::clone(&group_input_exprs[column.index()]),
+            req.options,
+        ));
+    }
+
+    let Some(child_requirement) = LexRequirement::new(child_requirement) else {
+        return Ok(None);
+    };
+
+    // Keep the sort above the aggregate unless the input ordering already
+    // satisfies the mapped requirement.
+    if aggregate_exec
+        .input()
+        .equivalence_properties()
+        .ordering_satisfy_requirement(child_requirement.iter().cloned())?
+    {
+        let child_requirements = OrderingRequirements::new(child_requirement);
+        Ok(Some(vec![Some(child_requirements)]))
+    } else {
+        Ok(None)
+    }
+}
+
 /// Return true if pushing the sort requirements through a node would violate
 /// the input sorting requirements for the plan
 fn pushdown_would_violate_requirements(
diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs
index bff33a281556d..8eb00327143dc 100644
--- a/datafusion/physical-optimizer/src/sanity_checker.rs
+++ b/datafusion/physical-optimizer/src/sanity_checker.rs
@@ -32,6 +32,8 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
 use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_physical_plan::joins::SymmetricHashJoinExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::{ExecutionPlanProperties, get_plan_string};

 use crate::PhysicalOptimizerRule;
@@ -136,6 +138,14 @@ pub fn check_plan_sanity(
         plan.required_input_ordering(),
         plan.required_input_distribution(),
     ) {
+        // TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
+        if child.as_any().downcast_ref::<SortExec>().is_some() {
+            continue;
+        }
+        if child.as_any().downcast_ref::<UnionExec>().is_some() {
+            continue;
+        }
+
         let child_eq_props = child.equivalence_properties();
         if let Some(sort_req) = sort_req {
             let sort_req = sort_req.into_single();
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 4f32b6176ec39..1774a8ea95e53 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -25,6 +25,7 @@ use super::utils::{
     OnceAsync, OnceFut, StatefulStreamResult, adjust_right_output_partitioning,
     reorder_output_after_swap,
 };
+use crate::coop::cooperative;
 use crate::execution_plan::{EmissionType, boundedness_from_children};
 use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::projection::{
@@ -332,7 +333,7 @@ impl ExecutionPlan for CrossJoinExec {
         })?;

         if enforce_batch_size_in_joins {
-            Ok(Box::pin(CrossJoinStream {
+            Ok(Box::pin(cooperative(CrossJoinStream {
                 schema: Arc::clone(&self.schema),
                 left_fut,
                 right: stream,
@@ -341,9 +342,9 @@ impl ExecutionPlan for CrossJoinExec {
                 state: CrossJoinStreamState::WaitBuildSide,
                 left_data: RecordBatch::new_empty(self.left().schema()),
                 batch_transformer: BatchSplitter::new(batch_size),
-            }))
+            })))
         } else {
-            Ok(Box::pin(CrossJoinStream {
+            Ok(Box::pin(cooperative(CrossJoinStream {
                 schema: Arc::clone(&self.schema),
                 left_fut,
                 right: stream,
@@ -352,7 +353,7 @@ impl ExecutionPlan for CrossJoinExec {
                 state: CrossJoinStreamState::WaitBuildSide,
                 left_data: RecordBatch::new_empty(self.left().schema()),
                 batch_transformer: NoopBatchTransformer::new(),
-            }))
+            })))
         }
     }

diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 91fc1ee4436ee..0a45887eff3e4 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -22,6 +22,7 @@ use std::sync::{Arc, OnceLock};
 use std::{any::Any, vec};

 use crate::ExecutionPlanProperties;
+use crate::coop::cooperative;
 use crate::execution_plan::{EmissionType, boundedness_from_children};
 use crate::filter_pushdown::{
     ChildPushdownResult, FilterDescription, FilterPushdownPhase,
@@ -1061,7 +1062,7 @@ impl ExecutionPlan for HashJoinExec {
             .map(|(_, right_expr)| Arc::clone(right_expr))
             .collect::<Vec<_>>();

-        Ok(Box::pin(HashJoinStream::new(
+        Ok(Box::pin(cooperative(HashJoinStream::new(
             partition,
             self.schema(),
             on_right,
@@ -1079,7 +1080,7 @@ impl ExecutionPlan for HashJoinExec {
             self.right.output_ordering().is_some(),
             build_accumulator,
             self.mode,
-        )))
+        ))))
     }

     fn metrics(&self) -> Option<MetricsSet> {
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 44637321a7e35..fa435c0d815b1 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -29,6 +29,7 @@ use super::utils::{
     reorder_output_after_swap, swap_join_projection,
 };
 use crate::common::can_project;
+use crate::coop::cooperative;
 use crate::execution_plan::{EmissionType, boundedness_from_children};
 use crate::joins::SharedBitmapBuilder;
 use crate::joins::utils::{
@@ -529,7 +530,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
             None => self.column_indices.clone(),
         };

-        Ok(Box::pin(NestedLoopJoinStream::new(
+        Ok(Box::pin(cooperative(NestedLoopJoinStream::new(
             self.schema(),
             self.filter.clone(),
             self.join_type,
@@ -538,7 +539,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
             column_indices_after_projection,
             metrics,
             batch_size,
-        )))
+        ))))
     }

     fn metrics(&self) -> Option<MetricsSet> {
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
index ae7a5fa764bcc..82929107806de 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
@@ -23,6 +23,7 @@ use std::any::Any;
 use std::fmt::Formatter;
 use std::sync::Arc;

+use crate::coop::cooperative;
 use crate::execution_plan::{EmissionType, boundedness_from_children};
 use crate::expressions::PhysicalSortExpr;
 use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics;
@@ -497,7 +498,7 @@ impl ExecutionPlan for SortMergeJoinExec {
             .register(context.memory_pool());

         // create join stream
-        Ok(Box::pin(SortMergeJoinStream::try_new(
+        Ok(Box::pin(cooperative(SortMergeJoinStream::try_new(
             context.session_config().spill_compression(),
             Arc::clone(&self.schema),
             self.sort_options.clone(),
@@ -512,7 +513,7 @@ impl ExecutionPlan for SortMergeJoinExec {
             SortMergeJoinMetrics::new(partition, &self.metrics),
             reservation,
             context.runtime_env(),
-        )?))
+        )?)))
     }

     fn metrics(&self) -> Option<MetricsSet> {
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 1f6bc703a0300..a54f930114c60 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -33,6 +33,7 @@ use std::task::{Context, Poll};
 use std::vec;

 use crate::common::SharedMemoryReservation;
+use crate::coop::cooperative;
 use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
 use crate::joins::stream_join_utils::{
     PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics,
@@ -534,7 +535,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
         }

         if enforce_batch_size_in_joins {
-            Ok(Box::pin(SymmetricHashJoinStream {
+            Ok(Box::pin(cooperative(SymmetricHashJoinStream {
                 left_stream,
                 right_stream,
                 schema: self.schema(),
@@ -552,9 +553,9 @@ impl ExecutionPlan for SymmetricHashJoinExec {
                 state: SHJStreamState::PullRight,
                 reservation,
                 batch_transformer: BatchSplitter::new(batch_size),
-            }))
+            })))
         } else {
-            Ok(Box::pin(SymmetricHashJoinStream {
+            Ok(Box::pin(cooperative(SymmetricHashJoinStream {
                 left_stream,
                 right_stream,
                 schema: self.schema(),
@@ -572,7 +573,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
                 state: SHJStreamState::PullRight,
                 reservation,
                 batch_transformer: NoopBatchTransformer::new(),
-            }))
+            })))
         }
     }

diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt
index 58d9915a24be2..4148de1d62f73 100644
--- a/datafusion/sqllogictest/test_files/sort_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt
@@ -851,6 +851,184 @@ LIMIT 3;
 5 4 2 -3

+# Test 3.7: Aggregate ORDER BY expression should keep SortExec
+# Sort pattern declared on the parquet scan: [x ASC, y ASC].
+# Requested pattern in ORDER BY: [x ASC, CAST(y AS BIGINT) % 2 ASC].
+# Example: for x=1, the input y order 1,2,3 gives bucket order 1,0,1, which
+# does not match the requested ASC bucket order. SortExec is required above
+# AggregateExec.
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+statement ok
+CREATE TABLE agg_expr_data(x INT, y INT, v INT) AS VALUES
+(1, 1, 10),
+(1, 2, 20),
+(1, 3, 30),
+(2, 1, 40),
+(2, 2, 50),
+(2, 3, 60);
+
+query I
+COPY (SELECT * FROM agg_expr_data ORDER BY x, y)
+TO 'test_files/scratch/sort_pushdown/agg_expr_sorted.parquet';
+----
+6
+
+statement ok
+CREATE EXTERNAL TABLE agg_expr_parquet(x INT, y INT, v INT)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/sort_pushdown/agg_expr_sorted.parquet'
+WITH ORDER (x ASC, y ASC);
+
+query TT
+EXPLAIN SELECT
+    x,
+    CAST(y AS BIGINT) % 2,
+    SUM(v)
+FROM agg_expr_parquet
+GROUP BY x, CAST(y AS BIGINT) % 2
+ORDER BY x, CAST(y AS BIGINT) % 2;
+----
+logical_plan
+01)Sort: agg_expr_parquet.x ASC NULLS LAST, agg_expr_parquet.y % Int64(2) ASC NULLS LAST
+02)--Aggregate: groupBy=[[agg_expr_parquet.x, CAST(agg_expr_parquet.y AS Int64) % Int64(2)]], aggr=[[sum(CAST(agg_expr_parquet.v AS Int64))]]
+03)----TableScan: agg_expr_parquet projection=[x, y, v]
+physical_plan
+01)SortExec: expr=[x@0 ASC NULLS LAST, agg_expr_parquet.y % Int64(2)@1 ASC NULLS LAST], preserve_partitioning=[false]
+02)--AggregateExec: mode=Single, gby=[x@0 as x, CAST(y@1 AS Int64) % 2 as agg_expr_parquet.y % Int64(2)], aggr=[sum(agg_expr_parquet.v)], ordering_mode=PartiallySorted([0])
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/agg_expr_sorted.parquet]]}, projection=[x, y, v], output_ordering=[x@0 ASC NULLS LAST, y@1 ASC NULLS LAST], file_type=parquet
+
+# Expected output pattern from ORDER BY [x, bucket]:
+# rows grouped by x, and within each x the bucket appears as 0 then 1.
+query III
+SELECT
+    x,
+    CAST(y AS BIGINT) % 2,
+    SUM(v)
+FROM agg_expr_parquet
+GROUP BY x, CAST(y AS BIGINT) % 2
+ORDER BY x, CAST(y AS BIGINT) % 2;
+----
+1 0 20
+1 1 40
+2 0 50
+2 1 100
+
+# Test 3.8: Aggregate ORDER BY monotonic expression can push down (no SortExec)
+query TT
+EXPLAIN SELECT
+    x,
+    CAST(y AS BIGINT),
+    SUM(v)
+FROM agg_expr_parquet
+GROUP BY x, CAST(y AS BIGINT)
+ORDER BY x, CAST(y AS BIGINT);
+----
+logical_plan
+01)Sort: agg_expr_parquet.x ASC NULLS LAST, agg_expr_parquet.y ASC NULLS LAST
+02)--Aggregate: groupBy=[[agg_expr_parquet.x, CAST(agg_expr_parquet.y AS Int64)]], aggr=[[sum(CAST(agg_expr_parquet.v AS Int64))]]
+03)----TableScan: agg_expr_parquet projection=[x, y, v]
+physical_plan
+01)AggregateExec: mode=Single, gby=[x@0 as x, CAST(y@1 AS Int64) as agg_expr_parquet.y], aggr=[sum(agg_expr_parquet.v)], ordering_mode=Sorted
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/agg_expr_sorted.parquet]]}, projection=[x, y, v], output_ordering=[x@0 ASC NULLS LAST, y@1 ASC NULLS LAST], file_type=parquet
+
+query III
+SELECT
+    x,
+    CAST(y AS BIGINT),
+    SUM(v)
+FROM agg_expr_parquet
+GROUP BY x, CAST(y AS BIGINT)
+ORDER BY x, CAST(y AS BIGINT);
+----
+1 1 10
+1 2 20
+1 3 30
+2 1 40
+2 2 50
+2 3 60
+
+# Test 3.9: Aggregate ORDER BY aggregate output should keep SortExec
+query TT
+EXPLAIN SELECT x, SUM(v)
+FROM agg_expr_parquet
+GROUP BY x
+ORDER BY SUM(v);
+----
+logical_plan
+01)Sort: sum(agg_expr_parquet.v) ASC NULLS LAST
+02)--Aggregate: groupBy=[[agg_expr_parquet.x]], aggr=[[sum(CAST(agg_expr_parquet.v AS Int64))]]
+03)----TableScan: agg_expr_parquet projection=[x, v]
+physical_plan
+01)SortExec: expr=[sum(agg_expr_parquet.v)@1 ASC NULLS LAST], preserve_partitioning=[false]
+02)--AggregateExec: mode=Single, gby=[x@0 as x], aggr=[sum(agg_expr_parquet.v)], ordering_mode=Sorted
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/agg_expr_sorted.parquet]]}, projection=[x, v], output_ordering=[x@0 ASC NULLS LAST], file_type=parquet
+
+query II
+SELECT x, SUM(v)
+FROM agg_expr_parquet
+GROUP BY x
+ORDER BY SUM(v);
+----
+1 60
+2 150
+
+# Test 3.10: Aggregate with non-preserved input order should keep SortExec
+# v is not part of the table's declared sort order
+query TT
+EXPLAIN SELECT v, SUM(y)
+FROM agg_expr_parquet
+GROUP BY v
+ORDER BY v;
+----
+logical_plan
+01)Sort: agg_expr_parquet.v ASC NULLS LAST
+02)--Aggregate: groupBy=[[agg_expr_parquet.v]], aggr=[[sum(CAST(agg_expr_parquet.y AS Int64))]]
+03)----TableScan: agg_expr_parquet projection=[y, v]
+physical_plan
+01)SortExec: expr=[v@0 ASC NULLS LAST], preserve_partitioning=[false]
+02)--AggregateExec: mode=Single, gby=[v@1 as v], aggr=[sum(agg_expr_parquet.y)]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/agg_expr_sorted.parquet]]}, projection=[y, v], file_type=parquet
+
+query II
+SELECT v, SUM(y)
+FROM agg_expr_parquet
+GROUP BY v
+ORDER BY v;
+----
+10 1
+20 2
+30 3
+40 1
+50 2
+60 3
+
+# Test 3.11: Aggregate ORDER BY non-column expression (unsatisfied) keeps SortExec
+# (though in theory DataFusion could figure out that data sorted by x is also
+# sorted by x + 1)
+query TT
+EXPLAIN SELECT x, SUM(v)
+FROM agg_expr_parquet
+GROUP BY x
+ORDER BY x + 1 DESC;
+----
+logical_plan
+01)Sort: CAST(agg_expr_parquet.x AS Int64) + Int64(1) DESC NULLS FIRST
+02)--Aggregate: groupBy=[[agg_expr_parquet.x]], aggr=[[sum(CAST(agg_expr_parquet.v AS Int64))]]
+03)----TableScan: agg_expr_parquet projection=[x, v]
+physical_plan
+01)SortExec: expr=[CAST(x@0 AS Int64) + 1 DESC], preserve_partitioning=[false]
+02)--AggregateExec: mode=Single, gby=[x@0 as x], aggr=[sum(agg_expr_parquet.v)], ordering_mode=Sorted
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/agg_expr_sorted.parquet]]}, projection=[x, v], output_ordering=[x@0 ASC NULLS LAST], file_type=parquet
+
+query II
+SELECT x, SUM(v)
+FROM agg_expr_parquet
+GROUP BY x
+ORDER BY x + 1 DESC;
+----
+2 150
+1 60
+
 # Cleanup
 statement ok
 DROP TABLE timestamp_data;
@@ -882,5 +1060,11 @@ DROP TABLE signed_data;
 statement ok
 DROP TABLE signed_parquet;

+statement ok
+DROP TABLE agg_expr_data;
+
+statement ok
+DROP TABLE agg_expr_parquet;
+
 statement ok
 SET datafusion.optimizer.enable_sort_pushdown = true;