Skip to content

Commit

Permalink
recalculate hashes & remove iterators
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Jan 21, 2024
1 parent b631c1e commit 343b192
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 36 deletions.
37 changes: 23 additions & 14 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,15 +1039,14 @@ impl RecordBatchStream for HashJoinStream {
/// Probe indices: 3, 3, 4, 5
/// ```
#[allow(clippy::too_many_arguments)]
fn lookup_join_hashmap<T: JoinHashMapType>(
build_hashmap: &T,
fn lookup_join_hashmap(
build_hashmap: &JoinHashMap,
build_input_buffer: &RecordBatch,
probe_batch: &RecordBatch,
build_on: &[Column],
probe_on: &[Column],
random_state: &RandomState,
null_equals_null: bool,
hashes_buffer: &mut Vec<u64>,
hashes_buffer: &Vec<u64>,
limit: usize,
offset: JoinHashMapOffset,
) -> Result<(UInt64Array, UInt32Array, Option<JoinHashMapOffset>)> {
Expand All @@ -1063,13 +1062,9 @@ fn lookup_join_hashmap<T: JoinHashMapType>(
})
.collect::<Result<Vec<_>>>()?;

hashes_buffer.clear();
hashes_buffer.resize(probe_batch.num_rows(), 0);
let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;

let (mut probe_builder, mut build_builder, next_offset) = build_hashmap
.get_matched_indices_with_limit_offset(
hash_values.iter().enumerate(),
hashes_buffer,
None,
limit,
offset,
Expand Down Expand Up @@ -1233,6 +1228,16 @@ impl HashJoinStream {
self.state = HashJoinStreamState::ExhaustedProbeSide;
}
Some(Ok(batch)) => {
// Precalculate hash values for fetched batch
let keys_values = self.on_right
.iter()
.map(|c| c.evaluate(&batch)?.into_array(batch.num_rows()))
.collect::<Result<Vec<_>>>()?;

self.hashes_buffer.clear();
self.hashes_buffer.resize(batch.num_rows(), 0);
create_hashes(&keys_values, &self.random_state, &mut self.hashes_buffer)?;

self.state =
HashJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState {
batch,
Expand Down Expand Up @@ -1266,7 +1271,6 @@ impl HashJoinStream {
&state.batch,
&self.on_left,
&self.on_right,
&self.random_state,
self.null_equals_null,
&mut self.hashes_buffer,
self.batch_size,
Expand Down Expand Up @@ -2935,16 +2939,21 @@ mod tests {
("c", &vec![30, 40]),
);

// Join key column for both join sides
let key_column = Column::new("a", 0);

let join_hash_map = JoinHashMap::new(hashmap_left, next);
let mut hashes_buffer = vec![0];

let right_keys_values = key_column.evaluate(&right)?.into_array(right.num_rows())?;
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes(&[right_keys_values], &random_state, &mut hashes_buffer)?;

let (l, r, _) = lookup_join_hashmap(
&join_hash_map,
&left,
&right,
&[Column::new("a", 0)],
&[Column::new("a", 0)],
&random_state,
&[key_column.clone()],
&[key_column],
false,
&mut hashes_buffer,
8192,
Expand Down
67 changes: 45 additions & 22 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ pub trait JoinHashMapType {
/// equality may be required.
fn get_matched_indices_with_limit_offset<'a>(
&self,
iter: impl Iterator<Item = (usize, &'a u64)>,
iter: &Vec<u64>,
deleted_offset: Option<usize>,
limit: usize,
offset: JoinHashMapOffset,
Expand All @@ -251,30 +251,52 @@ pub trait JoinHashMapType {
let mut match_indices = UInt64BufferBuilder::new(0);

let mut output_tuples = 0_usize;
let mut next_offset = None;

let hash_map: &RawTable<(u64, u64)> = self.get_map();
let next_chain = self.get_list();

let (initial_idx, initial_next_idx) = offset;
'probe: for (row_idx, hash_value) in iter.skip(initial_idx) {
let index = if initial_next_idx.is_some() && row_idx == initial_idx {
// If `initial_next_idx` is zero, then input index has been processed
// during previous iteration, and it can be skipped now
if let Some(0) = initial_next_idx {
continue;
}
// Otherwise, use `initial_next_idx` as-is
initial_next_idx
} else if let Some((_, index)) =
let to_skip = match offset {
(initial_idx, None) => initial_idx,
(initial_idx, Some(0)) => initial_idx + 1,
(initial_idx, Some(initial_next_idx)) => {
let mut i = initial_next_idx - 1;
loop {
let match_row_idx = if let Some(offset) = deleted_offset {
// This argument means that the next index was pruned well before this point.
if i < offset as u64 {
// End of the list due to pruning
break;
}
i - offset as u64
} else {
i
};
match_indices.append(match_row_idx);
input_indices.append(initial_idx as u32);
output_tuples += 1;
// Follow the chain to get the next index value
let next = next_chain[match_row_idx as usize];

if output_tuples >= limit {
let next_offset = Some((initial_idx, Some(next)));
return (input_indices, match_indices, next_offset);
}
if next == 0 {
// end of list
break;
}
i = next - 1;
};

initial_idx + 1
}
};

let mut row_idx = to_skip;
for hash_value in &iter[to_skip..] {
if let Some((_, index)) =
hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
Some(*index)
} else {
None
};

if let Some(index) = index {
let mut i = index - 1;
loop {
let match_row_idx = if let Some(offset) = deleted_offset {
Expand All @@ -294,8 +316,8 @@ pub trait JoinHashMapType {
let next = next_chain[match_row_idx as usize];

if output_tuples >= limit {
next_offset = Some((row_idx, Some(next)));
break 'probe;
let next_offset = Some((row_idx, Some(next)));
return (input_indices, match_indices, next_offset);
}
if next == 0 {
// end of list
Expand All @@ -304,9 +326,10 @@ pub trait JoinHashMapType {
i = next - 1;
}
}
row_idx += 1;
}

(input_indices, match_indices, next_offset)
(input_indices, match_indices, None)
}
}

Expand Down

0 comments on commit 343b192

Please sign in to comment.