Skip to content
43 changes: 20 additions & 23 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ use datafusion_datasource::{
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::filter_pushdown::FilterPushdownPhase;
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPhase, PushedDown};
use datafusion_physical_plan::{
displayable,
filter::FilterExec,
filter_pushdown::{
ChildFilterDescription, ChildPushdownResult, FilterDescription,
FilterPushdownPropagation, PredicateSupport,
FilterPushdownPropagation,
},
metrics::ExecutionPlanMetricsSet,
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
Expand Down Expand Up @@ -227,19 +227,13 @@ impl FileSource for TestSource {
predicate: Some(conjunction(filters.clone())),
..self.clone()
});
Ok(FilterPushdownPropagation {
filters: filters
.into_iter()
.map(PredicateSupport::Supported)
.collect(),
updated_node: Some(new_node),
})
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::Yes; filters.len()],
)
.with_updated_node(new_node))
} else {
Ok(FilterPushdownPropagation::with_filters(
filters
.into_iter()
.map(PredicateSupport::Unsupported)
.collect(),
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
))
}
}
Expand Down Expand Up @@ -547,26 +541,29 @@ impl ExecutionPlan for TestNode {
assert_eq!(self_pushdown_result.len(), 1);
let self_pushdown_result: Vec<_> = self_pushdown_result.into_iter().collect();

match &self_pushdown_result[0] {
PredicateSupport::Unsupported(filter) => {
let first_pushdown_result = self_pushdown_result[0].clone();

match &first_pushdown_result.discriminant {
PushedDown::No => {
// We have a filter to push down
let new_child =
FilterExec::try_new(Arc::clone(filter), Arc::clone(&self.input))?;
let new_child = FilterExec::try_new(
Arc::clone(&first_pushdown_result.predicate),
Arc::clone(&self.input),
)?;
let new_self =
TestNode::new(false, Arc::new(new_child), self.predicate.clone());
let mut res =
FilterPushdownPropagation::transparent(child_pushdown_result);
FilterPushdownPropagation::if_all(child_pushdown_result);
res.updated_node = Some(Arc::new(new_self) as Arc<dyn ExecutionPlan>);
Ok(res)
}
PredicateSupport::Supported(_) => {
let res =
FilterPushdownPropagation::transparent(child_pushdown_result);
PushedDown::Yes => {
let res = FilterPushdownPropagation::if_all(child_pushdown_result);
Ok(res)
}
}
} else {
let res = FilterPushdownPropagation::transparent(child_pushdown_result);
let res = FilterPushdownPropagation::if_all(child_pushdown_result);
Ok(res)
}
}
Expand Down
40 changes: 20 additions & 20 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::PushedDown;
use datafusion_physical_plan::filter_pushdown::{
FilterPushdownPropagation, PredicateSupport,
FilterPushdownPropagation, PushedDownPredicate,
};
use datafusion_physical_plan::metrics::Count;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
Expand Down Expand Up @@ -622,11 +623,8 @@ impl FileSource for ParquetSource {
config: &ConfigOptions,
) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
let Some(file_schema) = self.file_schema.clone() else {
return Ok(FilterPushdownPropagation::with_filters(
filters
.into_iter()
.map(PredicateSupport::Unsupported)
.collect(),
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
));
};
// Determine, based on the configs, whether we should push filters down.
Expand All @@ -641,29 +639,31 @@ impl FileSource for ParquetSource {
let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;

let mut source = self.clone();
let filters: Vec<PredicateSupport> = filters
let filters: Vec<PushedDownPredicate> = filters
.into_iter()
.map(|filter| {
if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) {
PredicateSupport::Supported(filter)
PushedDownPredicate::supported(filter)
} else {
PredicateSupport::Unsupported(filter)
PushedDownPredicate::unsupported(filter)
}
})
.collect();
if filters
.iter()
.all(|f| matches!(f, PredicateSupport::Unsupported(_)))
.all(|f| matches!(f.discriminant, PushedDown::No))
{
// No filters can be pushed down, so we can just return the remaining filters
// and avoid replacing the source in the physical plan.
return Ok(FilterPushdownPropagation::with_filters(filters));
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
));
}
let allowed_filters = filters
.iter()
.filter_map(|f| match f {
PredicateSupport::Supported(expr) => Some(Arc::clone(expr)),
PredicateSupport::Unsupported(_) => None,
.filter_map(|f| match f.discriminant {
PushedDown::Yes => Some(Arc::clone(&f.predicate)),
PushedDown::No => None,
})
.collect_vec();
let predicate = match source.predicate {
Expand All @@ -678,15 +678,15 @@ impl FileSource for ParquetSource {
// If pushdown_filters is false we tell our parents that they still have to handle the filters,
// even if we updated the predicate to include the filters (they will only be used for stats pruning).
if !pushdown_filters {
return Ok(FilterPushdownPropagation::with_filters(
filters
.into_iter()
.map(|f| PredicateSupport::Unsupported(f.into_inner()))
.collect_vec(),
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
)
.with_updated_node(source));
}
Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source))
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
filters.iter().map(|f| f.discriminant).collect(),
)
.with_updated_node(source))
}

fn with_schema_adapter_factory(
Expand Down
11 changes: 3 additions & 8 deletions datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{not_impl_err, Result, Statistics};
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
use datafusion_physical_plan::filter_pushdown::{
FilterPushdownPropagation, PredicateSupport,
};
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;

Expand Down Expand Up @@ -122,11 +120,8 @@ pub trait FileSource: Send + Sync {
filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
Ok(FilterPushdownPropagation::with_filters(
filters
.into_iter()
.map(PredicateSupport::Unsupported)
.collect(),
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
))
}

Expand Down
35 changes: 15 additions & 20 deletions datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use datafusion_physical_expr::{
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::filter::collect_columns_from_predicate;
use datafusion_physical_plan::filter_pushdown::{
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport,
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
};

/// A source of data, typically a list of files or memory
Expand Down Expand Up @@ -172,11 +172,8 @@ pub trait DataSource: Send + Sync + Debug {
filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
Ok(FilterPushdownPropagation::with_filters(
filters
.into_iter()
.map(PredicateSupport::Unsupported)
.collect(),
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![PushedDown::No; filters.len()],
))
}
}
Expand Down Expand Up @@ -324,17 +321,14 @@ impl ExecutionPlan for DataSourceExec {
config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
// Push any remaining filters into our data source
let res = self.data_source.try_pushdown_filters(
child_pushdown_result
.parent_filters
.into_iter()
.map(|f| match f {
PredicateSupport::Supported(expr) => expr,
PredicateSupport::Unsupported(expr) => expr,
})
.collect(),
config,
)?;
let parent_filters = child_pushdown_result
.parent_filters
.into_iter()
.map(|f| f.filter)
.collect_vec();
let res = self
.data_source
.try_pushdown_filters(parent_filters.clone(), config)?;
match res.updated_node {
Some(data_source) => {
let mut new_node = self.clone();
Expand All @@ -346,9 +340,10 @@ impl ExecutionPlan for DataSourceExec {
let filter = conjunction(
res.filters
.iter()
.filter_map(|f| match f {
PredicateSupport::Supported(expr) => Some(Arc::clone(expr)),
PredicateSupport::Unsupported(_) => None,
.zip(parent_filters)
.filter_map(|(s, f)| match s {
PushedDown::Yes => Some(f),
PushedDown::No => None,
})
.collect_vec(),
);
Expand Down
Loading