
Commit 93de66a

Dandandan authored and jorgecarleitao committed
ARROW-10837: [Rust][DataFusion] Use Vec<u8> for hash keys
This PR is a follow-up to apache/arrow#8765. Here, the hashmap values for the keys are converted to `Vec<u8>` and used as the key instead. This is a bit faster, as both hashing and cloning a contiguous byte buffer are cheaper; it also uses less additional memory than the earlier dynamic `GroupByScalar` values (for the hash join).

[This PR]
```
Query 12 iteration 0 took 1315 ms
Query 12 iteration 1 took 1324 ms
Query 12 iteration 2 took 1329 ms
Query 12 iteration 3 took 1334 ms
Query 12 iteration 4 took 1335 ms
Query 12 iteration 5 took 1338 ms
Query 12 iteration 6 took 1337 ms
Query 12 iteration 7 took 1349 ms
Query 12 iteration 8 took 1348 ms
Query 12 iteration 9 took 1358 ms
```

[Master]
```
Query 12 iteration 0 took 1379 ms
Query 12 iteration 1 took 1383 ms
Query 12 iteration 2 took 1401 ms
Query 12 iteration 3 took 1406 ms
Query 12 iteration 4 took 1420 ms
Query 12 iteration 5 took 1435 ms
Query 12 iteration 6 took 1401 ms
Query 12 iteration 7 took 1404 ms
Query 12 iteration 8 took 1418 ms
Query 12 iteration 9 took 1416 ms
```

[This PR]
```
Query 1 iteration 0 took 871 ms
Query 1 iteration 1 took 866 ms
Query 1 iteration 2 took 869 ms
Query 1 iteration 3 took 869 ms
Query 1 iteration 4 took 867 ms
Query 1 iteration 5 took 874 ms
Query 1 iteration 6 took 870 ms
Query 1 iteration 7 took 875 ms
Query 1 iteration 8 took 871 ms
Query 1 iteration 9 took 869 ms
```

[Master]
```
Query 1 iteration 0 took 1189 ms
Query 1 iteration 1 took 1192 ms
Query 1 iteration 2 took 1189 ms
Query 1 iteration 3 took 1185 ms
Query 1 iteration 4 took 1193 ms
Query 1 iteration 5 took 1202 ms
Query 1 iteration 6 took 1547 ms
Query 1 iteration 7 took 1242 ms
Query 1 iteration 8 took 1202 ms
Query 1 iteration 9 took 1197 ms
```

FWIW, micro benchmark results for aggregate queries:

```
aggregate_query_no_group_by 15 12
                        time:   [538.54 us 541.48 us 544.74 us]
                        change: [+5.4384% +6.6260% +8.2034%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 8 outliers among 100 measurements (8.00%)
  7 (7.00%) high mild
  1 (1.00%) high severe

aggregate_query_no_group_by_count_distinct_wide 15 12
                        time:   [4.8418 ms 4.8744 ms 4.9076 ms]
                        change: [-13.890% -12.580% -11.260%] (p = 0.00 < 0.05)
                        Performance has improved.

aggregate_query_no_group_by_count_distinct_narrow 15 12
                        time:   [2.1910 ms 2.2100 ms 2.2291 ms]
                        change: [-30.490% -28.886% -27.271%] (p = 0.00 < 0.05)
                        Performance has improved.

Benchmarking aggregate_query_group_by 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.1s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by 15 12
                        time:   [1.5905 ms 1.5977 ms 1.6054 ms]
                        change: [-18.271% -16.780% -15.396%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) high mild
  5 (5.00%) high severe

aggregate_query_group_by_with_filter 15 12
                        time:   [788.26 us 792.05 us 795.74 us]
                        change: [-9.8088% -8.5606% -7.4141%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  5 (5.00%) high mild
  1 (1.00%) high severe

Benchmarking aggregate_query_group_by_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 9.3s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by_u64 15 12
                        time:   [1.8502 ms 1.8565 ms 1.8630 ms]
                        change: [+8.6203% +9.8872% +10.973%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 8 outliers among 100 measurements (8.00%)
  3 (3.00%) low mild
  2 (2.00%) high mild
  3 (3.00%) high severe

aggregate_query_group_by_with_filter_u64 15 12
                        time:   [777.83 us 782.75 us 788.15 us]
                        change: [-7.5157% -6.6393% -5.6558%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  3 (3.00%) high mild
  2 (2.00%) high severe
```

FYI @jorgecarleitao

Closes #8863 from Dandandan/key_byte_vec

Lead-authored-by: Heres, Daniel <danielheres@gmail.com>
Co-authored-by: Daniël Heres <danielheres@gmail.com>
Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
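To make the change concrete, here is a minimal, self-contained sketch of the serialized-key idea (illustrative code, not part of this diff; the column types and values are made up). A composite group-by key is packed into a single `Vec<u8>`, so hashing and cloning touch one contiguous buffer instead of a boxed slice of `GroupByScalar` enum values:

```rust
use std::collections::HashMap;

fn main() {
    // A composite key (a u32 column and a string column) serialized into one
    // buffer, mirroring the `create_key` approach introduced in this PR.
    let (int_value, str_value): (u32, &str) = (42, "foo");

    let mut key: Vec<u8> = Vec::new();
    key.extend(int_value.to_le_bytes().iter());
    // Length prefix keeps ("ab", "c") distinct from ("a", "bc").
    key.extend(str_value.len().to_le_bytes().iter());
    key.extend(str_value.as_bytes().iter());

    // The serialized bytes act as the map key; hashing sees one byte slice.
    let mut groups: HashMap<Vec<u8>, Vec<u32>> = HashMap::new();
    groups.entry(key).or_insert_with(Vec::new).push(0);
    assert_eq!(groups.len(), 1);
}
```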
1 parent 8f1931a commit 93de66a

2 files changed (+93, −27 lines)

rust/datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 20 additions & 13 deletions
```diff
@@ -43,8 +43,8 @@ use arrow::{
 use pin_project_lite::pin_project;
 
 use super::{
-    common, expressions::Column, group_scalar::GroupByScalar, RecordBatchStream,
-    SendableRecordBatchStream,
+    common, expressions::Column, group_scalar::GroupByScalar, hash_join::create_key,
+    RecordBatchStream, SendableRecordBatchStream,
 };
 use ahash::RandomState;
 use hashbrown::HashMap;
@@ -245,12 +245,14 @@ fn group_aggregate_batch(
     // create vector large enough to hold the grouping key
     // this is an optimization to avoid allocating `key` on every row.
     // it will be overwritten on every iteration of the loop below
-    let mut key = Vec::with_capacity(group_values.len());
+    let mut group_by_values = Vec::with_capacity(group_values.len());
     for _ in 0..group_values.len() {
-        key.push(GroupByScalar::UInt32(0));
+        group_by_values.push(GroupByScalar::UInt32(0));
     }
 
-    let mut key = key.into_boxed_slice();
+    let mut group_by_values = group_by_values.into_boxed_slice();
+
+    let mut key = Vec::with_capacity(group_values.len());
 
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
@@ -263,16 +265,21 @@ fn group_aggregate_batch(
         // 1.1
         create_key(&group_values, row, &mut key)
             .map_err(DataFusionError::into_arrow_external_error)?;
+
         accumulators
             .raw_entry_mut()
             .from_key(&key)
             // 1.3
-            .and_modify(|_, (_, v)| v.push(row as u32))
+            .and_modify(|_, (_, _, v)| v.push(row as u32))
             // 1.2
             .or_insert_with(|| {
                 // We can safely unwrap here as we checked we can create an accumulator before
                 let accumulator_set = create_accumulators(aggr_expr).unwrap();
-                (key.clone(), (accumulator_set, vec![row as u32]))
+                let _ = create_group_by_values(&group_values, row, &mut group_by_values);
+                (
+                    key.clone(),
+                    (group_by_values.clone(), accumulator_set, vec![row as u32]),
+                )
             });
     }
 
@@ -284,7 +291,7 @@ fn group_aggregate_batch(
     accumulators
         .iter_mut()
         // 2.1
-        .map(|(_, (accumulator_set, indices))| {
+        .map(|(_, (_, accumulator_set, indices))| {
            // 2.2
             accumulator_set
                 .into_iter()
@@ -391,7 +398,7 @@ impl GroupedHashAggregateStream {
 
 type AccumulatorSet = Vec<Box<dyn Accumulator>>;
 type Accumulators =
-    HashMap<Box<[GroupByScalar]>, (AccumulatorSet, Vec<u32>), RandomState>;
+    HashMap<Vec<u8>, (Box<[GroupByScalar]>, AccumulatorSet, Vec<u32>), RandomState>;
 
 impl Stream for GroupedHashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
@@ -646,10 +653,10 @@ fn create_batch_from_map(
     // 5. concatenate the arrays over the second index [j] into a single vec<ArrayRef>.
     let arrays = accumulators
         .iter()
-        .map(|(k, (accumulator_set, _))| {
+        .map(|(_, (group_by_values, accumulator_set, _))| {
             // 2.
             let mut groups = (0..num_group_expr)
-                .map(|i| match &k[i] {
+                .map(|i| match &group_by_values[i] {
                     GroupByScalar::Int8(n) => {
                         Arc::new(Int8Array::from(vec![*n])) as ArrayRef
                     }
@@ -726,8 +733,8 @@ fn finalize_aggregation(
     }
 }
 
-/// Create a Vec<GroupByScalar> that can be used as a map key
-pub(crate) fn create_key(
+/// Create a Box<[GroupByScalar]> for the group by values
+pub(crate) fn create_group_by_values(
     group_by_keys: &[ArrayRef],
     row: usize,
     vec: &mut Box<[GroupByScalar]>,
```
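For readers unfamiliar with the raw-entry API used in `group_aggregate_batch` above, here is a condensed sketch of the pattern (assuming a `hashbrown` version that exposes `raw_entry_mut`, as the crate did at the time of this PR; the key serialization is simplified to a single u32 column). The scratch `key` buffer is allocated once, overwritten per row, and cloned only when a new group is actually inserted:

```rust
use hashbrown::HashMap;

fn main() {
    let column: Vec<u32> = vec![1, 2, 1, 3, 2];
    let mut groups: HashMap<Vec<u8>, Vec<u32>> = HashMap::new();
    let mut key: Vec<u8> = Vec::new(); // allocated once, reused for every row

    for (row, value) in column.iter().enumerate() {
        key.clear();
        key.extend(value.to_le_bytes().iter());
        groups
            .raw_entry_mut()
            .from_key(&key)
            // existing group: just record the row index, no allocation
            .and_modify(|_, indices| indices.push(row as u32))
            // new group: clone the scratch buffer exactly once
            .or_insert_with(|| (key.clone(), vec![row as u32]));
    }
    assert_eq!(groups.len(), 3); // groups 1, 2, 3
}
```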

rust/datafusion/src/physical_plan/hash_join.rs

Lines changed: 73 additions & 14 deletions
```diff
@@ -18,6 +18,7 @@
 //! Defines the join plan for executing partitions in parallel and then joining the results
 //! into a set of partitions.
 
+use arrow::array::ArrayRef;
 use std::sync::Arc;
 use std::{any::Any, collections::HashSet};
@@ -26,21 +27,24 @@ use futures::{Stream, StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 
 use arrow::array::{make_array, Array, MutableArrayData};
+use arrow::datatypes::DataType;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
-use super::{expressions::col, hash_aggregate::create_key};
+use arrow::array::{
+    Int16Array, Int32Array, Int64Array, Int8Array, StringArray, UInt16Array, UInt32Array,
+    UInt64Array, UInt8Array,
+};
+
+use super::expressions::col;
 use super::{
     hash_utils::{build_join_schema, check_join_is_valid, JoinOn, JoinType},
     merge::MergeExec,
 };
 use crate::error::{DataFusionError, Result};
 
-use super::{
-    group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream,
-};
+use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
 use ahash::RandomState;
 
 // An index of (batch, row) uniquely identifying a row in a part.
@@ -52,7 +56,7 @@ type JoinIndex = Option<(usize, usize)>;
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Box<[GroupByScalar]>, Vec<Index>, RandomState>;
+type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
 type JoinLeftData = (JoinHashMap, Vec<RecordBatch>);
 
 /// join execution plan executes partitions in parallel and combines them into a set of
@@ -205,11 +209,6 @@ fn update_hash(
         .collect::<Result<Vec<_>>>()?;
 
     let mut key = Vec::with_capacity(keys_values.len());
-    for _ in 0..keys_values.len() {
-        key.push(GroupByScalar::UInt32(0));
-    }
-
-    let mut key = key.into_boxed_slice();
 
     // update the hash map
     for row in 0..batch.num_rows() {
@@ -318,6 +317,67 @@ fn build_batch_from_indices(
     Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
 }
 
+/// Create a key `Vec<u8>` that is used as key for the hashmap
+pub(crate) fn create_key(
+    group_by_keys: &[ArrayRef],
+    row: usize,
+    vec: &mut Vec<u8>,
+) -> Result<()> {
+    vec.clear();
+    for i in 0..group_by_keys.len() {
+        let col = &group_by_keys[i];
+        match col.data_type() {
+            DataType::UInt8 => {
+                let array = col.as_any().downcast_ref::<UInt8Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::UInt16 => {
+                let array = col.as_any().downcast_ref::<UInt16Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::UInt32 => {
+                let array = col.as_any().downcast_ref::<UInt32Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::UInt64 => {
+                let array = col.as_any().downcast_ref::<UInt64Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::Int8 => {
+                let array = col.as_any().downcast_ref::<Int8Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::Int16 => {
+                let array = col.as_any().downcast_ref::<Int16Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::Int32 => {
+                let array = col.as_any().downcast_ref::<Int32Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::Int64 => {
+                let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::Utf8 => {
+                let array = col.as_any().downcast_ref::<StringArray>().unwrap();
+                let value = array.value(row);
+                // store the size
+                vec.extend(value.len().to_le_bytes().iter());
+                // store the string value
+                vec.extend(array.value(row).as_bytes().iter());
+            }
+            _ => {
+                // This is internal because we should have caught this before.
+                return Err(DataFusionError::Internal(
+                    "Unsupported GROUP BY data type".to_string(),
+                ));
+            }
+        }
+    }
+    Ok(())
+}
+
 fn build_batch(
     batch: &RecordBatch,
     left_data: &JoinLeftData,
@@ -370,9 +430,8 @@ fn build_join_indexes(
         JoinType::Inner => {
             // inner => key intersection
            // unfortunately rust does not support intersection of map keys :(
-            let left_set: HashSet<Box<[GroupByScalar]>> = left.keys().cloned().collect();
-            let left_right: HashSet<Box<[GroupByScalar]>> =
-                right.keys().cloned().collect();
+            let left_set: HashSet<Vec<u8>> = left.keys().cloned().collect();
+            let left_right: HashSet<Vec<u8>> = right.keys().cloned().collect();
             let inner = left_set.intersection(&left_right);
 
             let mut indexes = Vec::new(); // unknown a prior size
```
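Finally, a sketch of how the serialized keys drive the inner-join path in `build_join_indexes` above (std collections are used here for brevity; the actual code uses `hashbrown` with `ahash`, but the shape is the same). Both sides key their `(batch, row)` indices by the serialized "on" values, and matching groups are found by intersecting the key sets:

```rust
use std::collections::{HashMap, HashSet};

type Index = (usize, usize); // (batch, row), as in the JoinHashMap comment

fn main() {
    let mut left: HashMap<Vec<u8>, Vec<Index>> = HashMap::new();
    let mut right: HashMap<Vec<u8>, Vec<Index>> = HashMap::new();

    // Key rows by the serialized "on" value (a single u32 column here).
    left.entry(1u32.to_le_bytes().to_vec()).or_default().push((0, 3));
    left.entry(2u32.to_le_bytes().to_vec()).or_default().push((0, 8));
    right.entry(2u32.to_le_bytes().to_vec()).or_default().push((0, 6));

    // inner => key intersection, as in the Inner arm of the diff
    let left_keys: HashSet<Vec<u8>> = left.keys().cloned().collect();
    let right_keys: HashSet<Vec<u8>> = right.keys().cloned().collect();
    let matching: Vec<&Vec<u8>> = left_keys.intersection(&right_keys).collect();

    assert_eq!(matching.len(), 1); // only key 2 appears on both sides
}
```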