1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion/physical-plan/Cargo.toml
@@ -53,7 +53,7 @@ datafusion-common = { workspace = true, default-features = true }
datafusion-common-runtime = { workspace = true, default-features = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
-datafusion-functions-aggregate-common = { workspace = true }
+datafusion-functions-aggregate = { workspace = true }
datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr = { workspace = true, default-features = true }
datafusion-physical-expr-common = { workspace = true }
193 changes: 159 additions & 34 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -54,20 +54,21 @@ use crate::{
PlanProperties, SendableRecordBatchStream, Statistics,
};

-use arrow::array::{Array, ArrayRef, BooleanBufferBuilder};
+use arrow::array::{ArrayRef, BooleanBufferBuilder};
use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use arrow_schema::DataType;
use datafusion_common::config::ConfigOptions;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::{
internal_err, plan_err, project_schema, JoinSide, JoinType, NullEquality, Result,
ScalarValue,
};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
-use datafusion_functions_aggregate_common::min_max::{max_batch, min_batch};
+use datafusion_expr::Accumulator;
+use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::equivalence::{
join_equivalence_properties, ProjectionMapping,
};
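
Side note on the import swap above: the removed `min_batch`/`max_batch` are one-shot kernels that reduce a single array, while `MinAccumulator`/`MaxAccumulator` carry running state that can be fed batch by batch and finalized once. A minimal standalone sketch of that accumulator API (not part of the diff; assumes `arrow`, `datafusion-common`, `datafusion-expr`, and `datafusion-functions-aggregate` as dependencies):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};

fn main() -> Result<()> {
    let mut min = MinAccumulator::try_new(&DataType::Int32)?;
    let mut max = MaxAccumulator::try_new(&DataType::Int32)?;

    // Feed two batches; the accumulators keep running bounds across both,
    // which is what lets the build side be processed incrementally.
    for values in [vec![5, 3, 9], vec![1, 7]] {
        let array: ArrayRef = Arc::new(Int32Array::from(values));
        min.update_batch(std::slice::from_ref(&array))?;
        max.update_batch(std::slice::from_ref(&array))?;
    }

    assert_eq!(min.evaluate()?, ScalarValue::Int32(Some(1)));
    assert_eq!(max.evaluate()?, ScalarValue::Int32(Some(9)));
    Ok(())
}
```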
@@ -1188,29 +1189,123 @@ impl ExecutionPlan for HashJoinExec {
}
}

-/// Compute min/max bounds for each column in the given arrays
-fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<ColumnBounds>> {
-    arrays
-        .iter()
-        .map(|array| {
-            if array.is_empty() {
-                // Return NULL values for empty arrays
-                return Ok(ColumnBounds::new(
-                    ScalarValue::try_from(array.data_type())?,
-                    ScalarValue::try_from(array.data_type())?,
-                ));
+/// Accumulator for collecting min/max bounds from build-side data during hash join.
+///
+/// This struct encapsulates the logic for progressively computing column bounds
+/// (minimum and maximum values) for a specific join key expression as batches
+/// are processed during the build phase of a hash join.
+///
+/// The bounds are used for dynamic filter pushdown optimization, where filters
+/// based on the actual data ranges can be pushed down to the probe side to
+/// eliminate unnecessary data early.
+struct CollectLeftAccumulator {
+    /// The physical expression to evaluate for each batch
+    expr: Arc<dyn PhysicalExpr>,
+    /// Accumulator for tracking the minimum value across all batches
+    min: MinAccumulator,
+    /// Accumulator for tracking the maximum value across all batches
+    max: MaxAccumulator,
}

+impl CollectLeftAccumulator {
+    /// Creates a new accumulator for tracking bounds of a join key expression.
+    ///
+    /// # Arguments
+    /// * `expr` - The physical expression to track bounds for
+    /// * `schema` - The schema of the input data
+    ///
+    /// # Returns
+    /// A new `CollectLeftAccumulator` instance configured for the expression's data type
+    fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
+        /// Recursively unwraps dictionary types to get the underlying value type.
+        fn dictionary_value_type(data_type: &DataType) -> DataType {
+            match data_type {
+                DataType::Dictionary(_, value_type) => {
+                    dictionary_value_type(value_type.as_ref())
+                }
+                _ => data_type.clone(),
+            }
+        }
+
+        let data_type = expr
+            .data_type(schema)
+            // Min/Max can operate on dictionary data but expect to be initialized with the underlying value type
+            .map(|dt| dictionary_value_type(&dt))?;
+        Ok(Self {
+            expr,
+            min: MinAccumulator::try_new(&data_type)?,
+            max: MaxAccumulator::try_new(&data_type)?,
+        })
+    }

-            // Use Arrow kernels for efficient min/max computation
-            let min_val = min_batch(array)?;
-            let max_val = max_batch(array)?;
+    /// Updates the accumulators with values from a new batch.
+    ///
+    /// Evaluates the expression on the batch and updates both min and max
+    /// accumulators with the resulting values.
+    ///
+    /// # Arguments
+    /// * `batch` - The record batch to process
+    ///
+    /// # Returns
+    /// Ok(()) if the update succeeds, or an error if expression evaluation fails
+    fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
+        let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
+        self.min.update_batch(std::slice::from_ref(&array))?;
+        self.max.update_batch(std::slice::from_ref(&array))?;
+        Ok(())
+    }

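A quick aside on `std::slice::from_ref` in `update_batch` above: `Accumulator::update_batch` takes `&[ArrayRef]`, and `slice::from_ref` views a single `ArrayRef` as a one-element slice without cloning the `Arc` or allocating a `Vec`. A tiny standalone sketch of the idiom:

```rust
fn sum_all(values: &[i32]) -> i32 {
    values.iter().sum()
}

fn main() {
    let single = 42;
    // &single becomes a &[i32] of length 1: no Vec, no copy.
    assert_eq!(sum_all(std::slice::from_ref(&single)), 42);
}
```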
-            Ok(ColumnBounds::new(min_val, max_val))
+    /// Finalizes the accumulation and returns the computed bounds.
+    ///
+    /// Consumes self to extract the final min and max values from the accumulators.
+    ///
+    /// # Returns
+    /// The `ColumnBounds` containing the minimum and maximum values observed
+    fn evaluate(mut self) -> Result<ColumnBounds> {
+        Ok(ColumnBounds::new(
+            self.min.evaluate()?,
+            self.max.evaluate()?,
+        ))
+    }
+}

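The nested `dictionary_value_type` helper in `try_new` matters because the min/max accumulators must be initialized with the underlying value type, not the dictionary type. A standalone sketch of the same unwrapping (assumes only the `arrow` crate):

```rust
use arrow::datatypes::DataType;

/// Recursively peel Dictionary(_, value) down to the innermost value type.
fn dictionary_value_type(data_type: &DataType) -> DataType {
    match data_type {
        DataType::Dictionary(_, value_type) => dictionary_value_type(value_type.as_ref()),
        _ => data_type.clone(),
    }
}

fn main() {
    // A dictionary of dictionaries of strings still bottoms out at Utf8,
    // so the accumulators for such a join key are created with Utf8.
    let nested = DataType::Dictionary(
        Box::new(DataType::Int32),
        Box::new(DataType::Dictionary(
            Box::new(DataType::Int16),
            Box::new(DataType::Utf8),
        )),
    );
    assert_eq!(dictionary_value_type(&nested), DataType::Utf8);
}
```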
+/// State for collecting the build-side data during hash join
+struct BuildSideState {
+    batches: Vec<RecordBatch>,
+    num_rows: usize,
+    metrics: BuildProbeJoinMetrics,
+    reservation: MemoryReservation,
+    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
+}
+
+impl BuildSideState {
+    /// Create a new BuildSideState with optional accumulators for bounds computation
+    fn try_new(
+        metrics: BuildProbeJoinMetrics,
+        reservation: MemoryReservation,
+        on_left: Vec<Arc<dyn PhysicalExpr>>,
+        schema: &SchemaRef,
+        should_compute_bounds: bool,
+    ) -> Result<Self> {
+        Ok(Self {
+            batches: Vec::new(),
+            num_rows: 0,
+            metrics,
+            reservation,
+            bounds_accumulators: should_compute_bounds
+                .then(|| {
+                    on_left
+                        .iter()
+                        .map(|expr| {
+                            CollectLeftAccumulator::try_new(Arc::clone(expr), schema)
+                        })
+                        .collect::<Result<Vec<_>>>()
+                })
+                .transpose()?,
})
-        .collect()
}
+}

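The `should_compute_bounds.then(...).transpose()?` chain in `try_new` is Option/Result juggling: `bool::then` runs the closure only when bounds are requested, yielding `Option<Result<Vec<_>>>`, and `transpose` flips that into `Result<Option<Vec<_>>>` so `?` can propagate a failure from any accumulator's construction. A reduced standalone sketch of the idiom:

```rust
fn build(enabled: bool) -> Result<Option<Vec<i32>>, String> {
    let accumulators = enabled
        // Option<Result<Vec<i32>, String>>: None when disabled.
        .then(|| (0..3).map(Ok).collect::<Result<Vec<i32>, String>>())
        // Result<Option<Vec<i32>>, String>: lets `?` surface errors.
        .transpose()?;
    Ok(accumulators)
}

fn main() {
    assert_eq!(build(true), Ok(Some(vec![0, 1, 2])));
    assert_eq!(build(false), Ok(None));
}
```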
-#[expect(clippy::too_many_arguments)]
/// Collects all batches from the left (build) side stream and creates a hash map for joining.
///
/// This function is responsible for:
@@ -1239,6 +1334,7 @@ fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<ColumnBounds>>
/// # Returns
/// `JoinLeftData` containing the hash map, consolidated batch, join key values,
/// visited indices bitmap, and computed bounds (if requested).
+#[allow(clippy::too_many_arguments)]
async fn collect_left_input(
random_state: RandomState,
left_stream: SendableRecordBatchStream,
@@ -1254,24 +1350,48 @@ async fn collect_left_input(
// This operation performs 2 steps at once:
// 1. creates a [JoinHashMap] of all batches from the stream
// 2. stores the batches in a vector.
-    let initial = (Vec::new(), 0, metrics, reservation);
-    let (batches, num_rows, metrics, mut reservation) = left_stream
-        .try_fold(initial, |mut acc, batch| async {
+    let initial = BuildSideState::try_new(
+        metrics,
+        reservation,
+        on_left.clone(),
+        &schema,
+        should_compute_bounds,
+    )?;

+    let state = left_stream
+        .try_fold(initial, |mut state, batch| async move {
+            // Update accumulators if computing bounds
+            if let Some(ref mut accumulators) = state.bounds_accumulators {
+                for accumulator in accumulators {
+                    accumulator.update_batch(&batch)?;
+                }
+            }

// Decide if we spill or not
let batch_size = get_record_batch_memory_size(&batch);
// Reserve memory for incoming batch
-            acc.3.try_grow(batch_size)?;
+            state.reservation.try_grow(batch_size)?;
// Update metrics
-            acc.2.build_mem_used.add(batch_size);
-            acc.2.build_input_batches.add(1);
-            acc.2.build_input_rows.add(batch.num_rows());
+            state.metrics.build_mem_used.add(batch_size);
+            state.metrics.build_input_batches.add(1);
+            state.metrics.build_input_rows.add(batch.num_rows());
// Update row count
-            acc.1 += batch.num_rows();
+            state.num_rows += batch.num_rows();
// Push batch to output
-            acc.0.push(batch);
-            Ok(acc)
+            state.batches.push(batch);
+            Ok(state)
})
.await?;

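The `try_fold` above (from `futures::TryStreamExt`) moves the whole `BuildSideState` into the closure for each batch and hands it back in the `Ok` case, so a single owned accumulator threads through the stream without locks, and the first error short-circuits collection. A standalone sketch of the pattern (assumes `futures` and `tokio` as dependencies):

```rust
use futures::stream::{self, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), String> {
    // Stand-in for the build-side stream: each item is a fallible "batch".
    let batches = stream::iter([Ok::<usize, String>(3), Ok(4), Ok(5)]);
    // State (here a row counter) is moved in and handed back each step.
    let num_rows = batches
        .try_fold(0usize, |acc, rows| async move { Ok(acc + rows) })
        .await?;
    assert_eq!(num_rows, 12);
    Ok(())
}
```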
+    // Extract fields from state
+    let BuildSideState {
+        batches,
+        num_rows,
+        metrics,
+        mut reservation,
+        bounds_accumulators,
+    } = state;

// Estimation of memory size, required for hashtable, prior to allocation.
// Final result can be verified using `RawTable.allocation_info()`
let fixed_size_u32 = size_of::<JoinHashMapU32>();
@@ -1338,10 +1458,15 @@ async fn collect_left_input(
.collect::<Result<Vec<_>>>()?;

// Compute bounds for dynamic filter if enabled
-    let bounds = if should_compute_bounds && num_rows > 0 {
-        Some(compute_bounds(&left_values)?)
-    } else {
-        None
+    let bounds = match bounds_accumulators {
+        Some(accumulators) if num_rows > 0 => {
+            let bounds = accumulators
+                .into_iter()
+                .map(CollectLeftAccumulator::evaluate)
+                .collect::<Result<Vec<_>>>()?;
+            Some(bounds)
+        }
+        _ => None,
};

let data = JoinLeftData::new(
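
For context on where these bounds end up: per the doc comment on `CollectLeftAccumulator`, the min/max pairs back a dynamic filter pushed down to the probe side. A hypothetical illustration of the pruning effect (names here are illustrative, not DataFusion APIs):

```rust
/// A probe-side key strictly outside the build side's [min, max] range can
/// never find a join partner, so it can be filtered out before hashing.
fn can_skip(probe_key: i32, build_min: i32, build_max: i32) -> bool {
    probe_key < build_min || probe_key > build_max
}

fn main() {
    // Suppose the accumulators observed build-side keys spanning [10, 20].
    assert!(can_skip(42, 10, 20)); // outside the bounds: prune early
    assert!(!can_skip(15, 10, 20)); // inside the bounds: must be probed
}
```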