diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index d4318235bafb..ea12bea1cf89 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -29,13 +29,13 @@ use datafusion_datasource::{ use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_optimizer::PhysicalOptimizerRule; -use datafusion_physical_plan::filter_pushdown::FilterPushdownPhase; +use datafusion_physical_plan::filter_pushdown::{FilterPushdownPhase, PushedDown}; use datafusion_physical_plan::{ displayable, filter::FilterExec, filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, - FilterPushdownPropagation, PredicateSupport, + FilterPushdownPropagation, }, metrics::ExecutionPlanMetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, @@ -227,19 +227,13 @@ impl FileSource for TestSource { predicate: Some(conjunction(filters.clone())), ..self.clone() }); - Ok(FilterPushdownPropagation { - filters: filters - .into_iter() - .map(PredicateSupport::Supported) - .collect(), - updated_node: Some(new_node), - }) + Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PushedDown::Yes; filters.len()], + ) + .with_updated_node(new_node)) } else { - Ok(FilterPushdownPropagation::with_filters( - filters - .into_iter() - .map(PredicateSupport::Unsupported) - .collect(), + Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PushedDown::No; filters.len()], )) } } @@ -547,26 +541,29 @@ impl ExecutionPlan for TestNode { assert_eq!(self_pushdown_result.len(), 1); let self_pushdown_result: Vec<_> = self_pushdown_result.into_iter().collect(); - match &self_pushdown_result[0] { - PredicateSupport::Unsupported(filter) => { + let first_pushdown_result = self_pushdown_result[0].clone(); + + match 
&first_pushdown_result.discriminant { + PushedDown::No => { // We have a filter to push down - let new_child = - FilterExec::try_new(Arc::clone(filter), Arc::clone(&self.input))?; + let new_child = FilterExec::try_new( + Arc::clone(&first_pushdown_result.predicate), + Arc::clone(&self.input), + )?; let new_self = TestNode::new(false, Arc::new(new_child), self.predicate.clone()); let mut res = - FilterPushdownPropagation::transparent(child_pushdown_result); + FilterPushdownPropagation::if_all(child_pushdown_result); res.updated_node = Some(Arc::new(new_self) as Arc); Ok(res) } - PredicateSupport::Supported(_) => { - let res = - FilterPushdownPropagation::transparent(child_pushdown_result); + PushedDown::Yes => { + let res = FilterPushdownPropagation::if_all(child_pushdown_result); Ok(res) } } } else { - let res = FilterPushdownPropagation::transparent(child_pushdown_result); + let res = FilterPushdownPropagation::if_all(child_pushdown_result); Ok(res) } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 8ca36e7cd321..0c73661a6de6 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -41,8 +41,9 @@ use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_plan::filter_pushdown::PushedDown; use datafusion_physical_plan::filter_pushdown::{ - FilterPushdownPropagation, PredicateSupport, + FilterPushdownPropagation, PushedDownPredicate, }; use datafusion_physical_plan::metrics::Count; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -622,11 +623,8 @@ impl FileSource for ParquetSource { config: &ConfigOptions, ) -> datafusion_common::Result>> { let Some(file_schema) = self.file_schema.clone() else { - return 
Ok(FilterPushdownPropagation::with_filters( - filters - .into_iter() - .map(PredicateSupport::Unsupported) - .collect(), + return Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PushedDown::No; filters.len()], )); }; // Determine if based on configs we should push filters down. @@ -641,29 +639,31 @@ impl FileSource for ParquetSource { let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled; let mut source = self.clone(); - let filters: Vec = filters + let filters: Vec = filters .into_iter() .map(|filter| { if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) { - PredicateSupport::Supported(filter) + PushedDownPredicate::supported(filter) } else { - PredicateSupport::Unsupported(filter) + PushedDownPredicate::unsupported(filter) } }) .collect(); if filters .iter() - .all(|f| matches!(f, PredicateSupport::Unsupported(_))) + .all(|f| matches!(f.discriminant, PushedDown::No)) { // No filters can be pushed down, so we can just return the remaining filters // and avoid replacing the source in the physical plan. - return Ok(FilterPushdownPropagation::with_filters(filters)); + return Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PushedDown::No; filters.len()], + )); } let allowed_filters = filters .iter() - .filter_map(|f| match f { - PredicateSupport::Supported(expr) => Some(Arc::clone(expr)), - PredicateSupport::Unsupported(_) => None, + .filter_map(|f| match f.discriminant { + PushedDown::Yes => Some(Arc::clone(&f.predicate)), + PushedDown::No => None, }) .collect_vec(); let predicate = match source.predicate { @@ -678,15 +678,15 @@ impl FileSource for ParquetSource { // If pushdown_filters is false we tell our parents that they still have to handle the filters, // even if we updated the predicate to include the filters (they will only be used for stats pruning). 
if !pushdown_filters { - return Ok(FilterPushdownPropagation::with_filters( - filters - .into_iter() - .map(|f| PredicateSupport::Unsupported(f.into_inner())) - .collect_vec(), + return Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PushedDown::No; filters.len()], ) .with_updated_node(source)); } - Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source)) + Ok(FilterPushdownPropagation::with_parent_pushdown_result( + filters.iter().map(|f| f.discriminant).collect(), + ) + .with_updated_node(source)) } fn with_schema_adapter_factory( diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index a95c07cc319c..29fa38a8ee36 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -30,9 +30,7 @@ use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::{not_impl_err, Result, Statistics}; use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; -use datafusion_physical_plan::filter_pushdown::{ - FilterPushdownPropagation, PredicateSupport, -}; +use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; @@ -122,11 +120,8 @@ pub trait FileSource: Send + Sync { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::with_filters( - filters - .into_iter() - .map(PredicateSupport::Unsupported) - .collect(), + Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PushedDown::No; filters.len()], )) } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 92c25556382c..a002d4992cb2 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -42,7 +42,7 @@ use datafusion_physical_expr::{ use datafusion_physical_expr_common::sort_expr::LexOrdering; use 
datafusion_physical_plan::filter::collect_columns_from_predicate; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport, + ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown, }; /// A source of data, typically a list of files or memory @@ -172,11 +172,8 @@ pub trait DataSource: Send + Sync + Debug { filters: Vec>, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::with_filters( - filters - .into_iter() - .map(PredicateSupport::Unsupported) - .collect(), + Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PushedDown::No; filters.len()], )) } } @@ -324,17 +321,14 @@ impl ExecutionPlan for DataSourceExec { config: &ConfigOptions, ) -> Result>> { // Push any remaining filters into our data source - let res = self.data_source.try_pushdown_filters( - child_pushdown_result - .parent_filters - .into_iter() - .map(|f| match f { - PredicateSupport::Supported(expr) => expr, - PredicateSupport::Unsupported(expr) => expr, - }) - .collect(), - config, - )?; + let parent_filters = child_pushdown_result + .parent_filters + .into_iter() + .map(|f| f.filter) + .collect_vec(); + let res = self + .data_source + .try_pushdown_filters(parent_filters.clone(), config)?; match res.updated_node { Some(data_source) => { let mut new_node = self.clone(); @@ -346,9 +340,10 @@ impl ExecutionPlan for DataSourceExec { let filter = conjunction( res.filters .iter() - .filter_map(|f| match f { - PredicateSupport::Supported(expr) => Some(Arc::clone(expr)), - PredicateSupport::Unsupported(_) => None, + .zip(parent_filters) + .filter_map(|(s, f)| match s { + PushedDown::Yes => Some(f), + PushedDown::No => None, }) .collect_vec(), ); diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index eb7ec81b260e..66ccc1a79853 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ 
b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -38,7 +38,8 @@ use crate::PhysicalOptimizerRule; use datafusion_common::{config::ConfigOptions, Result}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::{ - ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport, + ChildFilterPushdownResult, ChildPushdownResult, FilterPushdownPhase, + FilterPushdownPropagation, PushedDown, }; use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan}; @@ -434,24 +435,14 @@ impl PhysicalOptimizerRule for FilterPushdown { } } -/// Support state of each predicate for the children of the node. -/// These predicates are coming from the parent node. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum ParentPredicateStates { - NoChildren, - Unsupported, - Supported, -} - fn push_down_filters( node: Arc, parent_predicates: Vec>, config: &ConfigOptions, phase: FilterPushdownPhase, ) -> Result>> { - // If the node has any child, these will be rewritten as supported or unsupported - let mut parent_predicates_pushdown_states = - vec![ParentPredicateStates::NoChildren; parent_predicates.len()]; + let mut parent_filter_pushdown_supports: Vec> = + vec![vec![]; parent_predicates.len()]; let mut self_filters_pushdown_supports = vec![]; let mut new_children = Vec::with_capacity(node.children().len()); @@ -459,45 +450,66 @@ fn push_down_filters( let filter_description = node.gather_filters_for_pushdown(phase, parent_predicates.clone(), config)?; - for (child, parent_filters, self_filters) in izip!( + let filter_description_parent_filters = filter_description.parent_filters(); + let filter_description_self_filters = filter_description.self_filters(); + if filter_description_parent_filters.len() != children.len() { + return Err(datafusion_common::DataFusionError::Internal( + format!( + "Filter pushdown expected FilterDescription to have parent filters for {expected_num_children}, but got 
{actual_num_children} for node {node_name}", + expected_num_children = children.len(), + actual_num_children = filter_description_parent_filters.len(), + node_name = node.name(), + ), + )); + } + if filter_description_self_filters.len() != children.len() { + return Err(datafusion_common::DataFusionError::Internal( + format!( + "Filter pushdown expected FilterDescription to have self filters for {expected_num_children}, but got {actual_num_children} for node {node_name}", + expected_num_children = children.len(), + actual_num_children = filter_description_self_filters.len(), + node_name = node.name(), + ), + )); + } + + for (child_idx, (child, parent_filters, self_filters)) in izip!( children, filter_description.parent_filters(), filter_description.self_filters() - ) { + ) + .enumerate() + { // Here, `parent_filters` are the predicates which are provided by the parent node of // the current node, and tried to be pushed down over the child which the loop points // currently. `self_filters` are the predicates which are provided by the current node, // and tried to be pushed down over the child similarly. let num_self_filters = self_filters.len(); - let mut parent_supported_predicate_indices = vec![]; - let mut all_predicates = self_filters; + let mut all_predicates = self_filters.clone(); + + // Track which parent filters are supported for this child + let mut parent_filter_indices = vec![]; // Iterate over each predicate coming from the parent - for (idx, filter) in parent_filters.into_iter().enumerate() { + for (parent_filter_idx, filter) in parent_filters.into_iter().enumerate() { // Check if we can push this filter down to our child. 
// These supports are defined in `gather_filters_for_pushdown()` - match filter { - PredicateSupport::Supported(predicate) => { + match filter.discriminant { + PushedDown::Yes => { // Queue this filter up for pushdown to this child - all_predicates.push(predicate); - parent_supported_predicate_indices.push(idx); - // Mark this filter as supported by our children if no child has marked it as unsupported - if parent_predicates_pushdown_states[idx] - != ParentPredicateStates::Unsupported - { - parent_predicates_pushdown_states[idx] = - ParentPredicateStates::Supported; - } + all_predicates.push(filter.predicate); + parent_filter_indices.push(parent_filter_idx); } - PredicateSupport::Unsupported(_) => { - // Mark as unsupported by our children - parent_predicates_pushdown_states[idx] = - ParentPredicateStates::Unsupported; + PushedDown::No => { + // This filter won't be pushed down to this child + // Will be marked as unsupported later in the initialization loop } } } + let num_parent_filters = all_predicates.len() - num_self_filters; + // Any filters that could not be pushed down to a child are marked as not-supported to our parents let result = push_down_filters(Arc::clone(child), all_predicates, config, phase)?; @@ -513,57 +525,67 @@ fn push_down_filters( // from our parents and filters that the current node injected. We need to de-entangle // this since we do need to distinguish between them. let mut all_filters = result.filters.into_iter().collect_vec(); - let parent_predicates = all_filters.split_off(num_self_filters); - let self_predicates = all_filters; - self_filters_pushdown_supports.push(self_predicates); + if all_filters.len() != num_self_filters + num_parent_filters { + return Err(datafusion_common::DataFusionError::Internal( + format!( + "Filter pushdown did not return the expected number of filters: expected {num_self_filters} self filters and {num_parent_filters} parent filters, but got {num_filters_from_child}. 
Likely culprit is {child}", + num_self_filters = num_self_filters, + num_parent_filters = num_parent_filters, + num_filters_from_child = all_filters.len(), + child = child.name(), + ), + )); + } + let parent_filters = all_filters + .split_off(num_self_filters) + .into_iter() + .collect_vec(); + self_filters_pushdown_supports.push( + all_filters + .into_iter() + .zip(self_filters) + .map(|(s, f)| s.wrap_expression(f)) + .collect(), + ); - for (idx, result) in parent_supported_predicate_indices - .iter() - .zip(parent_predicates) + // Start by marking all parent filters as unsupported for this child + for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() { + parent_filter_pushdown_support.push(PushedDown::No); + assert_eq!( + parent_filter_pushdown_support.len(), + child_idx + 1, + "Parent filter pushdown supports should have the same length as the number of children" + ); + } + // Map results from pushed-down filters back to original parent filter indices + for (result_idx, parent_filter_support) in parent_filters.into_iter().enumerate() { - let current_node_state = match result { - PredicateSupport::Supported(_) => ParentPredicateStates::Supported, - PredicateSupport::Unsupported(_) => ParentPredicateStates::Unsupported, - }; - match (current_node_state, parent_predicates_pushdown_states[*idx]) { - (r, ParentPredicateStates::NoChildren) => { - // If we have no result, use the current state from this child - parent_predicates_pushdown_states[*idx] = r; - } - (ParentPredicateStates::Supported, ParentPredicateStates::Supported) => { - // If the current child and all previous children are supported, - // the filter continues to support it - parent_predicates_pushdown_states[*idx] = - ParentPredicateStates::Supported; - } - _ => { - // Either the current child or a previous child marked this filter as unsupported - parent_predicates_pushdown_states[*idx] = - ParentPredicateStates::Unsupported; - } - } + let original_parent_idx = 
parent_filter_indices[result_idx]; + parent_filter_pushdown_supports[original_parent_idx][child_idx] = + parent_filter_support; } } + // Re-create this node with new children let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?; - // Remap the result onto the parent filters as they were given to us. - // Any filters that were not pushed down to any children are marked as unsupported. - let parent_pushdown_result = parent_predicates_pushdown_states - .into_iter() - .zip(parent_predicates) - .map(|(state, filter)| match state { - ParentPredicateStates::NoChildren => PredicateSupport::Unsupported(filter), - ParentPredicateStates::Unsupported => PredicateSupport::Unsupported(filter), - ParentPredicateStates::Supported => PredicateSupport::Supported(filter), - }) - .collect(); + // TODO: by calling `handle_child_pushdown_result` we are assuming that the // `ExecutionPlan` implementation will not change the plan itself. // Should we have a separate method for dynamic pushdown that does not allow modifying the plan? 
let mut res = updated_node.handle_child_pushdown_result( phase, ChildPushdownResult { - parent_filters: parent_pushdown_result, + parent_filters: parent_predicates + .into_iter() + .enumerate() + .map( + |(parent_filter_idx, parent_filter)| ChildFilterPushdownResult { + filter: parent_filter, + child_results: parent_filter_pushdown_supports[parent_filter_idx] + .clone(), + }, + ) + .collect(), self_filters: self_filters_pushdown_supports, }, config, diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index d008db48b82b..d98530d28e91 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -243,9 +243,7 @@ impl ExecutionPlan for CoalesceBatchesExec { child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::transparent( - child_pushdown_result, - )) + Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 375de7c7385e..6d51bf195dc6 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -18,7 +18,7 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, PredicateSupport, + FilterPushdownPropagation, PushedDownPredicate, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; @@ -525,7 +525,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { let mut desc = FilterDescription::new(); let child_filters = parent_filters .iter() - .map(|f| PredicateSupport::Unsupported(Arc::clone(f))) + .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) .collect_vec(); for _ in 0..self.children().len() { desc = 
desc.with_child(ChildFilterDescription { @@ -563,7 +563,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// they have been handled. /// - A `HashJoinExec` might ignore the pushdown result if filters need to /// be applied during the join operation. It passes the parent filters back - /// up wrapped in [`FilterPushdownPropagation::transparent`], discarding + /// up wrapped in [`FilterPushdownPropagation::if_any`], discarding /// any self-filters from children. /// /// **Example Walkthrough:** @@ -597,10 +597,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// /// **Helper Methods for Customization:** /// There are various helper methods to simplify implementing this method: - /// - [`FilterPushdownPropagation::transparent`]: Indicates that the node - /// supports filter pushdown but does not modify it, simply transmitting - /// the children's pushdown results back up to its parent. - /// - [`FilterPushdownPropagation::with_filters`]: Allows adding filters + /// - [`FilterPushdownPropagation::if_any`]: Marks all parent filters as + /// supported as long as at least one child supports them. + /// - [`FilterPushdownPropagation::if_all`]: Marks all parent filters as + /// supported as long as all children support them. + /// - [`FilterPushdownPropagation::with_parent_pushdown_result`]: Allows adding filters /// to the propagation result, indicating which filters are supported by /// the current node. /// - [`FilterPushdownPropagation::with_updated_node`]: Allows updating the @@ -613,16 +614,14 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// operator may or may not be allowed to modify the plan. See /// [`FilterPushdownPhase`] for more details on phase-specific behavior. 
/// - /// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported + /// [`PushedDownPredicate::supported`]: crate::filter_pushdown::PushedDownPredicate::supported fn handle_child_pushdown_result( &self, _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::transparent( - child_pushdown_result, - )) + Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } /// Injects arbitrary run-time state into this execution plan, returning a new plan diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 66886c576620..8157e1b721a6 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -30,7 +30,7 @@ use crate::common::can_project; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, PredicateSupport, + FilterPushdownPropagation, PushedDown, PushedDownPredicate, }; use crate::projection::{ make_with_child, try_embed_projection, update_expr, EmbeddedProjection, @@ -455,7 +455,7 @@ impl ExecutionPlan for FilterExec { // For non-pre phase, filters pass through unchanged let filter_supports = parent_filters .into_iter() - .map(PredicateSupport::Supported) + .map(PushedDownPredicate::supported) .collect(); return Ok(FilterDescription::new().with_child(ChildFilterDescription { parent_filters: filter_supports, @@ -481,32 +481,28 @@ impl ExecutionPlan for FilterExec { _config: &ConfigOptions, ) -> Result>> { if !matches!(phase, FilterPushdownPhase::Pre) { - return Ok(FilterPushdownPropagation::transparent( - child_pushdown_result, - )); + return Ok(FilterPushdownPropagation::if_all(child_pushdown_result)); } // We absorb any parent filters that were not handled by our children - let mut unhandled_filters = child_pushdown_result - 
.parent_filters - .iter() - .filter_map(|f| match f { - PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), - PredicateSupport::Supported(_) => None, - }) - .collect_vec(); - assert_eq!( - child_pushdown_result.self_filters.len(), - 1, - "FilterExec should only have one child" - ); - let unsupported_self_filters = child_pushdown_result.self_filters[0] + let unsupported_parent_filters = + child_pushdown_result.parent_filters.iter().filter_map(|f| { + matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter)) + }); + let unsupported_self_filters = child_pushdown_result + .self_filters + .first() + .expect("we have exactly one child") .iter() - .filter_map(|f| match f { - PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)), - PredicateSupport::Supported(_) => None, + .filter_map(|f| match f.discriminant { + PushedDown::Yes => None, + PushedDown::No => Some(&f.predicate), }) + .cloned(); + + let unhandled_filters = unsupported_parent_filters + .into_iter() + .chain(unsupported_self_filters) .collect_vec(); - unhandled_filters.extend(unsupported_self_filters); // If we have unhandled filters, we need to create a new FilterExec let filter_input = Arc::clone(self.input()); @@ -555,15 +551,9 @@ impl ExecutionPlan for FilterExec { }; Some(Arc::new(new) as _) }; - // Mark all parent filters as supported since we absorbed them - let supported_filters = child_pushdown_result - .parent_filters - .into_iter() - .map(|f| PredicateSupport::Supported(f.into_inner())) - .collect(); Ok(FilterPushdownPropagation { - filters: supported_filters, + filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()], updated_node, }) } diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index b50f86382edf..a3e94a75c8e7 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -86,25 +86,111 @@ impl std::fmt::Display for 
FilterPushdownPhase { /// The result of a plan for pushing down a filter into a child node. /// This contains references to filters so that nodes can mutate a filter /// before pushing it down to a child node (e.g. to adjust a projection) -/// or can directly take ownership of `Unsupported` filters that their children +/// or can directly take ownership of filters that their children /// could not handle. #[derive(Debug, Clone)] -pub enum PredicateSupport { - Supported(Arc), - Unsupported(Arc), +pub struct PushedDownPredicate { + pub discriminant: PushedDown, + pub predicate: Arc, } -impl PredicateSupport { +impl PushedDownPredicate { + /// Return the wrapped [`PhysicalExpr`], discarding whether it is supported or unsupported. pub fn into_inner(self) -> Arc { - match self { - PredicateSupport::Supported(expr) | PredicateSupport::Unsupported(expr) => { - expr - } + self.predicate + } + + /// Create a new [`PushedDownPredicate`] with supported pushdown. + pub fn supported(predicate: Arc) -> Self { + Self { + discriminant: PushedDown::Yes, + predicate, + } + } + + /// Create a new [`PushedDownPredicate`] with unsupported pushdown. + pub fn unsupported(predicate: Arc) -> Self { + Self { + discriminant: PushedDown::No, + predicate, + } + } +} + +/// Discriminant for the result of pushing down a filter into a child node. +#[derive(Debug, Clone, Copy)] +pub enum PushedDown { + /// The predicate was successfully pushed down into the child node. + Yes, + /// The predicate could not be pushed down into the child node. + No, +} + +impl PushedDown { + /// Logical AND operation: returns `Yes` only if both operands are `Yes`. + pub fn and(self, other: PushedDown) -> PushedDown { + match (self, other) { + (PushedDown::Yes, PushedDown::Yes) => PushedDown::Yes, + _ => PushedDown::No, + } + } + + /// Logical OR operation: returns `Yes` if either operand is `Yes`. 
+ pub fn or(self, other: PushedDown) -> PushedDown { + match (self, other) { + (PushedDown::Yes, _) | (_, PushedDown::Yes) => PushedDown::Yes, + (PushedDown::No, PushedDown::No) => PushedDown::No, + } + } + + /// Wrap a [`PhysicalExpr`] with this pushdown result. + pub fn wrap_expression(self, expr: Arc) -> PushedDownPredicate { + PushedDownPredicate { + discriminant: self, + predicate: expr, + } + } +} + +/// The result of pushing down a single parent filter into all children. +#[derive(Debug, Clone)] +pub struct ChildFilterPushdownResult { + pub filter: Arc, + pub child_results: Vec, +} + +impl ChildFilterPushdownResult { + /// Combine all child results using OR logic. + /// Returns `Yes` if **any** child supports the filter. + /// Returns `No` if **all** children reject the filter or if there are no children. + pub fn any(&self) -> PushedDown { + if self.child_results.is_empty() { + // If there are no children, filters cannot be supported + PushedDown::No + } else { + self.child_results + .iter() + .fold(PushedDown::No, |acc, result| acc.or(*result)) + } + } + + /// Combine all child results using AND logic. + /// Returns `Yes` if **all** children support the filter. + /// Returns `No` if **any** child rejects the filter or if there are no children. + pub fn all(&self) -> PushedDown { + if self.child_results.is_empty() { + // If there are no children, filters cannot be supported + PushedDown::No + } else { + self.child_results + .iter() + .fold(PushedDown::Yes, |acc, result| acc.and(*result)) } } } /// The result of pushing down filters into a child node. +/// /// This is the result provided to nodes in [`ExecutionPlan::handle_child_pushdown_result`]. /// Nodes process this result and convert it into a [`FilterPushdownPropagation`] /// that is returned to their parent. 
@@ -112,51 +198,68 @@ impl PredicateSupport { /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result #[derive(Debug, Clone)] pub struct ChildPushdownResult { - /// The combined result of pushing down each parent filter into each child. - /// For example, given the fitlers `[a, b]` and children `[1, 2, 3]` the matrix of responses: - /// - // | filter | child 1 | child 2 | child 3 | result | - // |--------|-------------|-----------|-----------|-------------| - // | a | Supported | Supported | Supported | Supported | - // | b | Unsupported | Supported | Supported | Unsupported | - /// - /// That is: if any child marks a filter as unsupported or if the filter was not pushed - /// down into any child then the result is unsupported. - /// If at least one children and all children that received the filter mark it as supported - /// then the result is supported. - pub parent_filters: Vec, + /// The parent filters that were pushed down as received by the current node when [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown) was called. + /// Note that this may *not* be the same as the filters that were passed to the children as the current node may have modified them + /// (e.g. by reassigning column indices) when it returned them from [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown) in a [`FilterDescription`]. + /// Attached to each filter is a [`PushedDown`] *per child* that indicates whether the filter was supported or unsupported by each child. + /// To get combined results see [`ChildFilterPushdownResult::any`] and [`ChildFilterPushdownResult::all`]. + pub parent_filters: Vec, /// The result of pushing down each filter this node provided into each of it's children. - /// This is not combined with the parent filters so that nodes can treat each child independently.
- pub self_filters: Vec>, + /// The outer vector corresponds to each child, and the inner vector corresponds to each filter. + /// Since this node may have generated a different filter for each child the inner vector may have different lengths or the expressions may not match at all. + /// It is up to each node to interpret this result based on the filters it provided for each child in [`ExecutionPlan::gather_filters_for_pushdown`](crate::ExecutionPlan::gather_filters_for_pushdown). + pub self_filters: Vec>, } -/// The result of pushing down filters into a node that it returns to its parent. -/// This is what nodes return from [`ExecutionPlan::handle_child_pushdown_result`] to communicate +/// The result of pushing down filters into a node. +/// +/// Returned from [`ExecutionPlan::handle_child_pushdown_result`] to communicate /// to the optimizer: /// -/// 1. What to do with any parent filters that were not completely handled by the children. +/// 1. What to do with any parent filters that could not be pushed down into the children. /// 2. If the node needs to be replaced in the execution plan with a new node or not. /// /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result #[derive(Debug, Clone)] pub struct FilterPushdownPropagation { - pub filters: Vec, + /// For each parent filter, whether it was pushed down into this node or its subtree. + pub filters: Vec, + /// The updated node, if it was updated during pushdown pub updated_node: Option, } impl FilterPushdownPropagation { - /// Create a new [`FilterPushdownPropagation`] that tells the parent node - /// that echoes back up to the parent the result of pushing down the filters - /// into the children. - pub fn transparent(child_pushdown_result: ChildPushdownResult) -> Self { + /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter + /// is supported if it was supported by *all* children.
+ pub fn if_all(child_pushdown_result: ChildPushdownResult) -> Self { + let filters = child_pushdown_result + .parent_filters + .into_iter() + .map(|result| result.all()) + .collect(); + Self { + filters, + updated_node: None, + } + } + + /// Create a new [`FilterPushdownPropagation`] that tells the parent node that each parent filter + /// is supported if it was supported by *any* child. + pub fn if_any(child_pushdown_result: ChildPushdownResult) -> Self { + let filters = child_pushdown_result + .parent_filters + .into_iter() + .map(|result| result.any()) + .collect(); Self { - filters: child_pushdown_result.parent_filters, + filters, updated_node: None, } } /// Create a new [`FilterPushdownPropagation`] with the specified filter support. - pub fn with_filters(filters: Vec) -> Self { + /// This transmits up to our parent node what the result of pushing down the filters into our node and possibly our subtree was. + pub fn with_parent_pushdown_result(filters: Vec) -> Self { Self { filters, updated_node: None, @@ -164,19 +267,25 @@ impl FilterPushdownPropagation { } /// Bind an updated node to the [`FilterPushdownPropagation`]. + /// Use this when the current node wants to update itself in the tree or replace itself with a new node (e.g. one of its children). + /// You do not need to call this if one of the children of the current node may have updated itself; that is handled by the optimizer. pub fn with_updated_node(mut self, updated_node: T) -> Self { self.updated_node = Some(updated_node); self } } +/// Describes filter pushdown for a single child node. +/// +/// This structure contains two types of filters: +/// - **Parent filters**: Filters received from the parent node, marked as supported or unsupported +/// - **Self filters**: Filters generated by the current node to be pushed down to this child #[derive(Debug, Clone)] pub struct ChildFilterDescription { /// Description of which parent filters can be pushed down into this node.
/// Since we need to transmit filter pushdown results back to this node's parent /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down. - /// We do this using a [`PredicateSupport`] which simplifies manipulating supported/unsupported filters. - pub(crate) parent_filters: Vec, + pub(crate) parent_filters: Vec, /// Description of which filters this node is pushing down to its children. /// Since this is not transmitted back to the parents we can have variable sized inner arrays /// instead of having to track supported/unsupported. @@ -186,6 +295,10 @@ pub struct ChildFilterDescription { impl ChildFilterDescription { /// Build a child filter description by analyzing which parent filters can be pushed to a specific child. /// + /// This method performs column analysis to determine which filters can be pushed down: + /// - If all columns referenced by a filter exist in the child's schema, it can be pushed down + /// - Otherwise, it cannot be pushed down to that child + /// /// See [`FilterDescription::from_children`] for more details pub fn from_child( parent_filters: &[Arc], @@ -217,11 +330,12 @@ impl ChildFilterDescription { // Need to reassign column indices to match child schema let reassigned_filter = reassign_predicate_columns(Arc::clone(filter), &child_schema, false)?; - child_parent_filters.push(PredicateSupport::Supported(reassigned_filter)); + child_parent_filters + .push(PushedDownPredicate::supported(reassigned_filter)); } else { // Some columns don't exist in child - cannot push down child_parent_filters - .push(PredicateSupport::Unsupported(Arc::clone(filter))); + .push(PushedDownPredicate::unsupported(Arc::clone(filter))); } } @@ -244,6 +358,14 @@ impl ChildFilterDescription { } } +/// Describes how filters should be pushed down to children. 
+/// +/// This structure contains filter descriptions for each child node, specifying: +/// - Which parent filters can be pushed down to each child +/// - Which self-generated filters should be pushed down to each child +/// +/// The filter routing is determined by column analysis - filters can only be pushed +/// to children whose schemas contain all the referenced columns. #[derive(Debug, Clone)] pub struct FilterDescription { /// A filter description for each child. @@ -291,7 +413,7 @@ impl FilterDescription { Ok(desc) } - pub fn parent_filters(&self) -> Vec> { + pub fn parent_filters(&self) -> Vec> { self.child_filter_descriptions .iter() .map(|d| &d.parent_filters) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 37dc3a767590..754a208126ee 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -821,9 +821,7 @@ impl ExecutionPlan for RepartitionExec { child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result>> { - Ok(FilterPushdownPropagation::transparent( - child_pushdown_result, - )) + Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) } } diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs index 68816626bf67..375f06d34b44 100644 --- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs @@ -93,7 +93,7 @@ impl Postgres { let spawned_task = SpawnedTask::spawn(async move { if let Err(e) = connection.await { - log::error!("Postgres connection error: {:?}", e); + log::error!("Postgres connection error: {e:?}"); } });