@@ -922,6 +922,11 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
     let plan = FilterPushdown::new_post_optimization()
         .optimize(plan, &config)
         .unwrap();
+
+    // Test for https://github.com/apache/datafusion/pull/17371: dynamic filter linking survives `with_new_children`
+    let children = plan.children().into_iter().map(Arc::clone).collect();
+    let plan = plan.with_new_children(children).unwrap();
+
     let config = SessionConfig::new().with_batch_size(10);
     let session_ctx = SessionContext::new_with_config(config);
     session_ctx.register_object_store(
75 changes: 38 additions & 37 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -354,12 +354,18 @@ pub struct HashJoinExec {
     /// Cache holding plan properties like equivalences, output partitioning etc.
     cache: PlanProperties,
     /// Dynamic filter for pushing down to the probe side
-    /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result
-    dynamic_filter: Option<Arc<DynamicFilterPhysicalExpr>>,
-    /// Shared bounds accumulator for coordinating dynamic filter updates across partitions
-    /// Only created when dynamic filter pushdown is enabled.
-    /// Lazily initialized at execution time to use actual runtime partition counts
-    bounds_accumulator: Option<OnceLock<Arc<SharedBoundsAccumulator>>>,
+    /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result.
+    /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates.
+    dynamic_filter: Option<HashJoinExecDynamicFilter>,
 }
 
+#[derive(Clone)]
+struct HashJoinExecDynamicFilter {
+    /// Dynamic filter that we'll update with the results of the build side once that is done.
+    filter: Arc<DynamicFilterPhysicalExpr>,
+    /// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
+    /// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
+    bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
+}
 
 impl fmt::Debug for HashJoinExec {
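
One way to read this refactor: the old layout had two parallel `Option` fields that every constructor and rewrite had to keep in sync, while the new layout bundles the filter and its accumulator so they travel together through clones such as `with_new_children`. A minimal sketch of the shape change, using hypothetical stand-in types (`Filter`, `BoundsAccumulator`) in place of the real `DynamicFilterPhysicalExpr` and `SharedBoundsAccumulator`:

```rust
#![allow(dead_code)]
use std::sync::{Arc, OnceLock};

struct Filter; // stand-in for DynamicFilterPhysicalExpr
struct BoundsAccumulator; // stand-in for SharedBoundsAccumulator

// Before: two options that can drift apart (filter set, accumulator not).
struct Before {
    dynamic_filter: Option<Arc<Filter>>,
    bounds_accumulator: Option<OnceLock<Arc<BoundsAccumulator>>>,
}

// After: one option; "filter without an accumulator slot" is unrepresentable.
// OnceLock<T> is Clone when T is, so the whole bundle can derive Clone.
#[derive(Clone)]
struct DynamicFilterState {
    filter: Arc<Filter>,
    bounds_accumulator: OnceLock<Arc<BoundsAccumulator>>,
}

struct After {
    dynamic_filter: Option<DynamicFilterState>,
}

fn main() {
    let state = DynamicFilterState {
        filter: Arc::new(Filter),
        bounds_accumulator: OnceLock::new(),
    };
    // Cloning (as with_new_children does) copies both pieces in one move.
    let rebuilt = After { dynamic_filter: Some(state.clone()) };
    assert!(rebuilt.dynamic_filter.is_some());
}
```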
@@ -453,7 +459,6 @@ impl HashJoinExec {
             null_equality,
             cache,
             dynamic_filter: None,
-            bounds_accumulator: None,
         })
     }

Expand Down Expand Up @@ -837,7 +842,6 @@ impl ExecutionPlan for HashJoinExec {
)?,
// Keep the dynamic filter, bounds accumulator will be reset
dynamic_filter: self.dynamic_filter.clone(),
Contributor:

Since it's possible that subsequent optimization rules can break reference integrity, does it make sense to preserve this unconditionally?

Maybe we can add an additional check to see if the dynamic filter is preserved on the incoming right child?

I wonder if in any case we can reset the bounds accumulator to OnceLock::new() since it's lazily initialized during execute()

Contributor Author:

> Since it's possible that subsequent optimization rules can break reference integrity, does it make sense to preserve this unconditionally?

FWIW this should never cause incorrect results, just disable the optimization (the filter never gets updated).
For HashJoinExec the dynamic filter and bounds accumulator go hand in hand: it makes sense to copy them together.

> I wonder if in any case we can reset the bounds accumulator to OnceLock::new() since it's lazily initialized during execute()

We could, but I'm not sure that's a case we'll ever hit. When would with_new_children be called after execution has started?

Contributor Author:

Maybe it would be reasonable to always call DynamicFilterPhysicalExpr::update(lit(true)) from ExecutionPlan::with_new_children 🤔

Contributor:

> We could, but I'm not sure that's a case we'll ever hit. When would with_new_children be called after execution has started?

Yeah, while probably possible from a usage perspective I agree it's unrealistic.

> FWIW this should never cause incorrect results, just disable the optimization (the filter never gets updated).

Good point - I guess the downside is just the stale dynamic_filter being left around in the case it becomes orphaned. But that's probably not a big deal.

-            bounds_accumulator: None,
         }))
     }
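
To make the thread's reference-integrity point concrete: filter pushdown works by handing the probe-side scan a clone of the same `Arc`'d filter that the join later updates, so a rewrite that drops or recreates the join's copy severs that link. Below is a hedged, self-contained sketch with stand-in types (not DataFusion's API) showing both the preserved link this PR's `with_new_children` guarantees and the orphaned-filter failure mode, which, as noted above, costs only the optimization and never correctness. The `DynamicFilterPhysicalExpr::update(lit(true))` idea floated in the thread would amount to resetting such an orphan back to a pass-through predicate.

```rust
use std::sync::{Arc, RwLock};

/// Stand-in for a shared dynamic filter: the join writes it, the scan reads it.
#[derive(Default)]
struct DynamicFilter(RwLock<Option<String>>);

fn main() {
    // Pushdown links the scan to the join by cloning one Arc.
    let join_filter = Arc::new(DynamicFilter::default());
    let scan_filter = Arc::clone(&join_filter);

    // What this PR ensures: a rebuilt join keeps the same Arc, so build-side
    // bounds reported later still reach the scan.
    let rebuilt_join = Arc::clone(&join_filter);
    *rebuilt_join.0.write().unwrap() = Some("k >= 1 AND k <= 9".into());
    assert!(scan_filter.0.read().unwrap().is_some());

    // The failure mode: a rebuild that allocates a fresh filter. Updates land
    // in the orphan, while the scan keeps its original, never-updated filter.
    let orphan = Arc::new(DynamicFilter::default());
    *orphan.0.write().unwrap() = Some("these bounds are never seen".into());
    assert!(!Arc::ptr_eq(&scan_filter, &orphan));
}
```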

@@ -860,7 +864,6 @@ impl ExecutionPlan for HashJoinExec {
             cache: self.cache.clone(),
             // Reset dynamic filter and bounds accumulator to initial state
             dynamic_filter: None,
-            bounds_accumulator: None,
         }))
     }

@@ -942,32 +945,28 @@ impl ExecutionPlan for HashJoinExec {
         let batch_size = context.session_config().batch_size();
 
         // Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
-        let bounds_accumulator = if enable_dynamic_filter_pushdown
-            && self.dynamic_filter.is_some()
-        {
-            if let Some(ref bounds_accumulator_oncelock) = self.bounds_accumulator {
-                let dynamic_filter = Arc::clone(self.dynamic_filter.as_ref().unwrap());
-                let on_right = self
-                    .on
-                    .iter()
-                    .map(|(_, right_expr)| Arc::clone(right_expr))
-                    .collect::<Vec<_>>();
-
-                Some(Arc::clone(bounds_accumulator_oncelock.get_or_init(|| {
-                    Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
-                        self.mode,
-                        self.left.as_ref(),
-                        self.right.as_ref(),
-                        dynamic_filter,
-                        on_right,
-                    ))
-                })))
-            } else {
-                None
-            }
-        } else {
-            None
-        };
+        let bounds_accumulator = enable_dynamic_filter_pushdown
+            .then(|| {
+                self.dynamic_filter.as_ref().map(|df| {
+                    let filter = Arc::clone(&df.filter);
+                    let on_right = self
+                        .on
+                        .iter()
+                        .map(|(_, right_expr)| Arc::clone(right_expr))
+                        .collect::<Vec<_>>();
+                    Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
+                        Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
+                            self.mode,
+                            self.left.as_ref(),
+                            self.right.as_ref(),
+                            filter,
+                            on_right,
+                        ))
+                    })))
+                })
+            })
+            .flatten()
+            .flatten();
 
         // we have the batches and the hash map with their keys. We can now create a stream
         // over the right that uses this information to issue new batches.
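
Two details in the rewritten initialization are worth spelling out. The combinator chain builds up an `Option<Option<Option<Arc<SharedBoundsAccumulator>>>>`; `bool::then` contributes one layer, `map` a second, the inner `Some` a third, and the two `.flatten()` calls collapse it back to a single `Option`. More importantly, `OnceLock::get_or_init` defers building the accumulator until the first `execute()` call, when the real runtime partition count is known, and every later partition reuses that one instance. A minimal sketch of this lazy shared-init pattern, with a hypothetical `Accumulator` in place of `SharedBoundsAccumulator`:

```rust
use std::sync::{Arc, OnceLock};

/// Stand-in for SharedBoundsAccumulator: built once, shared by all partitions.
struct Accumulator {
    partitions: usize,
}

struct Join {
    // Lazily initialized on the first execute() call, when the runtime
    // partition count is finally known.
    accumulator: OnceLock<Arc<Accumulator>>,
}

impl Join {
    fn execute(&self, runtime_partitions: usize) -> Arc<Accumulator> {
        Arc::clone(self.accumulator.get_or_init(|| {
            Arc::new(Accumulator { partitions: runtime_partitions })
        }))
    }
}

fn main() {
    let join = Join { accumulator: OnceLock::new() };
    let a = join.execute(8); // first caller initializes
    let b = join.execute(8); // later partitions get the same instance
    assert!(Arc::ptr_eq(&a, &b));
    assert_eq!(a.partitions, 8);
}
```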
@@ -1162,8 +1161,10 @@ impl ExecutionPlan for HashJoinExec {
             column_indices: self.column_indices.clone(),
             null_equality: self.null_equality,
             cache: self.cache.clone(),
-            dynamic_filter: Some(dynamic_filter),
-            bounds_accumulator: Some(OnceLock::new()),
+            dynamic_filter: Some(HashJoinExecDynamicFilter {
+                filter: dynamic_filter,
+                bounds_accumulator: OnceLock::new(),
+            }),
         });
         result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
     }