Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Nov 21, 2024
1 parent b3cb480 commit 6c96e8d
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 24 deletions.
27 changes: 7 additions & 20 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <string>

#include "common/logging.h"
#include "common/status.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
#include "runtime/memory/mem_tracker.h"
Expand Down Expand Up @@ -95,7 +96,9 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
}

DCHECK(!_block_queue.empty());
auto [next_block, block_byte_size] = std::move(_block_queue.front());
BlockUPtr next_block;
RETURN_IF_ERROR(_block_queue.front().get_block(next_block));
size_t block_byte_size = _block_queue.front().block_byte_size();
_block_queue.pop_front();
_recvr->_parent->memory_used_counter()->update(-(int64_t)block_byte_size);
sub_blocks_memory_usage(block_byte_size);
Expand Down Expand Up @@ -163,30 +166,14 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
}
}

BlockUPtr block = nullptr;
int64_t deserialize_time = 0;
{
SCOPED_RAW_TIMER(&deserialize_time);
block = Block::create_unique();
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(block->deserialize(pblock));
}

const auto rows = block->rows();
if (rows == 0) {
return Status::OK();
}
auto block_byte_size = block->allocated_bytes();
VLOG_ROW << "added #rows=" << rows << " batch_size=" << block_byte_size << "\n";
PBlock new_pblock = pblock;

std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return Status::OK();
}

COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
COUNTER_UPDATE(_recvr->_rows_produced_counter, rows);
const auto block_byte_size = new_pblock.ByteSizeLong();
COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
if (_recvr->_max_wait_worker_time->value() < wait_for_worker) {
_recvr->_max_wait_worker_time->set(wait_for_worker);
Expand All @@ -196,7 +183,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
_recvr->_max_find_recvr_time->set((int64_t)time_to_find_recvr);
}

_block_queue.emplace_back(std::move(block), block_byte_size);
_block_queue.emplace_back(std::move(new_pblock), block_byte_size);
COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
_record_debug_info();
try_set_dep_ready_without_lock();
Expand Down
32 changes: 31 additions & 1 deletion be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <gen_cpp/Types_types.h>
#include <gen_cpp/data.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>

Expand Down Expand Up @@ -260,7 +261,36 @@ class VDataStreamRecvr::SenderQueue {
Status _cancel_status;
int _num_remaining_senders;
std::unique_ptr<MemTracker> _queue_mem_tracker;
std::list<std::pair<BlockUPtr, size_t>> _block_queue;

struct BlockItem {
Status get_block(BlockUPtr& block) {
if (!_block) {
SCOPED_RAW_TIMER(&_deserialize_time);
_block = Block::create_unique();
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_block->deserialize(_pblock));
}
block.swap(_block);
_block.reset();
return Status::OK();
}

size_t block_byte_size() const { return _block_byte_size; }
int64_t deserialize_time() const { return _deserialize_time; }

BlockItem(BlockUPtr&& block, size_t block_byte_size)
: _block(std::move(block)), _block_byte_size(block_byte_size) {}

BlockItem(PBlock&& pblock, size_t block_byte_size)
: _block(nullptr), _pblock(std::move(pblock)), _block_byte_size(block_byte_size) {}

private:
BlockUPtr _block;
PBlock _pblock;
const size_t _block_byte_size;
int64_t _deserialize_time = 0;
};

std::list<BlockItem> _block_queue;

// sender_id
std::unordered_set<int> _sender_eos_set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,7 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
public boolean enableCommonExprPushdown = true;

@VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = true, varType = VariableAnnotation.DEPRECATED)
public boolean enableLocalExchange = true;
public boolean enableLocalExchange = false;

/**
* For debug purpose, don't merge unique key and agg key when reading data.
Expand Down Expand Up @@ -2352,7 +2352,7 @@ public void initFuzzyModeVariables() {
this.parallelPipelineTaskNum = random.nextInt(8);
this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
this.enableLocalExchange = false;
// This will cause be dead loop, disable it first
// this.disableJoinReorder = random.nextBoolean();
this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
Expand Down Expand Up @@ -3883,7 +3883,7 @@ public TQueryOptions toThrift() {
tResult.setEnableCommonExprPushdown(enableCommonExprPushdown);
tResult.setCheckOverflowForDecimal(checkOverflowForDecimal);
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec.trim().toLowerCase());
tResult.setEnableLocalExchange(enableLocalExchange);
tResult.setEnableLocalExchange(false);

tResult.setSkipStorageEngineMerge(skipStorageEngineMerge);

Expand Down

0 comments on commit 6c96e8d

Please sign in to comment.