
Commit 64c4027

fix bounds accumulator reset in HashJoinExec dynamic filter pushdown (#17371)
1 parent 6fd5685 commit 64c4027

File tree

2 files changed, +43 -37 lines changed

  • datafusion
    • core/tests/physical_optimizer/filter_pushdown/mod.rs
    • physical-plan/src/joins/hash_join/exec.rs


datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 5 additions & 0 deletions

@@ -922,6 +922,11 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
     let plan = FilterPushdown::new_post_optimization()
         .optimize(plan, &config)
         .unwrap();
+
+    // Test for https://github.com/apache/datafusion/pull/17371: dynamic filter linking survives `with_new_children`
+    let children = plan.children().into_iter().map(Arc::clone).collect();
+    let plan = plan.with_new_children(children).unwrap();
+
     let config = SessionConfig::new().with_batch_size(10);
     let session_ctx = SessionContext::new_with_config(config);
     session_ctx.register_object_store(
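
Note: the new test lines exercise `with_new_children`, which optimizer rules use to rebuild plan nodes. Before this commit, the rebuilt HashJoinExec kept its dynamic filter but reset `bounds_accumulator` to `None`, silently unlinking the filter from the build-side bounds it was supposed to receive. Below is a minimal standalone sketch of that failure mode and of the shape of the fix, in plain Rust; `Filter`, `Accumulator`, `Before`, `After`, and `DynamicFilter` are hypothetical stand-ins, not DataFusion types.

use std::sync::{Arc, OnceLock};

// Hypothetical stand-ins for DynamicFilterPhysicalExpr / SharedBoundsAccumulator.
struct Filter;
struct Accumulator;

// Before: two fields that had to be kept in sync by hand.
struct Before {
    dynamic_filter: Option<Arc<Filter>>,
    bounds_accumulator: Option<OnceLock<Arc<Accumulator>>>,
}

// After: one bundle; a filter without its accumulator slot is unrepresentable.
#[derive(Clone)]
struct DynamicFilter {
    filter: Arc<Filter>,
    bounds_accumulator: OnceLock<Arc<Accumulator>>,
}
struct After {
    dynamic_filter: Option<DynamicFilter>,
}

fn main() {
    // Old `with_new_children`: filter kept, accumulator slot dropped.
    let rebuilt = Before {
        dynamic_filter: Some(Arc::new(Filter)),
        bounds_accumulator: None, // the bug: execution needs both to be Some
    };
    assert!(rebuilt.dynamic_filter.is_some() && rebuilt.bounds_accumulator.is_none());

    // New `with_new_children`: the whole bundle is cloned, so the OnceLock
    // travels with the filter and can never be dropped on its own.
    let node = After {
        dynamic_filter: Some(DynamicFilter {
            filter: Arc::new(Filter),
            bounds_accumulator: OnceLock::new(),
        }),
    };
    let rebuilt = After { dynamic_filter: node.dynamic_filter.clone() };
    assert!(rebuilt.dynamic_filter.is_some());
}

Because the `OnceLock` now sits inside the same `Option` as the filter, the inconsistent state (filter present, accumulator slot gone) can no longer be constructed.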

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 38 additions & 37 deletions

@@ -354,12 +354,18 @@ pub struct HashJoinExec {
     /// Cache holding plan properties like equivalences, output partitioning etc.
     cache: PlanProperties,
     /// Dynamic filter for pushing down to the probe side
-    /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result
-    dynamic_filter: Option<Arc<DynamicFilterPhysicalExpr>>,
-    /// Shared bounds accumulator for coordinating dynamic filter updates across partitions
-    /// Only created when dynamic filter pushdown is enabled.
-    /// Lazily initialized at execution time to use actual runtime partition counts
-    bounds_accumulator: Option<OnceLock<Arc<SharedBoundsAccumulator>>>,
+    /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
+    /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
+    dynamic_filter: Option<HashJoinExecDynamicFilter>,
+}
+
+#[derive(Clone)]
+struct HashJoinExecDynamicFilter {
+    /// Dynamic filter that we'll update with the results of the build side once that is done.
+    filter: Arc<DynamicFilterPhysicalExpr>,
+    /// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
+    /// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
+    bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
 }
 
 impl fmt::Debug for HashJoinExec {
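
Note: the `OnceLock` inside the new bundle keeps the lazy-initialization behavior the old standalone field had. Here is a minimal sketch of that pattern, independent of DataFusion (`SharedBounds` and the thread-per-partition setup are illustrative only):

use std::sync::{Arc, OnceLock};
use std::thread;

// Illustrative stand-in for SharedBoundsAccumulator.
struct SharedBounds {
    partition_count: usize,
}

fn main() {
    // One cell per plan node; every executing partition touches the same cell.
    let cell: Arc<OnceLock<Arc<SharedBounds>>> = Arc::new(OnceLock::new());

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let cell = Arc::clone(&cell);
            thread::spawn(move || {
                // The init closure runs at most once, at execution time,
                // when the real partition count is finally known.
                Arc::clone(cell.get_or_init(|| Arc::new(SharedBounds { partition_count: 4 })))
            })
        })
        .collect();

    let accs: Vec<Arc<SharedBounds>> =
        handles.into_iter().map(|h| h.join().unwrap()).collect();

    // All partitions coordinate through the exact same accumulator.
    assert!(accs.windows(2).all(|w| Arc::ptr_eq(&w[0], &w[1])));
    assert_eq!(accs[0].partition_count, 4);
}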
@@ -453,7 +459,6 @@ impl HashJoinExec {
             null_equality,
             cache,
             dynamic_filter: None,
-            bounds_accumulator: None,
         })
     }
 
@@ -852,7 +857,6 @@ impl ExecutionPlan for HashJoinExec {
            )?,
            // Keep the dynamic filter, bounds accumulator will be reset
            dynamic_filter: self.dynamic_filter.clone(),
-           bounds_accumulator: None,
        }))
    }
 
@@ -875,7 +879,6 @@ impl ExecutionPlan for HashJoinExec {
            cache: self.cache.clone(),
            // Reset dynamic filter and bounds accumulator to initial state
            dynamic_filter: None,
-           bounds_accumulator: None,
        }))
    }
 
@@ -957,32 +960,28 @@ impl ExecutionPlan for HashJoinExec {
         let batch_size = context.session_config().batch_size();
 
         // Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
-        let bounds_accumulator = if enable_dynamic_filter_pushdown
-            && self.dynamic_filter.is_some()
-        {
-            if let Some(ref bounds_accumulator_oncelock) = self.bounds_accumulator {
-                let dynamic_filter = Arc::clone(self.dynamic_filter.as_ref().unwrap());
-                let on_right = self
-                    .on
-                    .iter()
-                    .map(|(_, right_expr)| Arc::clone(right_expr))
-                    .collect::<Vec<_>>();
-
-                Some(Arc::clone(bounds_accumulator_oncelock.get_or_init(|| {
-                    Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
-                        self.mode,
-                        self.left.as_ref(),
-                        self.right.as_ref(),
-                        dynamic_filter,
-                        on_right,
-                    ))
-                })))
-            } else {
-                None
-            }
-        } else {
-            None
-        };
+        let bounds_accumulator = enable_dynamic_filter_pushdown
+            .then(|| {
+                self.dynamic_filter.as_ref().map(|df| {
+                    let filter = Arc::clone(&df.filter);
+                    let on_right = self
+                        .on
+                        .iter()
+                        .map(|(_, right_expr)| Arc::clone(right_expr))
+                        .collect::<Vec<_>>();
+                    Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
+                        Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
+                            self.mode,
+                            self.left.as_ref(),
+                            self.right.as_ref(),
+                            filter,
+                            on_right,
+                        ))
+                    })))
+                })
+            })
+            .flatten()
+            .flatten();
 
         // we have the batches and the hash map with their keys. We can how create a stream
         // over the right that uses this information to issue new batches.
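
Note: the rewrite replaces the nested if/else with `bool::then` plus two `Option::flatten` calls. A tiny self-contained illustration of why two flattens are needed (names and values are hypothetical):

fn pick(enabled: bool, filter: Option<&str>) -> Option<String> {
    enabled
        // bool::then yields Option<_>; map adds one layer, the inner Some another,
        // giving Option<Option<Option<String>>>.
        .then(|| filter.map(|f| Some(format!("bounds for {f}"))))
        .flatten() // -> Option<Option<String>>
        .flatten() // -> Option<String>
}

fn main() {
    assert_eq!(pick(true, Some("t.id = u.id")), Some("bounds for t.id = u.id".into()));
    assert_eq!(pick(true, None), None);
    assert_eq!(pick(false, Some("t.id = u.id")), None);
}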
@@ -1177,8 +1176,10 @@ impl ExecutionPlan for HashJoinExec {
                 column_indices: self.column_indices.clone(),
                 null_equality: self.null_equality,
                 cache: self.cache.clone(),
-                dynamic_filter: Some(dynamic_filter),
-                bounds_accumulator: Some(OnceLock::new()),
+                dynamic_filter: Some(HashJoinExecDynamicFilter {
+                    filter: dynamic_filter,
+                    bounds_accumulator: OnceLock::new(),
+                }),
             });
             result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
         }
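
Note: taken together with the earlier hunks, the bundle is now created in exactly one place (here, when pushdown is detected in handle_child_pushdown_result), cloned wholesale by `with_new_children`, and dropped wholesale when the node's state is reset, so the filter and its bounds accumulator can no longer get out of sync.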
