Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix](core) child block is shared between operator and node, it should be shared ptr #28106

Merged
merged 9 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ OPERATOR_CODE_GENERATOR(NestLoopJoinProbeOperator, StatefulOperator)

Status NestLoopJoinProbeOperator::prepare(doris::RuntimeState* state) {
// just for speed up, the way is dangerous
_child_block.reset(_node->get_left_block());
_child_block = _node->get_left_block();
return StatefulOperator::prepare(state);
}

Status NestLoopJoinProbeOperator::close(doris::RuntimeState* state) {
_child_block.release();
return StatefulOperator::close(state);
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ class StatefulOperator : public StreamingOperator<OperatorBuilderType> {

StatefulOperator(OperatorBuilderBase* builder, ExecNode* node)
: StreamingOperator<OperatorBuilderType>(builder, node),
_child_block(vectorized::Block::create_unique()),
_child_block(vectorized::Block::create_shared()),
_child_source_state(SourceState::DEPEND_ON_SOURCE) {}

virtual ~StatefulOperator() = default;
Expand Down Expand Up @@ -484,7 +484,7 @@ class StatefulOperator : public StreamingOperator<OperatorBuilderType> {
}

protected:
std::unique_ptr<vectorized::Block> _child_block;
std::shared_ptr<vectorized::Block> _child_block;
SourceState _child_source_state;
};

Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/repeat_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ OPERATOR_CODE_GENERATOR(RepeatOperator, StatefulOperator)

Status RepeatOperator::prepare(doris::RuntimeState* state) {
// just for speed up, the way is dangerous
_child_block.reset(_node->get_child_block());
_child_block = _node->get_child_block();
return StatefulOperator::prepare(state);
}

Status RepeatOperator::close(doris::RuntimeState* state) {
_child_block.release();
return StatefulOperator::close(state);
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/table_function_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ OPERATOR_CODE_GENERATOR(TableFunctionOperator, StatefulOperator)

Status TableFunctionOperator::prepare(doris::RuntimeState* state) {
// just for speed up, the way is dangerous
_child_block.reset(_node->get_child_block());
_child_block = _node->get_child_block();
return StatefulOperator::prepare(state);
}

Status TableFunctionOperator::close(doris::RuntimeState* state) {
_child_block.release();
return StatefulOperator::close(state);
}

Expand Down
30 changes: 16 additions & 14 deletions be/src/vec/exec/join/vnested_loop_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnod
_left_block_pos(0),
_left_side_eos(false),
_old_version_flag(!tnode.__isset.nested_loop_join_node),
_runtime_filter_descs(tnode.runtime_filters) {}
_runtime_filter_descs(tnode.runtime_filters) {
_left_block = Block::create_shared();
}

Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(VJoinNodeBase::init(tnode, state));
Expand Down Expand Up @@ -252,15 +254,15 @@ Status VNestedLoopJoinNode::push(doris::RuntimeState* state, vectorized::Block*

Status VNestedLoopJoinNode::_fresh_left_block(doris::RuntimeState* state) {
do {
release_block_memory(_left_block);
release_block_memory(*_left_block);
RETURN_IF_ERROR(child(0)->get_next_after_projects(
state, &_left_block, &_left_side_eos,
state, _left_block.get(), &_left_side_eos,
std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) &
ExecNode::get_next,
_children[0], std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3)));

} while (_left_block.rows() == 0 && !_left_side_eos);
} while (_left_block->rows() == 0 && !_left_side_eos);

return Status::OK();
}
Expand All @@ -270,7 +272,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
RETURN_IF_CANCELLED(state);
while (need_more_input_data()) {
RETURN_IF_ERROR(_fresh_left_block(state));
RETURN_IF_ERROR(push(state, &_left_block, _left_side_eos));
RETURN_IF_ERROR(push(state, _left_block.get(), _left_side_eos));
}

return pull(state, block, eos);
Expand All @@ -280,7 +282,7 @@ void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc
auto& dst_columns = mutable_block.mutable_columns();
DCHECK(_is_mark_join);
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
const ColumnWithTypeAndName& src_column = _left_block.get_by_position(i);
const ColumnWithTypeAndName& src_column = _left_block->get_by_position(i);
if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
auto origin_sz = dst_columns[i]->size();
DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN);
Expand Down Expand Up @@ -310,7 +312,7 @@ void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block,
auto& dst_columns = mutable_block.mutable_columns();
const int max_added_rows = now_process_build_block.rows();
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
const ColumnWithTypeAndName& src_column = _left_block.get_by_position(i);
const ColumnWithTypeAndName& src_column = _left_block->get_by_position(i);
if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
auto origin_sz = dst_columns[i]->size();
DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN);
Expand Down Expand Up @@ -456,13 +458,13 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
} else {
if (!_is_mark_join) {
auto new_size = column_size;
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows());
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block->rows());
for (int j = _left_block_start_pos;
j < _left_block_start_pos + _left_side_process_count; ++j) {
if (_cur_probe_row_visited_flags[j] == IsSemi) {
new_size++;
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
const ColumnWithTypeAndName src_column = _left_block.get_by_position(i);
const ColumnWithTypeAndName src_column = _left_block->get_by_position(i);
if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
DCHECK(_join_op == TJoinOp::FULL_OUTER_JOIN);
assert_cast<ColumnNullable*>(dst_columns[i].get())
Expand All @@ -488,13 +490,13 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
} else {
ColumnFilterHelper mark_column(*dst_columns[dst_columns.size() - 1]);
mark_column.reserve(mark_column.size() + _left_side_process_count);
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows());
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block->rows());
for (int j = _left_block_start_pos;
j < _left_block_start_pos + _left_side_process_count; ++j) {
mark_column.insert_value(IsSemi == _cur_probe_row_visited_flags[j]);
}
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
const ColumnWithTypeAndName src_column = _left_block.get_by_position(i);
const ColumnWithTypeAndName src_column = _left_block->get_by_position(i);
DCHECK(_join_op != TJoinOp::FULL_OUTER_JOIN);
dst_columns[i]->insert_range_from(*src_column.column, _left_block_start_pos,
_left_side_process_count);
Expand Down Expand Up @@ -541,7 +543,7 @@ void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl(
}
if constexpr (SetProbeSideFlag) {
int end = filter.size();
for (int i = _left_block_pos == _left_block.rows() ? _left_block_pos - 1 : _left_block_pos;
for (int i = _left_block_pos == _left_block->rows() ? _left_block_pos - 1 : _left_block_pos;
i >= _left_block_start_pos; i--) {
int offset = 0;
if (!_probe_offset_stack.empty()) {
Expand Down Expand Up @@ -648,7 +650,7 @@ void VNestedLoopJoinNode::debug_string(int indentation_level, std::stringstream*
}

void VNestedLoopJoinNode::_release_mem() {
_left_block.clear();
_left_block->clear();

Blocks tmp_build_blocks;
_build_blocks.swap(tmp_build_blocks);
Expand All @@ -664,7 +666,7 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state, vectorized::Block* block,
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_probe_timer);
if (_is_output_left_side_only) {
RETURN_IF_ERROR(_build_output_block(&_left_block, block));
RETURN_IF_ERROR(_build_output_block(_left_block.get(), block));
*eos = _left_side_eos;
_need_more_input_data = !_left_side_eos;
} else {
Expand Down
10 changes: 5 additions & 5 deletions be/src/vec/exec/join/vnested_loop_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase {
: *_output_row_desc;
}

Block* get_left_block() { return &_left_block; }
std::shared_ptr<Block> get_left_block() { return _left_block; }

std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return _runtime_filter_descs; }
VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; }
Expand All @@ -120,14 +120,14 @@ class VNestedLoopJoinNode final : public VJoinNodeBase {
// We should try to join rows if there still are some rows from probe side.
while (_join_block.rows() < state->batch_size()) {
while (_current_build_pos == _build_blocks.size() ||
_left_block_pos == _left_block.rows()) {
_left_block_pos == _left_block->rows()) {
// if left block is empty(), do not need disprocess the left block rows
if (_left_block.rows() > _left_block_pos) {
if (_left_block->rows() > _left_block_pos) {
_left_side_process_count++;
}

_reset_with_next_probe_row();
if (_left_block_pos < _left_block.rows()) {
if (_left_block_pos < _left_block->rows()) {
if constexpr (set_probe_side_flag) {
_probe_offset_stack.push(mutable_join_block.rows());
}
Expand Down Expand Up @@ -260,7 +260,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase {
// _left_block must be cleared before calling get_next(). The child node
// does not initialize all tuple ptrs in the row, only the ones that it
// is responsible for.
Block _left_block;
std::shared_ptr<Block> _left_block;

int _left_block_start_pos = 0;
int _left_block_pos; // current scan pos in _left_block
Expand Down
14 changes: 8 additions & 6 deletions be/src/vec/exec/vrepeat_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ VRepeatNode::VRepeatNode(ObjectPool* pool, const TPlanNode& tnode, const Descrip
_grouping_list(tnode.repeat_node.grouping_list),
_output_tuple_id(tnode.repeat_node.output_tuple_id),
_child_eos(false),
_repeat_id_idx(0) {}
_repeat_id_idx(0) {
_child_block = Block::create_shared();
}

Status VRepeatNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
Expand Down Expand Up @@ -189,12 +191,12 @@ Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block* output_b
int size = _repeat_id_list.size();
if (_repeat_id_idx >= size) {
_intermediate_block->clear();
release_block_memory(_child_block);
release_block_memory(*_child_block);
_repeat_id_idx = 0;
}
}
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns()));
*eos = _child_eos && _child_block.rows() == 0;
*eos = _child_eos && _child_block->rows() == 0;
reached_limit(output_block, eos);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
return Status::OK();
Expand Down Expand Up @@ -225,7 +227,7 @@ Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block, bo
}

bool VRepeatNode::need_more_input_data() const {
return !_child_block.rows() && !_child_eos;
return !_child_block->rows() && !_child_eos;
}

Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) {
Expand All @@ -243,13 +245,13 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) {
DCHECK(block->rows() == 0);
while (need_more_input_data()) {
RETURN_IF_ERROR(child(0)->get_next_after_projects(
state, &_child_block, &_child_eos,
state, _child_block.get(), &_child_eos,
std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) &
ExecNode::get_next,
_children[0], std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3)));

static_cast<void>(push(state, &_child_block, _child_eos));
static_cast<void>(push(state, _child_block.get(), _child_eos));
}

return pull(state, block, eos);
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/vrepeat_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class VRepeatNode : public ExecNode {
Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override;
bool need_more_input_data() const;
Block* get_child_block() { return &_child_block; }
std::shared_ptr<Block> get_child_block() { return _child_block; }

void debug_string(int indentation_level, std::stringstream* out) const override;

Expand All @@ -74,7 +74,7 @@ class VRepeatNode : public ExecNode {
TupleId _output_tuple_id;
const TupleDescriptor* _output_tuple_desc;

Block _child_block;
std::shared_ptr<Block> _child_block;
std::unique_ptr<Block> _intermediate_block {};

std::vector<SlotDescriptor*> _output_slots;
Expand Down
14 changes: 8 additions & 6 deletions be/src/vec/exec/vtable_function_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ namespace doris::vectorized {

VTableFunctionNode::VTableFunctionNode(doris::ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs) {}
: ExecNode(pool, tnode, descs) {
_child_block = Block::create_shared();
}

Status VTableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
Expand Down Expand Up @@ -144,12 +146,12 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos
// if child_block is empty, get data from child.
while (need_more_input_data()) {
RETURN_IF_ERROR(child(0)->get_next_after_projects(
state, &_child_block, &_child_eos,
state, _child_block.get(), &_child_eos,
std::bind((Status(ExecNode::*)(RuntimeState*, Block*, bool*)) & ExecNode::get_next,
_children[0], std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3)));

RETURN_IF_ERROR(push(state, &_child_block, _child_eos));
RETURN_IF_ERROR(push(state, _child_block.get(), _child_eos));
}

return pull(state, block, eos);
Expand All @@ -170,7 +172,7 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch."));

if (_child_block.rows() == 0) {
if (_child_block->rows() == 0) {
break;
}

Expand Down Expand Up @@ -227,13 +229,13 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
Status VTableFunctionNode::_process_next_child_row() {
_cur_child_offset++;

if (_cur_child_offset >= _child_block.rows()) {
if (_cur_child_offset >= _child_block->rows()) {
// release block use count.
for (TableFunction* fn : _fns) {
RETURN_IF_ERROR(fn->process_close());
}

release_block_memory(_child_block);
release_block_memory(*_child_block);
_cur_child_offset = -1;
return Status::OK();
}
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/exec/vtable_function_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class VTableFunctionNode final : public ExecNode {
return VExpr::open(_vfn_ctxs, state);
}
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
bool need_more_input_data() const { return !_child_block.rows() && !_child_eos; }
bool need_more_input_data() const { return !_child_block->rows() && !_child_eos; }

void release_resource(doris::RuntimeState* state) override {
if (_num_rows_filtered_counter != nullptr) {
Expand Down Expand Up @@ -92,7 +92,7 @@ class VTableFunctionNode final : public ExecNode {
return Status::OK();
}

Block* get_child_block() { return &_child_block; }
std::shared_ptr<Block> get_child_block() { return _child_block; }

private:
Status _prepare_output_slot_ids(const TPlanNode& tnode);
Expand Down Expand Up @@ -135,15 +135,15 @@ class VTableFunctionNode final : public ExecNode {
return;
}
for (auto index : _output_slot_indexs) {
auto src_column = _child_block.get_by_position(index).column;
auto src_column = _child_block->get_by_position(index).column;
columns[index]->insert_many_from(*src_column, _cur_child_offset,
_current_row_insert_times);
}
_current_row_insert_times = 0;
}
int _current_row_insert_times = 0;

Block _child_block;
std::shared_ptr<Block> _child_block;
std::vector<SlotDescriptor*> _child_slots;
std::vector<SlotDescriptor*> _output_slots;
int64_t _cur_child_offset = 0;
Expand Down
Loading