diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index 2d358367b46a..b62d15e6d2f1 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -65,6 +65,8 @@ //! The optimizer rule currently checks the plan for exchange-like operators and leave operators //! that report [`SchedulingType::NonCooperative`] in their [plan properties](ExecutionPlan::properties). +use datafusion_common::config::ConfigOptions; +use datafusion_physical_expr::PhysicalExpr; #[cfg(datafusion_coop = "tokio_fallback")] use futures::Future; use std::any::Any; @@ -73,6 +75,10 @@ use std::sync::Arc; use std::task::{Context, Poll}; use crate::execution_plan::CardinalityEffect::{self, Equal}; +use crate::filter_pushdown::{ + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, +}; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, SendableRecordBatchStream, @@ -289,6 +295,24 @@ impl ExecutionPlan for CooperativeExec { fn cardinality_effect(&self) -> CardinalityEffect { Equal } + + fn gather_filters_for_pushdown( + &self, + _phase: FilterPushdownPhase, + parent_filters: Vec>, + _config: &ConfigOptions, + ) -> Result { + FilterDescription::from_children(parent_filters, &self.children()) + } + + fn handle_child_pushdown_result( + &self, + _phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) + } } /// Creates a [`CooperativeStream`] wrapper around the given [`RecordBatchStream`]. diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index d9dbbef5b5ea..cf8400f4acee 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -33,11 +33,16 @@ use super::{ SendableRecordBatchStream, Statistics, }; use crate::execution_plan::CardinalityEffect; +use crate::filter_pushdown::{ + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, +}; use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef}; use crate::{ColumnStatistics, DisplayFormatType, ExecutionPlan, PhysicalExpr}; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; +use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, @@ -273,6 +278,27 @@ impl ExecutionPlan for ProjectionExec { Ok(Some(Arc::new(projection.clone()))) } } + + fn gather_filters_for_pushdown( + &self, + _phase: FilterPushdownPhase, + parent_filters: Vec>, + _config: &ConfigOptions, + ) -> Result { + // TODO: In future, we can try to handle inverting aliases here. + // For the time being, we pass through untransformed filters, so filters on aliases are not handled. + // https://github.com/apache/datafusion/issues/17246 + FilterDescription::from_children(parent_filters, &self.children()) + } + + fn handle_child_pushdown_result( + &self, + _phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) + } } fn stats_projection( diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 54a2c5057e7c..0df361a75bae 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -392,7 +392,7 @@ physical_plan 44)-----------------------------│ -------------------- ││ -------------------- │ 45)-----------------------------│ files: 1 ││ partition_count(in->out): │ 46)-----------------------------│ format: parquet ││ 1 -> 4 │ -47)-----------------------------│ ││ │ +47)-----------------------------│ predicate: true ││ │ 48)-----------------------------│ ││ partitioning_scheme: │ 49)-----------------------------│ ││ RoundRobinBatch(4) │ 50)-----------------------------└───────────────────────────┘└─────────────┬─────────────┘ diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index afa78e43de2b..ce59b0204616 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -372,7 +372,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] 02)--ProjectionExec: expr=[number@0 as number, letter@1 as letter, age@2 as age, number@0 as column4, letter@1 as column5] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] # Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age) query TT