From 68067d9679f1542d5ef5c49122ae508991f6c906 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 7 Dec 2023 11:23:21 +0800 Subject: [PATCH 1/9] [bugfix](core) child block is shared between operator and node, it should be a shared ptr --- .../exec/nested_loop_join_probe_operator.cpp | 3 +- be/src/pipeline/exec/operator.h | 2 +- be/src/pipeline/exec/repeat_operator.cpp | 7 ++--- .../pipeline/exec/table_function_operator.cpp | 3 +- .../vec/exec/join/vnested_loop_join_node.cpp | 30 ++++++++++--------- be/src/vec/exec/join/vnested_loop_join_node.h | 4 +-- be/src/vec/exec/vrepeat_node.cpp | 8 +++-- be/src/vec/exec/vrepeat_node.h | 4 +-- be/src/vec/exec/vtable_function_node.cpp | 8 +++-- be/src/vec/exec/vtable_function_node.h | 8 ++--- 10 files changed, 40 insertions(+), 37 deletions(-) diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index a3d24aa0614767..b7477d0b4f8755 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -34,12 +34,11 @@ OPERATOR_CODE_GENERATOR(NestLoopJoinProbeOperator, StatefulOperator) Status NestLoopJoinProbeOperator::prepare(doris::RuntimeState* state) { // just for speed up, the way is dangerous - _child_block.reset(_node->get_left_block()); + _child_block = _node->get_left_block(); return StatefulOperator::prepare(state); } Status NestLoopJoinProbeOperator::close(doris::RuntimeState* state) { - _child_block.release(); return StatefulOperator::close(state); } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 2beea932a8b891..37be0d2656a914 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -484,7 +484,7 @@ class StatefulOperator : public StreamingOperator { } protected: - std::unique_ptr _child_block; + std::shared_ptr _child_block; SourceState _child_source_state; }; diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index 40f6f5d7b21fb4..53792d1ff28286 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -34,12 +34,11 @@ OPERATOR_CODE_GENERATOR(RepeatOperator, StatefulOperator) Status RepeatOperator::prepare(doris::RuntimeState* state) { // just for speed up, the way is dangerous - _child_block.reset(_node->get_child_block()); + _child_block = _node->get_child_block(); return StatefulOperator::prepare(state); } Status RepeatOperator::close(doris::RuntimeState* state) { - _child_block.release(); return StatefulOperator::close(state); } @@ -231,13 +230,13 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp int size = _repeat_id_list.size(); if (_repeat_id_idx >= size) { _intermediate_block->clear(); - _child_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); + _child_block->clear_column_data(_child_x->row_desc().num_materialized_slots()); _repeat_id_idx = 0; } } RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); - if (_child_eos && _child_block.rows() == 0) { + if (_child_eos && _child_block->rows() == 0) { source_state = SourceState::FINISHED; } local_state.reached_limit(output_block, source_state); diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp index 5e1d5c281eb416..6e947c640c9f31 100644 --- a/be/src/pipeline/exec/table_function_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -33,12 +33,11 @@ OPERATOR_CODE_GENERATOR(TableFunctionOperator, StatefulOperator) Status TableFunctionOperator::prepare(doris::RuntimeState* state) { // just for speed up, the way is dangerous - _child_block.reset(_node->get_child_block()); + _child_block = _node->get_child_block(); return StatefulOperator::prepare(state); } Status TableFunctionOperator::close(doris::RuntimeState* state) { - _child_block.release(); return StatefulOperator::close(state); } diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index f32146398568c3..f62fd896f4237f 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -97,7 +97,9 @@ VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnod _left_block_pos(0), _left_side_eos(false), _old_version_flag(!tnode.__isset.nested_loop_join_node), - _runtime_filter_descs(tnode.runtime_filters) {} + _runtime_filter_descs(tnode.runtime_filters) { + _left_block = Block::create_shared(); +} Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::init(tnode, state)); @@ -252,15 +254,15 @@ Status VNestedLoopJoinNode::push(doris::RuntimeState* state, vectorized::Block* Status VNestedLoopJoinNode::_fresh_left_block(doris::RuntimeState* state) { do { - release_block_memory(_left_block); + release_block_memory(_left_block.get()); RETURN_IF_ERROR(child(0)->get_next_after_projects( - state, &_left_block, &_left_side_eos, + state, _left_block.get(), &_left_side_eos, std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - } while (_left_block.rows() == 0 && !_left_side_eos); + } while (_left_block->rows() == 0 && !_left_side_eos); return Status::OK(); } @@ -270,7 +272,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo RETURN_IF_CANCELLED(state); while (need_more_input_data()) { RETURN_IF_ERROR(_fresh_left_block(state)); - RETURN_IF_ERROR(push(state, &_left_block, _left_side_eos)); + RETURN_IF_ERROR(push(state, _left_block.get(), _left_side_eos)); } return pull(state, block, eos); @@ -280,7 +282,7 @@ void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc auto& dst_columns = mutable_block.mutable_columns(); DCHECK(_is_mark_join); for (size_t i = 0; i < _num_probe_side_columns; ++i) { - const ColumnWithTypeAndName& src_column = _left_block.get_by_position(i); + const ColumnWithTypeAndName& src_column = _left_block->get_by_position(i); if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { auto origin_sz = dst_columns[i]->size(); DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN); @@ -310,7 +312,7 @@ void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block, auto& dst_columns = mutable_block.mutable_columns(); const int max_added_rows = now_process_build_block.rows(); for (size_t i = 0; i < _num_probe_side_columns; ++i) { - const ColumnWithTypeAndName& src_column = _left_block.get_by_position(i); + const ColumnWithTypeAndName& src_column = _left_block->get_by_position(i); if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { auto origin_sz = dst_columns[i]->size(); DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN); @@ -456,13 +458,13 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s } else { if (!_is_mark_join) { auto new_size = column_size; - DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows()); + DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block->rows()); for (int j = _left_block_start_pos; j < _left_block_start_pos + _left_side_process_count; ++j) { if (_cur_probe_row_visited_flags[j] == IsSemi) { new_size++; for (size_t i = 0; i < _num_probe_side_columns; ++i) { - const ColumnWithTypeAndName src_column = _left_block.get_by_position(i); + const ColumnWithTypeAndName src_column = _left_block->get_by_position(i); if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { DCHECK(_join_op == TJoinOp::FULL_OUTER_JOIN); assert_cast(dst_columns[i].get()) @@ -488,13 +490,13 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s } else { ColumnFilterHelper mark_column(*dst_columns[dst_columns.size() - 1]); mark_column.reserve(mark_column.size() + _left_side_process_count); - DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows()); + DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block->rows()); for (int j = _left_block_start_pos; j < _left_block_start_pos + _left_side_process_count; ++j) { mark_column.insert_value(IsSemi == _cur_probe_row_visited_flags[j]); } for (size_t i = 0; i < _num_probe_side_columns; ++i) { - const ColumnWithTypeAndName src_column = _left_block.get_by_position(i); + const ColumnWithTypeAndName src_column = _left_block->get_by_position(i); DCHECK(_join_op != TJoinOp::FULL_OUTER_JOIN); dst_columns[i]->insert_range_from(*src_column.column, _left_block_start_pos, _left_side_process_count); @@ -541,7 +543,7 @@ void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl( } if constexpr (SetProbeSideFlag) { int end = filter.size(); - for (int i = _left_block_pos == _left_block.rows() ? _left_block_pos - 1 : _left_block_pos; + for (int i = _left_block_pos == _left_block->rows() ? _left_block_pos - 1 : _left_block_pos; i >= _left_block_start_pos; i--) { int offset = 0; if (!_probe_offset_stack.empty()) { @@ -648,7 +650,7 @@ void VNestedLoopJoinNode::debug_string(int indentation_level, std::stringstream* } void VNestedLoopJoinNode::_release_mem() { - _left_block.clear(); + _left_block->clear(); Blocks tmp_build_blocks; _build_blocks.swap(tmp_build_blocks); @@ -664,7 +666,7 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state, vectorized::Block* block, SCOPED_TIMER(_exec_timer); SCOPED_TIMER(_probe_timer); if (_is_output_left_side_only) { - RETURN_IF_ERROR(_build_output_block(&_left_block, block)); + RETURN_IF_ERROR(_build_output_block(_left_block.get(), block)); *eos = _left_side_eos; _need_more_input_data = !_left_side_eos; } else { diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index b309485db519b4..cb3fe3a7ae3eed 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -95,7 +95,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { : *_output_row_desc; } - Block* get_left_block() { return &_left_block; } + BlockSPtr get_left_block() { return _left_block; } std::vector& runtime_filter_descs() { return _runtime_filter_descs; } VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; } @@ -260,7 +260,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { // _left_block must be cleared before calling get_next(). The child node // does not initialize all tuple ptrs in the row, only the ones that it // is responsible for. - Block _left_block; + BlockSPtr _left_block; int _left_block_start_pos = 0; int _left_block_pos; // current scan pos in _left_block diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index aaa7ed17d004ab..bafe54f3d64f6a 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -53,7 +53,9 @@ VRepeatNode::VRepeatNode(ObjectPool* pool, const TPlanNode& tnode, const Descrip _grouping_list(tnode.repeat_node.grouping_list), _output_tuple_id(tnode.repeat_node.output_tuple_id), _child_eos(false), - _repeat_id_idx(0) {} + _repeat_id_idx(0) { + _child_block = Block::create_shared(); +} Status VRepeatNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); @@ -194,7 +196,7 @@ Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block* output_b } } RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); - *eos = _child_eos && _child_block.rows() == 0; + *eos = _child_eos && _child_block->rows() == 0; reached_limit(output_block, eos); COUNTER_SET(_rows_returned_counter, _num_rows_returned); return Status::OK(); @@ -225,7 +227,7 @@ Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block, bo } bool VRepeatNode::need_more_input_data() const { - return !_child_block.rows() && !_child_eos; + return !_child_block->rows() && !_child_eos; } Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) { diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h index 837b4c8aca14e9..61a16411de1372 100644 --- a/be/src/vec/exec/vrepeat_node.h +++ b/be/src/vec/exec/vrepeat_node.h @@ -57,7 +57,7 @@ class VRepeatNode : public ExecNode { Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; bool need_more_input_data() const; - Block* get_child_block() { return &_child_block; } + BlockSPtr get_child_block() { return _child_block; } void debug_string(int indentation_level, std::stringstream* out) const override; @@ -74,7 +74,7 @@ class VRepeatNode : public ExecNode { TupleId _output_tuple_id; const TupleDescriptor* _output_tuple_desc; - Block _child_block; + BlockSPtr _child_block; std::unique_ptr _intermediate_block {}; std::vector _output_slots; diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index 23da667b7d6646..b6eaa61b550ef6 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -45,7 +45,9 @@ namespace doris::vectorized { VTableFunctionNode::VTableFunctionNode(doris::ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs) {} + : ExecNode(pool, tnode, descs) { + _child_block = Block::create_shared(); +} Status VTableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); @@ -170,7 +172,7 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch.")); - if (_child_block.rows() == 0) { + if (_child_block->rows() == 0) { break; } @@ -227,7 +229,7 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu Status VTableFunctionNode::_process_next_child_row() { _cur_child_offset++; - if (_cur_child_offset >= _child_block.rows()) { + if (_cur_child_offset >= _child_block->rows()) { // release block use count. for (TableFunction* fn : _fns) { RETURN_IF_ERROR(fn->process_close()); diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h index 56d1d50330d3a6..9c4c3d4ff68b62 100644 --- a/be/src/vec/exec/vtable_function_node.h +++ b/be/src/vec/exec/vtable_function_node.h @@ -62,7 +62,7 @@ class VTableFunctionNode final : public ExecNode { return VExpr::open(_vfn_ctxs, state); } Status get_next(RuntimeState* state, Block* block, bool* eos) override; - bool need_more_input_data() const { return !_child_block.rows() && !_child_eos; } + bool need_more_input_data() const { return !_child_block->rows() && !_child_eos; } void release_resource(doris::RuntimeState* state) override { if (_num_rows_filtered_counter != nullptr) { @@ -92,7 +92,7 @@ class VTableFunctionNode final : public ExecNode { return Status::OK(); } - Block* get_child_block() { return &_child_block; } + BlockSPtr get_child_block() { return _child_block; } private: Status _prepare_output_slot_ids(const TPlanNode& tnode); @@ -135,7 +135,7 @@ class VTableFunctionNode final : public ExecNode { return; } for (auto index : _output_slot_indexs) { - auto src_column = _child_block.get_by_position(index).column; + auto src_column = _child_block->get_by_position(index).column; columns[index]->insert_many_from(*src_column, _cur_child_offset, _current_row_insert_times); } @@ -143,7 +143,7 @@ class VTableFunctionNode final : public ExecNode { } int _current_row_insert_times = 0; - Block _child_block; + std::shared_ptr _child_block; std::vector _child_slots; std::vector _output_slots; int64_t _cur_child_offset = 0; From 525ace5fe85a57dbbda2faf14f0c1ec391605b14 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 7 Dec 2023 11:33:21 +0800 Subject: [PATCH 2/9] f --- be/src/vec/exec/join/vnested_loop_join_node.h | 4 ++-- be/src/vec/exec/vrepeat_node.h | 4 ++-- be/src/vec/exec/vtable_function_node.h | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index cb3fe3a7ae3eed..32e8a23437dfdb 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -95,7 +95,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { : *_output_row_desc; } - BlockSPtr get_left_block() { return _left_block; } + std::shared get_left_block() { return _left_block; } std::vector& runtime_filter_descs() { return _runtime_filter_descs; } VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; } @@ -260,7 +260,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { // _left_block must be cleared before calling get_next(). The child node // does not initialize all tuple ptrs in the row, only the ones that it // is responsible for. - BlockSPtr _left_block; + std::shared _left_block; int _left_block_start_pos = 0; int _left_block_pos; // current scan pos in _left_block diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h index 61a16411de1372..6b19b41a76fd99 100644 --- a/be/src/vec/exec/vrepeat_node.h +++ b/be/src/vec/exec/vrepeat_node.h @@ -57,7 +57,7 @@ class VRepeatNode : public ExecNode { Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; bool need_more_input_data() const; - BlockSPtr get_child_block() { return _child_block; } + std::shared get_child_block() { return _child_block; } void debug_string(int indentation_level, std::stringstream* out) const override; @@ -74,7 +74,7 @@ class VRepeatNode : public ExecNode { TupleId _output_tuple_id; const TupleDescriptor* _output_tuple_desc; - BlockSPtr _child_block; + std::shared _child_block; std::unique_ptr _intermediate_block {}; std::vector _output_slots; diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h index 9c4c3d4ff68b62..b4f9c67db738ea 100644 --- a/be/src/vec/exec/vtable_function_node.h +++ b/be/src/vec/exec/vtable_function_node.h @@ -92,7 +92,7 @@ class VTableFunctionNode final : public ExecNode { return Status::OK(); } - BlockSPtr get_child_block() { return _child_block; } + std::shared get_child_block() { return _child_block; } private: Status _prepare_output_slot_ids(const TPlanNode& tnode); From 8910a34004541099a557d64f40d40de30521fca1 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 7 Dec 2023 11:34:24 +0800 Subject: [PATCH 3/9] f --- be/src/vec/exec/join/vnested_loop_join_node.h | 4 ++-- be/src/vec/exec/vrepeat_node.h | 4 ++-- be/src/vec/exec/vtable_function_node.h | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index 32e8a23437dfdb..11ab3a85d80f58 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -95,7 +95,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { : *_output_row_desc; } - std::shared get_left_block() { return _left_block; } + std::shared_ptr get_left_block() { return _left_block; } std::vector& runtime_filter_descs() { return _runtime_filter_descs; } VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; } @@ -260,7 +260,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { // _left_block must be cleared before calling get_next(). The child node // does not initialize all tuple ptrs in the row, only the ones that it // is responsible for. - std::shared _left_block; + std::shared_ptr _left_block; int _left_block_start_pos = 0; int _left_block_pos; // current scan pos in _left_block diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h index 6b19b41a76fd99..94737580031e7b 100644 --- a/be/src/vec/exec/vrepeat_node.h +++ b/be/src/vec/exec/vrepeat_node.h @@ -57,7 +57,7 @@ class VRepeatNode : public ExecNode { Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; bool need_more_input_data() const; - std::shared get_child_block() { return _child_block; } + std::shared_ptr get_child_block() { return _child_block; } void debug_string(int indentation_level, std::stringstream* out) const override; @@ -74,7 +74,7 @@ class VRepeatNode : public ExecNode { TupleId _output_tuple_id; const TupleDescriptor* _output_tuple_desc; - std::shared _child_block; + std::shared_ptr _child_block; std::unique_ptr _intermediate_block {}; std::vector _output_slots; diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h index b4f9c67db738ea..bcee8ced50f720 100644 --- a/be/src/vec/exec/vtable_function_node.h +++ b/be/src/vec/exec/vtable_function_node.h @@ -92,7 +92,7 @@ class VTableFunctionNode final : public ExecNode { return Status::OK(); } - std::shared get_child_block() { return _child_block; } + std::shared_ptr get_child_block() { return _child_block; } private: Status _prepare_output_slot_ids(const TPlanNode& tnode); From c26fd9cc07f3def9e0119b99fc9dc3855ef8a2f1 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 7 Dec 2023 11:35:35 +0800 Subject: [PATCH 4/9] f --- be/src/vec/exec/join/vnested_loop_join_node.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index 11ab3a85d80f58..810bf57e7f5e8b 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -120,14 +120,14 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { // We should try to join rows if there still are some rows from probe side. while (_join_block.rows() < state->batch_size()) { while (_current_build_pos == _build_blocks.size() || - _left_block_pos == _left_block.rows()) { + _left_block_pos == _left_block->rows()) { // if left block is empty(), do not need disprocess the left block rows - if (_left_block.rows() > _left_block_pos) { + if (_left_block->rows() > _left_block_pos) { _left_side_process_count++; } _reset_with_next_probe_row(); - if (_left_block_pos < _left_block.rows()) { + if (_left_block_pos < _left_block->rows()) { if constexpr (set_probe_side_flag) { _probe_offset_stack.push(mutable_join_block.rows()); } From 08689abced6aea95d27435bda0b821c0356236ba Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 7 Dec 2023 11:40:00 +0800 Subject: [PATCH 5/9] f --- be/src/vec/exec/join/vnested_loop_join_node.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index f62fd896f4237f..7d8100aa6caa18 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -254,7 +254,7 @@ Status VNestedLoopJoinNode::push(doris::RuntimeState* state, vectorized::Block* Status VNestedLoopJoinNode::_fresh_left_block(doris::RuntimeState* state) { do { - release_block_memory(_left_block.get()); + release_block_memory(*_left_block); RETURN_IF_ERROR(child(0)->get_next_after_projects( state, _left_block.get(), &_left_side_eos, std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & From a0e7cbe17f4554aea8abb99a223f0e4331c4f82e Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 7 Dec 2023 11:41:42 +0800 Subject: [PATCH 6/9] f --- be/src/vec/exec/vrepeat_node.cpp | 4 ++-- be/src/vec/exec/vtable_function_node.cpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index bafe54f3d64f6a..14c2f1559ddc70 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -245,13 +245,13 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) { DCHECK(block->rows() == 0); while (need_more_input_data()) { RETURN_IF_ERROR(child(0)->get_next_after_projects( - state, &_child_block, &_child_eos, + state, _child_block.get(), &_child_eos, std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - static_cast(push(state, &_child_block, _child_eos)); + static_cast(push(state, _child_block.get(), _child_eos)); } return pull(state, block, eos); diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index b6eaa61b550ef6..4d43fadc13e61e 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -146,12 +146,12 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos // if child_block is empty, get data from child. while (need_more_input_data()) { RETURN_IF_ERROR(child(0)->get_next_after_projects( - state, &_child_block, &_child_eos, + state, _child_block.get(), &_child_eos, std::bind((Status(ExecNode::*)(RuntimeState*, Block*, bool*)) & ExecNode::get_next, _children[0], std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))); - RETURN_IF_ERROR(push(state, &_child_block, _child_eos)); + RETURN_IF_ERROR(push(state, _child_block.get(), _child_eos)); } return pull(state, block, eos); From 59cd92090f8ac89d759e533c1ea1084c7359f7f0 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 7 Dec 2023 11:42:56 +0800 Subject: [PATCH 7/9] f --- be/src/vec/exec/vrepeat_node.cpp | 2 +- be/src/vec/exec/vtable_function_node.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index 14c2f1559ddc70..1765c1dcf7f44c 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -191,7 +191,7 @@ Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block* output_b int size = _repeat_id_list.size(); if (_repeat_id_idx >= size) { _intermediate_block->clear(); - release_block_memory(_child_block); + release_block_memory(*_child_block); _repeat_id_idx = 0; } } diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index 4d43fadc13e61e..be93bde0295b17 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -235,7 +235,7 @@ Status VTableFunctionNode::_process_next_child_row() { RETURN_IF_ERROR(fn->process_close()); } - release_block_memory(_child_block); + release_block_memory(*_child_block); _cur_child_offset = -1; return Status::OK(); } From fba36e59a3ec1879b64e91f13949bff8249f1b58 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 7 Dec 2023 11:46:46 +0800 Subject: [PATCH 8/9] f --- be/src/pipeline/exec/repeat_operator.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index 53792d1ff28286..d1613a6125f628 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -230,13 +230,13 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp int size = _repeat_id_list.size(); if (_repeat_id_idx >= size) { _intermediate_block->clear(); - _child_block->clear_column_data(_child_x->row_desc().num_materialized_slots()); + _child_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); _repeat_id_idx = 0; } } RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); - if (_child_eos && _child_block->rows() == 0) { + if (_child_eos && _child_block.rows() == 0) { source_state = SourceState::FINISHED; } local_state.reached_limit(output_block, source_state); From cb140715c7ebd274d910c3a550f1a5a077d0d323 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 7 Dec 2023 13:47:57 +0800 Subject: [PATCH 9/9] f --- be/src/pipeline/exec/operator.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 37be0d2656a914..7ff91fba695aa2 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -444,7 +444,7 @@ class StatefulOperator : public StreamingOperator { StatefulOperator(OperatorBuilderBase* builder, ExecNode* node) : StreamingOperator(builder, node), - _child_block(vectorized::Block::create_unique()), + _child_block(vectorized::Block::create_shared()), _child_source_state(SourceState::DEPEND_ON_SOURCE) {} virtual ~StatefulOperator() = default;