Skip to content

Commit

Permalink
[bugfix](core) child block is shared between operator and node, it sh…
Browse files Browse the repository at this point in the history
…ould be shared ptr (apache#28106)

_child_block in nest loop join , table value function, repeat node will be shared between ExecNode and related operator, but it should not be a unique ptr in operator, it belongs to exec node.

It will double free the block, if operator's close method is not called correctly.

It should be a shared ptr, then it will not core even if the opeartor's close method is not called.
  • Loading branch information
yiguolei authored and 胥剑旭 committed Dec 14, 2023
1 parent fecdbd5 commit 0e1c3d2
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 45 deletions.
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ OPERATOR_CODE_GENERATOR(NestLoopJoinProbeOperator, StatefulOperator)

Status NestLoopJoinProbeOperator::prepare(doris::RuntimeState* state) {
// just for speed up, the way is dangerous
_child_block.reset(_node->get_left_block());
_child_block = _node->get_left_block();
return StatefulOperator::prepare(state);
}

Status NestLoopJoinProbeOperator::close(doris::RuntimeState* state) {
_child_block.release();
return StatefulOperator::close(state);
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ class StatefulOperator : public StreamingOperator<OperatorBuilderType> {

StatefulOperator(OperatorBuilderBase* builder, ExecNode* node)
: StreamingOperator<OperatorBuilderType>(builder, node),
_child_block(vectorized::Block::create_unique()),
_child_block(vectorized::Block::create_shared()),
_child_source_state(SourceState::DEPEND_ON_SOURCE) {}

virtual ~StatefulOperator() = default;
Expand Down Expand Up @@ -484,7 +484,7 @@ class StatefulOperator : public StreamingOperator<OperatorBuilderType> {
}

protected:
std::unique_ptr<vectorized::Block> _child_block;
std::shared_ptr<vectorized::Block> _child_block;
SourceState _child_source_state;
};

Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/repeat_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ OPERATOR_CODE_GENERATOR(RepeatOperator, StatefulOperator)

Status RepeatOperator::prepare(doris::RuntimeState* state) {
// just for speed up, the way is dangerous
_child_block.reset(_node->get_child_block());
_child_block = _node->get_child_block();
return StatefulOperator::prepare(state);
}

Status RepeatOperator::close(doris::RuntimeState* state) {
_child_block.release();
return StatefulOperator::close(state);
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/table_function_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ OPERATOR_CODE_GENERATOR(TableFunctionOperator, StatefulOperator)

Status TableFunctionOperator::prepare(doris::RuntimeState* state) {
// just for speed up, the way is dangerous
_child_block.reset(_node->get_child_block());
_child_block = _node->get_child_block();
return StatefulOperator::prepare(state);
}

Status TableFunctionOperator::close(doris::RuntimeState* state) {
_child_block.release();
return StatefulOperator::close(state);
}

Expand Down
30 changes: 16 additions & 14 deletions be/src/vec/exec/join/vnested_loop_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnod
_left_block_pos(0),
_left_side_eos(false),
_old_version_flag(!tnode.__isset.nested_loop_join_node),
_runtime_filter_descs(tnode.runtime_filters) {}
_runtime_filter_descs(tnode.runtime_filters) {
_left_block = Block::create_shared();
}

Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(VJoinNodeBase::init(tnode, state));
Expand Down Expand Up @@ -252,15 +254,15 @@ Status VNestedLoopJoinNode::push(doris::RuntimeState* state, vectorized::Block*

Status VNestedLoopJoinNode::_fresh_left_block(doris::RuntimeState* state) {
do {
release_block_memory(_left_block);
release_block_memory(*_left_block);
RETURN_IF_ERROR(child(0)->get_next_after_projects(
state, &_left_block, &_left_side_eos,
state, _left_block.get(), &_left_side_eos,
std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) &
ExecNode::get_next,
_children[0], std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3)));

} while (_left_block.rows() == 0 && !_left_side_eos);
} while (_left_block->rows() == 0 && !_left_side_eos);

return Status::OK();
}
Expand All @@ -270,7 +272,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
RETURN_IF_CANCELLED(state);
while (need_more_input_data()) {
RETURN_IF_ERROR(_fresh_left_block(state));
RETURN_IF_ERROR(push(state, &_left_block, _left_side_eos));
RETURN_IF_ERROR(push(state, _left_block.get(), _left_side_eos));
}

return pull(state, block, eos);
Expand All @@ -280,7 +282,7 @@ void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc
auto& dst_columns = mutable_block.mutable_columns();
DCHECK(_is_mark_join);
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
const ColumnWithTypeAndName& src_column = _left_block.get_by_position(i);
const ColumnWithTypeAndName& src_column = _left_block->get_by_position(i);
if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
auto origin_sz = dst_columns[i]->size();
DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN);
Expand Down Expand Up @@ -310,7 +312,7 @@ void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block,
auto& dst_columns = mutable_block.mutable_columns();
const int max_added_rows = now_process_build_block.rows();
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
const ColumnWithTypeAndName& src_column = _left_block.get_by_position(i);
const ColumnWithTypeAndName& src_column = _left_block->get_by_position(i);
if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
auto origin_sz = dst_columns[i]->size();
DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN);
Expand Down Expand Up @@ -456,13 +458,13 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
} else {
if (!_is_mark_join) {
auto new_size = column_size;
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows());
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block->rows());
for (int j = _left_block_start_pos;
j < _left_block_start_pos + _left_side_process_count; ++j) {
if (_cur_probe_row_visited_flags[j] == IsSemi) {
new_size++;
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
const ColumnWithTypeAndName src_column = _left_block.get_by_position(i);
const ColumnWithTypeAndName src_column = _left_block->get_by_position(i);
if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
DCHECK(_join_op == TJoinOp::FULL_OUTER_JOIN);
assert_cast<ColumnNullable*>(dst_columns[i].get())
Expand All @@ -488,13 +490,13 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
} else {
ColumnFilterHelper mark_column(*dst_columns[dst_columns.size() - 1]);
mark_column.reserve(mark_column.size() + _left_side_process_count);
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows());
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block->rows());
for (int j = _left_block_start_pos;
j < _left_block_start_pos + _left_side_process_count; ++j) {
mark_column.insert_value(IsSemi == _cur_probe_row_visited_flags[j]);
}
for (size_t i = 0; i < _num_probe_side_columns; ++i) {
const ColumnWithTypeAndName src_column = _left_block.get_by_position(i);
const ColumnWithTypeAndName src_column = _left_block->get_by_position(i);
DCHECK(_join_op != TJoinOp::FULL_OUTER_JOIN);
dst_columns[i]->insert_range_from(*src_column.column, _left_block_start_pos,
_left_side_process_count);
Expand Down Expand Up @@ -541,7 +543,7 @@ void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl(
}
if constexpr (SetProbeSideFlag) {
int end = filter.size();
for (int i = _left_block_pos == _left_block.rows() ? _left_block_pos - 1 : _left_block_pos;
for (int i = _left_block_pos == _left_block->rows() ? _left_block_pos - 1 : _left_block_pos;
i >= _left_block_start_pos; i--) {
int offset = 0;
if (!_probe_offset_stack.empty()) {
Expand Down Expand Up @@ -648,7 +650,7 @@ void VNestedLoopJoinNode::debug_string(int indentation_level, std::stringstream*
}

void VNestedLoopJoinNode::_release_mem() {
_left_block.clear();
_left_block->clear();

Blocks tmp_build_blocks;
_build_blocks.swap(tmp_build_blocks);
Expand All @@ -664,7 +666,7 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state, vectorized::Block* block,
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_probe_timer);
if (_is_output_left_side_only) {
RETURN_IF_ERROR(_build_output_block(&_left_block, block));
RETURN_IF_ERROR(_build_output_block(_left_block.get(), block));
*eos = _left_side_eos;
_need_more_input_data = !_left_side_eos;
} else {
Expand Down
10 changes: 5 additions & 5 deletions be/src/vec/exec/join/vnested_loop_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase {
: *_output_row_desc;
}

Block* get_left_block() { return &_left_block; }
std::shared_ptr<Block> get_left_block() { return _left_block; }

std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return _runtime_filter_descs; }
VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; }
Expand All @@ -120,14 +120,14 @@ class VNestedLoopJoinNode final : public VJoinNodeBase {
// We should try to join rows if there still are some rows from probe side.
while (_join_block.rows() < state->batch_size()) {
while (_current_build_pos == _build_blocks.size() ||
_left_block_pos == _left_block.rows()) {
_left_block_pos == _left_block->rows()) {
// if left block is empty(), do not need disprocess the left block rows
if (_left_block.rows() > _left_block_pos) {
if (_left_block->rows() > _left_block_pos) {
_left_side_process_count++;
}

_reset_with_next_probe_row();
if (_left_block_pos < _left_block.rows()) {
if (_left_block_pos < _left_block->rows()) {
if constexpr (set_probe_side_flag) {
_probe_offset_stack.push(mutable_join_block.rows());
}
Expand Down Expand Up @@ -260,7 +260,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase {
// _left_block must be cleared before calling get_next(). The child node
// does not initialize all tuple ptrs in the row, only the ones that it
// is responsible for.
Block _left_block;
std::shared_ptr<Block> _left_block;

int _left_block_start_pos = 0;
int _left_block_pos; // current scan pos in _left_block
Expand Down
14 changes: 8 additions & 6 deletions be/src/vec/exec/vrepeat_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ VRepeatNode::VRepeatNode(ObjectPool* pool, const TPlanNode& tnode, const Descrip
_grouping_list(tnode.repeat_node.grouping_list),
_output_tuple_id(tnode.repeat_node.output_tuple_id),
_child_eos(false),
_repeat_id_idx(0) {}
_repeat_id_idx(0) {
_child_block = Block::create_shared();
}

Status VRepeatNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
Expand Down Expand Up @@ -189,12 +191,12 @@ Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block* output_b
int size = _repeat_id_list.size();
if (_repeat_id_idx >= size) {
_intermediate_block->clear();
release_block_memory(_child_block);
release_block_memory(*_child_block);
_repeat_id_idx = 0;
}
}
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns()));
*eos = _child_eos && _child_block.rows() == 0;
*eos = _child_eos && _child_block->rows() == 0;
reached_limit(output_block, eos);
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
return Status::OK();
Expand Down Expand Up @@ -225,7 +227,7 @@ Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block, bo
}

bool VRepeatNode::need_more_input_data() const {
return !_child_block.rows() && !_child_eos;
return !_child_block->rows() && !_child_eos;
}

Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) {
Expand All @@ -243,13 +245,13 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) {
DCHECK(block->rows() == 0);
while (need_more_input_data()) {
RETURN_IF_ERROR(child(0)->get_next_after_projects(
state, &_child_block, &_child_eos,
state, _child_block.get(), &_child_eos,
std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) &
ExecNode::get_next,
_children[0], std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3)));

static_cast<void>(push(state, &_child_block, _child_eos));
static_cast<void>(push(state, _child_block.get(), _child_eos));
}

return pull(state, block, eos);
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/vrepeat_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class VRepeatNode : public ExecNode {
Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override;
bool need_more_input_data() const;
Block* get_child_block() { return &_child_block; }
std::shared_ptr<Block> get_child_block() { return _child_block; }

void debug_string(int indentation_level, std::stringstream* out) const override;

Expand All @@ -74,7 +74,7 @@ class VRepeatNode : public ExecNode {
TupleId _output_tuple_id;
const TupleDescriptor* _output_tuple_desc;

Block _child_block;
std::shared_ptr<Block> _child_block;
std::unique_ptr<Block> _intermediate_block {};

std::vector<SlotDescriptor*> _output_slots;
Expand Down
14 changes: 8 additions & 6 deletions be/src/vec/exec/vtable_function_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ namespace doris::vectorized {

VTableFunctionNode::VTableFunctionNode(doris::ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs) {}
: ExecNode(pool, tnode, descs) {
_child_block = Block::create_shared();
}

Status VTableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
Expand Down Expand Up @@ -144,12 +146,12 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos
// if child_block is empty, get data from child.
while (need_more_input_data()) {
RETURN_IF_ERROR(child(0)->get_next_after_projects(
state, &_child_block, &_child_eos,
state, _child_block.get(), &_child_eos,
std::bind((Status(ExecNode::*)(RuntimeState*, Block*, bool*)) & ExecNode::get_next,
_children[0], std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3)));

RETURN_IF_ERROR(push(state, &_child_block, _child_eos));
RETURN_IF_ERROR(push(state, _child_block.get(), _child_eos));
}

return pull(state, block, eos);
Expand All @@ -170,7 +172,7 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch."));

if (_child_block.rows() == 0) {
if (_child_block->rows() == 0) {
break;
}

Expand Down Expand Up @@ -227,13 +229,13 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
Status VTableFunctionNode::_process_next_child_row() {
_cur_child_offset++;

if (_cur_child_offset >= _child_block.rows()) {
if (_cur_child_offset >= _child_block->rows()) {
// release block use count.
for (TableFunction* fn : _fns) {
RETURN_IF_ERROR(fn->process_close());
}

release_block_memory(_child_block);
release_block_memory(*_child_block);
_cur_child_offset = -1;
return Status::OK();
}
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/exec/vtable_function_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class VTableFunctionNode final : public ExecNode {
return VExpr::open(_vfn_ctxs, state);
}
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
bool need_more_input_data() const { return !_child_block.rows() && !_child_eos; }
bool need_more_input_data() const { return !_child_block->rows() && !_child_eos; }

void release_resource(doris::RuntimeState* state) override {
if (_num_rows_filtered_counter != nullptr) {
Expand Down Expand Up @@ -92,7 +92,7 @@ class VTableFunctionNode final : public ExecNode {
return Status::OK();
}

Block* get_child_block() { return &_child_block; }
std::shared_ptr<Block> get_child_block() { return _child_block; }

private:
Status _prepare_output_slot_ids(const TPlanNode& tnode);
Expand Down Expand Up @@ -135,15 +135,15 @@ class VTableFunctionNode final : public ExecNode {
return;
}
for (auto index : _output_slot_indexs) {
auto src_column = _child_block.get_by_position(index).column;
auto src_column = _child_block->get_by_position(index).column;
columns[index]->insert_many_from(*src_column, _cur_child_offset,
_current_row_insert_times);
}
_current_row_insert_times = 0;
}
int _current_row_insert_times = 0;

Block _child_block;
std::shared_ptr<Block> _child_block;
std::vector<SlotDescriptor*> _child_slots;
std::vector<SlotDescriptor*> _output_slots;
int64_t _cur_child_offset = 0;
Expand Down

0 comments on commit 0e1c3d2

Please sign in to comment.