diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 92a2766c93003a..a456d895b32ef1 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -101,7 +101,8 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_
           _state(state),
           _context(state->get_query_ctx()),
           _exchange_sink_num(sender_ins_ids.size()),
-          _send_multi_blocks(false) {
+          _send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size &&
+                             state->query_options().exchange_multi_blocks_byte_size > 0) {
     if (_send_multi_blocks) {
         _send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size;
     }
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 00fa25bbcc088f..7ff3c3c65fe467 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -146,50 +146,25 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
     }
 
     bool eos = request->eos();
-    if (!request->blocks().empty()) {
-        for (int i = 0; i < request->blocks_size(); i++) {
-            // Previously there was a const_cast here, but in our internal tests this occasionally caused a hard-to-reproduce core dump.
-            // We suspect it was caused by the const_cast, so we switched to making a copy here.
-            // In fact, for PBlock, most of the data resides in the PColumnMeta column_metas field, so the copy overhead is small.
-            // To make the intent explicit, we do not use
-            // std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>(request->blocks(i));
-            std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
-            pblock_ptr->CopyFrom(request->blocks(i));
-            auto pass_done = [&]() -> ::google::protobuf::Closure** {
-                // If it is eos, no callback is needed, done can be nullptr
-                if (eos) {
-                    return nullptr;
-                }
-                // If it is the last block, a callback is needed, pass done
-                if (i == request->blocks_size() - 1) {
-                    return done;
-                } else {
-                    // If it is not the last block, the blocks in the request currently belong to the same queue,
-                    // and the callback is handled by the done of the last block
-                    return nullptr;
-                }
-            };
-            RETURN_IF_ERROR(recvr->add_block(
-                    std::move(pblock_ptr), request->sender_id(), request->be_number(),
-                    request->packet_seq() - request->blocks_size() + i, pass_done(),
-                    wait_for_worker, cpu_time_stop_watch.elapsed_time()));
-        }
-    }
+    Status exec_status =
+            request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();
 
-    // old logic, for compatibility
-    if (request->has_block()) {
+    auto sender_id = request->sender_id();
+    auto be_number = request->be_number();
+    if (!request->blocks().empty()) {
+        RETURN_IF_ERROR(recvr->add_blocks(request, done, wait_for_worker,
+                                          cpu_time_stop_watch.elapsed_time()));
+    } else if (request->has_block()) {
+        // old logic, for compatibility
         std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
         pblock_ptr->CopyFrom(request->block());
-        RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), request->sender_id(),
-                                         request->be_number(), request->packet_seq(),
-                                         eos ? nullptr : done, wait_for_worker,
-                                         cpu_time_stop_watch.elapsed_time()));
+        RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), sender_id, be_number,
+                                         request->packet_seq(), eos ? nullptr : done,
+                                         wait_for_worker, cpu_time_stop_watch.elapsed_time()));
     }
 
     if (eos) {
-        Status exec_status =
-                request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();
-        recvr->remove_sender(request->sender_id(), request->be_number(), exec_status);
+        recvr->remove_sender(sender_id, be_number, exec_status);
     }
     return Status::OK();
 }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index dd86476f483cf9..a3f84a337ec510 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -55,17 +55,10 @@ VDataStreamRecvr::SenderQueue::SenderQueue(
 }
 
 VDataStreamRecvr::SenderQueue::~SenderQueue() {
-    // Check pending closures, if it is not empty, should clear it here. but it should not happen.
-    // closure will delete itself during run method. If it is not called, brpc will memory leak.
-    DCHECK(_pending_closures.empty());
-    for (auto closure_pair : _pending_closures) {
-        closure_pair.first->Run();
-        int64_t elapse_time = closure_pair.second.elapsed_time();
-        if (_recvr->_max_wait_to_process_time->value() < elapse_time) {
-            _recvr->_max_wait_to_process_time->set(elapse_time);
-        }
+    for (auto& block_item : _block_queue) {
+        block_item.call_done(_recvr);
     }
-    _pending_closures.clear();
+    _block_queue.clear();
 }
 
 Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
@@ -73,9 +66,8 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
     if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
         throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                "_is_cancelled: {}, _block_queue_empty: {}, "
-                               "_num_remaining_senders: {}, _debug_string_info: {}",
-                               _is_cancelled, _block_queue.empty(), _num_remaining_senders,
-                               _debug_string_info());
+                               "_num_remaining_senders: {}",
+                               _is_cancelled, _block_queue.empty(), _num_remaining_senders);
     }
 #endif
     BlockItem block_item;
@@ -111,25 +103,14 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
     _recvr->_memory_used_counter->update(-(int64_t)block_byte_size);
     INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
     sub_blocks_memory_usage(block_byte_size);
-    _record_debug_info();
     if (_block_queue.empty() && _source_dependency) {
         if (!_is_cancelled && _num_remaining_senders > 0) {
             _source_dependency->block();
         }
     }
 
-    if (!_pending_closures.empty()) {
-        auto closure_pair = _pending_closures.front();
-        closure_pair.first->Run();
-        int64_t elapse_time = closure_pair.second.elapsed_time();
-        if (_recvr->_max_wait_to_process_time->value() < elapse_time) {
-            _recvr->_max_wait_to_process_time->set(elapse_time);
-        }
-        _pending_closures.pop_front();
+    block_item.call_done(_recvr);
 
-        closure_pair.second.stop();
-        _recvr->_buffer_full_total_timer->update(closure_pair.second.elapsed_time());
-    }
     DCHECK(block->empty());
     block->swap(*next_block);
     *eos = false;
@@ -187,7 +168,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
 
         DCHECK(_num_remaining_senders >= 0);
         if (_num_remaining_senders == 0) {
-            DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
+            DCHECK(_sender_eos_set.contains(be_number));
            return Status::OK();
         }
     }
@@ -209,15 +190,11 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
 
     _block_queue.emplace_back(std::move(pblock), block_byte_size);
     COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
-    _record_debug_info();
     set_source_ready(l);
 
     // if done is nullptr, this function can't delay this response
     if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
-        MonotonicStopWatch monotonicStopWatch;
-        monotonicStopWatch.start();
-        DCHECK(*done != nullptr);
-        _pending_closures.emplace_back(*done, monotonicStopWatch);
+        _block_queue.back().set_done(*done);
         *done = nullptr;
     }
     _recvr->_memory_used_counter->update(block_byte_size);
@@ -225,6 +202,76 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
     return Status::OK();
 }
 
+Status VDataStreamRecvr::SenderQueue::add_blocks(const PTransmitDataParams* request,
+                                                 ::google::protobuf::Closure** done,
+                                                 const int64_t wait_for_worker,
+                                                 const uint64_t time_to_find_recvr) {
+    {
+        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
+        if (_is_cancelled) {
+            return Status::OK();
+        }
+        const int be_number = request->be_number();
+        // In the request, the packet_seq for blocks is [request->packet_seq() - blocks_size(), request->packet_seq())
+        // Note this is a left-closed, right-open interval; the packet_seq of the last block is request->packet_seq() - 1
+        // We store the packet_seq of the last block in _packet_seq_map so we can compare it with the packet_seq of the next received packet
+        const int64_t packet_seq = request->packet_seq() - 1;
+        auto iter = _packet_seq_map.find(be_number);
+        if (iter != _packet_seq_map.end()) {
+            if (iter->second > (packet_seq - request->blocks_size())) {
+                return Status::InternalError(
+                        "packet already exist [cur_packet_id= {} receive_packet_id={}]",
+                        iter->second, packet_seq);
+            }
+            iter->second = packet_seq;
+        } else {
+            _packet_seq_map.emplace(be_number, packet_seq);
+        }
+
+        DCHECK(_num_remaining_senders >= 0);
+        if (_num_remaining_senders == 0) {
+            DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
+            return Status::OK();
+        }
+    }
+
+    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
+    if (_is_cancelled) {
+        return Status::OK();
+    }
+
+    int64_t total_block_byte_size = 0;
+    for (int i = 0; i < request->blocks_size(); i++) {
+        std::unique_ptr<PBlock> pblock = std::make_unique<PBlock>();
+        pblock->CopyFrom(request->blocks(i));
+
+        const auto block_byte_size = pblock->ByteSizeLong();
+        COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
+        if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
+            _recvr->_max_wait_worker_time->set(wait_for_worker);
+        }
+
+        if (_recvr->_max_find_recvr_time->value() < time_to_find_recvr) {
+            _recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
+        }
+
+        _block_queue.emplace_back(std::move(pblock), block_byte_size);
+        COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
+        total_block_byte_size += block_byte_size;
+    }
+
+    set_source_ready(l);
+
+    // if done is nullptr, this function can't delay this response
+    if (done != nullptr && _recvr->exceeds_limit(total_block_byte_size)) {
+        _block_queue.back().set_done(*done);
+        *done = nullptr;
+    }
+    _recvr->_memory_used_counter->update(total_block_byte_size);
+    add_blocks_memory_usage(total_block_byte_size);
+    return Status::OK();
+}
+
 void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
     if (block->rows() == 0) {
         return;
     }
@@ -260,7 +307,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
             return;
         }
         _block_queue.emplace_back(std::move(nblock), block_mem_size);
-        _record_debug_info();
        set_source_ready(l);
         COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
         _recvr->_memory_used_counter->update(block_mem_size);
@@ -276,7 +322,6 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
     _sender_eos_set.insert(be_number);
     DCHECK_GT(_num_remaining_senders, 0);
     _num_remaining_senders--;
-    _record_debug_info();
     VLOG_FILE << "decremented senders: fragment_instance_id="
               << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id()
               << " #senders=" << _num_remaining_senders;
@@ -300,14 +345,10 @@ void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
     }
     {
         INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
-        for (auto closure_pair : _pending_closures) {
-            closure_pair.first->Run();
-            int64_t elapse_time = closure_pair.second.elapsed_time();
-            if (_recvr->_max_wait_to_process_time->value() < elapse_time) {
-                _recvr->_max_wait_to_process_time->set(elapse_time);
-            }
+        for (auto& block_item : _block_queue) {
+            block_item.call_done(_recvr);
         }
-        _pending_closures.clear();
+        _block_queue.clear();
     }
 }
 
@@ -319,14 +360,9 @@ void VDataStreamRecvr::SenderQueue::close() {
         _is_cancelled = true;
         set_source_ready(l);
-        for (auto closure_pair : _pending_closures) {
-            closure_pair.first->Run();
-            int64_t elapse_time = closure_pair.second.elapsed_time();
-            if (_recvr->_max_wait_to_process_time->value() < elapse_time) {
-                _recvr->_max_wait_to_process_time->set(elapse_time);
-            }
+        for (auto& block_item : _block_queue) {
+            block_item.call_done(_recvr);
         }
-        _pending_closures.clear();
 
         // Delete any batches queued in _block_queue
         _block_queue.clear();
     }
@@ -423,6 +459,19 @@ Status VDataStreamRecvr::add_block(std::unique_ptr<PBlock> pblock, int sender_id
                                              wait_for_worker, time_to_find_recvr);
 }
+Status VDataStreamRecvr::add_blocks(const PTransmitDataParams* request,
+                                    ::google::protobuf::Closure** done,
+                                    const int64_t wait_for_worker,
+                                    const uint64_t time_to_find_recvr) {
+    SCOPED_ATTACH_TASK(_resource_ctx);
+    if (_query_context->low_memory_mode()) {
+        set_low_memory_mode();
+    }
+    int use_sender_id = _is_merging ? request->sender_id() : 0;
+    return _sender_queues[use_sender_id]->add_blocks(request, done, wait_for_worker,
+                                                     time_to_find_recvr);
+}
+
 void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
     if (_query_context->low_memory_mode()) {
         set_low_memory_mode();
     }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 1a1c84f3e67265..b136553bfb2d72 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -90,6 +90,9 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {
                      int64_t packet_seq, ::google::protobuf::Closure** done,
                      const int64_t wait_for_worker, const uint64_t time_to_find_recvr);
 
+    Status add_blocks(const PTransmitDataParams* request, ::google::protobuf::Closure** done,
+                      const int64_t wait_for_worker, const uint64_t time_to_find_recvr);
+
     void add_block(Block* block, int sender_id, bool use_move);
 
     std::string debug_string();
@@ -185,6 +188,10 @@ class VDataStreamRecvr::SenderQueue {
     Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t packet_seq,
                      ::google::protobuf::Closure** done, const int64_t wait_for_worker,
                      const uint64_t time_to_find_recvr);
+
+    Status add_blocks(const PTransmitDataParams* request, ::google::protobuf::Closure** done,
+                      const int64_t wait_for_worker, const uint64_t time_to_find_recvr);
+
     std::string debug_string();
 
     void add_block(Block* block, bool use_move);
@@ -209,50 +216,6 @@ class VDataStreamRecvr::SenderQueue {
 
     void set_source_ready(std::lock_guard<std::mutex>&);
 
-    // To record information about several variables in the event of a DCHECK failure.
-    // DCHECK(_is_cancelled || !_block_queue.empty() || _num_remaining_senders == 0)
-#ifndef NDEBUG
-    constexpr static auto max_record_number = 128;
-    std::list<size_t> _record_block_queue;
-    std::list<int> _record_num_remaining_senders;
-#else
-#endif
-
-    // only in debug
-    ALWAYS_INLINE inline void _record_debug_info() {
-#ifndef NDEBUG
-        if (_record_block_queue.size() > max_record_number) {
-            _record_block_queue.pop_front();
-        }
-        if (_record_num_remaining_senders.size() > max_record_number) {
-            _record_num_remaining_senders.pop_front();
-        }
-        _record_block_queue.push_back(_block_queue.size());
-        _record_num_remaining_senders.push_back(_num_remaining_senders);
-#else
-#endif
-    }
-
-    ALWAYS_INLINE inline std::string _debug_string_info() {
-#ifndef NDEBUG
-        std::stringstream out;
-        DCHECK_EQ(_record_block_queue.size(), _record_num_remaining_senders.size());
-        out << "record_debug_info [ \n";
-
-        auto it1 = _record_block_queue.begin();
-        auto it2 = _record_num_remaining_senders.begin();
-        for (; it1 != _record_block_queue.end(); it1++, it2++) {
-            out << "( "
-                << "_block_queue size : " << *it1 << " , _num_remaining_senders : " << *it2
-                << " ) \n";
-        }
-        out << " ]\n";
-        return out.str();
-#else
-#endif
-        return "";
-    }
-
     // Not managed by this class
     VDataStreamRecvr* _recvr = nullptr;
     std::mutex _lock;
@@ -290,6 +253,25 @@ class VDataStreamRecvr::SenderQueue {
         BlockItem(std::unique_ptr<PBlock>&& pblock, size_t block_byte_size)
                 : _block(nullptr), _pblock(std::move(pblock)), _block_byte_size(block_byte_size) {}
 
+        void set_done(google::protobuf::Closure* done) {
+            // The done callback is only set when the queue memory limit is exceeded.
+            _done_cb = done;
+            _wait_timer.start();
+        }
+
+        void call_done(VDataStreamRecvr* recvr) {
+            if (_done_cb != nullptr) {
+                _done_cb->Run();
+                _done_cb = nullptr;
+                _wait_timer.stop();
+                int64_t elapse_time = _wait_timer.elapsed_time();
+                if (recvr->_max_wait_to_process_time->value() < elapse_time) {
+                    recvr->_max_wait_to_process_time->set(elapse_time);
+                }
+                recvr->_buffer_full_total_timer->update(elapse_time);
+            }
+        }
+
     private:
         BlockUPtr _block;
         std::unique_ptr<PBlock> _pblock;
@@ -297,6 +279,9 @@ class VDataStreamRecvr::SenderQueue {
         int64_t _deserialize_time = 0;
         int64_t _decompress_time = 0;
        size_t _decompress_bytes = 0;
+
+        google::protobuf::Closure* _done_cb = nullptr;
+        MonotonicStopWatch _wait_timer;
     };
 
     std::list<BlockItem> _block_queue;
@@ -305,7 +290,6 @@ class VDataStreamRecvr::SenderQueue {
 
     std::unordered_set<int> _sender_eos_set;
     // be_number => packet_seq
     std::unordered_map<int, int64_t> _packet_seq_map;
-    std::deque<std::pair<google::protobuf::Closure*, MonotonicStopWatch>> _pending_closures;
     std::shared_ptr<pipeline::Dependency> _source_dependency;
     std::shared_ptr<pipeline::Dependency> _local_channel_dependency;
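The diff above replaces the receiver's separate `_pending_closures` deque with a `done` callback that travels on the queued `BlockItem` itself: when the queue exceeds its memory limit, `add_block`/`add_blocks` park the brpc closure on the newest item via `set_done`, and every drain path (`get_batch`, `cancel`, `close`, the destructor) releases it through the same `call_done` helper. The sketch below is not part of the patch; it is a minimal, self-contained illustration of that ownership model under simplified assumptions. `Closure`, `PendingBlock`, and `BlockQueue` are stand-in names invented for the example (not Doris or brpc types), and memory accounting is reduced to a single byte counter.

```cpp
#include <deque>
#include <functional>
#include <iostream>
#include <string>
#include <utility>

// Stand-in for google::protobuf::Closure: running it releases the sender's RPC
// so the sender may transmit its next batch (back-pressure release).
using Closure = std::function<void()>;

// Each queued block may own the closure of the RPC that delivered it.
struct PendingBlock {
    std::string payload;
    Closure done; // empty unless the queue was over its memory limit on arrival

    void call_done() {
        if (done) {
            done(); // release the delayed RPC exactly once
            done = nullptr;
        }
    }
};

class BlockQueue {
public:
    explicit BlockQueue(size_t soft_limit_bytes) : _soft_limit(soft_limit_bytes) {}

    // Mirrors add_block/add_blocks: when the queue exceeds its soft limit, the
    // response is delayed by parking the closure on the newest queued item.
    void add(std::string payload, Closure done) {
        _bytes += payload.size();
        _queue.push_back(PendingBlock {std::move(payload), nullptr});
        if (done) {
            if (_bytes > _soft_limit) {
                _queue.back().done = std::move(done); // delay the response
            } else {
                done(); // under the limit: respond immediately
            }
        }
    }

    // Mirrors get_batch: consuming a block releases the closure it carried.
    bool pop(std::string* out) {
        if (_queue.empty()) {
            return false;
        }
        PendingBlock item = std::move(_queue.front());
        _queue.pop_front();
        _bytes -= item.payload.size();
        *out = std::move(item.payload);
        item.call_done();
        return true;
    }

    // Mirrors cancel()/close()/~SenderQueue: never leak a parked closure.
    void close() {
        for (auto& item : _queue) {
            item.call_done();
        }
        _queue.clear();
        _bytes = 0;
    }

private:
    std::deque<PendingBlock> _queue;
    size_t _bytes = 0;
    size_t _soft_limit;
};

int main() {
    BlockQueue queue(/*soft_limit_bytes=*/8);
    queue.add("0123456789", [] { std::cout << "rpc #1 released" << std::endl; });
    std::string block;
    queue.pop(&block); // consuming the block releases rpc #1
    queue.close();     // any closures still parked would be released here
    return 0;
}
```

Compared with tracking delayed closures in a side container, attaching the closure to the block that caused the delay lets consumption, cancellation, close, and destruction all reuse one release path, which is why the DCHECK-guarded cleanup in `~SenderQueue()` disappears in the patch.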