diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index c86a37697a05..cced9ec686b6 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -40,9 +40,9 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; -use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; -use arrow::compute::take_arrays; -use arrow::datatypes::{SchemaRef, UInt32Type}; +use arrow::array::{RecordBatch, RecordBatchOptions, UInt32Array}; +use arrow::compute; +use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::utils::transpose; use datafusion_common::HashMap; @@ -192,6 +192,7 @@ enum BatchPartitionerState { exprs: Vec>, num_partitions: usize, hash_buffer: Vec, + indices_buffer: Vec, }, RoundRobin { num_partitions: usize, @@ -216,7 +217,8 @@ impl BatchPartitioner { num_partitions, // Use fixed random hash random_state: ahash::RandomState::with_seeds(0, 0, 0, 0), - hash_buffer: vec![], + hash_buffer: Vec::new(), + indices_buffer: Vec::new(), }, other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"), }; @@ -267,6 +269,7 @@ impl BatchPartitioner { exprs, num_partitions: partitions, hash_buffer, + indices_buffer, } => { // Tracking time required for distributing indexes across output partitions let timer = self.timer.timer(); @@ -281,44 +284,98 @@ impl BatchPartitioner { create_hashes(&arrays, random_state, hash_buffer)?; - let mut indices: Vec<_> = (0..*partitions) - .map(|_| Vec::with_capacity(batch.num_rows())) - .collect(); + indices_buffer.clear(); + indices_buffer.resize(batch.num_rows(), 0); - for (index, hash) in hash_buffer.iter().enumerate() { - indices[(*hash % *partitions as u64) as usize].push(index as u32); + let mut cum_histogram = vec![0; *partitions]; + + if partitions.is_power_of_two() { + for hash in hash_buffer.iter_mut() { + // modulo bit trick: a % (2^k) == a & (2^k - 1) + *hash &= *partitions as u64 - 1; + cum_histogram[*hash as usize] += 1; + } + } else { + for hash in hash_buffer.iter_mut() { + *hash %= *partitions as u64; + cum_histogram[*hash as usize] += 1; + } } - // Finished building index-arrays for output partitions - timer.done(); + for idx in 1..cum_histogram.len() { + cum_histogram[idx] += cum_histogram[idx - 1]; + } - // Borrowing partitioner timer to prevent moving `self` to closure - let partitioner_timer = &self.timer; - let it = indices - .into_iter() - .enumerate() - .filter_map(|(partition, indices)| { - let indices: PrimitiveArray = indices.into(); - (!indices.is_empty()).then_some((partition, indices)) + // *basically* counting sort + for idx in (0..batch.num_rows()).rev() { + let partition = hash_buffer[idx] as usize; + cum_histogram[partition] -= 1; + indices_buffer[cum_histogram[partition]] = idx as u32; + } + + // The cumulative histogram now stores the start of the index range for each partition: + // The indices for partition $i will be stored at indices[cum_histogram[i]..cum_histogram[i + 1]] + // + // Temporarily taking the indices_buffer and will give it back later (pinky promise). + let indices: UInt32Array = std::mem::take(indices_buffer).into(); + + // We now slice up indices by partition so we can use the `take` kernel + let mut partition_indices = Vec::with_capacity(*partitions); + for partition in 0..*partitions { + let start = cum_histogram[partition]; + let end = if partition == *partitions - 1 { + indices.len() + } else { + cum_histogram[partition + 1] + }; + + if start != end { + partition_indices + .push((partition, indices.slice(start, end - start))); + } + } + + // Vector of (partition, partition_length, partition columns) -- the data needed to construct + // an output batch for a partition. + let mut output_batches = partition_indices + .iter() + .map(|(partition, indices)| { + (*partition, indices.len(), Vec::new()) }) - .map(move |(partition, indices)| { - // Tracking time required for repartitioned batches construction - let _timer = partitioner_timer.timer(); + .collect::>(); + + for column in batch.columns() { + for (index, (_, indices)) in partition_indices.iter().enumerate() + { + output_batches[index] + .2 + .push(compute::take(column, indices, None)?); + } + } - // Produce batches based on indices - let columns = take_arrays(batch.columns(), &indices, None)?; + // Release references to the indices buffer so we can put it back into the partitioner's state + drop(partition_indices); + + // Put back indices buffer -- This wont fail because we are in control of all references to the underlying buffers + *indices_buffer = + indices.into_parts().1.into_inner().into_vec().unwrap(); + + // Assume the rest of the function is trivial + timer.done(); - let mut options = RecordBatchOptions::new(); - options = options.with_row_count(Some(indices.len())); + let it = output_batches.into_iter().map( + move |(partition, length, columns)| { + let options = + RecordBatchOptions::new().with_row_count(Some(length)); let batch = RecordBatch::try_new_with_options( batch.schema(), columns, &options, - ) - .unwrap(); + )?; Ok((partition, batch)) - }); + }, + ); Box::new(it) }