diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index e793af8ed4b0..d4318235bafb 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -34,8 +34,8 @@ use datafusion_physical_plan::{ displayable, filter::FilterExec, filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPropagation, - PredicateSupport, PredicateSupports, + ChildFilterDescription, ChildPushdownResult, FilterDescription, + FilterPushdownPropagation, PredicateSupport, }, metrics::ExecutionPlanMetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, @@ -228,11 +228,19 @@ impl FileSource for TestSource { ..self.clone() }); Ok(FilterPushdownPropagation { - filters: PredicateSupports::all_supported(filters), + filters: filters + .into_iter() + .map(PredicateSupport::Supported) + .collect(), updated_node: Some(new_node), }) } else { - Ok(FilterPushdownPropagation::unsupported(filters)) + Ok(FilterPushdownPropagation::with_filters( + filters + .into_iter() + .map(PredicateSupport::Unsupported) + .collect(), + )) } } @@ -515,9 +523,12 @@ impl ExecutionPlan for TestNode { parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { - Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters) - .with_self_filter(Arc::clone(&self.predicate))) + // Since TestNode marks all parent filters as supported and adds its own filter, + // we use from_child to create a description with all parent filters supported + let child = &self.input; + let child_desc = ChildFilterDescription::from_child(&parent_filters, child)? + .with_self_filter(Arc::clone(&self.predicate)); + Ok(FilterDescription::new().with_child(child_desc)) } fn handle_child_pushdown_result( @@ -534,7 +545,7 @@ impl ExecutionPlan for TestNode { let self_pushdown_result = child_pushdown_result.self_filters[0].clone(); // And pushed down 1 filter assert_eq!(self_pushdown_result.len(), 1); - let self_pushdown_result = self_pushdown_result.into_inner(); + let self_pushdown_result: Vec<_> = self_pushdown_result.into_iter().collect(); match &self_pushdown_result[0] { PredicateSupport::Unsupported(filter) => { diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index b7c5b5d37686..8ca36e7cd321 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -41,8 +41,9 @@ use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; -use datafusion_physical_plan::filter_pushdown::PredicateSupports; +use datafusion_physical_plan::filter_pushdown::{ + FilterPushdownPropagation, PredicateSupport, +}; use datafusion_physical_plan::metrics::Count; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; @@ -621,7 +622,12 @@ impl FileSource for ParquetSource { config: &ConfigOptions, ) -> datafusion_common::Result>> { let Some(file_schema) = self.file_schema.clone() else { - return Ok(FilterPushdownPropagation::unsupported(filters)); + return Ok(FilterPushdownPropagation::with_filters( + filters + .into_iter() + .map(PredicateSupport::Unsupported) + .collect(), + )); }; // Determine if based on configs we should push filters down. // If either the table / scan itself or the config has pushdown enabled, @@ -635,20 +641,36 @@ impl FileSource for ParquetSource { let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled; let mut source = self.clone(); - let filters = PredicateSupports::new_with_supported_check(filters, |filter| { - can_expr_be_pushed_down_with_schemas(filter, &file_schema) - }); - if filters.is_all_unsupported() { + let filters: Vec = filters + .into_iter() + .map(|filter| { + if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) { + PredicateSupport::Supported(filter) + } else { + PredicateSupport::Unsupported(filter) + } + }) + .collect(); + if filters + .iter() + .all(|f| matches!(f, PredicateSupport::Unsupported(_))) + { // No filters can be pushed down, so we can just return the remaining filters // and avoid replacing the source in the physical plan. return Ok(FilterPushdownPropagation::with_filters(filters)); } - let allowed_filters = filters.collect_supported(); + let allowed_filters = filters + .iter() + .filter_map(|f| match f { + PredicateSupport::Supported(expr) => Some(Arc::clone(expr)), + PredicateSupport::Unsupported(_) => None, + }) + .collect_vec(); let predicate = match source.predicate { - Some(predicate) => conjunction( - std::iter::once(predicate).chain(allowed_filters.iter().cloned()), - ), - None => conjunction(allowed_filters.iter().cloned()), + Some(predicate) => { + conjunction(std::iter::once(predicate).chain(allowed_filters)) + } + None => conjunction(allowed_filters), }; source.predicate = Some(predicate); source = source.with_pushdown_filters(pushdown_filters); @@ -657,7 +679,10 @@ impl FileSource for ParquetSource { // even if we updated the predicate to include the filters (they will only be used for stats pruning). if !pushdown_filters { return Ok(FilterPushdownPropagation::with_filters( - filters.make_unsupported(), + filters + .into_iter() + .map(|f| PredicateSupport::Unsupported(f.into_inner())) + .collect_vec(), ) .with_updated_node(source)); } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index c5f21ebf1a0f..a95c07cc319c 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -30,7 +30,9 @@ use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{not_impl_err, Result, Statistics}; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; -use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; +use datafusion_physical_plan::filter_pushdown::{ + FilterPushdownPropagation, PredicateSupport, +}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; @@ -120,7 +122,12 @@ pub trait FileSource: Send + Sync { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::unsupported(filters)) + Ok(FilterPushdownPropagation::with_filters( + filters + .into_iter() + .map(PredicateSupport::Unsupported) + .collect(), + )) } /// Set optional schema adapter factory. diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 4dda95b0856b..78d7e295604d 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -38,7 +38,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport, }; /// A source of data, typically a list of files or memory @@ -168,7 +168,12 @@ pub trait DataSource: Send + Sync + Debug { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::unsupported(filters)) + Ok(FilterPushdownPropagation::with_filters( + filters + .into_iter() + .map(PredicateSupport::Unsupported) + .collect(), + )) } } @@ -316,7 +321,14 @@ impl ExecutionPlan for DataSourceExec { ) -> Result>> { // Push any remaining filters into our data source let res = self.data_source.try_pushdown_filters( - child_pushdown_result.parent_filters.collect_all(), + child_pushdown_result + .parent_filters + .into_iter() + .map(|f| match f { + PredicateSupport::Supported(expr) => expr, + PredicateSupport::Unsupported(expr) => expr, + }) + .collect(), config, )?; match res.updated_node { diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index 885280576b4b..eb7ec81b260e 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -15,6 +15,22 @@ // specific language governing permissions and limitations // under the License. +//! Filter Pushdown Optimization Process +//! +//! The filter pushdown mechanism involves four key steps: +//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`] +//! on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`] +//! by inspecting its logic and children's schemas, determining which filters can be pushed to each child. +//! 2. **Optimizer Executes Pushdown**: The optimizer recursively calls `push_down_filters` in this module on each child, +//! passing the appropriate filters (`Vec>`) for that child. +//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children, +//! containing information about which filters were successfully pushed down vs. unsupported. +//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent, +//! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides +//! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes). +//! +//! [`FilterDescription`]: datafusion_physical_plan::filter_pushdown::FilterDescription + use std::sync::Arc; use crate::PhysicalOptimizerRule; @@ -22,12 +38,11 @@ use crate::PhysicalOptimizerRule; use datafusion_common::{config::ConfigOptions, Result}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, - PredicateSupport, PredicateSupports, + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport, }; use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan}; -use itertools::izip; +use itertools::{izip, Itertools}; /// Attempts to recursively push given filters from the top of the tree into leafs. /// @@ -497,10 +512,10 @@ fn push_down_filters( // Our child doesn't know the difference between filters that were passed down // from our parents and filters that the current node injected. We need to de-entangle // this since we do need to distinguish between them. - let mut all_filters = result.filters.into_inner(); + let mut all_filters = result.filters.into_iter().collect_vec(); let parent_predicates = all_filters.split_off(num_self_filters); let self_predicates = all_filters; - self_filters_pushdown_supports.push(PredicateSupports::new(self_predicates)); + self_filters_pushdown_supports.push(self_predicates); for (idx, result) in parent_supported_predicate_indices .iter() @@ -533,21 +548,15 @@ fn push_down_filters( let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?; // Remap the result onto the parent filters as they were given to us. // Any filters that were not pushed down to any children are marked as unsupported. - let parent_pushdown_result = PredicateSupports::new( - parent_predicates_pushdown_states - .into_iter() - .zip(parent_predicates) - .map(|(state, filter)| match state { - ParentPredicateStates::NoChildren => { - PredicateSupport::Unsupported(filter) - } - ParentPredicateStates::Unsupported => { - PredicateSupport::Unsupported(filter) - } - ParentPredicateStates::Supported => PredicateSupport::Supported(filter), - }) - .collect(), - ); + let parent_pushdown_result = parent_predicates_pushdown_states + .into_iter() + .zip(parent_predicates) + .map(|(state, filter)| match state { + ParentPredicateStates::NoChildren => PredicateSupport::Unsupported(filter), + ParentPredicateStates::Unsupported => PredicateSupport::Unsupported(filter), + ParentPredicateStates::Supported => PredicateSupport::Supported(filter), + }) + .collect(); // TODO: by calling `handle_child_pushdown_result` we are assuming that the // `ExecutionPlan` implementation will not change the plan itself. // Should we have a separate method for dynamic pushdown that does not allow modifying the plan? diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 78bd4b4fc3a0..d008db48b82b 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -234,8 +234,7 @@ impl ExecutionPlan for CoalesceBatchesExec { parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { - Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters)) + FilterDescription::from_children(parent_filters, &self.children()) } fn handle_child_pushdown_result( diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 90385c58a6ac..375de7c7385e 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -17,8 +17,8 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; use crate::filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, + ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, PredicateSupport, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; @@ -33,6 +33,7 @@ pub use datafusion_physical_expr::window::WindowExpr; pub use datafusion_physical_expr::{ expressions, Distribution, Partitioning, PhysicalExpr, }; +use itertools::Itertools; use std::any::Any; use std::fmt::Debug; @@ -520,10 +521,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { - Ok( - FilterDescription::new_with_child_count(self.children().len()) - .all_parent_filters_unsupported(parent_filters), - ) + // Default implementation: mark all filters as unsupported for all children + let mut desc = FilterDescription::new(); + let child_filters = parent_filters + .iter() + .map(|f| PredicateSupport::Unsupported(Arc::clone(f))) + .collect_vec(); + for _ in 0..self.children().len() { + desc = desc.with_child(ChildFilterDescription { + parent_filters: child_filters.clone(), + self_filters: vec![], + }); + } + Ok(desc) } /// Handle the result of a child pushdown. @@ -587,16 +597,15 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// /// **Helper Methods for Customization:** /// There are various helper methods to simplify implementing this method: - /// - [`FilterPushdownPropagation::unsupported`]: Indicates that the node - /// does not support filter pushdown at all, rejecting all filters. /// - [`FilterPushdownPropagation::transparent`]: Indicates that the node /// supports filter pushdown but does not modify it, simply transmitting /// the children's pushdown results back up to its parent. - /// - [`PredicateSupports::new_with_supported_check`]: Takes a callback to - /// dynamically determine support for each filter, useful with - /// [`FilterPushdownPropagation::with_filters`] and - /// [`FilterPushdownPropagation::with_updated_node`] to build mixed results - /// of supported and unsupported filters. + /// - [`FilterPushdownPropagation::with_filters`]: Allows adding filters + /// to the propagation result, indicating which filters are supported by + /// the current node. + /// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the + /// current node in the propagation result, used if the node + /// has modified its plan based on the pushdown results. /// /// **Filter Pushdown Phases:** /// There are two different phases in filter pushdown (`Pre` and others), @@ -605,7 +614,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// [`FilterPushdownPhase`] for more details on phase-specific behavior. /// /// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported - /// [`PredicateSupports::new_with_supported_check`]: crate::filter_pushdown::PredicateSupports::new_with_supported_check fn handle_child_pushdown_result( &self, _phase: FilterPushdownPhase, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 252af9ebcd49..54015c7bcdd2 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -16,11 +16,12 @@ // under the License. use std::any::Any; -use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; +use itertools::Itertools; + use super::{ ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -28,8 +29,8 @@ use super::{ use crate::common::can_project; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ - ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, + ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, PredicateSupport, }; use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, @@ -46,9 +47,6 @@ use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; use datafusion_common::{ internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue, }; @@ -65,7 +63,6 @@ use datafusion_physical_expr::{ use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::stream::{Stream, StreamExt}; -use itertools::Itertools; use log::trace; const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20; @@ -455,56 +452,26 @@ impl ExecutionPlan for FilterExec { _config: &ConfigOptions, ) -> Result { if !matches!(phase, FilterPushdownPhase::Pre) { - return Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters)); + // For non-pre phase, filters pass through unchanged + let filter_supports = parent_filters + .into_iter() + .map(PredicateSupport::Supported) + .collect(); + return Ok(FilterDescription::new().with_child(ChildFilterDescription { + parent_filters: filter_supports, + self_filters: vec![], + })); } - let self_filter = split_conjunction(&self.predicate) - .into_iter() - .cloned() - .collect_vec(); - let parent_filters = if let Some(projection_indices) = self.projection.as_ref() { - // We need to invert the projection on any referenced columns in the filter - // Create a mapping from the output columns to the input columns (the inverse of the projection) - let inverse_projection = projection_indices - .iter() - .enumerate() - .map(|(i, &p)| (p, i)) - .collect::>(); - parent_filters - .into_iter() - .map(|f| { - f.transform_up(|expr| { - let mut res = - if let Some(col) = expr.as_any().downcast_ref::() { - let index = col.index(); - let index_in_input_schema = - inverse_projection.get(&index).ok_or_else(|| { - DataFusionError::Internal(format!( - "Column {index} not found in projection" - )) - })?; - Transformed::yes(Arc::new(Column::new( - col.name(), - *index_in_input_schema, - )) as _) - } else { - Transformed::no(expr) - }; - // Columns can only exist in the leaves, no need to try all nodes - res.tnr = TreeNodeRecursion::Jump; - Ok(res) - }) - .data() - }) - .collect::>>()? - } else { - parent_filters - }; + let child = ChildFilterDescription::from_child(&parent_filters, self.input())? + .with_self_filters( + split_conjunction(&self.predicate) + .into_iter() + .cloned() + .collect(), + ); - Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters) - .with_self_filters_for_children(vec![self_filter])) + Ok(FilterDescription::new().with_child(child)) } fn handle_child_pushdown_result( @@ -519,15 +486,26 @@ impl ExecutionPlan for FilterExec { )); } // We absorb any parent filters that were not handled by our children - let mut unhandled_filters = - child_pushdown_result.parent_filters.collect_unsupported(); + let mut unhandled_filters = child_pushdown_result + .parent_filters + .iter() + .filter_map(|f| match f { + PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), + PredicateSupport::Supported(_) => None, + }) + .collect_vec(); assert_eq!( child_pushdown_result.self_filters.len(), 1, "FilterExec should only have one child" ); - let unsupported_self_filters = - child_pushdown_result.self_filters[0].collect_unsupported(); + let unsupported_self_filters = child_pushdown_result.self_filters[0] + .iter() + .filter_map(|f| match f { + PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), + PredicateSupport::Supported(_) => None, + }) + .collect_vec(); unhandled_filters.extend(unsupported_self_filters); // If we have unhandled filters, we need to create a new FilterExec @@ -577,8 +555,15 @@ impl ExecutionPlan for FilterExec { }; Some(Arc::new(new) as _) }; + // Mark all parent filters as supported since we absorbed them + let supported_filters = child_pushdown_result + .parent_filters + .into_iter() + .map(|f| PredicateSupport::Supported(f.into_inner())) + .collect(); + Ok(FilterPushdownPropagation { - filters: child_pushdown_result.parent_filters.make_supported(), + filters: supported_filters, updated_node, }) } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index 725abd7fc8b5..b50f86382edf 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -15,9 +15,30 @@ // specific language governing permissions and limitations // under the License. +//! Filter Pushdown Optimization Process +//! +//! The filter pushdown mechanism involves four key steps: +//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`] +//! on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`] +//! by inspecting its logic and children's schemas, determining which filters can be pushed to each child. +//! 2. **Optimizer Executes Pushdown**: The optimizer recursively pushes down filters for each child, +//! passing the appropriate filters (`Vec>`) for that child. +//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children, +//! containing information about which filters were successfully pushed down vs. unsupported. +//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent, +//! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides +//! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes). +//! +//! [`ExecutionPlan::gather_filters_for_pushdown`]: crate::ExecutionPlan::gather_filters_for_pushdown +//! [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result +//! +//! See also datafusion/physical-optimizer/src/filter_pushdown.rs. + +use std::collections::HashSet; use std::sync::Arc; -use std::vec::IntoIter; +use datafusion_common::Result; +use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; #[derive(Debug, Clone, Copy)] @@ -83,163 +104,6 @@ impl PredicateSupport { } } -/// A thin wrapper around [`PredicateSupport`]s that allows for easy collection of -/// supported and unsupported filters. Inner vector stores each predicate for one node. -#[derive(Debug, Clone)] -pub struct PredicateSupports(Vec); - -impl PredicateSupports { - /// Create a new FilterPushdowns with the given filters and their pushdown status. - pub fn new(pushdowns: Vec) -> Self { - Self(pushdowns) - } - - /// Create a new [`PredicateSupport`] with all filters as supported. - pub fn all_supported(filters: Vec>) -> Self { - let pushdowns = filters - .into_iter() - .map(PredicateSupport::Supported) - .collect(); - Self::new(pushdowns) - } - - /// Create a new [`PredicateSupport`] with all filters as unsupported. - pub fn all_unsupported(filters: Vec>) -> Self { - let pushdowns = filters - .into_iter() - .map(PredicateSupport::Unsupported) - .collect(); - Self::new(pushdowns) - } - - /// Create a new [`PredicateSupport`] with filterrs marked as supported if - /// `f` returns true and unsupported otherwise. - pub fn new_with_supported_check( - filters: Vec>, - check: impl Fn(&Arc) -> bool, - ) -> Self { - let pushdowns = filters - .into_iter() - .map(|f| { - if check(&f) { - PredicateSupport::Supported(f) - } else { - PredicateSupport::Unsupported(f) - } - }) - .collect(); - Self::new(pushdowns) - } - - /// Transform all filters to supported, returning a new [`PredicateSupports`] - /// with all filters as [`PredicateSupport::Supported`]. - /// This does not modify the original [`PredicateSupport`]. - pub fn make_supported(self) -> Self { - let pushdowns = self - .0 - .into_iter() - .map(|f| match f { - PredicateSupport::Supported(expr) => PredicateSupport::Supported(expr), - PredicateSupport::Unsupported(expr) => PredicateSupport::Supported(expr), - }) - .collect(); - Self::new(pushdowns) - } - - /// Transform all filters to unsupported, returning a new [`PredicateSupports`] - /// with all filters as [`PredicateSupport::Supported`]. - /// This does not modify the original [`PredicateSupport`]. - pub fn make_unsupported(self) -> Self { - let pushdowns = self - .0 - .into_iter() - .map(|f| match f { - PredicateSupport::Supported(expr) => PredicateSupport::Unsupported(expr), - u @ PredicateSupport::Unsupported(_) => u, - }) - .collect(); - Self::new(pushdowns) - } - - /// Collect unsupported filters into a Vec, without removing them from the original - /// [`PredicateSupport`]. - pub fn collect_unsupported(&self) -> Vec> { - self.0 - .iter() - .filter_map(|f| match f { - PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), - PredicateSupport::Supported(_) => None, - }) - .collect() - } - - /// Collect supported filters into a Vec, without removing them from the original - /// [`PredicateSupport`]. - pub fn collect_supported(&self) -> Vec> { - self.0 - .iter() - .filter_map(|f| match f { - PredicateSupport::Supported(expr) => Some(Arc::clone(expr)), - PredicateSupport::Unsupported(_) => None, - }) - .collect() - } - - /// Collect all filters into a Vec, without removing them from the original - /// FilterPushdowns. - pub fn collect_all(self) -> Vec> { - self.0 - .into_iter() - .map(|f| match f { - PredicateSupport::Supported(expr) - | PredicateSupport::Unsupported(expr) => expr, - }) - .collect() - } - - pub fn into_inner(self) -> Vec { - self.0 - } - - /// Return an iterator over the inner `Vec`. - pub fn iter(&self) -> impl Iterator { - self.0.iter() - } - - /// Return the number of filters in the inner `Vec`. - pub fn len(&self) -> usize { - self.0.len() - } - - /// Check if the inner `Vec` is empty. - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } - - /// Check if all filters are supported. - pub fn is_all_supported(&self) -> bool { - self.0 - .iter() - .all(|f| matches!(f, PredicateSupport::Supported(_))) - } - - /// Check if all filters are unsupported. - pub fn is_all_unsupported(&self) -> bool { - self.0 - .iter() - .all(|f| matches!(f, PredicateSupport::Unsupported(_))) - } -} - -impl IntoIterator for PredicateSupports { - type Item = PredicateSupport; - type IntoIter = IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - /// The result of pushing down filters into a child node. /// This is the result provided to nodes in [`ExecutionPlan::handle_child_pushdown_result`]. /// Nodes process this result and convert it into a [`FilterPushdownPropagation`] @@ -260,10 +124,10 @@ pub struct ChildPushdownResult { /// down into any child then the result is unsupported. /// If at least one children and all children that received the filter mark it as supported /// then the result is supported. - pub parent_filters: PredicateSupports, + pub parent_filters: Vec, /// The result of pushing down each filter this node provided into each of it's children. /// This is not combined with the parent filters so that nodes can treat each child independently. - pub self_filters: Vec, + pub self_filters: Vec>, } /// The result of pushing down filters into a node that it returns to its parent. @@ -276,7 +140,7 @@ pub struct ChildPushdownResult { /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result #[derive(Debug, Clone)] pub struct FilterPushdownPropagation { - pub filters: PredicateSupports, + pub filters: Vec, pub updated_node: Option, } @@ -291,18 +155,8 @@ impl FilterPushdownPropagation { } } - /// Create a new [`FilterPushdownPropagation`] that tells the parent node - /// that none of the parent filters were not pushed down. - pub fn unsupported(parent_filters: Vec>) -> Self { - let unsupported = PredicateSupports::all_unsupported(parent_filters); - Self { - filters: unsupported, - updated_node: None, - } - } - /// Create a new [`FilterPushdownPropagation`] with the specified filter support. - pub fn with_filters(filters: PredicateSupports) -> Self { + pub fn with_filters(filters: Vec) -> Self { Self { filters, updated_node: None, @@ -317,24 +171,76 @@ impl FilterPushdownPropagation { } #[derive(Debug, Clone)] -struct ChildFilterDescription { +pub struct ChildFilterDescription { /// Description of which parent filters can be pushed down into this node. /// Since we need to transmit filter pushdown results back to this node's parent /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down. /// We do this using a [`PredicateSupport`] which simplifies manipulating supported/unsupported filters. - parent_filters: PredicateSupports, + pub(crate) parent_filters: Vec, /// Description of which filters this node is pushing down to its children. /// Since this is not transmitted back to the parents we can have variable sized inner arrays /// instead of having to track supported/unsupported. - self_filters: Vec>, + pub(crate) self_filters: Vec>, } impl ChildFilterDescription { - fn new() -> Self { - Self { - parent_filters: PredicateSupports::new(vec![]), - self_filters: vec![], + /// Build a child filter description by analyzing which parent filters can be pushed to a specific child. + /// + /// See [`FilterDescription::from_children`] for more details + pub fn from_child( + parent_filters: &[Arc], + child: &Arc, + ) -> Result { + let child_schema = child.schema(); + + // Get column names from child schema for quick lookup + let child_column_names: HashSet<&str> = child_schema + .fields() + .iter() + .map(|f| f.name().as_str()) + .collect(); + + // Analyze each parent filter + let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); + + for filter in parent_filters { + // Check which columns the filter references + let referenced_columns = collect_columns(filter); + + // Check if all referenced columns exist in the child schema + let all_columns_exist = referenced_columns + .iter() + .all(|col| child_column_names.contains(col.name())); + + if all_columns_exist { + // All columns exist in child - we can push down + // Need to reassign column indices to match child schema + let reassigned_filter = + reassign_predicate_columns(Arc::clone(filter), &child_schema, false)?; + child_parent_filters.push(PredicateSupport::Supported(reassigned_filter)); + } else { + // Some columns don't exist in child - cannot push down + child_parent_filters + .push(PredicateSupport::Unsupported(Arc::clone(filter))); + } } + + Ok(Self { + parent_filters: child_parent_filters, + self_filters: vec![], + }) + } + + /// Add a self filter (from the current node) to be pushed down to this child. + pub fn with_self_filter(mut self, filter: Arc) -> Self { + self.self_filters.push(filter); + self + } + + /// Add multiple self filters. + pub fn with_self_filters(mut self, filters: Vec>) -> Self { + self.self_filters.extend(filters); + self } } @@ -346,14 +252,46 @@ pub struct FilterDescription { child_filter_descriptions: Vec, } +impl Default for FilterDescription { + fn default() -> Self { + Self::new() + } +} + impl FilterDescription { - pub fn new_with_child_count(num_children: usize) -> Self { + /// Create a new empty FilterDescription + pub fn new() -> Self { Self { - child_filter_descriptions: vec![ChildFilterDescription::new(); num_children], + child_filter_descriptions: vec![], } } - pub fn parent_filters(&self) -> Vec { + /// Add a child filter description + pub fn with_child(mut self, child: ChildFilterDescription) -> Self { + self.child_filter_descriptions.push(child); + self + } + + /// Build a filter description by analyzing which parent filters can be pushed to each child. + /// This method automatically determines filter routing based on column analysis: + /// - If all columns referenced by a filter exist in a child's schema, it can be pushed down + /// - Otherwise, it cannot be pushed down to that child + pub fn from_children( + parent_filters: Vec>, + children: &[&Arc], + ) -> Result { + let mut desc = Self::new(); + + // For each child, create a ChildFilterDescription + for child in children { + desc = desc + .with_child(ChildFilterDescription::from_child(&parent_filters, child)?); + } + + Ok(desc) + } + + pub fn parent_filters(&self) -> Vec> { self.child_filter_descriptions .iter() .map(|d| &d.parent_filters) @@ -368,70 +306,4 @@ impl FilterDescription { .cloned() .collect() } - - /// Mark all parent filters as supported for all children. - /// This is the case if the node allows filters to be pushed down through it - /// without any modification. - /// This broadcasts the parent filters to all children. - /// If handling of parent filters is different for each child then you should set the - /// field direclty. - /// For example, nodes like [`RepartitionExec`] that let filters pass through it transparently - /// use this to mark all parent filters as supported. - /// - /// [`RepartitionExec`]: crate::repartition::RepartitionExec - pub fn all_parent_filters_supported( - mut self, - parent_filters: Vec>, - ) -> Self { - let supported = PredicateSupports::all_supported(parent_filters); - for child in &mut self.child_filter_descriptions { - child.parent_filters = supported.clone(); - } - self - } - - /// Mark all parent filters as unsupported for all children. - /// This is the case if the node does not allow filters to be pushed down through it. - /// This broadcasts the parent filters to all children. - /// If handling of parent filters is different for each child then you should set the - /// field direclty. - /// For example, the default implementation of filter pushdwon in [`ExecutionPlan`] - /// assumes that filters cannot be pushed down to children. - /// - /// [`ExecutionPlan`]: crate::ExecutionPlan - pub fn all_parent_filters_unsupported( - mut self, - parent_filters: Vec>, - ) -> Self { - let unsupported = PredicateSupports::all_unsupported(parent_filters); - for child in &mut self.child_filter_descriptions { - child.parent_filters = unsupported.clone(); - } - self - } - - /// Add a filter generated / owned by the current node to be pushed down to all children. - /// This assumes that there is a single filter that that gets pushed down to all children - /// equally. - /// If there are multiple filters or pushdown to children is not homogeneous then - /// you should set the field directly. - /// For example: - /// - `TopK` uses this to push down a single filter to all children, it can use this method. - /// - `HashJoinExec` pushes down a filter only to the probe side, it cannot use this method. - pub fn with_self_filter(mut self, predicate: Arc) -> Self { - for child in &mut self.child_filter_descriptions { - child.self_filters = vec![Arc::clone(&predicate)]; - } - self - } - - pub fn with_self_filters_for_children( - mut self, - filters: Vec>>, - ) -> Self { - for (child, filters) in self.child_filter_descriptions.iter_mut().zip(filters) { - child.self_filters = filters; - } - self - } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 620bfa2809a9..37dc3a767590 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -812,8 +812,7 @@ impl ExecutionPlan for RepartitionExec { parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { - Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters)) + FilterDescription::from_children(parent_filters, &self.children()) } fn handle_child_pushdown_result( diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 21f98fd01260..bb572c4315fb 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -27,7 +27,9 @@ use std::sync::Arc; use crate::common::spawn_buffered; use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; -use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; +use crate::filter_pushdown::{ + ChildFilterDescription, FilterDescription, FilterPushdownPhase, +}; use crate::limit::LimitStream; use crate::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, @@ -1268,19 +1270,20 @@ impl ExecutionPlan for SortExec { config: &datafusion_common::config::ConfigOptions, ) -> Result { if !matches!(phase, FilterPushdownPhase::Post) { - return Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters)); + return FilterDescription::from_children(parent_filters, &self.children()); } + + let mut child = + ChildFilterDescription::from_child(&parent_filters, self.input())?; + if let Some(filter) = &self.filter { if config.optimizer.enable_dynamic_filter_pushdown { - let filter = Arc::clone(filter) as Arc; - return Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters) - .with_self_filter(filter)); + child = + child.with_self_filter(Arc::clone(filter) as Arc); } } - Ok(FilterDescription::new_with_child_count(1) - .all_parent_filters_supported(parent_filters)) + + Ok(FilterDescription::new().with_child(child)) } }