From a13390a25e569b86e9b03ecda354a9dda5f4a904 Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Wed, 7 May 2025 15:44:41 +0200 Subject: [PATCH 1/6] Make repartition more cache-efficient Instead of gathering by partition in the outer loop and by column in the inner loop, we gather by column in the outer loop and by column in the inner. This should be more cache efficient in theory: Every column should only be loaded into cache once. The partition indices may need to be loaded multiple times, but those are fairly compact. Future work may split the inner loop for large partition values. --- .../physical-plan/src/repartition/mod.rs | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index c86a37697a05..9247cf81e54e 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -41,7 +41,7 @@ use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; -use arrow::compute::take_arrays; +use arrow::compute; use arrow::datatypes::{SchemaRef, UInt32Type}; use datafusion_common::config::ConfigOptions; use datafusion_common::utils::transpose; @@ -292,30 +292,37 @@ impl BatchPartitioner { // Finished building index-arrays for output partitions timer.done(); - // Borrowing partitioner timer to prevent moving `self` to closure - let partitioner_timer = &self.timer; - let it = indices + let indices = indices .into_iter() .enumerate() .filter_map(|(partition, indices)| { let indices: PrimitiveArray = indices.into(); (!indices.is_empty()).then_some((partition, indices)) }) - .map(move |(partition, indices)| { - // Tracking time required for repartitioned batches construction - let _timer = partitioner_timer.timer(); + .collect::>(); - // Produce batches based on indices - let columns = take_arrays(batch.columns(), &indices, None)?; + let mut output_batch_columns = (0..indices.len()) + .map(|_| Vec::with_capacity(batch.num_columns())) + .collect::>(); - let mut options = RecordBatchOptions::new(); - options = options.with_row_count(Some(indices.len())); + for column in batch.columns() { + for (index, (_, indices)) in indices.iter().enumerate() { + output_batch_columns[index] + .push(compute::take(column, indices, None)?); + } + } + + let it = output_batch_columns + .into_iter() + .zip(indices.into_iter()) + .map(move |(columns, (partition, indices))| { + let options = RecordBatchOptions::new() + .with_row_count(Some(indices.len())); let batch = RecordBatch::try_new_with_options( batch.schema(), columns, &options, - ) - .unwrap(); + )?; Ok((partition, batch)) }); From 82222b68deec1e26006a84c1d87bc62e4b3ebb0b Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Wed, 7 May 2025 16:11:44 +0200 Subject: [PATCH 2/6] repartition: Share allocation between partition indices This change lays out the indices for the partitions in a single contiguous memory area. Since we process these partitions in increasing order, we implicitly walk this vector from start to end - very cache friendly behaviour. --- .../physical-plan/src/repartition/mod.rs | 56 ++++++++++++------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 9247cf81e54e..860bda3cafec 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -40,9 +40,9 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; -use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; +use arrow::array::{RecordBatch, RecordBatchOptions, UInt32Array}; use arrow::compute; -use arrow::datatypes::{SchemaRef, UInt32Type}; +use arrow::datatypes::SchemaRef; use datafusion_common::config::ConfigOptions; use datafusion_common::utils::transpose; use datafusion_common::HashMap; @@ -281,32 +281,50 @@ impl BatchPartitioner { create_hashes(&arrays, random_state, hash_buffer)?; - let mut indices: Vec<_> = (0..*partitions) - .map(|_| Vec::with_capacity(batch.num_rows())) - .collect(); + let mut indices = Vec::from_iter(0..batch.num_rows() as u32); + let mut cum_histogram = vec![0; *partitions]; - for (index, hash) in hash_buffer.iter().enumerate() { - indices[(*hash % *partitions as u64) as usize].push(index as u32); + for hash in hash_buffer.iter_mut() { + *hash %= *partitions as u64; + cum_histogram[*hash as usize] += 1; + } + + for idx in 1..cum_histogram.len() { + cum_histogram[idx] += cum_histogram[idx - 1]; + } + + indices.sort_unstable_by_key(|idx| hash_buffer[*idx as usize]); + + // The cumulative histogram now stores the ends of the index range for each partition: + // The indices for partition $i will be stored at indices[hist[i - 1]..hist[i]] + let indices: UInt32Array = indices.into(); + + // We now slice up indices by partition so we can use the `take` kernel + let mut partition_indices = Vec::with_capacity(*partitions); + for partition in 0..*partitions { + let end = cum_histogram[partition]; + let start = if partition == 0 { + 0 + } else { + cum_histogram[partition - 1] + }; + + if start != end { + partition_indices + .push((partition, indices.slice(start, end - start))); + } } // Finished building index-arrays for output partitions timer.done(); - let indices = indices - .into_iter() - .enumerate() - .filter_map(|(partition, indices)| { - let indices: PrimitiveArray = indices.into(); - (!indices.is_empty()).then_some((partition, indices)) - }) - .collect::>(); - - let mut output_batch_columns = (0..indices.len()) + let mut output_batch_columns = (0..partition_indices.len()) .map(|_| Vec::with_capacity(batch.num_columns())) .collect::>(); for column in batch.columns() { - for (index, (_, indices)) in indices.iter().enumerate() { + for (index, (_, indices)) in partition_indices.iter().enumerate() + { output_batch_columns[index] .push(compute::take(column, indices, None)?); } @@ -314,7 +332,7 @@ impl BatchPartitioner { let it = output_batch_columns .into_iter() - .zip(indices.into_iter()) + .zip(partition_indices.into_iter()) .map(move |(columns, (partition, indices))| { let options = RecordBatchOptions::new() .with_row_count(Some(indices.len())); From dd3e69fc290696ef629e4c76f061fef45f333be8 Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Wed, 7 May 2025 16:25:04 +0200 Subject: [PATCH 3/6] repartition: Track time for take kernel --- datafusion/physical-plan/src/repartition/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 860bda3cafec..206538bfac86 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -315,8 +315,6 @@ impl BatchPartitioner { } } - // Finished building index-arrays for output partitions - timer.done(); let mut output_batch_columns = (0..partition_indices.len()) .map(|_| Vec::with_capacity(batch.num_columns())) @@ -330,6 +328,8 @@ impl BatchPartitioner { } } + timer.done(); + let it = output_batch_columns .into_iter() .zip(partition_indices.into_iter()) From 4feecd2ffd36f3c89af6991a02daf15ef59cba42 Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Wed, 7 May 2025 16:43:28 +0200 Subject: [PATCH 4/6] repartition: Use counting sort approach for indices --- .../physical-plan/src/repartition/mod.rs | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 206538bfac86..007f568a1bfb 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -293,20 +293,25 @@ impl BatchPartitioner { cum_histogram[idx] += cum_histogram[idx - 1]; } - indices.sort_unstable_by_key(|idx| hash_buffer[*idx as usize]); + // *basically* counting sort + for idx in (0..batch.num_rows()).rev() { + let partition = hash_buffer[idx] as usize; + cum_histogram[partition] -= 1; + indices[cum_histogram[partition]] = idx as u32; + } - // The cumulative histogram now stores the ends of the index range for each partition: - // The indices for partition $i will be stored at indices[hist[i - 1]..hist[i]] + // The cumulative histogram now stores the start of the index range for each partition: + // The indices for partition $i will be stored at indices[cum_histogram[i]..cum_histogram[i + 1]] let indices: UInt32Array = indices.into(); // We now slice up indices by partition so we can use the `take` kernel let mut partition_indices = Vec::with_capacity(*partitions); for partition in 0..*partitions { - let end = cum_histogram[partition]; - let start = if partition == 0 { - 0 + let start = cum_histogram[partition]; + let end = if partition == *partitions - 1 { + indices.len() } else { - cum_histogram[partition - 1] + cum_histogram[partition + 1] }; if start != end { @@ -315,7 +320,6 @@ impl BatchPartitioner { } } - let mut output_batch_columns = (0..partition_indices.len()) .map(|_| Vec::with_capacity(batch.num_columns())) .collect::>(); From bf25d38b5078fac5a68a1e6f45a929f78233609c Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Wed, 7 May 2025 17:01:35 +0200 Subject: [PATCH 5/6] repartition: Reuse indices buffer --- .../physical-plan/src/repartition/mod.rs | 48 +++++++++++++------ 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 007f568a1bfb..6f68462adc80 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -192,6 +192,7 @@ enum BatchPartitionerState { exprs: Vec>, num_partitions: usize, hash_buffer: Vec, + indices_buffer: Vec, }, RoundRobin { num_partitions: usize, @@ -216,7 +217,8 @@ impl BatchPartitioner { num_partitions, // Use fixed random hash random_state: ahash::RandomState::with_seeds(0, 0, 0, 0), - hash_buffer: vec![], + hash_buffer: Vec::new(), + indices_buffer: Vec::new(), }, other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"), }; @@ -267,6 +269,7 @@ impl BatchPartitioner { exprs, num_partitions: partitions, hash_buffer, + indices_buffer, } => { // Tracking time required for distributing indexes across output partitions let timer = self.timer.timer(); @@ -281,7 +284,9 @@ impl BatchPartitioner { create_hashes(&arrays, random_state, hash_buffer)?; - let mut indices = Vec::from_iter(0..batch.num_rows() as u32); + indices_buffer.clear(); + indices_buffer.resize(batch.num_rows(), 0); + let mut cum_histogram = vec![0; *partitions]; for hash in hash_buffer.iter_mut() { @@ -297,12 +302,14 @@ impl BatchPartitioner { for idx in (0..batch.num_rows()).rev() { let partition = hash_buffer[idx] as usize; cum_histogram[partition] -= 1; - indices[cum_histogram[partition]] = idx as u32; + indices_buffer[cum_histogram[partition]] = idx as u32; } // The cumulative histogram now stores the start of the index range for each partition: // The indices for partition $i will be stored at indices[cum_histogram[i]..cum_histogram[i + 1]] - let indices: UInt32Array = indices.into(); + // + // Temporarily taking the indices_buffer and will give it back later (pinky promise). + let indices: UInt32Array = std::mem::take(indices_buffer).into(); // We now slice up indices by partition so we can use the `take` kernel let mut partition_indices = Vec::with_capacity(*partitions); @@ -320,26 +327,38 @@ impl BatchPartitioner { } } - let mut output_batch_columns = (0..partition_indices.len()) - .map(|_| Vec::with_capacity(batch.num_columns())) + // Vector of (partition, partition_length, partition columns) -- the data needed to construct + // an output batch for a partition. + let mut output_batches = partition_indices + .iter() + .map(|(partition, indices)| { + (*partition, indices.len(), Vec::new()) + }) .collect::>(); for column in batch.columns() { for (index, (_, indices)) in partition_indices.iter().enumerate() { - output_batch_columns[index] + output_batches[index] + .2 .push(compute::take(column, indices, None)?); } } + // Release references to the indices buffer so we can put it back into the partitioner's state + drop(partition_indices); + + // Put back indices buffer -- This wont fail because we are in control of all references to the underlying buffers + *indices_buffer = + indices.into_parts().1.into_inner().into_vec().unwrap(); + + // Assume the rest of the function is trivial timer.done(); - let it = output_batch_columns - .into_iter() - .zip(partition_indices.into_iter()) - .map(move |(columns, (partition, indices))| { - let options = RecordBatchOptions::new() - .with_row_count(Some(indices.len())); + let it = output_batches.into_iter().map( + move |(partition, length, columns)| { + let options = + RecordBatchOptions::new().with_row_count(Some(length)); let batch = RecordBatch::try_new_with_options( batch.schema(), columns, @@ -347,7 +366,8 @@ impl BatchPartitioner { )?; Ok((partition, batch)) - }); + }, + ); Box::new(it) } From 51854146ac1ac828855c02f68c54b35a05187096 Mon Sep 17 00:00:00 2001 From: ctsk <9384305+ctsk@users.noreply.github.com> Date: Wed, 7 May 2025 17:17:02 +0200 Subject: [PATCH 6/6] repartition: Avoid modulo with power-of-two partitions --- datafusion/physical-plan/src/repartition/mod.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 6f68462adc80..cced9ec686b6 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -289,9 +289,17 @@ impl BatchPartitioner { let mut cum_histogram = vec![0; *partitions]; - for hash in hash_buffer.iter_mut() { - *hash %= *partitions as u64; - cum_histogram[*hash as usize] += 1; + if partitions.is_power_of_two() { + for hash in hash_buffer.iter_mut() { + // modulo bit trick: a % (2^k) == a & (2^k - 1) + *hash &= *partitions as u64 - 1; + cum_histogram[*hash as usize] += 1; + } + } else { + for hash in hash_buffer.iter_mut() { + *hash %= *partitions as u64; + cum_histogram[*hash as usize] += 1; + } } for idx in 1..cum_histogram.len() {