Skip to content

Commit

Permalink
[fix](mark join) mark join column should be nullable (apache#24910)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg committed Nov 1, 2023
1 parent c87724c commit 74fbe78
Show file tree
Hide file tree
Showing 15 changed files with 280 additions and 132 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ namespace doris::pipeline {

OPERATOR_CODE_GENERATOR(HashJoinBuildSink, StreamingOperator)

} // namespace doris::pipeline
} // namespace doris::pipeline
46 changes: 46 additions & 0 deletions be/src/vec/columns/column_filter_helper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/columns/column_filter_helper.h"

namespace doris::vectorized {
ColumnFilterHelper::ColumnFilterHelper(IColumn& column_)
: _column(assert_cast<ColumnNullable&>(column_)),
_value_column(assert_cast<ColumnUInt8&>(_column.get_nested_column())),
_null_map_column(_column.get_null_map_column()) {}

// Resizes both underlying data arrays to `size`: `value` is passed as the fill
// for the value data, and 0 (the "not null" flag) as the fill for the null map.
void ColumnFilterHelper::resize_fill(size_t size, doris::vectorized::UInt8 value) {
    auto& values = _value_column.get_data();
    auto& null_flags = _null_map_column.get_data();

    values.resize_fill(size, value);
    null_flags.resize_fill(size, 0);
}

// Appends one non-null row: `value` goes into the value data and a 0 flag goes
// into the null map, so both arrays grow by exactly one entry.
void ColumnFilterHelper::insert_value(doris::vectorized::UInt8 value) {
    auto& values = _value_column.get_data();
    auto& null_flags = _null_map_column.get_data();

    values.push_back(value);
    null_flags.push_back(0);
}

// Appends one NULL row.
void ColumnFilterHelper::insert_null() {
    // A default entry is still added to the value column so that it keeps the
    // same number of rows as the null map.
    _value_column.insert_default();

    // A flag of 1 marks the new row as NULL.
    auto& null_flags = _null_map_column.get_data();
    null_flags.push_back(1);
}

// Forwards the `reserve(size)` request to both the value column and the
// null-map column.
void ColumnFilterHelper::reserve(size_t size) {
    ColumnUInt8& values = _value_column;
    ColumnUInt8& null_flags = _null_map_column;

    values.reserve(size);
    null_flags.reserve(size);
}

} // namespace doris::vectorized
39 changes: 39 additions & 0 deletions be/src/vec/columns/column_filter_helper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "column_nullable.h"

namespace doris::vectorized {
// Thin writer over a nullable UInt8 column. It keeps the nested value column
// and the null-map column in step, so callers can append either a value or a
// NULL with a single call.
//
// The helper does not own anything: it only stores references into the column
// it was constructed from, so that column must outlive the helper.
class ColumnFilterHelper {
public:
    // Binds to `column`, which must be a ColumnNullable whose nested column is
    // a ColumnUInt8 (the constructor casts it to those types).
    // Declared `explicit` so an arbitrary IColumn is never converted into a
    // helper implicitly.
    explicit ColumnFilterHelper(IColumn&);

    // Resizes both underlying arrays to `size`, using `value` as the fill for
    // the value data and 0 (not null) as the fill for the null map.
    void resize_fill(size_t size, UInt8 value);
    // Appends one NULL row (default value entry, null flag 1).
    void insert_null();
    // Appends one non-null row holding `value` (null flag 0).
    void insert_value(UInt8 value);
    // Forwards `reserve(size)` to both the value column and the null map.
    void reserve(size_t size);

    // Returns the size reported by the wrapped nullable column.
    [[nodiscard]] size_t size() const { return _column.size(); }

private:
    // Declaration order matters: the constructor initializes the two
    // references below from `_column`.
    ColumnNullable& _column;
    ColumnUInt8& _value_column;
    ColumnUInt8& _null_map_column;
};
} // namespace doris::vectorized
57 changes: 38 additions & 19 deletions be/src/vec/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "process_hash_table_probe.h"
#include "runtime/thread_context.h" // IWYU pragma: keep
#include "util/simd/bits.h"
#include "vec/columns/column_filter_helper.h"
#include "vec/exprs/vexpr_context.h"
#include "vhash_join_node.h"

Expand Down Expand Up @@ -212,6 +213,11 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
size_t probe_size = 0;
auto& probe_row_match_iter =
std::get<ForwardIterator<Mapped>>(_join_node->_probe_row_match_iter);

std::unique_ptr<ColumnFilterHelper> mark_column;
if (is_mark_join) {
mark_column = std::make_unique<ColumnFilterHelper>(*mcol[mcol.size() - 1]);
}
{
SCOPED_TIMER(_search_hashtable_timer);
if constexpr (!is_right_semi_anti_join) {
Expand Down Expand Up @@ -285,9 +291,14 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
if (is_mark_join) {
++current_offset;
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1])
.get_data()
.template push_back(!find_result.is_found());
bool null_result =
(*null_map)[probe_index] ||
(!find_result.is_found() && _join_node->_has_null_in_build_side);
if (null_result) {
mark_column->insert_null();
} else {
mark_column->insert_value(!find_result.is_found());
}
} else {
if (!find_result.is_found()) {
++current_offset;
Expand All @@ -297,9 +308,14 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
} else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
if (is_mark_join) {
++current_offset;
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1])
.get_data()
.template push_back(find_result.is_found());
bool null_result =
(*null_map)[probe_index] ||
(!find_result.is_found() && _join_node->_has_null_in_build_side);
if (null_result) {
mark_column->insert_null();
} else {
mark_column->insert_value(find_result.is_found());
}
} else {
if (find_result.is_found()) {
++current_offset;
Expand Down Expand Up @@ -840,20 +856,20 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
}

if (is_mark_join) {
auto& matched_map = assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
*(output_block->get_by_position(num_cols - 1)
.column->assume_mutable()))
.get_data();
/// FIXME: incorrect result of semi mark join with other conjuncts(null value missed).
auto mark_column = output_block->get_by_position(num_cols - 1)
.column->assume_mutable();
ColumnFilterHelper helper(*mark_column);

// For mark join, we only filter rows which have duplicate join keys.
// And then, we set matched_map to the join result to do the mark join's filtering.
for (size_t i = 1; i < row_count; ++i) {
if (!same_to_prev[i]) {
matched_map.push_back(filter_map[i - 1]);
helper.insert_value(filter_map[i - 1]);
filter_map[i - 1] = true;
}
}
matched_map.push_back(filter_map[filter_map.size() - 1]);
helper.insert_value(filter_map[filter_map.size() - 1]);
filter_map[filter_map.size() - 1] = true;
}

Expand Down Expand Up @@ -913,17 +929,20 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
}

if (is_mark_join) {
auto& matched_map = assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
*(output_block->get_by_position(num_cols - 1)
.column->assume_mutable()))
.get_data();
for (int i = 1; i < row_count; ++i) {
/// FIXME: incorrect result of semi mark join with other conjuncts(null value missed).
auto mark_column = output_block->get_by_position(num_cols - 1)
.column->assume_mutable();
ColumnFilterHelper helper(*mark_column);

// For mark join, we only filter rows which have duplicate join keys.
// And then, we set matched_map to the join result to do the mark join's filtering.
for (size_t i = 1; i < row_count; ++i) {
if (!same_to_prev[i]) {
matched_map.push_back(!filter_map[i - 1]);
helper.insert_value(filter_map[i - 1]);
filter_map[i - 1] = true;
}
}
matched_map.push_back(!filter_map[row_count - 1]);
helper.insert_value(filter_map[row_count - 1]);
filter_map[row_count - 1] = true;
} else {
int end_row_idx;
Expand Down
31 changes: 17 additions & 14 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ struct ProcessHashTableBuild {
}
if constexpr (ignore_null) {
if ((*null_map)[k]) {
if (has_null_key != nullptr) {
*has_null_key = true;
}
continue;
}
}
Expand Down Expand Up @@ -535,11 +538,10 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_
return Status::OK();
}

if (_short_circuit_for_null_in_probe_side && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
_is_mark_join) {
/// If `_short_circuit_for_null_in_probe_side` is true, this indicates no rows
/// match the join condition, and this is 'mark join', so we need to create a column as mark
/// with all rows set to 0.
/// `_has_null_in_build_side` means the build side contains a null value.
/// `_short_circuit_for_null_in_build_side` means the join short-circuits when the build side contains a null value (e.g. null aware left anti join).
if (_has_null_in_build_side && _short_circuit_for_null_in_build_side && _is_mark_join) {
/// We need to create a column as mark with all rows set to NULL.
auto block_rows = _probe_block.rows();
if (block_rows == 0) {
*eos = _probe_eos;
Expand All @@ -553,8 +555,10 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_
temp_block.insert(_probe_block.get_by_position(i));
}
}
auto mark_column = ColumnUInt8::create(block_rows, 0);
temp_block.insert({std::move(mark_column), std::make_shared<DataTypeUInt8>(), ""});
auto mark_column = ColumnNullable::create(ColumnUInt8::create(block_rows, 0),
ColumnUInt8::create(block_rows, 1));
temp_block.insert(
{std::move(mark_column), make_nullable(std::make_shared<DataTypeUInt8>()), ""});

{
SCOPED_TIMER(_join_filter_timer);
Expand Down Expand Up @@ -878,7 +882,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
Block block;
// If eos is reached, or a null value has already been met under the short-circuit strategy,
// we do not need to pull more data from the build side.
while (!eos && !_short_circuit_for_null_in_probe_side) {
while (!eos && (!_short_circuit_for_null_in_build_side || !_has_null_in_build_side)) {
block.clear_column_data();
RETURN_IF_CANCELLED(state);
{
Expand Down Expand Up @@ -906,8 +910,8 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
// make one block for each 4 gigabytes
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;

if (_short_circuit_for_null_in_probe_side) {
// TODO: if _short_circuit_for_null_in_probe_side is true we should finish current pipeline task.
if (_has_null_in_build_side) {
// TODO: if _has_null_in_build_side is true we should finish current pipeline task.
DCHECK(state->enable_pipeline_exec());
return Status::OK();
}
Expand Down Expand Up @@ -979,7 +983,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
_shared_hash_table_context->blocks = _build_blocks;
_shared_hash_table_context->hash_table_variants = _hash_table_variants;
_shared_hash_table_context->short_circuit_for_null_in_probe_side =
_short_circuit_for_null_in_probe_side;
_has_null_in_build_side;
if (_runtime_filter_slots) {
_runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
}
Expand All @@ -997,8 +1001,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
_build_phase_profile->add_info_string(
"SharedHashTableFrom",
print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id())));
_short_circuit_for_null_in_probe_side =
_shared_hash_table_context->short_circuit_for_null_in_probe_side;
_has_null_in_build_side = _shared_hash_table_context->short_circuit_for_null_in_probe_side;
_hash_table_variants = std::static_pointer_cast<HashTableVariants>(
_shared_hash_table_context->hash_table_variants);
_build_blocks = _shared_hash_table_context->blocks;
Expand Down Expand Up @@ -1183,7 +1186,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
has_null_value || short_circuit_for_null_in_build_side
? &null_map_val->get_data()
: nullptr,
&_short_circuit_for_null_in_probe_side);
&_has_null_in_build_side);
}},
*_hash_table_variants, make_bool_variant(_build_side_ignore_null),
make_bool_variant(_short_circuit_for_null_in_build_side));
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/join/vhash_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ class HashJoinNode final : public VJoinNodeBase {
private:
void _init_short_circuit_for_probe() override {
_short_circuit_for_probe =
(_short_circuit_for_null_in_probe_side &&
_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_mark_join) ||
(_has_null_in_build_side && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_mark_join) ||
(_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN && !_is_mark_join) ||
(_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN && !_is_mark_join) ||
(_build_blocks->empty() && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
Expand Down
8 changes: 3 additions & 5 deletions be/src/vec/exec/join/vjoin_node_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,9 @@ void VJoinNodeBase::_construct_mutable_join_block() {
_join_block.insert({type_ptr->create_column(), type_ptr, slot_desc->col_name()});
}
}
if (_is_mark_join) {
_join_block.replace_by_position(
_join_block.columns() - 1,
remove_nullable(_join_block.get_by_position(_join_block.columns() - 1).column));
}

DCHECK(!_is_mark_join ||
_join_block.get_by_position(_join_block.columns() - 1).column->is_nullable());
}

Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block,
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/exec/join/vjoin_node_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,13 @@ class VJoinNodeBase : public ExecNode {

// For null aware left anti join, we apply a short circuit strategy.
// 1. Set _short_circuit_for_null_in_build_side to true if join operator is null aware left anti join.
// 2. In build phase, we stop materialize build side when we meet the first null value and set _short_circuit_for_null_in_probe_side to true.
// 3. In probe phase, if _short_circuit_for_null_in_probe_side is true, join node returns empty block directly. Otherwise, probing will continue as the same as generic left anti join.
// 2. In build phase, we stop materializing the build side when we meet the first null value and set _has_null_in_build_side to true.
// 3. In probe phase, if _has_null_in_build_side is true, the join node returns an empty block directly. Otherwise, probing continues in the same way as a generic left anti join.
const bool _short_circuit_for_null_in_build_side = false;
bool _short_circuit_for_null_in_probe_side = false;
bool _has_null_in_build_side = false;

// For some join case, we can apply a short circuit strategy
// 1. _short_circuit_for_null_in_probe_side = true
// 1. _has_null_in_build_side = true
// 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti
bool _short_circuit_for_probe = false;

Expand Down
Loading

0 comments on commit 74fbe78

Please sign in to comment.