Skip to content

Commit 62407e7

Browse files
committed
fix: Prevent duplicate expressions in DynamicPhysicalExpr
1 parent bfc5067 commit 62407e7

File tree

3 files changed

+37
-10
lines changed

3 files changed

+37
-10
lines changed

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,6 +1020,7 @@ impl ExecutionPlan for HashJoinExec {
10201020
vec![],
10211021
self.right.output_ordering().is_some(),
10221022
bounds_accumulator,
1023+
self.mode,
10231024
)))
10241025
}
10251026

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,10 @@ impl SharedBoundsAccumulator {
128128
/// ## Partition Mode Execution Patterns
129129
///
130130
/// - **CollectLeft**: Build side is collected ONCE from partition 0 and shared via `OnceFut`
131-
/// across all output partitions. Each output partition calls `collect_build_side` to access
132-
/// the shared build data. Expected calls = number of output partitions.
131+
/// across all output partitions. Each output partition calls `collect_build_side` to access the shared build data.
132+
/// Although this results in multiple invocations, the `report_partition_bounds` function contains deduplication logic to handle them safely.
133+
/// Expected calls = number of output partitions.
134+
///
133135
///
134136
/// - **Partitioned**: Each partition independently builds its own hash table by calling
135137
/// `collect_build_side` once. Expected calls = number of build partitions.
@@ -260,22 +262,34 @@ impl SharedBoundsAccumulator {
260262
/// consider making the resulting future shared so the ready result can be reused.
261263
///
262264
/// # Arguments
263-
/// * `partition` - The partition identifier reporting its bounds
265+
/// * `left_side_partition_id` - The identifier for the **left-side** partition reporting its bounds
264266
/// * `partition_bounds` - The bounds computed by this partition (if any)
265267
///
266268
/// # Returns
267269
/// * `Result<()>` - Ok if successful, Err if filter update failed
268270
pub(crate) async fn report_partition_bounds(
269271
&self,
270-
partition: usize,
272+
left_side_partition_id: usize,
271273
partition_bounds: Option<Vec<ColumnBounds>>,
272274
) -> Result<()> {
273275
// Store bounds in the accumulator - this runs once per partition
274276
if let Some(bounds) = partition_bounds {
275-
self.inner
276-
.lock()
277-
.bounds
278-
.push(PartitionBounds::new(partition, bounds));
277+
let mut guard = self.inner.lock();
278+
279+
let should_push = if let Some(last_bound) = guard.bounds.last() {
280+
// In `PartitionMode::CollectLeft`, all streams on the left side share the same partition id (0).
281+
// Since this function can be called multiple times for that same partition, we must deduplicate
282+
// by checking against the last recorded bound.
283+
last_bound.partition != left_side_partition_id
284+
} else {
285+
true
286+
};
287+
288+
if should_push {
289+
guard
290+
.bounds
291+
.push(PartitionBounds::new(left_side_partition_id, bounds));
292+
}
279293
}
280294

281295
if self.barrier.wait().await.is_leader() {

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator;
2828
use crate::joins::utils::{
2929
equal_rows_arr, get_final_indices_from_shared_bitmap, OnceFut,
3030
};
31+
use crate::joins::PartitionMode;
3132
use crate::{
3233
handle_state,
3334
hash_utils::create_hashes,
@@ -210,6 +211,9 @@ pub(super) struct HashJoinStream {
210211
/// Optional future to signal when bounds have been reported by all partitions
211212
/// and the dynamic filter has been updated
212213
bounds_waiter: Option<OnceFut<()>>,
214+
215+
/// Partitioning mode to use
216+
mode: PartitionMode,
213217
}
214218

215219
impl RecordBatchStream for HashJoinStream {
@@ -312,6 +316,7 @@ impl HashJoinStream {
312316
hashes_buffer: Vec<u64>,
313317
right_side_ordered: bool,
314318
bounds_accumulator: Option<Arc<SharedBoundsAccumulator>>,
319+
mode: PartitionMode,
315320
) -> Self {
316321
Self {
317322
partition,
@@ -331,6 +336,7 @@ impl HashJoinStream {
331336
right_side_ordered,
332337
bounds_accumulator,
333338
bounds_waiter: None,
339+
mode,
334340
}
335341
}
336342

@@ -406,11 +412,17 @@ impl HashJoinStream {
406412
// Report bounds to the accumulator which will handle synchronization and filter updates
407413
if let Some(ref bounds_accumulator) = self.bounds_accumulator {
408414
let bounds_accumulator = Arc::clone(bounds_accumulator);
409-
let partition = self.partition;
415+
416+
let left_side_partition_id = match self.mode {
417+
PartitionMode::Partitioned => self.partition,
418+
PartitionMode::CollectLeft => 0,
419+
PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"),
420+
};
421+
410422
let left_data_bounds = left_data.bounds.clone();
411423
self.bounds_waiter = Some(OnceFut::new(async move {
412424
bounds_accumulator
413-
.report_partition_bounds(partition, left_data_bounds)
425+
.report_partition_bounds(left_side_partition_id, left_data_bounds)
414426
.await
415427
}));
416428
self.state = HashJoinStreamState::WaitPartitionBoundsReport;

0 commit comments

Comments
 (0)