diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index a3d24aa0614767..b7477d0b4f8755 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -34,12 +34,11 @@ OPERATOR_CODE_GENERATOR(NestLoopJoinProbeOperator, StatefulOperator) Status NestLoopJoinProbeOperator::prepare(doris::RuntimeState* state) { // just for speed up, the way is dangerous - _child_block.reset(_node->get_left_block()); + _child_block = _node->get_left_block(); return StatefulOperator::prepare(state); } Status NestLoopJoinProbeOperator::close(doris::RuntimeState* state) { - _child_block.release(); return StatefulOperator::close(state); } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 2beea932a8b891..7ff91fba695aa2 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -444,7 +444,7 @@ class StatefulOperator : public StreamingOperator { StatefulOperator(OperatorBuilderBase* builder, ExecNode* node) : StreamingOperator(builder, node), - _child_block(vectorized::Block::create_unique()), + _child_block(vectorized::Block::create_shared()), _child_source_state(SourceState::DEPEND_ON_SOURCE) {} virtual ~StatefulOperator() = default; @@ -484,7 +484,7 @@ class StatefulOperator : public StreamingOperator { } protected: - std::unique_ptr _child_block; + std::shared_ptr _child_block; SourceState _child_source_state; }; diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index 40f6f5d7b21fb4..d1613a6125f628 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -34,12 +34,11 @@ OPERATOR_CODE_GENERATOR(RepeatOperator, StatefulOperator) Status RepeatOperator::prepare(doris::RuntimeState* state) { // just for speed up, the way is dangerous - _child_block.reset(_node->get_child_block()); + _child_block = _node->get_child_block(); return StatefulOperator::prepare(state); } Status RepeatOperator::close(doris::RuntimeState* state) { - _child_block.release(); return StatefulOperator::close(state); } diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp index 5e1d5c281eb416..6e947c640c9f31 100644 --- a/be/src/pipeline/exec/table_function_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -33,12 +33,11 @@ OPERATOR_CODE_GENERATOR(TableFunctionOperator, StatefulOperator) Status TableFunctionOperator::prepare(doris::RuntimeState* state) { // just for speed up, the way is dangerous - _child_block.reset(_node->get_child_block()); + _child_block = _node->get_child_block(); return StatefulOperator::prepare(state); } Status TableFunctionOperator::close(doris::RuntimeState* state) { - _child_block.release(); return StatefulOperator::close(state); } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index f32146398568c3..7d8100aa6caa18 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -97,7 +97,9 @@ VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnod _left_block_pos(0), _left_side_eos(false), _old_version_flag(!tnode.__isset.nested_loop_join_node), - _runtime_filter_descs(tnode.runtime_filters) {} + _runtime_filter_descs(tnode.runtime_filters) { + _left_block = Block::create_shared(); +} Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::init(tnode, state)); @@ -252,15 +254,15 @@ Status VNestedLoopJoinNode::push(doris::RuntimeState* state, vectorized::Block* Status VNestedLoopJoinNode::_fresh_left_block(doris::RuntimeState* state) { do { - release_block_memory(_left_block); + release_block_memory(*_left_block); RETURN_IF_ERROR(child(0)->get_next_after_projects( - state, &_left_block, &_left_side_eos, + state, _left_block.get(), &_left_side_eos, std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - } while (_left_block.rows() == 0 && !_left_side_eos); + } while (_left_block->rows() == 0 && !_left_side_eos); return Status::OK(); } @@ -270,7 +272,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo RETURN_IF_CANCELLED(state); while (need_more_input_data()) { RETURN_IF_ERROR(_fresh_left_block(state)); - RETURN_IF_ERROR(push(state, &_left_block, _left_side_eos)); + RETURN_IF_ERROR(push(state, _left_block.get(), _left_side_eos)); } return pull(state, block, eos); @@ -280,7 +282,7 @@ void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc auto& dst_columns = mutable_block.mutable_columns(); DCHECK(_is_mark_join); for (size_t i = 0; i < _num_probe_side_columns; ++i) { - const ColumnWithTypeAndName& src_column = _left_block.get_by_position(i); + const ColumnWithTypeAndName& src_column = _left_block->get_by_position(i); if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { auto origin_sz = dst_columns[i]->size(); DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN); @@ -310,7 +312,7 @@ void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block, auto& dst_columns = mutable_block.mutable_columns(); const int max_added_rows = now_process_build_block.rows(); for (size_t i = 0; i < _num_probe_side_columns; ++i) { - const ColumnWithTypeAndName& src_column = _left_block.get_by_position(i); + const ColumnWithTypeAndName& src_column = _left_block->get_by_position(i); if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { auto origin_sz = dst_columns[i]->size(); DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN); @@ -456,13 +458,13 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s } else { if (!_is_mark_join) { auto new_size = column_size; - DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows()); + DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block->rows()); for (int j = _left_block_start_pos; j < _left_block_start_pos + _left_side_process_count; ++j) { if (_cur_probe_row_visited_flags[j] == IsSemi) { new_size++; for (size_t i = 0; i < _num_probe_side_columns; ++i) { - const ColumnWithTypeAndName src_column = _left_block.get_by_position(i); + const ColumnWithTypeAndName src_column = _left_block->get_by_position(i); if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { DCHECK(_join_op == TJoinOp::FULL_OUTER_JOIN); assert_cast(dst_columns[i].get()) @@ -488,13 +490,13 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s } else { ColumnFilterHelper mark_column(*dst_columns[dst_columns.size() - 1]); mark_column.reserve(mark_column.size() + _left_side_process_count); - DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows()); + DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block->rows()); for (int j = _left_block_start_pos; j < _left_block_start_pos + _left_side_process_count; ++j) { mark_column.insert_value(IsSemi == _cur_probe_row_visited_flags[j]); } for (size_t i = 0; i < _num_probe_side_columns; ++i) { - const ColumnWithTypeAndName src_column = _left_block.get_by_position(i); + const ColumnWithTypeAndName src_column = _left_block->get_by_position(i); DCHECK(_join_op != TJoinOp::FULL_OUTER_JOIN); dst_columns[i]->insert_range_from(*src_column.column, _left_block_start_pos, _left_side_process_count); @@ -541,7 +543,7 @@ void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl( } if constexpr (SetProbeSideFlag) { int end = filter.size(); - for (int i = _left_block_pos == _left_block.rows() ? _left_block_pos - 1 : _left_block_pos; + for (int i = _left_block_pos == _left_block->rows() ? _left_block_pos - 1 : _left_block_pos; i >= _left_block_start_pos; i--) { int offset = 0; if (!_probe_offset_stack.empty()) { @@ -648,7 +650,7 @@ void VNestedLoopJoinNode::debug_string(int indentation_level, std::stringstream* } void VNestedLoopJoinNode::_release_mem() { - _left_block.clear(); + _left_block->clear(); Blocks tmp_build_blocks; _build_blocks.swap(tmp_build_blocks); @@ -664,7 +666,7 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state, vectorized::Block* block, SCOPED_TIMER(_exec_timer); SCOPED_TIMER(_probe_timer); if (_is_output_left_side_only) { - RETURN_IF_ERROR(_build_output_block(&_left_block, block)); + RETURN_IF_ERROR(_build_output_block(_left_block.get(), block)); *eos = _left_side_eos; _need_more_input_data = !_left_side_eos; } else { diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index b309485db519b4..810bf57e7f5e8b 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -95,7 +95,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { : *_output_row_desc; } - Block* get_left_block() { return &_left_block; } + std::shared_ptr get_left_block() { return _left_block; } std::vector& runtime_filter_descs() { return _runtime_filter_descs; } VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; } @@ -120,14 +120,14 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { // We should try to join rows if there still are some rows from probe side. while (_join_block.rows() < state->batch_size()) { while (_current_build_pos == _build_blocks.size() || - _left_block_pos == _left_block.rows()) { + _left_block_pos == _left_block->rows()) { // if left block is empty(), do not need disprocess the left block rows - if (_left_block.rows() > _left_block_pos) { + if (_left_block->rows() > _left_block_pos) { _left_side_process_count++; } _reset_with_next_probe_row(); - if (_left_block_pos < _left_block.rows()) { + if (_left_block_pos < _left_block->rows()) { if constexpr (set_probe_side_flag) { _probe_offset_stack.push(mutable_join_block.rows()); } @@ -260,7 +260,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { // _left_block must be cleared before calling get_next(). The child node // does not initialize all tuple ptrs in the row, only the ones that it // is responsible for. - Block _left_block; + std::shared_ptr _left_block; int _left_block_start_pos = 0; int _left_block_pos; // current scan pos in _left_block diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index aaa7ed17d004ab..1765c1dcf7f44c 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -53,7 +53,9 @@ VRepeatNode::VRepeatNode(ObjectPool* pool, const TPlanNode& tnode, const Descrip _grouping_list(tnode.repeat_node.grouping_list), _output_tuple_id(tnode.repeat_node.output_tuple_id), _child_eos(false), - _repeat_id_idx(0) {} + _repeat_id_idx(0) { + _child_block = Block::create_shared(); +} Status VRepeatNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); @@ -189,12 +191,12 @@ Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block* output_b int size = _repeat_id_list.size(); if (_repeat_id_idx >= size) { _intermediate_block->clear(); - release_block_memory(_child_block); + release_block_memory(*_child_block); _repeat_id_idx = 0; } } RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); - *eos = _child_eos && _child_block.rows() == 0; + *eos = _child_eos && _child_block->rows() == 0; reached_limit(output_block, eos); COUNTER_SET(_rows_returned_counter, _num_rows_returned); return Status::OK(); @@ -225,7 +227,7 @@ Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block, bo } bool VRepeatNode::need_more_input_data() const { - return !_child_block.rows() && !_child_eos; + return !_child_block->rows() && !_child_eos; } Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) { @@ -243,13 +245,13 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) { DCHECK(block->rows() == 0); while (need_more_input_data()) { RETURN_IF_ERROR(child(0)->get_next_after_projects( - state, &_child_block, &_child_eos, + state, _child_block.get(), &_child_eos, std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - static_cast(push(state, &_child_block, _child_eos)); + static_cast(push(state, _child_block.get(), _child_eos)); } return pull(state, block, eos); diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h index 837b4c8aca14e9..94737580031e7b 100644 --- a/be/src/vec/exec/vrepeat_node.h +++ b/be/src/vec/exec/vrepeat_node.h @@ -57,7 +57,7 @@ class VRepeatNode : public ExecNode { Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; bool need_more_input_data() const; - Block* get_child_block() { return &_child_block; } + std::shared_ptr get_child_block() { return _child_block; } void debug_string(int indentation_level, std::stringstream* out) const override; @@ -74,7 +74,7 @@ class VRepeatNode : public ExecNode { TupleId _output_tuple_id; const TupleDescriptor* _output_tuple_desc; - Block _child_block; + std::shared_ptr _child_block; std::unique_ptr _intermediate_block {}; std::vector _output_slots; diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index 23da667b7d6646..be93bde0295b17 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -45,7 +45,9 @@ namespace doris::vectorized { VTableFunctionNode::VTableFunctionNode(doris::ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs) {} + : ExecNode(pool, tnode, descs) { + _child_block = Block::create_shared(); +} Status VTableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); @@ -144,12 +146,12 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos // if child_block is empty, get data from child. while (need_more_input_data()) { RETURN_IF_ERROR(child(0)->get_next_after_projects( - state, &_child_block, &_child_eos, + state, _child_block.get(), &_child_eos, std::bind((Status(ExecNode::*)(RuntimeState*, Block*, bool*)) & ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - RETURN_IF_ERROR(push(state, &_child_block, _child_eos)); + RETURN_IF_ERROR(push(state, _child_block.get(), _child_eos)); } return pull(state, block, eos); @@ -170,7 +172,7 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch.")); - if (_child_block.rows() == 0) { + if (_child_block->rows() == 0) { break; } @@ -227,13 +229,13 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu Status VTableFunctionNode::_process_next_child_row() { _cur_child_offset++; - if (_cur_child_offset >= _child_block.rows()) { + if (_cur_child_offset >= _child_block->rows()) { // release block use count. for (TableFunction* fn : _fns) { RETURN_IF_ERROR(fn->process_close()); } - release_block_memory(_child_block); + release_block_memory(*_child_block); _cur_child_offset = -1; return Status::OK(); } diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h index 56d1d50330d3a6..bcee8ced50f720 100644 --- a/be/src/vec/exec/vtable_function_node.h +++ b/be/src/vec/exec/vtable_function_node.h @@ -62,7 +62,7 @@ class VTableFunctionNode final : public ExecNode { return VExpr::open(_vfn_ctxs, state); } Status get_next(RuntimeState* state, Block* block, bool* eos) override; - bool need_more_input_data() const { return !_child_block.rows() && !_child_eos; } + bool need_more_input_data() const { return !_child_block->rows() && !_child_eos; } void release_resource(doris::RuntimeState* state) override { if (_num_rows_filtered_counter != nullptr) { @@ -92,7 +92,7 @@ class VTableFunctionNode final : public ExecNode { return Status::OK(); } - Block* get_child_block() { return &_child_block; } + std::shared_ptr get_child_block() { return _child_block; } private: Status _prepare_output_slot_ids(const TPlanNode& tnode); @@ -135,7 +135,7 @@ class VTableFunctionNode final : public ExecNode { return; } for (auto index : _output_slot_indexs) { - auto src_column = _child_block.get_by_position(index).column; + auto src_column = _child_block->get_by_position(index).column; columns[index]->insert_many_from(*src_column, _cur_child_offset, _current_row_insert_times); } @@ -143,7 +143,7 @@ class VTableFunctionNode final : public ExecNode { } int _current_row_insert_times = 0; - Block _child_block; + std::shared_ptr _child_block; std::vector _child_slots; std::vector _output_slots; int64_t _cur_child_offset = 0;