diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs
index 63b9fe30ed42..a9aed95318f7 100644
--- a/arrow-array/src/array/byte_view_array.rs
+++ b/arrow-array/src/array/byte_view_array.rs
@@ -757,7 +757,7 @@ mod tests {
     fn test_in_progress_recreation() {
         let array = {
             // make a builder with small block size.
-            let mut builder = StringViewBuilder::new().with_block_size(14);
+            let mut builder = StringViewBuilder::new().with_fixed_block_size(14);
             builder.append_value("large payload over 12 bytes");
             builder.append_option(Some("another large payload over 12 bytes that double than the first one, so that we can trigger the in_progress in builder re-created"));
             builder.finish()
@@ -848,7 +848,7 @@ mod tests {
         ];
 
         let array = {
-            let mut builder = StringViewBuilder::new().with_block_size(8); // create multiple buffers
+            let mut builder = StringViewBuilder::new().with_fixed_block_size(8); // create multiple buffers
             test_data.into_iter().for_each(|v| builder.append_option(v));
             builder.finish()
         };
diff --git a/arrow-array/src/builder/generic_bytes_view_builder.rs b/arrow-array/src/builder/generic_bytes_view_builder.rs
index 7726ee35240f..4f19204b86ef 100644
--- a/arrow-array/src/builder/generic_bytes_view_builder.rs
+++ b/arrow-array/src/builder/generic_bytes_view_builder.rs
@@ -30,7 +30,35 @@ use crate::types::bytes::ByteArrayNativeType;
 use crate::types::{BinaryViewType, ByteViewType, StringViewType};
 use crate::{ArrayRef, GenericByteViewArray};
 
-const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024;
+const STARTING_BLOCK_SIZE: u32 = 8 * 1024; // 8KiB
+const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024; // 2MiB
+
+/// Policy for choosing the size of each newly allocated data block.
+enum BlockSizeGrowthStrategy {
+    /// Every block is allocated with the same `size`.
+    Fixed { size: u32 },
+    /// `current_size` is the size of the next block; it doubles after each
+    /// allocation until it reaches `MAX_BLOCK_SIZE`.
+    Exponential { current_size: u32 },
+}
+
+impl BlockSizeGrowthStrategy {
+    /// Returns the size for the next block and advances the strategy.
+    fn next_size(&mut self) -> u32 {
+        match self {
+            Self::Fixed { size } => *size,
+            Self::Exponential { current_size } => {
+                // Hand out the current size *before* growing, so the first
+                // block is STARTING_BLOCK_SIZE rather than twice that. The
+                // doubling is clamped to MAX_BLOCK_SIZE, so once the cap is
+                // reached every later call keeps returning MAX_BLOCK_SIZE.
+                let size = *current_size;
+                *current_size = size.saturating_mul(2).min(MAX_BLOCK_SIZE);
+                size
+            }
+        }
+    }
+}
 
 /// A builder for [`GenericByteViewArray`]
 ///
@@ -58,7 +86,7 @@ pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
     null_buffer_builder: NullBufferBuilder,
     completed: Vec<Buffer>,
     in_progress: Vec<u8>,
-    block_size: u32,
+    block_size: BlockSizeGrowthStrategy,
     /// Some if deduplicating strings
     /// map `<string hash> -> <index to the views>`
     string_tracker: Option<(HashTable<usize>, ahash::RandomState)>,
@@ -78,15 +106,42 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
             null_buffer_builder: NullBufferBuilder::new(capacity),
             completed: vec![],
             in_progress: vec![],
-            block_size: DEFAULT_BLOCK_SIZE,
+            block_size: BlockSizeGrowthStrategy::Exponential {
+                current_size: STARTING_BLOCK_SIZE,
+            },
             string_tracker: None,
             phantom: Default::default(),
         }
     }
 
+    /// Set a fixed buffer size for variable length strings
+    ///
+    /// The block size is the size of the buffer used to store values greater
+    /// than 12 bytes. The builder allocates new buffers when the current
+    /// buffer is full.
+    ///
+    /// By default the builder balances buffer size and buffer count by
+    /// growing buffer size exponentially from 8KB up to 2MB. The
+    /// first buffer allocated is 8KB, then 16KB, then 32KB, etc up to 2MB.
+    ///
+    /// If this method is used, any new buffers allocated are
+    /// exactly this size. This can be useful for advanced users
+    /// that want to control the memory usage and buffer count.
+    ///
+    /// See <https://github.com/apache/arrow-rs/issues/6094> for more details on the implications.
+    pub fn with_fixed_block_size(self, block_size: u32) -> Self {
+        debug_assert!(block_size > 0, "Block size must be greater than 0");
+        Self {
+            block_size: BlockSizeGrowthStrategy::Fixed { size: block_size },
+            ..self
+        }
+    }
+
     /// Override the size of buffers to allocate for holding string data
+    /// Use `with_fixed_block_size` instead.
+    #[deprecated(note = "Use `with_fixed_block_size` instead")]
     pub fn with_block_size(self, block_size: u32) -> Self {
-        Self { block_size, ..self }
+        self.with_fixed_block_size(block_size)
     }
 
     /// Deduplicate strings while building the array
@@ -277,7 +332,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
         let required_cap = self.in_progress.len() + v.len();
         if self.in_progress.capacity() < required_cap {
             self.flush_in_progress();
-            let to_reserve = v.len().max(self.block_size as usize);
+            let to_reserve = v.len().max(self.block_size.next_size() as usize);
             self.in_progress.reserve(to_reserve);
         };
         let offset = self.in_progress.len() as u32;
@@ -478,7 +533,7 @@ mod tests {
 
         let mut builder = StringViewBuilder::new()
             .with_deduplicate_strings()
-            .with_block_size(value_1.len() as u32 * 2); // so that we will have multiple buffers
+            .with_fixed_block_size(value_1.len() as u32 * 2); // so that we will have multiple buffers
 
         let values = vec![
             Some(value_1),
@@ -585,4 +640,51 @@ mod tests {
             "Invalid argument error: No block found with index 5"
         );
     }
+
+    #[test]
+    fn test_string_view_with_block_size_growth() {
+        let mut exp_builder = StringViewBuilder::new();
+        let mut fixed_builder = StringViewBuilder::new().with_fixed_block_size(STARTING_BLOCK_SIZE);
+
+        let long_string = String::from_utf8(vec![b'a'; STARTING_BLOCK_SIZE as usize]).unwrap();
+
+        for i in 0..9 {
+            // 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1M, 2M
+            for _ in 0..(2_u32.pow(i)) {
+                exp_builder.append_value(&long_string);
+                fixed_builder.append_value(&long_string);
+            }
+            exp_builder.flush_in_progress();
+            fixed_builder.flush_in_progress();
+
+            // Each step adds exactly one buffer, and each buffer is twice the size of the last
+            assert_eq!(exp_builder.completed.len(), i as usize + 1);
+            assert_eq!(
+                exp_builder.completed[i as usize].len(),
+                STARTING_BLOCK_SIZE as usize * 2_usize.pow(i)
+            );
+            // The allocation itself starts at STARTING_BLOCK_SIZE and doubles each step
+            assert_eq!(
+                exp_builder.completed[i as usize].capacity(),
+                STARTING_BLOCK_SIZE as usize * 2_usize.pow(i)
+            );
+
+            // This step added 2^i blocks, so the running total is 2^(i+1) - 1
+            assert_eq!(fixed_builder.completed.len(), 2_usize.pow(i + 1) - 1);
+
+            // Every buffer is fixed size
+            assert!(fixed_builder
+                .completed
+                .iter()
+                .all(|b| b.len() == STARTING_BLOCK_SIZE as usize));
+        }
+
+        // Add one more value: the block size stays capped at MAX_BLOCK_SIZE.
+        exp_builder.append_value(&long_string);
+        exp_builder.flush_in_progress();
+        assert_eq!(
+            exp_builder.completed.last().unwrap().capacity(),
+            MAX_BLOCK_SIZE as usize
+        );
+    }
 }
diff --git a/arrow-cast/src/cast/mod.rs b/arrow-cast/src/cast/mod.rs
index 5f72debcdad2..f6103cb84136 100644
--- a/arrow-cast/src/cast/mod.rs
+++ b/arrow-cast/src/cast/mod.rs
@@ -5321,7 +5321,7 @@ mod tests {
         let typed_dict = string_dict_array.downcast_dict::<StringArray>().unwrap();
 
         let string_view_array = {
-            let mut builder = StringViewBuilder::new().with_block_size(8); // multiple buffers.
+            let mut builder = StringViewBuilder::new().with_fixed_block_size(8); // multiple buffers.
             for v in typed_dict.into_iter() {
                 builder.append_option(v);
             }
@@ -5338,7 +5338,7 @@ mod tests {
         let typed_binary_dict = binary_dict_array.downcast_dict::<BinaryArray>().unwrap();
 
         let binary_view_array = {
-            let mut builder = BinaryViewBuilder::new().with_block_size(8); // multiple buffers.
+            let mut builder = BinaryViewBuilder::new().with_fixed_block_size(8); // multiple buffers.
             for v in typed_binary_dict.into_iter() {
                 builder.append_option(v);
             }
@@ -5381,7 +5381,7 @@ mod tests {
         O: OffsetSizeTrait,
     {
         let view_array = {
-            let mut builder = StringViewBuilder::new().with_block_size(8); // multiple buffers.
+            let mut builder = StringViewBuilder::new().with_fixed_block_size(8); // multiple buffers.
             for s in VIEW_TEST_DATA.iter() {
                 builder.append_option(*s);
             }
@@ -5410,7 +5410,7 @@ mod tests {
         O: OffsetSizeTrait,
     {
         let view_array = {
-            let mut builder = BinaryViewBuilder::new().with_block_size(8); // multiple buffers.
+            let mut builder = BinaryViewBuilder::new().with_fixed_block_size(8); // multiple buffers.
             for s in VIEW_TEST_DATA.iter() {
                 builder.append_option(*s);
             }