diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 6539911cb9954..c8ed491ef4e16 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1079,6 +1079,21 @@ config_namespace! { /// then the output will be coerced to a non-view. /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. pub expand_views_at_output: bool, default = false + + /// Enable sort pushdown optimization. + /// When enabled, the optimizer attempts to push sort requirements down to data sources + /// that can natively handle them (e.g., by reversing file/row group read order). + /// + /// The resulting ordering is **inexact**: a Sort operator is kept for correctness, + /// but the optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), + /// which can provide a significant speedup. + /// + /// Memory: no additional overhead (only the read order changes). + /// + /// Future: an option to detect perfectly sorted data and eliminate the Sort entirely may be added. + /// + /// Default: true + pub enable_sort_pushdown: bool, default = true } } diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index fe9db1975d27c..d11322cd26be9 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -32,6 +32,7 @@ mod limit_pushdown; mod limited_distinct_aggregation; mod partition_statistics; mod projection_pushdown; +mod pushdown_sort; mod replace_with_order_preserving_variants; mod sanity_checker; #[expect(clippy::needless_pass_by_value)] diff --git a/datafusion/core/tests/physical_optimizer/pushdown_sort.rs b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs new file mode 100644 index 0000000000000..f26ed2905bd40 --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/pushdown_sort.rs @@ -0,0 +1,672 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for sort pushdown optimizer rule (Phase 1) +//! +//! Phase 1 tests verify that: +//! 1. Reverse scan is enabled (reverse_row_groups=true) +//! 2. SortExec is kept (because ordering is inexact) +//! 3. output_ordering remains unchanged +//! 4. Early termination is enabled for TopK queries +//! 5. 
Prefix matching works correctly + +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_optimizer::pushdown_sort::PushdownSort; +use datafusion_physical_optimizer::PhysicalOptimizerRule; + +use crate::physical_optimizer::test_utils::{ + coalesce_batches_exec, coalesce_partitions_exec, parquet_exec, + parquet_exec_with_sort, repartition_exec, schema, sort_exec, sort_exec_with_fetch, + sort_expr, OptimizationTest, +}; + +#[test] +fn test_sort_pushdown_disabled() { + // When pushdown is disabled, plan should remain unchanged + let schema = schema(); + let source = parquet_exec(schema.clone()); + let sort_exprs = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap(); + let plan = sort_exec(sort_exprs, source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), false), + @r###" + OptimizationTest: + input: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "### + ); +} + +#[test] +fn test_sort_pushdown_basic_phase1() { + // Phase 1: Reverse scan enabled, Sort kept, output_ordering unchanged + let schema = schema(); + + // Source has ASC NULLS LAST ordering (default) + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + + // Request DESC NULLS LAST ordering (exact reverse) + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec(desc_ordering, source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} + +#[test] +fn test_sort_with_limit_phase1() { + // Phase 1: Sort with fetch enables early termination but keeps Sort + let schema = schema(); + + // Source has ASC ordering + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + + // Request DESC ordering with limit + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec_with_fetch(desc_ordering, Some(10), source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} + +#[test] +fn test_sort_multiple_columns_phase1() { + // Phase 1: Sort on multiple columns - reverse multi-column ordering + let schema = 
schema(); + + // Source has [a DESC NULLS LAST, b ASC] ordering + let a = sort_expr("a", &schema); + let b = sort_expr("b", &schema); + let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b.clone()]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + + // Request [a ASC NULLS FIRST, b DESC] ordering (exact reverse) + let reverse_ordering = + LexOrdering::new(vec![a.clone().asc().nulls_first(), b.reverse()]).unwrap(); + let plan = sort_exec(reverse_ordering, source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} + +// ============================================================================ +// PREFIX MATCHING TESTS +// ============================================================================ + +#[test] +fn test_prefix_match_single_column() { + // Test prefix matching: source has [a DESC, b ASC], query needs [a ASC] + // After reverse: [a ASC, b DESC] which satisfies [a ASC] prefix + let schema = schema(); + + // Source has [a DESC NULLS LAST, b ASC NULLS LAST] ordering + let a = sort_expr("a", &schema); + let b = sort_expr("b", &schema); + let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + + // Request only [a ASC NULLS FIRST] - a prefix of the reversed ordering + let prefix_ordering = LexOrdering::new(vec![a.clone().asc().nulls_first()]).unwrap(); + let plan = sort_exec(prefix_ordering, source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} + +#[test] +fn test_prefix_match_with_limit() { + // Test prefix matching with LIMIT - important for TopK optimization + let schema = schema(); + + // Source has [a ASC, b DESC, c ASC] ordering + let a = sort_expr("a", &schema); + let b = sort_expr("b", &schema); + let c = sort_expr("c", &schema); + let source_ordering = + LexOrdering::new(vec![a.clone(), b.clone().reverse(), c]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + + // Request [a DESC NULLS LAST, b ASC NULLS FIRST] with LIMIT 100 + // This is a prefix (2 columns) of the reversed 3-column ordering + let prefix_ordering = + LexOrdering::new(vec![a.reverse(), b.clone().asc().nulls_first()]).unwrap(); + let plan = sort_exec_with_fetch(prefix_ordering, Some(100), source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: TopK(fetch=100), expr=[a@0 DESC NULLS LAST, b@1 ASC], preserve_partitioning=[false] + 
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 DESC NULLS LAST, c@2 ASC], file_type=parquet + output: + Ok: + - SortExec: TopK(fetch=100), expr=[a@0 DESC NULLS LAST, b@1 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} + +#[test] +fn test_prefix_match_through_transparent_nodes() { + // Test prefix matching works through transparent nodes + let schema = schema(); + + // Source has [a DESC NULLS LAST, b ASC, c DESC] ordering + let a = sort_expr("a", &schema); + let b = sort_expr("b", &schema); + let c = sort_expr("c", &schema); + let source_ordering = + LexOrdering::new(vec![a.clone().reverse(), b, c.reverse()]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + let coalesce = coalesce_batches_exec(source, 1024); + let repartition = repartition_exec(coalesce); + + // Request only [a ASC NULLS FIRST] - prefix of reversed ordering + let prefix_ordering = LexOrdering::new(vec![a.clone().asc().nulls_first()]).unwrap(); + let plan = sort_exec(prefix_ordering, repartition); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true + - CoalesceBatchesExec: target_batch_size=1024 + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC, c@2 DESC NULLS LAST], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + - CoalesceBatchesExec: target_batch_size=1024 + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} + +#[test] +fn test_no_prefix_match_wrong_direction() { + // Test that prefix matching does NOT work if the direction is wrong + let schema = schema(); + + // Source has [a DESC, b ASC] ordering + let a = sort_expr("a", &schema); + let b = sort_expr("b", &schema); + let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + + // Request [a DESC] - same direction as source, NOT a reverse prefix + let same_direction = LexOrdering::new(vec![a.clone().reverse()]).unwrap(); + let plan = sort_exec(same_direction, source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r###" + OptimizationTest: + input: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet + "### + ); +} + +#[test] +fn test_no_prefix_match_longer_than_source() { + // Test that prefix matching does NOT work if requested is longer than source + let schema = schema(); + + // Source has [a DESC] ordering (single column) + let a = sort_expr("a", &schema); + let b = sort_expr("b", &schema); + let source_ordering = 
LexOrdering::new(vec![a.clone().reverse()]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + + // Request [a ASC, b DESC] - longer than source, can't be a prefix + let longer_ordering = + LexOrdering::new(vec![a.clone().asc().nulls_first(), b.reverse()]).unwrap(); + let plan = sort_exec(longer_ordering, source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r###" + OptimizationTest: + input: + - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST], file_type=parquet + "### + ); +} + +// ============================================================================ +// ORIGINAL TESTS +// ============================================================================ + +#[test] +fn test_sort_through_coalesce_batches() { + // Sort pushes through CoalesceBatchesExec + let schema = schema(); + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + let coalesce = coalesce_batches_exec(source, 1024); + + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec(desc_ordering, coalesce); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalesceBatchesExec: target_batch_size=1024 + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalesceBatchesExec: target_batch_size=1024 + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} + +#[test] +fn test_sort_through_repartition() { + // Sort should push through RepartitionExec + let schema = schema(); + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + let repartition = repartition_exec(source); + + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec(desc_ordering, repartition); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} + +#[test] +fn test_nested_sorts() { + // Nested sort operations - only innermost can be optimized + let schema = schema(); + let a = 
sort_expr("a", &schema); + let b = sort_expr("b", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let inner_sort = sort_exec(desc_ordering, source); + + let sort_exprs2 = LexOrdering::new(vec![b]).unwrap(); + let plan = sort_exec(sort_exprs2, inner_sort); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[b@1 ASC], preserve_partitioning=[false] + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - SortExec: expr=[b@1 ASC], preserve_partitioning=[false] + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} + +#[test] +fn test_non_sort_plans_unchanged() { + // Plans without SortExec should pass through unchanged + let schema = schema(); + let source = parquet_exec(schema.clone()); + let plan = coalesce_batches_exec(source, 1024); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r###" + OptimizationTest: + input: + - CoalesceBatchesExec: target_batch_size=1024 + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + output: + Ok: + - CoalesceBatchesExec: target_batch_size=1024 + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "### + ); +} + +#[test] +fn test_optimizer_properties() { + // Test optimizer metadata + let optimizer = PushdownSort::new(); + + assert_eq!(optimizer.name(), "PushdownSort"); + assert!(optimizer.schema_check()); +} + +#[test] +fn test_sort_through_coalesce_partitions() { + // Sort should push through CoalescePartitionsExec + let schema = schema(); + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + let repartition = repartition_exec(source); + let coalesce_parts = coalesce_partitions_exec(repartition); + + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec(desc_ordering, coalesce_parts); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalescePartitionsExec + - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalescePartitionsExec + - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} + +#[test] +fn test_complex_plan_with_multiple_operators() { + // Test a complex plan with multiple operators between sort and source + let schema = schema(); + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let 
source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + let coalesce_batches = coalesce_batches_exec(source, 1024); + let repartition = repartition_exec(coalesce_batches); + let coalesce_parts = coalesce_partitions_exec(repartition); + + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let plan = sort_exec(desc_ordering, coalesce_parts); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalescePartitionsExec + - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true + - CoalesceBatchesExec: target_batch_size=1024 + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalescePartitionsExec + - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1 + - CoalesceBatchesExec: target_batch_size=1024 + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} + +#[test] +fn test_multiple_sorts_different_columns() { + // Test nested sorts on different columns - only innermost can optimize + let schema = schema(); + let a = sort_expr("a", &schema); + let c = sort_expr("c", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + + // First sort by column 'a' DESC (reverse of source) + let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap(); + let sort1 = sort_exec(desc_ordering, source); + + // Then sort by column 'c' (different column, can't optimize) + let sort_exprs2 = LexOrdering::new(vec![c]).unwrap(); + let plan = sort_exec(sort_exprs2, sort1); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - SortExec: expr=[c@2 ASC], preserve_partitioning=[false] + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} + +#[test] +fn test_no_pushdown_for_unordered_source() { + // Verify pushdown does NOT happen for sources without ordering + let schema = schema(); + let source = parquet_exec(schema.clone()); // No output_ordering + let sort_exprs = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap(); + let plan = sort_exec(sort_exprs, source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r###" + OptimizationTest: + input: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet + "### + ); +} + +#[test] +fn test_no_pushdown_for_non_reverse_sort() { + // Verify pushdown does NOT happen when sort doesn't reverse source ordering + let 
schema = schema(); + + // Source sorted by 'a' ASC + let a = sort_expr("a", &schema); + let b = sort_expr("b", &schema); + let source_ordering = LexOrdering::new(vec![a]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + + // Request sort by 'b' (different column) + let sort_exprs = LexOrdering::new(vec![b]).unwrap(); + let plan = sort_exec(sort_exprs, source); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r###" + OptimizationTest: + input: + - SortExec: expr=[b@1 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - SortExec: expr=[b@1 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + "### + ); +} + +#[test] +fn test_pushdown_through_blocking_node() { + // Test that pushdown works for inner sort even when outer sort is blocked + // Structure: Sort -> Aggregate (blocks pushdown) -> Sort -> Scan + // The outer sort can't push through aggregate, but the inner sort should still optimize + use datafusion_functions_aggregate::count::count_udaf; + use datafusion_physical_expr::aggregate::AggregateExprBuilder; + use datafusion_physical_plan::aggregates::{ + AggregateExec, AggregateMode, PhysicalGroupBy, + }; + use std::sync::Arc; + + let schema = schema(); + + // Bottom: DataSource with [a ASC NULLS LAST] ordering + let a = sort_expr("a", &schema); + let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]); + + // Inner Sort: [a DESC NULLS FIRST] - exact reverse, CAN push down to source + let inner_sort_ordering = LexOrdering::new(vec![a.clone().reverse()]).unwrap(); + let inner_sort = sort_exec(inner_sort_ordering, source); + + // Middle: Aggregate (blocks pushdown from outer sort) + // GROUP BY a, COUNT(b) + let group_by = PhysicalGroupBy::new_single(vec![( + Arc::new(datafusion_physical_expr::expressions::Column::new("a", 0)) as _, + "a".to_string(), + )]); + + let count_expr = Arc::new( + AggregateExprBuilder::new( + count_udaf(), + vec![ + Arc::new(datafusion_physical_expr::expressions::Column::new("b", 1)) as _, + ], + ) + .schema(Arc::clone(&schema)) + .alias("COUNT(b)") + .build() + .unwrap(), + ); + + let aggregate = Arc::new( + AggregateExec::try_new( + AggregateMode::Final, + group_by, + vec![count_expr], + vec![None], + inner_sort, + Arc::clone(&schema), + ) + .unwrap(), + ); + + // Outer Sort: [a ASC] - this CANNOT push down through aggregate + let outer_sort_ordering = LexOrdering::new(vec![a.clone()]).unwrap(); + let plan = sort_exec(outer_sort_ordering, aggregate); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownSort::new(), true), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - AggregateExec: mode=Final, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - AggregateExec: mode=Final, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 
group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true + " + ); +} diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 1561ddf4407da..fa3e860ad36a9 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -18,7 +18,7 @@ //! Test utilities for physical optimizer tests use std::any::Any; -use std::fmt::Formatter; +use std::fmt::{Display, Formatter}; use std::sync::{Arc, LazyLock}; use arrow::array::Int32Array; @@ -33,7 +33,9 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; -use datafusion_common::{ColumnStatistics, JoinType, NullEquality, Result, Statistics}; +use datafusion_common::{ + internal_err, ColumnStatistics, JoinType, NullEquality, Result, Statistics, +}; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; @@ -700,3 +702,75 @@ impl TestAggregate { } } } + +/// A harness for testing physical optimizers. +#[derive(Debug)] +pub struct OptimizationTest { + input: Vec<String>, + output: Result<Vec<String>, String>, +} + +impl OptimizationTest { + pub fn new<O>( + input_plan: Arc<dyn ExecutionPlan>, + opt: O, + enable_sort_pushdown: bool, + ) -> Self + where + O: PhysicalOptimizerRule, + { + let input = format_execution_plan(&input_plan); + let input_schema = input_plan.schema(); + + let mut config = ConfigOptions::new(); + config.optimizer.enable_sort_pushdown = enable_sort_pushdown; + let output_result = opt.optimize(input_plan, &config); + let output = output_result + .and_then(|plan| { + if opt.schema_check() && (plan.schema() != input_schema) { + internal_err!( + "Schema mismatch:\n\nBefore:\n{:?}\n\nAfter:\n{:?}", + input_schema, + plan.schema() + ) + } else { + Ok(plan) + } + }) + .map(|plan| format_execution_plan(&plan)) + .map_err(|e| e.to_string()); + + Self { input, output } + } +} + +impl Display for OptimizationTest { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + writeln!(f, "OptimizationTest:")?; + writeln!(f, " input:")?; + for line in &self.input { + writeln!(f, " - {line}")?; + } + writeln!(f, " output:")?; + match &self.output { + Ok(output) => { + writeln!(f, " Ok:")?; + for line in output { + writeln!(f, " - {line}")?; + } + } + Err(err) => { + writeln!(f, " Err: {err}")?; + } + } + Ok(()) + } +} + +pub fn format_execution_plan(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> { + format_lines(&displayable(plan.as_ref()).indent(false).to_string()) +} + +fn format_lines(s: &str) -> Vec<String> { + s.trim().split('\n').map(|s| s.to_string()).collect() +} diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 53ee597bd114c..eb4cc9e9ad5a3 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -30,6 +30,7 @@ mod page_filter; mod reader; mod row_filter; mod row_group_filter; +mod sort; pub mod source; mod writer; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 7b4db9e772fe5..f1ecc86ce878e 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -51,6 +51,7 @@ use datafusion_physical_plan::metrics::{ }; use 
datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate}; +use crate::sort::reverse_row_selection; #[cfg(feature = "parquet_encryption")] use datafusion_common::config::EncryptionFactoryOptions; #[cfg(feature = "parquet_encryption")] @@ -63,7 +64,7 @@ use parquet::arrow::arrow_reader::{ }; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; -use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; +use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData}; /// Implements [`FileOpener`] for a parquet file pub(super) struct ParquetOpener { @@ -115,6 +116,60 @@ pub(super) struct ParquetOpener { /// Maximum size of the predicate cache, in bytes. If none, uses /// the arrow-rs default. pub max_predicate_cache_size: Option<usize>, + /// Whether to read row groups in reverse order + pub reverse_row_groups: bool, +} + +/// Represents a prepared access plan with optional row selection +struct PreparedAccessPlan { + /// Row group indexes to read + row_group_indexes: Vec<usize>, + /// Optional row selection for filtering within row groups + row_selection: Option<RowSelection>, +} + +impl PreparedAccessPlan { + /// Create a new prepared access plan from a ParquetAccessPlan + fn from_access_plan( + access_plan: ParquetAccessPlan, + rg_metadata: &[RowGroupMetaData], + ) -> Result<Self> { + let row_group_indexes = access_plan.row_group_indexes(); + let row_selection = access_plan.into_overall_row_selection(rg_metadata)?; + + Ok(Self { + row_group_indexes, + row_selection, + }) + } + + /// Reverse the access plan for reverse scanning + fn reverse( + mut self, + file_metadata: &parquet::file::metadata::ParquetMetaData, + ) -> Result<Self> { + // Reverse the row group indexes + self.row_group_indexes = self.row_group_indexes.into_iter().rev().collect(); + + // If we have a row selection, reverse it to match the new row group order + if let Some(row_selection) = self.row_selection { + self.row_selection = + Some(reverse_row_selection(&row_selection, file_metadata)?); + } + + Ok(self) + } + + /// Apply this access plan to a ParquetRecordBatchStreamBuilder + fn apply_to_builder( + self, + mut builder: ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>>, + ) -> ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>> { + if let Some(row_selection) = self.row_selection { + builder = builder.with_row_selection(row_selection); + } + builder.with_row_groups(self.row_group_indexes) + } } impl FileOpener for ParquetOpener { @@ -212,6 +267,7 @@ impl FileOpener for ParquetOpener { let encryption_context = self.get_encryption_context(); let max_predicate_cache_size = self.max_predicate_cache_size; + let reverse_row_groups = self.reverse_row_groups; Ok(Box::pin(async move { #[cfg(feature = "parquet_encryption")] let file_decryption_properties = encryption_context @@ -479,13 +535,18 @@ impl FileOpener for ParquetOpener { ); } - let row_group_indexes = access_plan.row_group_indexes(); - if let Some(row_selection) = - access_plan.into_overall_row_selection(rg_metadata)? 
- { - builder = builder.with_row_selection(row_selection); + // Prepare the access plan (extract row groups and row selection) + let mut prepared_plan = + PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)?; + + // If reverse scanning is enabled, reverse the prepared plan + if reverse_row_groups { + prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; } + // Apply the prepared plan to the builder + builder = prepared_plan.apply_to_builder(builder); + if let Some(limit) = limit { builder = builder.with_limit(limit) } @@ -500,7 +561,6 @@ impl FileOpener for ParquetOpener { let stream = builder .with_projection(mask) .with_batch_size(batch_size) - .with_row_groups(row_group_indexes) .with_metrics(arrow_reader_metrics.clone()) .build()?; @@ -904,6 +964,7 @@ mod test { use std::sync::Arc; use super::{ConstantColumns, constant_columns_from_stats}; + use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion_common::{ @@ -925,8 +986,7 @@ mod test { use futures::{Stream, StreamExt}; use object_store::{ObjectStore, memory::InMemory, path::Path}; use parquet::arrow::ArrowWriter; - - use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener}; + use parquet::file::properties::WriterProperties; fn constant_int_stats() -> (Statistics, SchemaRef) { let schema = Arc::new(Schema::new(vec![ @@ -1028,16 +1088,54 @@ mod test { (num_batches, num_rows) } + /// Helper to collect all int32 values from the first column of batches + async fn collect_int32_values( + mut stream: std::pin::Pin< + Box< + dyn Stream> + + Send, + >, + >, + ) -> Vec { + use arrow::array::Array; + let mut values = vec![]; + while let Some(Ok(batch)) = stream.next().await { + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..array.len() { + if !array.is_null(i) { + values.push(array.value(i)); + } + } + } + values + } + async fn write_parquet( store: Arc, filename: &str, batch: arrow::record_batch::RecordBatch, + ) -> usize { + write_parquet_batches(store, filename, vec![batch], None).await + } + + /// Write multiple batches to a parquet file with optional writer properties + async fn write_parquet_batches( + store: Arc, + filename: &str, + batches: Vec, + props: Option, ) -> usize { let mut out = BytesMut::new().writer(); { - let mut writer = - ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap(); - writer.write(&batch).unwrap(); + let schema = batches[0].schema(); + let mut writer = ArrowWriter::try_new(&mut out, schema, props).unwrap(); + for batch in batches { + writer.write(&batch).unwrap(); + } writer.finish().unwrap(); } let data = out.into_inner().freeze(); @@ -1108,6 +1206,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + reverse_row_groups: false, } }; @@ -1179,6 +1278,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + reverse_row_groups: false, } }; @@ -1266,6 +1366,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + reverse_row_groups: false, } }; @@ -1356,6 +1457,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + reverse_row_groups: false, } }; @@ -1454,6 +1556,7 @@ mod test { #[cfg(feature = "parquet_encryption")] encryption_factory: None, max_predicate_cache_size: None, + 
reverse_row_groups: false, } }; @@ -1495,4 +1598,252 @@ mod test { assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); } + + #[tokio::test] + async fn test_reverse_scan_row_groups() { + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + + // Create multiple batches to ensure multiple row groups + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap(); + let batch3 = + record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap(); + + // Write parquet file with multiple row groups + // Force small row groups by setting max_row_group_size + let props = WriterProperties::builder() + .set_max_row_group_size(3) // Force each batch into its own row group + .build(); + + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2, batch3], + Some(props), + ) + .await; + + let schema = batch1.schema(); + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + + let make_opener = |reverse_scan: bool| ParquetOpener { + partition_index: 0, + projection: ProjectionExprs::from_indices(&[0], &schema), + batch_size: 1024, + limit: None, + predicate: None, + table_schema: TableSchema::from_file_schema(Arc::clone(&schema)), + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new( + Arc::clone(&store), + )), + pushdown_filters: false, + reorder_filters: false, + force_filter_selections: false, + enable_page_index: false, + enable_bloom_filter: false, + enable_row_group_stats_pruning: false, + coerce_int96: None, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties: None, + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, + max_predicate_cache_size: None, + reverse_row_groups: reverse_scan, + }; + + // Test normal scan (forward) + let opener = make_opener(false); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let forward_values = collect_int32_values(stream).await; + + // Test reverse scan + let opener = make_opener(true); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let reverse_values = collect_int32_values(stream).await; + + // The forward scan should return data in the order written + assert_eq!(forward_values, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + + // With reverse scan, row groups are reversed, so we expect: + // Row group 3 (7,8,9), then row group 2 (4,5,6), then row group 1 (1,2,3) + assert_eq!(reverse_values, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]); + } + + #[tokio::test] + async fn test_reverse_scan_single_row_group() { + let store = Arc::new(InMemory::new()) as Arc; + + // Create a single batch (single row group) + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let data_size = + write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await; + + let schema = batch.schema(); + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_size).unwrap(), + ); + + let make_opener = |reverse_scan: bool| ParquetOpener { + partition_index: 0, + projection: ProjectionExprs::from_indices(&[0], &schema), + batch_size: 1024, + limit: None, + predicate: None, + table_schema: TableSchema::from_file_schema(Arc::clone(&schema)), + 
metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new( + Arc::clone(&store), + )), + pushdown_filters: false, + reorder_filters: false, + force_filter_selections: false, + enable_page_index: false, + enable_bloom_filter: false, + enable_row_group_stats_pruning: false, + coerce_int96: None, + #[cfg(feature = "parquet_encryption")] + file_decryption_properties: None, + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, + max_predicate_cache_size: None, + reverse_row_groups: reverse_scan, + }; + + // With a single row group, forward and reverse should be the same + // (only the row group order is reversed, not the rows within) + let opener_forward = make_opener(false); + let stream_forward = opener_forward.open(file.clone()).unwrap().await.unwrap(); + let (batches_forward, _) = count_batches_and_rows(stream_forward).await; + + let opener_reverse = make_opener(true); + let stream_reverse = opener_reverse.open(file).unwrap().await.unwrap(); + let (batches_reverse, _) = count_batches_and_rows(stream_reverse).await; + + // Both should have the same number of batches since there's only one row group + assert_eq!(batches_forward, batches_reverse); + } + + #[tokio::test] + async fn test_reverse_scan_with_row_selection() { + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + + // Create 3 batches with DIFFERENT selection patterns + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3), Some(4)])) + .unwrap(); // 4 rows + let batch2 = + record_batch!(("a", Int32, vec![Some(5), Some(6), Some(7), Some(8)])) + .unwrap(); // 4 rows + let batch3 = + record_batch!(("a", Int32, vec![Some(9), Some(10), Some(11), Some(12)])) + .unwrap(); // 4 rows + + let props = WriterProperties::builder() + .set_max_row_group_size(4) + .build(); + + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2, batch3], + Some(props), + ) + .await; + + let schema = batch1.schema(); + + use crate::ParquetAccessPlan; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; + + let mut access_plan = ParquetAccessPlan::new_all(3); + // Row group 0: skip first 2, select last 2 (should get: 3, 4) + access_plan.scan_selection( + 0, + RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(2)]), + ); + // Row group 1: select all (should get: 5, 6, 7, 8) + // Row group 2: select first 2, skip last 2 (should get: 9, 10) + access_plan.scan_selection( + 2, + RowSelection::from(vec![RowSelector::select(2), RowSelector::skip(2)]), + ); + + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ) + .with_extensions(Arc::new(access_plan)); + + let make_opener = |reverse_scan: bool| ParquetOpener { + partition_index: 0, + projection: ProjectionExprs::from_indices(&[0], &schema), + batch_size: 1024, + limit: None, + predicate: None, + table_schema: TableSchema::from_file_schema(Arc::clone(&schema)), + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new( + Arc::clone(&store), + )), + pushdown_filters: false, + reorder_filters: false, + force_filter_selections: false, + enable_page_index: false, + enable_bloom_filter: false, + enable_row_group_stats_pruning: false, + coerce_int96: None, + #[cfg(feature 
= "parquet_encryption")] + file_decryption_properties: None, + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), + #[cfg(feature = "parquet_encryption")] + encryption_factory: None, + max_predicate_cache_size: None, + reverse_row_groups: reverse_scan, + }; + + // Forward scan: RG0(3,4), RG1(5,6,7,8), RG2(9,10) + let opener = make_opener(false); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); + let forward_values = collect_int32_values(stream).await; + + // Forward scan should produce: RG0(3,4), RG1(5,6,7,8), RG2(9,10) + assert_eq!( + forward_values, + vec![3, 4, 5, 6, 7, 8, 9, 10], + "Forward scan should select correct rows based on RowSelection" + ); + + // Reverse scan + // CORRECT behavior: reverse row groups AND their corresponding selections + // - RG2 is read first, WITH RG2's selection (select 2, skip 2) -> 9, 10 + // - RG1 is read second, WITH RG1's selection (select all) -> 5, 6, 7, 8 + // - RG0 is read third, WITH RG0's selection (skip 2, select 2) -> 3, 4 + let opener = make_opener(true); + let stream = opener.open(file).unwrap().await.unwrap(); + let reverse_values = collect_int32_values(stream).await; + + // Correct expected result: row groups reversed but each keeps its own selection + // RG2 with its selection (9,10), RG1 with its selection (5,6,7,8), RG0 with its selection (3,4) + assert_eq!( + reverse_values, + vec![9, 10, 5, 6, 7, 8, 3, 4], + "Reverse scan should reverse row group order while maintaining correct RowSelection for each group" + ); + } } diff --git a/datafusion/datasource-parquet/src/sort.rs b/datafusion/datasource-parquet/src/sort.rs new file mode 100644 index 0000000000000..4255d4d6960b1 --- /dev/null +++ b/datafusion/datasource-parquet/src/sort.rs @@ -0,0 +1,407 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Sort-related utilities for Parquet scanning + +use datafusion_common::Result; +use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; +use parquet::file::metadata::ParquetMetaData; +use std::collections::HashMap; + +/// Reverse a row selection to match reversed row group order. +/// +/// When scanning row groups in reverse order, we need to adjust the row selection +/// to account for the new ordering. This function: +/// 1. Maps each selection to its corresponding row group +/// 2. Reverses the order of row groups +/// 3. 
Reconstructs the row selection for the new order +/// +/// # Arguments +/// * `row_selection` - Original row selection +/// * `parquet_metadata` - Metadata containing row group information +/// +/// # Returns +/// A new `RowSelection` adjusted for reversed row group order +pub fn reverse_row_selection( + row_selection: &RowSelection, + parquet_metadata: &ParquetMetaData, +) -> Result<RowSelection> { + let rg_metadata = parquet_metadata.row_groups(); + + // Build a mapping of row group index to its row range in the file + let mut rg_row_ranges: Vec<(usize, usize, usize)> = + Vec::with_capacity(rg_metadata.len()); + let mut current_row = 0; + for (rg_idx, rg) in rg_metadata.iter().enumerate() { + let num_rows = rg.num_rows() as usize; + rg_row_ranges.push((rg_idx, current_row, current_row + num_rows)); + current_row += num_rows; + } + + // Map selections to row groups + let mut rg_selections: HashMap<usize, Vec<RowSelector>> = HashMap::new(); + + let mut current_file_row = 0; + for selector in row_selection.iter() { + let selector_end = current_file_row + selector.row_count; + + // Find which row groups this selector spans + for (rg_idx, rg_start, rg_end) in rg_row_ranges.iter() { + if current_file_row < *rg_end && selector_end > *rg_start { + // This selector overlaps with this row group + let overlap_start = current_file_row.max(*rg_start); + let overlap_end = selector_end.min(*rg_end); + let overlap_count = overlap_end - overlap_start; + + if overlap_count > 0 { + let entry = rg_selections.entry(*rg_idx).or_default(); + if selector.skip { + entry.push(RowSelector::skip(overlap_count)); + } else { + entry.push(RowSelector::select(overlap_count)); + } + } + } + } + + current_file_row = selector_end; + } + + // Build new selection for reversed row group order + let mut reversed_selectors = Vec::new(); + for rg_idx in (0..rg_metadata.len()).rev() { + if let Some(selectors) = rg_selections.get(&rg_idx) { + reversed_selectors.extend(selectors.iter().cloned()); + } else { + // No specific selection for this row group means select all + if let Some((_, start, end)) = + rg_row_ranges.iter().find(|(idx, _, _)| *idx == rg_idx) + { + reversed_selectors.push(RowSelector::select(end - start)); + } + } + } + + Ok(RowSelection::from(reversed_selectors)) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use bytes::Bytes; + use parquet::arrow::ArrowWriter; + use parquet::file::reader::FileReader; + use parquet::file::serialized_reader::SerializedFileReader; + use std::sync::Arc; + + /// Helper function to create a ParquetMetaData with specified row group sizes + /// by actually writing a parquet file in memory + fn create_test_metadata(row_group_sizes: Vec<i64>) -> ParquetMetaData { + // Create a simple schema + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + + // Create in-memory parquet file with the specified row groups + let mut buffer = Vec::new(); + { + let props = parquet::file::properties::WriterProperties::builder() + .set_max_row_group_size(row_group_sizes[0] as usize) + .build(); + + let mut writer = + ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); + + for &size in &row_group_sizes { + // Create a batch with the specified number of rows + let array = arrow::array::Int32Array::from(vec![1; size as usize]); + let batch = arrow::record_batch::RecordBatch::try_new( + schema.clone(), + vec![Arc::new(array)], + ) + .unwrap(); + writer.write(&batch).unwrap(); + } + writer.close().unwrap(); + } + + // Read back the metadata + let 
bytes = Bytes::from(buffer); + let reader = SerializedFileReader::new(bytes).unwrap(); + reader.metadata().clone() + } + + #[test] + fn test_reverse_simple_selection() { + // 3 row groups with 100 rows each + let metadata = create_test_metadata(vec![100, 100, 100]); + + // Select first 50 rows from first row group + let selection = + RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(250)]); + + let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + + // Verify total selected rows remain the same + let original_selected: usize = selection + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + let reversed_selected: usize = reversed + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + assert_eq!(original_selected, reversed_selected); + } + + #[test] + fn test_reverse_multi_row_group_selection() { + let metadata = create_test_metadata(vec![100, 100, 100]); + + // Select rows spanning multiple row groups + let selection = RowSelection::from(vec![ + RowSelector::skip(50), + RowSelector::select(100), // Spans RG0 and RG1 + RowSelector::skip(150), + ]); + + let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + + // Verify total selected rows remain the same + let original_selected: usize = selection + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + let reversed_selected: usize = reversed + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + assert_eq!(original_selected, reversed_selected); + } + + #[test] + fn test_reverse_full_selection() { + let metadata = create_test_metadata(vec![100, 100, 100]); + + // Select all rows + let selection = RowSelection::from(vec![RowSelector::select(300)]); + + let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + + // Should still select all rows, just in reversed row group order + let total_selected: usize = reversed + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + assert_eq!(total_selected, 300); + } + + #[test] + fn test_reverse_empty_selection() { + let metadata = create_test_metadata(vec![100, 100, 100]); + + // Skip all rows + let selection = RowSelection::from(vec![RowSelector::skip(300)]); + + let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + + // Should still skip all rows + let total_selected: usize = reversed + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + assert_eq!(total_selected, 0); + } + + #[test] + fn test_reverse_with_different_row_group_sizes() { + let metadata = create_test_metadata(vec![50, 150, 100]); + + let selection = RowSelection::from(vec![ + RowSelector::skip(25), + RowSelector::select(200), // Spans all row groups + RowSelector::skip(75), + ]); + + let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + + let original_selected: usize = selection + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + let reversed_selected: usize = reversed + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + assert_eq!(original_selected, reversed_selected); + } + + #[test] + fn test_reverse_single_row_group() { + let metadata = create_test_metadata(vec![100]); + + let selection = + RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]); + + let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + + // With single row group, selection should remain the same + let original_selected: usize = selection + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + let 
reversed_selected: usize = reversed + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + assert_eq!(original_selected, reversed_selected); + } + + #[test] + fn test_reverse_complex_pattern() { + let metadata = create_test_metadata(vec![100, 100, 100]); + + // Complex pattern: select some, skip some, select some more + let selection = RowSelection::from(vec![ + RowSelector::select(30), + RowSelector::skip(40), + RowSelector::select(80), + RowSelector::skip(50), + RowSelector::select(100), + ]); + + let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + + let original_selected: usize = selection + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + let reversed_selected: usize = reversed + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + assert_eq!(original_selected, reversed_selected); + assert_eq!(original_selected, 210); // 30 + 80 + 100 + } + + #[test] + fn test_reverse_with_skipped_row_group() { + // This test covers the "no specific selection" code path (lines 90-95) + let metadata = create_test_metadata(vec![100, 100, 100]); + + // Select only from first and third row groups, skip middle one entirely + let selection = RowSelection::from(vec![ + RowSelector::select(50), // First 50 of RG0 + RowSelector::skip(150), // Rest of RG0 + all of RG1 + half of RG2 + RowSelector::select(50), // Last 50 of RG2 + ]); + + let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + + // Verify total selected rows remain the same + let original_selected: usize = selection + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + let reversed_selected: usize = reversed + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + assert_eq!(original_selected, reversed_selected); + assert_eq!(original_selected, 100); // 50 + 50 + } + + #[test] + fn test_reverse_middle_row_group_only() { + // Another test to ensure skipped row groups are handled correctly + let metadata = create_test_metadata(vec![100, 100, 100]); + + // Select only middle row group + let selection = RowSelection::from(vec![ + RowSelector::skip(100), // Skip RG0 + RowSelector::select(100), // Select all of RG1 + RowSelector::skip(100), // Skip RG2 + ]); + + let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + + let original_selected: usize = selection + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + let reversed_selected: usize = reversed + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + assert_eq!(original_selected, reversed_selected); + assert_eq!(original_selected, 100); + } + + #[test] + fn test_reverse_alternating_row_groups() { + // Test with more complex skipping pattern + let metadata = create_test_metadata(vec![100, 100, 100, 100]); + + // Select first and third row groups, skip second and fourth + let selection = RowSelection::from(vec![ + RowSelector::select(100), // RG0 + RowSelector::skip(100), // RG1 + RowSelector::select(100), // RG2 + RowSelector::skip(100), // RG3 + ]); + + let reversed = reverse_row_selection(&selection, &metadata).unwrap(); + + let original_selected: usize = selection + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + let reversed_selected: usize = reversed + .iter() + .filter(|s| !s.skip) + .map(|s| s.row_count) + .sum(); + + assert_eq!(original_selected, reversed_selected); + assert_eq!(original_selected, 200); + } +} diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs 
index 5caaa1c4747de..4956f83effcc7 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -44,6 +44,7 @@ use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_plan::DisplayFormatType; +use datafusion_physical_plan::SortOrderPushdownResult; use datafusion_physical_plan::filter_pushdown::PushedDown; use datafusion_physical_plan::filter_pushdown::{ FilterPushdownPropagation, PushedDownPredicate, @@ -53,6 +54,7 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use itertools::Itertools; use object_store::ObjectStore; #[cfg(feature = "parquet_encryption")] @@ -287,6 +289,14 @@ pub struct ParquetSource { pub(crate) projection: ProjectionExprs, #[cfg(feature = "parquet_encryption")] pub(crate) encryption_factory: Option>, + /// The ordering of data within the files + /// This is set by FileScanConfig when it knows the file ordering + file_ordering: Option, + /// If true, read files in reverse order and reverse row groups within files. + /// But it's not guaranteed that rows within row groups are in reverse order, + /// so we still need to sort them after reading, so the reverse scan is inexact. + /// Used to optimize ORDER BY ... DESC on sorted data. + reverse_row_groups: bool, } impl ParquetSource { @@ -311,6 +321,8 @@ impl ParquetSource { metadata_size_hint: None, #[cfg(feature = "parquet_encryption")] encryption_factory: None, + file_ordering: None, + reverse_row_groups: false, } } @@ -386,6 +398,12 @@ impl ParquetSource { self } + /// If set, indicates the ordering of data within the files being read. + pub fn with_file_ordering(mut self, ordering: Option) -> Self { + self.file_ordering = ordering; + self + } + /// Return the value described in [`Self::with_pushdown_filters`] pub(crate) fn pushdown_filters(&self) -> bool { self.table_parquet_options.global.pushdown_filters @@ -465,6 +483,15 @@ impl ParquetSource { )), } } + + pub(crate) fn with_reverse_row_groups(mut self, reverse_row_groups: bool) -> Self { + self.reverse_row_groups = reverse_row_groups; + self + } + #[cfg(test)] + pub(crate) fn reverse_row_groups(&self) -> bool { + self.reverse_row_groups + } } /// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit @@ -550,6 +577,7 @@ impl FileSource for ParquetSource { #[cfg(feature = "parquet_encryption")] encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), + reverse_row_groups: self.reverse_row_groups, }); Ok(opener) } @@ -603,6 +631,11 @@ impl FileSource for ParquetSource { write!(f, "{predicate_string}")?; + // Add reverse_scan info if enabled + if self.reverse_row_groups { + write!(f, ", reverse_row_groups=true")?; + } + // Try to build a the pruning predicates. // These are only generated here because it's useful to have *some* // idea of what pushdown is happening when viewing plans. @@ -710,6 +743,72 @@ impl FileSource for ParquetSource { ) .with_updated_node(source)) } + + /// Try to optimize the scan to produce data in the requested sort order. + /// + /// This method receives: + /// 1. 
The query's required ordering (`order` parameter) + /// 2. The file's natural ordering (via `self.file_ordering`, set by FileScanConfig) + /// + /// With both pieces of information, ParquetSource can decide what optimizations to apply. + /// + /// # Phase 1 Behavior (Current) + /// Returns `Inexact` when reversing the row group scan order would help satisfy the + /// requested ordering. We still need a Sort operator at a higher level because: + /// - We only reverse row group read order, not rows within row groups + /// - This provides approximate ordering that benefits limit pushdown + /// + /// # Phase 2 (Future) + /// Could return `Exact` when we can guarantee perfect ordering through techniques like: + /// - File reordering based on statistics + /// - Detecting already-sorted data + /// This would allow removing the Sort operator entirely. + /// + /// # Returns + /// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order + /// - `Unsupported`: Cannot optimize for this ordering + fn try_reverse_output( + &self, + order: &[PhysicalSortExpr], + ) -> datafusion_common::Result>> { + // Check if we have file ordering information + let file_ordering = match &self.file_ordering { + Some(ordering) => ordering, + None => return Ok(SortOrderPushdownResult::Unsupported), + }; + + // Create a LexOrdering from the requested order to use the is_reverse method + let Some(requested_ordering) = LexOrdering::new(order.to_vec()) else { + // Empty ordering requested, cannot optimize + return Ok(SortOrderPushdownResult::Unsupported); + }; + + // Check if reversing the file ordering would satisfy the requested ordering + if file_ordering.is_reverse(&requested_ordering) { + // Phase 1: Enable reverse row group scanning + let new_source = self.clone().with_reverse_row_groups(true); + + // Return Inexact because we're only reversing row group order, + // not guaranteeing perfect row-level ordering + return Ok(SortOrderPushdownResult::Inexact { + inner: Arc::new(new_source) as Arc, + }); + } + + // TODO Phase 2: Add support for other optimizations: + // - File reordering based on min/max statistics + // - Detection of exact ordering (return Exact to remove Sort operator) + // - Partial sort pushdown for prefix matches + + Ok(SortOrderPushdownResult::Unsupported) + } + + fn with_file_ordering_info( + &self, + ordering: Option, + ) -> datafusion_common::Result> { + Ok(Arc::new(self.clone().with_file_ordering(ordering))) + } } #[cfg(test)] @@ -728,4 +827,87 @@ mod tests { // same value. 
but filter() call Arc::clone internally assert_eq!(parquet_source.predicate(), parquet_source.filter().as_ref()); } + + #[test] + fn test_reverse_scan_default_value() { + use arrow::datatypes::Schema; + + let schema = Arc::new(Schema::empty()); + let source = ParquetSource::new(schema); + + assert!(!source.reverse_row_groups()); + } + + #[test] + fn test_reverse_scan_with_setter() { + use arrow::datatypes::Schema; + + let schema = Arc::new(Schema::empty()); + + let source = ParquetSource::new(schema.clone()).with_reverse_row_groups(true); + assert!(source.reverse_row_groups()); + + let source = source.with_reverse_row_groups(false); + assert!(!source.reverse_row_groups()); + } + + #[test] + fn test_reverse_scan_clone_preserves_value() { + use arrow::datatypes::Schema; + + let schema = Arc::new(Schema::empty()); + + let source = ParquetSource::new(schema).with_reverse_row_groups(true); + let cloned = source.clone(); + + assert!(cloned.reverse_row_groups()); + assert_eq!(source.reverse_row_groups(), cloned.reverse_row_groups()); + } + + #[test] + fn test_reverse_scan_with_other_options() { + use arrow::datatypes::Schema; + use datafusion_common::config::TableParquetOptions; + + let schema = Arc::new(Schema::empty()); + let options = TableParquetOptions::default(); + + let source = ParquetSource::new(schema) + .with_table_parquet_options(options) + .with_metadata_size_hint(8192) + .with_reverse_row_groups(true); + + assert!(source.reverse_row_groups()); + assert_eq!(source.metadata_size_hint, Some(8192)); + } + + #[test] + fn test_reverse_scan_builder_pattern() { + use arrow::datatypes::Schema; + + let schema = Arc::new(Schema::empty()); + + let source = ParquetSource::new(schema) + .with_reverse_row_groups(true) + .with_reverse_row_groups(false) + .with_reverse_row_groups(true); + + assert!(source.reverse_row_groups()); + } + + #[test] + fn test_reverse_scan_independent_of_predicate() { + use arrow::datatypes::Schema; + use datafusion_physical_expr::expressions::lit; + + let schema = Arc::new(Schema::empty()); + let predicate = lit(true); + + let source = ParquetSource::new(schema) + .with_predicate(predicate) + .with_reverse_row_groups(true); + + assert!(source.reverse_row_groups()); + assert!(source.filter().is_some()); + } } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index e25a2e889e21e..2c69987f91342 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -31,9 +31,11 @@ use datafusion_common::{Result, not_impl_err}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; use datafusion_physical_plan::DisplayFormatType; +use datafusion_physical_plan::SortOrderPushdownResult; use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use object_store::ObjectStore; /// Helper function to convert any type implementing FileSource to Arc<dyn FileSource> @@ -129,6 +131,21 @@ pub trait FileSource: Send + Sync { )) } + /// Try to create a new FileSource that can produce data in the specified sort order. 
+ /// + /// # Returns + /// * `Exact` - Created a source that guarantees perfect ordering + /// * `Inexact` - Created a source optimized for ordering (e.g., reordered files) but not perfectly sorted + /// * `Unsupported` - Cannot optimize for this ordering + /// + /// Default implementation returns `Unsupported`. + fn try_reverse_output( + &self, + _order: &[PhysicalSortExpr], + ) -> Result>> { + Ok(SortOrderPushdownResult::Unsupported) + } + /// Try to push down a projection into a this FileSource. /// /// `FileSource` implementations that support projection pushdown should @@ -183,4 +200,22 @@ pub trait FileSource: Send + Sync { fn schema_adapter_factory(&self) -> Option> { None } + + /// Set the file ordering information + /// + /// This allows the file source to know how the files are sorted, + /// enabling it to make informed decisions about sort pushdown. + /// + /// # Default Implementation + /// + /// Returns `not_impl_err!`. FileSource implementations that support + /// sort optimization should override this method. + fn with_file_ordering_info( + &self, + _ordering: Option, + ) -> Result> { + // Default: clone self without modification + // ParquetSource will override this + not_impl_err!("with_file_ordering_info not implemented for this FileSource") + } } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index ad89406014a0b..16a010cf27cbf 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -34,25 +34,26 @@ use datafusion_execution::{ SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl, }; use datafusion_expr::Operator; + +use datafusion_physical_expr::equivalence::project_orderings; use datafusion_physical_expr::expressions::{BinaryExpr, Column}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_plan::SortOrderPushdownResult; +use datafusion_physical_plan::coop::cooperative; +use datafusion_physical_plan::execution_plan::SchedulingType; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, display::{ProjectSchemaDisplay, display_orderings}, filter_pushdown::FilterPushdownPropagation, metrics::ExecutionPlanMetricsSet, }; -use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc}; - -use datafusion_physical_expr::equivalence::project_orderings; -use datafusion_physical_plan::coop::cooperative; -use datafusion_physical_plan::execution_plan::SchedulingType; use log::{debug, warn}; +use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc}; /// The base configurations for a [`DataSourceExec`], the a physical plan for /// any given file format. 
@@ -845,6 +846,45 @@ impl DataSource for FileScanConfig { } } } + + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + let file_ordering = self.output_ordering.first().cloned(); + + if file_ordering.is_none() { + return Ok(SortOrderPushdownResult::Unsupported); + } + + // Use the trait method instead of downcasting + // Try to provide file ordering info to the source + // If not supported (e.g., CsvSource), fall back to original source + let file_source_with_ordering = self + .file_source + .with_file_ordering_info(file_ordering) + .unwrap_or_else(|_| Arc::clone(&self.file_source)); + + // Try to reverse the datasource with ordering info, + // and currently only ParquetSource supports it with inexact reverse with row groups. + let pushdown_result = file_source_with_ordering.try_reverse_output(order)?; + + match pushdown_result { + SortOrderPushdownResult::Exact { inner } => { + Ok(SortOrderPushdownResult::Exact { + inner: self.rebuild_with_source(inner, true)?, + }) + } + SortOrderPushdownResult::Inexact { inner } => { + Ok(SortOrderPushdownResult::Inexact { + inner: self.rebuild_with_source(inner, false)?, + }) + } + SortOrderPushdownResult::Unsupported => { + Ok(SortOrderPushdownResult::Unsupported) + } + } + } } impl FileScanConfig { @@ -1107,6 +1147,36 @@ impl FileScanConfig { pub fn file_source(&self) -> &Arc { &self.file_source } + + /// Helper: Rebuild FileScanConfig with new file source + fn rebuild_with_source( + &self, + new_file_source: Arc, + is_exact: bool, + ) -> Result> { + let mut new_config = self.clone(); + + // Reverse file groups (FileScanConfig's responsibility) + new_config.file_groups = new_config + .file_groups + .into_iter() + .map(|group| { + let mut files = group.into_inner(); + files.reverse(); + files.into() + }) + .collect(); + + new_config.file_source = new_file_source; + + // Phase 1: Clear output_ordering for Inexact + // (we're only reversing row groups, not guaranteeing perfect ordering) + if !is_exact { + new_config.output_ordering = vec![]; + } + + Ok(Arc::new(new_config)) + } } impl Debug for FileScanConfig { diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 0945ffc94c1d4..a3892dfac9778 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -40,7 +40,8 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::{Constraints, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; -use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_plan::SortOrderPushdownResult; use datafusion_physical_plan::filter_pushdown::{ ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown, }; @@ -190,6 +191,25 @@ pub trait DataSource: Send + Sync + Debug { vec![PushedDown::No; filters.len()], )) } + + /// Try to create a new DataSource that produces data in the specified sort order. + /// + /// # Arguments + /// * `order` - The desired output ordering + /// + /// # Returns + /// * `Ok(SortOrderPushdownResult::Exact { .. })` - Created a source that guarantees exact ordering + /// * `Ok(SortOrderPushdownResult::Inexact { .. 
})` - Created a source optimized for the ordering + /// * `Ok(SortOrderPushdownResult::Unsupported)` - Cannot optimize for this ordering + /// * `Err(e)` - Error occurred + /// + /// Default implementation returns `Unsupported`. + fn try_pushdown_sort( + &self, + _order: &[PhysicalSortExpr], + ) -> Result>> { + Ok(SortOrderPushdownResult::Unsupported) + } } /// [`ExecutionPlan`] that reads one or more files @@ -360,6 +380,19 @@ impl ExecutionPlan for DataSourceExec { }), } } + + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + // Delegate to the data source and wrap result with DataSourceExec + self.data_source + .try_pushdown_sort(order)? + .try_map(|new_data_source| { + let new_exec = self.clone().with_data_source(new_data_source); + Ok(Arc::new(new_exec) as Arc) + }) + } } impl DataSourceExec { diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 5dcdab1552747..30ba7de76a471 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -424,6 +424,13 @@ impl SessionConfig { self.options.optimizer.enable_round_robin_repartition } + /// Enables or disables sort pushdown optimization, and currently only + /// applies to Parquet data source. + pub fn with_enable_sort_pushdown(mut self, enabled: bool) -> Self { + self.options_mut().optimizer.enable_sort_pushdown = enabled; + self + } + /// Set the size of [`sort_spill_reservation_bytes`] to control /// memory pre-reservation /// diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 1a49db3d58cc5..e8558c7643d07 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -426,6 +426,62 @@ impl LexOrdering { self.exprs.truncate(len); true } + + /// Check if reversing this ordering would satisfy another ordering requirement. + /// + /// This supports **prefix matching**: if this ordering is `[A DESC, B ASC]` + /// and `other` is `[A ASC]`, reversing this gives `[A ASC, B DESC]`, which + /// satisfies `other` since `[A ASC]` is a prefix. + /// + /// # Arguments + /// * `other` - The ordering requirement to check against + /// + /// # Returns + /// `true` if reversing this ordering would satisfy `other` + /// + /// # Example + /// ```text + /// self: [number DESC, letter ASC] + /// other: [number ASC] + /// After reversing self: [number ASC, letter DESC] ✓ Prefix match! + /// ``` + pub fn is_reverse(&self, other: &LexOrdering) -> bool { + let self_exprs = self.as_ref(); + let other_exprs = other.as_ref(); + + if other_exprs.len() > self_exprs.len() { + return false; + } + + other_exprs.iter().zip(self_exprs.iter()).all(|(req, cur)| { + req.expr.eq(&cur.expr) && is_reversed_sort_options(&req.options, &cur.options) + }) + } +} + +/// Check if two SortOptions represent reversed orderings. +/// +/// Returns `true` if both `descending` and `nulls_first` are opposite. 
+/// +/// # Example +/// ``` +/// use arrow::compute::SortOptions; +/// # use datafusion_physical_expr_common::sort_expr::is_reversed_sort_options; +/// +/// let asc_nulls_last = SortOptions { +/// descending: false, +/// nulls_first: false, +/// }; +/// let desc_nulls_first = SortOptions { +/// descending: true, +/// nulls_first: true, +/// }; +/// +/// assert!(is_reversed_sort_options(&asc_nulls_last, &desc_nulls_first)); +/// assert!(is_reversed_sort_options(&desc_nulls_first, &asc_nulls_last)); +/// ``` +pub fn is_reversed_sort_options(lhs: &SortOptions, rhs: &SortOptions) -> bool { + lhs.descending != rhs.descending && lhs.nulls_first != rhs.nulls_first } impl PartialEq for LexOrdering { @@ -732,3 +788,50 @@ impl DerefMut for OrderingRequirements { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_reversed_sort_options() { + // Test basic reversal: ASC NULLS LAST ↔ DESC NULLS FIRST + let asc_nulls_last = SortOptions { + descending: false, + nulls_first: false, + }; + let desc_nulls_first = SortOptions { + descending: true, + nulls_first: true, + }; + assert!(is_reversed_sort_options(&asc_nulls_last, &desc_nulls_first)); + assert!(is_reversed_sort_options(&desc_nulls_first, &asc_nulls_last)); + + // Test another reversal: ASC NULLS FIRST ↔ DESC NULLS LAST + let asc_nulls_first = SortOptions { + descending: false, + nulls_first: true, + }; + let desc_nulls_last = SortOptions { + descending: true, + nulls_first: false, + }; + assert!(is_reversed_sort_options(&asc_nulls_first, &desc_nulls_last)); + assert!(is_reversed_sort_options(&desc_nulls_last, &asc_nulls_first)); + + // Test non-reversal: same options + assert!(!is_reversed_sort_options(&asc_nulls_last, &asc_nulls_last)); + assert!(!is_reversed_sort_options( + &desc_nulls_first, + &desc_nulls_first + )); + + // Test non-reversal: only descending differs + assert!(!is_reversed_sort_options(&asc_nulls_last, &desc_nulls_last)); + assert!(!is_reversed_sort_options(&desc_nulls_last, &asc_nulls_last)); + + // Test non-reversal: only nulls_first differs + assert!(!is_reversed_sort_options(&asc_nulls_last, &asc_nulls_first)); + assert!(!is_reversed_sort_options(&asc_nulls_first, &asc_nulls_last)); + } +} diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index cf8cf33cea531..1b45f02ebd511 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -42,6 +42,7 @@ pub mod optimizer; pub mod output_requirements; pub mod projection_pushdown; pub use datafusion_pruning as pruning; +pub mod pushdown_sort; pub mod sanity_checker; pub mod topk_aggregation; pub mod update_aggr_exprs; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index f8e2e9950af3c..aa1975d98d48b 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -37,6 +37,7 @@ use crate::topk_aggregation::TopKAggregation; use crate::update_aggr_exprs::OptimizeAggregateOrder; use crate::limit_pushdown_past_window::LimitPushPastWindows; +use crate::pushdown_sort::PushdownSort; use datafusion_common::Result; use datafusion_common::config::ConfigOptions; use datafusion_physical_plan::ExecutionPlan; @@ -145,6 +146,8 @@ impl PhysicalOptimizer { // are not present, the load of executors such as join or union will be // reduced by narrowing their input tables. Arc::new(ProjectionPushdown::new()), + // PushdownSort: Detect sorts that can be pushed down to data sources. 
+ Arc::new(PushdownSort::new()), Arc::new(EnsureCooperative::new()), // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan. // Therefore it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references. diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs new file mode 100644 index 0000000000000..1fa15492d2a92 --- /dev/null +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Sort Pushdown Optimization +//! +//! This optimizer attempts to push sort requirements down through the execution plan +//! tree to data sources that can natively handle them (e.g., by scanning files in +//! reverse order). +//! +//! ## How it works +//! +//! 1. Detects `SortExec` nodes in the plan +//! 2. Calls `try_pushdown_sort()` on the input to recursively push the sort requirement +//! 3. Each node type defines its own pushdown behavior: +//! - **Transparent nodes** (CoalesceBatchesExec, RepartitionExec, etc.) delegate to +//! their children and wrap the result +//! - **Data sources** (DataSourceExec) check if they can optimize for the ordering +//! - **Blocking nodes** return `Unsupported` to stop pushdown +//! 4. Based on the result: +//! - `Exact`: Remove the Sort operator (data source guarantees perfect ordering) +//! - `Inexact`: Keep Sort but use optimized input (enables early termination for TopK) +//! - `Unsupported`: No change +//! +//! ## Current capabilities (Phase 1) +//! +//! - Reverse scan optimization: when required sort is the reverse of the data source's +//! natural ordering, enable reverse scanning (reading row groups in reverse order) +//! - Supports prefix matching: if data has ordering [A DESC, B ASC] and query needs +//! [A ASC], reversing gives [A ASC, B DESC] which satisfies the requirement +//! +//! TODO Issue: +//! ## Future enhancements (Phase 2), +//! +//! - File reordering based on statistics +//! - Return `Exact` when files are known to be perfectly sorted +//! - Complete Sort elimination when ordering is guaranteed + +use crate::PhysicalOptimizerRule; +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::SortOrderPushdownResult; +use datafusion_physical_plan::sorts::sort::SortExec; +use std::sync::Arc; + +/// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources. +/// +/// See module-level documentation for details. 
+#[derive(Debug, Clone, Default)] +pub struct PushdownSort; + +impl PushdownSort { + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for PushdownSort { + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> Result> { + // Check if sort pushdown optimization is enabled + if !config.optimizer.enable_sort_pushdown { + return Ok(plan); + } + + // Use transform_down to find and optimize all SortExec nodes (including nested ones) + plan.transform_down(|plan: Arc| { + // Check if this is a SortExec + let Some(sort_exec) = plan.as_any().downcast_ref::() else { + return Ok(Transformed::no(plan)); + }; + + let sort_input = Arc::clone(sort_exec.input()); + let required_ordering = sort_exec.expr(); + + // Try to push the sort requirement down through the plan tree + // Each node type defines its own pushdown behavior via try_pushdown_sort() + match sort_input.try_pushdown_sort(required_ordering)? { + SortOrderPushdownResult::Exact { inner } => { + // Data source guarantees perfect ordering - remove the Sort operator + Ok(Transformed::yes(inner)) + } + SortOrderPushdownResult::Inexact { inner } => { + // Data source is optimized for the ordering but not perfectly sorted + // Keep the Sort operator but use the optimized input + // Benefits: TopK queries can terminate early, better cache locality + Ok(Transformed::yes(Arc::new( + SortExec::new(required_ordering.clone(), inner) + .with_fetch(sort_exec.fetch()) + .with_preserve_partitioning( + sort_exec.preserve_partitioning(), + ), + ))) + } + SortOrderPushdownResult::Unsupported => { + // Cannot optimize for this ordering - no change + Ok(Transformed::no(plan)) + } + } + }) + .data() + } + + fn name(&self) -> &str { + "PushdownSort" + } + + fn schema_check(&self) -> bool { + true + } +} diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index eb3c3b5befbdd..494b5d60fb0cb 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -40,7 +40,9 @@ use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; +use crate::sort_pushdown::SortOrderPushdownResult; use datafusion_common::config::ConfigOptions; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use futures::ready; use futures::stream::{Stream, StreamExt}; @@ -241,6 +243,20 @@ impl ExecutionPlan for CoalesceBatchesExec { ) -> Result>> { Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } + + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + // CoalesceBatchesExec is transparent for sort ordering - it preserves order + // Delegate to the child and wrap with a new CoalesceBatchesExec + self.input.try_pushdown_sort(order)?.try_map(|new_input| { + Ok(Arc::new( + CoalesceBatchesExec::new(new_input, self.target_batch_size) + .with_fetch(self.fetch), + ) as Arc) + }) + } } /// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details. 
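The prefix-matching rule described in the `pushdown_sort` module documentation above (and implemented by `LexOrdering::is_reverse` earlier in this diff) can be illustrated without any DataFusion types. A conceptual sketch over plain `(column, descending, nulls_first)` keys — the struct and function names here are made up for illustration:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
struct SortKey {
    column: &'static str,
    descending: bool,
    nulls_first: bool,
}

/// True when reversing `file_order` (flipping both `descending` and
/// `nulls_first` on every key) produces an ordering that has `requested`
/// as a prefix.
fn reverse_satisfies(file_order: &[SortKey], requested: &[SortKey]) -> bool {
    if requested.len() > file_order.len() {
        return false;
    }
    requested.iter().zip(file_order).all(|(req, cur)| {
        req.column == cur.column
            && req.descending != cur.descending
            && req.nulls_first != cur.nulls_first
    })
}

fn main() {
    // File ordering: [number DESC NULLS FIRST, letter ASC NULLS LAST]
    let file_order = [
        SortKey { column: "number", descending: true, nulls_first: true },
        SortKey { column: "letter", descending: false, nulls_first: false },
    ];

    // Query asks for [number ASC NULLS LAST]. Reversing the file ordering gives
    // [number ASC NULLS LAST, letter DESC NULLS FIRST]; the request is a prefix.
    let requested = [SortKey { column: "number", descending: false, nulls_first: false }];
    assert!(reverse_satisfies(&file_order, &requested));

    // [number ASC NULLS FIRST] is not an exact reversal: nulls_first is unchanged.
    let not_reversed = [SortKey { column: "number", descending: false, nulls_first: true }];
    assert!(!reverse_satisfies(&file_order, &not_reversed));
}
```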
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 7f207d7f1e836..d83f90eb3d8c1 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -30,7 +30,9 @@ use super::{ use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::projection::{ProjectionExec, make_with_child}; +use crate::sort_pushdown::SortOrderPushdownResult; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_common::config::ConfigOptions; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; @@ -284,6 +286,42 @@ impl ExecutionPlan for CoalescePartitionsExec { ) -> Result { FilterDescription::from_children(parent_filters, &self.children()) } + + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + // CoalescePartitionsExec merges multiple partitions into one, which loses + // global ordering. However, we can still push the sort requirement down + // to optimize individual partitions - the Sort operator above will handle + // the global ordering. + // + // Note: The result will always be at most Inexact (never Exact) when there + // are multiple partitions, because merging destroys global ordering. + let result = self.input.try_pushdown_sort(order)?; + + // If we have multiple partitions, we can't return Exact even if the + // underlying source claims Exact - merging destroys global ordering + let has_multiple_partitions = + self.input.output_partitioning().partition_count() > 1; + + result + .try_map(|new_input| { + Ok( + Arc::new( + CoalescePartitionsExec::new(new_input).with_fetch(self.fetch), + ) as Arc, + ) + }) + .map(|r| { + if has_multiple_partitions { + // Downgrade Exact to Inexact when merging multiple partitions + r.into_inexact() + } else { + r + } + }) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index b7967bb7bbc8a..06da0b8933c18 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -22,6 +22,7 @@ use crate::filter_pushdown::{ }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; +use crate::sort_pushdown::SortOrderPushdownResult; pub use crate::stream::EmptyRecordBatchStream; pub use datafusion_common::hash_utils; @@ -54,7 +55,9 @@ use datafusion_common::{ use datafusion_common_runtime::JoinSet; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; +use datafusion_physical_expr_common::sort_expr::{ + LexOrdering, OrderingRequirements, PhysicalSortExpr, +}; use futures::stream::{StreamExt, TryStreamExt}; @@ -682,6 +685,29 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Option> { None } + + /// Try to push down sort ordering requirements to this node. + /// + /// This method is called during sort pushdown optimization to determine if this + /// node can optimize for a requested sort ordering. 
Implementations should: + /// + /// - Return [`SortOrderPushdownResult::Exact`] if the node can guarantee the exact + /// ordering (allowing the Sort operator to be removed) + /// - Return [`SortOrderPushdownResult::Inexact`] if the node can optimize for the + /// ordering but cannot guarantee perfect sorting (Sort operator is kept) + /// - Return [`SortOrderPushdownResult::Unsupported`] if the node cannot optimize + /// for the ordering + /// + /// For transparent nodes (that preserve ordering), implement this to delegate to + /// children and wrap the result with a new instance of this node. + /// + /// Default implementation returns `Unsupported`. + fn try_pushdown_sort( + &self, + _order: &[PhysicalSortExpr], + ) -> Result>> { + Ok(SortOrderPushdownResult::Unsupported) + } } /// [`ExecutionPlan`] Invariant Level diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 849b34e703477..ec8e154caec91 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -50,6 +50,7 @@ pub use crate::execution_plan::{ }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; +pub use crate::sort_pushdown::SortOrderPushdownResult; pub use crate::stream::EmptyRecordBatchStream; pub use crate::topk::TopK; pub use crate::visitor::{ExecutionPlanVisitor, accept, visit_execution_plan}; @@ -83,6 +84,7 @@ pub mod placeholder_row; pub mod projection; pub mod recursive_query; pub mod repartition; +pub mod sort_pushdown; pub mod sorts; pub mod spill; pub mod stream; diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 9d437dbcf650e..5c9472182b0a1 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -61,6 +61,8 @@ use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; +use crate::sort_pushdown::SortOrderPushdownResult; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::stream::Stream; use futures::{FutureExt, StreamExt, TryStreamExt, ready}; @@ -1094,6 +1096,27 @@ impl ExecutionPlan for RepartitionExec { Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } + fn try_pushdown_sort( + &self, + order: &[PhysicalSortExpr], + ) -> Result>> { + // RepartitionExec only maintains input order if preserve_order is set + // or if there's only one partition + if !self.maintains_input_order()[0] { + return Ok(SortOrderPushdownResult::Unsupported); + } + + // Delegate to the child and wrap with a new RepartitionExec + self.input.try_pushdown_sort(order)?.try_map(|new_input| { + let mut new_repartition = + RepartitionExec::try_new(new_input, self.partitioning().clone())?; + if self.preserve_order { + new_repartition = new_repartition.with_preserve_order(); + } + Ok(Arc::new(new_repartition) as Arc) + }) + } + fn repartitioned( &self, target_partitions: usize, diff --git a/datafusion/physical-plan/src/sort_pushdown.rs b/datafusion/physical-plan/src/sort_pushdown.rs new file mode 100644 index 0000000000000..8432fd5dabee7 --- /dev/null +++ b/datafusion/physical-plan/src/sort_pushdown.rs @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Sort pushdown types for physical execution plans. +//! +//! This module provides types used for pushing sort ordering requirements +//! down through the execution plan tree to data sources. + +/// Result of attempting to push down sort ordering to a node. +/// +/// Used by [`ExecutionPlan::try_pushdown_sort`] to communicate +/// whether and how sort ordering was successfully pushed down. +/// +/// [`ExecutionPlan::try_pushdown_sort`]: crate::ExecutionPlan::try_pushdown_sort +#[derive(Debug, Clone)] +pub enum SortOrderPushdownResult { + /// The source can guarantee exact ordering (data is perfectly sorted). + /// + /// When this is returned, the optimizer can safely remove the Sort operator + /// entirely since the data source guarantees the requested ordering. + Exact { + /// The optimized node that provides exact ordering + inner: T, + }, + /// The source has optimized for the ordering but cannot guarantee perfect sorting. + /// + /// This indicates the data source has been optimized (e.g., reordered files/row groups + /// based on statistics, enabled reverse scanning) but the data may not be perfectly + /// sorted. The optimizer should keep the Sort operator but benefits from the + /// optimization (e.g., faster TopK queries due to early termination). + Inexact { + /// The optimized node that provides approximate ordering + inner: T, + }, + /// The source cannot optimize for this ordering. + /// + /// The data source does not support the requested sort ordering and no + /// optimization was applied. + Unsupported, +} + +impl SortOrderPushdownResult { + /// Extract the inner value if present + pub fn into_inner(self) -> Option { + match self { + Self::Exact { inner } | Self::Inexact { inner } => Some(inner), + Self::Unsupported => None, + } + } + + /// Map the inner value to a different type while preserving the variant. + pub fn map U>(self, f: F) -> SortOrderPushdownResult { + match self { + Self::Exact { inner } => SortOrderPushdownResult::Exact { inner: f(inner) }, + Self::Inexact { inner } => { + SortOrderPushdownResult::Inexact { inner: f(inner) } + } + Self::Unsupported => SortOrderPushdownResult::Unsupported, + } + } + + /// Try to map the inner value, returning an error if the function fails. + pub fn try_map Result>( + self, + f: F, + ) -> Result, E> { + match self { + Self::Exact { inner } => { + Ok(SortOrderPushdownResult::Exact { inner: f(inner)? }) + } + Self::Inexact { inner } => { + Ok(SortOrderPushdownResult::Inexact { inner: f(inner)? }) + } + Self::Unsupported => Ok(SortOrderPushdownResult::Unsupported), + } + } + + /// Convert this result to `Inexact`, downgrading `Exact` if present. + /// + /// This is useful when an operation (like merging multiple partitions) + /// cannot guarantee exact ordering even if the input provides it. 
+ /// + /// # Examples + /// + /// ``` + /// # use datafusion_physical_plan::SortOrderPushdownResult; + /// let exact = SortOrderPushdownResult::Exact { inner: 42 }; + /// let inexact = exact.into_inexact(); + /// assert!(matches!(inexact, SortOrderPushdownResult::Inexact { inner: 42 })); + /// + /// let already_inexact = SortOrderPushdownResult::Inexact { inner: 42 }; + /// let still_inexact = already_inexact.into_inexact(); + /// assert!(matches!(still_inexact, SortOrderPushdownResult::Inexact { inner: 42 })); + /// + /// let unsupported = SortOrderPushdownResult::::Unsupported; + /// let still_unsupported = unsupported.into_inexact(); + /// assert!(matches!(still_unsupported, SortOrderPushdownResult::Unsupported)); + /// ``` + pub fn into_inexact(self) -> Self { + match self { + Self::Exact { inner } => Self::Inexact { inner }, + Self::Inexact { inner } => Self::Inexact { inner }, + Self::Unsupported => Self::Unsupported, + } + } +} diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 7c08aaad98738..0bf87203ac50a 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1111,7 +1111,7 @@ impl serde::Serialize for ColumnStats { struct_ser.serialize_field("distinctCount", v)?; } if let Some(v) = self.byte_size.as_ref() { - struct_ser.serialize_field("ByteSize", v)?; + struct_ser.serialize_field("byteSize", v)?; } struct_ser.end() } @@ -1134,7 +1134,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { "distinct_count", "distinctCount", "byte_size", - "ByteSize", + "byteSize", ]; #[allow(clippy::enum_variant_names)] @@ -1144,7 +1144,6 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { SumValue, NullCount, DistinctCount, - ByteSize, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -1172,7 +1171,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { "sumValue" | "sum_value" => Ok(GeneratedField::SumValue), "nullCount" | "null_count" => Ok(GeneratedField::NullCount), "distinctCount" | "distinct_count" => Ok(GeneratedField::DistinctCount), - "ByteSize" | "byte_size" => Ok(GeneratedField::ByteSize), + "byteSize" | "byte_size" => Ok(GeneratedField::ByteSize), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -1232,7 +1231,7 @@ impl<'de> serde::Deserialize<'de> for ColumnStats { } GeneratedField::ByteSize => { if byte_size__.is_some() { - return Err(serde::de::Error::duplicate_field("ByteSize")); + return Err(serde::de::Error::duplicate_field("byteSize")); } byte_size__ = map_.next_value()?; } diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 1e6183f48bac7..0b15a7f8ec5dd 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -264,7 +264,7 @@ logical_plan 02)--TableScan: t projection=[id] physical_plan 01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], file_type=parquet, reverse_row_groups=true statement ok DROP TABLE t; diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt 
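Order-preserving wrappers such as `CoalesceBatchesExec` and `RepartitionExec` rely on `SortOrderPushdownResult::try_map` to rebuild themselves around the optimized child while keeping the `Exact`/`Inexact`/`Unsupported` distinction intact. A small standalone check of that contract with an `i32` payload instead of an `ExecutionPlan` (behaviour as defined in `sort_pushdown.rs` above):

```rust
use datafusion_physical_plan::SortOrderPushdownResult;

fn main() {
    // The variant is preserved while the payload is transformed.
    let inexact = SortOrderPushdownResult::Inexact { inner: 21_i32 };
    let doubled = inexact.try_map(|v: i32| -> Result<i32, String> { Ok(v * 2) });
    assert!(matches!(
        doubled,
        Ok(SortOrderPushdownResult::Inexact { inner: 42 })
    ));

    // `Unsupported` passes straight through and the closure is never invoked.
    let unsupported = SortOrderPushdownResult::<i32>::Unsupported;
    let untouched =
        unsupported.try_map(|_: i32| -> Result<i32, String> { Err("unused".into()) });
    assert!(matches!(untouched, Ok(SortOrderPushdownResult::Unsupported)));
}
```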
b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index 3e403171e0718..3d08cdf751a41 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -426,3 +426,329 @@ SET datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true; statement ok SET datafusion.optimizer.enable_dynamic_filter_pushdown = true; + +# Test 6: Sort Pushdown for ordered Parquet files + +# Create a sorted dataset +statement ok +CREATE TABLE sorted_data(id INT, value INT, name VARCHAR) AS VALUES +(1, 100, 'a'), +(2, 200, 'b'), +(3, 300, 'c'), +(4, 400, 'd'), +(5, 500, 'e'), +(6, 600, 'f'), +(7, 700, 'g'), +(8, 800, 'h'), +(9, 900, 'i'), +(10, 1000, 'j'); + +# Copy to parquet with sorting +query I +COPY (SELECT * FROM sorted_data ORDER BY id ASC) +TO 'test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet'; +---- +10 + +statement ok +CREATE EXTERNAL TABLE sorted_parquet(id INT, value INT, name VARCHAR) +STORED AS PARQUET +LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet' +WITH ORDER (id ASC); + +# Test 6.1: Sort pushdown with DESC (opposite of ASC) +# Should show reverse_row_groups=true +query TT +EXPLAIN SELECT * FROM sorted_parquet ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: sorted_parquet.id DESC NULLS FIRST, fetch=3 +02)--TableScan: sorted_parquet projection=[id, value, name] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet]]}, projection=[id, value, name], file_type=parquet, predicate=DynamicFilter [ empty ], reverse_row_groups=true + +# Test 6.2: Verify results are correct +query IIT +SELECT * FROM sorted_parquet ORDER BY id DESC LIMIT 3; +---- +10 1000 j +9 900 i +8 800 h + +# Test 6.3: Should NOT apply for ASC (same direction) +query TT +EXPLAIN SELECT * FROM sorted_parquet ORDER BY id ASC LIMIT 3; +---- +logical_plan +01)Sort: sorted_parquet.id ASC NULLS LAST, fetch=3 +02)--TableScan: sorted_parquet projection=[id, value, name] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet]]}, projection=[id, value, name], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +# Test 6.4: Disable sort pushdown +statement ok +SET datafusion.optimizer.enable_sort_pushdown = false; + +query TT +EXPLAIN SELECT * FROM sorted_parquet ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: sorted_parquet.id DESC NULLS FIRST, fetch=3 +02)--TableScan: sorted_parquet projection=[id, value, name] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet]]}, projection=[id, value, name], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] + +# Re-enable +statement ok +SET datafusion.optimizer.enable_sort_pushdown = true; + +# Test 6.5: With OFFSET +query TT +EXPLAIN SELECT * FROM sorted_parquet ORDER BY id DESC LIMIT 3 OFFSET 2; +---- +logical_plan +01)Limit: skip=2, fetch=3 +02)--Sort: sorted_parquet.id DESC NULLS FIRST, fetch=5 +03)----TableScan: sorted_parquet projection=[id, value, name] 
+physical_plan +01)GlobalLimitExec: skip=2, fetch=3 +02)--SortExec: TopK(fetch=5), expr=[id@0 DESC], preserve_partitioning=[false] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_data.parquet]]}, projection=[id, value, name], file_type=parquet, predicate=DynamicFilter [ empty ], reverse_row_groups=true + +query IIT +SELECT * FROM sorted_parquet ORDER BY id DESC LIMIT 3 OFFSET 2; +---- +8 800 h +7 700 g +6 600 f + +# Test 6.6: Reverse scan with row selection (page index pruning) +# This tests that when reverse_row_groups=true, the RowSelection is also properly reversed + +# Create a dataset with multiple row groups and enable page index +statement ok +CREATE TABLE multi_rg_data(id INT, category VARCHAR, value INT) AS VALUES +(1, 'alpha', 10), +(2, 'alpha', 20), +(3, 'beta', 30), +(4, 'beta', 40), +(5, 'gamma', 50), +(6, 'gamma', 60), +(7, 'delta', 70), +(8, 'delta', 80); + +# Write with small row groups (2 rows each = 4 row groups) +statement ok +SET datafusion.execution.parquet.max_row_group_size = 2; + +query I +COPY (SELECT * FROM multi_rg_data ORDER BY id ASC) +TO 'test_files/scratch/dynamic_filter_pushdown_config/multi_rg_sorted.parquet'; +---- +8 + +# Reset row group size +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +CREATE EXTERNAL TABLE multi_rg_sorted(id INT, category VARCHAR, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/multi_rg_sorted.parquet' +WITH ORDER (id ASC); + +# Enable page index for better pruning +statement ok +SET datafusion.execution.parquet.enable_page_index = true; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = true; + +# Test with reverse scan and filter that prunes some row groups +# This will create a RowSelection with partial row group scans +query TT +EXPLAIN SELECT * FROM multi_rg_sorted +WHERE category IN ('alpha', 'gamma') +ORDER BY id DESC LIMIT 5; +---- +logical_plan +01)Sort: multi_rg_sorted.id DESC NULLS FIRST, fetch=5 +02)--Filter: multi_rg_sorted.category = Utf8View("alpha") OR multi_rg_sorted.category = Utf8View("gamma") +03)----TableScan: multi_rg_sorted projection=[id, category, value], partial_filters=[multi_rg_sorted.category = Utf8View("alpha") OR multi_rg_sorted.category = Utf8View("gamma")] +physical_plan +01)SortExec: TopK(fetch=5), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/multi_rg_sorted.parquet]]}, projection=[id, category, value], file_type=parquet, predicate=(category@1 = alpha OR category@1 = gamma) AND DynamicFilter [ empty ], reverse_row_groups=true, pruning_predicate=category_null_count@2 != row_count@3 AND category_min@0 <= alpha AND alpha <= category_max@1 OR category_null_count@2 != row_count@3 AND category_min@0 <= gamma AND gamma <= category_max@1, required_guarantees=[category in (alpha, gamma)] + +# Verify the results are correct despite reverse scanning with row selection +# Expected: gamma values (6, 5) then alpha values (2, 1), in DESC order by id +query ITI +SELECT * FROM multi_rg_sorted +WHERE category IN ('alpha', 'gamma') +ORDER BY id DESC LIMIT 5; +---- +6 gamma 60 +5 gamma 50 +2 alpha 20 +1 alpha 10 + +# Test with more complex selection pattern +query ITI +SELECT * FROM multi_rg_sorted +WHERE category IN ('beta', 'delta') +ORDER BY id DESC; +---- +8 delta 80 +7 delta 70 
+4 beta 40 +3 beta 30 + +# Test forward scan for comparison (should give same logical results in ASC order) +query ITI +SELECT * FROM multi_rg_sorted +WHERE category IN ('alpha', 'gamma') +ORDER BY id ASC; +---- +1 alpha 10 +2 alpha 20 +5 gamma 50 +6 gamma 60 + +# Disable reverse scan and verify it still works +statement ok +SET datafusion.optimizer.enable_sort_pushdown = false; + +query ITI +SELECT * FROM multi_rg_sorted +WHERE category IN ('alpha', 'gamma') +ORDER BY id DESC LIMIT 5; +---- +6 gamma 60 +5 gamma 50 +2 alpha 20 +1 alpha 10 + +# Re-enable +statement ok +SET datafusion.optimizer.enable_sort_pushdown = true; + +# Test 6.7: Sort pushdown with more than one partition +# Create multiple parquet files to trigger it + +# Split data into multiple files +statement ok +CREATE TABLE sorted_data_part1(id INT, value INT, name VARCHAR) AS VALUES +(1, 100, 'a'), +(2, 200, 'b'), +(3, 300, 'c'); + +statement ok +CREATE TABLE sorted_data_part2(id INT, value INT, name VARCHAR) AS VALUES +(4, 400, 'd'), +(5, 500, 'e'), +(6, 600, 'f'); + +statement ok +CREATE TABLE sorted_data_part3(id INT, value INT, name VARCHAR) AS VALUES +(7, 700, 'g'), +(8, 800, 'h'), +(9, 900, 'i'), +(10, 1000, 'j'); + +# Create directory for multi-file parquet +query I +COPY (SELECT * FROM sorted_data_part1 ORDER BY id ASC) +TO 'test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/part1.parquet'; +---- +3 + +query I +COPY (SELECT * FROM sorted_data_part2 ORDER BY id ASC) +TO 'test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/part2.parquet'; +---- +3 + +query I +COPY (SELECT * FROM sorted_data_part3 ORDER BY id ASC) +TO 'test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/part3.parquet'; +---- +4 + +# Create external table pointing to directory with multiple files +statement ok +CREATE EXTERNAL TABLE sorted_parquet_multi(id INT, value INT, name VARCHAR) +STORED AS PARQUET +LOCATION 'test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/' +WITH ORDER (id ASC); + +# Enable multiple partitions +statement ok +SET datafusion.execution.target_partitions = 4; + +# Now we should see RepartitionExec because we have 3 input partitions (3 files) +query TT +EXPLAIN SELECT * FROM sorted_parquet_multi ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: sorted_parquet_multi.id DESC NULLS FIRST, fetch=3 +02)--TableScan: sorted_parquet_multi projection=[id, value, name] +physical_plan +01)SortPreservingMergeExec: [id@0 DESC], fetch=3 +02)--SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[true] +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/part1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/part2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/sorted_multi/part3.parquet]]}, projection=[id, value, name], file_type=parquet, predicate=DynamicFilter [ empty ], reverse_row_groups=true + +# Verify correctness with repartitioning and multiple files +query IIT +SELECT * FROM sorted_parquet_multi ORDER BY id DESC LIMIT 3; +---- +10 1000 j +9 900 i +8 800 h + +# Test ASC order (should not trigger reverse scan) +query IIT +SELECT * FROM sorted_parquet_multi ORDER BY id ASC LIMIT 3; +---- +1 100 a +2 200 b +3 300 c + +# Cleanup +statement ok +DROP TABLE sorted_data_part1; + +statement ok +DROP TABLE sorted_data_part2; + +statement ok +DROP TABLE sorted_data_part3; + 
+statement ok +DROP TABLE sorted_parquet_multi; + +# Reset to default +statement ok +SET datafusion.execution.target_partitions = 4; + +# Cleanup +statement ok +DROP TABLE multi_rg_data; + +statement ok +DROP TABLE multi_rg_sorted; + +statement ok +SET datafusion.execution.parquet.enable_page_index = false; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = false; + +# Cleanup +statement ok +DROP TABLE sorted_data; + +statement ok +DROP TABLE sorted_parquet; + +statement ok +SET datafusion.optimizer.enable_sort_pushdown = true; diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 9f8e264ee8bb0..9087aee56d978 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -240,6 +240,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE +physical_plan after PushdownSort SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE @@ -320,6 +321,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]] physical_plan after ProjectionPushdown SAME TEXT AS ABOVE +physical_plan after PushdownSort SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE @@ -364,6 +366,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after ProjectionPushdown SAME TEXT AS ABOVE +physical_plan after PushdownSort SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE @@ -599,6 +602,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE +physical_plan after PushdownSort SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after 
SanityCheckPlan SAME TEXT AS ABOVE diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b5429d68c99f0..8c11667d970ad 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -297,6 +297,7 @@ datafusion.optimizer.enable_dynamic_filter_pushdown true datafusion.optimizer.enable_join_dynamic_filter_pushdown true datafusion.optimizer.enable_piecewise_merge_join false datafusion.optimizer.enable_round_robin_repartition true +datafusion.optimizer.enable_sort_pushdown true datafusion.optimizer.enable_topk_aggregation true datafusion.optimizer.enable_topk_dynamic_filter_pushdown true datafusion.optimizer.enable_window_limits true @@ -430,6 +431,7 @@ datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attemp datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores +datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible datafusion.optimizer.enable_topk_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down TopK dynamic filters into the file scan phase. 
datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 7364fccd8e570..aba468d21fd08 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -340,7 +340,7 @@ explain select number, letter, age from partial_sorted order by number asc limit ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], file_type=parquet, predicate=DynamicFilter [ empty ], reverse_row_groups=true query TT explain select number, letter, age from partial_sorted order by letter asc, number desc limit 3; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 156df1d9d70aa..750543d0b56d3 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -161,6 +161,7 @@ The following configuration settings are available: | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | +| datafusion.optimizer.enable_sort_pushdown | true | Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
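For completeness: the new setting can be toggled either through SQL (`SET datafusion.optimizer.enable_sort_pushdown = ...`, as exercised in the sqllogictests above) or programmatically through the `SessionConfig` builder added in this PR. A minimal sketch of the programmatic route, assuming the usual `datafusion` crate re-exports of `SessionConfig` and `SessionContext`:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Disable the optimization for this session, e.g. to compare plans with and
    // without reverse row-group scanning for ORDER BY ... DESC LIMIT N queries.
    let config = SessionConfig::new().with_enable_sort_pushdown(false);
    let ctx = SessionContext::new_with_config(config);

    // Plans built through `ctx` keep the default scan order and rely solely on
    // SortExec / TopK for ordering.
    assert!(!ctx.state().config().options().optimizer.enable_sort_pushdown);
}
```

The SQL form is equivalent at session scope, which is how the test file above switches the optimization off and back on around individual queries.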