Optimized sorting for ordered dictionaries #1048

Closed
Changes from all commits
Commits (23)
63f1936
Optionally sort dictionary arrays by key instead of value
jhorstmann Dec 14, 2021
38dfd18
Handle comparing by dictionary keys in lexicographical sort and parti…
jhorstmann Dec 16, 2021
60eca77
Benchmarks for partition kernels with dictionary arrays
jhorstmann Dec 16, 2021
cf2aa11
Benchmarks with different cardinalities
jhorstmann Dec 16, 2021
a2ca03e
Use the is_ordered flag on DictionaryArray instead of a separate flag…
jhorstmann Jan 7, 2022
5c31db2
Comments on one test case
jhorstmann Jan 7, 2022
d84fbb3
Rename method for setting the is_ordered flag
jhorstmann Jan 7, 2022
25fcee6
Function to convert DictionaryArray to one with ordered values
jhorstmann Jan 14, 2022
d5a0575
Adjust error checking
jhorstmann Jan 14, 2022
789f6d1
Merge remote-tracking branch 'upstream/master' into sorting-and-parti…
jhorstmann Apr 18, 2022
6ad616e
Actually mark the dictionary as ordered after sorting and avoid some …
jhorstmann Apr 18, 2022
743ea19
Reuse ArrayData::ptr_eq
jhorstmann Apr 18, 2022
fc7189e
Add benchmarks
jhorstmann Apr 18, 2022
6ccf7f1
Remove leftover comment
jhorstmann Apr 18, 2022
199aaeb
Correct data for as_ordered keys
jhorstmann Apr 18, 2022
791e863
Remove parameter from as_ordered method
jhorstmann Apr 18, 2022
4482b1b
Revert enabling force_validate
jhorstmann Apr 18, 2022
172752a
Need a lookup vector for mapping to new keys, previous test was someh…
jhorstmann May 14, 2022
a013778
Optimize partition_validity
jhorstmann May 26, 2022
17c20e7
Optimize DictionaryArray::make_ordered
jhorstmann May 29, 2022
a540f08
Merge remote-tracking branch 'upstream/master' into sorting-and-parti…
jhorstmann May 29, 2022
10bc713
Fix partition_kernels benchmark
jhorstmann May 29, 2022
07389a2
Formatting and safety comment
jhorstmann May 29, 2022
71 changes: 71 additions & 0 deletions arrow/benches/partition_kernels.rs
@@ -22,12 +22,15 @@ use std::sync::Arc;
extern crate arrow;
use arrow::compute::kernels::partition::lexicographical_partition_ranges;
use arrow::compute::kernels::sort::{lexsort, SortColumn};
use arrow::datatypes::{ArrowDictionaryKeyType, ArrowNativeType, DataType, Int32Type};
use arrow::util::bench_util::*;
use arrow::util::test_util::seedable_rng;
use arrow::{
array::*,
datatypes::{ArrowPrimitiveType, Float64Type, UInt8Type},
};
use rand::distributions::{Distribution, Standard};
use rand::Rng;
use std::iter;

fn create_array<T: ArrowPrimitiveType>(size: usize, with_nulls: bool) -> ArrayRef
@@ -39,6 +42,47 @@ where
Arc::new(array)
}

fn create_sorted_dictionary_data<T: ArrowDictionaryKeyType>(
size: usize,
num_distinct_keys: usize,
mark_as_sorted: bool,
) -> Vec<ArrayRef>
where
Standard: Distribution<T::Native>,
T::Native: Ord,
{
let mut rng = seedable_rng();

let mut keys = (0..size)
.map(|_| T::Native::from_usize(rng.gen_range(0..num_distinct_keys)).unwrap())
.collect::<Vec<_>>();
keys.sort();
let keys = PrimitiveArray::<T>::from_iter_values(keys);
let mut values = (0..num_distinct_keys)
.map(|_| format!("{}", rng.gen_range(10_000_usize..1_000_000_000_000_usize)))
.collect::<Vec<_>>();
values.sort();
let values = StringArray::from_iter_values(values);

let data = ArrayData::try_new(
DataType::Dictionary(Box::new(T::DATA_TYPE), Box::new(DataType::Utf8)),
size,
keys.data().null_buffer().cloned(),
0,
keys.data().buffers().to_vec(),
vec![values.data().clone()],
)
.unwrap();

let mut dictionary_array = DictionaryArray::<T>::from(data);

if mark_as_sorted {
dictionary_array = dictionary_array.as_ordered();
}

vec![Arc::new(dictionary_array)]
}

fn bench_partition(sorted_columns: &[ArrayRef]) {
let columns = sorted_columns
.iter()
@@ -140,6 +184,33 @@ fn add_benchmark(c: &mut Criterion) {
"lexicographical_partition_ranges(low cardinality) 1024",
|b| b.iter(|| bench_partition(&sorted_columns)),
);

let sorted_columns =
create_sorted_dictionary_data::<Int32Type>(16 * 1024, 100, false);
c.bench_function(
"lexicographical_partition_ranges(dictionary_values_low_cardinality)",
|b| b.iter(|| bench_partition(&sorted_columns)),
);

let sorted_columns =
create_sorted_dictionary_data::<Int32Type>(16 * 1024, 1000, false);
c.bench_function(
"lexicographical_partition_ranges(dictionary_values_high_cardinality)",
|b| b.iter(|| bench_partition(&sorted_columns)),
);

let sorted_columns = create_sorted_dictionary_data::<Int32Type>(16 * 1024, 100, true);
c.bench_function(
"lexicographical_partition_ranges(dictionary_keys_low_cardinality)",
|b| b.iter(|| bench_partition(&sorted_columns)),
);

let sorted_columns =
create_sorted_dictionary_data::<Int32Type>(16 * 1024, 1000, true);
c.bench_function(
"lexicographical_partition_ranges(dictionary_keys_high_cardinality)",
|b| b.iter(|| bench_partition(&sorted_columns)),
);
}

criterion_group!(benches, add_benchmark);
86 changes: 86 additions & 0 deletions arrow/benches/sort_kernel.rs
@@ -19,12 +19,14 @@
extern crate criterion;
use criterion::Criterion;

use rand::Rng;
use std::sync::Arc;

extern crate arrow;

use arrow::compute::kernels::sort::{lexsort, SortColumn};
use arrow::util::bench_util::*;
use arrow::util::test_util::seedable_rng;
use arrow::{array::*, datatypes::Float32Type};

fn create_f32_array(size: usize, with_nulls: bool) -> ArrayRef {
@@ -40,6 +42,36 @@ fn create_bool_array(size: usize, with_nulls: bool) -> ArrayRef {
Arc::new(array)
}

fn create_string_array(
size: usize,
max_len: usize,
cardinality: usize,
with_nulls: bool,
) -> ArrayRef {
let null_density = if with_nulls { 0.5 } else { 0.0 };

let strings = create_string_array_with_len::<i32>(cardinality, 0.0, max_len);
let rng = &mut seedable_rng();

let values = (0..size)
.map(|_| {
if rng.gen_bool(null_density) {
None
} else {
let idx = rng.gen_range(0..strings.len());
Some(strings.value(idx))
}
})
.collect::<StringArray>();

Arc::new(values)
}

fn create_string_dict_array(string_array: &ArrayRef) -> ArrayRef {
let strings = string_array.as_any().downcast_ref::<StringArray>().unwrap();
Arc::new(Int32DictionaryArray::from_iter(strings.into_iter()))
}

fn bench_sort(array_a: &ArrayRef, array_b: &ArrayRef, limit: Option<usize>) {
let columns = vec![
SortColumn {
@@ -92,6 +124,60 @@ fn add_benchmark(c: &mut Criterion) {
b.iter(|| bench_sort(&arr_a, &arr_b, None))
});

let arr_a: ArrayRef = create_string_array(2_usize.pow(12), 32, 32, true);
let arr_b: ArrayRef = create_string_array(2_usize.pow(12), 16, 64, true);
c.bench_function("string sort nulls 2^12", |b| {
b.iter(|| bench_sort(&arr_a, &arr_b, None))
});

let arr_a = create_string_dict_array(&arr_a);
let arr_b = create_string_dict_array(&arr_b);
c.bench_function("dict string sort nulls 2^12", |b| {
b.iter(|| bench_sort(&arr_a, &arr_b, None))
});

c.bench_function("make_ordered dict string sort nulls 2^12", |b| {
b.iter(|| {
let arr_a: ArrayRef = Arc::new(
arr_a
.as_any()
.downcast_ref::<Int32DictionaryArray>()
.unwrap()
.make_ordered()
.unwrap(),
);
let arr_b: ArrayRef = Arc::new(
arr_b
.as_any()
.downcast_ref::<Int32DictionaryArray>()
.unwrap()
.make_ordered()
.unwrap(),
);
bench_sort(&arr_a, &arr_b, None);
});
});

let arr_a: ArrayRef = Arc::new(
arr_a
.as_any()
.downcast_ref::<Int32DictionaryArray>()
.unwrap()
.make_ordered()
.unwrap(),
);
let arr_b: ArrayRef = Arc::new(
arr_b
.as_any()
.downcast_ref::<Int32DictionaryArray>()
.unwrap()
.make_ordered()
.unwrap(),
);
c.bench_function("presorted dict string sort nulls 2^12", |b| {
b.iter(|| bench_sort(&arr_a, &arr_b, None))
});

// with limit
{
let arr_a = create_f32_array(2u64.pow(12) as usize, false);
120 changes: 119 additions & 1 deletion arrow/src/array/array_dictionary.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use crate::buffer::Buffer;
use crate::compute::{sort_to_indices, take, TakeOptions};
use std::any::Any;
use std::fmt;
use std::iter::IntoIterator;
@@ -27,7 +29,7 @@ use super::{
use crate::datatypes::{
ArrowDictionaryKeyType, ArrowNativeType, ArrowPrimitiveType, DataType,
};
use crate::error::Result;
use crate::error::{ArrowError, Result};

/// A dictionary array where each element is a single value indexed by an integer key.
/// This is mostly used to represent strings or a limited set of primitive types as integers,
@@ -163,6 +165,92 @@ impl<'a, K: ArrowPrimitiveType> DictionaryArray<K> {
self.is_ordered
}

/// Returns a DictionaryArray referencing the same data
/// with the [DictionaryArray::is_ordered] flag set to `true`.
/// Note that this does not actually reorder the values in the dictionary.
pub fn as_ordered(&self) -> Self {
Contributor:
I wonder if this API should be marked as unsafe as it relies on the user "doing the right thing"?

Perhaps we could call it something like pub unsafe fn as_orderd_unchecked() ?

Contributor Author:
Good point. I don't think it would be possible to trigger undefined behavior with this method, it would only sort differently. But it certainly makes an assumption that the compiler can not verify.

Maybe the method could also have a better name, my intention was something like assume_can_be_sorted_by_keys. Even if the dictionary is not actually sorted, setting the flag allows useful behavior, like sorting by keys and then using lexicographical_partition_ranges with the same key-based comparator.

Self {
data: self.data.clone(),
values: self.values.clone(),
keys: PrimitiveArray::<K>::from(self.keys.data().clone()),
is_ordered: true,
}
}
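
A minimal usage sketch of the behavior discussed in the review thread above, assuming the API proposed in this PR (as_ordered) together with the existing lexsort and lexicographical_partition_ranges kernels; the function name sort_and_partition_by_key is illustrative only and signatures may differ from what is finally merged:

use std::sync::Arc;

use arrow::array::{ArrayRef, DictionaryArray};
use arrow::compute::kernels::partition::lexicographical_partition_ranges;
use arrow::compute::kernels::sort::{lexsort, SortColumn};
use arrow::datatypes::Int32Type;
use arrow::error::Result;

fn sort_and_partition_by_key() -> Result<()> {
    // Values are stored in first-occurrence order, which here is already
    // ascending: "a" < "b" < "c".
    let dict: DictionaryArray<Int32Type> =
        vec!["a", "b", "a", "c"].into_iter().collect();

    // as_ordered only sets the is_ordered flag; nothing is reordered, so the
    // caller is responsible for the values actually being sorted.
    let dict: ArrayRef = Arc::new(dict.as_ordered());

    // With the flag set, the sort kernel can compare the integer keys directly
    // instead of looking up and comparing the string values.
    let columns = vec![SortColumn { values: dict, options: None }];
    let sorted = lexsort(&columns, None)?;

    // The same key-based comparator can then drive partitioning.
    let sorted_columns = vec![SortColumn { values: sorted[0].clone(), options: None }];
    let _ranges = lexicographical_partition_ranges(&sorted_columns)?;
    Ok(())
}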

pub fn make_ordered(&self) -> Result<Self> {
let values = self.values();
if self.is_ordered || values.is_empty() {
Ok(self.as_ordered())
} else {
// validate up front that we can do conversions from/to usize for the whole range of keys
// this allows using faster unchecked conversions below
K::Native::from_usize(values.len())
Contributor: 👍

.ok_or(ArrowError::DictionaryKeyOverflowError)?;
// sort indices are u32 so we cannot sort larger dictionaries
u32::try_from(values.len())
.map_err(|_| ArrowError::DictionaryKeyOverflowError)?;

// sort the dictionary values
let sort_indices = sort_to_indices(values, None, None)?;
let sorted_dictionary = take(
values.as_ref(),
&sort_indices,
Some(TakeOptions {
check_bounds: false,
}),
)?;

// build a lookup table from old to new key
let mut lookup = vec![0; sort_indices.len()];
sort_indices
.values()
.iter()
.enumerate()
.for_each(|(i, idx)| {
lookup[*idx as usize] = i;
});

let mapped_keys_iter = self.keys_iter().map(|opt_key| {
if let Some(key) = opt_key {
// Safety:
// lookup has the same length as the dictionary values
// so if the keys were valid for values they will be valid indices into lookup
unsafe {
debug_assert!(key < lookup.len());
let new_key = *lookup.get_unchecked(key);
debug_assert!(new_key < values.len());
K::Native::from_usize(new_key).unwrap_unchecked()
}
} else {
K::default_value()
}
});

// Safety:
// PrimitiveIter has a trusted len
let new_key_buffer =
unsafe { Buffer::from_trusted_len_iter(mapped_keys_iter) };

// Safety:
// after remapping the keys will be in the same range as before
let new_data = unsafe {
ArrayData::new_unchecked(
self.data_type().clone(),
self.len(),
Some(self.data.null_count()),
Contributor:
It makes sense the nullness is the same before and after sorting. I assume it was faster to do this than to create the new values buffer directly from an iterator of Option<K::Native>?

Contributor Author:
I don't think I benchmarked this separately, but it should be quite a bit faster. In the most common case of offset being zero this would be zero-copy, while creating a PrimitiveArray from an iterator has some overhead per element, checking whether the buffers need to grow and setting individual bits in the validity bitmap. If there is no validity, the iterator api would also construct one first.

I might try looking into the performance of the iterator based apis in a separate issue.

self.data
.null_buffer()
.map(|b| b.bit_slice(self.data.offset(), self.len())),
0,
vec![new_key_buffer],
vec![sorted_dictionary.data().clone()],
)
};

Ok(DictionaryArray::from(new_data).as_ordered())
}
}
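
For contrast with the zero-copy null-buffer reuse discussed in the review thread above, a hedged sketch of the iterator-based alternative the reviewer mentions: building the remapped keys by collecting an iterator of Option values, which pays a per-element cost for buffer growth and validity bits. The helper name remap_keys_via_iterator is illustrative, the key type is fixed to Int32 for brevity, and lookup mirrors the lookup table built in make_ordered:

use arrow::array::{Int32Array, PrimitiveArray};
use arrow::datatypes::Int32Type;

fn remap_keys_via_iterator(
    old_keys: &Int32Array,
    lookup: &[usize],
) -> PrimitiveArray<Int32Type> {
    old_keys
        .iter()
        // Each element goes through the builder path: buffers may grow and
        // validity bits are set one at a time, unlike the zero-copy path above.
        .map(|opt_key| opt_key.map(|k| lookup[k as usize] as i32))
        .collect()
}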

/// Return an iterator over the keys (indexes into the dictionary)
pub fn keys_iter(&self) -> impl Iterator<Item = Option<usize>> + '_ {
self.keys
@@ -485,6 +573,36 @@ mod tests {
.expect("All null array has valid array data");
}

#[test]
fn test_dictionary_make_ordered() {
let test = vec![
Some("b"),
Some("b"),
None,
Some("d"),
Some("d"),
Some("c"),
Some("a"),
];
let array: DictionaryArray<Int32Type> = test.into_iter().collect();

let ordered = array.make_ordered().unwrap();
let actual_keys = ordered.keys.iter().collect::<Vec<_>>();

let expected_keys =
vec![Some(1), Some(1), None, Some(3), Some(3), Some(2), Some(0)];
assert_eq!(&expected_keys, &actual_keys);

let expected_values = StringArray::from(vec!["a", "b", "c", "d"]);
let actual_values = ordered
.values
.as_any()
.downcast_ref::<StringArray>()
.unwrap();

assert_eq!(&expected_values, actual_values);
}

#[test]
fn test_dictionary_iter() {
// Construct a value array