From 7659e30319e2b1da36f1f81c35379b00b88fbd38 Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Tue, 20 Feb 2024 10:46:36 +0800 Subject: [PATCH 01/11] feat: integrate hash join with new filter framework --- .../expression/src/filter/filter_executor.rs | 6 +- .../processors/transforms/hash_join/common.rs | 33 +++++++ .../merge_into_hash_join_optimization.rs | 96 ++++++++++++------- .../hash_join/probe_join/inner_join.rs | 26 +---- .../hash_join/probe_join/left_anti_join.rs | 19 ++-- .../hash_join/probe_join/left_join.rs | 92 +++++++----------- .../hash_join/probe_join/left_semi_join.rs | 44 ++++----- .../transforms/hash_join/probe_state.rs | 23 +++++ .../hash_join/transform_hash_join_probe.rs | 6 ++ .../read/native_data_source_deserializer.rs | 2 +- 10 files changed, 191 insertions(+), 156 deletions(-) diff --git a/src/query/expression/src/filter/filter_executor.rs b/src/query/expression/src/filter/filter_executor.rs index 17b01e66589f0..1bf81226ce192 100644 --- a/src/query/expression/src/filter/filter_executor.rs +++ b/src/query/expression/src/filter/filter_executor.rs @@ -179,7 +179,11 @@ impl FilterExecutor { true_idx } - pub fn mut_true_selection(&mut self) -> &mut [u32] { + pub fn true_selection(&mut self) -> &[u32] { + &self.true_selection + } + + pub fn mutable_true_selection(&mut self) -> &mut [u32] { &mut self.true_selection } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs index 3c53de4d70775..719ce2c62b920 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs @@ -25,6 +25,7 @@ use databend_common_expression::Column; use databend_common_expression::DataBlock; use databend_common_expression::Evaluator; use databend_common_expression::Expr; +use databend_common_expression::FilterExecutor; use 
databend_common_expression::FunctionContext; use databend_common_expression::Value; use databend_common_functions::BUILTIN_FUNCTIONS; @@ -93,6 +94,38 @@ impl HashJoinProbeState { Ok(DataBlock::new_from_columns(vec![marker_column])) } + // return (result data block, filtered indices, all_true, all_false). + pub(crate) fn get_other_predicate_result_block<'a>( + &self, + filter_executor: &'a mut FilterExecutor, + data_block: DataBlock, + ) -> Result<(DataBlock, &'a [u32], bool, bool)> { + let origin_count = data_block.num_rows(); + let result_block = filter_executor.filter(data_block)?; + let result_count = result_block.num_rows(); + Ok(( + result_block, + &filter_executor.true_selection()[0..result_count], + result_count == origin_count, + result_count == 0, + )) + } + + // return (filtered indices, all_true, all_false). + pub(crate) fn get_other_predicate_selection<'a>( + &self, + filter_executor: &'a mut FilterExecutor, + data_block: &DataBlock, + ) -> Result<(&'a [u32], bool, bool)> { + let origin_count = data_block.num_rows(); + let result_count = filter_executor.select(data_block)?; + Ok(( + &filter_executor.true_selection()[0..result_count], + result_count == origin_count, + result_count == 0, + )) + } + // return an (option bitmap, all_true, all_false). 
pub(crate) fn get_other_filters( &self, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs index 08984a78b56ee..89d1c540d7f35 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs @@ -16,7 +16,6 @@ use std::cell::SyncUnsafeCell; use std::sync::atomic::AtomicU8; use std::sync::atomic::Ordering; -use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_catalog::plan::compute_row_id_prefix; use databend_common_catalog::plan::split_prefix; use databend_common_exception::ErrorCode; @@ -157,7 +156,7 @@ impl HashJoinProbeState { &self, build_indexes: &[RowPtr], matched_idx: usize, - valids: &Bitmap, + selection: Option<&[u32]>, ) -> Result<()> { // merge into target table as build side. if self @@ -176,45 +175,70 @@ impl HashJoinProbeState { let pointer = &merge_into_state.atomic_pointer; // add matched indexes. 
- for (idx, row_ptr) in build_indexes[0..matched_idx].iter().enumerate() { - unsafe { - if !valids.get_bit_unchecked(idx) { - continue; - } - } - let offset = if row_ptr.chunk_index == 0 { - row_ptr.row_index as usize - } else { - chunk_offsets[(row_ptr.chunk_index - 1) as usize] as usize - + row_ptr.row_index as usize - }; + if let Some(selection) = selection { + for idx in selection { + let row_ptr = unsafe { build_indexes.get_unchecked(*idx as usize) }; + let offset = if row_ptr.chunk_index == 0 { + row_ptr.row_index as usize + } else { + chunk_offsets[(row_ptr.chunk_index - 1) as usize] as usize + + row_ptr.row_index as usize + }; - let mut old_mactehd_counts = - unsafe { (*pointer.0.add(offset)).load(Ordering::Relaxed) }; - let mut new_matched_count = old_mactehd_counts + 1; - loop { - if old_mactehd_counts > 0 { - return Err(ErrorCode::UnresolvableConflict( - "multi rows from source match one and the same row in the target_table multi times in probe phase", - )); - } + let mut old_mactehd_counts = + unsafe { (*pointer.0.add(offset)).load(Ordering::Relaxed) }; + loop { + if old_mactehd_counts > 0 { + return Err(ErrorCode::UnresolvableConflict( + "multi rows from source match one and the same row in the target_table multi times in probe phase", + )); + } - let res = unsafe { - (*pointer.0.add(offset)).compare_exchange_weak( - old_mactehd_counts, - new_matched_count, - Ordering::SeqCst, - Ordering::SeqCst, - ) + let res = unsafe { + (*pointer.0.add(offset)).compare_exchange_weak( + old_mactehd_counts, + old_mactehd_counts + 1, + Ordering::SeqCst, + Ordering::SeqCst, + ) + }; + match res { + Ok(_) => break, + Err(x) => old_mactehd_counts = x, + }; + } + } + } else { + for row_ptr in &build_indexes[0..matched_idx] { + let offset = if row_ptr.chunk_index == 0 { + row_ptr.row_index as usize + } else { + chunk_offsets[(row_ptr.chunk_index - 1) as usize] as usize + + row_ptr.row_index as usize }; - match res { - Ok(_) => break, - Err(x) => { - old_mactehd_counts = x; 
- new_matched_count = old_mactehd_counts + 1; + let mut old_mactehd_counts = + unsafe { (*pointer.0.add(offset)).load(Ordering::Relaxed) }; + loop { + if old_mactehd_counts > 0 { + return Err(ErrorCode::UnresolvableConflict( + "multi rows from source match one and the same row in the target_table multi times in probe phase", + )); } - }; + + let res = unsafe { + (*pointer.0.add(offset)).compare_exchange_weak( + old_mactehd_counts, + old_mactehd_counts + 1, + Ordering::SeqCst, + Ordering::SeqCst, + ) + }; + match res { + Ok(_) => break, + Err(x) => old_mactehd_counts = x, + }; + } } } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs index 9885931245a12..74f4669a0fd8e 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs @@ -17,15 +17,10 @@ use std::sync::atomic::Ordering; use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_expression::types::BooleanType; -use databend_common_expression::types::DataType; use databend_common_expression::DataBlock; -use databend_common_expression::Evaluator; use databend_common_expression::KeyAccessor; -use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_hashtable::HashJoinHashtableLike; use databend_common_hashtable::RowPtr; -use databend_common_sql::executor::cast_expr_to_non_null_boolean; use crate::pipelines::processors::transforms::hash_join::build_state::BuildBlockGenerationState; use crate::pipelines::processors::transforms::hash_join::common::wrap_true_validity; @@ -183,13 +178,9 @@ impl HashJoinProbeState { )?); } - match &self.hash_join_state.hash_join_desc.other_predicate { + match &mut probe_state.filter_executor { None => Ok(result_blocks), - Some(other_predicate) => { - 
// Wrap `is_true` to `other_predicate` - let other_predicate = cast_expr_to_non_null_boolean(other_predicate.clone())?; - assert_eq!(other_predicate.data_type(), &DataType::Boolean); - + Some(filter_executor) => { let mut filtered_blocks = Vec::with_capacity(result_blocks.len()); for result_block in result_blocks { if self.hash_join_state.interrupt.load(Ordering::Relaxed) { @@ -197,16 +188,9 @@ impl HashJoinProbeState { "Aborted query, because the server is shutting down or the query was killed.", )); } - - let evaluator = - Evaluator::new(&result_block, &self.func_ctx, &BUILTIN_FUNCTIONS); - let predicate = evaluator - .run(&other_predicate)? - .try_downcast::() - .unwrap(); - let res = result_block.filter_boolean_value(&predicate)?; - if !res.is_empty() { - filtered_blocks.push(res); + let result_block = filter_executor.filter(result_block)?; + if !result_block.is_empty() { + filtered_blocks.push(result_block); } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs index 9a6e709a73074..b0a6f8a99d649 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs @@ -17,7 +17,7 @@ use std::sync::atomic::Ordering; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::DataBlock; -use databend_common_expression::Expr; +use databend_common_expression::FilterExecutor; use databend_common_expression::KeyAccessor; use databend_common_hashtable::HashJoinHashtableLike; use databend_common_hashtable::RowPtr; @@ -112,12 +112,7 @@ impl HashJoinProbeState { // For anti join, it defaults to false. 
let mut row_state = vec![false; input.num_rows()]; - let other_predicate = self - .hash_join_state - .hash_join_desc - .other_predicate - .as_ref() - .unwrap(); + let filter_executor = probe_state.filter_executor.as_mut().unwrap(); // Results. let mut matched_idx = 0; @@ -152,8 +147,8 @@ impl HashJoinProbeState { build_indexes, &mut probe_state.generation_state, &build_state.generation_state, - other_predicate, &mut row_state, + filter_executor, )?; (matched_idx, incomplete_ptr) = self.fill_probe_and_build_indexes::<_, false>( hash_table, @@ -193,8 +188,8 @@ impl HashJoinProbeState { build_indexes, &mut probe_state.generation_state, &build_state.generation_state, - other_predicate, &mut row_state, + filter_executor, )?; (matched_idx, incomplete_ptr) = self.fill_probe_and_build_indexes::<_, false>( hash_table, @@ -217,8 +212,8 @@ impl HashJoinProbeState { build_indexes, &mut probe_state.generation_state, &build_state.generation_state, - other_predicate, &mut row_state, + filter_executor, )?; } @@ -251,8 +246,8 @@ impl HashJoinProbeState { build_indexes: &[RowPtr], probe_state: &mut ProbeBlockGenerationState, build_state: &BuildBlockGenerationState, - other_predicate: &Expr, row_state: &mut [bool], + filter_executor: &mut FilterExecutor, ) -> Result<()> { if self.hash_join_state.interrupt.load(Ordering::Relaxed) { return Err(ErrorCode::AbortedQuery( @@ -284,9 +279,9 @@ impl HashJoinProbeState { let result_block = self.merge_eq_block(probe_block.clone(), build_block, matched_idx); self.update_row_state( &result_block, - other_predicate, &probe_indexes[0..matched_idx], row_state, + filter_executor, )?; Ok(()) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs index 1335e4fd88778..dc0937248d62e 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs +++ 
b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs @@ -18,7 +18,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockEntry; use databend_common_expression::DataBlock; -use databend_common_expression::Expr; +use databend_common_expression::FilterExecutor; use databend_common_expression::KeyAccessor; use databend_common_expression::Scalar; use databend_common_expression::Value; @@ -233,12 +233,7 @@ impl HashJoinProbeState { // Build states. let build_state = unsafe { &mut *self.hash_join_state.build_state.get() }; let outer_scan_map = &mut build_state.outer_scan_map; - let other_predicate = self - .hash_join_state - .hash_join_desc - .other_predicate - .as_ref() - .unwrap(); + let filter_executor = probe_state.filter_executor.as_mut().unwrap(); // Results. let mut matched_idx = 0; @@ -282,7 +277,7 @@ impl HashJoinProbeState { &build_state.generation_state, outer_scan_map, &mut result_blocks, - Some(other_predicate), + Some(filter_executor), Some(row_state), Some(row_state_indexes), )?; @@ -336,7 +331,7 @@ impl HashJoinProbeState { &build_state.generation_state, outer_scan_map, &mut result_blocks, - Some(other_predicate), + Some(filter_executor), Some(row_state), Some(row_state_indexes), )?; @@ -366,7 +361,7 @@ impl HashJoinProbeState { &build_state.generation_state, outer_scan_map, &mut result_blocks, - Some(other_predicate), + Some(filter_executor), Some(row_state), Some(row_state_indexes), )?; @@ -465,7 +460,7 @@ impl HashJoinProbeState { build_state: &BuildBlockGenerationState, outer_scan_map: &mut [Vec], result_blocks: &mut Vec, - other_predicate: Option<&Expr>, + filter_executor: Option<&mut FilterExecutor>, row_state: Option<&mut Vec>, row_state_indexes: Option<&mut Vec>, ) -> Result<()> { @@ -526,7 +521,7 @@ impl HashJoinProbeState { let result_block = self.merge_eq_block(probe_block, build_block, matched_idx); - if other_predicate.is_none() { + if 
filter_executor.is_none() { result_blocks.push(result_block); if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { for row_ptr in build_indexes[0..matched_idx].iter() { @@ -537,25 +532,17 @@ impl HashJoinProbeState { }; } } - self.merge_into_check_and_set_matched( - build_indexes, - matched_idx, - &probe_state.true_validity, - )?; + self.merge_into_check_and_set_matched(build_indexes, matched_idx, None)?; return Ok(()); } + // Safe to unwrap. let row_state = row_state.unwrap(); let row_state_indexes = row_state_indexes.unwrap(); - let (bm, all_true, all_false) = self.get_other_filters( - &result_block, - self.hash_join_state - .hash_join_desc - .other_predicate - .as_ref() - .unwrap(), - &self.func_ctx, - )?; + let filter_executor = filter_executor.unwrap(); + + let (result_block, selection, all_true, all_false) = + self.get_other_predicate_result_block(filter_executor, result_block)?; if all_true { result_blocks.push(result_block); @@ -568,55 +555,42 @@ impl HashJoinProbeState { }; } } - self.merge_into_check_and_set_matched( - build_indexes, - matched_idx, - &probe_state.true_validity, - )?; + self.merge_into_check_and_set_matched(build_indexes, matched_idx, Some(selection))?; } else if all_false { - let mut idx = 0; - while idx < matched_idx { + for idx in 0..matched_idx { unsafe { *row_state.get_unchecked_mut(*row_state_indexes.get_unchecked(idx)) -= 1; }; - idx += 1; } } else { - // Safe to unwrap. 
- let validity = bm.unwrap(); + result_blocks.push(result_block); if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { - let mut idx = 0; - while idx < matched_idx { + let mut count = 0; + for idx in selection { unsafe { - let valid = validity.get_bit_unchecked(idx); - let row_ptr = build_indexes.get_unchecked(idx); - if valid { - *outer_scan_map - .get_unchecked_mut(row_ptr.chunk_index as usize) - .get_unchecked_mut(row_ptr.row_index as usize) = true; - } else { - *row_state.get_unchecked_mut(*row_state_indexes.get_unchecked(idx)) -= - 1; + while count < *idx { + *row_state.get_unchecked_mut( + *row_state_indexes.get_unchecked(count as usize), + ) -= 1; + count += 1; } + let row_ptr = build_indexes.get_unchecked(*idx as usize); + *outer_scan_map + .get_unchecked_mut(row_ptr.chunk_index as usize) + .get_unchecked_mut(row_ptr.row_index as usize) = true; + count += 1; } - idx += 1; } } else { - let mut idx = 0; - self.merge_into_check_and_set_matched(build_indexes, matched_idx, &validity)?; - while idx < matched_idx { + self.merge_into_check_and_set_matched(build_indexes, matched_idx, Some(selection))?; + for idx in selection { unsafe { - let valid = validity.get_bit_unchecked(idx); - if !valid { - *row_state.get_unchecked_mut(*row_state_indexes.get_unchecked(idx)) -= - 1; - } + *row_state + .get_unchecked_mut(*row_state_indexes.get_unchecked(*idx as usize)) -= + 1; } - idx += 1; } } - let filtered_block = DataBlock::filter_with_bitmap(result_block, &validity)?; - result_blocks.push(filtered_block); } Ok(()) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs index 3a280f5b75540..2aa9f7dcea28a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs @@ -17,7 
+17,7 @@ use std::sync::atomic::Ordering; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::DataBlock; -use databend_common_expression::Expr; +use databend_common_expression::FilterExecutor; use databend_common_expression::KeyAccessor; use databend_common_hashtable::HashJoinHashtableLike; use databend_common_hashtable::RowPtr; @@ -109,12 +109,7 @@ impl HashJoinProbeState { // For semi join, it defaults to false. let mut row_state = vec![false; input.num_rows()]; - let other_predicate = self - .hash_join_state - .hash_join_desc - .other_predicate - .as_ref() - .unwrap(); + let filter_executor = probe_state.filter_executor.as_mut().unwrap(); // Results. let mut matched_idx = 0; @@ -148,8 +143,8 @@ impl HashJoinProbeState { build_indexes, &mut probe_state.generation_state, &build_state.generation_state, - other_predicate, &mut row_state, + filter_executor, )?; (matched_idx, incomplete_ptr) = self.fill_probe_and_build_indexes::<_, false>( hash_table, @@ -188,8 +183,8 @@ impl HashJoinProbeState { build_indexes, &mut probe_state.generation_state, &build_state.generation_state, - other_predicate, &mut row_state, + filter_executor, )?; (matched_idx, incomplete_ptr) = self.fill_probe_and_build_indexes::<_, false>( hash_table, @@ -212,8 +207,8 @@ impl HashJoinProbeState { build_indexes, &mut probe_state.generation_state, &build_state.generation_state, - other_predicate, &mut row_state, + filter_executor, )?; } @@ -246,8 +241,8 @@ impl HashJoinProbeState { build_indexes: &[RowPtr], probe_state: &mut ProbeBlockGenerationState, build_state: &BuildBlockGenerationState, - other_predicate: &Expr, row_state: &mut [bool], + filter_executor: &mut FilterExecutor, ) -> Result<()> { if self.hash_join_state.interrupt.load(Ordering::Relaxed) { return Err(ErrorCode::AbortedQuery( @@ -279,9 +274,9 @@ impl HashJoinProbeState { let result_block = self.merge_eq_block(probe_block.clone(), build_block, matched_idx); 
self.update_row_state( &result_block, - other_predicate, &probe_indexes[0..matched_idx], row_state, + filter_executor, )?; Ok(()) @@ -291,25 +286,22 @@ impl HashJoinProbeState { pub(crate) fn update_row_state( &self, result_block: &DataBlock, - other_predicate: &Expr, probe_indexes: &[u32], row_state: &mut [bool], + filter_executor: &mut FilterExecutor, ) -> Result<()> { - match self.get_other_filters(result_block, other_predicate, &self.func_ctx)? { - (Some(bm), _, _) => { - for (row, selected) in probe_indexes.iter().zip(bm.iter()) { - if selected && unsafe { !*row_state.get_unchecked(*row as usize) } { - unsafe { *row_state.get_unchecked_mut(*row as usize) = true }; - } - } + let (selection, all_true, all_false) = + self.get_other_predicate_selection(filter_executor, result_block)?; + if all_true { + for index in probe_indexes.iter() { + unsafe { *row_state.get_unchecked_mut(*index as usize) = true }; } - (_, true, _) => { - for row in probe_indexes.iter() { - unsafe { *row_state.get_unchecked_mut(*row as usize) = true }; - } + } else if !all_false { + for i in selection { + let index = unsafe { *probe_indexes.get_unchecked(*i as usize) }; + unsafe { *row_state.get_unchecked_mut(index as usize) = true }; } - _ => (), - }; + } Ok(()) } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs index 8be9a294a40d4..1aea966b5451a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs @@ -13,7 +13,11 @@ // limitations under the License. 
use databend_common_arrow::arrow::bitmap::Bitmap; +use databend_common_expression::filter::build_select_expr; +use databend_common_expression::filter::FilterExecutor; +use databend_common_expression::Expr; use databend_common_expression::FunctionContext; +use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_hashtable::RowPtr; use super::desc::MARKER_KIND_FALSE; @@ -58,6 +62,8 @@ pub struct ProbeState { // 5.If join type is LEFT / LEFT SINGLE / LEFT ANTI / FULL, we use it to store unmatched indexes // count during early filtering. pub(crate) probe_unmatched_indexes_count: usize, + + pub(crate) filter_executor: Option, } impl ProbeState { @@ -72,6 +78,7 @@ impl ProbeState { with_conjunction: bool, has_string_column: bool, func_ctx: FunctionContext, + other_predicate: Option, ) -> Self { let (row_state, row_state_indexes) = match &join_type { JoinType::Left | JoinType::LeftSingle | JoinType::Full => { @@ -97,6 +104,21 @@ impl ProbeState { } else { None }; + let filter_executor = if let Some(predicate) = other_predicate { + let (select_expr, has_or) = build_select_expr(&predicate).into(); + let filter_executor = FilterExecutor::new( + select_expr, + func_ctx.clone(), + has_or, + max_block_size, + None, + &BUILTIN_FUNCTIONS, + false, + ); + Some(filter_executor) + } else { + None + }; ProbeState { max_block_size, mutable_indexes: MutableIndexes::new(max_block_size), @@ -114,6 +136,7 @@ impl ProbeState { markers, probe_unmatched_indexes_count: 0, with_conjunction, + filter_executor, } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs index 3610998e808eb..776c741a08977 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs @@ -87,6 +87,11 @@ impl 
TransformHashJoinProbe { has_string_column: bool, ) -> Result> { let id = join_probe_state.probe_attach()?; + let other_predicate = join_probe_state + .hash_join_state + .hash_join_desc + .other_predicate + .clone(); Ok(Box::new(TransformHashJoinProbe { input_port, output_port, @@ -102,6 +107,7 @@ impl TransformHashJoinProbe { with_conjunct, has_string_column, func_ctx, + other_predicate, ), max_block_size, outer_scan_finished: false, diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs index 80101a78bcc4d..5fcd46827e8ca 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs @@ -984,7 +984,7 @@ impl NativeDeserializeDataTransform { let offset = self.read_state.offset; let offsets = if let Some(count) = self.read_state.filtered_count { let filter_executor = self.filter_executor.as_mut().unwrap(); - filter_executor.mut_true_selection()[0..count] + filter_executor.mutable_true_selection()[0..count] .iter() .map(|idx| *idx as usize + offset) .collect::>() From 4fee4d66eac7a0274d6e1fdc53cc45839413a750 Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Tue, 20 Feb 2024 12:43:49 +0800 Subject: [PATCH 02/11] fix: left outer update row_state --- .../transforms/hash_join/probe_join/left_join.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs index dc0937248d62e..0b503354a4ce4 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs @@ -583,11 +583,16 @@ impl HashJoinProbeState { } } 
else { self.merge_into_check_and_set_matched(build_indexes, matched_idx, Some(selection))?; + let mut count = 0; for idx in selection { unsafe { - *row_state - .get_unchecked_mut(*row_state_indexes.get_unchecked(*idx as usize)) -= - 1; + while count < *idx { + *row_state.get_unchecked_mut( + *row_state_indexes.get_unchecked(count as usize), + ) -= 1; + count += 1; + } + count += 1; } } } From ed7d28464e8a832700a9dc6a11dca505cd3fe0bf Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Tue, 20 Feb 2024 14:31:33 +0800 Subject: [PATCH 03/11] fix: add row_state end index --- .../hash_join/probe_join/left_join.rs | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs index 0b503354a4ce4..2de3926dfe1e5 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs @@ -564,8 +564,8 @@ impl HashJoinProbeState { } } else { result_blocks.push(result_block); + let mut count = 0; if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { - let mut count = 0; for idx in selection { unsafe { while count < *idx { @@ -583,19 +583,25 @@ impl HashJoinProbeState { } } else { self.merge_into_check_and_set_matched(build_indexes, matched_idx, Some(selection))?; - let mut count = 0; for idx in selection { - unsafe { - while count < *idx { + while count < *idx { + unsafe { *row_state.get_unchecked_mut( *row_state_indexes.get_unchecked(count as usize), - ) -= 1; - count += 1; - } + ) -= 1 + }; count += 1; } + count += 1; } } + while (count as usize) < matched_idx { + unsafe { + *row_state + .get_unchecked_mut(*row_state_indexes.get_unchecked(count as usize)) -= 1 + }; + count += 1; + } } Ok(()) From 
e23760490224ec93c05c60fda327611d20f81401 Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Tue, 20 Feb 2024 16:09:21 +0800 Subject: [PATCH 04/11] feat: support left mark join --- .../hash_join/probe_join/left_mark_join.rs | 59 +++++++++---------- 1 file changed, 28 insertions(+), 31 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs index 6dc6498cd47a5..b3296612a77b1 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs @@ -20,7 +20,7 @@ use databend_common_expression::types::BooleanType; use databend_common_expression::types::NullableType; use databend_common_expression::types::ValueType; use databend_common_expression::DataBlock; -use databend_common_expression::Expr; +use databend_common_expression::FilterExecutor; use databend_common_expression::KeyAccessor; use databend_common_hashtable::HashJoinHashtableLike; use databend_common_hashtable::RowPtr; @@ -169,12 +169,7 @@ impl HashJoinProbeState { let build_state = unsafe { &mut *self.hash_join_state.build_state.get() }; let mark_scan_map = &mut build_state.mark_scan_map; let _mark_scan_map_lock = self.mark_scan_map_lock.lock(); - let other_predicate = self - .hash_join_state - .hash_join_desc - .other_predicate - .as_ref() - .unwrap(); + let filter_executor = probe_state.filter_executor.as_mut().unwrap(); // Results. 
let mut matched_idx = 0; @@ -207,7 +202,7 @@ impl HashJoinProbeState { build_indexes, &mut probe_state.generation_state, &build_state.generation_state, - other_predicate, + filter_executor, mark_scan_map, )?; (matched_idx, incomplete_ptr) = self.fill_probe_and_build_indexes::<_, false>( @@ -247,7 +242,7 @@ impl HashJoinProbeState { build_indexes, &mut probe_state.generation_state, &build_state.generation_state, - other_predicate, + filter_executor, mark_scan_map, )?; (matched_idx, incomplete_ptr) = self.fill_probe_and_build_indexes::<_, false>( @@ -283,7 +278,7 @@ impl HashJoinProbeState { build_indexes, &mut probe_state.generation_state, &build_state.generation_state, - other_predicate, + filter_executor, mark_scan_map, )?; } @@ -325,7 +320,7 @@ impl HashJoinProbeState { build_indexes: &[RowPtr], probe_state: &mut ProbeBlockGenerationState, build_state: &BuildBlockGenerationState, - other_predicate: &Expr, + filter_executor: &mut FilterExecutor, mark_scan_map: &mut [Vec], ) -> Result<()> { if self.hash_join_state.interrupt.load(Ordering::Relaxed) { @@ -354,32 +349,34 @@ impl HashJoinProbeState { } else { None }; - let result_block = self.merge_eq_block(probe_block, build_block, matched_idx); - let filter = - self.get_nullable_filter_column(&result_block, other_predicate, &self.func_ctx)?; - let filter_viewer = NullableType::::try_downcast_column(&filter).unwrap(); - let validity = &filter_viewer.validity; - let data = &filter_viewer.column; + let (selection, all_true, all_false) = self.get_other_predicate_selection(filter_executor, &result_block)?; - for (idx, build_index) in build_indexes[0..matched_idx].iter().enumerate() { + let mut count = 0; + for idx in selection { unsafe { - if !validity.get_bit_unchecked(idx) { - if *mark_scan_map - .get_unchecked(build_index.chunk_index as usize) - .get_unchecked(build_index.row_index as usize) - == MARKER_KIND_FALSE - { - *mark_scan_map - .get_unchecked_mut(build_index.chunk_index as usize) - 
.get_unchecked_mut(build_index.row_index as usize) = MARKER_KIND_NULL; - } - } else if data.get_bit_unchecked(idx) { + while count < *idx { + let row_ptr = build_indexes.get_unchecked(count as usize); *mark_scan_map - .get_unchecked_mut(build_index.chunk_index as usize) - .get_unchecked_mut(build_index.row_index as usize) = MARKER_KIND_TRUE; + .get_unchecked_mut(row_ptr.chunk_index as usize) + .get_unchecked_mut(row_ptr.row_index as usize) = MARKER_KIND_NULL; + count += 1; } + let row_ptr = build_indexes.get_unchecked(*idx as usize); + *mark_scan_map + .get_unchecked_mut(row_ptr.chunk_index as usize) + .get_unchecked_mut(row_ptr.row_index as usize) = MARKER_KIND_TRUE; + count += 1; + } + } + while (count as usize) < matched_idx { + unsafe { + let row_ptr = build_indexes.get_unchecked(count as usize); + *mark_scan_map + .get_unchecked_mut(row_ptr.chunk_index as usize) + .get_unchecked_mut(row_ptr.row_index as usize) = MARKER_KIND_NULL; + count += 1; } } From 2f99009cd6a047df779658cf8257d54dd77587f4 Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Tue, 20 Feb 2024 16:09:50 +0800 Subject: [PATCH 05/11] chore: make lint --- .../hash_join/probe_join/left_mark_join.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs index b3296612a77b1..0f79e4fd21447 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs @@ -351,7 +351,8 @@ impl HashJoinProbeState { }; let result_block = self.merge_eq_block(probe_block, build_block, matched_idx); - let (selection, all_true, all_false) = self.get_other_predicate_selection(filter_executor, &result_block)?; + let (selection, all_true, all_false) = + 
self.get_other_predicate_selection(filter_executor, &result_block)?; let mut count = 0; for idx in selection { @@ -359,14 +360,14 @@ impl HashJoinProbeState { while count < *idx { let row_ptr = build_indexes.get_unchecked(count as usize); *mark_scan_map - .get_unchecked_mut(row_ptr.chunk_index as usize) - .get_unchecked_mut(row_ptr.row_index as usize) = MARKER_KIND_NULL; + .get_unchecked_mut(row_ptr.chunk_index as usize) + .get_unchecked_mut(row_ptr.row_index as usize) = MARKER_KIND_NULL; count += 1; } let row_ptr = build_indexes.get_unchecked(*idx as usize); *mark_scan_map - .get_unchecked_mut(row_ptr.chunk_index as usize) - .get_unchecked_mut(row_ptr.row_index as usize) = MARKER_KIND_TRUE; + .get_unchecked_mut(row_ptr.chunk_index as usize) + .get_unchecked_mut(row_ptr.row_index as usize) = MARKER_KIND_TRUE; count += 1; } } @@ -374,8 +375,8 @@ impl HashJoinProbeState { unsafe { let row_ptr = build_indexes.get_unchecked(count as usize); *mark_scan_map - .get_unchecked_mut(row_ptr.chunk_index as usize) - .get_unchecked_mut(row_ptr.row_index as usize) = MARKER_KIND_NULL; + .get_unchecked_mut(row_ptr.chunk_index as usize) + .get_unchecked_mut(row_ptr.row_index as usize) = MARKER_KIND_NULL; count += 1; } } From e93a1bed2a16367e82362a687ed70429680b1996 Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Tue, 20 Feb 2024 17:12:52 +0800 Subject: [PATCH 06/11] feat: support right join --- .../hash_join/probe_join/left_mark_join.rs | 6 +- .../hash_join/probe_join/right_join.rs | 155 +++++++++--------- 2 files changed, 80 insertions(+), 81 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs index 0f79e4fd21447..23bbebfc6feb2 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs +++ 
b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs @@ -16,9 +16,6 @@ use std::sync::atomic::Ordering; use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_expression::types::BooleanType; -use databend_common_expression::types::NullableType; -use databend_common_expression::types::ValueType; use databend_common_expression::DataBlock; use databend_common_expression::FilterExecutor; use databend_common_expression::KeyAccessor; @@ -26,7 +23,6 @@ use databend_common_hashtable::HashJoinHashtableLike; use databend_common_hashtable::RowPtr; use crate::pipelines::processors::transforms::hash_join::build_state::BuildBlockGenerationState; -use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_FALSE; use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_NULL; use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_TRUE; use crate::pipelines::processors::transforms::hash_join::probe_state::ProbeBlockGenerationState; @@ -351,7 +347,7 @@ impl HashJoinProbeState { }; let result_block = self.merge_eq_block(probe_block, build_block, matched_idx); - let (selection, all_true, all_false) = + let (selection, _, _) = self.get_other_predicate_selection(filter_executor, &result_block)?; let mut count = 0; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs index 809476118049f..104ba9070687a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs @@ -15,10 +15,10 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; -use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_exception::ErrorCode; use databend_common_exception::Result; 
use databend_common_expression::DataBlock; +use databend_common_expression::FilterExecutor; use databend_common_expression::KeyAccessor; use databend_common_hashtable::HashJoinHashtableLike; use databend_common_hashtable::RowPtr; @@ -99,6 +99,7 @@ impl HashJoinProbeState { &build_state.generation_state, outer_scan_map, &mut right_single_scan_map, + probe_state.filter_executor.as_mut(), )?; (matched_idx, incomplete_ptr) = self.fill_probe_and_build_indexes::<_, false>( hash_table, @@ -140,6 +141,7 @@ impl HashJoinProbeState { &build_state.generation_state, outer_scan_map, &mut right_single_scan_map, + probe_state.filter_executor.as_mut(), )?; (matched_idx, incomplete_ptr) = self.fill_probe_and_build_indexes::<_, false>( hash_table, @@ -165,6 +167,7 @@ impl HashJoinProbeState { &build_state.generation_state, outer_scan_map, &mut right_single_scan_map, + probe_state.filter_executor.as_mut(), )?; } @@ -175,38 +178,57 @@ impl HashJoinProbeState { &self, build_indexes: &[RowPtr], right_single_scan_map: &[*mut AtomicBool], - bitmap: Option<&Bitmap>, + selection: Option<&[u32]>, ) -> Result<()> { - let dummy_bitmap = Bitmap::new(); - let (has_bitmap, validity) = match bitmap { - Some(validity) => (true, validity), - None => (false, &dummy_bitmap), - }; - for (idx, row_ptr) in build_indexes.iter().enumerate() { - if has_bitmap && unsafe { !validity.get_bit_unchecked(idx) } { - continue; - } - let old = unsafe { - (*right_single_scan_map[row_ptr.chunk_index as usize] - .add(row_ptr.row_index as usize)) - .load(Ordering::SeqCst) - }; - if old { - return Err(ErrorCode::Internal( - "Scalar subquery can't return more than one row", - )); + if let Some(selection) = selection { + for idx in selection { + let row_ptr = unsafe { build_indexes.get_unchecked(*idx as usize) }; + let old = unsafe { + (*right_single_scan_map[row_ptr.chunk_index as usize] + .add(row_ptr.row_index as usize)) + .load(Ordering::SeqCst) + }; + if old { + return Err(ErrorCode::Internal( + "Scalar subquery can't 
return more than one row", + )); + } + let res = unsafe { + (*right_single_scan_map[row_ptr.chunk_index as usize] + .add(row_ptr.row_index as usize)) + .compare_exchange_weak(old, true, Ordering::SeqCst, Ordering::SeqCst) + }; + if res.is_err() { + return Err(ErrorCode::Internal( + "Scalar subquery can't return more than one row", + )); + } } - let res = unsafe { - (*right_single_scan_map[row_ptr.chunk_index as usize] - .add(row_ptr.row_index as usize)) - .compare_exchange_weak(old, true, Ordering::SeqCst, Ordering::SeqCst) - }; - if res.is_err() { - return Err(ErrorCode::Internal( - "Scalar subquery can't return more than one row", - )); + } else { + for row_ptr in build_indexes { + let old = unsafe { + (*right_single_scan_map[row_ptr.chunk_index as usize] + .add(row_ptr.row_index as usize)) + .load(Ordering::SeqCst) + }; + if old { + return Err(ErrorCode::Internal( + "Scalar subquery can't return more than one row", + )); + } + let res = unsafe { + (*right_single_scan_map[row_ptr.chunk_index as usize] + .add(row_ptr.row_index as usize)) + .compare_exchange_weak(old, true, Ordering::SeqCst, Ordering::SeqCst) + }; + if res.is_err() { + return Err(ErrorCode::Internal( + "Scalar subquery can't return more than one row", + )); + } } } + Ok(()) } @@ -223,6 +245,7 @@ impl HashJoinProbeState { build_state: &BuildBlockGenerationState, outer_scan_map: &mut [Vec], right_single_scan_map: &mut [*mut AtomicBool], + filter_executor: Option<&mut FilterExecutor>, ) -> Result<()> { if self.hash_join_state.interrupt.load(Ordering::Relaxed) { return Err(ErrorCode::AbortedQuery( @@ -262,39 +285,9 @@ impl HashJoinProbeState { let result_block = self.merge_eq_block(probe_block, build_block, matched_idx); if !result_block.is_empty() { - if self - .hash_join_state - .hash_join_desc - .other_predicate - .is_none() - { - result_blocks.push(result_block); - if self.hash_join_state.hash_join_desc.join_type == JoinType::RightSingle { - self.update_right_single_scan_map( - 
&build_indexes[0..matched_idx], - right_single_scan_map, - None, - )?; - } else { - for row_ptr in &build_indexes[0..matched_idx] { - unsafe { - *outer_scan_map - .get_unchecked_mut(row_ptr.chunk_index as usize) - .get_unchecked_mut(row_ptr.row_index as usize) = true; - }; - } - } - } else { - let (bm, all_true, all_false) = self.get_other_filters( - &result_block, - self.hash_join_state - .hash_join_desc - .other_predicate - .as_ref() - .unwrap(), - &self.func_ctx, - )?; - + if let Some(filter_executor) = filter_executor { + let (result_block, selection, all_true, all_false) = + self.get_other_predicate_result_block(filter_executor, result_block)?; if all_true { result_blocks.push(result_block); if self.hash_join_state.hash_join_desc.join_type == JoinType::RightSingle { @@ -313,31 +306,41 @@ impl HashJoinProbeState { } } } else if !all_false { + result_blocks.push(result_block); // Safe to unwrap. - let validity = bm.unwrap(); if self.hash_join_state.hash_join_desc.join_type == JoinType::RightSingle { self.update_right_single_scan_map( &build_indexes[0..matched_idx], right_single_scan_map, - Some(&validity), + Some(selection), )?; } else { - let mut idx = 0; - while idx < matched_idx { + for idx in selection { unsafe { - let valid = validity.get_bit_unchecked(idx); - if valid { - let row_ptr = build_indexes.get_unchecked(idx); - *outer_scan_map - .get_unchecked_mut(row_ptr.chunk_index as usize) - .get_unchecked_mut(row_ptr.row_index as usize) = true; - } + let row_ptr = build_indexes.get_unchecked(*idx as usize); + *outer_scan_map + .get_unchecked_mut(row_ptr.chunk_index as usize) + .get_unchecked_mut(row_ptr.row_index as usize) = true; } - idx += 1; } } - let filtered_block = DataBlock::filter_with_bitmap(result_block, &validity)?; - result_blocks.push(filtered_block); + } + } else { + result_blocks.push(result_block); + if self.hash_join_state.hash_join_desc.join_type == JoinType::RightSingle { + self.update_right_single_scan_map( + 
&build_indexes[0..matched_idx], + right_single_scan_map, + None, + )?; + } else { + for row_ptr in &build_indexes[0..matched_idx] { + unsafe { + *outer_scan_map + .get_unchecked_mut(row_ptr.chunk_index as usize) + .get_unchecked_mut(row_ptr.row_index as usize) = true; + }; + } } } } From 92d16fa96b301ac3d4ba953c8cad5b24b381cb23 Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Tue, 20 Feb 2024 17:27:35 +0800 Subject: [PATCH 07/11] feat: support right semi and revert left mark --- .../hash_join/probe_join/left_mark_join.rs | 64 ++++++++++--------- .../probe_join/right_semi_anti_join.rs | 37 ++++------- 2 files changed, 48 insertions(+), 53 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs index 23bbebfc6feb2..6dc6498cd47a5 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark_join.rs @@ -16,13 +16,17 @@ use std::sync::atomic::Ordering; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::types::BooleanType; +use databend_common_expression::types::NullableType; +use databend_common_expression::types::ValueType; use databend_common_expression::DataBlock; -use databend_common_expression::FilterExecutor; +use databend_common_expression::Expr; use databend_common_expression::KeyAccessor; use databend_common_hashtable::HashJoinHashtableLike; use databend_common_hashtable::RowPtr; use crate::pipelines::processors::transforms::hash_join::build_state::BuildBlockGenerationState; +use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_FALSE; use crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_NULL; use 
crate::pipelines::processors::transforms::hash_join::desc::MARKER_KIND_TRUE; use crate::pipelines::processors::transforms::hash_join::probe_state::ProbeBlockGenerationState; @@ -165,7 +169,12 @@ impl HashJoinProbeState { let build_state = unsafe { &mut *self.hash_join_state.build_state.get() }; let mark_scan_map = &mut build_state.mark_scan_map; let _mark_scan_map_lock = self.mark_scan_map_lock.lock(); - let filter_executor = probe_state.filter_executor.as_mut().unwrap(); + let other_predicate = self + .hash_join_state + .hash_join_desc + .other_predicate + .as_ref() + .unwrap(); // Results. let mut matched_idx = 0; @@ -198,7 +207,7 @@ impl HashJoinProbeState { build_indexes, &mut probe_state.generation_state, &build_state.generation_state, - filter_executor, + other_predicate, mark_scan_map, )?; (matched_idx, incomplete_ptr) = self.fill_probe_and_build_indexes::<_, false>( @@ -238,7 +247,7 @@ impl HashJoinProbeState { build_indexes, &mut probe_state.generation_state, &build_state.generation_state, - filter_executor, + other_predicate, mark_scan_map, )?; (matched_idx, incomplete_ptr) = self.fill_probe_and_build_indexes::<_, false>( @@ -274,7 +283,7 @@ impl HashJoinProbeState { build_indexes, &mut probe_state.generation_state, &build_state.generation_state, - filter_executor, + other_predicate, mark_scan_map, )?; } @@ -316,7 +325,7 @@ impl HashJoinProbeState { build_indexes: &[RowPtr], probe_state: &mut ProbeBlockGenerationState, build_state: &BuildBlockGenerationState, - filter_executor: &mut FilterExecutor, + other_predicate: &Expr, mark_scan_map: &mut [Vec], ) -> Result<()> { if self.hash_join_state.interrupt.load(Ordering::Relaxed) { @@ -345,35 +354,32 @@ impl HashJoinProbeState { } else { None }; + let result_block = self.merge_eq_block(probe_block, build_block, matched_idx); - let (selection, _, _) = - self.get_other_predicate_selection(filter_executor, &result_block)?; + let filter = + self.get_nullable_filter_column(&result_block, other_predicate, 
&self.func_ctx)?; + let filter_viewer = NullableType::::try_downcast_column(&filter).unwrap(); + let validity = &filter_viewer.validity; + let data = &filter_viewer.column; - let mut count = 0; - for idx in selection { + for (idx, build_index) in build_indexes[0..matched_idx].iter().enumerate() { unsafe { - while count < *idx { - let row_ptr = build_indexes.get_unchecked(count as usize); + if !validity.get_bit_unchecked(idx) { + if *mark_scan_map + .get_unchecked(build_index.chunk_index as usize) + .get_unchecked(build_index.row_index as usize) + == MARKER_KIND_FALSE + { + *mark_scan_map + .get_unchecked_mut(build_index.chunk_index as usize) + .get_unchecked_mut(build_index.row_index as usize) = MARKER_KIND_NULL; + } + } else if data.get_bit_unchecked(idx) { *mark_scan_map - .get_unchecked_mut(row_ptr.chunk_index as usize) - .get_unchecked_mut(row_ptr.row_index as usize) = MARKER_KIND_NULL; - count += 1; + .get_unchecked_mut(build_index.chunk_index as usize) + .get_unchecked_mut(build_index.row_index as usize) = MARKER_KIND_TRUE; } - let row_ptr = build_indexes.get_unchecked(*idx as usize); - *mark_scan_map - .get_unchecked_mut(row_ptr.chunk_index as usize) - .get_unchecked_mut(row_ptr.row_index as usize) = MARKER_KIND_TRUE; - count += 1; - } - } - while (count as usize) < matched_idx { - unsafe { - let row_ptr = build_indexes.get_unchecked(count as usize); - *mark_scan_map - .get_unchecked_mut(row_ptr.chunk_index as usize) - .get_unchecked_mut(row_ptr.row_index as usize) = MARKER_KIND_NULL; - count += 1; } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs index 4c190d4dc454b..9941de3bb3df8 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs 
@@ -18,6 +18,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_expression::Expr; +use databend_common_expression::FilterExecutor; use databend_common_expression::KeyAccessor; use databend_common_hashtable::HashJoinHashtableLike; use databend_common_hashtable::RowPtr; @@ -136,12 +137,7 @@ impl HashJoinProbeState { // Build states. let build_state = unsafe { &mut *self.hash_join_state.build_state.get() }; let outer_scan_map = &mut build_state.outer_scan_map; - let other_predicate = self - .hash_join_state - .hash_join_desc - .other_predicate - .as_ref() - .unwrap(); + let filter_executor = probe_state.filter_executor.as_mut().unwrap(); // Results. let mut matched_idx = 0; @@ -175,7 +171,7 @@ impl HashJoinProbeState { &mut probe_state.generation_state, &build_state.generation_state, outer_scan_map, - other_predicate, + filter_executor, )?; (matched_idx, incomplete_ptr) = self.fill_probe_and_build_indexes::<_, false>( hash_table, @@ -212,7 +208,7 @@ impl HashJoinProbeState { &mut probe_state.generation_state, &build_state.generation_state, outer_scan_map, - other_predicate, + filter_executor, )?; (matched_idx, incomplete_ptr) = self.fill_probe_and_build_indexes::<_, false>( hash_table, @@ -236,7 +232,7 @@ impl HashJoinProbeState { &mut probe_state.generation_state, &build_state.generation_state, outer_scan_map, - other_predicate, + filter_executor, )?; } @@ -278,7 +274,7 @@ impl HashJoinProbeState { probe_state: &mut ProbeBlockGenerationState, build_state: &BuildBlockGenerationState, outer_scan_map: &mut [Vec], - other_predicate: &Expr, + filter_executor: &mut FilterExecutor, ) -> Result<()> { if self.hash_join_state.interrupt.load(Ordering::Relaxed) { return Err(ErrorCode::AbortedQuery( @@ -310,8 +306,8 @@ impl HashJoinProbeState { let result_block = self.merge_eq_block(probe_block, build_block, matched_idx); if !result_block.is_empty() { - let (bm, all_true, all_false) 
= - self.get_other_filters(&result_block, other_predicate, &self.func_ctx)?; + let (selection, all_true, all_false) = + self.get_other_predicate_selection(filter_executor, &result_block)?; if all_true { for row_ptr in &build_indexes[0..matched_idx] { @@ -322,20 +318,13 @@ impl HashJoinProbeState { }; } } else if !all_false { - // Safe to unwrap. - let validity = bm.unwrap(); - let mut idx = 0; - while idx < matched_idx { + for idx in selection { unsafe { - let valid = validity.get_bit_unchecked(idx); - if valid { - let row_ptr = build_indexes.get_unchecked(idx); - *outer_scan_map - .get_unchecked_mut(row_ptr.chunk_index as usize) - .get_unchecked_mut(row_ptr.row_index as usize) = true; - } + let row_ptr = build_indexes.get_unchecked(*idx as usize); + *outer_scan_map + .get_unchecked_mut(row_ptr.chunk_index as usize) + .get_unchecked_mut(row_ptr.row_index as usize) = true; } - idx += 1; } } } From 60f43552ec4a4b69c1488d34b8fe8b23a72ffe58 Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Tue, 20 Feb 2024 17:39:13 +0800 Subject: [PATCH 08/11] chore: remove useless code --- .../processors/transforms/hash_join/common.rs | 26 ------------------- .../probe_join/right_semi_anti_join.rs | 1 - 2 files changed, 27 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs index 719ce2c62b920..6eea2829d766d 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs @@ -18,7 +18,6 @@ use databend_common_exception::Result; use databend_common_expression::arrow::or_validities; use databend_common_expression::types::nullable::NullableColumn; use databend_common_expression::types::AnyType; -use databend_common_expression::types::BooleanType; use databend_common_expression::types::DataType; use databend_common_expression::BlockEntry; use 
databend_common_expression::Column; @@ -29,7 +28,6 @@ use databend_common_expression::FilterExecutor; use databend_common_expression::FunctionContext; use databend_common_expression::Value; use databend_common_functions::BUILTIN_FUNCTIONS; -use databend_common_sql::executor::cast_expr_to_non_null_boolean; use super::desc::MARKER_KIND_FALSE; use super::desc::MARKER_KIND_NULL; @@ -126,30 +124,6 @@ impl HashJoinProbeState { )) } - // return an (option bitmap, all_true, all_false). - pub(crate) fn get_other_filters( - &self, - merged_block: &DataBlock, - filter: &Expr, - func_ctx: &FunctionContext, - ) -> Result<(Option, bool, bool)> { - let filter = cast_expr_to_non_null_boolean(filter.clone())?; - let evaluator = Evaluator::new(merged_block, func_ctx, &BUILTIN_FUNCTIONS); - let predicates = evaluator - .run(&filter)? - .try_downcast::() - .unwrap(); - - match predicates { - Value::Scalar(v) => Ok((None, v, !v)), - Value::Column(s) => { - let count_zeros = s.unset_bits(); - let all_false = s.len() == count_zeros; - Ok((Some(s), count_zeros == 0, all_false)) - } - } - } - pub(crate) fn get_nullable_filter_column( &self, merged_block: &DataBlock, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs index 9941de3bb3df8..f0e98612d5e7b 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_anti_join.rs @@ -17,7 +17,6 @@ use std::sync::atomic::Ordering; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::DataBlock; -use databend_common_expression::Expr; use databend_common_expression::FilterExecutor; use databend_common_expression::KeyAccessor; use databend_common_hashtable::HashJoinHashtableLike; From 
c0fe30c2a7921395e35a208000c31353dd02a92d Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Fri, 23 Feb 2024 16:39:43 +0800 Subject: [PATCH 09/11] fix: cant support mark join --- .../pipelines/processors/transforms/hash_join/probe_state.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs index 1aea966b5451a..d3da10cbcd906 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs @@ -104,7 +104,9 @@ impl ProbeState { } else { None }; - let filter_executor = if let Some(predicate) = other_predicate { + let filter_executor = matches!(&join_type, JoinType::LeftMark | JoinType::RightMark | JoinType::Cross) + && let Some(predicate) = other_predicate + { let (select_expr, has_or) = build_select_expr(&predicate).into(); let filter_executor = FilterExecutor::new( select_expr, From d44bb13e6f202268cda7d6cda1312e91d59002fc Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Fri, 23 Feb 2024 16:44:09 +0800 Subject: [PATCH 10/11] chore: make lint --- .../processors/transforms/hash_join/probe_state.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs index d3da10cbcd906..d3700eb57c3ad 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs @@ -104,8 +104,10 @@ impl ProbeState { } else { None }; - let filter_executor = matches!(&join_type, JoinType::LeftMark | JoinType::RightMark | JoinType::Cross) - && let Some(predicate) = other_predicate + let filter_executor = 
if matches!( + &join_type, + JoinType::LeftMark | JoinType::RightMark | JoinType::Cross + ) && let Some(predicate) = other_predicate { let (select_expr, has_or) = build_select_expr(&predicate).into(); let filter_executor = FilterExecutor::new( From 72d39b79ec3c5c53641caf8463cc9693df3d459b Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Fri, 23 Feb 2024 17:36:31 +0800 Subject: [PATCH 11/11] chore: fix filter executor join type --- .../pipelines/processors/transforms/hash_join/probe_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs index d3700eb57c3ad..ee802f34df00d 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs @@ -104,7 +104,7 @@ impl ProbeState { } else { None }; - let filter_executor = if matches!( + let filter_executor = if !matches!( &join_type, JoinType::LeftMark | JoinType::RightMark | JoinType::Cross ) && let Some(predicate) = other_predicate