diff --git a/Cargo.lock b/Cargo.lock
index 11b158fe83b7e..e78b8a88b3df9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1764,6 +1764,7 @@ dependencies = [
  "parking_lot",
  "parquet",
  "paste",
+ "pretty_assertions",
  "rand 0.9.2",
  "rand_distr",
  "recursive",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 5c7e944e59f7b..89a22ac4ab129 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -175,6 +175,7 @@ env_logger = { workspace = true }
 glob = { workspace = true }
 insta = { workspace = true }
 paste = { workspace = true }
+pretty_assertions = "1.0"
 rand = { workspace = true, features = ["small_rng"] }
 rand_distr = "0.5"
 recursive = { workspace = true }
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index 5f1971f649d2c..bb9e03c837c1e 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -59,6 +59,7 @@ use datafusion_physical_plan::{
     coalesce_partitions::CoalescePartitionsExec,
     collect,
     filter::{FilterExec, FilterExecBuilder},
+    projection::ProjectionExec,
     repartition::RepartitionExec,
     sorts::sort::SortExec,
 };
@@ -1826,6 +1827,234 @@ fn schema() -> SchemaRef {
     Arc::clone(&TEST_SCHEMA)
 }
 
+struct ProjectionDynFilterTestCase {
+    schema: SchemaRef,
+    batches: Vec<RecordBatch>,
+    projection: Vec<(Arc<dyn PhysicalExpr>, String)>,
+    sort_expr: PhysicalSortExpr,
+    expected_plans: Vec<String>,
+}
+
+async fn run_projection_dyn_filter_case(case: ProjectionDynFilterTestCase) {
+    let ProjectionDynFilterTestCase {
+        schema,
+        batches,
+        projection,
+        sort_expr,
+        expected_plans,
+    } = case;
+
+    let scan = TestScanBuilder::new(Arc::clone(&schema))
+        .with_support(true)
+        .with_batches(batches)
+        .build();
+
+    let projection_exec = Arc::new(ProjectionExec::try_new(projection, scan).unwrap());
+
+    let sort = Arc::new(
+        SortExec::new(LexOrdering::new(vec![sort_expr]).unwrap(), projection_exec)
+            .with_fetch(Some(2)),
+    ) as Arc<dyn ExecutionPlan>;
+
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+
+    let optimized_plan = FilterPushdown::new_post_optimization()
+        .optimize(Arc::clone(&sort), &config)
+        .unwrap();
+
+    pretty_assertions::assert_eq!(
+        format_plan_for_test(&optimized_plan).trim(),
+        expected_plans[0].trim()
+    );
+
+    let config = SessionConfig::new().with_batch_size(2);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let mut stream = optimized_plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+    for (idx, expected_plan) in expected_plans.iter().enumerate().skip(1) {
+        stream.next().await.unwrap().unwrap();
+        let formatted_plan = format_plan_for_test(&optimized_plan);
+        pretty_assertions::assert_eq!(
+            formatted_plan.trim(),
+            expected_plan.trim(),
+            "Mismatch at iteration {}",
+            idx
+        );
+    }
+}
+
+#[tokio::test]
+async fn test_topk_with_projection_transformation_on_dyn_filter() {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Int32, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let simple_abc = vec![
+        record_batch!(
+            ("a", Int32, [1, 2, 3]),
+            ("b", Utf8, ["x", "y", "z"]),
+            ("c", Float64, [1.0, 2.0, 3.0])
+        )
+        .unwrap(),
+    ];
+
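+    // Each case below drives the same 3-row batch through a ProjectionExec under a
+    // TopK SortExec (fetch=2), asserting the plan once before execution and once
+    // after the first poll, to verify how the TopK dynamic filter is rewritten
+    // through the projection onto the scan's columns.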
+    // Case 1: Reordering [b, a]
+    run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
+        schema: Arc::clone(&schema),
+        batches: simple_abc.clone(),
+        projection: vec![
+            (col("b", &schema).unwrap(), "b".to_string()),
+            (col("a", &schema).unwrap(), "a".to_string()),
+        ],
+        sort_expr: PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 1)),
+            SortOptions::default(),
+        ),
+        expected_plans: vec![
+            r#" - SortExec: TopK(fetch=2), expr=[a@1 ASC], preserve_partitioning=[false]
+   - ProjectionExec: expr=[b@1 as b, a@0 as a]
+     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
+            r#" - SortExec: TopK(fetch=2), expr=[a@1 ASC], preserve_partitioning=[false], filter=[a@1 IS NULL OR a@1 < 2]
+   - ProjectionExec: expr=[b@1 as b, a@0 as a]
+     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
+        ],
+    })
+    .await;
+
+    // Case 2: Pruning [a]
+    run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
+        schema: Arc::clone(&schema),
+        batches: simple_abc.clone(),
+        projection: vec![(col("a", &schema).unwrap(), "a".to_string())],
+        sort_expr: PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions::default(),
+        ),
+        expected_plans: vec![
+            r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
+   - ProjectionExec: expr=[a@0 as a]
+     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
+            r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 2]
+   - ProjectionExec: expr=[a@0 as a]
+     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
+        ],
+    })
+    .await;
+
+    // Case 3: Identity [a, b]
+    run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
+        schema: Arc::clone(&schema),
+        batches: simple_abc.clone(),
+        projection: vec![
+            (col("a", &schema).unwrap(), "a".to_string()),
+            (col("b", &schema).unwrap(), "b".to_string()),
+        ],
+        sort_expr: PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions::default(),
+        ),
+        expected_plans: vec![
+            r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
+   - ProjectionExec: expr=[a@0 as a, b@1 as b]
+     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
+            r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 2]
+   - ProjectionExec: expr=[a@0 as a, b@1 as b]
+     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
+        ],
+    })
+    .await;
+
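+    // Cases 4-6 below cover computed projection expressions: the dynamic filter is
+    // expressed on the output alias (e.g. `a_plus_1 < 3`) and must be rewritten onto
+    // the underlying input expression (`a@0 + 1 < 3`) before it reaches the scan.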
+    // Case 4: Expressions [a + 1, b]
+    run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
+        schema: Arc::clone(&schema),
+        batches: simple_abc.clone(),
+        projection: vec![
+            (
+                Arc::new(BinaryExpr::new(
+                    col("a", &schema).unwrap(),
+                    Operator::Plus,
+                    Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
+                )),
+                "a_plus_1".to_string(),
+            ),
+            (col("b", &schema).unwrap(), "b".to_string()),
+        ],
+        sort_expr: PhysicalSortExpr::new(
+            Arc::new(Column::new("a_plus_1", 0)),
+            SortOptions::default(),
+        ),
+        expected_plans: vec![
+            r#" - SortExec: TopK(fetch=2), expr=[a_plus_1@0 ASC], preserve_partitioning=[false]
+   - ProjectionExec: expr=[a@0 + 1 as a_plus_1, b@1 as b]
+     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
+            r#" - SortExec: TopK(fetch=2), expr=[a_plus_1@0 ASC], preserve_partitioning=[false], filter=[a_plus_1@0 IS NULL OR a_plus_1@0 < 3]
+   - ProjectionExec: expr=[a@0 + 1 as a_plus_1, b@1 as b]
+     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 + 1 IS NULL OR a@0 + 1 < 3 ]"#.to_string(),
+        ],
+    })
+    .await;
+
+    // Case 5: [a as b, b as a] (swapped columns)
+    run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
+        schema: Arc::clone(&schema),
+        batches: simple_abc.clone(),
+        projection: vec![
+            (col("a", &schema).unwrap(), "b".to_string()),
+            (col("b", &schema).unwrap(), "a".to_string()),
+        ],
+        sort_expr: PhysicalSortExpr::new(
+            Arc::new(Column::new("b", 0)),
+            SortOptions::default(),
+        ),
+        expected_plans: vec![
+            r#" - SortExec: TopK(fetch=2), expr=[b@0 ASC], preserve_partitioning=[false]
+   - ProjectionExec: expr=[a@0 as b, b@1 as a]
+     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
+            r#" - SortExec: TopK(fetch=2), expr=[b@0 ASC], preserve_partitioning=[false], filter=[b@0 IS NULL OR b@0 < 2]
+   - ProjectionExec: expr=[a@0 as b, b@1 as a]
+     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
+        ],
+    })
+    .await;
+
+    // Case 6: Confusing expr [a + 1 as a, b]
+    run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
+        schema: Arc::clone(&schema),
+        batches: simple_abc.clone(),
+        projection: vec![
+            (
+                Arc::new(BinaryExpr::new(
+                    col("a", &schema).unwrap(),
+                    Operator::Plus,
+                    Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
+                )),
+                "a".to_string(),
+            ),
+            (col("b", &schema).unwrap(), "b".to_string()),
+        ],
+        sort_expr: PhysicalSortExpr::new(
+            Arc::new(Column::new("a", 0)),
+            SortOptions::default(),
+        ),
+        expected_plans: vec![
+            r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
+   - ProjectionExec: expr=[a@0 + 1 as a, b@1 as b]
+     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
+            r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 3]
+   - ProjectionExec: expr=[a@0 + 1 as a, b@1 as b]
+     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 + 1 IS NULL OR a@0 + 1 < 3 ]"#.to_string(),
+        ],
+    })
+    .await;
+}
+
 /// Returns a predicate that is a binary expression col = lit
 fn col_lit_predicate(
     column_name: &str,
diff --git a/datafusion/physical-plan/src/column_rewriter.rs b/datafusion/physical-plan/src/column_rewriter.rs
new file mode 100644
index 0000000000000..7cd8656304554
--- /dev/null
+++ b/datafusion/physical-plan/src/column_rewriter.rs
@@ -0,0 +1,383 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion_common::{
+    DataFusionError, HashMap,
+    tree_node::{Transformed, TreeNodeRecursion, TreeNodeRewriter},
+};
+use datafusion_physical_expr::{PhysicalExpr, expressions::Column};
+
+/// Rewrite column references in a physical expr according to a mapping.
+///
+/// This rewriter traverses the expression tree and replaces [`Column`] nodes
+/// with the corresponding expression found in the `column_map`.
+///
+/// If a column is found in the map, it is replaced by the mapped expression.
+/// If a column is NOT found in the map, a `DataFusionError::Internal` is
+/// returned.
+pub struct PhysicalColumnRewriter<'a> {
+    /// Mapping from each original column to its replacement expression.
+    pub column_map: &'a HashMap<Column, Arc<dyn PhysicalExpr>>,
+}
+
+impl<'a> PhysicalColumnRewriter<'a> {
+    /// Create a new `PhysicalColumnRewriter` with the given column mapping.
+    pub fn new(column_map: &'a HashMap<Column, Arc<dyn PhysicalExpr>>) -> Self {
+        Self { column_map }
+    }
+}
+
+impl<'a> TreeNodeRewriter for PhysicalColumnRewriter<'a> {
+    type Node = Arc<dyn PhysicalExpr>;
+
+    fn f_down(
+        &mut self,
+        node: Self::Node,
+    ) -> datafusion_common::Result<Transformed<Self::Node>> {
+        if let Some(column) = node.as_any().downcast_ref::<Column>() {
+            if let Some(new_column) = self.column_map.get(column) {
+                // Jump to prevent rewriting the new sub-expression again
+                return Ok(Transformed::new(
+                    Arc::clone(new_column),
+                    true,
+                    TreeNodeRecursion::Jump,
+                ));
+            } else {
+                // Column not found in mapping
+                return Err(DataFusionError::Internal(format!(
+                    "Column {column:?} not found in column mapping {:?}",
+                    self.column_map
+                )));
+            }
+        }
+        Ok(Transformed::no(node))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::{DataFusionError, Result, tree_node::TreeNode};
+    use datafusion_physical_expr::{
+        PhysicalExpr,
+        expressions::{Column, binary, col, lit},
+    };
+    use std::sync::Arc;
+
+    /// Helper function to create a test schema
+    fn create_test_schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Int32, true),
+            Field::new("d", DataType::Int32, true),
+            Field::new("e", DataType::Int32, true),
+            Field::new("new_col", DataType::Int32, true),
+            Field::new("inner_col", DataType::Int32, true),
+            Field::new("another_col", DataType::Int32, true),
+        ]))
+    }
+
+    /// Helper function to create a complex nested expression with multiple columns.
+    /// Creates: (col_a + col_b) * (col_c - col_d) + col_e
+    fn create_complex_expression(schema: &Schema) -> Arc<dyn PhysicalExpr> {
+        let col_a = col("a", schema).unwrap();
+        let col_b = col("b", schema).unwrap();
+        let col_c = col("c", schema).unwrap();
+        let col_d = col("d", schema).unwrap();
+        let col_e = col("e", schema).unwrap();
+
+        let add_expr =
+            binary(col_a, datafusion_expr::Operator::Plus, col_b, schema).unwrap();
+        let sub_expr =
+            binary(col_c, datafusion_expr::Operator::Minus, col_d, schema).unwrap();
+        let mul_expr = binary(
+            add_expr,
+            datafusion_expr::Operator::Multiply,
+            sub_expr,
+            schema,
+        )
+        .unwrap();
+        binary(mul_expr, datafusion_expr::Operator::Plus, col_e, schema).unwrap()
+    }
+
+    /// Helper function to create a deeply nested expression.
+    /// Creates: col_a + (col_b + (col_c + (col_d + col_e)))
+    fn create_deeply_nested_expression(schema: &Schema) -> Arc<dyn PhysicalExpr> {
+        let col_a = col("a", schema).unwrap();
+        let col_b = col("b", schema).unwrap();
+        let col_c = col("c", schema).unwrap();
+        let col_d = col("d", schema).unwrap();
+        let col_e = col("e", schema).unwrap();
+
+        let inner1 =
+            binary(col_d, datafusion_expr::Operator::Plus, col_e, schema).unwrap();
+        let inner2 =
+            binary(col_c, datafusion_expr::Operator::Plus, inner1, schema).unwrap();
+        let inner3 =
+            binary(col_b, datafusion_expr::Operator::Plus, inner2, schema).unwrap();
+        binary(col_a, datafusion_expr::Operator::Plus, inner3, schema).unwrap()
+    }
+
+    #[test]
+    fn test_simple_column_replacement_with_jump() -> Result<()> {
+        let schema = create_test_schema();
+
+        // Test that Jump prevents re-processing of replaced columns
+        let mut column_map = HashMap::new();
+        column_map.insert(Column::new_with_schema("a", &schema).unwrap(), lit(42i32));
+        column_map.insert(
+            Column::new_with_schema("b", &schema).unwrap(),
+            lit("replaced_b"),
+        );
+        column_map.insert(
+            Column::new_with_schema("c", &schema).unwrap(),
+            col("c", &schema).unwrap(),
+        );
+        column_map.insert(
+            Column::new_with_schema("d", &schema).unwrap(),
+            col("d", &schema).unwrap(),
+        );
+        column_map.insert(
+            Column::new_with_schema("e", &schema).unwrap(),
+            col("e", &schema).unwrap(),
+        );
+
+        let mut rewriter = PhysicalColumnRewriter::new(&column_map);
+        let expr = create_complex_expression(&schema);
+
+        let result = expr.rewrite(&mut rewriter)?;
+
+        // Verify the transformation occurred
+        assert!(result.transformed);
+
+        assert_eq!(
+            format!("{}", result.data),
+            "(42 + replaced_b) * (c@2 - d@3) + e@4"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_nested_column_replacement_with_jump() -> Result<()> {
+        let schema = create_test_schema();
+        // Test Jump behavior with deeply nested expressions
+        let mut column_map = HashMap::new();
+        // Replace col_c with a complex expression containing new columns
+        let replacement_expr = binary(
+            lit(100i32),
+            datafusion_expr::Operator::Plus,
+            col("new_col", &schema).unwrap(),
+            &schema,
+        )
+        .unwrap();
+        column_map.insert(
+            Column::new_with_schema("c", &schema).unwrap(),
+            replacement_expr,
+        );
+        column_map.insert(
+            Column::new_with_schema("a", &schema).unwrap(),
+            col("a", &schema).unwrap(),
+        );
+        column_map.insert(
+            Column::new_with_schema("b", &schema).unwrap(),
+            col("b", &schema).unwrap(),
+        );
+        column_map.insert(
+            Column::new_with_schema("d", &schema).unwrap(),
+            col("d", &schema).unwrap(),
+        );
+        column_map.insert(
+            Column::new_with_schema("e", &schema).unwrap(),
+            col("e", &schema).unwrap(),
+        );
+
+        let mut rewriter = PhysicalColumnRewriter::new(&column_map);
+        let expr = create_deeply_nested_expression(&schema);
+
+        let result = expr.rewrite(&mut rewriter)?;
+
+        // Verify transformation occurred
+        assert!(result.transformed);
+
+        assert_eq!(
+            format!("{}", result.data),
+            "a@0 + b@1 + 100 + new_col@5 + d@3 + e@4"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_circular_reference_prevention() -> Result<()> {
+        let schema = create_test_schema();
+        // Test that Jump prevents infinite recursion with circular references
+        let mut column_map = HashMap::new();
+
+        // Create a circular reference: col_a -> col_b -> col_a
+        // (but Jump should prevent the second visit)
+        column_map.insert(
+            Column::new_with_schema("a", &schema).unwrap(),
+            col("b", &schema).unwrap(),
+        );
+        column_map.insert(
+            Column::new_with_schema("b", &schema).unwrap(),
+            col("a", &schema).unwrap(),
+        );
+
+        let mut rewriter = PhysicalColumnRewriter::new(&column_map);
+
+        // Start with an expression containing col_a
+        let expr = binary(
+            col("a", &schema).unwrap(),
+            datafusion_expr::Operator::Plus,
+            col("b", &schema).unwrap(),
+            &schema,
+        )
+        .unwrap();
+
+        let result = expr.rewrite(&mut rewriter)?;
+
+        // Verify transformation occurred
+        assert!(result.transformed);
+
+        assert_eq!(format!("{}", result.data), "b@1 + a@0");
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_multiple_replacements_in_same_expression() -> Result<()> {
+        let schema = create_test_schema();
+        // Test multiple column replacements in the same complex expression
+        let mut column_map = HashMap::new();
+
+        // Replace multiple columns with literals
+        column_map.insert(Column::new_with_schema("a", &schema).unwrap(), lit(10i32));
+        column_map.insert(Column::new_with_schema("c", &schema).unwrap(), lit(20i32));
+        column_map.insert(Column::new_with_schema("e", &schema).unwrap(), lit(30i32));
+        column_map.insert(
+            Column::new_with_schema("b", &schema).unwrap(),
+            col("b", &schema).unwrap(),
+        );
+        column_map.insert(
+            Column::new_with_schema("d", &schema).unwrap(),
+            col("d", &schema).unwrap(),
+        );
+
+        let mut rewriter = PhysicalColumnRewriter::new(&column_map);
+        let expr = create_complex_expression(&schema); // (col_a + col_b) * (col_c - col_d) + col_e
+
+        let result = expr.rewrite(&mut rewriter)?;
+
+        // Verify transformation occurred
+        assert!(result.transformed);
+        assert_eq!(format!("{}", result.data), "(10 + b@1) * (20 - d@3) + 30");
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_jump_with_complex_replacement_expression() -> Result<()> {
+        let schema = create_test_schema();
+        // Test Jump behavior when replacing with very complex expressions
+        let mut column_map = HashMap::new();
+
+        // Replace col_a with a complex nested expression
+        let inner_expr = binary(
+            lit(5i32),
+            datafusion_expr::Operator::Multiply,
+            col("a", &schema).unwrap(),
+            &schema,
+        )
+        .unwrap();
+        let middle_expr = binary(
+            inner_expr,
+            datafusion_expr::Operator::Plus,
+            lit(3i32),
+            &schema,
+        )
+        .unwrap();
+        let complex_replacement = binary(
+            middle_expr,
+            datafusion_expr::Operator::Minus,
+            col("another_col", &schema).unwrap(),
+            &schema,
+        )
+        .unwrap();
+
+        column_map.insert(
+            Column::new_with_schema("a", &schema).unwrap(),
+            complex_replacement,
+        );
+        column_map.insert(
+            Column::new_with_schema("b", &schema).unwrap(),
+            col("b", &schema).unwrap(),
+        );
+
+        let mut rewriter = PhysicalColumnRewriter::new(&column_map);
+
+        // Create expression: col_a + col_b
+        let expr = binary(
+            col("a", &schema).unwrap(),
+            datafusion_expr::Operator::Plus,
+            col("b", &schema).unwrap(),
+            &schema,
+        )
+        .unwrap();
+
+        let result = expr.rewrite(&mut rewriter)?;
+
+        assert_eq!(
+            format!("{}", result.data),
+            "5 * a@0 + 3 - another_col@7 + b@1"
+        );
+
+        // Verify transformation occurred
+        assert!(result.transformed);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_unmapped_columns_detection() -> Result<()> {
+        let schema = create_test_schema();
+        let mut column_map = HashMap::new();
+
+        // Only map col_a, leave col_b unmapped
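+        // (the rewriter returns `DataFusionError::Internal` for any unmapped column)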
+        column_map.insert(Column::new_with_schema("a", &schema).unwrap(), lit(42i32));
+
+        let mut rewriter = PhysicalColumnRewriter::new(&column_map);
+
+        // Create expression: col_a + col_b
+        let expr = binary(
+            col("a", &schema).unwrap(),
+            datafusion_expr::Operator::Plus,
+            col("b", &schema).unwrap(),
+            &schema,
+        )
+        .unwrap();
+
+        let err = expr.rewrite(&mut rewriter).unwrap_err();
+        assert!(matches!(err, DataFusionError::Internal(_)));
+
+        Ok(())
+    }
+}
diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs
index 1274e954eaeb3..37cbd684909ba 100644
--- a/datafusion/physical-plan/src/filter_pushdown.rs
+++ b/datafusion/physical-plan/src/filter_pushdown.rs
@@ -37,8 +37,12 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
-use datafusion_common::Result;
-use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
+use arrow_schema::Schema;
+use datafusion_common::{
+    Result,
+    tree_node::{TreeNode, TreeNodeRecursion},
+};
+use datafusion_physical_expr::{expressions::Column, utils::reassign_expr_columns};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use itertools::Itertools;
@@ -306,6 +310,56 @@ pub struct ChildFilterDescription {
     pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
 }
 
+/// A utility for checking whether a filter expression can be pushed down
+/// to a child node based on column availability.
+///
+/// This checker validates that all columns referenced in a filter expression
+/// exist in the target schema. If any column in the filter is not present
+/// in the schema, the filter cannot be pushed down to that child.
+pub(crate) struct FilterColumnChecker<'a> {
+    column_names: HashSet<&'a str>,
+}
+
+impl<'a> FilterColumnChecker<'a> {
+    /// Creates a new [`FilterColumnChecker`] from the given schema.
+    ///
+    /// Extracts all column names from the schema's fields to build
+    /// a lookup set for efficient column existence checks.
+    pub(crate) fn new(input_schema: &'a Schema) -> Self {
+        let column_names: HashSet<&str> = input_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+        Self { column_names }
+    }
+
+    /// Checks whether a filter expression can be pushed down to the child
+    /// whose schema was used to create this checker.
+    ///
+    /// Returns `true` if all [`Column`] references in the filter expression
+    /// exist in the target schema, `false` otherwise.
+    ///
+    /// This method traverses the entire expression tree, checking each
+    /// column reference against the available column names.
+    pub(crate) fn can_pushdown(&self, filter: &Arc<dyn PhysicalExpr>) -> bool {
+        let mut can_apply = true;
+        filter
+            .apply(|expr| {
+                if let Some(column) = expr.as_any().downcast_ref::<Column>()
+                    && !self.column_names.contains(column.name())
+                {
+                    can_apply = false;
+                    return Ok(TreeNodeRecursion::Stop);
+                }
+
+                Ok(TreeNodeRecursion::Continue)
+            })
+            .expect("infallible traversal");
+        can_apply
+    }
+}
+
 impl ChildFilterDescription {
     /// Build a child filter description by analyzing which parent filters can be pushed to a specific child.
     ///
@@ -320,26 +374,14 @@ impl ChildFilterDescription {
     ) -> Result<Self> {
         let child_schema = child.schema();
 
-        // Get column names from child schema for quick lookup
-        let child_column_names: HashSet<&str> = child_schema
-            .fields()
-            .iter()
-            .map(|f| f.name().as_str())
-            .collect();
+        // Build a set of column names in the child schema for quick lookup
+        let checker = FilterColumnChecker::new(&child_schema);
 
         // Analyze each parent filter
         let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
 
         for filter in parent_filters {
-            // Check which columns the filter references
-            let referenced_columns = collect_columns(filter);
-
-            // Check if all referenced columns exist in the child schema
-            let all_columns_exist = referenced_columns
-                .iter()
-                .all(|col| child_column_names.contains(col.name()));
-
-            if all_columns_exist {
+            if checker.can_pushdown(filter) {
                 // All columns exist in child - we can push down
                 // Need to reassign column indices to match child schema
                 let reassigned_filter =
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index ec8e154caec91..9352a143c11f8 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -68,6 +68,7 @@ pub mod async_func;
 pub mod coalesce;
 pub mod coalesce_batches;
 pub mod coalesce_partitions;
+pub mod column_rewriter;
 pub mod common;
 pub mod coop;
 pub mod display;
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index 8f2f2219f4338..8d4c775f87348 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -26,10 +26,11 @@ use super::{
     DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
 };
+use crate::column_rewriter::PhysicalColumnRewriter;
 use crate::execution_plan::CardinalityEffect;
 use crate::filter_pushdown::{
-    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
-    FilterPushdownPropagation,
+    ChildFilterDescription, ChildPushdownResult, FilterColumnChecker, FilterDescription,
+    FilterPushdownPhase, FilterPushdownPropagation, PushedDownPredicate,
 };
 use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
 use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr};
@@ -45,11 +46,11 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
-use datafusion_common::{JoinSide, Result, internal_err};
+use datafusion_common::{DataFusionError, JoinSide, Result, internal_err};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
 use datafusion_physical_expr::projection::Projector;
-use datafusion_physical_expr::utils::collect_columns;
+use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
 use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
 use datafusion_physical_expr_common::sort_expr::{
     LexOrdering, LexRequirement, PhysicalSortExpr,
@@ -192,6 +193,29 @@ impl ProjectionExec {
             input.boundedness(),
         ))
     }
+
+    /// Collect the reverse alias mapping from the projection expressions.
+    /// The resulting map maps each aliased output [`Column`] (as seen by the
+    /// parent) to the original input expression it is computed from.
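+    ///
+    /// For example, the projection `a + 1 AS x` produces the entry `x -> a + 1`,
+    /// so a parent filter `x > 5` can later be rewritten to `a + 1 > 5`.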
+    fn collect_reverse_alias(
+        &self,
+    ) -> Result<datafusion_common::HashMap<Column, Arc<dyn PhysicalExpr>>> {
+        let mut alias_map = datafusion_common::HashMap::new();
+        for projection in self.projection_expr().iter() {
+            let (aliased_index, _output_field) = self
+                .projector
+                .output_schema()
+                .column_with_name(&projection.alias)
+                .ok_or_else(|| {
+                    DataFusionError::Internal(format!(
+                        "Expr {} with alias {} not found in output schema",
+                        projection.expr, projection.alias
+                    ))
+                })?;
+            let aliased_col = Column::new(&projection.alias, aliased_index);
+            alias_map.insert(aliased_col, Arc::clone(&projection.expr));
+        }
+        Ok(alias_map)
+    }
 }
 
 impl DisplayAs for ProjectionExec {
@@ -347,10 +371,31 @@ impl ExecutionPlan for ProjectionExec {
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterDescription> {
-        // TODO: In future, we can try to handle inverting aliases here.
-        // For the time being, we pass through untransformed filters, so filters on aliases are not handled.
-        // https://github.com/apache/datafusion/issues/17246
-        FilterDescription::from_children(parent_filters, &self.children())
+        // Expand aliased columns in the parent filters back to their original
+        // input expressions
+        let invert_alias_map = self.collect_reverse_alias()?;
+        let output_schema = self.schema();
+        let checker = FilterColumnChecker::new(&output_schema);
+        let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
+
+        for filter in parent_filters {
+            if !checker.can_pushdown(&filter) {
+                child_parent_filters.push(PushedDownPredicate::unsupported(filter));
+                continue;
+            }
+            // All referenced columns exist in the projection output; reassign
+            // column indices to match the output schema before rewriting
+            let reassigned_filter = reassign_expr_columns(filter, &output_schema)?;
+            // Rewrite the filter expression using the inverted alias map
+            let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map);
+            let rewritten = reassigned_filter.rewrite(&mut rewriter)?.data;
+
+            child_parent_filters.push(PushedDownPredicate::supported(rewritten));
+        }
+
+        Ok(FilterDescription::new().with_child(ChildFilterDescription {
+            parent_filters: child_parent_filters,
+            self_filters: vec![],
+        }))
     }
 
     fn handle_child_pushdown_result(
@@ -1086,6 +1131,7 @@ mod tests {
 
     use crate::common::collect;
+    use crate::filter_pushdown::PushedDown;
     use crate::test;
     use crate::test::exec::StatisticsExec;
 
@@ -1094,7 +1140,9 @@
     use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
     use datafusion_expr::Operator;
-    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, col};
+    use datafusion_physical_expr::expressions::{
+        BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal, binary, col, lit,
+    };
 
     #[test]
     fn test_collect_column_indices() -> Result<()> {
@@ -1283,4 +1331,431 @@
         );
         assert!(stats.total_byte_size.is_exact().unwrap_or(false));
     }
+
+    #[test]
+    fn test_filter_pushdown_with_alias() -> Result<()> {
+        let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics::new_unknown(&input_schema),
+            input_schema.clone(),
+        ));
+
+        // project "a" as "b"
+        let projection = ProjectionExec::try_new(
+            vec![ProjectionExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                alias: "b".to_string(),
+            }],
+            input,
+        )?;
+
+        // filter "b > 5"
+        let filter = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("b", 0)),
+            Operator::Gt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
+        )) as Arc<dyn PhysicalExpr>;
+
+        let description = projection.gather_filters_for_pushdown(
+            FilterPushdownPhase::Post,
+            vec![filter],
+            &ConfigOptions::default(),
+        )?;
+
+        // Should be converted to "a > 5"; "a" is index 0 in the input
+        let expected_filter = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Gt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
+        )) as Arc<dyn PhysicalExpr>;
+
+        assert_eq!(description.self_filters(), vec![vec![]]);
+        let pushed_filters = &description.parent_filters()[0];
+        assert_eq!(
+            format!("{}", pushed_filters[0].predicate),
+            format!("{}", expected_filter)
+        );
+        // Verify the predicate was actually pushed down
+        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_filter_pushdown_with_multiple_aliases() -> Result<()> {
+        let input_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                column_statistics: vec![Default::default(); input_schema.fields().len()],
+                ..Default::default()
+            },
+            input_schema.clone(),
+        ));
+
+        // project "a" as "x", "b" as "y"
+        let projection = ProjectionExec::try_new(
+            vec![
+                ProjectionExpr {
+                    expr: Arc::new(Column::new("a", 0)),
+                    alias: "x".to_string(),
+                },
+                ProjectionExpr {
+                    expr: Arc::new(Column::new("b", 1)),
+                    alias: "y".to_string(),
+                },
+            ],
+            input,
+        )?;
+
+        // filter "x > 5"
+        let filter1 = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("x", 0)),
+            Operator::Gt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
+        )) as Arc<dyn PhysicalExpr>;
+
+        // filter "y < 10"
+        let filter2 = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("y", 1)),
+            Operator::Lt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
+        )) as Arc<dyn PhysicalExpr>;
+
+        let description = projection.gather_filters_for_pushdown(
+            FilterPushdownPhase::Post,
+            vec![filter1, filter2],
+            &ConfigOptions::default(),
+        )?;
+
+        // Should be converted to "a > 5" and "b < 10"
+        let expected_filter1 = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Gt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
+        )) as Arc<dyn PhysicalExpr>;
+
+        let expected_filter2 = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("b", 1)),
+            Operator::Lt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
+        )) as Arc<dyn PhysicalExpr>;
+
+        let pushed_filters = &description.parent_filters()[0];
+        assert_eq!(pushed_filters.len(), 2);
+        // Note: The order of filters is preserved
+        assert_eq!(
+            format!("{}", pushed_filters[0].predicate),
+            format!("{}", expected_filter1)
+        );
+        assert_eq!(
+            format!("{}", pushed_filters[1].predicate),
+            format!("{}", expected_filter2)
+        );
+        // Verify the predicates were actually pushed down
+        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
+        assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_filter_pushdown_with_swapped_aliases() -> Result<()> {
+        let input_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                column_statistics: vec![Default::default(); input_schema.fields().len()],
+                ..Default::default()
+            },
+            input_schema.clone(),
+        ));
+
+        // project "a" as "b", "b" as "a"
+        let projection = ProjectionExec::try_new(
+            vec![
+                ProjectionExpr {
+                    expr: Arc::new(Column::new("a", 0)),
+                    alias: "b".to_string(),
+                },
+                ProjectionExpr {
+                    expr: Arc::new(Column::new("b", 1)),
+                    alias: "a".to_string(),
+                },
+            ],
+            input,
+        )?;
+
+        // filter "b > 5" (output column 0, which is "a" in input)
+        let filter1 = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("b", 0)),
+            Operator::Gt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
+        )) as Arc<dyn PhysicalExpr>;
+
+        // filter "a < 10" (output column 1, which is "b" in input)
+        let filter2 = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 1)),
+            Operator::Lt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
+        )) as Arc<dyn PhysicalExpr>;
+
+        let description = projection.gather_filters_for_pushdown(
+            FilterPushdownPhase::Post,
+            vec![filter1, filter2],
+            &ConfigOptions::default(),
+        )?;
+
+        let pushed_filters = &description.parent_filters()[0];
+        assert_eq!(pushed_filters.len(), 2);
+
+        // "b" (output index 0) -> "a" (input index 0)
+        let expected_filter1 = "a@0 > 5";
+        // "a" (output index 1) -> "b" (input index 1)
+        let expected_filter2 = "b@1 < 10";
+
+        assert_eq!(format!("{}", pushed_filters[0].predicate), expected_filter1);
+        assert_eq!(format!("{}", pushed_filters[1].predicate), expected_filter2);
+        // Verify the predicates were actually pushed down
+        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
+        assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_filter_pushdown_with_mixed_columns() -> Result<()> {
+        let input_schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                column_statistics: vec![Default::default(); input_schema.fields().len()],
+                ..Default::default()
+            },
+            input_schema.clone(),
+        ));
+
+        // project "a" as "x", "b" as "b" (pass through)
+        let projection = ProjectionExec::try_new(
+            vec![
+                ProjectionExpr {
+                    expr: Arc::new(Column::new("a", 0)),
+                    alias: "x".to_string(),
+                },
+                ProjectionExpr {
+                    expr: Arc::new(Column::new("b", 1)),
+                    alias: "b".to_string(),
+                },
+            ],
+            input,
+        )?;
+
+        // filter "x > 5"
+        let filter1 = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("x", 0)),
+            Operator::Gt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
+        )) as Arc<dyn PhysicalExpr>;
+
+        // filter "b < 10" (using output index 1 which corresponds to 'b')
+        let filter2 = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("b", 1)),
+            Operator::Lt,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
+        )) as Arc<dyn PhysicalExpr>;
+
+        let description = projection.gather_filters_for_pushdown(
+            FilterPushdownPhase::Post,
+            vec![filter1, filter2],
+            &ConfigOptions::default(),
+        )?;
+
+        let pushed_filters = &description.parent_filters()[0];
+        assert_eq!(pushed_filters.len(), 2);
+        // "x" -> "a" (index 0)
+        let expected_filter1 = "a@0 > 5";
+        // "b" -> "b" (index 1)
+        let expected_filter2 = "b@1 < 10";
+
+        assert_eq!(format!("{}", pushed_filters[0].predicate), expected_filter1);
+        assert_eq!(format!("{}", pushed_filters[1].predicate), expected_filter2);
+        // Verify the predicates were actually pushed down
+        assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
+        assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_filter_pushdown_with_complex_expression() -> Result<()> {
+        let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                column_statistics: vec![Default::default(); input_schema.fields().len()],
+                ..Default::default()
+            },
+            input_schema.clone(),
+        ));
+
+        // project "a + 1" as "z"
+        let projection = ProjectionExec::try_new(
+            vec![ProjectionExpr {
+                expr: Arc::new(BinaryExpr::new(
+                    Arc::new(Column::new("a", 0)),
+                    Operator::Plus,
+                    Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
+                )),
+                alias: "z".to_string(),
"z".to_string(), + }], + input, + )?; + + // filter "z > 10" + let filter = Arc::new(BinaryExpr::new( + Arc::new(Column::new("z", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), + )) as Arc; + + let description = projection.gather_filters_for_pushdown( + FilterPushdownPhase::Post, + vec![filter], + &ConfigOptions::default(), + )?; + + // expand to `a + 1 > 10` + let pushed_filters = &description.parent_filters()[0]; + assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes)); + assert_eq!(format!("{}", pushed_filters[0].predicate), "a@0 + 1 > 10"); + + Ok(()) + } + + #[test] + fn test_filter_pushdown_with_unknown_column() -> Result<()> { + let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + column_statistics: vec![Default::default(); input_schema.fields().len()], + ..Default::default() + }, + input_schema.clone(), + )); + + // project "a" as "a" + let projection = ProjectionExec::try_new( + vec![ProjectionExpr { + expr: Arc::new(Column::new("a", 0)), + alias: "a".to_string(), + }], + input, + )?; + + // filter "unknown_col > 5" - using a column name that doesn't exist in projection output + // Column constructor: name, index. Index 1 doesn't exist. + let filter = Arc::new(BinaryExpr::new( + Arc::new(Column::new("unknown_col", 1)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(5)))), + )) as Arc; + + let description = projection.gather_filters_for_pushdown( + FilterPushdownPhase::Post, + vec![filter], + &ConfigOptions::default(), + )?; + + let pushed_filters = &description.parent_filters()[0]; + assert!(matches!(pushed_filters[0].discriminant, PushedDown::No)); + // The column shouldn't be found in the alias map, so it remains unchanged with its index + assert_eq!( + format!("{}", pushed_filters[0].predicate), + "unknown_col@1 > 5" + ); + + Ok(()) + } + + /// Basic test for `DynamicFilterPhysicalExpr` can correctly update its child expression + /// i.e. 
+    /// i.e. starting with `lit(true)`; after an update it becomes `a > 5`.
+    /// With the projection [b - 1 as a], the pushed-down filter should be `b - 1 > 5`.
+    #[test]
+    fn test_basic_dyn_filter_projection_pushdown_update_child() -> Result<()> {
+        let input_schema =
+            Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)]));
+
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                column_statistics: vec![Default::default(); input_schema.fields().len()],
+                ..Default::default()
+            },
+            input_schema.as_ref().clone(),
+        ));
+
+        // project "b" - 1 as "a"
+        let projection = ProjectionExec::try_new(
+            vec![ProjectionExpr {
+                expr: binary(
+                    Arc::new(Column::new("b", 0)),
+                    Operator::Minus,
+                    lit(1),
+                    &input_schema,
+                )
+                .unwrap(),
+                alias: "a".to_string(),
+            }],
+            input,
+        )?;
+
+        // Simulate the projection's parent creating a dynamic filter on "a"
+        let projected_schema = projection.schema();
+        let col_a = col("a", &projected_schema)?;
+        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
+            vec![Arc::clone(&col_a)],
+            lit(true),
+        ));
+        // Initial state should be lit(true)
+        let current = dynamic_filter.current()?;
+        assert_eq!(format!("{current}"), "true");
+
+        let dyn_phy_expr: Arc<dyn PhysicalExpr> = Arc::clone(&dynamic_filter) as _;
+
+        let description = projection.gather_filters_for_pushdown(
+            FilterPushdownPhase::Post,
+            vec![dyn_phy_expr],
+            &ConfigOptions::default(),
+        )?;
+
+        let pushed_filters = &description.parent_filters()[0][0];
+
+        // Before any update, the pushed-down predicate displays as an empty DynamicFilter
+        assert_eq!(
+            format!("{}", pushed_filters.predicate),
+            "DynamicFilter [ empty ]"
+        );
+
+        // Update to a > 5 (after projection, b is now called a)
+        let new_expr =
+            Arc::new(BinaryExpr::new(Arc::clone(&col_a), Operator::Gt, lit(5i32)));
+        dynamic_filter.update(new_expr)?;
+
+        // Now it should be a > 5
+        let current = dynamic_filter.current()?;
+        assert_eq!(format!("{current}"), "a@0 > 5");
+
+        // The pushed-down filter should now be b - 1 > 5
+        // (because b - 1 is projected as a)
+        assert_eq!(
+            format!("{}", pushed_filters.predicate),
+            "DynamicFilter [ b@0 - 1 > 5 ]"
+        );
+
+        Ok(())
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
index 3e403171e0718..38a5b11870765 100644
--- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
+++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
@@ -92,6 +92,30 @@ physical_plan
 01)SortExec: TopK(fetch=3), expr=[value@1 DESC], preserve_partitioning=[false]
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value, name], file_type=parquet, predicate=DynamicFilter [ empty ]
 
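+# The TopK dynamic filter below is built against the projected alias `v`; the
+# scan's predicate shows it rewritten back onto the source column `value`.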
+statement ok
+set datafusion.explain.analyze_level = summary;
+
+query TT
+EXPLAIN ANALYZE SELECT id, value AS v, value + id as name FROM test_parquet where value > 3 ORDER BY v DESC LIMIT 3;
+----
+Plan with Metrics
+01)SortPreservingMergeExec: [v@1 DESC], fetch=3, metrics=[output_rows=3, ]
+02)--SortExec: TopK(fetch=3), expr=[v@1 DESC], preserve_partitioning=[true], filter=[v@1 IS NULL OR v@1 > 800], metrics=[output_rows=3, ]
+03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as name], metrics=[output_rows=10, ]
+04)------FilterExec: value@1 > 3, metrics=[output_rows=10, , selectivity=100% (10/10)]
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[output_rows=10, ]
+06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, , files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_rows_pruned=10 total → 10 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=, scan_efficiency_ratio=18% (210/1.16 K)]
+
+statement ok
+set datafusion.explain.analyze_level = dev;
+
+query III
+SELECT id, value AS v, value + id as name FROM test_parquet where value > 3 ORDER BY v DESC LIMIT 3;
+----
+10 1000 1010
+9 900 909
+8 800 808
+
 # Disable TopK dynamic filter pushdown
 statement ok
 SET datafusion.optimizer.enable_topk_dynamic_filter_pushdown = false;
@@ -106,6 +130,13 @@ physical_plan
 01)SortExec: TopK(fetch=3), expr=[value@1 DESC], preserve_partitioning=[false]
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value, name], file_type=parquet
 
+query IIT
+SELECT id, value AS v, name FROM (SELECT * FROM test_parquet UNION ALL SELECT * FROM test_parquet) ORDER BY v DESC LIMIT 3;
+----
+10 1000 j
+10 1000 j
+9 900 i
+
 # Re-enable for next tests
 statement ok
 SET datafusion.optimizer.enable_topk_dynamic_filter_pushdown = true;
diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt
index aba468d21fd08..8a1fef0722297 100644
--- a/datafusion/sqllogictest/test_files/topk.slt
+++ b/datafusion/sqllogictest/test_files/topk.slt
@@ -383,7 +383,7 @@ physical_plan
 03)----ProjectionExec: expr=[__common_expr_1@0 as number_plus, number@1 as number, __common_expr_1@0 as other_number_plus, age@2 as age]
 04)------ProjectionExec: expr=[CAST(number@0 AS Int64) + 1 as __common_expr_1, number@0 as number, age@1 as age]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, age], output_ordering=[number@0 DESC], file_type=parquet
+06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, age], output_ordering=[number@0 DESC], file_type=parquet, predicate=DynamicFilter [ empty ]
 
 # Cleanup
 statement ok