Improve performance/memory usage of HashJoin datastructure (5-15% improvement on selected TPC-H queries) #6679

Merged · 16 commits · Jun 19, 2023
88 changes: 48 additions & 40 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -30,7 +30,7 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::{
array::{
ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
DictionaryArray, FixedSizeBinaryArray, LargeStringArray, PrimitiveArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
@@ -43,9 +43,8 @@ use arrow::{
util::bit_util,
};
use futures::{ready, Stream, StreamExt, TryStreamExt};
use hashbrown::raw::RawTable;
use smallvec::smallvec;
use std::fmt;
use std::mem::size_of;
use std::sync::Arc;
use std::task::Poll;
use std::{any::Any, usize, vec};
@@ -510,15 +509,16 @@ async fn collect_left_input(
)
})? / 7)
.next_power_of_two();
// 32 bytes per `(u64, SmallVec<[u64; 1]>)`
// 16 bytes per `(u64, u64)`
// + 1 byte for each bucket
// + 16 bytes fixed
let estimated_hastable_size = 32 * estimated_buckets + estimated_buckets + 16;
// + fixed size of JoinHashMap (RawTable + Vec)
let estimated_hastable_size =
16 * estimated_buckets + estimated_buckets + size_of::<JoinHashMap>();

reservation.try_grow(estimated_hastable_size)?;
metrics.build_mem_used.add(estimated_hastable_size);

let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
let mut hashmap = JoinHashMap::with_capacity(num_rows);
let mut hashes_buffer = Vec::new();
let mut offset = 0;
for batch in batches.iter() {
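The size estimate in the hunk above can be sketched as a standalone function. This is an illustrative sketch, not the DataFusion source: the function name and the 64-byte fixed size in the example are assumptions; the constants follow the comment (16 bytes per `(u64, u64)` bucket entry plus 1 control byte per bucket, plus the fixed size of the `JoinHashMap` struct).

```rust
// Illustrative sketch of the hashtable memory estimate (names assumed):
// 16 bytes per (u64, u64) bucket entry + 1 control byte per bucket
// + the fixed size of the JoinHashMap struct itself.
fn estimated_hashtable_size(estimated_buckets: usize, fixed_size: usize) -> usize {
    16 * estimated_buckets + estimated_buckets + fixed_size
}

fn main() {
    // e.g. 1024 buckets with an assumed 64-byte fixed header
    assert_eq!(estimated_hashtable_size(1024, 64), 17_472);
}
```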
@@ -563,16 +563,24 @@ pub fn update_hash(
// insert hashes to key of the hashmap
for (row, hash_value) in hash_values.iter().enumerate() {
let item = hash_map
.0
.map
.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
if let Some((_, indices)) = item {
indices.push((row + offset) as u64);
if let Some((_, index)) = item {
// Already exists: add index to next array
let prev_index = *index;
// Store new value inside hashmap
*index = (row + offset + 1) as u64;
Contributor: Is it possible to hold the chain start in the hashmap, instead of the end of the chain? Is there any particular reason for this?

Contributor: Additions become O(1) by holding the end of the chain, right?

Contributor Author: I think the reason is that while iterating over the hashes/indices we get the latest index (which contains both the value and a pointer to the previous index) as a constant-time operation. I'm not sure how it would work when holding the chain start in the map, as we would have to walk the chain first to get to the last element.

It would be possible (though it seems not beneficial for the normal hash join) to also keep the start of the chain in the hashmap.

Contributor: Yeah, there is no gain for the usual hash join, but pruning becomes much more expensive if I do not have the beginning. I think I will not push for it; for now, let's have separate hashmap paradigms.

Contributor Author: Yes, additions become O(1) by holding the end of the chain: this way next[value - 1] contains the previous value, and the next value / index can be found in the same way again.
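The O(1) append discussed in this thread can be sketched as a minimal function; the names `head` and `append` are illustrative, not from the PR, and `head` stands in for the value the map stores for a given hash (last row index + 1):

```rust
// Minimal sketch of the O(1) chain append (names are illustrative):
// `head` is the value kept in the map for this hash (last row + 1),
// `next` is the chained index list; 0 marks the end of a chain.
fn append(head: &mut u64, next: &mut [u64], row: usize) {
    let prev = *head;         // previous chain end (or 0 if the chain is empty)
    *head = (row + 1) as u64; // the new row becomes the chain end
    next[row] = prev;         // link back without walking the chain
}

fn main() {
    let mut head = 0u64;
    let mut next = vec![0u64; 5];
    append(&mut head, &mut next, 0);
    append(&mut head, &mut next, 2);
    append(&mut head, &mut next, 3);
    assert_eq!(head, 4);
    assert_eq!(next, vec![0, 0, 1, 3, 0]);
}
```

Because only `head` and one slot of `next` are touched, each insertion is constant-time regardless of chain length.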

// Update chained Vec at row + offset with previous value
hash_map.next[row + offset] = prev_index;
} else {
hash_map.0.insert(
hash_map.map.insert(
*hash_value,
(*hash_value, smallvec![(row + offset) as u64]),
// store the value + 1 as 0 value reserved for end of list
(*hash_value, (row + offset + 1) as u64),
|(hash, _)| *hash,
);
// chained list at (row + offset) is already initialized with 0
// meaning end of list
}
}
Ok(())
@@ -629,7 +637,6 @@ pub fn build_join_indices(
random_state: &RandomState,
null_equals_null: bool,
hashes_buffer: &mut Vec<u64>,
offset: Option<usize>,
build_side: JoinSide,
) -> Result<(UInt64Array, UInt32Array)> {
// Get the indices that satisfy the equality condition, like `left.a1 = right.a2`
@@ -642,7 +649,6 @@
random_state,
null_equals_null,
hashes_buffer,
offset,
)?;
if let Some(filter) = filter {
// Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10`
@@ -700,7 +706,6 @@ pub fn build_equal_condition_join_indices(
random_state: &RandomState,
null_equals_null: bool,
hashes_buffer: &mut Vec<u64>,
offset: Option<usize>,
) -> Result<(UInt64Array, UInt32Array)> {
let keys_values = probe_on
.iter()
@@ -719,47 +724,44 @@
// Using a buffer builder to avoid slower normal builder
let mut build_indices = UInt64BufferBuilder::new(0);
let mut probe_indices = UInt32BufferBuilder::new(0);
let offset_value = offset.unwrap_or(0);
// Visit all of the probe rows
for (row, hash_value) in hash_values.iter().enumerate() {
// Get the hash and find it in the build index

// For every item on the build and probe we check if it matches
// This possibly contains rows with hash collisions,
// So we have to check here whether rows are equal or not
if let Some((_, indices)) = build_hashmap
.0
if let Some((_, index)) = build_hashmap
.map
.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
for &i in indices {
// Check hash collisions
let offset_build_index = i as usize - offset_value;
let mut i = *index - 1;
loop {
// Check hash collisions
if equal_rows(
offset_build_index,
i as usize,
row,
&build_join_values,
&keys_values,
null_equals_null,
)? {
build_indices.append(offset_build_index as u64);
build_indices.append(i);
probe_indices.append(row as u32);
}
// Follow the chain to get the next index value
let next = build_hashmap.next[i as usize];
if next == 0 {
// end of list
break;
}
i = next - 1;
}
}
}
let build = ArrayData::builder(DataType::UInt64)
.len(build_indices.len())
.add_buffer(build_indices.finish())
.build()?;
let probe = ArrayData::builder(DataType::UInt32)
.len(probe_indices.len())
.add_buffer(probe_indices.finish())
.build()?;

Ok((
PrimitiveArray::<UInt64Type>::from(build),
PrimitiveArray::<UInt32Type>::from(probe),
PrimitiveArray::new(build_indices.finish().into(), None),
PrimitiveArray::new(probe_indices.finish().into(), None),
))
}

@@ -830,7 +832,7 @@ macro_rules! equal_rows_elem_with_string_dict {
/// Left and right row have equal values
/// If more data types are supported here, please also add the data types in can_hash function
/// to generate hash join logical plan.
fn equal_rows(
pub fn equal_rows(
left: usize,
right: usize,
left_arrays: &[ArrayRef],
@@ -1157,7 +1159,6 @@ impl HashJoinStream {
&self.random_state,
self.null_equals_null,
&mut hashes_buffer,
None,
JoinSide::Left,
);

@@ -1258,11 +1259,11 @@ mod tests {

use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema};
use smallvec::smallvec;

use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::Literal;
use hashbrown::raw::RawTable;

use crate::execution::context::SessionConfig;
use crate::physical_expr::expressions::BinaryExpr;
@@ -2616,16 +2617,24 @@ mod tests {
create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;

// Create hash collisions (same hashes)
hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h);
hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h);
hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);

let next = vec![2, 0];

let right = build_table_i32(
("a", &vec![10, 20]),
("b", &vec![0, 0]),
("c", &vec![30, 40]),
);

let left_data = (JoinHashMap(hashmap_left), left);
let left_data = (
JoinHashMap {
map: hashmap_left,
next,
},
left,
);
let (l, r) = build_equal_condition_join_indices(
&left_data.0,
&left_data.1,
@@ -2635,7 +2644,6 @@
&random_state,
false,
&mut vec![0; right.num_rows()],
None,
)?;

let mut left_ids = UInt64Builder::with_capacity(0);
78 changes: 71 additions & 7 deletions datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -36,24 +36,88 @@ use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
use datafusion_common::Result;

// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
//
// Note that the `u64` keys are not stored in the hashmap (hence the `()` as key), but are only used
// to put the indices in a certain bucket.
// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value.
// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
// As the key is a hash value, we need to check for possible hash collisions in the probe stage
// During this stage it might be the case that a row has the same hash value,
// but the actual values don't match. Those are checked in the [equal_rows] macro
// TODO: speed up collision check and move away from using a hashbrown HashMap
// The indices (values) are stored in a separate chained list stored in the `Vec<u64>`.
// The first value (+1) is stored in the hashmap, whereas the next value is stored in the `next` array at the position of the value.
// The chain can be followed until the value "0" has been reached, meaning the end of the list.
// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487)
// See the example below:
// Insert (1,1)
// map:
// ---------
// | 1 | 2 |
// ---------
// next:
// ---------------------
// | 0 | 0 | 0 | 0 | 0 |
// ---------------------
// Insert (2,2)
// map:
// ---------
// | 1 | 2 |
// | 2 | 3 |
// ---------
// next:
// ---------------------
// | 0 | 0 | 0 | 0 | 0 |
// ---------------------
// Insert (1,3)
// map:
// ---------
// | 1 | 4 |
// | 2 | 3 |
// ---------
// next:
// ---------------------
// | 0 | 0 | 0 | 2 | 0 | <--- hash value 1 maps to 4,2 (which means indices values 3,1)
// ---------------------
// Insert (1,4)
// map:
// ---------
// | 1 | 5 |
// | 2 | 3 |
// ---------
// next:
// ---------------------
// | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1)
// ---------------------
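The insert sequence in the example above can be sketched as a small self-contained structure. This is an illustrative sketch, not the DataFusion implementation: it uses std's `HashMap` in place of hashbrown's `RawTable`, and the names (`ChainedIndex`, `insert`, `get`) are assumptions.

```rust
use std::collections::HashMap;

// Illustrative chained-list index: the map stores hash -> (last row + 1),
// `next` chains each row back to the previous row with the same hash (0 = end).
struct ChainedIndex {
    map: HashMap<u64, u64>,
    next: Vec<u64>,
}

impl ChainedIndex {
    fn with_capacity(capacity: usize) -> Self {
        ChainedIndex { map: HashMap::new(), next: vec![0; capacity] }
    }

    fn insert(&mut self, hash: u64, row: usize) {
        // Store row + 1 in the map so that 0 can mean "end of list".
        if let Some(prev) = self.map.insert(hash, (row + 1) as u64) {
            // Chain back to the previous occurrence of this hash.
            self.next[row] = prev;
        }
    }

    // Collect all row indices for a hash, newest first.
    fn get(&self, hash: u64) -> Vec<usize> {
        let mut out = Vec::new();
        let mut i = match self.map.get(&hash) {
            Some(&v) => v,
            None => return out,
        };
        loop {
            out.push((i - 1) as usize);
            i = self.next[(i - 1) as usize];
            if i == 0 {
                break;
            }
        }
        out
    }
}

fn main() {
    // Mirrors the diagram above: Insert (1,1), (2,2), (1,3), (1,4)
    let mut idx = ChainedIndex::with_capacity(5);
    idx.insert(1, 1);
    idx.insert(2, 2);
    idx.insert(1, 3);
    idx.insert(1, 4);
    // hash value 1 maps to stored values 5, 4, 2 -> row indices 4, 3, 1
    assert_eq!(idx.get(1), vec![4, 3, 1]);
    assert_eq!(idx.get(2), vec![2]);
}
```

Note that the hash-collision check from the probe stage is omitted here; `std::collections::HashMap` resolves collisions itself, whereas the PR's `RawTable` variant must compare the stored hash explicitly.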

// TODO: speed up collision checks
// https://github.com/apache/arrow-datafusion/issues/50
pub struct JoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
pub struct JoinHashMap {
// Stores hash value to first index
pub map: RawTable<(u64, u64)>,
// Stores indices in chained list data structure
pub next: Vec<u64>,
}

/// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the indices inline, allowing it to mutate
/// and shrink the indices.
pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
Contributor Author: @berkaysynnada not sure if SmallVec is optimal here. It might be an improvement to use Vec, as the >1 case probably occurs more often here?

Contributor: I might change this since we are not pushing for the same hash table implementation.


impl JoinHashMap {
pub(crate) fn with_capacity(capacity: usize) -> Self {
JoinHashMap {
map: RawTable::with_capacity(capacity),
next: vec![0; capacity],
}
}
}

impl SymmetricJoinHashMap {
pub(crate) fn with_capacity(capacity: usize) -> Self {
Self(RawTable::with_capacity(capacity))
}

/// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is.
/// The value of scale_factor is set to 4, which means the capacity will be reduced by 25%
/// when necessary. You can adjust the scale_factor value to achieve the desired
/// ,balance between memory usage and performance.
/// balance between memory usage and performance.
//
// If you increase the scale_factor, the capacity will shrink less aggressively,
// leading to potentially higher memory usage but fewer resizes.
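A hypothetical sketch of the capacity reduction described in this comment; the formula below is an assumption inferred from the doc comment ("reduced by 25%" at `scale_factor = 4`), not the actual `shrink_if_necessary` implementation:

```rust
// Assumed new-capacity formula: shrink by capacity / scale_factor,
// i.e. a 25% reduction when scale_factor = 4.
fn shrink_capacity(capacity: usize, scale_factor: usize) -> usize {
    capacity - capacity / scale_factor
}

fn main() {
    // scale_factor = 4 -> shrink by 25%
    assert_eq!(shrink_capacity(100, 4), 75);
    // a larger scale_factor shrinks less aggressively
    assert_eq!(shrink_capacity(100, 8), 88);
}
```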
@@ -628,7 +692,7 @@ pub mod tests {
#[test]
fn test_shrink_if_necessary() {
let scale_factor = 4;
let mut join_hash_map = JoinHashMap(RawTable::with_capacity(100));
let mut join_hash_map = SymmetricJoinHashMap::with_capacity(100);
let data_size = 2000;
let deleted_part = 3 * data_size / 4;
// Add elements to the JoinHashMap