Refactor HashJoinExec to progressively accumulate dynamic filter bounds instead of computing them after data is accumulated #17444
```diff
@@ -54,20 +54,21 @@ use crate::{
     PlanProperties, SendableRecordBatchStream, Statistics,
 };
-use arrow::array::{Array, ArrayRef, BooleanBufferBuilder};
+use arrow::array::{ArrayRef, BooleanBufferBuilder};
 use arrow::compute::concat_batches;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use arrow::util::bit_util;
 use arrow_schema::DataType;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::utils::memory::estimate_memory_size;
 use datafusion_common::{
     internal_err, plan_err, project_schema, JoinSide, JoinType, NullEquality, Result,
     ScalarValue,
 };
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::TaskContext;
-use datafusion_functions_aggregate_common::min_max::{max_batch, min_batch};
+use datafusion_expr::Accumulator;
+use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::equivalence::{
     join_equivalence_properties, ProjectionMapping,
 };
```
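For context on the new imports: `min_batch`/`max_batch` compute the min/max of a single array, while `MinAccumulator`/`MaxAccumulator` keep running bounds that new batches can be folded into, which is what makes the progressive computation possible. A minimal standalone sketch of that accumulator API, assuming the `arrow`, `datafusion-common`, `datafusion-expr`, and `datafusion-functions-aggregate` crates as dependencies (not code from this PR):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::DataType;
use datafusion_common::Result;
use datafusion_expr::Accumulator;
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};

fn main() -> Result<()> {
    // One accumulator pair per join key, created from the key's data type
    let mut min = MinAccumulator::try_new(&DataType::Int32)?;
    let mut max = MaxAccumulator::try_new(&DataType::Int32)?;

    // Batches arrive one at a time; bounds are folded in progressively
    // instead of being computed over one concatenated array at the end
    let batches: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![5, 3, 8])),
        Arc::new(Int32Array::from(vec![1, 9])),
    ];
    for array in &batches {
        min.update_batch(std::slice::from_ref(array))?;
        max.update_batch(std::slice::from_ref(array))?;
    }

    // ScalarValue results: min = 1, max = 9
    println!("min = {}, max = {}", min.evaluate()?, max.evaluate()?);
    Ok(())
}
```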
```diff
@@ -1188,29 +1189,123 @@ impl ExecutionPlan for HashJoinExec {
     }
 }
 
-/// Compute min/max bounds for each column in the given arrays
-fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<ColumnBounds>> {
-    arrays
-        .iter()
-        .map(|array| {
-            if array.is_empty() {
-                // Return NULL values for empty arrays
-                return Ok(ColumnBounds::new(
-                    ScalarValue::try_from(array.data_type())?,
-                    ScalarValue::try_from(array.data_type())?,
-                ));
-            }
-
-            // Use Arrow kernels for efficient min/max computation
-            let min_val = min_batch(array)?;
-            let max_val = max_batch(array)?;
-
-            Ok(ColumnBounds::new(min_val, max_val))
-        })
-        .collect()
-}
+/// Accumulator for collecting min/max bounds from build-side data during hash join.
+///
+/// This struct encapsulates the logic for progressively computing column bounds
+/// (minimum and maximum values) for a specific join key expression as batches
+/// are processed during the build phase of a hash join.
+///
+/// The bounds are used for dynamic filter pushdown optimization, where filters
+/// based on the actual data ranges can be pushed down to the probe side to
+/// eliminate unnecessary data early.
+struct CollectLeftAccumulator {
+    /// The physical expression to evaluate for each batch
+    expr: Arc<dyn PhysicalExpr>,
+    /// Accumulator for tracking the minimum value across all batches
+    min: MinAccumulator,
+    /// Accumulator for tracking the maximum value across all batches
+    max: MaxAccumulator,
+}
+
+impl CollectLeftAccumulator {
+    /// Creates a new accumulator for tracking bounds of a join key expression.
+    ///
+    /// # Arguments
+    /// * `expr` - The physical expression to track bounds for
+    /// * `schema` - The schema of the input data
+    ///
+    /// # Returns
+    /// A new `CollectLeftAccumulator` instance configured for the expression's data type
+    fn try_new(expr: Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> Result<Self> {
+        /// Recursively unwraps dictionary types to get the underlying value type.
+        fn dictionary_value_type(data_type: &DataType) -> DataType {
+            match data_type {
+                DataType::Dictionary(_, value_type) => {
+                    dictionary_value_type(value_type.as_ref())
+                }
+                _ => data_type.clone(),
+            }
+        }
+
+        let data_type = expr
+            .data_type(schema)
+            // Min/Max can operate on dictionary data but expect to be initialized with the underlying value type
+            .map(|dt| dictionary_value_type(&dt))?;
+        Ok(Self {
+            expr,
+            min: MinAccumulator::try_new(&data_type)?,
+            max: MaxAccumulator::try_new(&data_type)?,
+        })
+    }
+
+    /// Updates the accumulators with values from a new batch.
+    ///
+    /// Evaluates the expression on the batch and updates both min and max
+    /// accumulators with the resulting values.
+    ///
+    /// # Arguments
+    /// * `batch` - The record batch to process
+    ///
+    /// # Returns
+    /// Ok(()) if the update succeeds, or an error if expression evaluation fails
+    fn update_batch(&mut self, batch: &RecordBatch) -> Result<()> {
+        let array = self.expr.evaluate(batch)?.into_array(batch.num_rows())?;
+        self.min.update_batch(std::slice::from_ref(&array))?;
+        self.max.update_batch(std::slice::from_ref(&array))?;
+        Ok(())
+    }
+
+    /// Finalizes the accumulation and returns the computed bounds.
+    ///
+    /// Consumes self to extract the final min and max values from the accumulators.
+    ///
+    /// # Returns
+    /// The `ColumnBounds` containing the minimum and maximum values observed
+    fn evaluate(mut self) -> Result<ColumnBounds> {
+        Ok(ColumnBounds::new(
+            self.min.evaluate()?,
+            self.max.evaluate()?,
+        ))
+    }
+}
+
+/// State for collecting the build-side data during hash join
+struct BuildSideState {
+    batches: Vec<RecordBatch>,
+    num_rows: usize,
+    metrics: BuildProbeJoinMetrics,
+    reservation: MemoryReservation,
+    bounds_accumulators: Option<Vec<CollectLeftAccumulator>>,
+}
+
+impl BuildSideState {
+    /// Create a new BuildSideState with optional accumulators for bounds computation
+    fn try_new(
+        metrics: BuildProbeJoinMetrics,
+        reservation: MemoryReservation,
+        on_left: Vec<Arc<dyn PhysicalExpr>>,
+        schema: &SchemaRef,
+        should_compute_bounds: bool,
+    ) -> Result<Self> {
+        Ok(Self {
+            batches: Vec::new(),
+            num_rows: 0,
+            metrics,
+            reservation,
+            bounds_accumulators: should_compute_bounds
+                .then(|| {
+                    on_left
+                        .iter()
+                        .map(|expr| {
+                            CollectLeftAccumulator::try_new(Arc::clone(expr), schema)
+                        })
+                        .collect::<Result<Vec<_>>>()
+                })
+                .transpose()?,
+        })
+    }
+}
 
-#[expect(clippy::too_many_arguments)]
 /// Collects all batches from the left (build) side stream and creates a hash map for joining.
 ///
 /// This function is responsible for:
```

> **Review comment** (on the `min: MinAccumulator` field): I think using the min/max accumulators is a great idea. It may also help the symptoms @LiaCastaneda is reporting in, as I believe I remember there is a better optimized implementation for Lists in those accumulators than what was here previously.

> **Review comment** (on the removed `min_batch`/`max_batch` lines, old lines 1204 to 1206): I think the accumulator
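The inner `dictionary_value_type` helper in `try_new` above exists because join keys may be dictionary-encoded: the Min/Max accumulators can consume dictionary arrays but must be initialized with the underlying value type. A self-contained sketch of the same unwrapping logic, using only `arrow` types (the nested dictionary type here is contrived for illustration):

```rust
use arrow::datatypes::DataType;

/// Recursively unwrap dictionary types to reach the underlying value type,
/// mirroring the helper inside CollectLeftAccumulator::try_new
fn dictionary_value_type(data_type: &DataType) -> DataType {
    match data_type {
        DataType::Dictionary(_, value_type) => dictionary_value_type(value_type.as_ref()),
        _ => data_type.clone(),
    }
}

fn main() {
    // Dictionary<Int8, Dictionary<Int16, Utf8>> unwraps all the way to Utf8
    let nested = DataType::Dictionary(
        Box::new(DataType::Int8),
        Box::new(DataType::Dictionary(
            Box::new(DataType::Int16),
            Box::new(DataType::Utf8),
        )),
    );
    assert_eq!(dictionary_value_type(&nested), DataType::Utf8);
    // Non-dictionary types pass through unchanged
    assert_eq!(dictionary_value_type(&DataType::Int64), DataType::Int64);
}
```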
```diff
@@ -1239,6 +1334,7 @@ fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<ColumnBounds>> {
 /// # Returns
 /// `JoinLeftData` containing the hash map, consolidated batch, join key values,
 /// visited indices bitmap, and computed bounds (if requested).
+#[allow(clippy::too_many_arguments)]
 async fn collect_left_input(
     random_state: RandomState,
     left_stream: SendableRecordBatchStream,
```
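Side note on the attribute swap: `#[expect(clippy::too_many_arguments)]` additionally warns if the expected lint ever stops firing, whereas `#[allow(...)]` silences it unconditionally. A toy illustration (the function below is invented, not from the PR):

```rust
// `allow` silences the lint whether or not it fires; `expect` would add a
// warning if `load` were ever refactored below clippy's default threshold
// of seven arguments.
#[allow(clippy::too_many_arguments)]
fn load(a: u8, b: u8, c: u8, d: u8, e: u8, f: u8, g: u8, h: u8) -> u32 {
    [a, b, c, d, e, f, g, h].iter().map(|v| *v as u32).sum()
}

fn main() {
    assert_eq!(load(1, 2, 3, 4, 5, 6, 7, 8), 36);
}
```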
```diff
@@ -1254,24 +1350,48 @@ async fn collect_left_input(
     // This operation performs 2 steps at once:
     // 1. creates a [JoinHashMap] of all batches from the stream
     // 2. stores the batches in a vector.
-    let initial = (Vec::new(), 0, metrics, reservation);
-    let (batches, num_rows, metrics, mut reservation) = left_stream
-        .try_fold(initial, |mut acc, batch| async {
+    let initial = BuildSideState::try_new(
+        metrics,
+        reservation,
+        on_left.clone(),
+        &schema,
+        should_compute_bounds,
+    )?;
+
+    let state = left_stream
+        .try_fold(initial, |mut state, batch| async move {
+            // Update accumulators if computing bounds
+            if let Some(ref mut accumulators) = state.bounds_accumulators {
+                for accumulator in accumulators {
+                    accumulator.update_batch(&batch)?;
+                }
+            }
+
             // Decide if we spill or not
             let batch_size = get_record_batch_memory_size(&batch);
             // Reserve memory for incoming batch
-            acc.3.try_grow(batch_size)?;
+            state.reservation.try_grow(batch_size)?;
             // Update metrics
-            acc.2.build_mem_used.add(batch_size);
-            acc.2.build_input_batches.add(1);
-            acc.2.build_input_rows.add(batch.num_rows());
+            state.metrics.build_mem_used.add(batch_size);
+            state.metrics.build_input_batches.add(1);
+            state.metrics.build_input_rows.add(batch.num_rows());
             // Update row count
-            acc.1 += batch.num_rows();
+            state.num_rows += batch.num_rows();
             // Push batch to output
-            acc.0.push(batch);
-            Ok(acc)
+            state.batches.push(batch);
+            Ok(state)
         })
         .await?;
 
+    // Extract fields from state
+    let BuildSideState {
+        batches,
+        num_rows,
+        metrics,
+        mut reservation,
+        bounds_accumulators,
+    } = state;
+
     // Estimation of memory size, required for hashtable, prior to allocation.
     // Final result can be verified using `RawTable.allocation_info()`
     let fixed_size_u32 = size_of::<JoinHashMapU32>();
```

> **Review comment** (on `state.reservation.try_grow(batch_size)?;`): I really like this.
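The restructured collection above still runs in one pass: `TryStreamExt::try_fold` threads a single state value through the stream, so the bounds accumulators, metrics, and buffered batches all advance together per batch. A toy sketch of the pattern, assuming only the `futures` crate, with plain vectors standing in for record batches:

```rust
use std::convert::Infallible;

use futures::executor::block_on;
use futures::stream::{self, TryStreamExt};

fn main() -> Result<(), Infallible> {
    // A stream of "batches" (plain vectors standing in for RecordBatch)
    let input = stream::iter(vec![
        Ok::<Vec<i32>, Infallible>(vec![1, 2, 3]),
        Ok(vec![4, 5]),
    ]);

    // Thread one state value through the whole stream, as
    // collect_left_input does with BuildSideState
    let (batches, num_rows) = block_on(input.try_fold(
        (Vec::new(), 0usize),
        |mut state, batch| async move {
            state.1 += batch.len(); // running row count
            state.0.push(batch); // buffer the batch itself
            Ok(state)
        },
    ))?;

    assert_eq!(num_rows, 5);
    assert_eq!(batches.len(), 2);
    Ok(())
}
```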
```diff
@@ -1338,10 +1458,15 @@ async fn collect_left_input(
         .collect::<Result<Vec<_>>>()?;
 
     // Compute bounds for dynamic filter if enabled
-    let bounds = if should_compute_bounds && num_rows > 0 {
-        Some(compute_bounds(&left_values)?)
-    } else {
-        None
+    let bounds = match bounds_accumulators {
+        Some(accumulators) if num_rows > 0 => {
+            let bounds = accumulators
+                .into_iter()
+                .map(CollectLeftAccumulator::evaluate)
+                .collect::<Result<Vec<_>>>()?;
+            Some(bounds)
+        }
+        _ => None,
     };
 
     let data = JoinLeftData::new(
```
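Two small `Option` idioms do the optional-bounds plumbing: `bool::then(..).transpose()?` when the accumulators are created in `BuildSideState::try_new`, and the `match` with a `num_rows > 0` guard above when they are drained. A contrived sketch of both together (the function and names are invented for illustration):

```rust
fn bounds_sketch(should_compute: bool, num_rows: usize) -> Result<Option<Vec<i64>>, String> {
    // Creation: `then` yields Option<Result<Vec<_>, _>>, and `transpose`
    // flips it to Result<Option<Vec<_>>, _> so `?` can propagate errors.
    let accumulators: Option<Vec<i64>> = should_compute
        .then(|| (1..=3).map(Ok).collect::<Result<Vec<i64>, String>>())
        .transpose()?;

    // Draining: only produce bounds when rows were actually seen.
    Ok(match accumulators {
        Some(accs) if num_rows > 0 => Some(accs.into_iter().map(|a| a * 10).collect()),
        _ => None,
    })
}

fn main() {
    assert_eq!(bounds_sketch(true, 5), Ok(Some(vec![10, 20, 30])));
    assert_eq!(bounds_sketch(true, 0), Ok(None));
    assert_eq!(bounds_sketch(false, 5), Ok(None));
}
```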
> **Review comment:** I worry about bringing in all of the aggregate functions for all physical plans. I would personally prefer to have Min/Max accumulators moved into `datafusion-functions-aggregate-common` and avoid this new dependency.

> **Author reply:** Makes sense. I opened #17492, which also removes the dep for the parquet crate. However, I might merge this PR first and do it as a followup, since the followup will be just changing an import.