3 changes: 2 additions & 1 deletion be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -101,7 +101,8 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_
_state(state),
_context(state->get_query_ctx()),
_exchange_sink_num(sender_ins_ids.size()),
_send_multi_blocks(false) {
_send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size &&
state->query_options().exchange_multi_blocks_byte_size > 0) {
if (_send_multi_blocks) {
_send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size;
}
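For readers unfamiliar with the Thrift-generated query options, the guard in the initializer above reads the optional field only when its __isset flag says a value was actually supplied. A minimal, self-contained sketch of that pattern (the types here are hypothetical stand-ins, not the real TQueryOptions):

#include <cstdint>

// Hypothetical stand-ins for the Thrift-generated option structs.
struct QueryOptionsIsset {
    bool exchange_multi_blocks_byte_size = false;
};
struct QueryOptions {
    QueryOptionsIsset __isset;
    int64_t exchange_multi_blocks_byte_size = 0;
};

// Mirrors the constructor initializer: the multi-block send path is enabled
// only when the option was explicitly set and is positive.
static bool send_multi_blocks_enabled(const QueryOptions& opts) {
    return opts.__isset.exchange_multi_blocks_byte_size &&
           opts.exchange_multi_blocks_byte_size > 0;
}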
51 changes: 13 additions & 38 deletions be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -146,50 +146,25 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
}

bool eos = request->eos();
if (!request->blocks().empty()) {
for (int i = 0; i < request->blocks_size(); i++) {
// Previously there was a const_cast here, but in our internal tests this occasionally caused a hard-to-reproduce core dump.
// We suspect it was caused by the const_cast, so we switched to making a copy here.
// In fact, for PBlock, most of the data resides in the PColumnMeta column_metas field, so the copy overhead is small.
// To make the intent explicit, we do not use
// std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>(request->blocks(i));
std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
pblock_ptr->CopyFrom(request->blocks(i));
auto pass_done = [&]() -> ::google::protobuf::Closure** {
// If it is eos, no callback is needed, done can be nullptr
if (eos) {
return nullptr;
}
// If it is the last block, a callback is needed, pass done
if (i == request->blocks_size() - 1) {
return done;
} else {
// If it is not the last block, the blocks in the request currently belong to the same queue,
// and the callback is handled by the done of the last block
return nullptr;
}
};
RETURN_IF_ERROR(recvr->add_block(
std::move(pblock_ptr), request->sender_id(), request->be_number(),
request->packet_seq() - request->blocks_size() + i, pass_done(),
wait_for_worker, cpu_time_stop_watch.elapsed_time()));
}
}
Status exec_status =
request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();

// old logic, for compatibility
if (request->has_block()) {
auto sender_id = request->sender_id();
auto be_number = request->be_number();
if (!request->blocks().empty()) {
RETURN_IF_ERROR(recvr->add_blocks(request, done, wait_for_worker,
cpu_time_stop_watch.elapsed_time()));
} else if (request->has_block()) {
// old logic, for compatibility
std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
pblock_ptr->CopyFrom(request->block());
RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), request->sender_id(),
request->be_number(), request->packet_seq(),
eos ? nullptr : done, wait_for_worker,
cpu_time_stop_watch.elapsed_time()));
RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), sender_id, be_number,
request->packet_seq(), eos ? nullptr : done,
wait_for_worker, cpu_time_stop_watch.elapsed_time()));
}

if (eos) {
Status exec_status =
request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();
recvr->remove_sender(request->sender_id(), request->be_number(), exec_status);
recvr->remove_sender(sender_id, be_number, exec_status);
}
return Status::OK();
}
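Taken together, the rewritten transmit_block() dispatches on the shape of the request: a request carrying repeated blocks is handed to recvr->add_blocks() in a single call, a single-block request keeps the legacy add_block() path, and an eos request finally removes the sender. A hedged, self-contained sketch of that control flow (the types below are simplified stand-ins, not the real Doris classes):

#include <string>
#include <vector>

struct Status {
    bool ok = true;
    static Status OK() { return {}; }
};

struct Closure { virtual void Run() = 0; virtual ~Closure() = default; };

// Simplified stand-in for PTransmitDataParams.
struct Request {
    std::vector<std::string> blocks;  // repeated PBlock blocks
    bool has_single_block = false;    // has_block()
    bool eos = false;
    int sender_id = 0;
    int be_number = 0;
};

// Simplified stand-in for VDataStreamRecvr.
struct Receiver {
    // Batch path: may take ownership of *done to delay the RPC response.
    Status add_blocks(const Request&, Closure** /*done*/) { return Status::OK(); }
    // Legacy single-block path.
    Status add_block(const Request&, Closure* /*done*/) { return Status::OK(); }
    void remove_sender(int /*sender_id*/, int /*be_number*/) {}
};

Status transmit(const Request& req, Receiver& recvr, Closure** done) {
    const bool eos = req.eos;
    if (!req.blocks.empty()) {
        Status s = recvr.add_blocks(req, done);                  // new multi-block path
        if (!s.ok) { return s; }
    } else if (req.has_single_block) {
        Status s = recvr.add_block(req, eos ? nullptr : *done);  // compatibility path
        if (!s.ok) { return s; }
    }
    if (eos) {
        recvr.remove_sender(req.sender_id, req.be_number);  // exec status omitted in this sketch
    }
    return Status::OK();
}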
143 changes: 96 additions & 47 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -55,27 +55,19 @@ VDataStreamRecvr::SenderQueue::SenderQueue(
}

VDataStreamRecvr::SenderQueue::~SenderQueue() {
// Check pending closures, if it is not empty, should clear it here. but it should not happen.
// closure will delete itself during run method. If it is not called, brpc will memory leak.
DCHECK(_pending_closures.empty());
for (auto closure_pair : _pending_closures) {
closure_pair.first->Run();
int64_t elapse_time = closure_pair.second.elapsed_time();
if (_recvr->_max_wait_to_process_time->value() < elapse_time) {
_recvr->_max_wait_to_process_time->set(elapse_time);
}
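// Run any closure still held by a queued block; if the closure never runs,
// the brpc response is never sent and the closure leaks.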
for (auto& block_item : _block_queue) {
block_item.call_done(_recvr);
}
_pending_closures.clear();
_block_queue.clear();
}

Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
#ifndef NDEBUG
if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"_is_cancelled: {}, _block_queue_empty: {}, "
"_num_remaining_senders: {}, _debug_string_info: {}",
_is_cancelled, _block_queue.empty(), _num_remaining_senders,
_debug_string_info());
"_num_remaining_senders: {}",
_is_cancelled, _block_queue.empty(), _num_remaining_senders);
}
#endif
BlockItem block_item;
Expand Down Expand Up @@ -111,25 +103,14 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
_recvr->_memory_used_counter->update(-(int64_t)block_byte_size);
INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
sub_blocks_memory_usage(block_byte_size);
_record_debug_info();
if (_block_queue.empty() && _source_dependency) {
if (!_is_cancelled && _num_remaining_senders > 0) {
_source_dependency->block();
}
}

if (!_pending_closures.empty()) {
auto closure_pair = _pending_closures.front();
closure_pair.first->Run();
int64_t elapse_time = closure_pair.second.elapsed_time();
if (_recvr->_max_wait_to_process_time->value() < elapse_time) {
_recvr->_max_wait_to_process_time->set(elapse_time);
}
_pending_closures.pop_front();
block_item.call_done(_recvr);

closure_pair.second.stop();
_recvr->_buffer_full_total_timer->update(closure_pair.second.elapsed_time());
}
DCHECK(block->empty());
block->swap(*next_block);
*eos = false;
Expand Down Expand Up @@ -187,7 +168,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,

DCHECK(_num_remaining_senders >= 0);
if (_num_remaining_senders == 0) {
DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
DCHECK(_sender_eos_set.contains(be_number));
return Status::OK();
}
}
@@ -209,22 +190,88 @@

_block_queue.emplace_back(std::move(pblock), block_byte_size);
COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
_record_debug_info();
set_source_ready(l);

// if done is nullptr, this function can't delay this response
if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
MonotonicStopWatch monotonicStopWatch;
monotonicStopWatch.start();
DCHECK(*done != nullptr);
_pending_closures.emplace_back(*done, monotonicStopWatch);
_block_queue.back().set_done(*done);
*done = nullptr;
}
_recvr->_memory_used_counter->update(block_byte_size);
add_blocks_memory_usage(block_byte_size);
return Status::OK();
}

Status VDataStreamRecvr::SenderQueue::add_blocks(const PTransmitDataParams* request,
::google::protobuf::Closure** done,
const int64_t wait_for_worker,
const uint64_t time_to_find_recvr) {
{
INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
if (_is_cancelled) {
return Status::OK();
}
const int be_number = request->be_number();
// In the request, the packet_seq for blocks is [request->packet_seq() - blocks_size(), request->packet_seq())
// Note this is a left-closed, right-open interval; the packet_seq of the last block is request->packet_seq() - 1
// We store the packet_seq of the last block in _packet_seq_map so we can compare it with the packet_seq of the next received packet
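// Example (illustrative): if packet_seq() == 12 and blocks_size() == 3, this request
// covers seqs 9, 10, and 11, and we record 11 as the last seq for this sender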
const int64_t packet_seq = request->packet_seq() - 1;
auto iter = _packet_seq_map.find(be_number);
if (iter != _packet_seq_map.end()) {
if (iter->second > (packet_seq - request->blocks_size())) {
return Status::InternalError(
"packet already exist [cur_packet_id= {} receive_packet_id={}]",
iter->second, packet_seq);
}
iter->second = packet_seq;
} else {
_packet_seq_map.emplace(be_number, packet_seq);
Review comment (Contributor): the packet_seq handling seems different from the original logic.

}

DCHECK(_num_remaining_senders >= 0);
if (_num_remaining_senders == 0) {
DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
return Status::OK();
}
}

INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
if (_is_cancelled) {
return Status::OK();
}

int64_t total_block_byte_size = 0;
for (int i = 0; i < request->blocks_size(); i++) {
std::unique_ptr<PBlock> pblock = std::make_unique<PBlock>();
pblock->CopyFrom(request->blocks(i));

const auto block_byte_size = pblock->ByteSizeLong();
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
_recvr->_max_wait_worker_time->set(wait_for_worker);
}

if (_recvr->_max_find_recvr_time->value() < time_to_find_recvr) {
_recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
}

_block_queue.emplace_back(std::move(pblock), block_byte_size);
COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
total_block_byte_size += block_byte_size;
}

set_source_ready(l);

// if done is nullptr, this function can't delay this response
if (done != nullptr && _recvr->exceeds_limit(total_block_byte_size)) {
_block_queue.back().set_done(*done);
*done = nullptr;
}
_recvr->_memory_used_counter->update(total_block_byte_size);
add_blocks_memory_usage(total_block_byte_size);
return Status::OK();
}
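The back-pressure scheme in add_blocks() (and in the single-block add_block() above) is to hold the brpc done closure instead of running it when queued memory exceeds the limit: the closure is attached to the last queued block and *done is cleared, so the RPC response goes out only when a consumer dequeues that block and call_done runs the closure. A minimal sketch of that hold-and-release pattern, using hypothetical names rather than the real Doris classes:

#include <cstdint>
#include <deque>
#include <memory>
#include <string>

struct Closure {
    virtual ~Closure() = default;
    virtual void Run() = 0;  // in the real code this sends the RPC response
};

struct BlockItem {
    std::unique_ptr<std::string> block;  // stand-in for PBlock
    int64_t bytes = 0;
    Closure* done = nullptr;

    void set_done(Closure* d) { done = d; }
    void call_done() {
        if (done != nullptr) {
            done->Run();  // releases the sender so it can transmit the next batch
            done = nullptr;
        }
    }
};

struct SenderQueueSketch {
    std::deque<BlockItem> queue;
    int64_t used_bytes = 0;
    int64_t limit_bytes = 0;

    // Producer side: enqueue, and if over the limit keep the closure instead of
    // letting the caller run it immediately.
    void add(std::unique_ptr<std::string> block, int64_t bytes, Closure** done) {
        queue.push_back(BlockItem{std::move(block), bytes, nullptr});
        used_bytes += bytes;
        if (*done != nullptr && used_bytes > limit_bytes) {
            queue.back().set_done(*done);
            *done = nullptr;  // tells the caller the response is delayed
        }
    }

    // Consumer side: dequeue and run any closure held by the block.
    std::unique_ptr<std::string> take() {
        BlockItem item = std::move(queue.front());
        queue.pop_front();
        used_bytes -= item.bytes;
        item.call_done();
        return std::move(item.block);
    }
};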

void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
if (block->rows() == 0) {
return;
Expand Down Expand Up @@ -260,7 +307,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
return;
}
_block_queue.emplace_back(std::move(nblock), block_mem_size);
_record_debug_info();
set_source_ready(l);
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
_recvr->_memory_used_counter->update(block_mem_size);
@@ -276,7 +322,6 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
_sender_eos_set.insert(be_number);
DCHECK_GT(_num_remaining_senders, 0);
_num_remaining_senders--;
_record_debug_info();
VLOG_FILE << "decremented senders: fragment_instance_id="
<< print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id()
<< " #senders=" << _num_remaining_senders;
@@ -300,14 +345,10 @@ void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
}
{
INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
for (auto closure_pair : _pending_closures) {
closure_pair.first->Run();
int64_t elapse_time = closure_pair.second.elapsed_time();
if (_recvr->_max_wait_to_process_time->value() < elapse_time) {
_recvr->_max_wait_to_process_time->set(elapse_time);
}
for (auto& block_item : _block_queue) {
block_item.call_done(_recvr);
}
_pending_closures.clear();
_block_queue.clear();
}
}

Expand All @@ -319,14 +360,9 @@ void VDataStreamRecvr::SenderQueue::close() {
_is_cancelled = true;
set_source_ready(l);

for (auto closure_pair : _pending_closures) {
closure_pair.first->Run();
int64_t elapse_time = closure_pair.second.elapsed_time();
if (_recvr->_max_wait_to_process_time->value() < elapse_time) {
_recvr->_max_wait_to_process_time->set(elapse_time);
}
for (auto& block_item : _block_queue) {
block_item.call_done(_recvr);
}
_pending_closures.clear();
// Delete any batches queued in _block_queue
_block_queue.clear();
}
Expand Down Expand Up @@ -423,6 +459,19 @@ Status VDataStreamRecvr::add_block(std::unique_ptr<PBlock> pblock, int sender_id
wait_for_worker, time_to_find_recvr);
}

Status VDataStreamRecvr::add_blocks(const PTransmitDataParams* request,
::google::protobuf::Closure** done,
const int64_t wait_for_worker,
const uint64_t time_to_find_recvr) {
SCOPED_ATTACH_TASK(_resource_ctx);
if (_query_context->low_memory_mode()) {
set_low_memory_mode();
}
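// When merging, each sender has its own queue (indexed by sender_id) so the
// streams can be merge-sorted; otherwise all senders share queue 0.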
int use_sender_id = _is_merging ? request->sender_id() : 0;
return _sender_queues[use_sender_id]->add_blocks(request, done, wait_for_worker,
time_to_find_recvr);
}

void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
if (_query_context->low_memory_mode()) {
set_low_memory_mode();