Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ impl HashJoinStream {
if need_produce_result_in_final(self.join_type) {
let mut bitmap = build_side.left_data.visited_indices_bitmap().lock();
left_indices.iter().flatten().for_each(|x| {
bitmap.set_bit(x as usize, true);
bitmap.set_bit(usize::try_from(x).expect("should fit"), true);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive by

});
}

Expand Down
135 changes: 135 additions & 0 deletions datafusion/physical-plan/src/repartition/hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Hash utilities for repartitioning data
use arrow::datatypes::DataType;
use datafusion_common::Result;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl};

use ahash::RandomState;

/// RandomState used for consistent hash partitioning
const REPARTITION_RANDOM_STATE: RandomState = RandomState::with_seeds(0, 0, 0, 0);

/// Internal hash function used for repartitioning inputs.
/// This is used for partitioned HashJoinExec and partitioned GroupByExec.
/// Currently we use AHash with fixed seeds, but this is subject to change.
/// We make no promises about stability of this function across versions.
/// Currently this is *not* stable across machines since AHash is not stable across platforms,
/// thus this should only be used in a single node context.
#[derive(Debug)]
pub(crate) struct RepartitionHash {
signature: datafusion_expr::Signature,
/// RandomState for consistent hashing - using the same seed as hash joins
random_state: RandomState,
}

impl PartialEq for RepartitionHash {
fn eq(&self, other: &Self) -> bool {
// RandomState doesn't implement PartialEq, so we just compare signatures
self.signature == other.signature
}
}

impl Eq for RepartitionHash {}

impl std::hash::Hash for RepartitionHash {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
// Only hash the signature since RandomState doesn't implement Hash
self.signature.hash(state);
}
}

impl RepartitionHash {
/// Create a new RepartitionHash
pub(crate) fn new() -> Self {
Self {
signature: datafusion_expr::Signature::one_of(
vec![datafusion_expr::TypeSignature::VariadicAny],
datafusion_expr::Volatility::Immutable,
),
random_state: REPARTITION_RANDOM_STATE,
}
}
}

impl ScalarUDFImpl for RepartitionHash {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn name(&self) -> &str {
"repartition_hash"
}

fn signature(&self) -> &datafusion_expr::Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
// Always return UInt64Array regardless of input types
Ok(DataType::UInt64)
}

fn invoke_with_args(
&self,
args: datafusion_expr::ScalarFunctionArgs,
) -> Result<ColumnarValue> {
use arrow::array::{Array, UInt64Array};
use datafusion_common::hash_utils::create_hashes;
use std::sync::Arc;

if args.args.is_empty() {
return datafusion_common::plan_err!("repartition_hash requires at least one argument");
}

// Convert all arguments to arrays
let arrays = ColumnarValue::values_to_arrays(&args.args)?;

// Check that all arrays have the same length
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ColumnarValue::values_to_arrays does this check already

let array_len = arrays[0].len();
for (i, array) in arrays.iter().enumerate() {
if array.len() != array_len {
return datafusion_common::plan_err!(
"All input arrays must have the same length. Array 0 has length {}, but array {} has length {}",
array_len, i, array.len()
);
}
}

// If no rows, return an empty UInt64Array
if array_len == 0 {
return Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(
Vec::<u64>::new(),
))));
}

// Create hash buffer and compute hashes using DataFusion's internal algorithm
let mut hashes_buffer = vec![0u64; array_len];
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will flag that previously the same vec was re-used with a capacity bump followed by a clear. Now we're creating a new one for each batch. It's some more allocations, but we're also pre-allocating the entire size, etc. I'm not sure if this will be measurable or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should maintain that behavior if possible. If the primary goal here is to encapsulate re-partitioning logic in this module, can we add a function for this? Something like:

/// Calculates the partition used by the repartition operator for each row. All arrays should have the same length. 
fn compute_partition_indices(buf: &mut Vec<u64>, arrays: &[ArrayRef], num_partitions: usize) -> Result<()> {
        buf.resize(arrays[0].len(), 0);
        create_hashes(arrays, REPARTITION_RANDOM_STATE, buf);
        buf.iter_mut().for_each(|hash| *hash %= num_partitions as u64);
    }

That way the repartitioning code can simply utilize this with the once allocated vector. But the dynamic filter can use the UDF, which still utilizes this under the hood

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes agreed, I think that's better. I'll cook something up.

create_hashes(&arrays, &self.random_state, &mut hashes_buffer)?;

// Return the hash values as a UInt64Array
Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(
hashes_buffer,
))))
}

fn documentation(&self) -> Option<&datafusion_expr::Documentation> {
None
}
}
107 changes: 77 additions & 30 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,20 @@ use super::{
DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
};
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
use crate::hash_utils::create_hashes;
use crate::metrics::BaselineMetrics;
use crate::projection::{all_columns, make_with_child, update_expr, ProjectionExec};
use crate::repartition::distributor_channels::{
channels, partition_aware_channels, DistributionReceiver, DistributionSender,
};
use crate::repartition::hash::RepartitionHash;
use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::stream::RecordBatchStreamAdapter;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};

use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions, UInt64Array};
use arrow::compute::take_arrays;
use arrow::datatypes::{SchemaRef, UInt32Type};
use arrow_schema::{DataType, Field};
use datafusion_common::config::ConfigOptions;
use datafusion_common::stats::Precision;
use datafusion_common::utils::transpose;
Expand All @@ -52,7 +53,8 @@ use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
use datafusion_expr::{ColumnarValue, ScalarUDF};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, ScalarFunctionExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;

use crate::filter_pushdown::{
Expand All @@ -65,6 +67,7 @@ use log::trace;
use parking_lot::Mutex;

mod distributor_channels;
pub mod hash;

type MaybeBatch = Option<Result<RecordBatch>>;
type InputPartitionsToCurrentPartitionSender = Vec<DistributionSender<MaybeBatch>>;
Expand Down Expand Up @@ -149,6 +152,7 @@ impl RepartitionExecState {
Ok(())
}

#[expect(clippy::too_many_arguments)]
fn consume_input_streams(
&mut self,
input: Arc<dyn ExecutionPlan>,
Expand All @@ -157,6 +161,7 @@ impl RepartitionExecState {
preserve_order: bool,
name: String,
context: Arc<TaskContext>,
hash: ScalarUDF,
) -> Result<&mut ConsumingInputStreamsState> {
let streams_and_metrics = match self {
RepartitionExecState::NotInitialized => {
Expand Down Expand Up @@ -227,6 +232,7 @@ impl RepartitionExecState {
txs.clone(),
partitioning.clone(),
metrics,
hash.clone(),
));

// In a separate task, wait for each input to be done
Expand Down Expand Up @@ -258,10 +264,9 @@ pub struct BatchPartitioner {

enum BatchPartitionerState {
Hash {
random_state: ahash::RandomState,
exprs: Vec<Arc<dyn PhysicalExpr>>,
hash: Arc<dyn PhysicalExpr>,
num_partitions: usize,
hash_buffer: Vec<u64>,
exprs: Vec<Arc<dyn PhysicalExpr>>,
},
RoundRobin {
num_partitions: usize,
Expand All @@ -281,19 +286,41 @@ impl BatchPartitioner {
next_idx: 0,
}
}
Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {
exprs,
num_partitions,
// Use fixed random hash
random_state: ahash::RandomState::with_seeds(0, 0, 0, 0),
hash_buffer: vec![],
},
Partitioning::Hash(exprs, num_partitions) => {
let hash = ScalarUDF::new_from_impl(RepartitionHash::new());
let name = hash.name().to_string();
BatchPartitionerState::Hash {
hash: Arc::new(ScalarFunctionExpr::new(
&name,
Arc::new(hash),
exprs.clone(),
Arc::new(Field::new(&name, DataType::UInt64, false)),
Arc::new(ConfigOptions::default()),
)),
num_partitions,
exprs,
}
}
other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"),
};

Ok(Self { state, timer })
}

/// Set the hash function to use for hash partitioning.
pub(crate) fn with_hash_function(mut self, hash: ScalarUDF) -> Self {
if let BatchPartitionerState::Hash { hash: h, exprs, .. } = &mut self.state {
let name = hash.name().to_string();
*h = Arc::new(ScalarFunctionExpr::new(
&name,
Arc::new(hash),
exprs.clone(),
Arc::new(Field::new(&name, DataType::UInt64, false)),
Arc::new(ConfigOptions::default()),
));
}
self
}

/// Partition the provided [`RecordBatch`] into one or more partitioned [`RecordBatch`]
/// based on the [`Partitioning`] specified on construction
///
Expand Down Expand Up @@ -333,30 +360,36 @@ impl BatchPartitioner {
Box::new(std::iter::once(Ok((idx, batch))))
}
BatchPartitionerState::Hash {
random_state,
exprs,
hash,
num_partitions: partitions,
hash_buffer,
..
} => {
// Tracking time required for distributing indexes across output partitions
let timer = self.timer.timer();

let arrays = exprs
.iter()
.map(|expr| expr.evaluate(&batch)?.into_array(batch.num_rows()))
.collect::<Result<Vec<_>>>()?;

hash_buffer.clear();
hash_buffer.resize(batch.num_rows(), 0);

create_hashes(&arrays, random_state, hash_buffer)?;
let ColumnarValue::Array(hashes) = hash.evaluate(&batch)? else {
return internal_err!(
"Hash partitioning expression did not return an array"
);
};
let Some(hashes) = hashes.as_any().downcast_ref::<UInt64Array>()
else {
return internal_err!(
"Hash partitioning expression did not return a UInt64Array"
);
};

let mut indices: Vec<_> = (0..*partitions)
.map(|_| Vec::with_capacity(batch.num_rows()))
.collect();

for (index, hash) in hash_buffer.iter().enumerate() {
indices[(*hash % *partitions as u64) as usize].push(index as u32);
for (index, hash) in hashes.iter().enumerate() {
let Some(hash) = hash else {
return internal_err!(
"Hash partitioning expression returned null value"
);
};
indices[(hash % *partitions as u64) as usize].push(index as u32);
}

// Finished building index-arrays for output partitions
Expand Down Expand Up @@ -486,6 +519,8 @@ pub struct RepartitionExec {
preserve_order: bool,
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
/// Hash function used for hash partitioning
hash: ScalarUDF,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -624,7 +659,8 @@ impl ExecutionPlan for RepartitionExec {
let mut repartition = RepartitionExec::try_new(
children.swap_remove(0),
self.partitioning().clone(),
)?;
)?
.with_hash_function(self.hash.clone());
if self.preserve_order {
repartition = repartition.with_preserve_order();
}
Expand Down Expand Up @@ -657,6 +693,7 @@ impl ExecutionPlan for RepartitionExec {
let name = self.name().to_owned();
let schema = self.schema();
let schema_captured = Arc::clone(&schema);
let hash = self.hash.clone();

// Get existing ordering to use for merging
let sort_exprs = self.sort_exprs().cloned();
Expand Down Expand Up @@ -685,6 +722,7 @@ impl ExecutionPlan for RepartitionExec {
preserve_order,
name.clone(),
Arc::clone(&context),
hash.clone(),
)?;

// now return stream for the specified *output* partition which will
Expand Down Expand Up @@ -877,9 +915,16 @@ impl RepartitionExec {
metrics: ExecutionPlanMetricsSet::new(),
preserve_order,
cache,
hash: ScalarUDF::new_from_impl(RepartitionHash::new()),
})
}

/// Set a custom hash function to use for hash partitioning.
pub fn with_hash_function(mut self, hash: ScalarUDF) -> Self {
self.hash = hash;
self
}

fn maintains_input_order_helper(
input: &Arc<dyn ExecutionPlan>,
preserve_order: bool,
Expand Down Expand Up @@ -962,9 +1007,11 @@ impl RepartitionExec {
>,
partitioning: Partitioning,
metrics: RepartitionMetrics,
hash: ScalarUDF,
) -> Result<()> {
let mut partitioner =
BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?
.with_hash_function(hash);

// While there are still outputs to send to, keep pulling inputs
let mut batches_until_yield = partitioner.num_partitions();
Expand Down
Loading