Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Status HashJoinProbeLocalState::open(RuntimeState* state) {

_process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
auto& p = _parent->cast<HashJoinProbeOperatorX>();
_right_col_idx = p._right_col_idx;
std::visit(
[&](auto&& join_op_variants, auto have_other_join_conjunct) {
using JoinOpType = std::decay_t<decltype(join_op_variants)>;
Expand Down Expand Up @@ -536,7 +537,7 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
const bool probe_dispose_null =
_match_all_probe || _build_unique || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
_join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN || _join_op == TJoinOp::LEFT_ANTI_JOIN ||
_join_op == TJoinOp::LEFT_SEMI_JOIN;
_join_op == TJoinOp::LEFT_SEMI_JOIN || _is_mark_join;
const std::vector<TEqJoinCondition>& eq_join_conjuncts = tnode.hash_join_node.eq_join_conjuncts;
std::vector<bool> probe_not_ignore_null(eq_join_conjuncts.size());
size_t conjuncts_index = 0;
Expand Down Expand Up @@ -668,14 +669,34 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
}
}

const int right_col_idx =
(_is_right_semi_anti && !_have_other_join_conjunct) ? 0 : _left_table_data_types.size();
_right_col_idx = (_is_right_semi_anti && !_have_other_join_conjunct &&
(!_is_mark_join || _mark_join_conjuncts.empty()))
? 0
: _left_table_data_types.size();
size_t idx = 0;
for (const auto* slot : slots_to_check) {
auto data_type = slot->get_data_type_ptr();
const auto slot_on_left = idx < right_col_idx;
const auto slot_on_left = idx < _right_col_idx;

if (slot_on_left) {
if (idx >= _left_table_data_types.size()) {
return Status::InternalError(
"Join node(id={}, OP={}) intermediate slot({}, #{})'s on left table "
"idx out bound of _left_table_data_types: {} vs {}",
_node_id, _join_op, slot->col_name(), slot->id(), idx,
_left_table_data_types.size());
}
} else if (idx - _right_col_idx >= _right_table_data_types.size()) {
return Status::InternalError(
"Join node(id={}, OP={}) intermediate slot({}, #{})'s on right table "
"idx out bound of _right_table_data_types: {} vs {}(idx = {}, _right_col_idx = "
"{})",
_node_id, _join_op, slot->col_name(), slot->id(), idx - _right_col_idx,
_right_table_data_types.size(), idx, _right_col_idx);
}

auto target_data_type = slot_on_left ? _left_table_data_types[idx]
: _right_table_data_types[idx - right_col_idx];
: _right_table_data_types[idx - _right_col_idx];
++idx;
if (data_type->equals(*target_data_type)) {
continue;
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ class HashJoinProbeLocalState final
std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants =
std::make_unique<HashTableCtxVariants>();

// Index of the first column (slot) that comes from the right table in `_intermediate_row_desc`.
size_t _right_col_idx;

RuntimeProfile::Counter* _probe_expr_call_timer = nullptr;
RuntimeProfile::Counter* _probe_next_timer = nullptr;
RuntimeProfile::Counter* _probe_side_output_timer = nullptr;
Expand Down Expand Up @@ -213,6 +216,9 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
std::vector<bool> _right_output_slot_flags;
std::vector<std::string> _right_table_column_names;
const std::vector<TExpr> _partition_exprs;

// Index of the first column (slot) that comes from the right table in `_intermediate_row_desc`.
size_t _right_col_idx;
};

} // namespace pipeline
Expand Down
44 changes: 18 additions & 26 deletions be/src/vec/common/hash_table/join_hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,24 +101,23 @@ class JoinHashTable {
}
}

if constexpr (with_other_conjuncts ||
(is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN)) {
if constexpr (!with_other_conjuncts) {
constexpr bool is_null_aware_join =
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN;
constexpr bool is_left_half_join = JoinOpType == TJoinOp::LEFT_SEMI_JOIN ||
JoinOpType == TJoinOp::LEFT_ANTI_JOIN;

/// For null aware join or left half(semi/anti) join without other conjuncts and without
/// mark join conjunct.
/// If one row on probe side has one match in build side, we should stop searching the
/// hash table for this row.
if (is_null_aware_join || (is_left_half_join && !has_mark_join_conjunct)) {
return _find_batch_conjunct<JoinOpType, need_judge_null, true>(
keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs,
build_idxs);
}
if constexpr (with_other_conjuncts) {
return _find_batch_conjunct<JoinOpType, need_judge_null, false>(
keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs, build_idxs);
} else if constexpr (is_mark_join) {
constexpr bool is_null_aware_join = JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN;
constexpr bool is_left_half_join =
JoinOpType == TJoinOp::LEFT_SEMI_JOIN || JoinOpType == TJoinOp::LEFT_ANTI_JOIN;

/// For null aware join or left half(semi/anti) join without other conjuncts and without
/// mark join conjunct.
/// If one row on probe side has one match in build side, we should stop searching the
/// hash table for this row.
if (is_null_aware_join || (is_left_half_join && !has_mark_join_conjunct)) {
return _find_batch_conjunct<JoinOpType, need_judge_null, true>(
keys, build_idx_map, probe_idx, build_idx, probe_rows, probe_idxs,
build_idxs);
}

return _find_batch_conjunct<JoinOpType, need_judge_null, false>(
Expand Down Expand Up @@ -339,14 +338,7 @@ class JoinHashTable {

auto do_the_probe = [&]() {
while (build_idx && matched_cnt < batch_size) {
if constexpr (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN ||
JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
if (!visited[build_idx] && keys[probe_idx] == build_keys[build_idx]) {
probe_idxs[matched_cnt] = probe_idx;
build_idxs[matched_cnt] = build_idx;
matched_cnt++;
}
} else if constexpr (need_judge_null) {
if constexpr (need_judge_null) {
if (build_idx == bucket_size) {
build_idxs[matched_cnt] = build_idx;
probe_idxs[matched_cnt] = probe_idx;
Expand Down
10 changes: 9 additions & 1 deletion be/src/vec/exec/join/process_hash_table_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "vec/columns/column.h"
#include "vec/columns/columns_number.h"
#include "vec/common/arena.h"
#include "vec/common/custom_allocator.h"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -132,8 +133,15 @@ struct ProcessHashTableProbe {
RuntimeProfile::Counter* _probe_side_output_timer = nullptr;
RuntimeProfile::Counter* _probe_process_hashtable_timer = nullptr;

int _right_col_idx;
// See `HashJoinProbeOperatorX::_right_col_idx`
const int _right_col_idx;

int _right_col_len;

// For right semi/anti join with mark join conjuncts, we need to store the mark join flags,
// one flag per build-side row (indexed by build row index).
// -1 means null, 0 means false, 1 means true
DorisVector<int8_t> mark_join_flags;
};

} // namespace vectorized
Expand Down
138 changes: 101 additions & 37 deletions be/src/vec/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,18 @@ ProcessHashTableProbe<JoinOpType, Parent>::ProcessHashTableProbe(Parent* parent,
_build_side_output_timer(parent->_build_side_output_timer),
_probe_side_output_timer(parent->_probe_side_output_timer),
_probe_process_hashtable_timer(parent->_probe_process_hashtable_timer),
_right_col_idx((_is_right_semi_anti && !_have_other_join_conjunct)
? 0
: _parent->left_table_data_types().size()),
_right_col_idx(parent->_right_col_idx),
_right_col_len(_parent->right_table_data_types().size()) {}

template <int JoinOpType, typename Parent>
void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column(
MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size,
bool have_other_join_conjunct, bool is_mark_join) {
SCOPED_TIMER(_build_side_output_timer);
constexpr auto is_semi_anti_join = JoinOpType == TJoinOp::RIGHT_ANTI_JOIN ||
JoinOpType == TJoinOp::RIGHT_SEMI_JOIN ||
JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
constexpr auto is_semi_anti_join = JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::LEFT_SEMI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::LEFT_SEMI_JOIN;
JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN;

constexpr auto probe_all =
JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN;
Expand Down Expand Up @@ -209,7 +206,7 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
(JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ||
JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
(is_mark_join && JoinOpType != doris::TJoinOp::RIGHT_SEMI_JOIN)));
(is_mark_join)));
}

auto& mcol = mutable_block.mutable_columns();
Expand Down Expand Up @@ -268,8 +265,8 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
build_side_output_column(mcol, *_right_output_slot_flags, current_offset, with_other_conjuncts,
is_mark_join);

if constexpr (with_other_conjuncts || (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN &&
JoinOpType != TJoinOp::RIGHT_ANTI_JOIN)) {
if (with_other_conjuncts || !_parent->_mark_join_conjuncts.empty() ||
(JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType != TJoinOp::RIGHT_ANTI_JOIN)) {
auto check_all_match_one = [](const std::vector<uint32_t>& vecs, uint32_t probe_idx,
int size) {
if (!size || vecs[0] != probe_idx || vecs[size - 1] != probe_idx + size - 1) {
Expand All @@ -291,7 +288,7 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash

output_block->swap(mutable_block.to_block());

if constexpr (is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
if constexpr (is_mark_join) {
return do_mark_join_conjuncts<with_other_conjuncts>(
output_block, hash_table_ctx.hash_table->get_bucket_size());
} else if constexpr (with_other_conjuncts) {
Expand Down Expand Up @@ -363,21 +360,31 @@ template <int JoinOpType, typename Parent>
template <bool with_other_conjuncts>
Status ProcessHashTableProbe<JoinOpType, Parent>::do_mark_join_conjuncts(
Block* output_block, size_t hash_table_bucket_size) {
DCHECK(JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::LEFT_SEMI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN);
if (JoinOpType != TJoinOp::LEFT_ANTI_JOIN && JoinOpType != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
JoinOpType != TJoinOp::LEFT_SEMI_JOIN && JoinOpType != TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN &&
JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) {
return Status::InternalError("join type {} is not supported", JoinOpType);
}

constexpr bool is_anti_join = JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::RIGHT_ANTI_JOIN;
constexpr bool is_null_aware_join = JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
constexpr bool is_right_half_join =
JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || JoinOpType == TJoinOp::RIGHT_ANTI_JOIN;

const auto row_count = output_block->rows();
if (!row_count) {
return Status::OK();
}

if constexpr (is_right_half_join) {
if (mark_join_flags.empty() && _build_block != nullptr) {
mark_join_flags.resize(_build_block->rows(), 0);
}
}

auto mark_column_mutable =
output_block->get_by_position(_parent->_mark_column_id).column->assume_mutable();
auto& mark_column = assert_cast<ColumnNullable&>(*mark_column_mutable);
Expand Down Expand Up @@ -455,36 +462,70 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_mark_join_conjuncts(
const bool should_be_null_if_build_side_has_null =
*_has_null_in_build_side && is_null_aware_join && !with_other_conjuncts;
for (size_t i = 0; i != row_count; ++i) {
bool not_matched_before = _parent->_last_probe_match != _probe_indexs[i];
if (_build_indexs[i] == 0) {
bool has_null_mark_value = _parent->_last_probe_null_mark == _probe_indexs[i];
if (not_matched_before) {
filter_map[i] = true;
mark_null_map[i] = has_null_mark_value || should_be_null_if_build_side_has_null;
mark_filter_data[i] = false;
if constexpr (is_right_half_join) {
const auto& build_index = _build_indexs[i];
if (build_index == 0) {
continue;
}

if (mark_join_flags[build_index] == 1) {
continue;
}

if (mark_null_map[i]) {
mark_join_flags[build_index] = -1;
} else if (mark_filter_data[i]) {
mark_join_flags[build_index] = 1;
}
} else {
if (mark_null_map[i]) { // is null
_parent->_last_probe_null_mark = _probe_indexs[i];
} else {
if (mark_filter_data[i] && not_matched_before) {
_parent->_last_probe_match = _probe_indexs[i];
bool not_matched_before = _parent->_last_probe_match != _probe_indexs[i];
if (_build_indexs[i] == 0) {
bool has_null_mark_value = _parent->_last_probe_null_mark == _probe_indexs[i];
if (not_matched_before) {
filter_map[i] = true;
mark_null_map[i] = has_null_mark_value || should_be_null_if_build_side_has_null;
mark_filter_data[i] = false;
}
} else {
if (mark_null_map[i]) { // is null
_parent->_last_probe_null_mark = _probe_indexs[i];
} else {
if (mark_filter_data[i] && not_matched_before) {
_parent->_last_probe_match = _probe_indexs[i];
filter_map[i] = true;
}
}
}
}
}

if constexpr (is_anti_join) {
// flip the mark column
for (size_t i = 0; i != row_count; ++i) {
mark_filter_data[i] ^= 1; // not null/ null
if constexpr (is_right_half_join) {
if constexpr (is_anti_join) {
// flip the mark column
for (size_t i = 0; i != row_count; ++i) {
if (mark_join_flags[i] == -1) {
// -1 means null.
continue;
}

mark_join_flags[i] ^= 1;
}
}
// For right semi/anti join, no rows will be output in probe phase.
output_block->swap(vectorized::Block());
return Status::OK();
} else {
if constexpr (is_anti_join) {
// flip the mark column
for (size_t i = 0; i != row_count; ++i) {
mark_filter_data[i] ^= 1; // not null/ null
}
}
}

auto result_column_id = output_block->columns();
output_block->insert({std::move(filter_column), std::make_shared<DataTypeUInt8>(), ""});
return Block::filter_block(output_block, result_column_id, result_column_id);
auto result_column_id = output_block->columns();
output_block->insert({std::move(filter_column), std::make_shared<DataTypeUInt8>(), ""});
return Block::filter_block(output_block, result_column_id, result_column_id);
}
}

template <int JoinOpType, typename Parent>
Expand Down Expand Up @@ -638,8 +679,31 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable(
_build_indexs.data() + block_size);
}

if constexpr (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN ||
JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
if (is_mark_join) {
if (mark_join_flags.empty() && _build_block != nullptr) {
mark_join_flags.resize(_build_block->rows(), 0);
}

// mark column is nullable
auto* mark_column = assert_cast<vectorized::ColumnNullable*>(
mcol[_parent->_mark_column_id].get());
mark_column->resize(block_size);
auto* null_map = mark_column->get_null_map_data().data();
auto* data = assert_cast<vectorized::ColumnUInt8&>(mark_column->get_nested_column())
.get_data()
.data();
for (size_t i = 0; i != block_size; ++i) {
const auto build_index = _build_indexs[i];
null_map[i] = mark_join_flags[build_index] == -1;
data[i] = mark_join_flags[build_index] == 1;
}
}
}

// Just resize the left table columns (e.g. in the case with other conjuncts) so that the block size is not zero.
if (_is_right_semi_anti && _have_other_join_conjunct) {
if (_is_right_semi_anti && _right_col_idx != 0) {
for (int i = 0; i < _right_col_idx; ++i) {
mcol[i]->resize(block_size);
}
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ Status HashJoinNode::prepare(RuntimeState* state) {
_right_table_data_types = VectorizedUtils::get_data_types(child(1)->row_desc());
_left_table_data_types = VectorizedUtils::get_data_types(child(0)->row_desc());
_right_table_column_names = VectorizedUtils::get_column_names(child(1)->row_desc());

_right_col_idx = (_is_right_semi_anti && !_have_other_join_conjunct &&
(!_is_mark_join || _mark_join_conjuncts.empty()))
? 0
: _left_table_data_types.size();
// Hash Table Init
_hash_table_init(state);
_construct_mutable_join_block();
Expand Down
Loading
Loading