From 8b642cbe1d4b13ef7b7655e5173fbc22741ecc71 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 18 Sep 2025 15:27:09 -0400 Subject: [PATCH 1/7] make hash function in RepartitionExec configurable --- datafusion-testing | 2 +- .../src/joins/hash_join/stream.rs | 2 +- .../physical-plan/src/repartition/hash.rs | 140 ++++++++++++++++++ .../physical-plan/src/repartition/mod.rs | 106 +++++++++---- 4 files changed, 219 insertions(+), 31 deletions(-) create mode 100644 datafusion/physical-plan/src/repartition/hash.rs diff --git a/datafusion-testing b/datafusion-testing index 905df5f65cc9..307079bf7929 160000 --- a/datafusion-testing +++ b/datafusion-testing @@ -1 +1 @@ -Subproject commit 905df5f65cc9d0851719c21f5a4dd5cd77621f19 +Subproject commit 307079bf7929b62d06864662b3aecfb2e1340c81 diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index adc00d9fe75e..2c832cdbf9d3 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -533,7 +533,7 @@ impl HashJoinStream { if need_produce_result_in_final(self.join_type) { let mut bitmap = build_side.left_data.visited_indices_bitmap().lock(); left_indices.iter().flatten().for_each(|x| { - bitmap.set_bit(x as usize, true); + bitmap.set_bit(usize::try_from(x).expect("should fit"), true); }); } diff --git a/datafusion/physical-plan/src/repartition/hash.rs b/datafusion/physical-plan/src/repartition/hash.rs new file mode 100644 index 000000000000..15fe34f537a3 --- /dev/null +++ b/datafusion/physical-plan/src/repartition/hash.rs @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Hash utilities for repartitioning data + +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field}; +use datafusion_common::{config::ConfigOptions, Result}; +use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl}; + +use ahash::RandomState; +use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr}; + +/// Internal hash function used for repartitioning inputs. +/// This is used for partitioned HashJoinExec and partitioned GroupByExec. +/// Currently we use AHash with fixed seeds, but this is subject to change. +/// We make no promises about stability of this function across versions. +/// Currently this is *not* stable across machines since AHash is not stable across platforms, +/// thus this should only be used in a single node context. 
+#[derive(Debug)]
+pub(crate) struct RepartitionHash {
+    signature: datafusion_expr::Signature,
+    /// RandomState for consistent hashing - using the same seed as hash joins
+    random_state: RandomState,
+}
+
+impl PartialEq for RepartitionHash {
+    fn eq(&self, other: &Self) -> bool {
+        // RandomState doesn't implement PartialEq, so we just compare signatures
+        self.signature == other.signature
+    }
+}
+
+impl Eq for RepartitionHash {}
+
+impl std::hash::Hash for RepartitionHash {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        // Only hash the signature since RandomState doesn't implement Hash
+        self.signature.hash(state);
+    }
+}
+
+impl RepartitionHash {
+    /// Create a new RepartitionHash
+    pub(crate) fn new() -> Self {
+        Self {
+            signature: datafusion_expr::Signature::one_of(
+                vec![datafusion_expr::TypeSignature::VariadicAny],
+                datafusion_expr::Volatility::Immutable,
+            ),
+            random_state: REPARTITION_RANDOM_STATE,
+        }
+    }
+}
+
+impl ScalarUDFImpl for RepartitionHash {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "repartition_hash"
+    }
+
+    fn signature(&self) -> &datafusion_expr::Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        // Always return UInt64Array regardless of input types
+        Ok(DataType::UInt64)
+    }
+
+    fn invoke_with_args(
+        &self,
+        args: datafusion_expr::ScalarFunctionArgs,
+    ) -> Result<ColumnarValue> {
+        use arrow::array::{Array, UInt64Array};
+        use datafusion_common::hash_utils::create_hashes;
+        use std::sync::Arc;
+
+        if args.args.is_empty() {
+            return datafusion_common::plan_err!("hash requires at least one argument");
+        }
+
+        // Convert all arguments to arrays
+        let arrays = ColumnarValue::values_to_arrays(&args.args)?;
+
+        // Check that all arrays have the same length
+        let array_len = arrays[0].len();
+        for (i, array) in arrays.iter().enumerate() {
+            if array.len() != array_len {
+                return datafusion_common::plan_err!(
+                    "All input arrays must have the same length. Array 0 has length {}, but array {} has length {}",
+                    array_len, i, array.len()
+                );
+            }
+        }
+
+        // If no rows, return an empty UInt64Array
+        if array_len == 0 {
+            return Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(
+                Vec::<u64>::new(),
+            ))));
+        }
+
+        // Create hash buffer and compute hashes using DataFusion's internal algorithm
+        let mut hashes_buffer = vec![0u64; array_len];
+        create_hashes(&arrays, &self.random_state, &mut hashes_buffer)?;
+
+        // Return the hash values as a UInt64Array
+        Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(
+            hashes_buffer,
+        ))))
+    }
+
+    fn documentation(&self) -> Option<&datafusion_expr::Documentation> {
+        None
+    }
+}
+
+/// RandomState used by RepartitionExec for consistent hash partitioning
+/// This must match the seeds used in RepartitionExec to ensure our hash-based
+/// filter expressions compute the same partition assignments as the actual partitioning
+const REPARTITION_RANDOM_STATE: RandomState = RandomState::with_seeds(0, 0, 0, 0);
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 22bc1b5cf924..2aeb95c2189d 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -31,19 +31,20 @@ use super::{
     DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
 };
 use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
-use crate::hash_utils::create_hashes;
 use crate::metrics::BaselineMetrics;
 use crate::projection::{all_columns, make_with_child, update_expr, ProjectionExec};
 use crate::repartition::distributor_channels::{
     channels, partition_aware_channels, DistributionReceiver, DistributionSender,
 };
+use crate::repartition::hash::RepartitionHash;
 use crate::sorts::streaming_merge::StreamingMergeBuilder;
 use crate::stream::RecordBatchStreamAdapter;
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};
 
-use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
+use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions, UInt64Array};
 use arrow::compute::take_arrays;
 use arrow::datatypes::{SchemaRef, UInt32Type};
+use arrow_schema::{DataType, Field};
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::stats::Precision;
 use datafusion_common::utils::transpose;
@@ -52,7 +53,8 @@ use datafusion_common::{not_impl_err, DataFusionError, Result};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
+use datafusion_expr::{ColumnarValue, ScalarUDF};
+use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, ScalarFunctionExpr};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 use crate::filter_pushdown::{
@@ -65,6 +67,7 @@ use log::trace;
 use parking_lot::Mutex;
 
 mod distributor_channels;
+pub mod hash;
 
 type MaybeBatch = Option<Result<RecordBatch>>;
 type InputPartitionsToCurrentPartitionSender = Vec<DistributionSender<MaybeBatch>>;
@@ -157,6 +160,7 @@ impl RepartitionExecState {
         preserve_order: bool,
         name: String,
         context: Arc<TaskContext>,
+        hash: ScalarUDF,
     ) -> Result<&mut ConsumingInputStreamsState> {
         let streams_and_metrics = match self {
             RepartitionExecState::NotInitialized => {
@@ -227,6 +231,7 @@ impl RepartitionExecState {
                     txs.clone(),
                     partitioning.clone(),
                     metrics,
+                    hash.clone(),
                 ));
 
                 // In a separate task, wait for each input to be done
@@ -258,10 +263,9 @@ pub struct BatchPartitioner {
 
 enum BatchPartitionerState {
     Hash {
-        random_state: ahash::RandomState,
-        exprs: Vec<Arc<dyn PhysicalExpr>>,
+        hash: Arc<ScalarFunctionExpr>,
         num_partitions: usize,
-        hash_buffer: Vec<u64>,
+        exprs: Vec<Arc<dyn PhysicalExpr>>,
     },
     RoundRobin {
         num_partitions: usize,
@@ -281,19 +285,44 @@ impl BatchPartitioner {
                 next_idx: 0,
             }
         }
-            Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {
-                exprs,
-                num_partitions,
-                // Use fixed random hash
-                random_state: ahash::RandomState::with_seeds(0, 0, 0, 0),
-                hash_buffer: vec![],
-            },
+            Partitioning::Hash(exprs, num_partitions) => {
+                let hash = ScalarUDF::new_from_impl(RepartitionHash::new());
+                let name = hash.name().to_string();
+                BatchPartitionerState::Hash {
+                    hash: Arc::new(ScalarFunctionExpr::new(
+                        &name,
+                        Arc::new(hash),
+                        exprs.clone(),
+                        Arc::new(Field::new(&name, DataType::UInt64, false)),
+                        Arc::new(ConfigOptions::default()),
+                    )),
+                    num_partitions,
+                    exprs,
+                }
+            }
             other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"),
         };
-
         Ok(Self { state, timer })
     }
 
+    /// Set the hash function to use for hash partitioning.
+    pub(crate) fn with_hash_function(mut self, hash: ScalarUDF) -> Self {
+        match &mut self.state {
+            BatchPartitionerState::Hash { hash: h, exprs, .. } => {
+                let name = hash.name().to_string();
+                *h = Arc::new(ScalarFunctionExpr::new(
+                    &name,
+                    Arc::new(hash),
+                    exprs.clone(),
+                    Arc::new(Field::new(&name, DataType::UInt64, false)),
+                    Arc::new(ConfigOptions::default()),
+                ));
+            }
+            _ => {}
+        }
+        self
+    }
+
     /// Partition the provided [`RecordBatch`] into one or more partitioned [`RecordBatch`]
     /// based on the [`Partitioning`] specified on construction
     ///
@@ -333,30 +362,36 @@ impl BatchPartitioner {
                 Box::new(std::iter::once(Ok((idx, batch))))
             }
             BatchPartitionerState::Hash {
-                random_state,
-                exprs,
+                hash,
                 num_partitions: partitions,
-                hash_buffer,
+                ..
             } => {
                 // Tracking time required for distributing indexes across output partitions
                 let timer = self.timer.timer();
 
-                let arrays = exprs
-                    .iter()
-                    .map(|expr| expr.evaluate(&batch)?.into_array(batch.num_rows()))
-                    .collect::<Result<Vec<_>>>()?;
-
-                hash_buffer.clear();
-                hash_buffer.resize(batch.num_rows(), 0);
-
-                create_hashes(&arrays, random_state, hash_buffer)?;
+                let ColumnarValue::Array(hashes) = hash.evaluate(&batch)? else {
+                    return internal_err!(
+                        "Hash partitioning expression did not return an array"
+                    );
+                };
+                let Some(hashes) = hashes.as_any().downcast_ref::<UInt64Array>()
+                else {
+                    return internal_err!(
+                        "Hash partitioning expression did not return a UInt64Array"
+                    );
+                };
 
                 let mut indices: Vec<_> = (0..*partitions)
                     .map(|_| Vec::with_capacity(batch.num_rows()))
                     .collect();
 
-                for (index, hash) in hash_buffer.iter().enumerate() {
-                    indices[(*hash % *partitions as u64) as usize].push(index as u32);
+                for (index, hash) in hashes.iter().enumerate() {
+                    let Some(hash) = hash else {
+                        return internal_err!(
+                            "Hash partitioning expression returned null value"
+                        );
+                    };
+                    indices[(hash % *partitions as u64) as usize].push(index as u32);
                 }
 
                 // Finished building index-arrays for output partitions
@@ -486,6 +521,8 @@ pub struct RepartitionExec {
     preserve_order: bool,
     /// Cache holding plan properties like equivalences, output partitioning etc.
     cache: PlanProperties,
+    /// Hash function used for hash partitioning
+    hash: ScalarUDF,
 }
 
 #[derive(Debug, Clone)]
@@ -657,6 +694,7 @@ impl ExecutionPlan for RepartitionExec {
         let name = self.name().to_owned();
         let schema = self.schema();
         let schema_captured = Arc::clone(&schema);
+        let hash = self.hash.clone();
 
         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned();
@@ -685,6 +723,7 @@ impl ExecutionPlan for RepartitionExec {
                 preserve_order,
                 name.clone(),
                 Arc::clone(&context),
+                hash.clone(),
             )?;
 
             // now return stream for the specified *output* partition which will
@@ -877,9 +916,16 @@ impl RepartitionExec {
             metrics: ExecutionPlanMetricsSet::new(),
             preserve_order,
             cache,
+            hash: ScalarUDF::new_from_impl(RepartitionHash::new()),
         })
     }
 
+    /// Set a custom hash function to use for hash partitioning.
+    pub fn with_hash_function(mut self, hash: ScalarUDF) -> Self {
+        self.hash = hash;
+        self
+    }
+
     fn maintains_input_order_helper(
         input: &Arc<dyn ExecutionPlan>,
         preserve_order: bool,
@@ -962,9 +1008,11 @@ impl RepartitionExec {
         >,
         partitioning: Partitioning,
         metrics: RepartitionMetrics,
+        hash: ScalarUDF,
     ) -> Result<()> {
         let mut partitioner =
-            BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
+            BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?
+                .with_hash_function(hash);
 
         // While there are still outputs to send to, keep pulling inputs
         let mut batches_until_yield = partitioner.num_partitions();

From 711746fa78293e1d0e14913a36683566137e7c17 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 18 Sep 2025 15:28:58 -0400
Subject: [PATCH 2/7] fix imports

---
 datafusion/physical-plan/src/repartition/hash.rs | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/datafusion/physical-plan/src/repartition/hash.rs b/datafusion/physical-plan/src/repartition/hash.rs
index 15fe34f537a3..e6bf42b648ca 100644
--- a/datafusion/physical-plan/src/repartition/hash.rs
+++ b/datafusion/physical-plan/src/repartition/hash.rs
@@ -17,14 +17,11 @@
 
 //! Hash utilities for repartitioning data
 
-use std::sync::Arc;
-
-use arrow::datatypes::{DataType, Field};
-use datafusion_common::{config::ConfigOptions, Result};
-use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl};
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
 
 use ahash::RandomState;
-use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr};
 
 /// Internal hash function used for repartitioning inputs.
 /// This is used for partitioned HashJoinExec and partitioned GroupByExec.

From ca3ba01240b3664cf18aa07bf2d6d08adcbb9562 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 18 Sep 2025 16:27:15 -0400
Subject: [PATCH 3/7] fix

---
 .../physical-plan/src/repartition/hash.rs     | 23 ++++++++++++++---
 .../physical-plan/src/repartition/mod.rs      | 25 +++++++++----------
 2 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/datafusion/physical-plan/src/repartition/hash.rs b/datafusion/physical-plan/src/repartition/hash.rs
index e6bf42b648ca..81299f999db2 100644
--- a/datafusion/physical-plan/src/repartition/hash.rs
+++ b/datafusion/physical-plan/src/repartition/hash.rs
@@ -17,11 +17,14 @@
 
 //! Hash utilities for repartitioning data
 
-use arrow::datatypes::DataType;
-use datafusion_common::Result;
-use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field};
+use datafusion_common::{config::ConfigOptions, Result};
+use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl};
 
 use ahash::RandomState;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr};
 
 /// Internal hash function used for repartitioning inputs.
 /// This is used for partitioned HashJoinExec and partitioned GroupByExec.
@@ -65,6 +68,20 @@ impl RepartitionHash {
     }
 }
 
+pub(crate) fn repartition_hash(
+    args: Vec<PhysicalExprRef>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    let hash = ScalarUDF::new_from_impl(RepartitionHash::new());
+    let name = hash.name().to_string();
+    Ok(Arc::new(ScalarFunctionExpr::new(
+        &name,
+        Arc::new(hash),
+        args,
+        Arc::new(Field::new(&name, DataType::UInt64, false)),
+        Arc::new(ConfigOptions::default()),
+    )))
+}
+
 impl ScalarUDFImpl for RepartitionHash {
     fn as_any(&self) -> &dyn std::any::Any {
         self
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 2aeb95c2189d..6b49989e3063 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -152,6 +152,7 @@ impl RepartitionExecState {
         Ok(())
     }
 
+    #[expect(clippy::too_many_arguments)]
     fn consume_input_streams(
         &mut self,
         input: Arc<dyn ExecutionPlan>,
@@ -307,18 +308,15 @@ impl BatchPartitioner {
 
     /// Set the hash function to use for hash partitioning.
     pub(crate) fn with_hash_function(mut self, hash: ScalarUDF) -> Self {
-        match &mut self.state {
-            BatchPartitionerState::Hash { hash: h, exprs, .. } => {
-                let name = hash.name().to_string();
-                *h = Arc::new(ScalarFunctionExpr::new(
-                    &name,
-                    Arc::new(hash),
-                    exprs.clone(),
-                    Arc::new(Field::new(&name, DataType::UInt64, false)),
-                    Arc::new(ConfigOptions::default()),
-                ));
-            }
-            _ => {}
+        if let BatchPartitionerState::Hash { hash: h, exprs, .. } = &mut self.state {
+            let name = hash.name().to_string();
+            *h = Arc::new(ScalarFunctionExpr::new(
+                &name,
+                Arc::new(hash),
+                exprs.clone(),
+                Arc::new(Field::new(&name, DataType::UInt64, false)),
+                Arc::new(ConfigOptions::default()),
+            ));
         }
         self
     }
@@ -661,7 +659,8 @@ impl ExecutionPlan for RepartitionExec {
         let mut repartition = RepartitionExec::try_new(
             children.swap_remove(0),
             self.partitioning().clone(),
-        )?;
+        )?
+        .with_hash_function(self.hash.clone());
         if self.preserve_order {
             repartition = repartition.with_preserve_order();
         }

From c28abe17ad8ec89bc6618be99d6cb3925d207bc7 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 18 Sep 2025 17:00:01 -0400
Subject: [PATCH 4/7] fix lints

---
 datafusion-testing                               |  2 +-
 .../physical-plan/src/repartition/hash.rs        | 23 +++-------------
 2 files changed, 4 insertions(+), 21 deletions(-)

diff --git a/datafusion-testing b/datafusion-testing
index 307079bf7929..905df5f65cc9 160000
--- a/datafusion-testing
+++ b/datafusion-testing
@@ -1 +1 @@
-Subproject commit 307079bf7929b62d06864662b3aecfb2e1340c81
+Subproject commit 905df5f65cc9d0851719c21f5a4dd5cd77621f19
diff --git a/datafusion/physical-plan/src/repartition/hash.rs b/datafusion/physical-plan/src/repartition/hash.rs
index 81299f999db2..e6bf42b648ca 100644
--- a/datafusion/physical-plan/src/repartition/hash.rs
+++ b/datafusion/physical-plan/src/repartition/hash.rs
@@ -17,14 +17,11 @@
 
 //! Hash utilities for repartitioning data
 
-use std::sync::Arc;
-
-use arrow::datatypes::{DataType, Field};
-use datafusion_common::{config::ConfigOptions, Result};
-use datafusion_expr::{ColumnarValue, ScalarUDF, ScalarUDFImpl};
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
+use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
 
 use ahash::RandomState;
-use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr};
 
 /// Internal hash function used for repartitioning inputs.
 /// This is used for partitioned HashJoinExec and partitioned GroupByExec.
@@ -68,20 +65,6 @@ impl RepartitionHash {
     }
 }
 
-pub(crate) fn repartition_hash(
-    args: Vec<PhysicalExprRef>,
-) -> Result<Arc<dyn PhysicalExpr>> {
-    let hash = ScalarUDF::new_from_impl(RepartitionHash::new());
-    let name = hash.name().to_string();
-    Ok(Arc::new(ScalarFunctionExpr::new(
-        &name,
-        Arc::new(hash),
-        args,
-        Arc::new(Field::new(&name, DataType::UInt64, false)),
-        Arc::new(ConfigOptions::default()),
-    )))
-}
-
 impl ScalarUDFImpl for RepartitionHash {
     fn as_any(&self) -> &dyn std::any::Any {
         self

From 8fe519d6219ba7855934026ecc45e2cc15dc762b Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 18 Sep 2025 17:46:21 -0400
Subject: [PATCH 5/7] Update datafusion/physical-plan/src/repartition/hash.rs

Co-authored-by: Jonathan Chen
---
 datafusion/physical-plan/src/repartition/hash.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/repartition/hash.rs b/datafusion/physical-plan/src/repartition/hash.rs
index e6bf42b648ca..e80fe4e6b448 100644
--- a/datafusion/physical-plan/src/repartition/hash.rs
+++ b/datafusion/physical-plan/src/repartition/hash.rs
@@ -92,7 +92,7 @@ impl ScalarUDFImpl for RepartitionHash {
         use std::sync::Arc;
 
         if args.args.is_empty() {
-            return datafusion_common::plan_err!("hash requires at least one argument");
+            return datafusion_common::plan_err!("repartition_hash requires at least one argument");
         }
 
         // Convert all arguments to arrays

From a46c929cd1dcf127ff53a0da967a23ec8431c004 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 18 Sep 2025 17:09:10 -0400
Subject: [PATCH 6/7] fix comment

---
 datafusion/physical-plan/src/repartition/hash.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/datafusion/physical-plan/src/repartition/hash.rs b/datafusion/physical-plan/src/repartition/hash.rs
index e80fe4e6b448..177509d9befa 100644
--- a/datafusion/physical-plan/src/repartition/hash.rs
+++ b/datafusion/physical-plan/src/repartition/hash.rs
@@ -131,7 +131,5 @@ impl ScalarUDFImpl for RepartitionHash {
     }
 }
 
-/// RandomState used by RepartitionExec for consistent hash partitioning
-/// This must match the seeds used in RepartitionExec to ensure our hash-based
-/// filter expressions compute the same partition assignments as the actual partitioning
+/// RandomState used for consistent hash partitioning
 const REPARTITION_RANDOM_STATE: RandomState = RandomState::with_seeds(0, 0, 0, 0);

From 7afdeea3eb1db9b131798ddc6210d8f041492add Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Thu, 18 Sep 2025 17:46:55 -0400
Subject: [PATCH 7/7] move

---
 datafusion/physical-plan/src/repartition/hash.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/datafusion/physical-plan/src/repartition/hash.rs b/datafusion/physical-plan/src/repartition/hash.rs
index 177509d9befa..48454f6dd806 100644
--- a/datafusion/physical-plan/src/repartition/hash.rs
+++ b/datafusion/physical-plan/src/repartition/hash.rs
@@ -23,6 +23,9 @@ use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
 
 use ahash::RandomState;
 
+/// RandomState used for consistent hash partitioning
+const REPARTITION_RANDOM_STATE: RandomState = RandomState::with_seeds(0, 0, 0, 0);
+
 /// Internal hash function used for repartitioning inputs.
 /// This is used for partitioned HashJoinExec and partitioned GroupByExec.
 /// Currently we use AHash with fixed seeds, but this is subject to change.
@@ -130,6 +133,3 @@ impl ScalarUDFImpl for RepartitionHash {
         None
     }
 }
-
-/// RandomState used for consistent hash partitioning
-const REPARTITION_RANDOM_STATE: RandomState = RandomState::with_seeds(0, 0, 0, 0);
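
Usage sketch (not part of the patch series): with these patches applied, a caller can swap in its own hash function by implementing `ScalarUDFImpl` with a `UInt64` return type — `BatchPartitioner` downcasts the evaluated result to `UInt64Array` — and handing it to `RepartitionExec::with_hash_function`. The `StablePartitionHash` name and its seeds below are illustrative assumptions, not identifiers from this PR.

use std::sync::Arc;

use arrow::array::UInt64Array;
use arrow::datatypes::DataType;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::Result;
use datafusion_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
    TypeSignature, Volatility,
};

/// A stand-in for the built-in `RepartitionHash`, differing only in its seeds
/// (e.g. to line up partition assignments with an external system).
#[derive(Debug, PartialEq, Eq, Hash)]
struct StablePartitionHash {
    signature: Signature,
}

impl StablePartitionHash {
    fn new() -> Self {
        Self {
            // Accept any number of arguments of any type, like RepartitionHash
            signature: Signature::one_of(
                vec![TypeSignature::VariadicAny],
                Volatility::Immutable,
            ),
        }
    }
}

impl ScalarUDFImpl for StablePartitionHash {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn name(&self) -> &str {
        "stable_partition_hash"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        // BatchPartitioner downcasts the result to UInt64Array, so this must be UInt64
        Ok(DataType::UInt64)
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        if args.args.is_empty() {
            return datafusion_common::plan_err!(
                "stable_partition_hash requires at least one argument"
            );
        }
        let arrays = ColumnarValue::values_to_arrays(&args.args)?;
        let mut hashes = vec![0u64; arrays[0].len()];
        // Fixed seeds that differ from REPARTITION_RANDOM_STATE (illustrative values)
        let state = ahash::RandomState::with_seeds(1, 2, 3, 4);
        create_hashes(&arrays, &state, &mut hashes)?;
        Ok(ColumnarValue::Array(Arc::new(UInt64Array::from(hashes))))
    }
}

// Wiring it up, assuming `input` is some Arc<dyn ExecutionPlan> and `keys` the
// partitioning expressions from the caller's context:
//
// let exec = RepartitionExec::try_new(input, Partitioning::Hash(keys, 8))?
//     .with_hash_function(ScalarUDF::new_from_impl(StablePartitionHash::new()));

Because `RepartitionExec::try_new` defaults to `RepartitionHash`, omitting the `with_hash_function` call preserves the existing single-node behavior.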