From 4346cb868c95bfde3fb23351836157524a06050b Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Thu, 20 Mar 2025 18:52:11 +0100 Subject: [PATCH 01/10] refactor: Catch PartitionMode:Auto in execute explicitly --- datafusion/physical-plan/src/joins/hash_join.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index e9d6354e21d71..21e2b697737d8 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -791,6 +791,13 @@ impl ExecutionPlan for HashJoinExec { ); } + if self.mode == PartitionMode::Auto { + return plan_err!( + "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()", + PartitionMode::Auto + ); + } + let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { PartitionMode::CollectLeft => self.left_fut.once(|| { @@ -825,12 +832,7 @@ impl ExecutionPlan for HashJoinExec { 1, )) } - PartitionMode::Auto => { - return plan_err!( - "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()", - PartitionMode::Auto - ); - } + PartitionMode::Auto => unreachable!(), }; let batch_size = context.session_config().batch_size(); From 94f3fa7d9f2cc15f6d4115256f304248a9f4da8d Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Fri, 21 Mar 2025 10:16:03 +0100 Subject: [PATCH 02/10] refactor(hash_join): Move coalesce logic into function --- datafusion/physical-plan/src/joins/hash_join.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 21e2b697737d8..b85b74cdbdb68 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -929,6 +929,14 @@ impl ExecutionPlan for HashJoinExec { } } +fn coalesce_partitions_if_needed(plan: Arc) -> Arc { + if plan.output_partitioning().partition_count() == 1 { + plan + } else { + Arc::new(CoalescePartitionsExec::new(plan)) + } +} + /// Reads the left (build) side of the input, buffering it in memory, to build a /// hash table (`LeftJoinData`) #[allow(clippy::too_many_arguments)] @@ -947,10 +955,9 @@ async fn collect_left_input( let (left_input, left_input_partition) = if let Some(partition) = partition { (left, partition) - } else if left.output_partitioning().partition_count() != 1 { - (Arc::new(CoalescePartitionsExec::new(left)) as _, 0) } else { - (left, 0) + let left_input = coalesce_partitions_if_needed(left); + (left_input, 0) }; // Depending on partition argument load single partition or whole left side in memory From a0ba09197fdbd1b766d692768f3ed53939ae5ca6 Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Fri, 21 Mar 2025 10:16:03 +0100 Subject: [PATCH 03/10] refactor(hash_join): Move coalesce logic out of collect_left_input --- .../physical-plan/src/joins/hash_join.rs | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index b85b74cdbdb68..5ddb6c5b0fafa 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -803,10 +803,12 @@ impl ExecutionPlan for HashJoinExec { PartitionMode::CollectLeft => self.left_fut.once(|| { let reservation = MemoryConsumer::new("HashJoinInput").register(context.memory_pool()); + + let left = coalesce_partitions_if_needed(self.left.clone()); collect_left_input( - None, + 0, self.random_state.clone(), - Arc::clone(&self.left), + left, on_left.clone(), Arc::clone(&context), join_metrics.clone(), @@ -821,7 +823,7 @@ impl ExecutionPlan for HashJoinExec { .register(context.memory_pool()); OnceFut::new(collect_left_input( - Some(partition), + partition, self.random_state.clone(), Arc::clone(&self.left), on_left.clone(), @@ -941,7 +943,7 @@ fn coalesce_partitions_if_needed(plan: Arc) -> Arc, + partition: usize, random_state: RandomState, left: Arc, on_left: Vec, @@ -953,15 +955,7 @@ async fn collect_left_input( ) -> Result { let schema = left.schema(); - let (left_input, left_input_partition) = if let Some(partition) = partition { - (left, partition) - } else { - let left_input = coalesce_partitions_if_needed(left); - (left_input, 0) - }; - - // Depending on partition argument load single partition or whole left side in memory - let stream = left_input.execute(left_input_partition, Arc::clone(&context))?; + let stream = left.execute(partition, Arc::clone(&context))?; // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream From da06eff8feaa557b9508553ea97c029254c1ef76 Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Fri, 21 Mar 2025 10:28:53 +0100 Subject: [PATCH 04/10] refactor(hash_join): Execute build side earlier --- .../physical-plan/src/joins/hash_join.rs | 50 +++++++++---------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 5ddb6c5b0fafa..a271442b781c9 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -800,34 +800,36 @@ impl ExecutionPlan for HashJoinExec { let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { - PartitionMode::CollectLeft => self.left_fut.once(|| { - let reservation = - MemoryConsumer::new("HashJoinInput").register(context.memory_pool()); + PartitionMode::CollectLeft => { + let left = coalesce_partitions_if_needed(Arc::clone(&self.left)); + let left_stream = left.execute(0, Arc::clone(&context))?; - let left = coalesce_partitions_if_needed(self.left.clone()); - collect_left_input( - 0, - self.random_state.clone(), - left, - on_left.clone(), - Arc::clone(&context), - join_metrics.clone(), - reservation, - need_produce_result_in_final(self.join_type), - self.right().output_partitioning().partition_count(), - ) - }), + self.left_fut.once(|| { + let reservation = MemoryConsumer::new("HashJoinInput") + .register(context.memory_pool()); + + collect_left_input( + self.random_state.clone(), + left_stream, + on_left.clone(), + join_metrics.clone(), + reservation, + need_produce_result_in_final(self.join_type), + self.right().output_partitioning().partition_count(), + ) + }) + } PartitionMode::Partitioned => { + let left_stream = self.left.execute(partition, Arc::clone(&context))?; + let reservation = MemoryConsumer::new(format!("HashJoinInput[{partition}]")) .register(context.memory_pool()); OnceFut::new(collect_left_input( - partition, self.random_state.clone(), - Arc::clone(&self.left), + left_stream, on_left.clone(), - Arc::clone(&context), join_metrics.clone(), reservation, need_produce_result_in_final(self.join_type), @@ -943,25 +945,21 @@ fn coalesce_partitions_if_needed(plan: Arc) -> Arc, + left_stream: SendableRecordBatchStream, on_left: Vec, - context: Arc, metrics: BuildProbeJoinMetrics, reservation: MemoryReservation, with_visited_indices_bitmap: bool, probe_threads_count: usize, ) -> Result { - let schema = left.schema(); - - let stream = left.execute(partition, Arc::clone(&context))?; + let schema = left_stream.schema(); // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream // 2. stores the batches in a vector. let initial = (Vec::new(), 0, metrics, reservation); - let (batches, num_rows, metrics, mut reservation) = stream + let (batches, num_rows, metrics, mut reservation) = left_stream .try_fold(initial, |mut acc, batch| async { let batch_size = get_record_batch_memory_size(&batch); // Reserve memory for incoming batch From 6c4972cb77108ba7f13dbc4031bff05efd2f13d6 Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Fri, 21 Mar 2025 10:35:57 +0100 Subject: [PATCH 05/10] chore(hash_join): Drop unnecessary clippy hint \o/ --- datafusion/physical-plan/src/joins/hash_join.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index a271442b781c9..211e4b48dddca 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -943,7 +943,6 @@ fn coalesce_partitions_if_needed(plan: Arc) -> Arc Date: Wed, 26 Mar 2025 08:57:21 +0100 Subject: [PATCH 06/10] Back out "refactor: Catch PartitionMode:Auto in execute explicitly" This backs out commit 4346cb868c95bfde3fb23351836157524a06050b. --- datafusion/physical-plan/src/joins/hash_join.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 211e4b48dddca..e3252b048d741 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -791,13 +791,6 @@ impl ExecutionPlan for HashJoinExec { ); } - if self.mode == PartitionMode::Auto { - return plan_err!( - "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()", - PartitionMode::Auto - ); - } - let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { PartitionMode::CollectLeft => { @@ -836,7 +829,12 @@ impl ExecutionPlan for HashJoinExec { 1, )) } - PartitionMode::Auto => unreachable!(), + PartitionMode::Auto => { + return plan_err!( + "Invalid HashJoinExec, unsupported PartitionMode {:?} in execute()", + PartitionMode::Auto + ); + } }; let batch_size = context.session_config().batch_size(); From 3dc02287f27255e4802d8c0678c435844bad517f Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Fri, 28 Mar 2025 14:17:15 +0100 Subject: [PATCH 07/10] Remove CoalescePartitions from CollectLeft-HashJoins --- datafusion/physical-plan/src/joins/hash_join.rs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index e3252b048d741..b434683e12ce6 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -794,8 +794,7 @@ impl ExecutionPlan for HashJoinExec { let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { PartitionMode::CollectLeft => { - let left = coalesce_partitions_if_needed(Arc::clone(&self.left)); - let left_stream = left.execute(0, Arc::clone(&context))?; + let left_stream = self.left.execute(0, Arc::clone(&context))?; self.left_fut.once(|| { let reservation = MemoryConsumer::new("HashJoinInput") @@ -931,14 +930,6 @@ impl ExecutionPlan for HashJoinExec { } } -fn coalesce_partitions_if_needed(plan: Arc) -> Arc { - if plan.output_partitioning().partition_count() == 1 { - plan - } else { - Arc::new(CoalescePartitionsExec::new(plan)) - } -} - /// Reads the left (build) side of the input, buffering it in memory, to build a /// hash table (`LeftJoinData`) async fn collect_left_input( From e96123d55d72bae6559a7e9628232f766ef22250 Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Fri, 28 Mar 2025 14:24:27 +0100 Subject: [PATCH 08/10] Fix imports --- datafusion/physical-plan/src/joins/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index b434683e12ce6..c35db769d79b1 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -40,7 +40,6 @@ use crate::projection::{ use crate::spill::get_record_batch_memory_size; use crate::ExecutionPlanProperties; use crate::{ - coalesce_partitions::CoalescePartitionsExec, common::can_project, handle_state, hash_utils::create_hashes, @@ -1643,6 +1642,7 @@ impl EmbeddedProjection for HashJoinExec { #[cfg(test)] mod tests { use super::*; + use crate::coalesce_partitions::CoalescePartitionsExec; use crate::test::TestMemoryExec; use crate::{ common, expressions::Column, repartition::RepartitionExec, test::build_table_i32, From e225f7cc89286bbcaf24c596a3b908aabc371725 Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Fri, 28 Mar 2025 14:32:13 +0100 Subject: [PATCH 09/10] Fix tests --- datafusion/physical-plan/src/joins/hash_join.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index c35db769d79b1..8a51946523959 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -2090,6 +2090,7 @@ mod tests { let left = TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None) .unwrap(); + let left = Arc::new(CoalescePartitionsExec::new(left)); let right = build_table( ("a1", &vec![1, 2, 3]), @@ -2162,6 +2163,7 @@ mod tests { let left = TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None) .unwrap(); + let left = Arc::new(CoalescePartitionsExec::new(left)); let right = build_table( ("a2", &vec![20, 30, 10]), ("b2", &vec![5, 6, 4]), From c69c471873ab8c1e6fb216d7dd15b0c9a67a6af9 Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Fri, 28 Mar 2025 21:06:07 +0100 Subject: [PATCH 10/10] Check CollectLeft-HJ distribution in execute --- datafusion/physical-plan/src/joins/hash_join.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 8a51946523959..fcd5fd5a5a074 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -790,6 +790,13 @@ impl ExecutionPlan for HashJoinExec { ); } + if self.mode == PartitionMode::CollectLeft && left_partitions != 1 { + return internal_err!( + "Invalid HashJoinExec,the output partition count of the left child must be 1 in CollectLeft mode,\ + consider using CoalescePartitionsExec" + ); + } + let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { PartitionMode::CollectLeft => {