
Commit 2a1052c

adriangb authored and LiaCastaneda committed
Enable physical filter pushdown for hash joins (apache#16954)
(cherry picked from commit b10f453)
1 parent 2cdf059 commit 2a1052c

File tree

4 files changed, +350 -18 lines changed


datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 263 additions & 0 deletions
@@ -121,6 +121,269 @@ fn test_pushdown_into_scan_with_config_options() {
     );
 }

+#[tokio::test]
+async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Create build side with limited values
+    let build_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab"]),
+        ("b", Utf8, ["ba", "bb"]),
+        ("c", Float64, [1.0, 2.0])
+    )
+    .unwrap()];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values
+    let probe_batches = vec![record_batch!(
+        ("d", Utf8, ["aa", "ab", "ac", "ad"]),
+        ("e", Utf8, ["ba", "bb", "bc", "bd"]),
+        ("f", Float64, [1.0, 2.0, 3.0, 4.0])
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("d", DataType::Utf8, false),
+        Field::new("e", DataType::Utf8, false),
+        Field::new("f", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create HashJoinExec
+    let on = vec![(
+        col("a", &build_side_schema).unwrap(),
+        col("d", &probe_side_schema).unwrap(),
+    )];
+    let join = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            probe_scan,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    let join_schema = join.schema();
+
+    // Finally let's add a SortExec on the outside to test pushdown of dynamic filters
+    let sort_expr =
+        PhysicalSortExpr::new(col("e", &join_schema).unwrap(), SortOptions::default());
+    let plan = Arc::new(
+        SortExec::new(LexOrdering::new(vec![sort_expr]).unwrap(), join)
+            .with_fetch(Some(2)),
+    ) as Arc<dyn ExecutionPlan>;
+
+    let mut config = ConfigOptions::default();
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    config.execution.parquet.pushdown_filters = true;
+
+    // Apply the FilterPushdown optimizer rule
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(Arc::clone(&plan), &config)
+        .unwrap();
+
+    // Test that filters are pushed down correctly to each side of the join
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false]
+    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+    "
+    );
+
+    // Put some data through the plan to check that the filter is updated to reflect the TopK state
+    let session_ctx = SessionContext::new_with_config(SessionConfig::new());
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+    // Iterate one batch
+    stream.next().await.unwrap().unwrap();
+
+    // Test that filters are pushed down correctly to each side of the join
+    insta::assert_snapshot!(
+        format_plan_for_test(&plan),
+        @r"
+    - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
+    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
+    "
+    );
+}
+
+// Test both static and dynamic filter pushdown in HashJoinExec.
+// Note that static filter pushdown is rare: it should have already happened in the logical optimizer phase.
+// However users may manually construct plans that could result in a FilterExec -> HashJoinExec -> Scan setup.
+// Dynamic filters arise in cases such as nested inner joins or TopK -> HashJoinExec -> Scan setups.
+#[tokio::test]
+async fn test_static_filter_pushdown_through_hash_join() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Create build side with limited values
+    let build_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab"]),
+        ("b", Utf8, ["ba", "bb"]),
+        ("c", Float64, [1.0, 2.0])
+    )
+    .unwrap()];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values
+    let probe_batches = vec![record_batch!(
+        ("d", Utf8, ["aa", "ab", "ac", "ad"]),
+        ("e", Utf8, ["ba", "bb", "bc", "bd"]),
+        ("f", Float64, [1.0, 2.0, 3.0, 4.0])
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("d", DataType::Utf8, false),
+        Field::new("e", DataType::Utf8, false),
+        Field::new("f", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create HashJoinExec
+    let on = vec![(
+        col("a", &build_side_schema).unwrap(),
+        col("d", &probe_side_schema).unwrap(),
+    )];
+    let join = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            probe_scan,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    // Create filters that can be pushed down to different sides
+    // We need to create filters in the context of the join output schema
+    let join_schema = join.schema();
+
+    // Filter on build side column: a = 'aa'
+    let left_filter = col_lit_predicate("a", "aa", &join_schema);
+    // Filter on probe side column: e = 'ba'
+    let right_filter = col_lit_predicate("e", "ba", &join_schema);
+    // Filter that references both sides: a = d (should not be pushed down)
+    let cross_filter = Arc::new(BinaryExpr::new(
+        col("a", &join_schema).unwrap(),
+        Operator::Eq,
+        col("d", &join_schema).unwrap(),
+    )) as Arc<dyn PhysicalExpr>;
+
+    let filter =
+        Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap());
+    let filter = Arc::new(FilterExec::try_new(right_filter, filter).unwrap());
+    let plan = Arc::new(FilterExec::try_new(cross_filter, filter).unwrap())
+        as Arc<dyn ExecutionPlan>;
+
+    // Test that filters are pushed down correctly to each side of the join
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = d@3
+        -   FilterExec: e@4 = ba
+        -     FilterExec: a@0 = aa
+        -       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
+        -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+        -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - FilterExec: a@0 = d@3
+          -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba
+    "
+    );
+
+    // Test left join - filters should NOT be pushed down
+    let join = Arc::new(
+        HashJoinExec::try_new(
+            TestScanBuilder::new(Arc::clone(&build_side_schema))
+                .with_support(true)
+                .build(),
+            TestScanBuilder::new(Arc::clone(&probe_side_schema))
+                .with_support(true)
+                .build(),
+            vec![(
+                col("a", &build_side_schema).unwrap(),
+                col("d", &probe_side_schema).unwrap(),
+            )],
+            None,
+            &JoinType::Left,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    let join_schema = join.schema();
+    let filter = col_lit_predicate("a", "aa", &join_schema);
+    let plan =
+        Arc::new(FilterExec::try_new(filter, join).unwrap()) as Arc<dyn ExecutionPlan>;
+
+    // Test that filters are NOT pushed down for left join
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = aa
+        -   HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
+        -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+        -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - FilterExec: a@0 = aa
+          -   HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
+    "
+    );
+}
+
 #[test]
 fn test_filter_collapse() {
     // filter should be pushed down into the parquet scan with two filters
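The tests above enable the relevant knobs directly on ConfigOptions (optimizer.enable_dynamic_filter_pushdown and execution.parquet.pushdown_filters). As a hedged sketch, not part of this commit, the same two options could be set on a SessionContext for an end-to-end query; the make_ctx helper name is hypothetical, only the option fields come from the test code above.

// Sketch only: equivalent session-level configuration.
use datafusion::prelude::{SessionConfig, SessionContext};

fn make_ctx() -> SessionContext {
    let mut config = SessionConfig::new();
    // Same option the test sets via ConfigOptions
    config.options_mut().optimizer.enable_dynamic_filter_pushdown = true;
    // Let the Parquet reader evaluate pushed-down predicates
    config.options_mut().execution.parquet.pushdown_filters = true;
    SessionContext::new_with_config(config)
}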

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 6 additions & 16 deletions
@@ -17,8 +17,8 @@

 pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
 use crate::filter_pushdown::{
-    ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
-    FilterPushdownPropagation, PushedDownPredicate,
+    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
+    FilterPushdownPropagation,
 };
 pub use crate::metrics::Metric;
 pub use crate::ordering::InputOrderMode;

@@ -33,7 +33,6 @@ pub use datafusion_physical_expr::window::WindowExpr;
 pub use datafusion_physical_expr::{
     expressions, Distribution, Partitioning, PhysicalExpr,
 };
-use itertools::Itertools;

 use std::any::Any;
 use std::fmt::Debug;

@@ -521,19 +520,10 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterDescription> {
-        // Default implementation: mark all filters as unsupported for all children
-        let mut desc = FilterDescription::new();
-        let child_filters = parent_filters
-            .iter()
-            .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
-            .collect_vec();
-        for _ in 0..self.children().len() {
-            desc = desc.with_child(ChildFilterDescription {
-                parent_filters: child_filters.clone(),
-                self_filters: vec![],
-            });
-        }
-        Ok(desc)
+        Ok(FilterDescription::all_unsupported(
+            &parent_filters,
+            &self.children(),
+        ))
     }

     /// Handle the result of a child pushdown.
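With this change the default gather_filters_for_pushdown simply delegates to the new FilterDescription::all_unsupported helper, so an operator that does not opt in keeps blocking pushdown for all of its children. A minimal sketch of calling the helper directly, assuming a hypothetical two-child node with children left and right and an arbitrary list of parent predicates (the decline_all wrapper is ours, not part of the commit):

// Sketch only: build the "decline everything" description the new default produces.
use std::sync::Arc;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::FilterDescription;
use datafusion_physical_plan::ExecutionPlan;

fn decline_all(
    parent_filters: &[Arc<dyn PhysicalExpr>],
    left: &Arc<dyn ExecutionPlan>,
    right: &Arc<dyn ExecutionPlan>,
) -> FilterDescription {
    // One ChildFilterDescription per child: every parent filter marked unsupported,
    // no self-filters, mirroring the hand-rolled loop this commit removes.
    FilterDescription::all_unsupported(parent_filters, &[left, right])
}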

datafusion/physical-plan/src/filter_pushdown.rs

Lines changed: 34 additions & 1 deletion
@@ -40,8 +40,9 @@ use std::sync::Arc;
 use datafusion_common::Result;
 use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use itertools::Itertools;

-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum FilterPushdownPhase {
     /// Pushdown that happens before most other optimizations.
     /// This pushdown allows static filters that do not reference any [`ExecutionPlan`]s to be pushed down.

@@ -257,6 +258,19 @@ impl<T> FilterPushdownPropagation<T> {
         }
     }

+    /// Create a new [`FilterPushdownPropagation`] that tells the parent node that no filters were pushed down regardless of the child results.
+    pub fn all_unsupported(child_pushdown_result: ChildPushdownResult) -> Self {
+        let filters = child_pushdown_result
+            .parent_filters
+            .into_iter()
+            .map(|_| PushedDown::No)
+            .collect();
+        Self {
+            filters,
+            updated_node: None,
+        }
+    }
+
     /// Create a new [`FilterPushdownPropagation`] with the specified filter support.
     /// This transmits up to our parent node what the result of pushing down the filters into our node and possibly our subtree was.
     pub fn with_parent_pushdown_result(filters: Vec<PushedDown>) -> Self {

@@ -413,6 +427,25 @@ impl FilterDescription {
         Ok(desc)
     }

+    /// Mark all parent filters as unsupported for all children.
+    pub fn all_unsupported(
+        parent_filters: &[Arc<dyn PhysicalExpr>],
+        children: &[&Arc<dyn crate::ExecutionPlan>],
+    ) -> Self {
+        let mut desc = Self::new();
+        let child_filters = parent_filters
+            .iter()
+            .map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
+            .collect_vec();
+        for _ in 0..children.len() {
+            desc = desc.with_child(ChildFilterDescription {
+                parent_filters: child_filters.clone(),
+                self_filters: vec![],
+            });
+        }
+        desc
+    }
+
     pub fn parent_filters(&self) -> Vec<Vec<PushedDownPredicate>> {
         self.child_filter_descriptions
             .iter()
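On the result side, FilterPushdownPropagation::all_unsupported is the counterpart for handling a child pushdown result: whatever the children reported, the node tells its parent that nothing was pushed down and leaves itself unchanged. A hedged, standalone sketch (the decline_everything wrapper is ours, not part of the commit):

// Sketch only: report "nothing pushed down" upward, regardless of child results.
use datafusion_physical_plan::filter_pushdown::{
    ChildPushdownResult, FilterPushdownPropagation,
};

fn decline_everything<T>(child_result: ChildPushdownResult) -> FilterPushdownPropagation<T> {
    // Every parent filter maps to PushedDown::No; updated_node stays None.
    FilterPushdownPropagation::all_unsupported(child_result)
}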
