Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid allocating vector of indices in lexicographical_partition_ranges #998

Merged
merged 3 commits into from
Dec 15, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 108 additions & 25 deletions arrow/src/compute/kernels/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ struct LexicographicalPartitionIterator<'a> {
num_rows: usize,
previous_partition_point: usize,
partition_point: usize,
value_indices: Vec<usize>,
}

impl<'a> LexicographicalPartitionIterator<'a> {
Expand All @@ -62,39 +61,78 @@ impl<'a> LexicographicalPartitionIterator<'a> {
};

let comparator = LexicographicalComparator::try_new(columns)?;
let value_indices = (0..num_rows).collect::<Vec<usize>>();
Ok(LexicographicalPartitionIterator {
comparator,
num_rows,
previous_partition_point: 0,
partition_point: 0,
value_indices,
})
}
}

/// Exponential search is to remedy for the case when array size and cardinality are both large
/// Returns the next partition point of the range `start..end` according to the given comparator.
/// The return value is the index of the first element of the second partition,
/// and is guaranteed to be between `start..=end` (inclusive).
///
/// The values corresponding to those indices are assumed to be partitioned according to the given comparator.
///
/// Exponential search is used to remedy the case where both the array size and the cardinality are large.
/// In such cases the partition point tends to be near the beginning of the range, and
/// plain binary search would perform unnecessary iterations on each call.
///
/// see <https://en.wikipedia.org/wiki/Exponential_search>
#[inline]
fn exponential_search(
indices: &[usize],
target: &usize,
fn exponential_search_next_partition_point(
start: usize,
end: usize,
comparator: &LexicographicalComparator<'_>,
) -> usize {
let target = start;
let mut bound = 1;
while bound < indices.len()
&& comparator.compare(&indices[bound], target) != Ordering::Greater
while bound + start < end
&& comparator.compare(&(bound + start), &target) != Ordering::Greater
{
bound *= 2;
}

// invariant after while loop:
// indices[bound / 2] <= target < indices[min(indices.len(), bound + 1)]
// (start + bound / 2) <= target < min(end, start + bound + 1)
// where <= and < are defined by the comparator;
// note here we have right = min(indices.len(), bound + 1) because indices[bound] might
// note here we have right = min(end, start + bound + 1) because (start + bound) might
// actually be considered and must be included.
(bound / 2)
+ indices[(bound / 2)..indices.len().min(bound + 1)]
.partition_point(|idx| comparator.compare(idx, target) != Ordering::Greater)
partition_point(start + bound / 2, end.min(start + bound + 1), |idx| {
comparator.compare(&idx, &target) != Ordering::Greater
})
}

/// Returns the partition point of the index range `start..end` according to the given predicate.
///
/// The return value is the index of the first element of the second partition, i.e. the
/// first index in `start..end` for which `pred` returns `false`, or `end` if `pred` returns
/// `true` for every index. For a non-empty range the result is guaranteed to be within
/// `start..=end` (inclusive).
///
/// If the range is empty or inverted (`start >= end`), `pred` is never called and `start`
/// is returned; no arithmetic underflow can occur.
///
/// The algorithm is a binary search over indices, so `pred` is invoked `O(log(end - start))`
/// times.
///
/// The values corresponding to those indices are assumed to be partitioned according to the
/// given predicate: every index for which `pred` is `true` must come before every index for
/// which it is `false`. Otherwise the returned index is unspecified (but still in range).
///
/// See [`slice::partition_point`]
#[inline]
fn partition_point<P: FnMut(usize) -> bool>(start: usize, end: usize, mut pred: P) -> usize {
    let mut left = start;
    let mut right = end;
    while left < right {
        // `left < right` holds here, so `right - left` cannot underflow and
        // `left <= mid < right`; computing the midpoint this way also avoids
        // the overflow that `(left + right) / 2` could hit.
        let mid = left + (right - left) / 2;

        if pred(mid) {
            // `mid` still belongs to the first partition: the boundary is to its right.
            left = mid + 1;
        } else {
            // `mid` is already in the second partition: the boundary is at or before it.
            right = mid;
        }
    }
    left
}

impl<'a> Iterator for LexicographicalPartitionIterator<'a> {
Expand All @@ -103,17 +141,12 @@ impl<'a> Iterator for LexicographicalPartitionIterator<'a> {
fn next(&mut self) -> Option<Self::Item> {
if self.partition_point < self.num_rows {
// invariant:
// value_indices[0..previous_partition_point] all are values <= value_indices[previous_partition_point]
// so in order to save time we can do binary search on the value_indices[previous_partition_point..]
// and find when any value is greater than value_indices[previous_partition_point]; because we are using
// new indices, the new offset is _added_ to the previous_partition_point.
//
// be careful that idx is of type &usize which points to the actual value within value_indices, which itself
// contains usize (0..row_count), providing access to lexicographical_comparator as pointers into the
// original columnar data.
self.partition_point += exponential_search(
&self.value_indices[self.partition_point..],
&self.partition_point,
// in the range [0..previous_partition_point] all values are <= the value at [previous_partition_point]
// so in order to save time we can do binary search on the range [previous_partition_point..num_rows]
// and find the index where any value is greater than the value at [previous_partition_point]
self.partition_point = exponential_search_next_partition_point(
self.partition_point,
self.num_rows,
&self.comparator,
);
let start = self.previous_partition_point;
Expand All @@ -134,6 +167,56 @@ mod tests {
use crate::datatypes::DataType;
use std::sync::Arc;

/// Checks `partition_point` against hand-computed boundaries on small sorted inputs,
/// covering a full range, a range that starts mid-slice, and a truncated range.
#[test]
fn test_partition_point() {
    // Every case uses the same predicate shape: index `i` is in the first partition
    // as long as `values[i]` does not compare greater than `pivot`.
    fn boundary(values: &[i32], start: usize, end: usize, pivot: i32) -> usize {
        partition_point(start, end, |i: usize| {
            values[i].cmp(&pivot) != Ordering::Greater
        })
    }

    let runs = [1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 4];

    // Whole slice, pivot is the median value (2): the boundary is the first 3.
    let median = runs[runs.len() / 2];
    assert_eq!(9, boundary(&runs, 0, runs.len(), median));

    // Search starts at the first 3 and runs to the end: the boundary is the lone 4.
    assert_eq!(12, boundary(&runs, 9, runs.len(), runs[9]));

    // Range truncated before the 3s, pivot is the first value (1).
    assert_eq!(3, boundary(&runs, 0, 9, runs[0]));

    let plateau = [1, 2, 2, 2, 2, 2, 2, 2, 9];

    // Search starts in the middle of a long run of equal values.
    assert_eq!(8, boundary(&plateau, 5, 9, plateau[5]));
}

#[test]
fn test_lexicographical_partition_ranges_empty() {
let input = vec![];
Expand Down