Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 33 additions & 35 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,32 +793,36 @@ impl ExecutionPlan for HashJoinExec {

let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
let left_fut = match self.mode {
PartitionMode::CollectLeft => self.left_fut.once(|| {
let reservation =
MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
collect_left_input(
None,
self.random_state.clone(),
Arc::clone(&self.left),
on_left.clone(),
Arc::clone(&context),
join_metrics.clone(),
reservation,
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
)
}),
PartitionMode::CollectLeft => {
let left = coalesce_partitions_if_needed(Arc::clone(&self.left));
let left_stream = left.execute(0, Arc::clone(&context))?;

self.left_fut.once(|| {
let reservation = MemoryConsumer::new("HashJoinInput")
.register(context.memory_pool());

collect_left_input(
self.random_state.clone(),
left_stream,
on_left.clone(),
join_metrics.clone(),
reservation,
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
)
})
}
PartitionMode::Partitioned => {
let left_stream = self.left.execute(partition, Arc::clone(&context))?;

let reservation =
MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
.register(context.memory_pool());

OnceFut::new(collect_left_input(
Some(partition),
self.random_state.clone(),
Arc::clone(&self.left),
left_stream,
on_left.clone(),
Arc::clone(&context),
join_metrics.clone(),
reservation,
need_produce_result_in_final(self.join_type),
Expand Down Expand Up @@ -927,38 +931,32 @@ impl ExecutionPlan for HashJoinExec {
}
}

/// Ensures `plan` exposes exactly one output partition.
///
/// A plan that already reports a single partition is handed back untouched;
/// anything else is wrapped in a [`CoalescePartitionsExec`], which merges all
/// input partitions into one stream.
fn coalesce_partitions_if_needed(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    match plan.output_partitioning().partition_count() {
        // Already single-partitioned: no wrapper needed.
        1 => plan,
        // Multiple partitions: merge them behind a coalescing node.
        _ => Arc::new(CoalescePartitionsExec::new(plan)),
    }
}

/// Reads the left (build) side of the input, buffering it in memory, to build a
/// hash table (`LeftJoinData`)
#[allow(clippy::too_many_arguments)]
async fn collect_left_input(
partition: Option<usize>,
random_state: RandomState,
left: Arc<dyn ExecutionPlan>,
left_stream: SendableRecordBatchStream,
on_left: Vec<PhysicalExprRef>,
context: Arc<TaskContext>,
metrics: BuildProbeJoinMetrics,
reservation: MemoryReservation,
with_visited_indices_bitmap: bool,
probe_threads_count: usize,
) -> Result<JoinLeftData> {
let schema = left.schema();

let (left_input, left_input_partition) = if let Some(partition) = partition {
(left, partition)
} else if left.output_partitioning().partition_count() != 1 {
(Arc::new(CoalescePartitionsExec::new(left)) as _, 0)
} else {
(left, 0)
};

// Depending on partition argument load single partition or whole left side in memory
let stream = left_input.execute(left_input_partition, Arc::clone(&context))?;
let schema = left_stream.schema();

// This operation performs 2 steps at once:
// 1. creates a [JoinHashMap] of all batches from the stream
// 2. stores the batches in a vector.
let initial = (Vec::new(), 0, metrics, reservation);
let (batches, num_rows, metrics, mut reservation) = stream
let (batches, num_rows, metrics, mut reservation) = left_stream
.try_fold(initial, |mut acc, batch| async {
let batch_size = get_record_batch_memory_size(&batch);
// Reserve memory for incoming batch
Expand Down