diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index 41fd0ff1685..c807d99d77d 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -333,7 +333,7 @@ fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch { #[cfg(test)] mod tests { use super::*; - use arrow_array::builder::{ArrayBuilder, StringViewBuilder}; + use arrow_array::builder::StringViewBuilder; use arrow_array::{RecordBatchOptions, StringViewArray, UInt32Array}; use arrow_schema::{DataType, Field, Schema}; use std::ops::Range; @@ -346,7 +346,7 @@ mod tests { // expected output is exactly 21 rows (except for the final batch) .with_batch_size(21) .with_expected_output_sizes(vec![21, 21, 21, 17]) - .run() + .run(); } #[test] @@ -357,7 +357,7 @@ mod tests { // expected output is exactly 20 rows (except for the final batch) .with_batch_size(20) .with_expected_output_sizes(vec![20, 20, 20, 20, 17]) - .run() + .run(); } #[test] @@ -369,7 +369,7 @@ mod tests { .with_schema(schema) .with_batch_size(21) .with_expected_output_sizes(vec![]) - .run() + .run(); } #[test] @@ -416,25 +416,162 @@ mod tests { .run(); } + #[test] + fn test_empty_schema() { + let schema = Schema::empty(); + let batch = RecordBatch::new_empty(schema.into()); + Test::new() + .with_batch(batch) + .with_expected_output_sizes(vec![]) + .run(); + } + + #[test] + fn test_string_view_no_views() { + Test::new() + // both input batches have no views, so no need to compact + .with_batch(stringview_batch([Some("foo"), Some("bar")])) + .with_batch(stringview_batch([Some("baz"), Some("qux")])) + .with_expected_output_sizes(vec![4]) + .run(); + } + + #[test] + fn test_string_view_batch_small_no_compact() { + // view with only short strings (no buffers) --> no need to compact + let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]); + let gc_batches = Test::new() + .with_batch(batch.clone()) + .with_expected_output_sizes(vec![1000]) + .run(); + + let array = col_as_string_view("c0", &batch); + let gc_array = col_as_string_view("c0", gc_batches.first().unwrap()); + assert_eq!(array.data_buffers().len(), 0); + assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction + } + + #[test] + fn test_string_view_batch_large_no_compact() { + // view with large strings (has buffers) but full --> no need to compact + let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]); + let gc_batches = Test::new() + .with_batch(batch.clone()) + .with_batch_size(1000) + .with_expected_output_sizes(vec![1000]) + .run(); + + let array = col_as_string_view("c0", &batch); + let gc_array = col_as_string_view("c0", gc_batches.first().unwrap()); + assert_eq!(array.data_buffers().len(), 5); + assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction + } + + #[test] + fn test_string_view_batch_small_with_buffers_no_compact() { + // view with buffers but only short views + let short_strings = std::iter::repeat(Some("SmallString")); + let long_strings = std::iter::once(Some("This string is longer than 12 bytes")); + // 20 short strings, then a long ones + let values = short_strings.take(20).chain(long_strings); + let batch = stringview_batch_repeated(1000, values) + // take only 10 short strings (no long ones) + .slice(5, 10); + let gc_batches = Test::new() + .with_batch(batch.clone()) + .with_batch_size(1000) + .with_expected_output_sizes(vec![10]) + .run(); + + let array = col_as_string_view("c0", &batch); + let gc_array = col_as_string_view("c0", gc_batches.first().unwrap()); + assert_eq!(array.data_buffers().len(), 1); // input has one buffer + assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers as only short strings + } + + #[test] + fn test_string_view_batch_large_slice_compact() { + // view with large strings (has buffers) and only partially used --> no need to compact + let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]) + // slice only 22 rows, so most of the buffer is not used + .slice(11, 22); + + let gc_batches = Test::new() + .with_batch(batch.clone()) + .with_batch_size(1000) + .with_expected_output_sizes(vec![22]) + .run(); + + let array = col_as_string_view("c0", &batch); + let gc_array = col_as_string_view("c0", gc_batches.first().unwrap()); + assert_eq!(array.data_buffers().len(), 5); + assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer + } + + #[test] + fn test_string_view_mixed() { + let large_view_batch = + stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]); + let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]); + let mixed_batch = stringview_batch_repeated( + 1000, + [Some("This string is longer than 12 bytes"), Some("Small")], + ); + let mixed_batch_nulls = stringview_batch_repeated( + 1000, + [ + Some("This string is longer than 12 bytes"), + Some("Small"), + None, + ], + ); + + // Several batches with mixed inline / non inline + // 4k rows in + let gc_batches = Test::new() + .with_batch(large_view_batch.clone()) + .with_batch(small_view_batch) + // this batch needs to be compacted (less than 1/2 full) + .with_batch(large_view_batch.slice(10, 20)) + .with_batch(mixed_batch_nulls) + // this batch needs to be compacted (less than 1/2 full) + .with_batch(large_view_batch.slice(10, 20)) + .with_batch(mixed_batch) + .with_expected_output_sizes(vec![1024, 1024, 1024, 968]) + .run(); + + let gc_array = col_as_string_view("c0", gc_batches.first().unwrap()); + + assert_eq!(gc_array.data_buffers().len(), 5); + } + /// Test for [`BatchCoalescer`] /// /// Pushes the input batches to the coalescer and verifies that the resulting /// batches have the expected number of rows and contents. - #[derive(Debug, Clone, Default)] + #[derive(Debug, Clone)] struct Test { - /// Batches to feed to the coalescer. Tests must have at least one - /// schema + /// Batches to feed to the coalescer. input_batches: Vec, - /// The schema. If not provided, the first batch's schema is used. schema: Option, - /// Expected output sizes of the resulting batches expected_output_sizes: Vec, - /// target batch size + /// target batch size (default to 1024) target_batch_size: usize, } + impl Default for Test { + fn default() -> Self { + Self { + input_batches: vec![], + schema: None, + expected_output_sizes: vec![], + target_batch_size: 1024, + } + } + } + impl Test { fn new() -> Self { Self::default() @@ -471,7 +608,9 @@ mod tests { } /// Runs the test -- see documentation on [`Test`] for details - fn run(self) { + /// + /// Returns the resulting output batches + fn run(self) -> Vec { let Self { input_batches, schema, @@ -505,14 +644,17 @@ mod tests { "Unexpected number of rows in output batches\n\ Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}" ); - for (i, (expected_size, batch)) in - expected_output_sizes.iter().zip(output_batches).enumerate() - { + let iter = expected_output_sizes + .iter() + .zip(output_batches.iter()) + .enumerate(); + + for (i, (expected_size, batch)) in iter { // compare the contents of the batch after normalization (using // `==` compares the underlying memory layout too) let expected_batch = single_input_batch.slice(starting_idx, *expected_size); let expected_batch = normalize_batch(expected_batch); - let batch = normalize_batch(batch); + let batch = normalize_batch(batch.clone()); assert_eq!( expected_batch, batch, "Unexpected content in batch {i}:\ @@ -520,10 +662,11 @@ mod tests { ); starting_idx += *expected_size; } + output_batches } } - /// Return a batch of UInt32 with the specified range + /// Return a RecordBatch with a UInt32Array with the specified range fn uint32_batch(range: Range) -> RecordBatch { let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])); @@ -534,111 +677,55 @@ mod tests { .unwrap() } - #[test] - fn test_gc_string_view_batch_small_no_compact() { - // view with only short strings (no buffers) --> no need to compact - let array = StringViewTest { - rows: 1000, - strings: vec![Some("a"), Some("b"), Some("c")], - } - .build(); - - let gc_array = do_gc(array.clone()); - compare_string_array_values(&array, &gc_array); - assert_eq!(array.data_buffers().len(), 0); - assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction - } + /// Return a RecordBatch with a StringViewArray with (only) the specified values + fn stringview_batch<'a>(values: impl IntoIterator>) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new( + "c0", + DataType::Utf8View, + false, + )])); - #[test] - fn test_gc_string_view_test_batch_empty() { - let schema = Schema::empty(); - let batch = RecordBatch::new_empty(schema.into()); - let cols = batch.num_columns(); - let num_rows = batch.num_rows(); - let output_batch = gc_string_view_batch(batch); - assert_eq!(cols, output_batch.num_columns()); - assert_eq!(num_rows, output_batch.num_rows()); - } - - #[test] - fn test_gc_string_view_batch_large_no_compact() { - // view with large strings (has buffers) but full --> no need to compact - let array = StringViewTest { - rows: 1000, - strings: vec![Some("This string is longer than 12 bytes")], - } - .build(); - - let gc_array = do_gc(array.clone()); - compare_string_array_values(&array, &gc_array); - assert_eq!(array.data_buffers().len(), 5); - assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction - } - - #[test] - fn test_gc_string_view_batch_large_slice_compact() { - // view with large strings (has buffers) and only partially used --> no need to compact - let array = StringViewTest { - rows: 1000, - strings: vec![Some("this string is longer than 12 bytes")], - } - .build(); - - // slice only 11 rows, so most of the buffer is not used - let array = array.slice(11, 22); - - let gc_array = do_gc(array.clone()); - compare_string_array_values(&array, &gc_array); - assert_eq!(array.data_buffers().len(), 5); - assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer + RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(StringViewArray::from_iter(values))], + ) + .unwrap() } - /// Compares the values of two string view arrays - fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) { - assert_eq!(arr1.len(), arr2.len()); - for (s1, s2) in arr1.iter().zip(arr2.iter()) { - assert_eq!(s1, s2); + /// Return a RecordBatch with a StringViewArray with num_rows by repating + /// values over and over. + fn stringview_batch_repeated<'a>( + num_rows: usize, + values: impl IntoIterator>, + ) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new( + "c0", + DataType::Utf8View, + true, + )])); + + // Repeat the values to a total of num_rows + let values: Vec<_> = values.into_iter().collect(); + let values_iter = std::iter::repeat(values.iter()) + .flatten() + .cloned() + .take(num_rows); + + let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192); + for val in values_iter { + builder.append_option(val); } - } - /// runs garbage collection on string view array - /// and ensures the number of rows are the same - fn do_gc(array: StringViewArray) -> StringViewArray { - let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap(); - let rows = batch.num_rows(); - let schema = batch.schema(); - let gc_batch = gc_string_view_batch(batch); - assert_eq!(rows, gc_batch.num_rows()); - assert_eq!(schema, gc_batch.schema()); - gc_batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap() - .clone() + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(builder.finish())]).unwrap() } - /// Describes parameters for creating a `StringViewArray` - struct StringViewTest { - /// The number of rows in the array - rows: usize, - /// The strings to use in the array (repeated over and over - strings: Vec>, - } - - impl StringViewTest { - /// Create a `StringViewArray` with the parameters specified in this struct - fn build(self) -> StringViewArray { - let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192); - loop { - for &v in self.strings.iter() { - builder.append_option(v); - if builder.len() >= self.rows { - return builder.finish(); - } - } - } - } + /// Returns the named column as a StringViewArray + fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray { + batch + .column_by_name(name) + .expect("column not found") + .as_string_view_opt() + .expect("column is not a string view") } /// Normalize the `RecordBatch` so that the memory layout is consistent