Skip to content

Commit

Permalink
[bugfix](vtablet_sink) fix max_pending_bytes for vtablet_sink (#9462)
Browse files Browse the repository at this point in the history

Co-authored-by: yixiutt <yixiu@selectdb.com>
  • Loading branch information
yixiutt and yixiutt authored May 11, 2022
1 parent 3ba5ff4 commit e3bac86
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion be/src/vec/sink/vtablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
// But there is still some unfinished things, we do mem limit here temporarily.
// _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
// It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
while (!_cancelled && _parent->_mem_tracker->any_limit_exceeded() && _pending_batches_num > 0) {
while (!_cancelled &&
(_pending_batches_bytes > _max_pending_batches_bytes ||
_parent->_mem_tracker->any_limit_exceeded()) &&
_pending_batches_num > 0) {
SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
Expand All @@ -165,6 +168,7 @@ Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
std::lock_guard<std::mutex> l(_pending_batches_lock);
//To simplify the add_row logic, postpone adding block into req until the time of sending req
_pending_batches_bytes += _cur_mutable_block->allocated_bytes();
_pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
_pending_batches_num++;
}
Expand Down

0 comments on commit e3bac86

Please sign in to comment.