Skip to content

Commit 3725020

Browse files
asubiotto authored and tobixdev committed
physical-plan: push filters down to UnionExec children (apache#18054)
Filters are safe to push down to the children of a UnionExec, so UnionExec overrides the default filter-pushdown behavior.

Signed-off-by: Alfonso Subiotto Marques <alfonso.subiotto@polarsignals.com>
1 parent 29d30cc commit 3725020

File tree

2 files changed

+49
-9
lines changed
  • datafusion
    • core/tests/physical_optimizer/filter_pushdown
    • physical-plan/src

2 files changed

+49
-9
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ use datafusion_physical_plan::{
6060
ExecutionPlan,
6161
};
6262

63+
use datafusion_physical_plan::union::UnionExec;
6364
use futures::StreamExt;
6465
use object_store::{memory::InMemory, ObjectStore};
6566
use util::{format_plan_for_test, OptimizationTest, TestNode, TestScanBuilder};
@@ -1833,6 +1834,34 @@ STORED AS PARQUET;
18331834
// Pushdown pruned most rows
18341835
}
18351836

1837+
// Checks that a filter sitting above a `UnionExec` is pushed into both of the
// union's inputs: in the expected snapshot the `FilterExec` is gone from the
// output plan and each `DataSourceExec` carries `predicate=a@0 = foo`.
#[test]
1838+
fn test_filter_pushdown_through_union() {
1839+
// Two scans over the same schema, both built with `with_support(true)`
// (shown as `pushdown_supported=true` in the snapshot).
let scan1 = TestScanBuilder::new(schema()).with_support(true).build();
1840+
let scan2 = TestScanBuilder::new(schema()).with_support(true).build();
1841+
1842+
// Combine the two scans under a single union node.
let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();
1843+
1844+
// Place the filter `a = foo` on top of the union.
let predicate = col_lit_predicate("a", "foo", &schema());
1845+
let plan = Arc::new(FilterExec::try_new(predicate, union).unwrap());
1846+
1847+
// Run the `FilterPushdown` rule and compare the plan before and after
// against the inline snapshot.
insta::assert_snapshot!(
1848+
OptimizationTest::new(plan, FilterPushdown::new(), true),
1849+
@r"
1850+
OptimizationTest:
1851+
input:
1852+
- FilterExec: a@0 = foo
1853+
- UnionExec
1854+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1855+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1856+
output:
1857+
Ok:
1858+
- UnionExec
1859+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
1860+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
1861+
"
1862+
);
1863+
}
1864+
18361865
/// Schema:
18371866
/// a: String
18381867
/// b: String

datafusion/physical-plan/src/union.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,18 @@ use crate::execution_plan::{
3636
boundedness_from_children, check_default_invariants, emission_type_from_children,
3737
InvariantLevel,
3838
};
39+
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
3940
use crate::metrics::BaselineMetrics;
4041
use crate::projection::{make_with_child, ProjectionExec};
4142
use crate::stream::ObservedStream;
4243

4344
use arrow::datatypes::{Field, Schema, SchemaRef};
4445
use arrow::record_batch::RecordBatch;
46+
use datafusion_common::config::ConfigOptions;
4547
use datafusion_common::stats::Precision;
4648
use datafusion_common::{exec_err, internal_datafusion_err, internal_err, Result};
4749
use datafusion_execution::TaskContext;
48-
use datafusion_physical_expr::{calculate_union, EquivalenceProperties};
50+
use datafusion_physical_expr::{calculate_union, EquivalenceProperties, PhysicalExpr};
4951

5052
use futures::Stream;
5153
use itertools::Itertools;
@@ -218,10 +220,6 @@ impl ExecutionPlan for UnionExec {
218220
})
219221
}
220222

221-
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
222-
self.inputs.iter().collect()
223-
}
224-
225223
fn maintains_input_order(&self) -> Vec<bool> {
226224
// If the Union has an output ordering, it maintains at least one
227225
// child's ordering (i.e. the meet).
@@ -247,6 +245,14 @@ impl ExecutionPlan for UnionExec {
247245
}
248246
}
249247

248+
/// Returns a vector holding one `false` entry for each child of this union,
/// i.e. its length equals `self.children().len()`.
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
249+
vec![false; self.children().len()]
250+
}
251+
252+
/// Returns references to all input plans of this union, in the order they
/// are stored in `self.inputs`.
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
253+
self.inputs.iter().collect()
254+
}
255+
250256
fn with_new_children(
251257
self: Arc<Self>,
252258
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -324,10 +330,6 @@ impl ExecutionPlan for UnionExec {
324330
}
325331
}
326332

327-
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
328-
vec![false; self.children().len()]
329-
}
330-
331333
/// Always returns `true`.
///
/// NOTE(review): presumably this tells the optimizer that a limit may be
/// pushed through the union to its inputs — confirm against the
/// `ExecutionPlan` trait documentation.
fn supports_limit_pushdown(&self) -> bool {
332334
true
333335
}
@@ -352,6 +354,15 @@ impl ExecutionPlan for UnionExec {
352354

353355
Ok(Some(UnionExec::try_new(new_children.clone())?))
354356
}
357+
358+
/// Describes how filters coming from the parent are pushed down through this
/// union.
///
/// All of `parent_filters` are handed, together with every child of this
/// node, to `FilterDescription::from_children`, whose result is returned
/// unchanged. The pushdown phase and the configuration options are not
/// consulted.
///
/// NOTE(review): presumably `from_children` offers each parent filter to
/// every child — confirm against its documentation.
fn gather_filters_for_pushdown(
359+
&self,
360+
_phase: FilterPushdownPhase,
361+
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
362+
_config: &ConfigOptions,
363+
) -> Result<FilterDescription> {
364+
FilterDescription::from_children(parent_filters, &self.children())
365+
}
355366
}
356367

357368
/// Combines multiple input streams by interleaving them.

0 commit comments

Comments
 (0)