Skip to content

Commit

Permalink
Merge pull request #169 from alamb/alamb/hash_reuse
Browse files Browse the repository at this point in the history
Reuse hashes buffer when emitting partial join results
  • Loading branch information
korowa authored Jan 15, 2024
2 parents cdd7f12 + 302c223 commit b631c1e
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ impl ExecutionPlan for HashJoinExec {
state: HashJoinStreamState::WaitBuildSide,
build_side: BuildSide::Initial(BuildSideInitialState { left_fut }),
batch_size,
+ hashes_buffer: vec![],
}))
}

Expand Down Expand Up @@ -979,6 +980,8 @@ struct HashJoinStream {
build_side: BuildSide,
/// Maximum output batch size
batch_size: usize,
+ /// Scratch space for computing hashes
+ hashes_buffer: Vec<u64>,
}

impl RecordBatchStream for HashJoinStream {
Expand Down Expand Up @@ -1044,6 +1047,7 @@ fn lookup_join_hashmap<T: JoinHashMapType>(
probe_on: &[Column],
random_state: &RandomState,
null_equals_null: bool,
+ hashes_buffer: &mut Vec<u64>,
limit: usize,
offset: JoinHashMapOffset,
) -> Result<(UInt64Array, UInt32Array, Option<JoinHashMapOffset>)> {
Expand All @@ -1059,8 +1063,9 @@ fn lookup_join_hashmap<T: JoinHashMapType>(
})
.collect::<Result<Vec<_>>>()?;

- let mut hashes_buffer = vec![0; probe_batch.num_rows()];
- let hash_values = create_hashes(&keys_values, random_state, &mut hashes_buffer)?;
+ hashes_buffer.clear();
+ hashes_buffer.resize(probe_batch.num_rows(), 0);
+ let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;

let (mut probe_builder, mut build_builder, next_offset) = build_hashmap
.get_matched_indices_with_limit_offset(
Expand Down Expand Up @@ -1263,6 +1268,7 @@ impl HashJoinStream {
&self.on_right,
&self.random_state,
self.null_equals_null,
+ &mut self.hashes_buffer,
self.batch_size,
state.offset,
)?;
Expand Down Expand Up @@ -2930,6 +2936,7 @@ mod tests {
);

let join_hash_map = JoinHashMap::new(hashmap_left, next);
+ let mut hashes_buffer = vec![0];

let (l, r, _) = lookup_join_hashmap(
&join_hash_map,
Expand All @@ -2939,6 +2946,7 @@ mod tests {
&[Column::new("a", 0)],
&random_state,
false,
+ &mut hashes_buffer,
8192,
(0, None),
)?;
Expand Down

0 comments on commit b631c1e

Please sign in to comment.