diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs index e837512ed064..d7fa51fb0a48 100644 --- a/arrow-array/src/array/byte_view_array.rs +++ b/arrow-array/src/array/byte_view_array.rs @@ -538,6 +538,18 @@ impl GenericByteViewArray { l_full_data.cmp(r_full_data) } + + /// return the total number of bytes required to hold all strings pointed to by views in this array + pub fn minimum_buffer_size(&self) -> usize { + let mut used = 0; + for v in self.views().iter() { + let len = (*v as u32) as usize; + if len > 12 { + used += len; + } + } + used + } } impl Debug for GenericByteViewArray { diff --git a/arrow-array/src/builder/generic_bytes_view_builder.rs b/arrow-array/src/builder/generic_bytes_view_builder.rs index 7268e751b149..556c72dc6e19 100644 --- a/arrow-array/src/builder/generic_bytes_view_builder.rs +++ b/arrow-array/src/builder/generic_bytes_view_builder.rs @@ -28,7 +28,7 @@ use hashbrown::HashTable; use crate::builder::ArrayBuilder; use crate::types::bytes::ByteArrayNativeType; use crate::types::{BinaryViewType, ByteViewType, StringViewType}; -use crate::{ArrayRef, GenericByteViewArray}; +use crate::{Array, ArrayRef, GenericByteViewArray}; const STARTING_BLOCK_SIZE: u32 = 8 * 1024; // 8KiB const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024; // 2MiB @@ -84,10 +84,26 @@ pub struct GenericByteViewBuilder { completed: Vec, in_progress: Vec, block_size: BlockSizeGrowthStrategy, + /// When appending views from an existing Array, the builder will copy + /// the underlying strings into a new buffer if the array is sparse. + /// + /// If None, the builder will not copy long strings + /// + /// If Some, the builder will *copy* long strings if the total size of the used + /// buffer bytes / the total size is less than than `append_load_factor` + /// + /// So if `append_load_factor` is `Some(0.5)`, the builder will copy long + /// strings if the total size of the used buffers is less than 50% of the + /// total size of the buffers. + target_buffer_load_factor: Option, /// Some if deduplicating strings /// map ` -> ` string_tracker: Option<(HashTable, ahash::RandomState)>, phantom: PhantomData, + /// How much space to reserve for newly created buffers. + /// + /// Defaults to 0 + initial_capacity: Option, } impl GenericByteViewBuilder { @@ -103,12 +119,39 @@ impl GenericByteViewBuilder { null_buffer_builder: NullBufferBuilder::new(capacity), completed: vec![], in_progress: vec![], + target_buffer_load_factor: Some(0.5), block_size: BlockSizeGrowthStrategy::Exponential { current_size: STARTING_BLOCK_SIZE, }, string_tracker: None, phantom: Default::default(), + initial_capacity: None, + } + } + + /// Set the initial capacity for buffers after finish is called + pub fn with_initial_capacity(mut self, initial_capacity: usize) -> Self { + self.initial_capacity = Some(initial_capacity); + self + } + + /// Set the target buffer load factor for appending views from existing arrays + /// + /// Defaults to 50% if not set. + /// + /// Panics if the load factor is not between 0 and 1. 
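The copy-versus-reuse rule described in the `target_buffer_load_factor` docs above can be stated as a single comparison between `minimum_buffer_size` (bytes the views actually reference) and the total buffer size. The helper below is a hypothetical illustration, not part of the patch; it mirrors the check that `finalize_copied_views` performs:

```rust
// Hypothetical helper mirroring the load-factor check: copy (compact) the long
// strings when the fraction of buffer bytes actually referenced by the views
// falls below the target load factor.
fn should_copy(used_bytes: usize, held_bytes: usize, load_factor: Option<f32>) -> bool {
    match load_factor {
        None => false, // never copy; always reuse the source buffers
        Some(f) => (used_bytes as f32) < f * (held_bytes as f32),
    }
}

assert!(should_copy(100, 1000, Some(0.5)));  // only 10% used -> compact
assert!(!should_copy(900, 1000, Some(0.5))); // 90% used -> reuse buffers
assert!(!should_copy(100, 1000, None));      // compaction disabled
```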
+ pub fn with_target_buffer_load_factor( + mut self, + target_buffer_load_factor: Option, + ) -> Self { + if let Some(load_factor) = target_buffer_load_factor { + assert!( + load_factor > 0.0 && load_factor <= 1.0, + "Target buffer load factor must be between 0 and 1" + ); } + self.target_buffer_load_factor = target_buffer_load_factor; + self } /// Set a fixed buffer size for variable length strings @@ -236,7 +279,7 @@ impl GenericByteViewBuilder { /// Flushes the in progress block if any #[inline] - fn flush_in_progress(&mut self) { + pub fn flush_in_progress(&mut self) { if !self.in_progress.is_empty() { let f = Buffer::from_vec(std::mem::take(&mut self.in_progress)); self.push_completed(f) @@ -366,8 +409,18 @@ impl GenericByteViewBuilder { self.flush_in_progress(); let completed = std::mem::take(&mut self.completed); let len = self.views_builder.len(); - let views = ScalarBuffer::new(self.views_builder.finish(), 0, len); - let nulls = self.null_buffer_builder.finish(); + let (mut views_builder, mut null_buffer_builder) = match self.initial_capacity { + Some(initial_capacity) => ( + BufferBuilder::new(initial_capacity), + NullBufferBuilder::new(initial_capacity), + ), + None => (BufferBuilder::default(), NullBufferBuilder::new(len)), + }; + std::mem::swap(&mut views_builder, &mut self.views_builder); + std::mem::swap(&mut null_buffer_builder, &mut self.null_buffer_builder); + + let views = ScalarBuffer::new(views_builder.finish(), 0, len); + let nulls = null_buffer_builder.finish(); if let Some((ref mut ht, _)) = self.string_tracker.as_mut() { ht.clear(); } @@ -406,6 +459,122 @@ impl GenericByteViewBuilder { }; buffer_size + in_progress + tracker + views + null } + + /// Append all views from the given array into the inprogress builder + /// + /// Will copy the underlying views based on the value of target_buffer_load_factor + pub fn append_array(&mut self, array: &GenericByteViewArray) { + let num_rows = array.len(); + if num_rows == 0 { + return; // nothing to do + } + + let null_buffer_builder = &mut self.null_buffer_builder; + let views = &mut self.views_builder; + + // Copy nulls + if let Some(nulls) = array.nulls() { + null_buffer_builder.append_buffer(nulls); + } else { + null_buffer_builder.append_n_non_nulls(array.len()); + } + + // Copy views from the source array + let starting_view = views.len(); + views.append_slice(array.views()); + + // Safety we only appended views from array + unsafe { + self.finalize_copied_views(starting_view, array); + } + } + + /// Finalizes the views and buffers of the array + /// + /// This must be called after appending views from `array` to the builder. + /// + /// The views from `array` will point to the old buffers. This function + /// updates all views starting at `starting_view` to point to the new + /// buffers or copies the values into a new buffer if the array is sparse. + /// + /// # Safety + /// + /// * self.views[starting_view..] must be valid views from `array`. 
+ pub unsafe fn finalize_copied_views( + &mut self, + starting_view: usize, + array: &GenericByteViewArray, + ) { + // Flush the in-progress buffer + self.flush_in_progress(); + + let buffers = &mut self.completed; + let views = &mut self.views_builder; + + let mut used_buffer_size = 0; + let use_exising_buffers = match self.target_buffer_load_factor { + None => true, + Some(load_factor) => { + used_buffer_size = array.minimum_buffer_size(); + let actual_buffer_size = array.get_buffer_memory_size(); + // If the total size of the buffers is less than the load factor, copy them existing buffers + used_buffer_size >= (actual_buffer_size as f32 * load_factor) as usize + } + }; + + if use_exising_buffers { + let num_buffers_before: u32 = buffers.len().try_into().expect("buffer count overflow"); + buffers.extend_from_slice(array.data_buffers()); // + + // If there were no existing buffers, the views do not need to be updated + // as the buffers of `array` are the same + if num_buffers_before == 0 { + return; + } + + // Update any views that point to the old buffers + for v in views.as_slice_mut()[starting_view..].iter_mut() { + let view_len = *v as u32; + // if view_len is 12 or less, data is inlined and doesn't need an update + // if view is 12 or more, need to update the buffer offset + if view_len > 12 { + let mut view = ByteView::from(*v); + let new_buffer_index = num_buffers_before + view.buffer_index; + view.buffer_index = new_buffer_index; + *v = view.into(); // update view + } + } + } else { + // otherwise the array is sparse so copy the data into a single new + // buffer as well as updating the views + let mut new_buffer: Vec = Vec::with_capacity(used_buffer_size); + let new_buffer_index = buffers.len() as u32; // making one new buffer + // Update any views that point to the old buffers. + for v in views.as_slice_mut()[starting_view..].iter_mut() { + let view_len = *v as u32; + // if view_len is 12 or less, data is inlined and doesn't need an update + // if view is 12 or more, need to copy the data to the new buffer and update the index and buffer offset + if view_len > 12 { + let mut view = ByteView::from(*v); + let old_buffer = &array.data_buffers()[view.buffer_index as usize].as_slice(); + + let new_offset = new_buffer.len(); + let old_offset = view.offset as usize; + let str_data = &old_buffer[old_offset..old_offset + view_len as usize]; + new_buffer.extend_from_slice(str_data); + view.offset = new_offset as u32; + view.buffer_index = new_buffer_index; + *v = view.into(); // update view + } + } + buffers.push(new_buffer.into()); + } + } + + /// Returns the inner views and null buffer builders and buffers. 
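A hedged usage sketch of the new `append_array` path, assuming the builder APIs land as shown in this diff: slicing a `StringViewArray` keeps the full data buffers alive, so with a target load factor set the long strings are copied into a compact buffer, while `None` reuses the source buffers as-is.

```rust
use arrow_array::builder::StringViewBuilder;
use arrow_array::{Array, StringViewArray};

// eight long (non-inlined, > 12 byte) strings
let array = StringViewArray::from_iter_values(
    std::iter::repeat("a string well over twelve bytes long").take(8),
);
let sparse = array.slice(0, 1); // the one remaining view references a small fraction of the buffer

// Some(0.5): used bytes are below 50% of the held bytes, so the data is copied
// into a new, compact buffer when the views are finalized
let mut builder = StringViewBuilder::new().with_target_buffer_load_factor(Some(0.5));
builder.append_array(&sparse);
let compacted = builder.finish();
assert_eq!(compacted.len(), 1);

// None: never copy; the existing (larger) data buffers are reused wholesale
let mut builder = StringViewBuilder::new().with_target_buffer_load_factor(None);
builder.append_array(&sparse);
let zero_copy = builder.finish();
assert_eq!(zero_copy.len(), 1);
assert!(zero_copy.get_buffer_memory_size() >= compacted.get_buffer_memory_size());
```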
+ pub fn inner_mut(&mut self) -> (&mut BufferBuilder, &mut NullBufferBuilder) { + (&mut self.views_builder, &mut self.null_buffer_builder) + } } impl Default for GenericByteViewBuilder { diff --git a/arrow-array/src/builder/primitive_builder.rs b/arrow-array/src/builder/primitive_builder.rs index 41c65fe34e35..50abcacaa69f 100644 --- a/arrow-array/src/builder/primitive_builder.rs +++ b/arrow-array/src/builder/primitive_builder.rs @@ -187,7 +187,17 @@ impl PrimitiveBuilder { T::DATA_TYPE, data_type ); - Self { data_type, ..self } + // Type is Checked above + unsafe { self.with_data_type_unchecked(data_type) } + } + + /// Set the data type of the builder without checking if it is compatible + /// + /// # Safety + /// the DataType must be compatible with the type `T` + pub unsafe fn with_data_type_unchecked(mut self, data_type: DataType) -> Self { + self.data_type = data_type; + self } /// Returns the capacity of this builder measured in slots of type `T` @@ -284,20 +294,49 @@ impl PrimitiveBuilder { /// the iterator implement `TrustedLen` once that is stabilized. #[inline] pub unsafe fn append_trusted_len_iter(&mut self, iter: impl IntoIterator) { - let iter = iter.into_iter(); - let len = iter - .size_hint() - .1 - .expect("append_trusted_len_iter requires an upper bound"); - - self.null_buffer_builder.append_n_non_nulls(len); + let starting_len = self.len(); self.values_builder.append_trusted_len_iter(iter); + self.null_buffer_builder + .append_n_non_nulls(self.len() - starting_len); + } + + /// Builds the [`PrimitiveArray`] and consumes this builder. + pub fn build(self) -> PrimitiveArray { + let len = self.len(); + let Self { + values_builder, + null_buffer_builder, + data_type, + } = self; + let nulls = null_buffer_builder.build(); + + if let Some(nulls) = &nulls { + assert_eq!( + nulls.len(), + values_builder.len(), + "nulls/values length mismatch" + ); + } + let builder = ArrayData::builder(data_type) + .len(len) + .add_buffer(values_builder.build()) + .nulls(nulls); + + let array_data = unsafe { builder.build_unchecked() }; + PrimitiveArray::::from(array_data) } /// Builds the [`PrimitiveArray`] and reset this builder. pub fn finish(&mut self) -> PrimitiveArray { let len = self.len(); let nulls = self.null_buffer_builder.finish(); + if let Some(nulls) = &nulls { + assert_eq!( + nulls.len(), + self.values_builder.len(), + "nulls/values length mismatch" + ); + } let builder = ArrayData::builder(self.data_type.clone()) .len(len) .add_buffer(self.values_builder.finish()) @@ -312,6 +351,18 @@ impl PrimitiveBuilder { let len = self.len(); let nulls = self.null_buffer_builder.finish_cloned(); let values_buffer = Buffer::from_slice_ref(self.values_builder.as_slice()); + // Verify values and nulls buffers are the same length + // TODO for some reason this fails in the FixedSizeListBuilder + /* + if let Some(nulls) = &nulls { + assert_eq!( + nulls.len(), + values_buffer.len(), + "nulls/values length mismatch" + ); + } + + */ let builder = ArrayData::builder(self.data_type.clone()) .len(len) .add_buffer(values_buffer) @@ -348,6 +399,13 @@ impl PrimitiveBuilder { self.null_buffer_builder.as_slice_mut(), ) } + + /// Returns the inner value and null buffer builders. + /// + /// These must be kept in sync + pub fn inner_mut(&mut self) -> (&mut BufferBuilder, &mut NullBufferBuilder) { + (&mut self.values_builder, &mut self.null_buffer_builder) + } } impl PrimitiveBuilder

{ diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 19ca0fef1519..2fbc496cc2c7 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -543,8 +543,42 @@ impl MutableBuffer { iterator.for_each(|item| self.push(item)); } + /// Extends a [`MutableBuffer`] from an [`Iterator`] with a trusted (upper) length. + /// + /// See [`MutableBuffer::from_trusted_len_iter`] for more details. + /// + /// # Safety + /// This method assumes that the iterator's size is correct and is undefined + /// behavior to use it on an iterator that reports an incorrect length. + #[inline] + pub unsafe fn extend_from_trusted_len_iter>( + &mut self, + iterator: I, + ) { + let item_size = std::mem::size_of::(); + let (lower, _) = iterator.size_hint(); + let additional = lower * item_size; + self.reserve(additional); + + // this is necessary because of https://github.com/rust-lang/rust/issues/32155 + let mut dst = unsafe { self.data.as_ptr().add(self.len) }; + for item in iterator { + // note how there is no reserve here (compared with `extend_from_iter`) + let src = item.to_byte_slice().as_ptr(); + std::ptr::copy_nonoverlapping(src, dst, item_size); + dst = dst.add(item_size); + } + self.len += additional; + assert_eq!( + dst.offset_from(self.data.as_ptr()) as usize, + self.len, + "Trusted iterator length was not accurately reported" + ); + } + /// Creates a [`MutableBuffer`] from an [`Iterator`] with a trusted (upper) length. - /// Prefer this to `collect` whenever possible, as it is faster ~60% faster. + /// Prefer this to `collect` whenever possible, as it is ~60% faster. + /// /// # Example /// ``` /// # use arrow_buffer::buffer::MutableBuffer; @@ -554,16 +588,18 @@ impl MutableBuffer { /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes /// ``` /// # Safety - /// This method assumes that the iterator's size is correct and is undefined behavior - /// to use it on an iterator that reports an incorrect length. - // This implementation is required for two reasons: - // 1. there is no trait `TrustedLen` in stable rust and therefore - // we can't specialize `extend` for `TrustedLen` like `Vec` does. - // 2. `from_trusted_len_iter` is faster. + /// This method assumes that the iterator's size is correct and is undefined + /// behavior to use it on an iterator that reports an incorrect length. + /// + /// This implementation is required for two reasons: + /// 1. there is no trait `TrustedLen` in stable Rust and therefore + /// we can't specialize `extend` for `TrustedLen` like `Vec` does. + /// 2. `from_trusted_len_iter` is faster. 
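A short example of the new `MutableBuffer::extend_from_trusted_len_iter` (as added above): the caller is responsible for the iterator reporting an accurate length, which ranges and maps over ranges do.

```rust
use arrow_buffer::MutableBuffer;

let mut buffer = MutableBuffer::new(0);
// `0..4u32` reports an exact size_hint, so the trusted-length contract holds
unsafe { buffer.extend_from_trusted_len_iter((0..4u32).map(|i| i * 2)) };
assert_eq!(buffer.len(), 4 * std::mem::size_of::<u32>()); // 16 bytes written
```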
#[inline] pub unsafe fn from_trusted_len_iter>( iterator: I, ) -> Self { + // TODO: reduce duplication with `extend_from_trusted_len_iter` let item_size = std::mem::size_of::(); let (_, upper) = iterator.size_hint(); let upper = upper.expect("from_trusted_len_iter requires an upper limit"); diff --git a/arrow-buffer/src/builder/boolean.rs b/arrow-buffer/src/builder/boolean.rs index bdcc3a55dbf2..c61ed9078e92 100644 --- a/arrow-buffer/src/builder/boolean.rs +++ b/arrow-buffer/src/builder/boolean.rs @@ -244,6 +244,12 @@ impl BooleanBufferBuilder { self.buffer.as_slice_mut() } + /// Creates a [`BooleanBuffer`] annd consumes the builder + #[inline] + pub fn build(self) -> BooleanBuffer { + BooleanBuffer::new(self.buffer.into(), 0, self.len) + } + /// Creates a [`BooleanBuffer`] #[inline] pub fn finish(&mut self) -> BooleanBuffer { diff --git a/arrow-buffer/src/builder/mod.rs b/arrow-buffer/src/builder/mod.rs index f7e0e29dace4..1ef3ca8e4dee 100644 --- a/arrow-buffer/src/builder/mod.rs +++ b/arrow-buffer/src/builder/mod.rs @@ -330,13 +330,19 @@ impl BufferBuilder { /// the iterator implement `TrustedLen` once that is stabilized. #[inline] pub unsafe fn append_trusted_len_iter(&mut self, iter: impl IntoIterator) { - let iter = iter.into_iter(); - let len = iter - .size_hint() - .1 - .expect("append_trusted_len_iter expects upper bound"); - self.reserve(len); - self.extend(iter); + let starting_buffer_len = self.len; + debug_assert_eq!( + starting_buffer_len, + self.buffer.len() / std::mem::size_of::() + ); + self.buffer.extend_from_trusted_len_iter(iter.into_iter()); + self.len = self.buffer.len() / std::mem::size_of::(); + } + + /// Consume the builder and return a buffer + #[inline] + pub fn build(self) -> Buffer { + self.buffer.into() } /// Resets this builder and returns an immutable [Buffer]. diff --git a/arrow-buffer/src/builder/null.rs b/arrow-buffer/src/builder/null.rs index e6f426615be5..5cc88af4f597 100644 --- a/arrow-buffer/src/builder/null.rs +++ b/arrow-buffer/src/builder/null.rs @@ -193,6 +193,14 @@ impl NullBufferBuilder { } } + /// Builds the null buffer consuming the builder. + /// Returns `None` if the builder only contains `true`s. + pub fn build(self) -> Option { + self.bitmap_builder + .map(|bitmap_builder| bitmap_builder.build()) + .map(NullBuffer::new) + } + /// Builds the null buffer and resets the builder. /// Returns `None` if the builder only contains `true`s. pub fn finish(&mut self) -> Option { diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 486afbd14467..866c6935d3bd 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -31,7 +31,9 @@ //! ``` use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values}; -use arrow_array::builder::{BooleanBuilder, GenericByteBuilder, PrimitiveBuilder}; +use arrow_array::builder::{ + BooleanBuilder, GenericByteBuilder, GenericByteViewBuilder, PrimitiveBuilder, +}; use arrow_array::cast::AsArray; use arrow_array::types::*; use arrow_array::*; @@ -41,6 +43,44 @@ use arrow_data::ArrayDataBuilder; use arrow_schema::{ArrowError, DataType, FieldRef, Fields, SchemaRef}; use std::{collections::HashSet, ops::Add, sync::Arc}; +/// Extension trait for `ArrayBuilder`s which adds methods for appending +/// entire arrays based on a filter predicate. 
+///
+/// TODO move the methods from ArrayBuilders that already exist to this trait
+pub trait ArrayBuilderExtAppend {
+    /// Appends all rows from `array` to the current array
+    ///
+    ///
+    fn append_array(&mut self, array: &ArrayRef) -> Result<(), ArrowError>;
+}
+
+impl<T: ArrowPrimitiveType> ArrayBuilderExtAppend for PrimitiveBuilder<T> {
+    fn append_array(&mut self, array: &ArrayRef) -> Result<(), ArrowError> {
+        let array = array.as_primitive::<T>();
+        // TODO move append array into this module?
+        <PrimitiveBuilder<T>>::append_array(self, array);
+        Ok(())
+    }
+}
+
+impl ArrayBuilderExtAppend for BooleanBuilder {
+    fn append_array(&mut self, array: &ArrayRef) -> Result<(), ArrowError> {
+        // TODO move append array into this module?
+        let array = array.as_boolean();
+        <BooleanBuilder>::append_array(self, array);
+        Ok(())
+    }
+}
+
+impl<T: ByteViewType + ?Sized> ArrayBuilderExtAppend for GenericByteViewBuilder<T> {
+    fn append_array(&mut self, array: &ArrayRef) -> Result<(), ArrowError> {
+        // TODO move append array into this module?
+        let array = array.as_byte_view::<T>();
+        <GenericByteViewBuilder<T>>::append_array(self, array);
+        Ok(())
+    }
+}
+
 fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
     let mut item_capacity = 0;
     let mut bytes_capacity = 0;
diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs
index cf16140aad1f..99ece3c84503 100644
--- a/arrow-select/src/filter.rs
+++ b/arrow-select/src/filter.rs
@@ -20,13 +20,13 @@ use std::ops::AddAssign;
 use std::sync::Arc;
 
-use arrow_array::builder::BooleanBufferBuilder;
+use arrow_array::builder::{BooleanBufferBuilder, GenericByteViewBuilder, PrimitiveBuilder};
 use arrow_array::cast::AsArray;
 use arrow_array::types::{
     ArrowDictionaryKeyType, ArrowPrimitiveType, ByteArrayType, ByteViewType, RunEndIndexType,
 };
 use arrow_array::*;
-use arrow_buffer::{bit_util, ArrowNativeType, BooleanBuffer, NullBuffer, RunEndBuffer};
+use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer, NullBufferBuilder, RunEndBuffer};
 use arrow_buffer::{Buffer, MutableBuffer};
 use arrow_data::bit_iterator::{BitIndexIterator, BitSliceIterator};
 use arrow_data::transform::MutableArrayData;
@@ -107,6 +107,31 @@ impl Iterator for IndexIterator<'_> {
     }
 }
 
+/// Extension trait for `ArrayBuilder`s which adds a method for appending rows from
+/// an array based on a filter.
+///
+/// Values from `array` are copied into the builder where the corresponding predicate
+/// value is `true`.
+///
+/// This is the equivalent of calling [`filter`] and then [`concat`] on the resulting
+/// arrays, but is more efficient as it avoids creating intermediate arrays.
+///
+/// See also:
+/// - InProgressRecordBatchBuilder (TODO add)
+pub trait ArrayBuilderExtFilter {
+    /// Appends all rows from `array` to the current arrays where `predicate` is
+    /// `true`.
+    ///
+    /// See [`FilterPredicate`] for more details on how the predicate is applied.
+    ///
+    /// Panics if `array` is not the correct type for this builder.
+ fn append_filtered( + &mut self, + array: &dyn Array, + predicate: &FilterPredicate, + ) -> Result<(), ArrowError>; +} + /// Counts the number of set bits in `filter` fn filter_count(filter: &BooleanArray) -> usize { filter.values().count_set_bits() @@ -174,7 +199,7 @@ pub fn prep_null_mask_filter(filter: &BooleanArray) -> BooleanArray { pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> Result { let mut filter_builder = FilterBuilder::new(predicate); - if multiple_arrays(values.data_type()) { + if FilterBuilder::multiple_arrays(values.data_type()) { // Only optimize if filtering more than one array // Otherwise, the overhead of optimization can be more than the benefit filter_builder = filter_builder.optimize(); @@ -185,16 +210,6 @@ pub fn filter(values: &dyn Array, predicate: &BooleanArray) -> Result bool { - match data_type { - DataType::Struct(fields) => { - fields.len() > 1 || fields.len() == 1 && multiple_arrays(fields[0].data_type()) - } - DataType::Union(fields, UnionMode::Sparse) => !fields.is_empty(), - _ => false, - } -} - /// Returns a filtered [RecordBatch] where the corresponding elements of /// `predicate` are true. /// @@ -274,6 +289,19 @@ impl FilterBuilder { strategy: self.strategy, } } + + /// Returns true if the data type contains multiple arrays and hence would + /// benefit from [`FilterBuilder::optimize`] + pub fn multiple_arrays(data_type: &DataType) -> bool { + match data_type { + DataType::Struct(fields) => { + fields.len() > 1 + || fields.len() == 1 && Self::multiple_arrays(fields[0].data_type()) + } + DataType::Union(fields, UnionMode::Sparse) => !fields.is_empty(), + _ => false, + } + } } /// The iteration strategy used to evaluate [`FilterPredicate`] @@ -318,6 +346,8 @@ impl IterationStrategy { } /// A filtering predicate that can be applied to an [`Array`] +/// +/// See [`FilterBuilder`] to create a [`FilterPredicate`]. 
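To make the intended calling pattern concrete, here is a minimal doc-test style sketch (module paths assumed from this diff) that builds a `FilterPredicate` once and streams the matching rows straight into a `PrimitiveBuilder`:

```rust
use arrow_array::builder::Int32Builder;
use arrow_array::{Array, BooleanArray, Int32Array};
use arrow_schema::ArrowError;
use arrow_select::filter::{ArrayBuilderExtFilter, FilterBuilder};

fn filter_into_builder() -> Result<(), ArrowError> {
    let values = Int32Array::from(vec![1, 2, 3, 4]);
    let predicate = FilterBuilder::new(&BooleanArray::from(vec![true, false, true, false]))
        .optimize()
        .build();

    let mut builder = Int32Builder::new();
    // copies rows 0 and 2 (where the predicate is true) without an intermediate array
    builder.append_filtered(&values, &predicate)?;

    let filtered = builder.finish();
    assert_eq!(filtered.len(), 2);
    assert_eq!(filtered.value(0), 1);
    assert_eq!(filtered.value(1), 3);
    Ok(())
}
```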
#[derive(Debug)] pub struct FilterPredicate { filter: BooleanArray, @@ -337,7 +367,95 @@ impl FilterPredicate { } } -fn filter_array(values: &dyn Array, predicate: &FilterPredicate) -> Result { +/// Appends the nulls from array for any filtered rows to the `null_buffer_builder` +/// +/// Panics if the strategy is +/// [`IterationStrategy::All`] or [`IterationStrategy::None`], which must be handled by the caller +fn append_filtered_nulls( + null_buffer_builder: &mut NullBufferBuilder, + array: &dyn Array, + predicate: &FilterPredicate, +) { + assert!(!matches!( + predicate.strategy, + IterationStrategy::All | IterationStrategy::None + )); + + let Some(nulls) = array.nulls() else { + // No nulls in the source array, so it anything selected by filter will be non null as well + null_buffer_builder.append_n_non_nulls(predicate.count()); + return; + }; + + // Filter the packed bitmask `buffer`, with `predicate` starting at bit offset `offset` + // TODO: maybe this could be improved to avoid materializing the temporary buffer + let nulls = filter_bits(nulls.inner(), predicate); + let nulls = NullBuffer::from(BooleanBuffer::new(nulls, 0, predicate.count)); + + null_buffer_builder.append_buffer(&nulls); +} + +impl ArrayBuilderExtFilter for PrimitiveBuilder { + fn append_filtered( + &mut self, + array: &dyn Array, + predicate: &FilterPredicate, + ) -> Result<(), ArrowError> { + // todo reserve space for the new values + // self.reserve(predicate.count); + + let array = array.as_primitive::(); + let values = array.values(); + + assert!(values.len() >= predicate.filter.len()); + + let (values_builder, null_buffer_builder) = self.inner_mut(); + match &predicate.strategy { + IterationStrategy::SlicesIterator => { + append_filtered_nulls(null_buffer_builder, array, predicate); + for (start, end) in SlicesIterator::new(&predicate.filter) { + values_builder.append_slice(&values[start..end]); + } + } + IterationStrategy::Slices(slices) => { + append_filtered_nulls(null_buffer_builder, array, predicate); + for (start, end) in slices { + values_builder.append_slice(&values[*start..*end]); + } + } + IterationStrategy::IndexIterator => { + append_filtered_nulls(null_buffer_builder, array, predicate); + let iter = + IndexIterator::new(&predicate.filter, predicate.count).map(|x| values[x]); + // SAFETY: IndexIterator is trusted length + unsafe { values_builder.append_trusted_len_iter(iter) } + } + IterationStrategy::Indices(indices) => { + append_filtered_nulls(null_buffer_builder, array, predicate); + let iter = indices.iter().map(|x| values[*x]); + // safety: Vec + Map knows its length correctly + unsafe { + values_builder.append_trusted_len_iter(iter); + } + } + IterationStrategy::All => { + self.append_array(array); // Also appends nulls + } + IterationStrategy::None => {} + } + let (values_builder, null_buffer_builder) = self.inner_mut(); + assert_eq!(values_builder.len(), null_buffer_builder.len()); + Ok(()) + } +} + +/// filter an [`Array`] with a pre-created [`FilterPredicate`] +/// +/// See [`filter`] for a higher level API +pub fn filter_array( + values: &dyn Array, + predicate: &FilterPredicate, +) -> Result { if predicate.filter.len() > values.len() { return Err(ArrowError::InvalidArgumentError(format!( "Filter predicate of length {} is larger than target array of length {}", @@ -496,6 +614,9 @@ fn filter_null_mask( } /// Filter the packed bitmask `buffer`, with `predicate` starting at bit offset `offset` +/// +/// Panics for `IterationStrategy::All` or `IterationStrategy::None` which must +/// be 
handled by the caller fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer { let src = buffer.values(); let offset = buffer.offset(); @@ -530,7 +651,8 @@ fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer { } builder.into() } - IterationStrategy::All | IterationStrategy::None => unreachable!(), + IterationStrategy::All => unreachable!(), + IterationStrategy::None => unreachable!(), } } @@ -550,56 +672,18 @@ fn filter_boolean(array: &BooleanArray, predicate: &FilterPredicate) -> BooleanA BooleanArray::from(data) } -#[inline(never)] -fn filter_native(values: &[T], predicate: &FilterPredicate) -> Buffer { - assert!(values.len() >= predicate.filter.len()); - - match &predicate.strategy { - IterationStrategy::SlicesIterator => { - let mut buffer = Vec::with_capacity(predicate.count); - for (start, end) in SlicesIterator::new(&predicate.filter) { - buffer.extend_from_slice(&values[start..end]); - } - buffer.into() - } - IterationStrategy::Slices(slices) => { - let mut buffer = Vec::with_capacity(predicate.count); - for (start, end) in slices { - buffer.extend_from_slice(&values[*start..*end]); - } - buffer.into() - } - IterationStrategy::IndexIterator => { - let iter = IndexIterator::new(&predicate.filter, predicate.count).map(|x| values[x]); - - // SAFETY: IndexIterator is trusted length - unsafe { MutableBuffer::from_trusted_len_iter(iter) }.into() - } - IterationStrategy::Indices(indices) => { - let iter = indices.iter().map(|x| values[*x]); - iter.collect::>().into() - } - IterationStrategy::All | IterationStrategy::None => unreachable!(), - } -} - /// `filter` implementation for primitive arrays fn filter_primitive(array: &PrimitiveArray, predicate: &FilterPredicate) -> PrimitiveArray where T: ArrowPrimitiveType, { - let values = array.values(); - let buffer = filter_native(values, predicate); - let mut builder = ArrayDataBuilder::new(array.data_type().clone()) - .len(predicate.count) - .add_buffer(buffer); - - if let Some((null_count, nulls)) = filter_null_mask(array.nulls(), predicate) { - builder = builder.null_count(null_count).null_bit_buffer(Some(nulls)); - } + let builder = PrimitiveBuilder::::with_capacity(predicate.count); + let mut builder = unsafe { builder.with_data_type_unchecked(array.data_type().clone()) }; + builder + .append_filtered(array, predicate) + .expect("Failed to append filtered values"); - let data = unsafe { builder.build_unchecked() }; - PrimitiveArray::from(data) + builder.build() } /// [`FilterBytes`] is created from a source [`GenericByteArray`] and can be @@ -744,23 +828,82 @@ where GenericByteArray::from(data) } +impl ArrayBuilderExtFilter for GenericByteViewBuilder { + fn append_filtered( + &mut self, + array: &dyn Array, + predicate: &FilterPredicate, + ) -> Result<(), ArrowError> { + // todo reserve space for the new values + // self.reserve(predicate.count); + + let array = array.as_byte_view::(); + let views = array.views(); + assert!(views.len() >= predicate.filter.len()); + + // Note, this is basically the same as `filter_native` -- TODO figure out how to reuse that code (maybe also for filter_primitive) + let (view_builder, null_buffer_builder) = self.inner_mut(); + let starting_view = view_builder.len(); + match &predicate.strategy { + IterationStrategy::SlicesIterator => { + append_filtered_nulls(null_buffer_builder, array, predicate); + for (start, end) in SlicesIterator::new(&predicate.filter) { + view_builder.append_slice(&views[start..end]); + } + } + IterationStrategy::Slices(slices) => { + 
append_filtered_nulls(null_buffer_builder, array, predicate); + for (start, end) in slices { + view_builder.append_slice(&views[*start..*end]); + } + } + IterationStrategy::IndexIterator => { + append_filtered_nulls(null_buffer_builder, array, predicate); + let iter = IndexIterator::new(&predicate.filter, predicate.count).map(|x| views[x]); + + // SAFETY: IndexIterator is trusted length + unsafe { view_builder.append_trusted_len_iter(iter) } + } + IterationStrategy::Indices(indices) => { + append_filtered_nulls(null_buffer_builder, array, predicate); + let iter = indices.iter().map(|x| views[*x]); + // Safety: Vec + Map knows its length correctly + unsafe { + view_builder.append_trusted_len_iter(iter); + } + } + IterationStrategy::All => { + // handles nulls and compaction as well + self.append_array(array); + return Ok(()); + } + IterationStrategy::None => { + return Ok(()); + } + } + + // Flush the in-progress buffer + unsafe { + // All views were coped from `array` + self.finalize_copied_views(starting_view, array); + } + + Ok(()) + } +} + /// `filter` implementation for byte view arrays. fn filter_byte_view( array: &GenericByteViewArray, predicate: &FilterPredicate, ) -> GenericByteViewArray { - let new_view_buffer = filter_native(array.views(), predicate); - - let mut builder = ArrayDataBuilder::new(T::DATA_TYPE) - .len(predicate.count) - .add_buffer(new_view_buffer) - .add_buffers(array.data_buffers().to_vec()); - - if let Some((null_count, nulls)) = filter_null_mask(array.nulls(), predicate) { - builder = builder.null_count(null_count).null_bit_buffer(Some(nulls)); - } - - GenericByteViewArray::from(unsafe { builder.build_unchecked() }) + let mut builder = GenericByteViewBuilder::::with_capacity(predicate.count) + // turn off string copy coalescing + .with_target_buffer_load_factor(None); + builder + .append_filtered(array, predicate) + .expect("Failed to append filtered values"); + builder.finish() } fn filter_fixed_size_binary( diff --git a/arrow-select/src/incremental_array_builder.rs b/arrow-select/src/incremental_array_builder.rs new file mode 100644 index 000000000000..8bade658a600 --- /dev/null +++ b/arrow-select/src/incremental_array_builder.rs @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Incrementally builds Arrow Arrays from a stream of values + +use crate::concat::ArrayBuilderExtAppend; +use crate::filter::{ArrayBuilderExtFilter, FilterPredicate}; +use arrow_array::builder::{ArrayBuilder, GenericByteViewBuilder, PrimitiveBuilder}; +use arrow_array::types::ByteViewType; +use arrow_array::{Array, ArrayRef, ArrowPrimitiveType}; +use arrow_schema::ArrowError; +use std::any::Any; +use std::fmt::Debug; +use std::sync::Arc; + +/// This builder is used to incrementally build Arrow Arrays +/// +/// The difference between this and a regular `ArrayBuilder` is that this +/// builder includes a `filter` method that can quickly copy append subsets of +/// existing arrays to the incremental array. +/// +/// This is the same operation as applying `filter` then `concat` kernels +/// but is more efficient as it avoids the intermediate array creation and +/// does not need extra buffering. +/// +/// This is useful for scenarios where you want to build an output array +/// from the results of filtering or taking elements from existing arrays, +/// a common operation in data processing pipelines and query engines. +/// +/// (TODO diagram) +/// +/// TODO: add comments +/// +/// TODO note this can also be extended to support `take` operations as well +/// +/// TODO make this an extension trait for `ArrayBuilder`s +pub trait IncrementalArrayBuilder: + ArrayBuilder + ArrayBuilderExtFilter + ArrayBuilderExtAppend +{ +} + +/// Generic incremental array builder for any Arrow Array type +/// +/// Uses `concat` / `filter` to build the final output array. This is less +/// efficient than using a specific builder but works for any Arrow Array type +/// until we have implemented specific builders all types. +#[derive(Debug, Default)] +pub struct GenericIncrementalArrayBuilder { + /// In progress arrays being built + arrays: Vec, +} + +impl GenericIncrementalArrayBuilder { + /// Create a new generic incremental array builder + pub fn new() -> Self { + Default::default() + } +} + +impl ArrayBuilder for GenericIncrementalArrayBuilder { + fn is_empty(&self) -> bool { + self.arrays.is_empty() + } + + fn len(&self) -> usize { + self.arrays.iter().map(|a| a.len()).sum() + } + + /// Build the final output array by concatenating all appended arrays, and + /// resetting the builder state. 
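A hedged sketch of using the generic fallback builder through the extension traits (paths assumed from this diff); `finish` simply `concat`s everything appended so far and resets the builder:

```rust
use std::sync::Arc;
use arrow_array::builder::ArrayBuilder;
use arrow_array::{ArrayRef, StringArray};
use arrow_schema::ArrowError;
use arrow_select::concat::ArrayBuilderExtAppend;
use arrow_select::incremental_array_builder::GenericIncrementalArrayBuilder;

fn concat_via_builder() -> Result<ArrayRef, ArrowError> {
    let a: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
    let b: ArrayRef = Arc::new(StringArray::from(vec!["c"]));

    let mut builder = GenericIncrementalArrayBuilder::new();
    builder.append_array(&a)?; // just buffers the ArrayRef
    builder.append_array(&b)?;
    Ok(builder.finish()) // concatenates the buffered arrays and resets
}
```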
+ fn finish(&mut self) -> ArrayRef { + let output_array = self.finish_cloned(); + self.arrays.clear(); // Reset the builder state + output_array + } + + fn finish_cloned(&self) -> ArrayRef { + // must conform to concat signature + let concat_input: Vec<&dyn Array> = self.arrays.iter().map(|a| a.as_ref()).collect(); + crate::concat::concat(&concat_input).expect("concat should not fail") + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn into_box_any(self: Box) -> Box { + self + } +} + +impl ArrayBuilderExtFilter for GenericIncrementalArrayBuilder { + fn append_filtered( + &mut self, + array: &dyn Array, + filter: &FilterPredicate, + ) -> Result<(), ArrowError> { + // Filter the array using the filter mask + let filtered_array = crate::filter::filter_array(array, filter)?; + self.arrays.push(filtered_array); + Ok(()) + } +} + +impl ArrayBuilderExtAppend for GenericIncrementalArrayBuilder { + fn append_array(&mut self, array: &ArrayRef) -> Result<(), ArrowError> { + self.arrays.push(Arc::clone(array)); + Ok(()) + } +} + +impl IncrementalArrayBuilder for GenericIncrementalArrayBuilder {} + +impl IncrementalArrayBuilder for PrimitiveBuilder {} + +impl IncrementalArrayBuilder for GenericByteViewBuilder {} diff --git a/arrow-select/src/incremental_batch_builder.rs b/arrow-select/src/incremental_batch_builder.rs new file mode 100644 index 000000000000..9c265323d6aa --- /dev/null +++ b/arrow-select/src/incremental_batch_builder.rs @@ -0,0 +1,355 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`IncrementalRecordBatchBuilder`] for incrementally building RecordBatches from other arrays + +use crate::filter::{FilterBuilder, FilterPredicate, SlicesIterator}; +use crate::incremental_array_builder::{GenericIncrementalArrayBuilder, IncrementalArrayBuilder}; +use arrow_array::builder::{BinaryViewBuilder, StringViewBuilder}; +use arrow_array::{BooleanArray, RecordBatch}; +use arrow_schema::{ArrowError, DataType, SchemaRef}; +use std::borrow::Cow; +use std::collections::VecDeque; + +type ArrayBuilderImpl = Box; + +/// Incrementally creates `RecordBatch`es of limited size +/// +/// This structure implements the common pattern of incrementally creating +/// output batches of a specific size from an input stream of arrays. +/// +/// See: +/// +/// This is a convenience over [`IncrementalArrayBuilder`] which is used to +/// build arrays for each column in the `RecordBatch`. +/// +/// Which rows are selected from the input arrays are be chosen using one of the +/// following mechanisms: +/// +/// 1. `concat`-enated: all rows from the input array are appended +/// 2. `filter`-ed: the input array is filtered using a `BooleanArray` +/// 3. 
`take`-n: a subset of the input array is selected based on the indices provided in a `UInt32Array` or similar. +/// +/// This structure handles multiple arrays +pub struct IncrementalRecordBatchBuilder { + /// The schema of the RecordBatches being built + schema: SchemaRef, + /// The maximum size, in rows, of the arrays being built + batch_size: usize, + /// Should we 'optimize' the predicate before applying it? + optimize_predicate: bool, + /// batches that are "finished" (have batch_size rows) + finished: VecDeque, + /// The current arrays being built + current: Vec, +} + +impl std::fmt::Debug for IncrementalRecordBatchBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IncrementalRecordBatchBuilder") + .field("schema", &self.schema) + .field("batch_size", &self.batch_size) + .field("optimize_predicate", &self.optimize_predicate) + .field("finished", &self.finished.len()) + .field("current", &self.current.len()) + .finish() + } +} + +impl IncrementalRecordBatchBuilder { + /// Creates a new builder with the specified batch size and schema + /// + /// There must be at least one column in the schema, and the batch size must be greater than 0. + pub fn try_new(schema: SchemaRef, batch_size: usize) -> Result { + if schema.fields().is_empty() { + return Err(ArrowError::InvalidArgumentError( + "IncrementalRecordBatchBuilder Schema must have at least one field".to_string(), + )); + } + + if batch_size == 0 { + return Err(ArrowError::InvalidArgumentError( + "IncrementalRecordBatchBuilder batch size must be greater than 0".to_string(), + )); + } + + let current = schema + .fields() + .iter() + .map(|field| instantiate_builder(field.data_type(), batch_size)) + .collect::>(); + + // Optimize the predicate if we will use it more than once (have more than 1 array) + let optimize_predicate = schema.fields().len() > 1 + || schema + .fields() + .iter() + .any(|f| FilterBuilder::multiple_arrays(f.data_type())); + + Ok(Self { + schema, + batch_size, + optimize_predicate, + finished: VecDeque::new(), + current, + }) + } + + /// Return the current schema of the builder + #[allow(dead_code)] + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + /// Combines all arrays in `current` into a new array in `finished` and returns the + /// number of rows in the array added to `self.finished` + fn finish_current(&mut self) -> Result { + debug_assert!( + self.current + .iter() + .all(|b| b.is_empty() == self.current[0].is_empty()), + "All builders in current must match is_empty" + ); + + if self.current[0].is_empty() { + // no rows in progress, so nothing to do + return Ok(0); + } + let new_arrays: Vec<_> = self + .current + .iter_mut() + .map(|builder| builder.finish()) + .collect(); + let batch = RecordBatch::try_new(self.schema.clone(), new_arrays)?; + + let num_rows = batch.num_rows(); + self.finished.push_back(batch); + Ok(num_rows) + } + + /// returns the number of rows currently in progress + pub fn num_current_rows(&self) -> usize { + debug_assert!( + self.current + .iter() + .all(|b| b.len() == self.current[0].len()), + "All builders in current must have the same length" + ); + self.current[0].len() + } + + /// Return the next `RecordBatch` if it is ready, or `None` if not + /// + /// This allows the builder to be used in a streaming fashion where rows are + /// added incrementally and produced in batches. 
+ /// + /// Each batch + #[allow(dead_code)] + pub fn next_batch(&mut self) -> Option { + // return the last finished batch + self.finished.pop_front() + } + + /// Finalize this builder, returning any remaining batches + pub fn build(mut self) -> Result, ArrowError> { + self.finish_current()?; + let Self { finished, .. } = self; + Ok(finished) + } + + /// Appends all rows from the input batch to the current arrays where + /// `filter_array` is `true`. + /// + /// This method optimizes for the case where the filter selects all or no rows + /// and ensures all output arrays in `current` is at most `batch_size` rows long. + pub fn append_filtered( + &mut self, + mut batch: RecordBatch, + filter_array: &BooleanArray, + ) -> Result<(), ArrowError> { + let mut filter_array = Cow::Borrowed(filter_array); + loop { + // how many more rows do we need to fill the current array? + let row_limit = self.batch_size - self.num_current_rows(); + match get_filter_limit(&filter_array, row_limit) { + FilterLimit::None => { + break; + } + FilterLimit::Filter => { + let predicate = create_predicate(&filter_array, self.optimize_predicate); + let columns = batch.columns().iter(); + for (builder, array) in self.current.iter_mut().zip(columns) { + builder.append_filtered(array, &predicate)?; + } + break; + } + FilterLimit::Concat => { + let columns = batch.columns().iter(); + for (builder, array) in self.current.iter_mut().zip(columns) { + builder.append_array(array)?; // append the entire array + } + break; + } + FilterLimit::Slice { end } => { + // can only fit a slice of the filter into the current array + let sliced_filter = filter_array.slice(0, end); + let remain_filter = filter_array.slice(end, filter_array.len() - end); + let sliced_batch = batch.slice(0, end); + let remain_batch = batch.slice(end, batch.num_rows() - end); + + let predicate = create_predicate(&sliced_filter, self.optimize_predicate); + let columns = sliced_batch.columns().iter(); + for (builder, array) in self.current.iter_mut().zip(columns) { + // append the sliced array and filter + builder.append_filtered(array, &predicate)?; + } + let completed_rows = self.finish_current()?; + assert_eq!(completed_rows, self.batch_size); + + // update the filter / array with the slices + filter_array = Cow::Owned(remain_filter); + batch = remain_batch; + } + } + } + + // Finish the current batch if it is full + assert!( + self.num_current_rows() <= self.batch_size, + "Current batch should not exceed batch size. Current rows: {}, batch size: {}", + self.num_current_rows(), + self.batch_size + ); + if self.num_current_rows() >= self.batch_size { + let completed_rows = self.finish_current()?; + assert_eq!(completed_rows, self.batch_size); + } + Ok(()) + } +} + +fn create_predicate(filter: &BooleanArray, optimize: bool) -> FilterPredicate { + // Create a filter predicate from the BooleanArray + let mut builder = FilterBuilder::new(filter); + if optimize { + // Optimize the predicate if we have more than one array or the filter is complex + builder = builder.optimize() + } + builder.build() +} + +#[derive(Debug)] +enum FilterLimit { + /// The filter selects no rows + None, + /// Use the entire filter as is + Filter, + /// The filter selects all rows, so need to consult it + Concat, + /// Can only fit a slice of the filter 0,end + Slice { + /// end of the slice from the filter + end: usize, + }, +} + +/// Returns the number of rows from filter which must be taken to ensure +/// that there are no more than `row_limit` rows in a filtered result. 
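Putting the pieces together, a minimal end-to-end sketch of the batch builder (paths and APIs assumed from this diff): rows selected by the filter are split across fixed-size output batches, and the remainder is flushed by `build`.

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::incremental_batch_builder::IncrementalRecordBatchBuilder;

fn split_into_batches() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, true)]));
    let mut builder = IncrementalRecordBatchBuilder::try_new(Arc::clone(&schema), 2)?;

    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])) as ArrayRef],
    )?;
    // selects rows 0, 1 and 3 => three rows against a batch size of two
    let keep = BooleanArray::from(vec![true, true, false, true, false]);
    builder.append_filtered(batch, &keep)?;

    // one full two-row batch is ready; the third selected row stays buffered
    assert_eq!(builder.next_batch().map(|b| b.num_rows()), Some(2));
    let remaining = builder.build()?; // flushes the partially filled batch
    assert_eq!(remaining.iter().map(|b| b.num_rows()).sum::<usize>(), 1);
    Ok(())
}
```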
+fn get_filter_limit(filter: &BooleanArray, row_limit: usize) -> FilterLimit { + // Calculating the number of true values in the filter is very fast so check + // it first for the common case. + let true_count = filter.true_count(); + + if true_count == 0 { + // no rows selected by the filter + return FilterLimit::None; + } + + if true_count <= row_limit { + return if true_count == filter.len() { + FilterLimit::Concat + } else { + FilterLimit::Filter + }; + } + + // there are more true values than remaining, so we need to slice the filter + let mut slices_iter = SlicesIterator::new(filter); + // how many rows are already included in filter? + let mut in_filter_rows = 0; + loop { + let Some((start, end)) = slices_iter.next() else { + panic!("slices iterator should not be empty if true_count > remaining"); + }; + + let slice_len = end - start; + if slice_len + in_filter_rows < row_limit { + // this slice does not have enough rows, so add it to the count + in_filter_rows += slice_len; + } else { + // adjust end so it only selects the remaining rows + let rows_needed_in_slice = row_limit - in_filter_rows; + return FilterLimit::Slice { + end: start + rows_needed_in_slice, + }; + } + } +} +/* +/// Create an incremental array builder for the given data type +/// +/// Uses a generic implementation if we don't have a specific builder for the type +fn instantiate_builder(data_type: &DataType, batch_size: usize) -> ArrayBuilderImpl { + // Create a primitive builder for the given data type + macro_rules! primitive_builder_helper { + ($t:ty, $DT:expr, $SZ:expr) => { + Box::new(PrimitiveBuilder::<$t>::with_capacity($SZ).with_data_type($DT.clone())) + }; + } + + downcast_primitive! { + data_type => (primitive_builder_helper, data_type, batch_size), + DataType::Utf8View => Box::new(StringViewBuilder::with_capacity(batch_size)), + DataType::BinaryView => Box::new(BinaryViewBuilder::with_capacity(batch_size)), + + // Default to using the generic builder for all other types + // TODO file tickets tracking specific builders for other types + _ => Box::new(GenericIncrementalArrayBuilder::new()), + } +} +*/ + +/// Create an incremental array builder for the given data type +/// +/// Uses a generic implementation if we don't have a specific builder for the type +/// +/// TEMPORARY test: only use string view builder for now to test +/// with https://github.com/apache/datafusion/pull/16208 +fn instantiate_builder(data_type: &DataType, batch_size: usize) -> ArrayBuilderImpl { + match data_type { + DataType::Utf8View => { + Box::new(StringViewBuilder::with_capacity(batch_size).with_initial_capacity(batch_size)) + } + DataType::BinaryView => { + Box::new(BinaryViewBuilder::with_capacity(batch_size).with_initial_capacity(batch_size)) + } + + // Default to using the generic builder for all other types + // + _ => Box::new(GenericIncrementalArrayBuilder::new()), + } +} diff --git a/arrow-select/src/lib.rs b/arrow-select/src/lib.rs index 1648dc2833d8..184661bb738a 100644 --- a/arrow-select/src/lib.rs +++ b/arrow-select/src/lib.rs @@ -27,6 +27,8 @@ pub mod concat; mod dictionary; pub mod filter; +pub mod incremental_array_builder; +pub mod incremental_batch_builder; pub mod interleave; pub mod nullif; pub mod take; diff --git a/arrow/benches/take_kernels.rs b/arrow/benches/take_kernels.rs index a7e70d2b03db..8f6f92a375e3 100644 --- a/arrow/benches/take_kernels.rs +++ b/arrow/benches/take_kernels.rs @@ -192,6 +192,19 @@ fn add_benchmark(c: &mut Criterion) { "take primitive run logical len: 1024, physical len: 512, indices: 
1024", |b| b.iter(|| bench_take(&values, &indices)), ); + + let values = create_fsb_array(1024, 0.0, 12); + let indices = create_random_index(1024, 0.0); + c.bench_function("take primitive fsb value len: 12, indices: 1024", |b| { + b.iter(|| bench_take(&values, &indices)) + }); + + let values = create_fsb_array(1024, 0.5, 12); + let indices = create_random_index(1024, 0.0); + c.bench_function( + "take primitive fsb value len: 12, null values, indices: 1024", + |b| b.iter(|| bench_take(&values, &indices)), + ); } criterion_group!(benches, add_benchmark); diff --git a/arrow/src/compute/mod.rs b/arrow/src/compute/mod.rs index bff7214718fc..09456ddcbc8c 100644 --- a/arrow/src/compute/mod.rs +++ b/arrow/src/compute/mod.rs @@ -37,3 +37,6 @@ pub use self::kernels::take::*; pub use self::kernels::temporal::*; pub use self::kernels::union_extract::*; pub use self::kernels::window::*; + +pub use arrow_select::incremental_array_builder::*; +pub use arrow_select::incremental_batch_builder::IncrementalRecordBatchBuilder; diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 14a475859810..a0873476cb6f 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -23,7 +23,7 @@ use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader; use crate::arrow::array_reader::empty_array::make_empty_array_reader; use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader; use crate::arrow::array_reader::{ - make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader, + make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader, CachedPredicateResult, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader, PrimitiveArrayReader, RowGroups, StructArrayReader, }; @@ -37,11 +37,18 @@ use crate::schema::types::{ColumnDescriptor, ColumnPath, Type}; /// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader pub(crate) struct ArrayReaderBuilder<'a> { row_groups: &'a dyn RowGroups, + cached_predicate_result: Option<&'a CachedPredicateResult>, } impl<'a> ArrayReaderBuilder<'a> { - pub(crate) fn new(row_groups: &'a dyn RowGroups) -> Self { - Self { row_groups } + pub(crate) fn new( + row_groups: &'a dyn RowGroups, + cached_predicate_result: Option<&'a CachedPredicateResult>, + ) -> Self { + Self { + row_groups, + cached_predicate_result, + } } /// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader. @@ -68,6 +75,10 @@ impl<'a> ArrayReaderBuilder<'a> { field: &ParquetField, mask: &ProjectionMask, ) -> Result>> { + if let Some(builder) = self.build_cached_reader(field, mask)? { + return Ok(Some(builder)); + } + match field.field_type { ParquetFieldType::Primitive { .. } => self.build_primitive_reader(field, mask), ParquetFieldType::Group { .. 
} => match &field.arrow_type { @@ -81,6 +92,33 @@ impl<'a> ArrayReaderBuilder<'a> { } } + /// Build cached array reader if the field is in the projection mask and in the cache + fn build_cached_reader( + &self, + field: &ParquetField, + mask: &ProjectionMask, + ) -> Result>> { + let Some(cached_predicate_result) = self.cached_predicate_result else { + return Ok(None); + }; + + // TODO how to find a cached struct / list + // (Probably have to cache the individual fields) + let ParquetFieldType::Primitive { + col_idx, + primitive_type: _, + } = &field.field_type + else { + return Ok(None); + }; + + if !mask.leaf_included(*col_idx) { + return Ok(None); + } + + cached_predicate_result.build_reader(*col_idx) + } + /// Build array reader for map type. fn build_map_reader( &self, @@ -375,7 +413,8 @@ mod tests { ) .unwrap(); - let array_reader = ArrayReaderBuilder::new(&file_reader) + let cached_predicate_result = None; + let array_reader = ArrayReaderBuilder::new(&file_reader, cached_predicate_result) .build_array_reader(fields.as_ref(), &mask) .unwrap(); diff --git a/parquet/src/arrow/array_reader/cached/builder.rs b/parquet/src/arrow/array_reader/cached/builder.rs new file mode 100644 index 000000000000..f32d046a46e8 --- /dev/null +++ b/parquet/src/arrow/array_reader/cached/builder.rs @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::array_reader::CachedPredicateResult; +use crate::arrow::arrow_reader::RowSelection; +use crate::arrow::ProjectionMask; +use arrow_array::{Array, BooleanArray, RecordBatch}; +use arrow_schema::{ArrowError, Schema, SchemaRef}; +use arrow_select::filter::prep_null_mask_filter; +use arrow_select::incremental_batch_builder::IncrementalRecordBatchBuilder; +use std::sync::Arc; + +/// Incrementally builds the result of evaluating an ArrowPredicate on +/// a RowGroup. +#[derive(Debug)] +pub(crate) struct CachedPredicateResultBuilder { + /// What is being cached + strategy: CacheStrategy, + /// Total number of columns in the original parquet schema + num_original_columns: usize, + /// Any filters that have been applied. Note this the complete set of filters + /// that have been applied to the cached batches. 
+ filters: Vec, +} + +#[derive(Debug)] +enum CacheStrategy { + /// Don't cache any results + None, + /// Cache the result of filtering all columns in the filter schema + All { + /// The builder for the cached batches + cached_batches_builder: IncrementalRecordBatchBuilder, + /// The indexes of the columns in the original parquet schema that are in the projection + original_projection: Vec, + }, + /// Cache the result of filtering a subset of the columns in the filter schema + Subset { + /// The builder for the cached batches + cached_batches_builder: IncrementalRecordBatchBuilder, + /// The indexes of the columns in the filter schema that are in the projection + filter_projection: Vec, + /// The indexes of the columns in the original parquet schema that are in the projection + original_projection: Vec, + }, +} + +impl CachedPredicateResultBuilder { + /// Create a new CachedPredicateResultBuilder + /// + /// # Arguments: + /// * `num_original_columns`: The number of columns in the original parquet schema + /// * `schema`: The schema of the filtered record batch (not the original parquet schema) + /// * `filter_mask`: which columns of the original parquet schema did the filter columns come from? + /// * `projection_mask`: which columns of the original parquet schema are in the final projection? + /// + /// This structure does not cache filter results for the columns that are not + /// in the projection mask. This is because the filter results are not needed + pub(crate) fn try_new( + num_original_columns: usize, + filter_schema: &SchemaRef, + filter_mask: &ProjectionMask, + projection_mask: &ProjectionMask, + batch_size: usize, + ) -> Result { + let (filter_mask_inner, projection_mask_inner) = + match (filter_mask.mask(), projection_mask.mask()) { + (Some(filter_mask), Some(projection_mask)) => (filter_mask, projection_mask), + // None means "select all columns" so in this case cache all filtered columns + (Some(filter_mask), None) => (filter_mask, filter_mask), + // None means "select all columns" so in this case cache all columns used in projection + (None, Some(projection_mask)) => (projection_mask, projection_mask), + (None, None) => { + // this means all columns are in the projection *and* filter so cache them all when possible + let cached_batches_builder = IncrementalRecordBatchBuilder::try_new( + Arc::clone(filter_schema), + batch_size, + )?; + let strategy = CacheStrategy::All { + cached_batches_builder, + original_projection: (0..num_original_columns).collect(), + }; + return { + Ok(Self { + strategy, + num_original_columns, + filters: vec![], + }) + }; + } + }; + + // Otherwise, need to select a subset of the fields from each batch to cache + + // This is an iterator over the fields of the schema of batches passed + // to the filter. 
+        let mut filter_field_iter = filter_schema.fields.iter().enumerate();
+
+        let mut filter_projection = vec![];
+        let mut original_projection = vec![];
+        let mut fields = vec![];
+
+        // Iterate over the masks from the original schema
+        assert_eq!(filter_mask_inner.len(), projection_mask_inner.len());
+        for (original_index, (&in_filter, &in_projection)) in filter_mask_inner
+            .iter()
+            .zip(projection_mask_inner.iter())
+            .enumerate()
+        {
+            if !in_filter {
+                continue;
+            }
+            // take next field from the filter schema
+            let (filter_index, field) =
+                filter_field_iter.next().expect("mismatch in field lengths");
+            if !in_projection {
+                // this field is not in the projection, so don't cache it
+                continue;
+            }
+            // this field is both in filter and the projection, so cache the results
+            filter_projection.push(filter_index);
+            original_projection.push(original_index);
+            fields.push(Arc::clone(field));
+        }
+        let strategy = if fields.is_empty() {
+            CacheStrategy::None
+        } else {
+            let cached_batches_builder =
+                IncrementalRecordBatchBuilder::try_new(Arc::new(Schema::new(fields)), batch_size)?;
+            CacheStrategy::Subset {
+                cached_batches_builder,
+                filter_projection,
+                original_projection,
+            }
+        };
+
+        Ok(Self {
+            strategy,
+            num_original_columns,
+            filters: vec![],
+        })
+    }
+
+    /// Add a new batch and filter to the builder
+    pub(crate) fn add(
+        &mut self,
+        batch: RecordBatch,
+        mut filter: BooleanArray,
+    ) -> crate::errors::Result<()> {
+        if filter.null_count() > 0 {
+            filter = prep_null_mask_filter(&filter);
+        }
+
+        match &mut self.strategy {
+            CacheStrategy::None => {}
+            CacheStrategy::All {
+                cached_batches_builder,
+                ..
+            } => {
+                cached_batches_builder.append_filtered(batch, &filter)?;
+            }
+            CacheStrategy::Subset {
+                cached_batches_builder,
+                ref filter_projection,
+                ..
+            } => {
+                // If we have a filter projection, we need to project the batch
+                // to only the columns that are in the filter projection
+                let projected_batch = batch.project(filter_projection)?;
+                cached_batches_builder.append_filtered(projected_batch, &filter)?;
+            }
+        }
+
+        self.filters.push(filter);
+
+        Ok(())
+    }
+
+    /// Return (selection, maybe_cached_predicate_result) that represents the rows
+    /// that were selected and batches that were evaluated.
+    pub(crate) fn build(
+        self,
+    ) -> crate::errors::Result<(RowSelection, Option<CachedPredicateResult>)> {
+        let Self {
+            strategy,
+            num_original_columns,
+            filters,
+        } = self;
+
+        let new_selection = RowSelection::from_filters(&filters);
+
+        match strategy {
+            CacheStrategy::None => Ok((new_selection, None)),
+            CacheStrategy::All {
+                cached_batches_builder,
+                original_projection,
+            }
+            | CacheStrategy::Subset {
+                cached_batches_builder,
+                original_projection,
+                ..
+            } => {
+                // explode out the cached batches into the proper place in the original schema
+                let completed_batches = cached_batches_builder.build()?;
+                let mut cached_result = CachedPredicateResult::new(num_original_columns, filters);
+                for (batch_index, original_idx) in original_projection.iter().enumerate() {
+                    let mut column_arrays = Vec::with_capacity(completed_batches.len());
+                    for batch in &completed_batches {
+                        column_arrays.push(Arc::clone(batch.column(batch_index)));
+                    }
+                    cached_result.add_result(*original_idx, column_arrays);
+                }
+
+                Ok((new_selection, Some(cached_result)))
+            }
+        }
+    }
+}
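The `Subset` strategy above caches only columns that are read by the filter *and* requested in the final projection; columns read only for the filter are decoded once and then discarded. A minimal sketch of that intersection using plain `bool` slices instead of `ProjectionMask` (the function name is hypothetical):

// Given per-leaf-column flags for "used by the filter" and "requested in the
// output projection", return the original column indexes worth caching.
fn columns_to_cache(in_filter: &[bool], in_projection: &[bool]) -> Vec<usize> {
    assert_eq!(in_filter.len(), in_projection.len());
    (0..in_filter.len())
        .filter(|&i| in_filter[i] && in_projection[i])
        .collect()
}

fn main() {
    // Original parquet schema has 4 leaf columns: [a, b, c, d].
    // The predicate reads a and c; the output projection selects a and b.
    let in_filter = [true, false, true, false];
    let in_projection = [true, true, false, false];
    // Only column 0 (a) is both filtered and projected, so only its filtered
    // values are worth keeping around.
    assert_eq!(columns_to_cache(&in_filter, &in_projection), vec![0]);
}

Nulls in the predicate output are handled before anything is cached: `add` runs `prep_null_mask_filter`, which treats a null filter value as "do not keep the row", so the cached batches and the recorded filters agree.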
diff --git a/parquet/src/arrow/array_reader/cached/mod.rs b/parquet/src/arrow/array_reader/cached/mod.rs
new file mode 100644
index 000000000000..8f154f15ef0b
--- /dev/null
+++ b/parquet/src/arrow/array_reader/cached/mod.rs
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Implements a cached column reader that provides data using
+//! previously decoded / filtered arrays
+
+mod builder;
+mod reader;
+
+pub(crate) use builder::CachedPredicateResultBuilder;
+use reader::CachedArrayReader;
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::Result;
+use arrow_array::{ArrayRef, BooleanArray};
+
+/// The result of evaluating a predicate on a RowGroup with a specific
+/// RowSelection
+///
+/// The flow is:
+/// * Decode with a RowSelection
+/// * Apply a predicate --> this result
+#[derive(Clone)]
+pub(crate) struct CachedPredicateResult {
+    /// Map of parquet schema column index to the result of evaluating the predicate
+    /// on that column.
+    ///
+    /// NOTE each array already has had `filters` applied
+    ///
+    /// If `Some`, it is a set of arrays that make up the result. Each has
+    /// `batch_size` rows except for the last
+    arrays: Vec<Option<Vec<ArrayRef>>>,
+    /// The results of evaluating the predicate (these have already been applied to the
+    /// cached results).
+    filters: Vec<BooleanArray>,
+}
+
+impl CachedPredicateResult {
+    pub(crate) fn new(num_columns: usize, filters: Vec<BooleanArray>) -> Self {
+        Self {
+            arrays: vec![None; num_columns],
+            filters,
+        }
+    }
+
+    /// Add the specified array to the cached result
+    pub fn add_result(&mut self, column_index: usize, arrays: Vec<ArrayRef>) {
+        // TODO: how is it possible to end up with previously cached arrays?
+        //assert!(self.arrays.get(column_index).is_none(), "column index {} already has a cached array", column_index);
+        self.arrays[column_index] = Some(arrays);
+    }
+
+    /// Returns an array reader for the given column index, if any, that reads from the cache rather
+    /// than the original column chunk
+    pub(crate) fn build_reader(&self, col_index: usize) -> Result<Option<Box<dyn ArrayReader>>> {
+        let Some(array) = &self.arrays[col_index] else {
+            return Ok(None);
+        };
+
+        Ok(Some(Box::new(CachedArrayReader::new(
+            array.clone(),
+            &self.filters,
+        ))))
+    }
+}
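A rough sketch of the data layout `CachedPredicateResult` holds: one slot per leaf column of the original parquet schema, where a cached column stores its already-filtered values as a list of array chunks (assumed here to hold at most `batch_size` rows each). This uses `arrow_array` directly as a stand-in for the real struct:

use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array};

fn main() {
    // One slot per leaf column in the original parquet schema: `None` when
    // nothing was cached for that column, `Some(chunks)` when the already
    // filtered values were kept.
    let num_original_columns = 3;
    let mut arrays: Vec<Option<Vec<ArrayRef>>> = vec![None; num_original_columns];

    // Pretend column 1 survived the predicate in two filtered chunks
    let chunk1: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let chunk2: ArrayRef = Arc::new(Int64Array::from(vec![4]));
    arrays[1] = Some(vec![chunk1, chunk2]);

    // Columns 0 and 2 were not cached and will be re-read from the row group
    assert!(arrays[0].is_none());
    assert!(arrays[2].is_none());
    assert_eq!(arrays[1].as_ref().unwrap().len(), 2);
}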
diff --git a/parquet/src/arrow/array_reader/cached/reader.rs b/parquet/src/arrow/array_reader/cached/reader.rs
new file mode 100644
index 000000000000..881f6a11b410
--- /dev/null
+++ b/parquet/src/arrow/array_reader/cached/reader.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use arrow_array::{new_empty_array, Array, ArrayRef, BooleanArray};
+use arrow_schema::DataType;
+use std::any::Any;
+use std::collections::VecDeque;
+
+pub(crate) struct CachedArrayReader {
+    /// The cached arrays. These should already be broken down into the correct batch_size chunks
+    cached_arrays: VecDeque<ArrayRef>,
+    data_type: DataType,
+    // /// The filter that was applied to the cached array (that has already been applied)
+    //filter: BooleanArray,
+    /// The length of the currently "in progress" array
+    current_length: usize,
+}
+
+impl CachedArrayReader {
+    pub(crate) fn new(cached_arrays: Vec<ArrayRef>, _filters: &[BooleanArray]) -> Self {
+        //let input: Vec<&dyn Array> = filters.iter().map(|b| b as &dyn Array).collect::<Vec<_>>();
+        //let filter = concat(&input).unwrap().as_boolean().clone();
+        let data_type = cached_arrays
+            .first()
+            .expect("had at least one array")
+            .data_type()
+            .clone();
+        Self {
+            cached_arrays: VecDeque::from(cached_arrays),
+            data_type,
+            current_length: 0,
+        }
+    }
+}
+
+impl ArrayReader for CachedArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &DataType {
+        &self.data_type
+    }
+
+    fn read_records(&mut self, batch_size: usize) -> crate::errors::Result<usize> {
+        // since the entire array is cached, reads always succeed
+        self.current_length += batch_size;
+        Ok(batch_size)
+    }
+
+    // Produce the "in progress" batch
+    fn consume_batch(&mut self) -> crate::errors::Result<ArrayRef> {
+        if self.current_length == 0 {
+            return Ok(new_empty_array(&self.data_type));
+        }
+
+        let mut next_array = self.cached_arrays.pop_front().ok_or_else(|| {
+            crate::errors::ParquetError::General(
+                "Internal error: no more cached arrays".to_string(),
+            )
+        })?;
+
+        // the next batch is the next array in the queue
+        // when a limit is applied, the next array may be smaller than the cached batch size, which is fine as we are
+        // just consuming the next available array
+        // TODO take this limit into account as part of the filter creation
+        // TODO this was hit by DataFusion tests, not arrow-rs tests, so there is a gap in our testing
+        // TODO add coverage
+        if self.current_length < next_array.len() {
+            next_array = next_array.slice(0, self.current_length);
+        }
+        assert_eq!(self.current_length, next_array.len());
+        self.current_length = 0;
+        Ok(next_array)
+    }
+
+    fn skip_records(&mut self, num_records: usize) -> crate::errors::Result<usize> {
+        // todo!()
+        // it would be good to verify the pattern of read/consume matches
+        // the boolean array
+        Ok(num_records)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        None // TODO this is likely not right for structured types
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        None // TODO this is likely not right for structured types
+    }
+}
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 66c4f30b3c29..1d473ac1fc0d 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -563,7 +563,8 @@ mod tests {
         )
         .unwrap();
 
-        let mut array_reader = ArrayReaderBuilder::new(&file_reader)
+        let cached_predicate_result = None;
+        let mut array_reader = ArrayReaderBuilder::new(&file_reader, cached_predicate_result)
            .build_array_reader(fields.as_ref(), &mask)
            .unwrap();
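`CachedArrayReader` above answers the usual read-then-consume protocol from pre-filtered chunks instead of decoding pages. A simplified stand-in (not the crate-internal `ArrayReader` trait) showing the same pop-and-slice behaviour, including the case where fewer rows are requested than a cached chunk holds:

use std::collections::VecDeque;
use std::sync::Arc;
use arrow_array::{Array, ArrayRef, Int32Array};

/// Simplified sketch: `read` only records how many rows were requested
/// (the data is already decoded, so it always "succeeds"), and `consume`
/// hands back the next pre-filtered chunk, sliced if fewer rows were
/// requested (e.g. when a limit applies).
struct CachedChunks {
    chunks: VecDeque<ArrayRef>,
    pending: usize,
}

impl CachedChunks {
    fn read(&mut self, batch_size: usize) -> usize {
        self.pending += batch_size;
        batch_size
    }

    fn consume(&mut self) -> ArrayRef {
        let next = self.chunks.pop_front().expect("no more cached chunks");
        let out = if self.pending < next.len() {
            next.slice(0, self.pending)
        } else {
            next
        };
        self.pending = 0;
        out
    }
}

fn main() {
    let chunk: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40]));
    let mut reader = CachedChunks {
        chunks: VecDeque::from(vec![chunk]),
        pending: 0,
    };
    // The caller asked for only 2 rows, so the cached chunk is sliced
    assert_eq!(reader.read(2), 2);
    assert_eq!(reader.consume().len(), 2);
}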
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 94d61c9eacf5..4e6b26045e85 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -42,6 +42,7 @@ mod null_array;
 mod primitive_array;
 mod struct_array;
 
+mod cached;
 #[cfg(test)]
 mod test_util;
 
@@ -50,6 +51,7 @@ pub use byte_array::make_byte_array_reader;
 pub use byte_array_dictionary::make_byte_array_dictionary_reader;
 #[allow(unused_imports)] // Only used for benchmarks
 pub use byte_view_array::make_byte_view_array_reader;
+pub(crate) use cached::{CachedPredicateResult, CachedPredicateResultBuilder};
 #[allow(unused_imports)] // Only used for benchmarks
 pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
 pub use fixed_size_list_array::FixedSizeListArrayReader;
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 2e5809304c9f..35c1879f78e4 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -716,6 +716,8 @@ impl ParquetRecordBatchReaderBuilder {
             .row_groups
             .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());
 
+        let num_original_columns = self.metadata.file_metadata().schema_descr().num_columns();
+
         let reader = ReaderRowGroups {
             reader: Arc::new(self.input.0),
             metadata: self.metadata,
@@ -733,14 +735,21 @@ impl ParquetRecordBatchReaderBuilder {
                     break;
                 }
 
-                let array_reader = ArrayReaderBuilder::new(&reader)
-                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
+                // TODO move this into the read_plan??
+                let array_reader =
+                    ArrayReaderBuilder::new(&reader, plan_builder.cached_predicate_result())
+                        .build_array_reader(self.fields.as_deref(), predicate.projection())?;
 
-                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
+                plan_builder = plan_builder.with_predicate(
+                    num_original_columns,
+                    array_reader,
+                    predicate.as_mut(),
+                    &self.projection,
+                )?;
             }
         }
 
-        let array_reader = ArrayReaderBuilder::new(&reader)
+        let array_reader = ArrayReaderBuilder::new(&reader, plan_builder.cached_predicate_result())
             .build_array_reader(self.fields.as_deref(), &self.projection)?;
 
         let read_plan = plan_builder
@@ -941,7 +950,8 @@ impl ParquetRecordBatchReader {
         batch_size: usize,
         selection: Option<RowSelection>,
     ) -> Result<Self> {
-        let array_reader = ArrayReaderBuilder::new(row_groups)
+        let cached_predicate_result = None;
+        let array_reader = ArrayReaderBuilder::new(row_groups, cached_predicate_result)
             .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
 
         let read_plan = ReadPlanBuilder::new(batch_size)
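For context, this is the public-API shape the caching is meant to speed up: a `RowFilter` whose predicate reads a column that is also in the output projection. The intent of this change is that the filtered values of such a column can be reused instead of being decoded a second time. The file name and column layout below are assumptions made only for illustration (leaf column 0 is an `Int64` column):

use std::fs::File;
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::BooleanArray;
use parquet::arrow::arrow_reader::{
    ArrowPredicate, ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
};
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical file whose first leaf column is Int64
    let file = File::open("data.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // The predicate reads leaf column 0 ...
    let predicate_mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    // ... and the output projection also contains column 0 (plus column 1),
    // so the filtered values of column 0 overlap with the output.
    let output_mask = ProjectionMask::leaves(builder.parquet_schema(), [0, 1]);

    let predicate: Box<dyn ArrowPredicate> = Box::new(ArrowPredicateFn::new(
        predicate_mask,
        |batch| {
            // Keep rows where column 0 > 10 (nulls stay null and are later
            // treated as "do not keep")
            let col = batch.column(0).as_primitive::<Int64Type>();
            Ok(col.iter().map(|v| v.map(|v| v > 10)).collect::<BooleanArray>())
        },
    ));

    let reader = builder
        .with_projection(output_mask)
        .with_row_filter(RowFilter::new(vec![predicate]))
        .with_batch_size(8192)
        .build()?;

    for batch in reader {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}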
diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs
index e083fb822be4..da76b775d317 100644
--- a/parquet/src/arrow/arrow_reader/read_plan.rs
+++ b/parquet/src/arrow/arrow_reader/read_plan.rs
@@ -18,13 +18,15 @@
 //! [`ReadPlan`] and [`ReadPlanBuilder`] for determining which rows to read
 //! from a Parquet file
 
-use crate::arrow::array_reader::ArrayReader;
+use crate::arrow::array_reader::{
+    ArrayReader, CachedPredicateResult, CachedPredicateResultBuilder,
+};
 use crate::arrow::arrow_reader::{
     ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelector,
 };
+use crate::arrow::ProjectionMask;
 use crate::errors::{ParquetError, Result};
-use arrow_array::Array;
-use arrow_select::filter::prep_null_mask_filter;
+use arrow_array::RecordBatchReader;
 use std::collections::VecDeque;
 
 /// A builder for [`ReadPlan`]
@@ -33,6 +35,8 @@ pub(crate) struct ReadPlanBuilder {
     batch_size: usize,
     /// Current to apply, includes all filters
     selection: Option<RowSelection>,
+    /// Cached result of evaluating some columns with the RowSelection
+    cached_predicate_result: Option<CachedPredicateResult>,
 }
 
 impl ReadPlanBuilder {
@@ -41,6 +45,7 @@ impl ReadPlanBuilder {
         Self {
             batch_size,
             selection: None,
+            cached_predicate_result: None,
         }
     }
 
@@ -56,6 +61,11 @@ impl ReadPlanBuilder {
         self.selection.as_ref()
     }
 
+    /// Returns the currently cached predicate result, if any
+    pub(crate) fn cached_predicate_result(&self) -> Option<&CachedPredicateResult> {
+        self.cached_predicate_result.as_ref()
+    }
+
     /// Specifies the number of rows in the row group, before filtering is applied.
     ///
     /// Returns a [`LimitedReadPlanBuilder`] that can apply
@@ -83,24 +93,48 @@ impl ReadPlanBuilder {
     /// Evaluates an [`ArrowPredicate`], updating this plan's `selection`
     ///
-    /// If the current `selection` is `Some`, the resulting [`RowSelection`]
-    /// will be the conjunction of the existing selection and the rows selected
-    /// by `predicate`.
+    /// # Arguments
+    ///
+    /// * `num_original_columns`: The number of columns in the original parquet
+    ///   schema.
+    ///
+    /// * `array_reader`: The array reader to use for evaluating the predicate.
+    ///   It must be configured with the projection mask specified by
+    ///   [`ArrowPredicate::projection`] for the `predicate`.
+    ///
+    /// * `predicate`: The predicate to evaluate
     ///
-    /// Note: pre-existing selections may come from evaluating a previous predicate
-    /// or if the [`ParquetRecordBatchReader`] specified an explicit
+    /// * `projection_mask`: The projection mask that will be selected for the final
+    ///   output. This code will potentially cache the results of filtering columns
+    ///   that also appear in the projection mask.
+    ///
+    /// If `this.selection` is `Some`, the resulting [`RowSelection`] will be
+    /// the conjunction of it and the rows selected by `predicate` (they will be
+    /// `AND`ed).
+    ///
+    /// Note: A pre-existing selection may come from evaluating a previous
+    /// predicate or if the [`ParquetRecordBatchReader`] specifies an explicit
     /// [`RowSelection`] in addition to one or more predicates.
     pub(crate) fn with_predicate(
         mut self,
+        num_original_columns: usize,
         array_reader: Box<dyn ArrayReader>,
         predicate: &mut dyn ArrowPredicate,
+        projection_mask: &ProjectionMask,
     ) -> Result<Self> {
+        // Prepare to decode all rows in the selection to evaluate the predicate
         let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build());
 
-        let mut filters = vec![];
+        let mut cached_results_builder = CachedPredicateResultBuilder::try_new(
+            num_original_columns,
+            &reader.schema(),
+            predicate.projection(),
+            projection_mask,
+            self.batch_size,
+        )?;
         for maybe_batch in reader {
-            let maybe_batch = maybe_batch?;
-            let input_rows = maybe_batch.num_rows();
-            let filter = predicate.evaluate(maybe_batch)?;
+            let batch = maybe_batch?;
+            let input_rows = batch.num_rows();
+            let filter = predicate.evaluate(batch.clone())?;
             // Since user supplied predicate, check error here to catch bugs quickly
             if filter.len() != input_rows {
                 return Err(arrow_err!(
@@ -108,17 +142,16 @@ impl ReadPlanBuilder {
                     filter.len()
                 ));
             }
-            match filter.null_count() {
-                0 => filters.push(filter),
-                _ => filters.push(prep_null_mask_filter(&filter)),
-            };
+            cached_results_builder.add(batch, filter)?;
         }
 
-        let raw = RowSelection::from_filters(&filters);
+        let (raw, cached_predicate_result) = cached_results_builder.build()?;
         self.selection = match self.selection.take() {
             Some(selection) => Some(selection.and_then(&raw)),
             None => Some(raw),
         };
+
+        self.cached_predicate_result = cached_predicate_result;
         Ok(self)
     }
 
@@ -131,6 +164,7 @@ impl ReadPlanBuilder {
         let Self {
             batch_size,
             selection,
+            cached_predicate_result,
         } = self;
 
         let selection = selection.map(|s| s.trim().into());
@@ -138,6 +172,7 @@
         ReadPlan {
             batch_size,
             selection,
+            cached_predicate_result,
         }
     }
 }
@@ -234,7 +269,11 @@ pub(crate) struct ReadPlan {
     /// The number of rows to read in each batch
     batch_size: usize,
     /// Row ranges to be selected from the data source
+    /// TODO update this to use something more efficient
+    /// See 
     selection: Option<VecDeque<RowSelector>>,
+    /// Cached result of evaluating some column(s) with the current RowSelection
+    cached_predicate_result: Option<CachedPredicateResult>,
 }
 
 impl ReadPlan {
@@ -243,6 +282,11 @@
         self.selection.as_mut()
     }
 
+    /// Returns the current cached predicate result, if any
+    pub(crate) fn cached_predicate_result(&self) -> Option<&CachedPredicateResult> {
+        self.cached_predicate_result.as_ref()
+    }
+
     /// Return the number of rows to read in each output batch
     #[inline(always)]
     pub fn batch_size(&self) -> usize {
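`with_predicate` still folds each predicate into the plan's `RowSelection`; the cached batches are simply carried along beside it. A small example of how that selection arithmetic composes, using the public `RowSelection` API (row counts chosen arbitrarily):

use arrow_array::BooleanArray;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Suppose an earlier predicate (or an explicit selection) already picked
    // the first 4 of 8 rows in the row group.
    let existing = RowSelection::from(vec![RowSelector::select(4), RowSelector::skip(4)]);

    // The next predicate is evaluated only on those 4 rows and keeps 2 of them.
    let filters = vec![BooleanArray::from(vec![true, false, true, false])];
    let raw = RowSelection::from_filters(&filters);

    // with_predicate ANDs the two: the combined selection is relative to the
    // original row group and now selects just 2 rows.
    let combined = existing.and_then(&raw);
    assert_eq!(combined.row_count(), 2);
}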
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 611d6999e07e..a2c1b84b8259 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -599,6 +599,11 @@ where
         let filter = self.filter.as_mut();
         let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
 
+        let num_original_columns = row_group
+            .metadata
+            .file_metadata()
+            .schema_descr()
+            .num_columns();
         // Update selection based on any filters
         if let Some(filter) = filter {
@@ -613,10 +618,16 @@ where
                     .fetch(&mut self.input, predicate.projection(), selection)
                     .await?;
 
-                let array_reader = ArrayReaderBuilder::new(&row_group)
-                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
+                let array_reader =
+                    ArrayReaderBuilder::new(&row_group, plan_builder.cached_predicate_result())
+                        .build_array_reader(self.fields.as_deref(), predicate.projection())?;
 
-                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
+                plan_builder = plan_builder.with_predicate(
+                    num_original_columns,
+                    array_reader,
+                    predicate.as_mut(),
+                    &projection,
+                )?;
             }
         }
 
@@ -661,7 +672,7 @@ where
         let plan = plan_builder.build();
 
-        let array_reader = ArrayReaderBuilder::new(&row_group)
+        let array_reader = ArrayReaderBuilder::new(&row_group, plan.cached_predicate_result())
             .build_array_reader(self.fields.as_deref(), &projection)?;
 
         let reader = ParquetRecordBatchReader::new(array_reader, plan);
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index e15a8d9b0203..069c75c9a3e8 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -251,25 +251,17 @@ pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";
 ///
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct ProjectionMask {
-    /// If `Some`, a leaf column should be included if the value at
+    /// If present, a leaf column should be included if the value at
     /// the corresponding index is true
     ///
-    /// If `None`, all columns should be included
+    /// If `None`, include all columns
     ///
-    /// # Examples
+    /// # Example
     ///
-    /// Given the original parquet schema with leaf columns is `[a, b, c, d]`
-    ///
-    /// A mask of `[true, false, true, false]` will result in a schema 2
-    /// elements long:
+    /// If the original parquet schema is `[a, b, c, d]` and the mask is `[true,
+    /// false, true, false]`, then the resulting schema will be 2 elements long:
     /// * `fields[0]`: `a`
     /// * `fields[1]`: `c`
-    ///
-    /// A mask of `None` will result in a schema 4 elements long:
-    /// * `fields[0]`: `a`
-    /// * `fields[1]`: `b`
-    /// * `fields[2]`: `c`
-    /// * `fields[3]`: `d`
     mask: Option<Vec<bool>>,
 }
 
@@ -279,6 +271,11 @@ impl ProjectionMask {
         Self { mask: None }
     }
 
+    // TODO better interface
+    pub(crate) fn mask(&self) -> Option<&Vec<bool>> {
+        self.mask.as_ref()
+    }
+
     /// Create a [`ProjectionMask`] which selects only the specified leaf columns
     ///
     /// Note: repeated or out of order indices will not impact the final mask