-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Make push_batch_with_filter up to 3x faster
#8951
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6ecd42b
a8df36f
f20702b
124b4e3
79bd847
b2fc66f
0872a9b
b7b3f18
7758889
dcf4864
87626d1
7c46a72
6ee1f04
c39a455
dc0c45e
d2b5d29
18cf6fc
ddd0306
82acfe1
1acccc7
bb025cf
ca19422
f718f2e
b235243
ae995ba
e8919b1
46484ea
3022aa6
4eb65a0
b2f9e42
6bfd685
0f994fa
72c356a
432b760
a4dee8a
9927454
4a3ce6a
c587cf0
69dbab2
e28c305
077ad74
f7c430d
923c2b2
6d26fbc
91adb91
d807503
db37aa1
6234ee0
924e1fe
8485edf
9ee3cf1
684bc9f
b41cd0d
66c1dae
94317f7
717ed06
a064327
b8322ce
ac1afae
bf63ec5
5d5d1bf
9f43539
c3b76dc
4bdccc2
90c0a39
87fe9c8
a9444d9
7d1223d
91fe6f7
efeeded
b850d6f
2380b11
5678415
4951e5b
a4a49d1
e78fa28
e2ad508
414b3b9
840653b
a1d4097
9773da7
bd65f64
ec7ef9e
e76a5d7
58190b8
be6b796
f0b98d6
9aaca1f
16f5d86
a27e4ab
6de89a3
2b8711f
12c526b
310404c
92b1b46
c9ae743
5ae319f
bd4a1bc
99772cf
63443da
b9efe0c
cbaaf98
0dbd786
0a5a575
3fbd560
d3cf7cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,8 @@ | |
| //! | ||
| //! [`filter`]: crate::filter::filter | ||
| //! [`take`]: crate::take::take | ||
| use crate::filter::filter_record_batch; | ||
| use crate::filter::{FilterBuilder, FilterPredicate, is_optimize_beneficial_record_batch}; | ||
|
|
||
| use crate::take::take_record_batch; | ||
| use arrow_array::types::{BinaryViewType, StringViewType}; | ||
| use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive}; | ||
|
|
@@ -212,7 +213,10 @@ impl BatchCoalescer { | |
| /// Push a batch into the Coalescer after applying a filter | ||
| /// | ||
| /// This is semantically equivalent of calling [`Self::push_batch`] | ||
| /// with the results from [`filter_record_batch`] | ||
| /// with the results from [`filter_record_batch`], but avoids | ||
| /// materializing the intermediate filtered batch. | ||
| /// | ||
| /// [`filter_record_batch`]: crate::filter::filter_record_batch | ||
| /// | ||
| /// # Example | ||
| /// ``` | ||
|
|
@@ -238,10 +242,103 @@ impl BatchCoalescer { | |
| batch: RecordBatch, | ||
| filter: &BooleanArray, | ||
| ) -> Result<(), ArrowError> { | ||
| // TODO: optimize this to avoid materializing (copying the results | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🎉 |
||
| // of filter to a new batch) | ||
| let filtered_batch = filter_record_batch(&batch, filter)?; | ||
| self.push_batch(filtered_batch) | ||
| // We only support primitive types now; fall back to filter_record_batch for other types | ||
| // Also, skip optimization when the filter is not very selective | ||
|
|
||
| // Build an optimized filter predicate that chooses the best iteration strategy | ||
| // ByteView arrays use the filter as part of calculating ideal buffer sizes, so optimizing is helpful even for | ||
| // a single array | ||
| let is_optimize_beneficial = is_optimize_beneficial_record_batch(&batch) | ||
| || batch.columns().len() == 1 | ||
| && matches!( | ||
| batch.columns()[0].data_type(), | ||
| DataType::BinaryView | DataType::Utf8View | ||
| ); | ||
| let selected_count = filter.true_count(); | ||
| let num_rows = batch.num_rows(); | ||
|
|
||
| // Fast path: skip if no rows selected | ||
| if selected_count == 0 { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| // Fast path: if all rows selected, just push the batch | ||
| if selected_count == num_rows { | ||
| return self.push_batch(batch); | ||
| } | ||
|
|
||
| let (_schema, arrays, _num_rows) = batch.into_parts(); | ||
|
|
||
| let mut filter_builder = FilterBuilder::new(&filter); | ||
|
|
||
| if is_optimize_beneficial { | ||
| filter_builder = filter_builder.optimize(); | ||
| } | ||
|
|
||
| let filter = filter_builder.build(); | ||
| // Setup input arrays as sources | ||
| assert_eq!(arrays.len(), self.in_progress_arrays.len()); | ||
| self.in_progress_arrays | ||
| .iter_mut() | ||
| .zip(arrays) | ||
| .for_each(|(in_progress, array)| { | ||
| in_progress.set_source_from_filter(Some(array), &filter); | ||
| }); | ||
Dandandan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Choose iteration strategy based on the optimized predicate | ||
| self.copy_from_filter(filter, selected_count)?; | ||
| // Clear sources to allow memory to be freed | ||
| for in_progress in self.in_progress_arrays.iter_mut() { | ||
| in_progress.set_source(None); | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Helper to copy rows at the given indices, handling batch boundaries efficiently | ||
| /// | ||
| /// This method batches the index iteration to avoid per-row batch boundary checks. | ||
| fn copy_from_filter( | ||
| &mut self, | ||
| filter: FilterPredicate, | ||
| count: usize, | ||
| ) -> Result<(), ArrowError> { | ||
| let mut remaining = count; | ||
| let mut filter_pos = 0; // Position in the filter array | ||
|
|
||
| // Build an optimized filter predicate once for the whole input batch | ||
|
|
||
| // We need to process the filter in chunks that fit the target batch size | ||
| while remaining > 0 { | ||
| let space_in_batch = self.target_batch_size - self.buffered_rows; | ||
| let to_copy = remaining.min(space_in_batch); | ||
|
|
||
| // Find how many filter positions we need to cover `to_copy` set bits | ||
| // Skip the expensive search if all remaining rows fit in the current batch | ||
| let chunk_len = if remaining <= space_in_batch { | ||
| filter.len() - filter_pos | ||
| } else { | ||
| filter.find_nth_set_bit_position(filter_pos, to_copy) - filter_pos | ||
| }; | ||
|
|
||
| let chunk_predicate = filter.slice_with_count(filter_pos, chunk_len, to_copy); | ||
|
|
||
| // Copy all collected indices in one call per array | ||
| for in_progress in self.in_progress_arrays.iter_mut() { | ||
| in_progress.copy_rows_by_filter(&chunk_predicate, filter_pos, chunk_len)?; | ||
| } | ||
|
Comment on lines
+326
to
+329
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A performance improvement you can make here is to copy X columns at a time, as I did and explained in 4: the number 4 is a magic number, but you can pick another number, such as 2, to amortize the cost of the boolean iterations |
||
|
|
||
| self.buffered_rows += to_copy; | ||
| filter_pos += chunk_len; | ||
| remaining -= to_copy; | ||
|
|
||
| // If we've filled the batch, finish it | ||
| if self.buffered_rows >= self.target_batch_size { | ||
| self.finish_buffered_batch()?; | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Push a batch into the Coalescer after applying a set of indices | ||
|
|
@@ -598,13 +695,31 @@ trait InProgressArray: std::fmt::Debug + Send + Sync { | |
| /// current in-progress array | ||
| fn set_source(&mut self, source: Option<ArrayRef>); | ||
|
|
||
| /// Set the source array with a filter, allowing for calculating GC based on filter | ||
| /// | ||
| /// Default implementation just calls [`Self::set_source`] | ||
| fn set_source_from_filter(&mut self, source: Option<ArrayRef>, _filter: &FilterPredicate) { | ||
| self.set_source(source); | ||
| } | ||
|
|
||
| /// Copy rows from the current source array into the in-progress array | ||
| /// | ||
| /// The source array is set by [`Self::set_source`]. | ||
| /// | ||
| /// Return an error if the source array is not set | ||
| fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>; | ||
|
|
||
| /// Copy rows from the source array between the specified offset and len that | ||
| /// match the predicate to the output array | ||
| /// | ||
| /// TODO add an example | ||
| fn copy_rows_by_filter( | ||
| &mut self, | ||
| filter: &FilterPredicate, | ||
| offset: usize, | ||
| len: usize, | ||
| ) -> Result<(), ArrowError>; | ||
|
|
||
| /// Finish the currently in-progress array and return it as an `ArrayRef` | ||
| fn finish(&mut self) -> Result<ArrayRef, ArrowError>; | ||
| } | ||
|
|
@@ -613,6 +728,7 @@ trait InProgressArray: std::fmt::Debug + Send + Sync { | |
| mod tests { | ||
| use super::*; | ||
| use crate::concat::concat_batches; | ||
| use crate::filter::filter_record_batch; | ||
| use arrow_array::builder::StringViewBuilder; | ||
| use arrow_array::cast::AsArray; | ||
| use arrow_array::types::Int32Type; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is fine to have, but technically speaking should these paths be covered in the lower BooleanBufferBuilder::extend_trusted_len ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e.g. these tests: https://github.com/apache/arrow-rs/blob/840653bc0b1f072c6526aebce09fabbf4d938ce2/arrow-buffer/src/builder/boolean.rs#L580-L579
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah probably redundant by now