Skip to content

Commit

Permalink
chore(query): fix and refine left join (#11950)
Browse files Browse the repository at this point in the history
* fix left join

* make lint

* make lint

* move JOIN_MAX_BLOCK_SIZE to max_block_size

* add test

* add test
  • Loading branch information
Dousir9 authored Jul 4, 2023
1 parent 34b6784 commit eea8971
Show file tree
Hide file tree
Showing 16 changed files with 298 additions and 205 deletions.
5 changes: 4 additions & 1 deletion src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1198,12 +1198,15 @@ impl PipelineBuilder {
fn build_join_probe(&mut self, join: &HashJoin, state: Arc<JoinHashTable>) -> Result<()> {
self.build_pipeline(&join.probe)?;

let max_block_size = self.ctx.get_settings().get_max_block_size()? as usize;
let func_ctx = self.ctx.get_function_context()?;
self.main_pipeline.add_transform(|input, output| {
let transform = TransformHashJoinProbe::create(
self.ctx.clone(),
input,
output,
TransformHashJoinProbe::attach(state.clone())?,
max_block_size,
func_ctx.clone(),
&join.join_type,
!join.non_equi_conditions.is_empty(),
)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use common_hashtable::HashJoinHashtableLike;
use common_hashtable::RowPtr;
use common_sql::executor::cast_expr_to_non_null_boolean;

use super::desc::JOIN_MAX_BLOCK_SIZE;
use super::desc::MARKER_KIND_FALSE;
use super::desc::MARKER_KIND_NULL;
use super::desc::MARKER_KIND_TRUE;
Expand Down Expand Up @@ -73,6 +72,7 @@ impl JoinHashTable {
}

#[inline]
#[allow(clippy::too_many_arguments)]
pub(crate) fn probe_key<'a, H: HashJoinHashtableLike>(
&self,
hash_table: &'a H,
Expand All @@ -81,9 +81,10 @@ impl JoinHashTable {
i: usize,
vec_ptr: *mut RowPtr,
occupied: usize,
max_block_size: usize,
) -> (usize, u64) {
if valids.as_ref().map_or(true, |v| v.get_bit(i)) {
return hash_table.probe_hash_table(key, vec_ptr, occupied, JOIN_MAX_BLOCK_SIZE);
return hash_table.probe_hash_table(key, vec_ptr, occupied, max_block_size);
}
(0, 0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use parking_lot::RwLock;

use crate::sql::plans::JoinType;

pub const JOIN_MAX_BLOCK_SIZE: usize = 65536;
pub const MARKER_KIND_TRUE: u8 = 0;
pub const MARKER_KIND_FALSE: u8 = 1;
pub const MARKER_KIND_NULL: u8 = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use ethnum::U256;

use super::ProbeState;
use crate::pipelines::processors::transforms::hash_join::desc::JoinState;
use crate::pipelines::processors::transforms::hash_join::desc::JOIN_MAX_BLOCK_SIZE;
use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_FALSE;
use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_NULL;
use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_TRUE;
Expand All @@ -63,7 +62,7 @@ impl HashJoinState for JoinHashTable {
let mut buffer = self.row_space.buffer.write();
buffer.push(input);
let buffer_row_size = buffer.iter().fold(0, |acc, x| acc + x.num_rows());
if buffer_row_size < *self.data_block_size_limit {
if buffer_row_size < *self.build_side_block_size_limit {
Ok(())
} else {
let data_block = DataBlock::concat(buffer.as_slice())?;
Expand Down Expand Up @@ -525,6 +524,7 @@ impl HashJoinState for JoinHashTable {
task: usize,
state: &mut ProbeState,
) -> Result<Vec<DataBlock>> {
let max_block_size = state.max_block_size;
let true_validity = &state.true_validity;
let build_indexes = &mut state.build_indexes;
let mut build_indexes_occupied = 0;
Expand All @@ -551,7 +551,7 @@ impl HashJoinState for JoinHashTable {
let outer_map_len = outer_map.len();
let mut row_index = 0;
while row_index < outer_map_len {
while row_index < outer_map_len && build_indexes_occupied < JOIN_MAX_BLOCK_SIZE {
while row_index < outer_map_len && build_indexes_occupied < max_block_size {
if !outer_map[row_index] {
build_indexes[build_indexes_occupied].chunk_index = chunk_index as u32;
build_indexes[build_indexes_occupied].row_index = row_index as u32;
Expand All @@ -567,7 +567,7 @@ impl HashJoinState for JoinHashTable {

if self.hash_join_desc.join_type == JoinType::Full {
let num_rows = unmatched_build_block.num_rows();
let nullable_unmatched_build_columns = if num_rows == JOIN_MAX_BLOCK_SIZE {
let nullable_unmatched_build_columns = if num_rows == max_block_size {
unmatched_build_block
.columns()
.iter()
Expand Down Expand Up @@ -601,6 +601,7 @@ impl HashJoinState for JoinHashTable {
}

fn right_semi_outer_scan(&self, task: usize, state: &mut ProbeState) -> Result<Vec<DataBlock>> {
let max_block_size = state.max_block_size;
let build_indexes = &mut state.build_indexes;
let mut build_indexes_occupied = 0;
let mut result_blocks = vec![];
Expand All @@ -626,7 +627,7 @@ impl HashJoinState for JoinHashTable {
let outer_map_len = outer_map.len();
let mut row_index = 0;
while row_index < outer_map_len {
while row_index < outer_map_len && build_indexes_occupied < JOIN_MAX_BLOCK_SIZE {
while row_index < outer_map_len && build_indexes_occupied < max_block_size {
if outer_map[row_index] {
build_indexes[build_indexes_occupied].chunk_index = chunk_index as u32;
build_indexes[build_indexes_occupied].row_index = row_index as u32;
Expand All @@ -645,6 +646,7 @@ impl HashJoinState for JoinHashTable {
}

fn right_anti_outer_scan(&self, task: usize, state: &mut ProbeState) -> Result<Vec<DataBlock>> {
let max_block_size = state.max_block_size;
let build_indexes = &mut state.build_indexes;
let mut build_indexes_occupied = 0;
let mut result_blocks = vec![];
Expand All @@ -670,7 +672,7 @@ impl HashJoinState for JoinHashTable {
let outer_map_len = outer_map.len();
let mut row_index = 0;
while row_index < outer_map_len {
while row_index < outer_map_len && build_indexes_occupied < JOIN_MAX_BLOCK_SIZE {
while row_index < outer_map_len && build_indexes_occupied < max_block_size {
if !outer_map[row_index] {
build_indexes[build_indexes_occupied].chunk_index = chunk_index as u32;
build_indexes[build_indexes_occupied].row_index = row_index as u32;
Expand All @@ -693,6 +695,7 @@ impl HashJoinState for JoinHashTable {
}

fn left_mark_scan(&self, task: usize, state: &mut ProbeState) -> Result<Vec<DataBlock>> {
let max_block_size = state.max_block_size;
let build_indexes = &mut state.build_indexes;
let mut build_indexes_occupied = 0;
let mut result_blocks = vec![];
Expand Down Expand Up @@ -720,7 +723,7 @@ impl HashJoinState for JoinHashTable {
let markers_len = markers.len();
let mut row_index = 0;
while row_index < markers_len {
let block_size = std::cmp::min(markers_len - row_index, JOIN_MAX_BLOCK_SIZE);
let block_size = std::cmp::min(markers_len - row_index, max_block_size);
let mut validity = MutableBitmap::with_capacity(block_size);
let mut boolean_bit_map = MutableBitmap::with_capacity(block_size);
while build_indexes_occupied < block_size {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub enum HashJoinHashTable {

pub struct JoinHashTable {
pub(crate) ctx: Arc<QueryContext>,
pub(crate) data_block_size_limit: Arc<usize>,
pub(crate) build_side_block_size_limit: Arc<usize>,
/// Reference count
pub(crate) build_count: Mutex<usize>,
pub(crate) finalize_count: Mutex<usize>,
Expand Down Expand Up @@ -161,7 +161,9 @@ impl JoinHashTable {
}
Ok(Self {
row_space: RowSpace::new(ctx.clone(), build_data_schema)?,
data_block_size_limit: Arc::new(ctx.get_settings().get_max_block_size()? as usize * 16),
build_side_block_size_limit: Arc::new(
ctx.get_settings().get_max_block_size()? as usize * 16,
),
ctx,
build_count: Mutex::new(0),
finalize_count: Mutex::new(0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use common_functions::BUILTIN_FUNCTIONS;
use common_hashtable::HashJoinHashtableLike;
use common_sql::executor::cast_expr_to_non_null_boolean;

use crate::pipelines::processors::transforms::hash_join::desc::JOIN_MAX_BLOCK_SIZE;
use crate::pipelines::processors::transforms::hash_join::ProbeState;
use crate::pipelines::processors::JoinHashTable;

Expand All @@ -42,6 +41,7 @@ impl JoinHashTable {
IT: Iterator<Item = &'a H::Key> + TrustedLen,
H::Key: 'a,
{
let max_block_size = probe_state.max_block_size;
let valids = &probe_state.valids;
// The inner join will return multiple data blocks of similar size.
let mut occupied = 0;
Expand All @@ -62,22 +62,28 @@ impl JoinHashTable {

for (i, key) in keys_iter.enumerate() {
// If the join is derived from correlated subquery, then null equality is safe.
let (mut match_count, mut incomplete_ptr) = if self
.hash_join_desc
.from_correlated_subquery
{
hash_table.probe_hash_table(key, build_indexes_ptr, occupied, JOIN_MAX_BLOCK_SIZE)
} else {
self.probe_key(hash_table, key, valids, i, build_indexes_ptr, occupied)
};
let (mut match_count, mut incomplete_ptr) =
if self.hash_join_desc.from_correlated_subquery {
hash_table.probe_hash_table(key, build_indexes_ptr, occupied, max_block_size)
} else {
self.probe_key(
hash_table,
key,
valids,
i,
build_indexes_ptr,
occupied,
max_block_size,
)
};
if match_count == 0 {
continue;
}

occupied += match_count;
probe_indexes[probe_indexes_len] = (i as u32, match_count as u32);
probe_indexes_len += 1;
if occupied >= JOIN_MAX_BLOCK_SIZE {
if occupied >= max_block_size {
loop {
probed_blocks.push(
self.merge_eq_block(
Expand All @@ -103,7 +109,7 @@ impl JoinHashTable {
incomplete_ptr,
build_indexes_ptr,
occupied,
JOIN_MAX_BLOCK_SIZE,
max_block_size,
);
if match_count == 0 {
break;
Expand All @@ -113,7 +119,7 @@ impl JoinHashTable {
probe_indexes[probe_indexes_len] = (i as u32, match_count as u32);
probe_indexes_len += 1;

if occupied < JOIN_MAX_BLOCK_SIZE {
if occupied < max_block_size {
break;
}
}
Expand Down
Loading

1 comment on commit eea8971

@vercel
Copy link

@vercel vercel bot commented on eea8971 Jul 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend.rs
databend-git-main-databend.vercel.app
databend-databend.vercel.app

Please sign in to comment.