27 changes: 19 additions & 8 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -34,8 +34,8 @@ use datafusion_physical_plan::{
displayable,
filter::FilterExec,
filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
PredicateSupport, PredicateSupports,
ChildFilterDescription, ChildPushdownResult, FilterDescription,
FilterPushdownPropagation, PredicateSupport,
},
metrics::ExecutionPlanMetricsSet,
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
@@ -228,11 +228,19 @@ impl FileSource for TestSource {
..self.clone()
});
Ok(FilterPushdownPropagation {
filters: PredicateSupports::all_supported(filters),
filters: filters
.into_iter()
.map(PredicateSupport::Supported)
.collect(),
updated_node: Some(new_node),
})
} else {
Ok(FilterPushdownPropagation::unsupported(filters))
Ok(FilterPushdownPropagation::with_filters(
filters
.into_iter()
.map(PredicateSupport::Unsupported)
.collect(),
))
}
}

@@ -515,9 +523,12 @@ impl ExecutionPlan for TestNode {
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterDescription> {
Ok(FilterDescription::new_with_child_count(1)
.all_parent_filters_supported(parent_filters)
.with_self_filter(Arc::clone(&self.predicate)))
// Since TestNode marks all parent filters as supported and adds its own filter,
// we use `from_child` to create a child description with all parent filters supported.
let child = &self.input;
let child_desc = ChildFilterDescription::from_child(&parent_filters, child)?
.with_self_filter(Arc::clone(&self.predicate));
Ok(FilterDescription::new().with_child(child_desc))
}

fn handle_child_pushdown_result(
@@ -534,7 +545,7 @@
let self_pushdown_result = child_pushdown_result.self_filters[0].clone();
// And pushed down 1 filter
assert_eq!(self_pushdown_result.len(), 1);
let self_pushdown_result = self_pushdown_result.into_inner();
let self_pushdown_result: Vec<_> = self_pushdown_result.into_iter().collect();

match &self_pushdown_result[0] {
PredicateSupport::Unsupported(filter) => {
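The two branches of `TestSource::try_pushdown_filters` above reduce to one pattern: wrap each incoming expression in the `PredicateSupport` variant matching whether the source opts in. Below is a minimal sketch of that pattern, assuming only the `PredicateSupport` enum and the public `filters`/`updated_node` fields visible in this diff; the helper name `propagate` is hypothetical.

```rust
use std::sync::Arc;

use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::{
    FilterPushdownPropagation, PredicateSupport,
};

// Hypothetical helper (not part of the PR): mirrors the two TestSource branches.
fn propagate<T>(
    support: bool,
    filters: Vec<Arc<dyn PhysicalExpr>>,
    updated_node: Option<T>,
) -> FilterPushdownPropagation<T> {
    let filters = filters
        .into_iter()
        .map(|f| {
            if support {
                PredicateSupport::Supported(f)
            } else {
                PredicateSupport::Unsupported(f)
            }
        })
        .collect();
    FilterPushdownPropagation {
        filters,
        updated_node,
    }
}
```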
51 changes: 38 additions & 13 deletions datafusion/datasource-parquet/src/source.rs
@@ -41,8 +41,9 @@ use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
use datafusion_physical_plan::filter_pushdown::PredicateSupports;
use datafusion_physical_plan::filter_pushdown::{
FilterPushdownPropagation, PredicateSupport,
};
use datafusion_physical_plan::metrics::Count;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;
@@ -621,7 +622,12 @@ impl FileSource for ParquetSource {
config: &ConfigOptions,
) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
let Some(file_schema) = self.file_schema.clone() else {
return Ok(FilterPushdownPropagation::unsupported(filters));
return Ok(FilterPushdownPropagation::with_filters(
filters
.into_iter()
.map(PredicateSupport::Unsupported)
.collect(),
));
};
// Determine if based on configs we should push filters down.
// If either the table / scan itself or the config has pushdown enabled,
@@ -635,20 +641,36 @@
let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;

let mut source = self.clone();
let filters = PredicateSupports::new_with_supported_check(filters, |filter| {
can_expr_be_pushed_down_with_schemas(filter, &file_schema)
});
if filters.is_all_unsupported() {
let filters: Vec<PredicateSupport> = filters
.into_iter()
.map(|filter| {
if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) {
PredicateSupport::Supported(filter)
} else {
PredicateSupport::Unsupported(filter)
}
})
.collect();
if filters
.iter()
.all(|f| matches!(f, PredicateSupport::Unsupported(_)))
{
// No filters can be pushed down, so we can just return the remaining filters
// and avoid replacing the source in the physical plan.
return Ok(FilterPushdownPropagation::with_filters(filters));
}
let allowed_filters = filters.collect_supported();
let allowed_filters = filters
.iter()
.filter_map(|f| match f {
PredicateSupport::Supported(expr) => Some(Arc::clone(expr)),
PredicateSupport::Unsupported(_) => None,
})
.collect_vec();
let predicate = match source.predicate {
Some(predicate) => conjunction(
std::iter::once(predicate).chain(allowed_filters.iter().cloned()),
),
None => conjunction(allowed_filters.iter().cloned()),
Some(predicate) => {
conjunction(std::iter::once(predicate).chain(allowed_filters))
}
None => conjunction(allowed_filters),
};
source.predicate = Some(predicate);
source = source.with_pushdown_filters(pushdown_filters);
@@ -657,7 +679,10 @@
// even if we updated the predicate to include the filters (they will only be used for stats pruning).
if !pushdown_filters {
return Ok(FilterPushdownPropagation::with_filters(
filters.make_unsupported(),
filters
.into_iter()
.map(|f| PredicateSupport::Unsupported(f.into_inner()))
.collect_vec(),
)
.with_updated_node(source));
}
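The predicate-merging step above can be read in isolation: keep only the expressions marked `Supported` and AND them onto any pre-existing predicate. The sketch below assumes `conjunction` accepts any iterator of `Arc<dyn PhysicalExpr>`, as its usage in this hunk suggests; `merged_predicate` is a hypothetical name.

```rust
use std::sync::Arc;

use datafusion_physical_expr::conjunction;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::PredicateSupport;

// Hypothetical helper: AND the supported filters onto an optional existing predicate.
fn merged_predicate(
    existing: Option<Arc<dyn PhysicalExpr>>,
    filters: &[PredicateSupport],
) -> Arc<dyn PhysicalExpr> {
    // Keep only the filters the source can actually evaluate.
    let supported = filters.iter().filter_map(|f| match f {
        PredicateSupport::Supported(expr) => Some(Arc::clone(expr)),
        PredicateSupport::Unsupported(_) => None,
    });
    match existing {
        Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
        None => conjunction(supported),
    }
}
```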
11 changes: 9 additions & 2 deletions datafusion/datasource/src/file.rs
@@ -30,7 +30,9 @@ use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{not_impl_err, Result, Statistics};
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
use datafusion_physical_plan::filter_pushdown::{
FilterPushdownPropagation, PredicateSupport,
};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;

@@ -120,7 +122,12 @@ pub trait FileSource: Send + Sync {
filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
Ok(FilterPushdownPropagation::unsupported(filters))
Ok(FilterPushdownPropagation::with_filters(
filters
.into_iter()
.map(PredicateSupport::Unsupported)
.collect(),
))
}

/// Set optional schema adapter factory.
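With `PredicateSupports` gone, the "mark everything unsupported" default now appears verbatim in several places (this trait, `DataSource::try_pushdown_filters`, and the test source). If that repetition becomes bothersome it could be factored into a small generic helper; a sketch assuming only the `with_filters` constructor shown in this diff, with the hypothetical name `all_unsupported`:

```rust
use std::sync::Arc;

use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::{
    FilterPushdownPropagation, PredicateSupport,
};

// Hypothetical helper: the default "reject everything" propagation used by
// sources that do not override filter pushdown.
fn all_unsupported<T>(
    filters: Vec<Arc<dyn PhysicalExpr>>,
) -> FilterPushdownPropagation<T> {
    FilterPushdownPropagation::with_filters(
        filters
            .into_iter()
            .map(PredicateSupport::Unsupported)
            .collect(),
    )
}
```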
18 changes: 15 additions & 3 deletions datafusion/datasource/src/source.rs
@@ -38,7 +38,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::filter_pushdown::{
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation,
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport,
};

/// A source of data, typically a list of files or memory
@@ -168,7 +168,12 @@ pub trait DataSource: Send + Sync + Debug {
filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
Ok(FilterPushdownPropagation::unsupported(filters))
Ok(FilterPushdownPropagation::with_filters(
filters
.into_iter()
.map(PredicateSupport::Unsupported)
.collect(),
))
}
}

@@ -316,7 +321,14 @@
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
// Push any remaining filters into our data source
let res = self.data_source.try_pushdown_filters(
child_pushdown_result.parent_filters.collect_all(),
child_pushdown_result
.parent_filters
.into_iter()
.map(|f| match f {
PredicateSupport::Supported(expr) => expr,
PredicateSupport::Unsupported(expr) => expr,
})
.collect(),
config,
)?;
match res.updated_node {
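`DataSourceExec` needs the raw expressions back regardless of what each filter was marked as, which is why both variants are unwrapped identically above. The same unwrap can be written with an or-pattern; the sketch below assumes only the `PredicateSupport` enum, and `collect_all_expressions` is a hypothetical name.

```rust
use std::sync::Arc;

use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::PredicateSupport;

// Hypothetical helper: strip the Supported/Unsupported wrapper and return the
// underlying expressions in their original order.
fn collect_all_expressions(
    filters: Vec<PredicateSupport>,
) -> Vec<Arc<dyn PhysicalExpr>> {
    filters
        .into_iter()
        .map(|f| match f {
            PredicateSupport::Supported(expr)
            | PredicateSupport::Unsupported(expr) => expr,
        })
        .collect()
}
```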
49 changes: 29 additions & 20 deletions datafusion/physical-optimizer/src/filter_pushdown.rs
@@ -15,19 +15,34 @@
// specific language governing permissions and limitations
// under the License.

//! Filter Pushdown Optimization Process
//!
//! The filter pushdown mechanism involves four key steps:
//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`]
//! on the parent node, passing in the parent predicates and the pushdown phase. The parent node creates a [`FilterDescription`]
//! by inspecting its own logic and its children's schemas, determining which filters can be pushed down to each child.
//! 2. **Optimizer Executes Pushdown**: The optimizer recursively calls `push_down_filters` in this module on each child,
//! passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child.
//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from the children,
//! recording which filters were pushed down successfully and which were unsupported.
//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent,
//! passing a [`ChildPushdownResult`] with the aggregated pushdown outcomes. The parent decides
//! how to handle any filters that could not be pushed down (e.g., by keeping them as `FilterExec` nodes).
//!
//! [`FilterDescription`]: datafusion_physical_plan::filter_pushdown::FilterDescription

use std::sync::Arc;

use crate::PhysicalOptimizerRule;

use datafusion_common::{config::ConfigOptions, Result};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::{
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation,
PredicateSupport, PredicateSupports,
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport,
};
use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use itertools::izip;
use itertools::{izip, Itertools};

/// Attempts to recursively push given filters from the top of the tree into leafs.
///
@@ -497,10 +512,10 @@ fn push_down_filters(
// Our child doesn't know the difference between filters that were passed down
// from our parents and filters that the current node injected. We need to de-entangle
// this since we do need to distinguish between them.
let mut all_filters = result.filters.into_inner();
let mut all_filters = result.filters.into_iter().collect_vec();
let parent_predicates = all_filters.split_off(num_self_filters);
let self_predicates = all_filters;
self_filters_pushdown_supports.push(PredicateSupports::new(self_predicates));
self_filters_pushdown_supports.push(self_predicates);

for (idx, result) in parent_supported_predicate_indices
.iter()
@@ -533,21 +548,15 @@
let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?;
// Remap the result onto the parent filters as they were given to us.
// Any filters that were not pushed down to any children are marked as unsupported.
let parent_pushdown_result = PredicateSupports::new(
parent_predicates_pushdown_states
.into_iter()
.zip(parent_predicates)
.map(|(state, filter)| match state {
ParentPredicateStates::NoChildren => {
PredicateSupport::Unsupported(filter)
}
ParentPredicateStates::Unsupported => {
PredicateSupport::Unsupported(filter)
}
ParentPredicateStates::Supported => PredicateSupport::Supported(filter),
})
.collect(),
);
let parent_pushdown_result = parent_predicates_pushdown_states
.into_iter()
.zip(parent_predicates)
.map(|(state, filter)| match state {
ParentPredicateStates::NoChildren => PredicateSupport::Unsupported(filter),
ParentPredicateStates::Unsupported => PredicateSupport::Unsupported(filter),
ParentPredicateStates::Supported => PredicateSupport::Supported(filter),
})
.collect();
// TODO: by calling `handle_child_pushdown_result` we are assuming that the
// `ExecutionPlan` implementation will not change the plan itself.
// Should we have a separate method for dynamic pushdown that does not allow modifying the plan?
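Step 3 of the process documented at the top of this file relies on an ordering convention that the "de-entangle" comment in `push_down_filters` depends on: the child returns a single flat `Vec<PredicateSupport>` in which, as this hunk reads it, the first `num_self_filters` entries answer for the filters this node injected and the remainder answer for the filters forwarded from its parent. A sketch of just that split, using hypothetical names and only `Vec::split_off`:

```rust
use datafusion_physical_plan::filter_pushdown::PredicateSupport;

// Hypothetical helper mirroring the de-entangling step in push_down_filters:
// self-filter results come first, parent-filter results follow.
fn split_results(
    mut all_filters: Vec<PredicateSupport>,
    num_self_filters: usize,
) -> (Vec<PredicateSupport>, Vec<PredicateSupport>) {
    // split_off keeps [0, num_self_filters) in `all_filters` and returns the tail.
    let parent_results = all_filters.split_off(num_self_filters);
    let self_results = all_filters;
    (self_results, parent_results)
}
```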
3 changes: 1 addition & 2 deletions datafusion/physical-plan/src/coalesce_batches.rs
@@ -234,8 +234,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterDescription> {
Ok(FilterDescription::new_with_child_count(1)
.all_parent_filters_supported(parent_filters))
FilterDescription::from_children(parent_filters, &self.children())
}

fn handle_child_pushdown_result(
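Because `CoalesceBatchesExec` only regroups rows into larger batches, any predicate that holds above it also holds below it, so its `gather_filters_for_pushdown` body collapses to a single `FilterDescription::from_children` call. The sketch below generalizes that transparent pattern, assuming the `from_children(parent_filters, &children)` signature used above; `transparent_description` is a hypothetical name.

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::FilterDescription;
use datafusion_physical_plan::ExecutionPlan;

// Hypothetical helper: forward every parent filter unchanged to every child,
// the pattern used by filter-transparent nodes such as CoalesceBatchesExec.
fn transparent_description(
    parent_filters: Vec<Arc<dyn PhysicalExpr>>,
    children: &[&Arc<dyn ExecutionPlan>],
) -> Result<FilterDescription> {
    FilterDescription::from_children(parent_filters, children)
}
```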
36 changes: 22 additions & 14 deletions datafusion/physical-plan/src/execution_plan.rs
@@ -17,8 +17,8 @@

pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation, PredicateSupport,
};
pub use crate::metrics::Metric;
pub use crate::ordering::InputOrderMode;
@@ -33,6 +33,7 @@ pub use datafusion_physical_expr::window::WindowExpr;
pub use datafusion_physical_expr::{
expressions, Distribution, Partitioning, PhysicalExpr,
};
use itertools::Itertools;

use std::any::Any;
use std::fmt::Debug;
@@ -520,10 +521,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterDescription> {
Ok(
FilterDescription::new_with_child_count(self.children().len())
.all_parent_filters_unsupported(parent_filters),
)
// Default implementation: mark all filters as unsupported for all children
let mut desc = FilterDescription::new();
let child_filters = parent_filters
.iter()
.map(|f| PredicateSupport::Unsupported(Arc::clone(f)))
.collect_vec();
for _ in 0..self.children().len() {
desc = desc.with_child(ChildFilterDescription {
parent_filters: child_filters.clone(),
self_filters: vec![],
});
}
Ok(desc)
}

/// Handle the result of a child pushdown.
@@ -587,16 +597,15 @@
///
/// **Helper Methods for Customization:**
/// There are various helper methods to simplify implementing this method:
/// - [`FilterPushdownPropagation::unsupported`]: Indicates that the node
/// does not support filter pushdown at all, rejecting all filters.
/// - [`FilterPushdownPropagation::transparent`]: Indicates that the node
/// supports filter pushdown but does not modify it, simply transmitting
/// the children's pushdown results back up to its parent.
/// - [`PredicateSupports::new_with_supported_check`]: Takes a callback to
/// dynamically determine support for each filter, useful with
/// [`FilterPushdownPropagation::with_filters`] and
/// [`FilterPushdownPropagation::with_updated_node`] to build mixed results
/// of supported and unsupported filters.
/// - [`FilterPushdownPropagation::with_filters`]: Creates a propagation result
/// from a list of filters, indicating which filters are supported by
/// the current node.
/// - [`FilterPushdownPropagation::with_updated_node`]: Attaches an updated copy
/// of the current node to the propagation result, used when the node
/// has modified its plan based on the pushdown results.
///
/// **Filter Pushdown Phases:**
/// There are two different phases in filter pushdown (`Pre` and others),
@@ -605,7 +614,6 @@
/// [`FilterPushdownPhase`] for more details on phase-specific behavior.
///
/// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported
/// [`PredicateSupports::new_with_supported_check`]: crate::filter_pushdown::PredicateSupports::new_with_supported_check
fn handle_child_pushdown_result(
&self,
_phase: FilterPushdownPhase,
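The two helpers now named in the doc comment can be combined. For example, a node that cannot absorb any parent filter but still wants to swap in an updated copy of itself (roughly what `ParquetSource` does when it keeps filters only for stats pruning) might look like the sketch below. The field and generic types are assumptions based on how they appear elsewhere in this diff, and `reject_but_update` is a hypothetical name.

```rust
use std::sync::Arc;

use datafusion_physical_plan::filter_pushdown::{
    ChildPushdownResult, FilterPushdownPropagation, PredicateSupport,
};
use datafusion_physical_plan::ExecutionPlan;

// Hypothetical helper: report every parent filter as unsupported while still
// replacing the node, combining with_filters and with_updated_node.
fn reject_but_update(
    child_pushdown_result: ChildPushdownResult,
    updated: Arc<dyn ExecutionPlan>,
) -> FilterPushdownPropagation<Arc<dyn ExecutionPlan>> {
    let filters = child_pushdown_result
        .parent_filters
        .into_iter()
        .map(|f| match f {
            PredicateSupport::Supported(expr)
            | PredicateSupport::Unsupported(expr) => {
                PredicateSupport::Unsupported(expr)
            }
        })
        .collect();
    FilterPushdownPropagation::with_filters(filters).with_updated_node(updated)
}
```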