diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 546a929bf939..da57fa07ccd9 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -73,7 +73,47 @@ use datafusion_physical_expr::EquivalenceProperties; use ahash::RandomState; use futures::{ready, Stream, StreamExt, TryStreamExt}; -type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation); +/// HashTable and input data for the left (build side) of a join +struct JoinLeftData { + /// The hash table with indices into `batch` + hash_map: JoinHashMap, + /// The input rows for the build side + batch: RecordBatch, + /// Memory reservation that tracks memory used by `hash_map` hash table + /// `batch`. Cleared on drop. + #[allow(dead_code)] + reservation: MemoryReservation, +} + +impl JoinLeftData { + /// Create a new `JoinLeftData` from its parts + fn new( + hash_map: JoinHashMap, + batch: RecordBatch, + reservation: MemoryReservation, + ) -> Self { + Self { + hash_map, + batch, + reservation, + } + } + + /// Returns the number of rows in the build side + fn num_rows(&self) -> usize { + self.batch.num_rows() + } + + /// return a reference to the hash map + fn hash_map(&self) -> &JoinHashMap { + &self.hash_map + } + + /// returns a reference to the build side batch + fn batch(&self) -> &RecordBatch { + &self.batch + } +} /// Join execution plan: Evaluates eqijoin predicates in parallel on multiple /// partitions using a hash table and an optional filter list to apply post @@ -692,8 +732,9 @@ async fn collect_left_input( // Merge all batches into a single batch, so we // can directly index into the arrays let single_batch = concat_batches(&schema, &batches, num_rows)?; + let data = JoinLeftData::new(hashmap, single_batch, reservation); - Ok((hashmap, single_batch, reservation)) + Ok(data) } /// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`, @@ -770,7 +811,7 @@ struct HashJoinStream { left_fut: OnceFut, /// Which left (probe) side rows have been matches while creating output. /// For some OUTER joins, we need to know which rows have not been matched - /// to produce the correct. + /// to produce the correct output. visited_left_side: Option, /// right (probe) input right: SendableRecordBatchStream, @@ -1042,13 +1083,13 @@ impl HashJoinStream { { // TODO: Replace `ceil` wrapper with stable `div_cell` after // https://github.com/rust-lang/rust/issues/88581 - let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 8); + let visited_bitmap_size = bit_util::ceil(left_data.num_rows(), 8); self.reservation.try_grow(visited_bitmap_size)?; self.join_metrics.build_mem_used.add(visited_bitmap_size); } let visited_left_side = self.visited_left_side.get_or_insert_with(|| { - let num_rows = left_data.1.num_rows(); + let num_rows = left_data.num_rows(); if need_produce_result_in_final(self.join_type) { // Some join types need to track which row has be matched or unmatched: // `left semi` join: need to use the bitmap to produce the matched row in the left side @@ -1075,8 +1116,8 @@ impl HashJoinStream { // get the matched two indices for the on condition let left_right_indices = build_equal_condition_join_indices( - &left_data.0, - &left_data.1, + left_data.hash_map(), + left_data.batch(), &batch, &self.on_left, &self.on_right, @@ -1108,7 +1149,7 @@ impl HashJoinStream { let result = build_batch_from_indices( &self.schema, - &left_data.1, + left_data.batch(), &batch, &left_side, &right_side, @@ -1140,7 +1181,7 @@ impl HashJoinStream { // use the left and right indices to produce the batch result let result = build_batch_from_indices( &self.schema, - &left_data.1, + left_data.batch(), &empty_right_batch, &left_side, &right_side, @@ -2519,16 +2560,11 @@ mod tests { ("c", &vec![30, 40]), ); - let left_data = ( - JoinHashMap { - map: hashmap_left, - next, - }, - left, - ); + let join_hash_map = JoinHashMap::new(hashmap_left, next); + let (l, r) = build_equal_condition_join_indices( - &left_data.0, - &left_data.1, + &join_hash_map, + &left, &right, &[Column::new("a", 0)], &[Column::new("a", 0)], diff --git a/datafusion/physical-plan/src/joins/hash_join_utils.rs b/datafusion/physical-plan/src/joins/hash_join_utils.rs index 5ebf370b6d71..fecbf96f0895 100644 --- a/datafusion/physical-plan/src/joins/hash_join_utils.rs +++ b/datafusion/physical-plan/src/joins/hash_join_utils.rs @@ -103,12 +103,17 @@ use hashbrown::HashSet; /// ``` pub struct JoinHashMap { // Stores hash value to last row index - pub map: RawTable<(u64, u64)>, + map: RawTable<(u64, u64)>, // Stores indices in chained list data structure - pub next: Vec, + next: Vec, } impl JoinHashMap { + #[cfg(test)] + pub(crate) fn new(map: RawTable<(u64, u64)>, next: Vec) -> Self { + Self { map, next } + } + pub(crate) fn with_capacity(capacity: usize) -> Self { JoinHashMap { map: RawTable::with_capacity(capacity),