From ac051b7447d989fed95ad9a2815800b6ba1a8a3c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 3 Jun 2025 14:34:29 -0400 Subject: [PATCH 1/4] Add `coalesce` / `BatchCoalescer` for statefully combining selected batches: --- arrow-select/src/coalesce.rs | 596 +++++++++++++++++++++++++++++++++++ arrow-select/src/filter.rs | 10 +- arrow-select/src/lib.rs | 1 + arrow-select/src/take.rs | 6 + arrow/src/compute/kernels.rs | 4 +- arrow/src/compute/mod.rs | 1 + 6 files changed, 615 insertions(+), 3 deletions(-) create mode 100644 arrow-select/src/coalesce.rs diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs new file mode 100644 index 000000000000..8dfe436afaaf --- /dev/null +++ b/arrow-select/src/coalesce.rs @@ -0,0 +1,596 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`BatchCoalescer`] concatenates multiple [`RecordBatch`]es after +//! operations such as [`filter`] and [`take`]. +//! +//! [`filter`]: crate::filter::filter +//! [`take`]: crate::take::take +use crate::concat::concat_batches; +use arrow_array::{ + builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch, RecordBatchOptions, +}; +use arrow_schema::{ArrowError, SchemaRef}; +use std::collections::VecDeque; +use std::sync::Arc; + +// Originally From DataFusion's coalesce module: +// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25 + +/// Concatenate multiple [`RecordBatch`]es +/// +/// Implements the common pattern of incrementally creating output +/// [`RecordBatch`]es of a specific size from an input stream of +/// [`RecordBatch`]es. 
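+///
+/// Incoming batches are buffered until enough rows have accumulated to
+/// produce a full output batch; completed batches can then be retrieved
+/// with [`Self::next_batch`].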
+///
+/// This is useful after operations such as [`filter`] and [`take`] that
+/// produce smaller batches, and we want to coalesce them into larger batches.
+///
+/// [`filter`]: crate::filter::filter
+/// [`take`]: crate::take::take
+///
+/// See:
+///
+/// # Example
+/// ```
+/// use arrow_array::record_batch;
+/// use arrow_select::coalesce::{BatchCoalescer};
+/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
+///
+/// // Create a `BatchCoalescer` that will produce batches with at least 4 rows
+/// let target_batch_size = 4;
+/// let mut coalescer = BatchCoalescer::new(batch1.schema(), 4);
+///
+/// // push the batches
+/// coalescer.push_batch(batch1).unwrap();
+/// // only pushed 3 rows (not yet 4, enough to produce a batch)
+/// assert!(coalescer.next_batch().is_none());
+/// coalescer.push_batch(batch2).unwrap();
+/// // now we have 5 rows, so we can produce a batch
+/// let finished = coalescer.next_batch().unwrap();
+/// // 4 rows came out (target batch size is 4)
+/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
+/// assert_eq!(finished, expected);
+///
+/// // Have no more input, but still have an in-progress batch
+/// assert!(coalescer.next_batch().is_none());
+/// // We can finish the batch, which will produce the remaining rows
+/// coalescer.finish_batch().unwrap();
+/// let expected = record_batch!(("a", Int32, [5])).unwrap();
+/// assert_eq!(coalescer.next_batch().unwrap(), expected);
+///
+/// // The coalescer is now empty
+/// assert!(coalescer.next_batch().is_none());
+/// ```
+///
+/// # Background
+///
+/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
+/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because
+/// there is fixed processing overhead per batch. This coalescer builds up these
+/// larger batches incrementally.
+///
+/// ```text
+/// ┌────────────────────┐
+/// │    RecordBatch     │
+/// │   num_rows = 100   │
+/// └────────────────────┘                 ┌────────────────────┐
+///                                        │                    │
+/// ┌────────────────────┐     Coalesce    │                    │
+/// │                    │      Batches    │                    │
+/// │    RecordBatch     │                 │                    │
+/// │   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
+/// │                    │                 │    RecordBatch     │
+/// │                    │                 │   num_rows = 400   │
+/// └────────────────────┘                 │                    │
+///                                        │                    │
+/// ┌────────────────────┐                 │                    │
+/// │                    │                 │                    │
+/// │    RecordBatch     │                 │                    │
+/// │   num_rows = 100   │                 └────────────────────┘
+/// │                    │
+/// └────────────────────┘
+/// ```
+///
+/// # Notes:
+///
+/// 1. Output rows are produced in the same order as the input rows
+///
+/// 2. The output is a sequence of batches, with all but the last having
+///    exactly `target_batch_size` rows.
+///
+/// 3. Eventually this may also be able to handle other optimizations such as a
+///    combined filter/coalesce operation. See
+///
+#[derive(Debug)]
+pub struct BatchCoalescer {
+    /// The input schema
+    schema: SchemaRef,
+    /// output batch size
+    batch_size: usize,
+    /// In-progress buffered batches
+    buffer: Vec<RecordBatch>,
+    /// Buffered row count. Always less than `batch_size`
+    buffered_rows: usize,
+    /// Completed batches
+    completed: VecDeque<RecordBatch>,
+}
+
+impl BatchCoalescer {
+    /// Create a new `BatchCoalescer`
+    ///
+    /// # Arguments
+    /// - `schema` - the schema of the output batches
+    /// - `batch_size` - the number of rows in each output batch.
+    ///   Typical values are `4096` or `8192` rows.
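+    ///
+    /// For example, to coalesce the output into 8192-row batches (an
+    /// illustrative sketch; `schema` is assumed to come from the input
+    /// stream):
+    ///
+    /// ```text
+    /// let coalescer = BatchCoalescer::new(Arc::clone(&schema), 8192);
+    /// ```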
+    ///
+    pub fn new(schema: SchemaRef, batch_size: usize) -> Self {
+        Self {
+            schema,
+            batch_size,
+            buffer: vec![],
+            // We will for sure store at least one completed batch
+            completed: VecDeque::with_capacity(1),
+            buffered_rows: 0,
+        }
+    }
+
+    /// Return the schema of the output batches
+    pub fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    /// Push next batch into the Coalescer
+    ///
+    /// See [`Self::next_batch()`] to retrieve any completed batches.
+    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
+        if batch.num_rows() == 0 {
+            // If the batch is empty, we don't need to do anything
+            return Ok(());
+        }
+
+        let mut batch = gc_string_view_batch(&batch);
+
+        // If pushing this batch would exceed the target batch size,
+        // finish the current batch and start a new one
+        while batch.num_rows() > (self.batch_size - self.buffered_rows) {
+            let remaining_rows = self.batch_size - self.buffered_rows;
+            debug_assert!(remaining_rows > 0);
+            let head_batch = batch.slice(0, remaining_rows);
+            batch = batch.slice(remaining_rows, batch.num_rows() - remaining_rows);
+            self.buffered_rows += head_batch.num_rows();
+            self.buffer.push(head_batch);
+            self.finish_batch()?;
+        }
+        // Add the remaining rows to the buffer
+        self.buffered_rows += batch.num_rows();
+        self.buffer.push(batch);
+        Ok(())
+    }
+
+    /// Concatenates any buffered batches into a single `RecordBatch` and
+    /// clears the in-progress buffer
+    ///
+    /// Normally this is called when the input stream is exhausted, and
+    /// we want to finalize the last batch of rows.
+    ///
+    /// See [`Self::next_batch()`] for the completed batches.
+    pub fn finish_batch(&mut self) -> Result<(), ArrowError> {
+        let batch = concat_batches(&self.schema, &self.buffer)?;
+        self.buffer.clear();
+        self.buffered_rows = 0;
+        self.completed.push_back(batch);
+        Ok(())
+    }
+
+    /// Returns the next completed batch, if any
+    pub fn next_batch(&mut self) -> Option<RecordBatch> {
+        self.completed.pop_front()
+    }
+}
+
+/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed
+///
+/// Decides when to consolidate the StringView into a new buffer to reduce
+/// memory usage and improve string locality for better performance.
+///
+/// This differs from `StringViewArray::gc` because:
+/// 1. It may not compact the array depending on a heuristic.
+/// 2. It uses a precise block size to reduce the number of buffers to track.
+///
+/// # Heuristic
+///
+/// The array is compacted if the actual buffer size is more than twice the
+/// ideal buffer size (the total length of all views longer than 12 bytes,
+/// i.e. the minimum space the data could occupy).
+///
+/// A `StringViewArray` includes pointers to the buffers that hold its
+/// underlying data. One of the great benefits of `StringViewArray` is that
+/// many operations (e.g., `filter`) can be done without copying the
+/// underlying data.
+///
+/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
+/// `StringViewArray` may only refer to a small portion of the buffer,
+/// significantly increasing memory usage.
+fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
+    let new_columns: Vec<ArrayRef> = batch
+        .columns()
+        .iter()
+        .map(|c| {
+            // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
+            let Some(s) = c.as_string_view_opt() else {
+                return Arc::clone(c);
+            };
+            let ideal_buffer_size: usize = s
+                .views()
+                .iter()
+                .map(|v| {
+                    let len = (*v as u32) as usize;
+                    if len > 12 {
+                        len
+                    } else {
+                        0
+                    }
+                })
+                .sum();
+            let actual_buffer_size = s.get_buffer_memory_size();
+
+            // Re-creating the array copies data and can be time consuming.
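+            // As a (hypothetical) example of when the sparsity check below
+            // pays off: after a selective filter, the remaining views might
+            // reference only ~1 KiB of long-string data (the ideal size)
+            // while the original buffers still occupy ~1 MiB; since
+            // 1 MiB > 2 * 1 KiB, the array is compacted.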
+            // We only do it if the array is sparse
+            if actual_buffer_size > (ideal_buffer_size * 2) {
+                // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerates later concat_batches.
+                // See https://github.com/apache/arrow-rs/issues/6094 for more details.
+                let mut builder = StringViewBuilder::with_capacity(s.len());
+                if ideal_buffer_size > 0 {
+                    builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
+                }
+
+                for v in s.iter() {
+                    builder.append_option(v);
+                }
+
+                let gc_string = builder.finish();
+
+                debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
+
+                Arc::new(gc_string)
+            } else {
+                Arc::clone(c)
+            }
+        })
+        .collect();
+    let mut options = RecordBatchOptions::new();
+    options = options.with_row_count(Some(batch.num_rows()));
+    RecordBatch::try_new_with_options(batch.schema(), new_columns, &options)
+        .expect("Failed to re-create the gc'ed record batch")
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_array::builder::ArrayBuilder;
+    use arrow_array::{StringViewArray, UInt32Array};
+    use arrow_schema::{DataType, Field, Schema};
+    use std::ops::Range;
+
+    #[test]
+    fn test_coalesce() {
+        let batch = uint32_batch(0..8);
+        Test::new()
+            .with_batches(std::iter::repeat_n(batch, 10))
+            // expected output is exactly 20 rows (except for the final batch)
+            .with_batch_size(21)
+            .with_expected_output_sizes(vec![21, 21, 21, 17])
+            .run()
+    }
+
+    #[test]
+    fn test_coalesce_empty() {
+        let arrays: Vec<ArrayRef> = vec![Arc::new(UInt32Array::from(vec![] as Vec<u32>))];
+        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
+        let batch = RecordBatch::try_new_with_options(
+            schema,
+            arrays,
+            &RecordBatchOptions::new().with_row_count(Some(0)),
+        )
+        .unwrap();
+
+        Test::new()
+            .with_batches(vec![])
+            .with_batch(batch)
+            .with_batch_size(21)
+            .with_expected_output_sizes(vec![0])
+            .run()
+    }
+
+    #[test]
+    fn test_single_large_batch_greater_than_target() {
+        // test a single large batch
+        let batch = uint32_batch(0..4096);
+        Test::new()
+            .with_batch(batch)
+            .with_batch_size(1000)
+            .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
+            .run();
+    }
+
+    #[test]
+    fn test_single_large_batch_smaller_than_target() {
+        // test a single large batch
+        let batch = uint32_batch(0..4096);
+        Test::new()
+            .with_batch(batch)
+            .with_batch_size(8192)
+            .with_expected_output_sizes(vec![4096])
+            .run();
+    }
+
+    #[test]
+    fn test_single_large_batch_equal_to_target() {
+        // test a single large batch
+        let batch = uint32_batch(0..4096);
+        Test::new()
+            .with_batch(batch)
+            .with_batch_size(4096)
+            .with_expected_output_sizes(vec![4096])
+            .run();
+    }
+
+    #[test]
+    fn test_single_large_batch_equally_divisible_in_target() {
+        // test a single large batch
+        let batch = uint32_batch(0..4096);
+        Test::new()
+            .with_batch(batch)
+            .with_batch_size(1024)
+            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
+            .run();
+    }
+
+    /// Test for [`BatchCoalescer`]
+    ///
+    /// Pushes the input batches to the coalescer and verifies that the resulting
+    /// batches have the expected number of rows and contents.
+    #[derive(Debug, Clone, Default)]
+    struct Test {
+        /// Batches to feed to the coalescer. Tests must have at least one
+        /// schema
+        input_batches: Vec<RecordBatch>,
+        /// Expected output sizes of the resulting batches
+        expected_output_sizes: Vec<usize>,
+        /// target batch size
+        target_batch_size: usize,
+    }
+
+    impl Test {
+        fn new() -> Self {
+            Self::default()
+        }
+
+        /// Set the target batch size
+        fn with_batch_size(mut self, target_batch_size: usize) -> Self {
+            self.target_batch_size = target_batch_size;
+            self
+        }
+
+        /// Extend the input batches with `batch`
+        fn with_batch(mut self, batch: RecordBatch) -> Self {
+            self.input_batches.push(batch);
+            self
+        }
+
+        /// Extends the input batches with `batches`
+        fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
+            self.input_batches.extend(batches);
+            self
+        }
+
+        /// Extends the expected output sizes with `sizes`
+        fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
+            self.expected_output_sizes.extend(sizes);
+            self
+        }
+
+        /// Runs the test -- see documentation on [`Test`] for details
+        fn run(self) {
+            let Self {
+                input_batches,
+                target_batch_size,
+                expected_output_sizes,
+            } = self;
+
+            let schema = input_batches[0].schema();
+
+            // create a single large input batch for output comparison
+            let single_input_batch = concat_batches(&schema, &input_batches).unwrap();
+
+            let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);
+
+            for batch in input_batches {
+                coalescer.push_batch(batch).unwrap();
+            }
+            coalescer.finish_batch().unwrap();
+            let mut output_batches = vec![];
+            while let Some(batch) = coalescer.next_batch() {
+                output_batches.push(batch);
+            }
+
+            // make sure we got the expected number of output batches and content
+            let mut starting_idx = 0;
+            assert_eq!(expected_output_sizes.len(), output_batches.len());
+            let actual_output_sizes: Vec<usize> =
+                output_batches.iter().map(|b| b.num_rows()).collect();
+            assert_eq!(
+                expected_output_sizes, actual_output_sizes,
+                "Unexpected number of rows in output batches\n\
+                Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
+            );
+            for (i, (expected_size, batch)) in
+                expected_output_sizes.iter().zip(output_batches).enumerate()
+            {
+                // compare the contents of the batch after normalization
+                // (`==` also compares the underlying memory layout)
+                let expected_batch = single_input_batch.slice(starting_idx, *expected_size);
+                let expected_batch = normalize_batch(expected_batch);
+                let batch = normalize_batch(batch);
+                assert_eq!(
+                    expected_batch, batch,
+                    "Unexpected content in batch {i}:\
+                    \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
+                );
+                starting_idx += *expected_size;
+            }
+        }
+    }
+
+    /// Return a batch of UInt32 with the specified range
+    fn uint32_batch(range: Range<u32>) -> RecordBatch {
+        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
+
+        RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(UInt32Array::from_iter_values(range))],
+        )
+        .unwrap()
+    }
+
+    #[test]
+    fn test_gc_string_view_batch_small_no_compact() {
+        // view with only short strings (no buffers) --> no need to compact
+        let array = StringViewTest {
+            rows: 1000,
+            strings: vec![Some("a"), Some("b"), Some("c")],
+        }
+        .build();
+
+        let gc_array = do_gc(array.clone());
+        compare_string_array_values(&array, &gc_array);
+        assert_eq!(array.data_buffers().len(), 0);
+        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
+    }
+
+    #[test]
+    fn test_gc_string_view_test_batch_empty() {
+        let schema = Schema::empty();
+        let batch = RecordBatch::new_empty(schema.into());
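+        // gc of a batch with no columns should be a no-op, preserving the
+        // empty schema and zero row count asserted below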
+        let output_batch = gc_string_view_batch(&batch);
+        assert_eq!(batch.num_columns(), output_batch.num_columns());
+        assert_eq!(batch.num_rows(), output_batch.num_rows());
+    }
+
+    #[test]
+    fn test_gc_string_view_batch_large_no_compact() {
+        // view with large strings (has buffers) but full --> no need to compact
+        let array = StringViewTest {
+            rows: 1000,
+            strings: vec![Some("This string is longer than 12 bytes")],
+        }
+        .build();
+
+        let gc_array = do_gc(array.clone());
+        compare_string_array_values(&array, &gc_array);
+        assert_eq!(array.data_buffers().len(), 5);
+        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction
+    }
+
+    #[test]
+    fn test_gc_string_view_batch_large_slice_compact() {
+        // view with large strings (has buffers) and only partially used --> should compact
+        let array = StringViewTest {
+            rows: 1000,
+            strings: vec![Some("this string is longer than 12 bytes")],
+        }
+        .build();
+
+        // slice to a small subset of the rows, so most of the buffer is not used
+        let array = array.slice(11, 22);
+
+        let gc_array = do_gc(array.clone());
+        compare_string_array_values(&array, &gc_array);
+        assert_eq!(array.data_buffers().len(), 5);
+        assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer
+    }
+
+    /// Compares the values of two string view arrays
+    fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) {
+        assert_eq!(arr1.len(), arr2.len());
+        for (s1, s2) in arr1.iter().zip(arr2.iter()) {
+            assert_eq!(s1, s2);
+        }
+    }
+
+    /// Runs garbage collection on a string view array
+    /// and ensures the number of rows is the same
+    fn do_gc(array: StringViewArray) -> StringViewArray {
+        let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
+        let gc_batch = gc_string_view_batch(&batch);
+        assert_eq!(batch.num_rows(), gc_batch.num_rows());
+        assert_eq!(batch.schema(), gc_batch.schema());
+        gc_batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringViewArray>()
+            .unwrap()
+            .clone()
+    }
+
+    /// Describes parameters for creating a `StringViewArray`
+    struct StringViewTest {
+        /// The number of rows in the array
+        rows: usize,
+        /// The strings to use in the array (repeated over and over)
+        strings: Vec<Option<&'static str>>,
+    }
+
+    impl StringViewTest {
+        /// Create a `StringViewArray` with the parameters specified in this struct
+        fn build(self) -> StringViewArray {
+            let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
+            loop {
+                for &v in self.strings.iter() {
+                    builder.append_option(v);
+                    if builder.len() >= self.rows {
+                        return builder.finish();
+                    }
+                }
+            }
+        }
+    }
+
+    /// Normalize the `RecordBatch` so that the memory layout is consistent
+    /// (e.g. `StringViewArray`s are compacted).
+    fn normalize_batch(batch: RecordBatch) -> RecordBatch {
+        // Only need to normalize StringViews (as == also tests for memory layout)
+        let (schema, mut columns, row_count) = batch.into_parts();
+
+        for column in columns.iter_mut() {
+            let Some(string_view) = column.as_string_view_opt() else {
+                continue;
+            };
+
+            // Re-create the StringViewArray to ensure memory layout is
+            // consistent
+            let mut builder = StringViewBuilder::new();
+            for s in string_view.iter() {
+                builder.append_option(s);
+            }
+            // Update the column with the new StringViewArray
+            *column = Arc::new(builder.finish());
+        }
+
+        let options = RecordBatchOptions::new().with_row_count(Some(row_count));
+        RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
+    }
+}
diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs
index cf16140aad1f..fa91c0690b4c 100644
--- a/arrow-select/src/filter.rs
+++ b/arrow-select/src/filter.rs
@@ -156,10 +156,16 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> BooleanArray {
     BooleanArray::new(mask, None)
 }
 
-/// Returns a filtered `values` [Array] where the corresponding elements of
+/// Returns a filtered `values` [`Array`] where the corresponding elements of
 /// `predicate` are `true`.
 ///
-/// See also [`FilterBuilder`] for more control over the filtering process.
+/// # See also
+/// * [`FilterBuilder`] for more control over the filtering process.
+/// * [`filter_record_batch`] to filter a [`RecordBatch`]
+/// * [`BatchCoalescer`]: to filter multiple [`RecordBatch`]es and coalesce
+///   the results into larger batches.
+///
+/// [`BatchCoalescer`]: crate::coalesce::BatchCoalescer
 ///
 /// # Example
 /// ```rust
diff --git a/arrow-select/src/lib.rs b/arrow-select/src/lib.rs
index 1648dc2833d8..a2ddff351c9a 100644
--- a/arrow-select/src/lib.rs
+++ b/arrow-select/src/lib.rs
@@ -24,6 +24,7 @@
 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
 #![warn(missing_docs)]
 
+pub mod coalesce;
 pub mod concat;
 mod dictionary;
 pub mod filter;
diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs
index 4feb8c27b938..ef287eb24427 100644
--- a/arrow-select/src/take.rs
+++ b/arrow-select/src/take.rs
@@ -64,6 +64,12 @@ use num::{One, Zero};
 ///
 /// When `options` is not set to check bounds, taking indexes after `len` will panic.
 ///
+/// # See also
+/// * [`BatchCoalescer`]: to combine results from multiple [`RecordBatch`]es
+///   into larger batches.
+///
+/// [`BatchCoalescer`]: crate::coalesce::BatchCoalescer
+///
 /// # Examples
 /// ```
 /// # use arrow_array::{StringArray, UInt32Array, cast::AsArray};
diff --git a/arrow/src/compute/kernels.rs b/arrow/src/compute/kernels.rs
index 86fdbe66c8ae..6317a4229f5e 100644
--- a/arrow/src/compute/kernels.rs
+++ b/arrow/src/compute/kernels.rs
@@ -21,7 +21,9 @@ pub use arrow_arith::{aggregate, arithmetic, arity, bitwise, boolean, numeric, t
 pub use arrow_cast::cast;
 pub use arrow_cast::parse as cast_utils;
 pub use arrow_ord::{cmp, partition, rank, sort};
-pub use arrow_select::{concat, filter, interleave, nullif, take, union_extract, window, zip};
+pub use arrow_select::{
+    coalesce, concat, filter, interleave, nullif, take, union_extract, window, zip,
+};
 pub use arrow_string::{concat_elements, length, regexp, substring};
 
 /// Comparison kernels for `Array`s.
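Taken together, the cross-references added above describe a filter-then-coalesce pattern. A minimal sketch of that pattern (not part of the patch; `filter_and_coalesce` and its inputs are hypothetical, and it uses the method names from patch 1, which patch 2 below renames to `finish_buffered_batch`/`next_completed_batch`):

```rust
use arrow_array::{BooleanArray, RecordBatch};
use arrow_schema::{ArrowError, SchemaRef};
use arrow_select::{coalesce::BatchCoalescer, filter::filter_record_batch};

/// Filter each input batch, then re-chunk the survivors into 8192-row batches.
fn filter_and_coalesce(
    schema: SchemaRef,
    input: impl IntoIterator<Item = (RecordBatch, BooleanArray)>,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut coalescer = BatchCoalescer::new(schema, 8192);
    let mut output = vec![];
    for (batch, predicate) in input {
        // `filter` usually yields small batches; buffer them instead of emitting
        coalescer.push_batch(filter_record_batch(&batch, &predicate)?)?;
        while let Some(done) = coalescer.next_batch() {
            output.push(done);
        }
    }
    // Flush any remaining buffered rows as a final (possibly short) batch.
    // Note: in this first patch, this emits an empty batch when nothing is
    // buffered; patch 2 adds an emptiness check.
    coalescer.finish_batch()?;
    while let Some(done) = coalescer.next_batch() {
        output.push(done);
    }
    Ok(output)
}
```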
diff --git a/arrow/src/compute/mod.rs b/arrow/src/compute/mod.rs index bff7214718fc..2c0cf0104a14 100644 --- a/arrow/src/compute/mod.rs +++ b/arrow/src/compute/mod.rs @@ -24,6 +24,7 @@ pub use self::kernels::arithmetic::*; pub use self::kernels::arity::*; pub use self::kernels::boolean::*; pub use self::kernels::cast::*; +pub use self::kernels::coalesce::*; pub use self::kernels::comparison::*; pub use self::kernels::concat::*; pub use self::kernels::filter::*; From 313ed723b2c6fadfcf4d6dfd7894adc4144135c9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 4 Jun 2025 10:11:44 -0400 Subject: [PATCH 2/4] Adjust API --- arrow-select/src/coalesce.rs | 63 +++++++++++++++++++++++------------- 1 file changed, 40 insertions(+), 23 deletions(-) diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index 8dfe436afaaf..1771b897b652 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -59,23 +59,23 @@ use std::sync::Arc; /// // push the batches /// coalescer.push_batch(batch1).unwrap(); /// // only pushed 3 rows (not yet 4, enough to produce a batch) -/// assert!(coalescer.next_batch().is_none()); +/// assert!(coalescer.next_completed_batch().is_none()); /// coalescer.push_batch(batch2).unwrap(); /// // now we have 5 rows, so we can produce a batch -/// let finished = coalescer.next_batch().unwrap(); +/// let finished = coalescer.next_completed_batch().unwrap(); /// // 4 rows came out (target batch size is 4) /// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap(); /// assert_eq!(finished, expected); /// /// // Have no more input, but still have an in-progress batch -/// assert!(coalescer.next_batch().is_none()); +/// assert!(coalescer.next_completed_batch().is_none()); /// // We can finish the batch, which will produce the remaining rows -/// coalescer.finish_batch().unwrap(); +/// coalescer.finish_buffered_batch().unwrap(); /// let expected = record_batch!(("a", Int32, [5])).unwrap(); -/// assert_eq!(coalescer.next_batch().unwrap(), expected); +/// assert_eq!(coalescer.next_completed_batch().unwrap(), expected); /// /// // The coalescer is now empty -/// assert!(coalescer.next_batch().is_none()); +/// assert!(coalescer.next_completed_batch().is_none()); /// ``` /// /// # Background @@ -157,7 +157,7 @@ impl BatchCoalescer { /// Push next batch into the Coalescer /// - /// See [`Self::next_batch()`] to retrieve any completed batches. + /// See [`Self::next_completed_batch()`] to retrieve any completed batches. pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> { if batch.num_rows() == 0 { // If the batch is empty, we don't need to do anything @@ -175,7 +175,7 @@ impl BatchCoalescer { batch = batch.slice(remaining_rows, batch.num_rows() - remaining_rows); self.buffered_rows += head_batch.num_rows(); self.buffer.push(head_batch); - self.finish_batch()?; + self.finish_buffered_batch()?; } // Add the remaining rows to the buffer self.buffered_rows += batch.num_rows(); @@ -189,8 +189,11 @@ impl BatchCoalescer { /// Normally this is called when the input stream is exhausted, and /// we want to finalize the last batch of rows. /// - /// See [`Self::next_batch()`] for the completed batches. - pub fn finish_batch(&mut self) -> Result<(), ArrowError> { + /// See [`Self::next_completed_batch()`] for the completed batches. 
+    pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
+        if self.buffer.is_empty() {
+            return Ok(());
+        }
         let batch = concat_batches(&self.schema, &self.buffer)?;
         self.buffer.clear();
         self.buffered_rows = 0;
@@ -198,8 +201,18 @@
         Ok(())
     }
 
+    /// Returns true if there is no buffered data and no completed batches
+    pub fn is_empty(&self) -> bool {
+        self.buffer.is_empty() && self.completed.is_empty()
+    }
+
+    /// Returns true if there are any completed batches
+    pub fn has_completed_batch(&self) -> bool {
+        !self.completed.is_empty()
+    }
+
     /// Returns the next completed batch, if any
-    pub fn next_batch(&mut self) -> Option<RecordBatch> {
+    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
         self.completed.pop_front()
     }
 }
@@ -298,20 +311,13 @@ mod tests {
 
     #[test]
     fn test_coalesce_empty() {
-        let arrays: Vec<ArrayRef> = vec![Arc::new(UInt32Array::from(vec![] as Vec<u32>))];
         let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));
-        let batch = RecordBatch::try_new_with_options(
-            schema,
-            arrays,
-            &RecordBatchOptions::new().with_row_count(Some(0)),
-        )
-        .unwrap();
 
         Test::new()
             .with_batches(vec![])
-            .with_batch(batch)
+            .with_schema(schema)
             .with_batch_size(21)
-            .with_expected_output_sizes(vec![0])
+            .with_expected_output_sizes(vec![])
             .run()
     }
 
@@ -368,6 +374,10 @@
         /// Batches to feed to the coalescer. Tests must have at least one
         /// schema
         input_batches: Vec<RecordBatch>,
+
+        /// The schema. If not provided, the first batch's schema is used.
+        schema: Option<SchemaRef>,
+
         /// Expected output sizes of the resulting batches
         expected_output_sizes: Vec<usize>,
         /// target batch size
@@ -397,6 +407,12 @@
             self
         }
 
+        /// Specifies the schema for the test
+        fn with_schema(mut self, schema: SchemaRef) -> Self {
+            self.schema = Some(schema);
+            self
+        }
+
         /// Extends the expected output sizes with `sizes`
         fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
             self.expected_output_sizes.extend(sizes);
@@ -407,11 +423,12 @@
         fn run(self) {
             let Self {
                 input_batches,
+                schema,
                 target_batch_size,
                 expected_output_sizes,
             } = self;
 
-            let schema = input_batches[0].schema();
+            let schema = schema.unwrap_or_else(|| input_batches[0].schema());
 
             // create a single large input batch for output comparison
             let single_input_batch = concat_batches(&schema, &input_batches).unwrap();
@@ -421,9 +438,9 @@
             for batch in input_batches {
                 coalescer.push_batch(batch).unwrap();
             }
-            coalescer.finish_batch().unwrap();
+            coalescer.finish_buffered_batch().unwrap();
             let mut output_batches = vec![];
-            while let Some(batch) = coalescer.next_batch() {
+            while let Some(batch) = coalescer.next_completed_batch() {
                 output_batches.push(batch);
             }
 

From 363a747dbce79896f37bc2807eb6446d3f5067b0 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 4 Jun 2025 10:49:59 -0400
Subject: [PATCH 3/4] Finish batch correctly
---
 arrow-select/src/coalesce.rs | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index 1771b897b652..5a5e7e141c84 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -180,6 +180,11 @@ impl BatchCoalescer {
         // Add the remaining rows to the buffer
         self.buffered_rows += batch.num_rows();
         self.buffer.push(batch);
+
+        // If we have reached the target batch size, finalize the buffered batch
+        if self.buffered_rows >= self.batch_size {
+            self.finish_buffered_batch()?;
+        }
         Ok(())
     }
 
@@ -303,12 +308,23 @@ mod tests {
         let batch = uint32_batch(0..8);
         Test::new()
             .with_batches(std::iter::repeat_n(batch, 10))
-            // expected output is exactly 20 rows (except for the final batch)
+            // expected output is exactly 21 rows (except for the final batch)
             .with_batch_size(21)
             .with_expected_output_sizes(vec![21, 21, 21, 17])
             .run()
     }
 
+    #[test]
+    fn test_coalesce_one_by_one() {
+        let batch = uint32_batch(0..1); // single row input
+        Test::new()
+            .with_batches(std::iter::repeat_n(batch, 97))
+            // expected output is exactly 20 rows (except for the final batch)
+            .with_batch_size(20)
+            .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
+            .run()
+    }
+
     #[test]
     fn test_coalesce_empty() {
         let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

From d75a71bb43473755e891c8c10c9cb5352c0bef6b Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 4 Jun 2025 12:14:22 -0400
Subject: [PATCH 4/4] Add CoalesceBatches benchmark
---
 arrow/Cargo.toml                  |   6 +
 arrow/benches/coalesce_kernels.rs | 403 ++++++++++++++++++++++++++++++
 arrow/src/util/bench_util.rs      |   2 +-
 3 files changed, 410 insertions(+), 1 deletion(-)
 create mode 100644 arrow/benches/coalesce_kernels.rs

diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index 45d601828604..e19981f76b69 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -161,6 +161,12 @@ name = "filter_kernels"
 harness = false
 required-features = ["test_utils"]
 
+[[bench]]
+name = "coalesce_kernels"
+harness = false
+required-features = ["test_utils"]
+
+
 [[bench]]
 name = "take_kernels"
 harness = false
diff --git a/arrow/benches/coalesce_kernels.rs b/arrow/benches/coalesce_kernels.rs
new file mode 100644
index 000000000000..2c7736d7607c
--- /dev/null
+++ b/arrow/benches/coalesce_kernels.rs
@@ -0,0 +1,403 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for the `coalesce` kernels in Arrow.
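+//!
+//! Each benchmark repeatedly filters a stream of input batches and pushes the
+//! results into a `BatchCoalescer`, sweeping filter selectivity and null
+//! density over several schemas (StringView-only, primitive plus
+//! Utf8/Utf8View, and dictionary).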
+
+use arrow::util::bench_util::*;
+use std::sync::Arc;
+
+use arrow::array::*;
+use arrow_array::types::{Float64Type, Int32Type};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_select::coalesce::BatchCoalescer;
+use criterion::{criterion_group, criterion_main, Criterion};
+
+/// Benchmarks for generating evenly sized output RecordBatches
+/// from a sequence of filtered source batches
+///
+fn add_all_filter_benchmarks(c: &mut Criterion) {
+    let batch_size = 8192; // 8K rows is a commonly used size for batches
+
+    // Single StringViewArray
+    let single_schema = SchemaRef::new(Schema::new(vec![Field::new(
+        "value",
+        DataType::Utf8View,
+        true,
+    )]));
+
+    // Mixed primitive, StringViewArray
+    let mixed_utf8view_schema = SchemaRef::new(Schema::new(vec![
+        Field::new("int32_val", DataType::Int32, true),
+        Field::new("float_val", DataType::Float64, true),
+        Field::new("utf8view_val", DataType::Utf8View, true),
+    ]));
+
+    // Mixed primitive, StringArray
+    let mixed_utf8_schema = SchemaRef::new(Schema::new(vec![
+        Field::new("int32_val", DataType::Int32, true),
+        Field::new("float_val", DataType::Float64, true),
+        Field::new("utf8", DataType::Utf8, true),
+    ]));
+
+    // dictionary types
+    //
+    let mixed_dict_schema = SchemaRef::new(Schema::new(vec![
+        Field::new(
+            "string_dict",
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+            true,
+        ),
+        Field::new("float_val1", DataType::Float64, true),
+        Field::new("float_val2", DataType::Float64, true),
+        // TODO model other dictionary types here (FixedSizeBinary for example)
+    ]));
+
+    // Null density: 0, 10%
+    for null_density in [0.0, 0.1] {
+        // Selectivity: 0.1%, 1%, 10%, 80%
+        for selectivity in [0.001, 0.01, 0.1, 0.8] {
+            FilterBenchmarkBuilder {
+                c,
+                name: "single_utf8view",
+                batch_size,
+                num_output_batches: 50,
+                null_density,
+                selectivity,
+                schema: &single_schema,
+            }
+            .build();
+
+            FilterBenchmarkBuilder {
+                c,
+                name: "mixed_utf8view",
+                batch_size,
+                num_output_batches: 20,
+                null_density,
+                selectivity,
+                schema: &mixed_utf8view_schema,
+            }
+            .build();
+
+            FilterBenchmarkBuilder {
+                c,
+                name: "mixed_utf8",
+                batch_size,
+                num_output_batches: 20,
+                null_density,
+                selectivity,
+                schema: &mixed_utf8_schema,
+            }
+            .build();
+
+            FilterBenchmarkBuilder {
+                c,
+                name: "mixed_dict",
+                batch_size,
+                num_output_batches: 10,
+                null_density,
+                selectivity,
+                schema: &mixed_dict_schema,
+            }
+            .build();
+        }
+    }
+}
+
+criterion_group!(benches, add_all_filter_benchmarks);
+criterion_main!(benches);
+
+/// Run the filters with a batch_size, null_density, selectivity, and schema
+struct FilterBenchmarkBuilder<'a> {
+    /// Benchmark criterion instance
+    c: &'a mut Criterion,
+    /// Name of the benchmark
+    name: &'a str,
+    /// Size of the input and output batches
+    batch_size: usize,
+    /// Number of output batches to collect (tuned to keep benchmark time reasonable)
+    num_output_batches: usize,
+    /// between 0.0 .. 1.0, percent of data rows (not filter rows) that should be null
+    null_density: f32,
+    /// between 0.0 ..
1.0, percent of rows that should be kept by the filter + selectivity: f32, + /// Schema of the data stream + schema: &'a SchemaRef, +} + +impl FilterBenchmarkBuilder<'_> { + fn build(self) { + let Self { + c, + name, + batch_size, + num_output_batches, + null_density, + selectivity, + schema, + } = self; + + let filters = FilterStreamBuilder::new() + .with_batch_size(batch_size) + .with_true_density(selectivity) + .with_null_density(0.0) // no nulls in the filter + .build(); + + let data = DataStreamBuilder::new(Arc::clone(schema)) + .with_batch_size(batch_size) + .with_null_density(null_density) + .build(); + + // Keep feeding the filter stream into the coalescer until we hit a total number of output batches + let id = format!( + "filter: {name}, {batch_size}, nulls: {null_density}, selectivity: {selectivity}" + ); + c.bench_function(&id, |b| { + b.iter(|| { + filter_streams(num_output_batches, filters.clone(), data.clone()); + }) + }); + } +} + +/// Pull RecordBatches from a data stream and apply a sequence of +/// filters from a filter stream until we have a specified number of output +/// batches. +fn filter_streams( + mut num_output_batches: usize, + mut filter_stream: FilterStream, + mut data_stream: DataStream, +) { + let schema = data_stream.schema(); + let batch_size = data_stream.batch_size(); + let mut coalescer = BatchCoalescer::new(Arc::clone(schema), batch_size); + + while num_output_batches > 0 { + let filter = filter_stream.next_filter(); + let batch = data_stream.next_batch(); + // Apply the filter to the input batch + let filtered_batch = arrow_select::filter::filter_record_batch(batch, filter).unwrap(); + // Add the filtered batch to the coalescer + coalescer.push_batch(filtered_batch).unwrap(); + // consume (but discard) the output batch + if coalescer.next_completed_batch().is_some() { + num_output_batches -= 1; + } + } +} + +/// Stream of filters to apply to a sequence of input RecordBatches +/// +/// This pre-computes a sequence of filters and then repeats it forever. 
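+///
+/// Pre-computing the filters keeps random generation out of the measured
+/// loop, and using a different cycle length than the data stream (11 filter
+/// batches vs. 10 data batches) avoids measuring the same filter/batch
+/// pairing over and over.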
+#[derive(Debug, Clone)]
+struct FilterStream {
+    index: usize,
+    // arc'd so it is cheaply cloned
+    batches: Arc<[BooleanArray]>,
+}
+
+impl FilterStream {
+    pub fn next_filter(&mut self) -> &BooleanArray {
+        let current_index = self.index;
+        self.index += 1;
+        if self.index >= self.batches.len() {
+            self.index = 0; // loop back to the start
+        }
+        self.batches
+            .get(current_index)
+            .expect("No more filters available")
+    }
+}
+
+#[derive(Debug)]
+struct FilterStreamBuilder {
+    batch_size: usize,
+    num_batches: usize, // number of unique batches to create
+    null_density: f32,
+    true_density: f32,
+}
+
+impl FilterStreamBuilder {
+    fn new() -> Self {
+        FilterStreamBuilder {
+            batch_size: 8192,  // default batch size
+            num_batches: 11,   // default number of unique batches (different than data stream)
+            null_density: 0.0, // default null density
+            true_density: 0.5, // default true density
+        }
+    }
+    /// set the batch size for the filter stream
+    fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    /// set the null density for the filter stream
+    fn with_null_density(mut self, null_density: f32) -> Self {
+        assert!((0.0..=1.0).contains(&null_density));
+        self.null_density = null_density;
+        self
+    }
+
+    /// set the true density for the filter stream
+    fn with_true_density(mut self, true_density: f32) -> Self {
+        assert!((0.0..=1.0).contains(&true_density));
+        self.true_density = true_density;
+        self
+    }
+    fn build(self) -> FilterStream {
+        let Self {
+            batch_size,
+            num_batches,
+            null_density,
+            true_density,
+        } = self;
+        let batches = (0..num_batches)
+            .map(|_| create_boolean_array(batch_size, null_density, true_density))
+            .collect::<Vec<_>>();
+
+        FilterStream {
+            index: 0,
+            batches: Arc::from(batches),
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+struct DataStream {
+    schema: SchemaRef,
+    index: usize,
+    batch_size: usize,
+    // arc'd so it is cheaply cloned
+    batches: Arc<[RecordBatch]>,
+}
+
+impl DataStream {
+    /// Returns the schema for this data stream
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Returns the batch size
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    fn next_batch(&mut self) -> &RecordBatch {
+        let current_index = self.index;
+        self.index += 1;
+        if self.index >= self.batches.len() {
+            self.index = 0; // loop back to the start
+        }
+        self.batches
+            .get(current_index)
+            .expect("No more batches available")
+    }
+}
+
+#[derive(Debug)]
+struct DataStreamBuilder {
+    schema: SchemaRef,
+    batch_size: usize,
+    null_density: f32,
+    num_batches: usize, // number of unique batches to create
+}
+
+impl DataStreamBuilder {
+    fn new(schema: SchemaRef) -> Self {
+        DataStreamBuilder {
+            schema,
+            batch_size: 8192,
+            null_density: 0.0,
+            num_batches: 10,
+        }
+    }
+
+    /// set the batch size for the data stream
+    fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    /// set the null density for the data stream
+    fn with_null_density(mut self, null_density: f32) -> Self {
+        assert!((0.0..=1.0).contains(&null_density));
+        self.null_density = null_density;
+        self
+    }
+
+    /// build the data stream
+    fn build(self) -> DataStream {
+        let Self {
+            schema,
+            batch_size,
+            null_density,
+            num_batches,
+        } = self;
+
+        let batches = (0..num_batches)
+            .map(|seed| {
+                let columns = schema
+                    .fields()
+                    .iter()
+                    .map(|field| create_input_array(field, batch_size, null_density, seed as u64))
+                    .collect::<Vec<_>>();
+                RecordBatch::try_new(schema.clone(), columns).unwrap()
+            })
+            .collect::<Vec<_>>();
+
+        DataStream {
+            schema,
+            index: 0,
+            batch_size,
+            batches: Arc::from(batches),
+        }
+    }
+}
+
+fn create_input_array(field: &Field, batch_size: usize, null_density: f32, seed: u64) -> ArrayRef {
+    let max_string_len = 30;
+    match field.data_type() {
+        DataType::Int32 => Arc::new(create_primitive_array_with_seed::<Int32Type>(
+            batch_size,
+            null_density,
+            seed,
+        )),
+        DataType::Float64 => Arc::new(create_primitive_array_with_seed::<Float64Type>(
+            batch_size,
+            null_density,
+            seed,
+        )),
+        DataType::Utf8 => Arc::new(create_string_array::<i32>(batch_size, null_density)), // TODO seed
+        DataType::Utf8View => {
+            Arc::new(create_string_view_array_with_max_len(
+                batch_size,
+                null_density,
+                max_string_len,
+            )) // TODO seed
+        }
+        DataType::Dictionary(key_type, value_type)
+            if key_type.as_ref() == &DataType::Int32 && value_type.as_ref() == &DataType::Utf8 =>
+        {
+            Arc::new(create_string_dict_array::<Int32Type>(
+                batch_size,
+                null_density,
+                max_string_len,
+            )) // TODO seed
+        }
+        _ => panic!("Unsupported data type: {field:?}"),
+    }
+}
diff --git a/arrow/src/util/bench_util.rs b/arrow/src/util/bench_util.rs
index 2f0ccf2addd4..94c6adfb83da 100644
--- a/arrow/src/util/bench_util.rs
+++ b/arrow/src/util/bench_util.rs
@@ -277,7 +277,7 @@ pub fn create_string_view_array(size: usize, null_density: f32) -> StringViewArr
 }
 
 /// Creates a random (but fixed-seeded) array of rand size with a given max size, null density and length
-fn create_string_view_array_with_max_len(
+pub fn create_string_view_array_with_max_len(
     size: usize,
     null_density: f32,
     max_str_len: usize,