diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index adc00d9fe75e..2c832cdbf9d3 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -533,7 +533,7 @@ impl HashJoinStream {
         if need_produce_result_in_final(self.join_type) {
             let mut bitmap = build_side.left_data.visited_indices_bitmap().lock();
             left_indices.iter().flatten().for_each(|x| {
-                bitmap.set_bit(x as usize, true);
+                bitmap.set_bit(usize::try_from(x).expect("should fit"), true);
             });
         }

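The `stream.rs` hunk above swaps a silent `as` cast for a checked conversion when marking visited build-side rows. A small standalone sketch (not part of the patch) of the difference for `u64` row indices, which may not fit in `usize` on 32-bit targets:

```rust
fn main() {
    let row_index: u64 = 42;

    // `as` silently truncates when the value does not fit in `usize`
    // (possible on 32-bit targets); `try_from` surfaces that case instead.
    let truncating = row_index as usize;
    let checked = usize::try_from(row_index).expect("row index should fit in usize");

    assert_eq!(truncating, checked);
}
```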
diff --git a/datafusion/physical-plan/src/repartition/hash.rs b/datafusion/physical-plan/src/repartition/hash.rs
new file mode 100644
index 000000000000..48454f6dd806
--- /dev/null
+++ b/datafusion/physical-plan/src/repartition/hash.rs
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Hash utilities for repartitioning data
+
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
+
+use ahash::RandomState;
+
+/// RandomState used for consistent hash partitioning
+const REPARTITION_RANDOM_STATE: RandomState = RandomState::with_seeds(0, 0, 0, 0);
+
+/// Internal hash function used for repartitioning inputs.
+/// This is used for partitioned HashJoinExec and partitioned GroupByExec.
+/// Currently we use AHash with fixed seeds, but this is subject to change.
+/// We make no promises about stability of this function across versions.
+/// Currently this is *not* stable across machines since AHash is not stable across platforms,
+/// thus this should only be used in a single node context.
+#[derive(Debug)]
+pub(crate) struct RepartitionHash {
+    signature: datafusion_expr::Signature,
+    /// RandomState for consistent hashing - using the same seed as hash joins
+    random_state: RandomState,
+}
+
+impl PartialEq for RepartitionHash {
+    fn eq(&self, other: &Self) -> bool {
+        // RandomState doesn't implement PartialEq, so we just compare signatures
+        self.signature == other.signature
+    }
+}
+
+impl Eq for RepartitionHash {}
+
+impl std::hash::Hash for RepartitionHash {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        // Only hash the signature since RandomState doesn't implement Hash
+        self.signature.hash(state);
+    }
+}
+
+impl RepartitionHash {
+    /// Create a new RepartitionHash
+    pub(crate) fn new() -> Self {
+        Self {
+            signature: datafusion_expr::Signature::one_of(
+                vec![datafusion_expr::TypeSignature::VariadicAny],
+                datafusion_expr::Volatility::Immutable,
+            ),
+            random_state: REPARTITION_RANDOM_STATE,
+        }
+    }
+}
+
+impl ScalarUDFImpl for RepartitionHash {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "repartition_hash"
+    }
+
+    fn signature(&self) -> &datafusion_expr::Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        // Always return UInt64Array regardless of input types
+        Ok(DataType::UInt64)
+    }
+
+    fn invoke_with_args(
+        &self,
+        args: datafusion_expr::ScalarFunctionArgs,
+    ) -> Result<ColumnarValue> {
+        use arrow::array::{Array, UInt64Array};
+        use datafusion_common::hash_utils::create_hashes;
+        use std::sync::Arc;
+
+        if args.args.is_empty() {
+            return datafusion_common::plan_err!("repartition_hash requires at least one argument");
+        }
+
+        // Convert all arguments to arrays
+        let arrays = ColumnarValue::values_to_arrays(&args.args)?;
+
+        // Check that all arrays have the same length
+        let array_len = arrays[0].len();
+        for (i, array) in arrays.iter().enumerate() {
+            if array.len() != array_len {
+                return datafusion_common::plan_err!(
+                    "All input arrays must have the same length. Array 0 has length {}, but array {} has length {}",
+                    array_len, i, array.len()
+                );
+            }
+        }
+
+        // If no rows, return an empty UInt64Array
+        if array_len == 0 {
+            return Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(
+                Vec::<u64>::new(),
+            ))));
+        }
+
+        // Create hash buffer and compute hashes using DataFusion's internal algorithm
+        let mut hashes_buffer = vec![0u64; array_len];
+        create_hashes(&arrays, &self.random_state, &mut hashes_buffer)?;
+
+        // Return the hash values as a UInt64Array
+        Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(
+            hashes_buffer,
+        ))))
+    }
+
+    fn documentation(&self) -> Option<&datafusion_expr::Documentation> {
+        None
+    }
+}
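The new `hash.rs` above is essentially DataFusion's `create_hashes` wrapped in a `ScalarUDF` with the fixed-seed AHash `RandomState`. A standalone sketch (not part of the patch; the key data is made up) of the hash-then-modulo scheme `BatchPartitioner` uses to route rows:

```rust
use std::sync::Arc;

use ahash::RandomState;
use arrow::array::{ArrayRef, Int32Array, StringArray};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::Result;

fn main() -> Result<()> {
    // Illustrative key columns; any set of equal-length Arrow arrays works.
    let keys: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
        Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
    ];

    // Fixed seeds matching REPARTITION_RANDOM_STATE in the patch.
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0u64; 4];
    create_hashes(&keys, &random_state, &mut hashes)?;

    // Each row is routed to `hash % num_partitions`, as in BatchPartitioner.
    let num_partitions = 3u64;
    for (row, hash) in hashes.iter().enumerate() {
        println!("row {row} -> partition {}", hash % num_partitions);
    }
    Ok(())
}
```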
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 22bc1b5cf924..6b49989e3063 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -31,19 +31,20 @@ use super::{
     DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
 };
 use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
-use crate::hash_utils::create_hashes;
 use crate::metrics::BaselineMetrics;
 use crate::projection::{all_columns, make_with_child, update_expr, ProjectionExec};
 use crate::repartition::distributor_channels::{
     channels, partition_aware_channels, DistributionReceiver, DistributionSender,
 };
+use crate::repartition::hash::RepartitionHash;
 use crate::sorts::streaming_merge::StreamingMergeBuilder;
 use crate::stream::RecordBatchStreamAdapter;
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};

-use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
+use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions, UInt64Array};
 use arrow::compute::take_arrays;
 use arrow::datatypes::{SchemaRef, UInt32Type};
+use arrow_schema::{DataType, Field};
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::stats::Precision;
 use datafusion_common::utils::transpose;
@@ -52,7 +53,8 @@ use datafusion_common::{not_impl_err, DataFusionError, Result};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
+use datafusion_expr::{ColumnarValue, ScalarUDF};
+use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, ScalarFunctionExpr};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;

 use crate::filter_pushdown::{
@@ -65,6 +67,7 @@ use log::trace;
 use parking_lot::Mutex;

 mod distributor_channels;
+pub mod hash;

 type MaybeBatch = Option<Result<RecordBatch>>;
 type InputPartitionsToCurrentPartitionSender = Vec<DistributionSender<MaybeBatch>>;
@@ -149,6 +152,7 @@ impl RepartitionExecState {
         Ok(())
     }

+    #[expect(clippy::too_many_arguments)]
     fn consume_input_streams(
         &mut self,
         input: Arc<dyn ExecutionPlan>,
@@ -157,6 +161,7 @@ impl RepartitionExecState {
         preserve_order: bool,
         name: String,
         context: Arc<TaskContext>,
+        hash: ScalarUDF,
     ) -> Result<&mut ConsumingInputStreamsState> {
         let streams_and_metrics = match self {
             RepartitionExecState::NotInitialized => {
@@ -227,6 +232,7 @@ impl RepartitionExecState {
                     txs.clone(),
                     partitioning.clone(),
                     metrics,
+                    hash.clone(),
                 ));

                 // In a separate task, wait for each input to be done
@@ -258,10 +264,9 @@ pub struct BatchPartitioner {

 enum BatchPartitionerState {
     Hash {
-        random_state: ahash::RandomState,
-        exprs: Vec<Arc<dyn PhysicalExpr>>,
+        hash: Arc<ScalarFunctionExpr>,
         num_partitions: usize,
-        hash_buffer: Vec<u64>,
+        exprs: Vec<Arc<dyn PhysicalExpr>>,
     },
     RoundRobin {
         num_partitions: usize,
@@ -281,19 +286,41 @@ impl BatchPartitioner {
                     next_idx: 0,
                 }
             }
-            Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {
-                exprs,
-                num_partitions,
-                // Use fixed random hash
-                random_state: ahash::RandomState::with_seeds(0, 0, 0, 0),
-                hash_buffer: vec![],
-            },
+            Partitioning::Hash(exprs, num_partitions) => {
+                let hash = ScalarUDF::new_from_impl(RepartitionHash::new());
+                let name = hash.name().to_string();
+                BatchPartitionerState::Hash {
+                    hash: Arc::new(ScalarFunctionExpr::new(
+                        &name,
+                        Arc::new(hash),
+                        exprs.clone(),
+                        Arc::new(Field::new(&name, DataType::UInt64, false)),
+                        Arc::new(ConfigOptions::default()),
+                    )),
+                    num_partitions,
+                    exprs,
+                }
+            }
             other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"),
         };
-
         Ok(Self { state, timer })
     }

+    /// Set the hash function to use for hash partitioning.
+    pub(crate) fn with_hash_function(mut self, hash: ScalarUDF) -> Self {
+        if let BatchPartitionerState::Hash { hash: h, exprs, .. } = &mut self.state {
+            let name = hash.name().to_string();
+            *h = Arc::new(ScalarFunctionExpr::new(
+                &name,
+                Arc::new(hash),
+                exprs.clone(),
+                Arc::new(Field::new(&name, DataType::UInt64, false)),
+                Arc::new(ConfigOptions::default()),
+            ));
+        }
+        self
+    }
+
     /// Partition the provided [`RecordBatch`] into one or more partitioned [`RecordBatch`]
     /// based on the [`Partitioning`] specified on construction
     ///
@@ -333,30 +360,36 @@ impl BatchPartitioner {
                 Box::new(std::iter::once(Ok((idx, batch))))
             }
             BatchPartitionerState::Hash {
-                random_state,
-                exprs,
+                hash,
                 num_partitions: partitions,
-                hash_buffer,
+                ..
             } => {
                 // Tracking time required for distributing indexes across output partitions
                 let timer = self.timer.timer();

-                let arrays = exprs
-                    .iter()
-                    .map(|expr| expr.evaluate(&batch)?.into_array(batch.num_rows()))
-                    .collect::<Result<Vec<_>>>()?;
-
-                hash_buffer.clear();
-                hash_buffer.resize(batch.num_rows(), 0);
-
-                create_hashes(&arrays, random_state, hash_buffer)?;
+                let ColumnarValue::Array(hashes) = hash.evaluate(&batch)? else {
+                    return internal_err!(
+                        "Hash partitioning expression did not return an array"
+                    );
+                };
+                let Some(hashes) = hashes.as_any().downcast_ref::<UInt64Array>()
+                else {
+                    return internal_err!(
+                        "Hash partitioning expression did not return a UInt64Array"
+                    );
+                };

                 let mut indices: Vec<_> = (0..*partitions)
                     .map(|_| Vec::with_capacity(batch.num_rows()))
                     .collect();

-                for (index, hash) in hash_buffer.iter().enumerate() {
-                    indices[(*hash % *partitions as u64) as usize].push(index as u32);
+                for (index, hash) in hashes.iter().enumerate() {
+                    let Some(hash) = hash else {
+                        return internal_err!(
+                            "Hash partitioning expression returned null value"
+                        );
+                    };
+                    indices[(hash % *partitions as u64) as usize].push(index as u32);
                 }

                 // Finished building index-arrays for output partitions
@@ -486,6 +519,8 @@ pub struct RepartitionExec {
     preserve_order: bool,
     /// Cache holding plan properties like equivalences, output partitioning etc.
     cache: PlanProperties,
+    /// Hash function used for hash partitioning
+    hash: ScalarUDF,
 }

 #[derive(Debug, Clone)]
@@ -624,7 +659,8 @@ impl ExecutionPlan for RepartitionExec {
         let mut repartition = RepartitionExec::try_new(
             children.swap_remove(0),
             self.partitioning().clone(),
-        )?;
+        )?
+        .with_hash_function(self.hash.clone());
         if self.preserve_order {
             repartition = repartition.with_preserve_order();
         }
@@ -657,6 +693,7 @@ impl ExecutionPlan for RepartitionExec {
         let name = self.name().to_owned();
         let schema = self.schema();
         let schema_captured = Arc::clone(&schema);
+        let hash = self.hash.clone();

         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned();
@@ -685,6 +722,7 @@ impl ExecutionPlan for RepartitionExec {
                 preserve_order,
                 name.clone(),
                 Arc::clone(&context),
+                hash.clone(),
             )?;

             // now return stream for the specified *output* partition which will
@@ -877,9 +915,16 @@ impl RepartitionExec {
             metrics: ExecutionPlanMetricsSet::new(),
             preserve_order,
             cache,
+            hash: ScalarUDF::new_from_impl(RepartitionHash::new()),
         })
     }

+    /// Set a custom hash function to use for hash partitioning.
+    pub fn with_hash_function(mut self, hash: ScalarUDF) -> Self {
+        self.hash = hash;
+        self
+    }
+
     fn maintains_input_order_helper(
         input: &Arc<dyn ExecutionPlan>,
         preserve_order: bool,
@@ -962,9 +1007,11 @@ impl RepartitionExec {
         >,
         partitioning: Partitioning,
         metrics: RepartitionMetrics,
+        hash: ScalarUDF,
     ) -> Result<()> {
         let mut partitioner =
-            BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
+            BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?
+                .with_hash_function(hash);

         // While there are still outputs to send to, keep pulling inputs
         let mut batches_until_yield = partitioner.num_partitions();
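With the new public `RepartitionExec::with_hash_function`, callers can swap in their own hash `ScalarUDF`. A minimal sketch (not part of the patch; the helper name `with_custom_hash` and the `my_hash_udf` argument are illustrative) of how that wiring could look:

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_expr::ScalarUDF;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::{ExecutionPlan, Partitioning};

/// Repartition `input` on `keys` into 8 partitions, routing rows with
/// `my_hash_udf` instead of the built-in `repartition_hash`. Any ScalarUDF
/// that returns one non-null UInt64 hash per input row satisfies the
/// contract that `BatchPartitioner` expects from the hash expression.
fn with_custom_hash(
    input: Arc<dyn ExecutionPlan>,
    keys: Vec<Arc<dyn PhysicalExpr>>,
    my_hash_udf: ScalarUDF,
) -> Result<RepartitionExec> {
    Ok(RepartitionExec::try_new(input, Partitioning::Hash(keys, 8))?
        .with_hash_function(my_hash_udf))
}
```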