Skip to content

Commit

Permalink
memory limited hash join (#5490)
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Mar 9, 2023
1 parent ff013e2 commit f5d23ff
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 142 deletions.
6 changes: 6 additions & 0 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,14 @@ use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

/// [`MemoryReservation`] used across query execution streams
pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;

/// [`MemoryReservation`] used at query operator level
/// `Option` wrapper allows to initialize empty reservation in operator constructor,
/// and set it to actual reservation at stream level.
pub(crate) type OperatorMemoryReservation = Arc<Mutex<Option<SharedMemoryReservation>>>;

/// Stream of record batches
pub struct SizedRecordBatchStream {
schema: SchemaRef,
Expand Down
36 changes: 23 additions & 13 deletions datafusion/core/src/physical_plan/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::record_batch::RecordBatch;

use crate::execution::context::TaskContext;
use crate::execution::memory_pool::MemoryConsumer;
use crate::physical_plan::common::SharedMemoryReservation;
use crate::physical_plan::common::{OperatorMemoryReservation, SharedMemoryReservation};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec,
Expand Down Expand Up @@ -60,6 +60,8 @@ pub struct CrossJoinExec {
schema: SchemaRef,
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
/// Memory reservation for build-side data
reservation: OperatorMemoryReservation,
/// Execution plan metrics
metrics: ExecutionPlanMetricsSet,
}
Expand All @@ -83,6 +85,7 @@ impl CrossJoinExec {
right,
schema,
left_fut: Default::default(),
reservation: Default::default(),
metrics: ExecutionPlanMetricsSet::default(),
}
}
Expand Down Expand Up @@ -221,17 +224,29 @@ impl ExecutionPlan for CrossJoinExec {
let stream = self.right.execute(partition, context.clone())?;

let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
let reservation = Arc::new(Mutex::new(
MemoryConsumer::new(format!("CrossJoinStream[{partition}]"))
.register(context.memory_pool()),
));

// Initialization of operator-level reservation
{
let mut reservation_lock = self.reservation.lock();
if reservation_lock.is_none() {
*reservation_lock = Some(Arc::new(Mutex::new(
MemoryConsumer::new("CrossJoinExec").register(context.memory_pool()),
)));
};
}

let reservation = self.reservation.lock().clone().ok_or_else(|| {
DataFusionError::Internal(
"Operator-level memory reservation is not initialized".to_string(),
)
})?;

let left_fut = self.left_fut.once(|| {
load_left_input(
self.left.clone(),
context,
join_metrics.clone(),
reservation.clone(),
reservation,
)
});

Expand All @@ -242,7 +257,6 @@ impl ExecutionPlan for CrossJoinExec {
right_batch: Arc::new(parking_lot::Mutex::new(None)),
left_index: 0,
join_metrics,
reservation,
}))
}

Expand Down Expand Up @@ -346,8 +360,6 @@ struct CrossJoinStream {
right_batch: Arc<parking_lot::Mutex<Option<RecordBatch>>>,
/// join execution metrics
join_metrics: BuildProbeJoinMetrics,
/// memory reservation
reservation: SharedMemoryReservation,
}

impl RecordBatchStream for CrossJoinStream {
Expand Down Expand Up @@ -452,10 +464,7 @@ impl CrossJoinStream {

Some(result)
}
other => {
self.reservation.lock().free();
other
}
other => other,
})
}
}
Expand Down Expand Up @@ -683,6 +692,7 @@ mod tests {
err.to_string(),
"External error: Resources exhausted: Failed to allocate additional"
);
assert_contains!(err.to_string(), "CrossJoinExec");

Ok(())
}
Expand Down
Loading

0 comments on commit f5d23ff

Please sign in to comment.