Skip to content

Commit 6851d8e

Browse files
liamzwbao authored and xudong963 committed
Add the missing equivalence info for filter pushdown
1 parent e5e5c48 commit 6851d8e

File tree

3 files changed

+36
-2
lines changed

3 files changed

+36
-2
lines changed

datafusion/datasource/src/source.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,11 @@ use crate::file_scan_config::FileScanConfig;
3333
use datafusion_common::config::ConfigOptions;
3434
use datafusion_common::{Constraints, Result, Statistics};
3535
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
36-
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
36+
use datafusion_physical_expr::{
37+
conjunction, EquivalenceProperties, Partitioning, PhysicalExpr,
38+
};
3739
use datafusion_physical_expr_common::sort_expr::LexOrdering;
40+
use datafusion_physical_plan::filter::collect_columns_from_predicate;
3841
use datafusion_physical_plan::filter_pushdown::{
3942
ChildPushdownResult, FilterPushdownPropagation,
4043
};
@@ -328,6 +331,9 @@ impl ExecutionPlan for DataSourceExec {
328331
new_node.data_source = data_source;
329332
new_node.cache =
330333
Self::compute_properties(Arc::clone(&new_node.data_source));
334+
// Add the missing equivalence info from the supported filters when filter pushdown is applied
335+
let filter = conjunction(res.filters.collect_supported());
336+
new_node = new_node.add_filter_equivalence_info(filter)?;
331337
Ok(FilterPushdownPropagation {
332338
filters: res.filters,
333339
updated_node: Some(Arc::new(new_node)),
@@ -374,6 +380,20 @@ impl DataSourceExec {
374380
self
375381
}
376382

383+
/// Add the equality conditions found in `filter` to this node's cached equivalence properties
384+
fn add_filter_equivalence_info(
385+
mut self,
386+
filter: Arc<dyn PhysicalExpr>,
387+
) -> Result<Self> {
388+
let (equal_pairs, _) = collect_columns_from_predicate(&filter);
389+
for (lhs, rhs) in equal_pairs {
390+
self.cache
391+
.eq_properties
392+
.add_equal_conditions(&Arc::clone(lhs), &Arc::clone(rhs))?
393+
}
394+
Ok(self)
395+
}
396+
377397
fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
378398
PlanProperties::new(
379399
data_source.eq_properties(),

datafusion/physical-plan/src/filter.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,9 @@ impl RecordBatchStream for FilterExecStream {
749749
}
750750

751751
/// Return the equal column pairs and the non-equal column pairs
752-
fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {
752+
pub fn collect_columns_from_predicate(
753+
predicate: &Arc<dyn PhysicalExpr>,
754+
) -> EqualAndNonEqual {
753755
let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
754756
let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
755757

datafusion/physical-plan/src/filter_pushdown.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,18 @@ impl PredicateSupports {
102102
.collect()
103103
}
104104

105+
/// Collect supported filters into a Vec, without removing them from the original
106+
/// [`PredicateSupports`].
107+
pub fn collect_supported(&self) -> Vec<Arc<dyn PhysicalExpr>> {
108+
self.0
109+
.iter()
110+
.filter_map(|f| match f {
111+
PredicateSupport::Supported(expr) => Some(Arc::clone(expr)),
112+
PredicateSupport::Unsupported(_) => None,
113+
})
114+
.collect()
115+
}
116+
105117
/// Collect all filters into a Vec, without removing them from the original
106118
/// FilterPushdowns.
107119
pub fn collect_all(self) -> Vec<Arc<dyn PhysicalExpr>> {

0 commit comments

Comments
 (0)