Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](exec)lazy deserialize pblock in VDataStreamRecvr::SenderQueue #44378

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions be/src/vec/runtime/vdata_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
#include "vec/runtime/vdata_stream_mgr.h"

#include <gen_cpp/Types_types.h>
#include <gen_cpp/data.pb.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <stddef.h>

#include <memory>
#include <ostream>
#include <string>
#include <vector>
Expand Down Expand Up @@ -141,9 +143,12 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,

bool eos = request->eos();
if (request->has_block()) {
RETURN_IF_ERROR(recvr->add_block(
request->block(), request->sender_id(), request->be_number(), request->packet_seq(),
eos ? nullptr : done, wait_for_worker, cpu_time_stop_watch.elapsed_time()));
std::unique_ptr<PBlock> pblock_ptr {
const_cast<PTransmitDataParams*>(request)->release_block()};
RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), request->sender_id(),
request->be_number(), request->packet_seq(),
eos ? nullptr : done, wait_for_worker,
cpu_time_stop_watch.elapsed_time()));
}

if (eos) {
Expand Down
68 changes: 28 additions & 40 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ VDataStreamRecvr::SenderQueue::~SenderQueue() {
}

Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
std::lock_guard<std::mutex> l(_lock); // protect _block_queue
#ifndef NDEBUG
if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
Expand All @@ -79,25 +78,33 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
_debug_string_info());
}
#endif
return _inner_get_batch_without_lock(block, eos);
}
BlockItem block_item;
{
std::lock_guard<std::mutex> l(_lock);
// Check the cancelled/empty-queue state, then pop the next block_item from _block_queue.
if (_is_cancelled) {
RETURN_IF_ERROR(_cancel_status);
return Status::Cancelled("Cancelled");
}

Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block, bool* eos) {
if (_is_cancelled) {
RETURN_IF_ERROR(_cancel_status);
return Status::Cancelled("Cancelled");
}
if (_block_queue.empty()) {
DCHECK_EQ(_num_remaining_senders, 0);
*eos = true;
return Status::OK();
}

if (_block_queue.empty()) {
DCHECK_EQ(_num_remaining_senders, 0);
*eos = true;
return Status::OK();
DCHECK(!_block_queue.empty());
block_item = std::move(_block_queue.front());
_block_queue.pop_front();
}

DCHECK(!_block_queue.empty());
auto [next_block, block_byte_size] = std::move(_block_queue.front());
_block_queue.pop_front();
BlockUPtr next_block;
RETURN_IF_ERROR(block_item.get_block(next_block));
size_t block_byte_size = block_item.block_byte_size();
COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, block_item.deserialize_time());
COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
_recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size);
std::lock_guard<std::mutex> l(_lock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rethink the logic

sub_blocks_memory_usage(block_byte_size);
_record_debug_info();
if (_block_queue.empty() && _source_dependency) {
Expand Down Expand Up @@ -133,7 +140,7 @@ void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() {
}
}

Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number,
Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock, int be_number,
int64_t packet_seq,
::google::protobuf::Closure** done,
const int64_t wait_for_worker,
Expand Down Expand Up @@ -163,30 +170,12 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
}
}

BlockUPtr block = nullptr;
int64_t deserialize_time = 0;
{
SCOPED_RAW_TIMER(&deserialize_time);
block = Block::create_unique();
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(block->deserialize(pblock));
}

const auto rows = block->rows();
if (rows == 0) {
return Status::OK();
}
auto block_byte_size = block->allocated_bytes();
VLOG_ROW << "added #rows=" << rows << " batch_size=" << block_byte_size << "\n";

std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return Status::OK();
}

COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
COUNTER_UPDATE(_recvr->_rows_produced_counter, rows);
const auto block_byte_size = pblock->ByteSizeLong();
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
_recvr->_max_wait_worker_time->set(wait_for_worker);
Expand All @@ -196,7 +185,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
_recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
}

_block_queue.emplace_back(std::move(block), block_byte_size);
_block_queue.emplace_back(std::move(pblock), block_byte_size);
COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
_record_debug_info();
try_set_dep_ready_without_lock();
Expand Down Expand Up @@ -370,7 +359,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, pipeline::Exchang
_first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime");
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
_decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
_rows_produced_counter = ADD_COUNTER(_profile, "RowsProduced", TUnit::UNIT);
_blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT);
_max_wait_worker_time = ADD_COUNTER(_profile, "MaxWaitForWorkerTime", TUnit::UNIT);
_max_wait_to_process_time = ADD_COUNTER(_profile, "MaxWaitToProcessTime", TUnit::UNIT);
Expand Down Expand Up @@ -401,13 +389,13 @@ Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr,
return Status::OK();
}

Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number,
Status VDataStreamRecvr::add_block(std::unique_ptr<PBlock> pblock, int sender_id, int be_number,
int64_t packet_seq, ::google::protobuf::Closure** done,
const int64_t wait_for_worker,
const uint64_t time_to_find_recvr) {
SCOPED_ATTACH_TASK(_query_thread_context);
int use_sender_id = _is_merging ? sender_id : 0;
return _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done,
return _sender_queues[use_sender_id]->add_block(std::move(pblock), be_number, packet_seq, done,
wait_for_worker, time_to_find_recvr);
}

Expand Down
49 changes: 40 additions & 9 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <gen_cpp/Types_types.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'gen_cpp/Types_types.h' file not found [clang-diagnostic-error]

#include <gen_cpp/Types_types.h>
         ^

#include <gen_cpp/data.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>

Expand Down Expand Up @@ -84,9 +85,9 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {

std::vector<SenderQueue*> sender_queues() const { return _sender_queues; }

Status add_block(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq,
::google::protobuf::Closure** done, const int64_t wait_for_worker,
const uint64_t time_to_find_recvr);
Status add_block(std::unique_ptr<PBlock> pblock, int sender_id, int be_number,
int64_t packet_seq, ::google::protobuf::Closure** done,
const int64_t wait_for_worker, const uint64_t time_to_find_recvr);

void add_block(Block* block, int sender_id, bool use_move);

Expand Down Expand Up @@ -157,8 +158,6 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {
RuntimeProfile::Counter* _decompress_timer = nullptr;
RuntimeProfile::Counter* _decompress_bytes = nullptr;

// Number of rows received
RuntimeProfile::Counter* _rows_produced_counter = nullptr;
// Number of blocks received
RuntimeProfile::Counter* _blocks_produced_counter = nullptr;
RuntimeProfile::Counter* _max_wait_worker_time = nullptr;
Expand All @@ -181,7 +180,7 @@ class VDataStreamRecvr::SenderQueue {

Status get_batch(Block* next_block, bool* eos);

Status add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t packet_seq,
::google::protobuf::Closure** done, const int64_t wait_for_worker,
const uint64_t time_to_find_recvr);

Expand All @@ -205,8 +204,6 @@ class VDataStreamRecvr::SenderQueue {

protected:
friend class pipeline::ExchangeLocalState;
Status _inner_get_batch_without_lock(Block* block, bool* eos);

void try_set_dep_ready_without_lock();

// To record information about several variables in the event of a DCHECK failure.
Expand Down Expand Up @@ -260,7 +257,41 @@ class VDataStreamRecvr::SenderQueue {
Status _cancel_status;
int _num_remaining_senders;
std::unique_ptr<MemTracker> _queue_mem_tracker;
std::list<std::pair<BlockUPtr, size_t>> _block_queue;

// `BlockItem` is used in `_block_queue` to handle both local and remote exchange blocks.
// For local exchange blocks, `BlockUPtr` is used directly without any modification.
// For remote exchange blocks, the `pblock` is stored in `BlockItem` in serialized form.
// When `get_block` is called, the `pblock` is deserialized into a usable block.
struct BlockItem {
    // Hands the block held by this item over to `block`.
    // If the item has no materialized block yet, `_pblock` is deserialized first;
    // that work is timed with SCOPED_RAW_TIMER into `_deserialize_time`.
    // On a deserialization error the error status is returned and neither `block`
    // nor `_block` is modified.
    Status get_block(BlockUPtr& block) {
        if (!_block) {
            DCHECK(_pblock);
            SCOPED_RAW_TIMER(&_deserialize_time);
            // Deserialize into a temporary: if this fails and returns early, `_block`
            // must stay empty so that a later call cannot hand out a half-built block
            // as if it were valid.
            auto deserialized = Block::create_unique();
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(deserialized->deserialize(*_pblock));
            _block = std::move(deserialized);
        }
        // Give our block to the caller; whatever the caller passed in ends up in
        // `_block` and is dropped by the reset below.
        block.swap(_block);
        _block.reset();
        return Status::OK();
    }

    // Byte size supplied when the item was constructed.
    size_t block_byte_size() const { return _block_byte_size; }
    // Value accumulated by the timer in `get_block`; stays 0 if no deserialization ran.
    int64_t deserialize_time() const { return _deserialize_time; }

    BlockItem() = default;

    // Wraps an already materialized block.
    BlockItem(BlockUPtr&& block, size_t block_byte_size)
            : _block(std::move(block)), _block_byte_size(block_byte_size) {}

    // Wraps a serialized block; deserialization is deferred until `get_block`.
    BlockItem(std::unique_ptr<PBlock>&& pblock, size_t block_byte_size)
            : _block(nullptr), _pblock(std::move(pblock)), _block_byte_size(block_byte_size) {}

private:
    // Materialized block; null until deserialized (or after it has been handed out).
    BlockUPtr _block;
    // Serialized block; null for items constructed from a `BlockUPtr`.
    std::unique_ptr<PBlock> _pblock;
    size_t _block_byte_size = 0;
    int64_t _deserialize_time = 0;
};

std::list<BlockItem> _block_queue;

// sender_id
std::unordered_set<int> _sender_eos_set;
Expand Down
Loading