diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index 70b5277524a551..30e73c419687f7 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -98,7 +98,8 @@ bool MemTableMemoryLimiter::_soft_limit_reached() {
 }
 
 bool MemTableMemoryLimiter::_hard_limit_reached() {
-    return _mem_tracker->consumption() > _load_hard_mem_limit ||
+    // Include reserved memory in the check to ensure strict limit enforcement
+    return (_mem_tracker->consumption() + _reserved_mem_usage) > _load_hard_mem_limit ||
            _sys_avail_mem_less_than_warning_water_mark() > 0 ||
            _process_used_mem_more_than_soft_mem_limit() > 0;
 }
@@ -114,30 +115,46 @@ int64_t MemTableMemoryLimiter::_need_flush() {
             return 1;
         }
     });
-    int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit;
+    // Include reserved memory in the calculation
+    int64_t total_consumption = _mem_tracker->consumption() + _reserved_mem_usage;
+    int64_t limit1 = total_consumption - _load_soft_mem_limit;
     int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark();
     int64_t limit3 = _process_used_mem_more_than_soft_mem_limit();
    int64_t need_flush = std::max({limit1, limit2, limit3});
     return need_flush - _queue_mem_usage - _flush_mem_usage;
 }
 
-void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_check) {
+void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_check,
+                                                  int64_t estimated_mem_size) {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
+
+    // Fast path: check without the lock first
     do {
        DBUG_EXECUTE_IF("MemTableMemoryLimiter.handle_memtable_flush.limit_reached", {
            LOG(INFO) << "debug memtable limit reached";
            break;
        });
         if (!_soft_limit_reached() || _load_usage_low()) {
+            // If we need to reserve memory, try to do so now (reserve_memory takes the lock)
+            if (estimated_mem_size > 0) {
+                if (reserve_memory(estimated_mem_size)) {
+                    return; // Fast path: reserved successfully without waiting
+                }
+                // Reservation failed, need to go through the slow path
+                break;
+            }
             return;
         }
     } while (false);
+
     MonotonicStopWatch timer;
     timer.start();
     std::unique_lock<std::mutex> l(_lock);
     g_memtable_memory_limit_waiting_threads << 1;
     bool first = true;
+    bool reservation_done = false;
+
     do {
         if (!first) {
             auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000));
@@ -151,13 +168,29 @@ void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_check) {
             return;
         }
         first = false;
+
+        // Check if we can reserve the requested memory
+        if (estimated_mem_size > 0 && !reservation_done) {
+            // Try to reserve memory. This ensures we don't exceed the hard limit even with
+            // multiple concurrent writes. Uses the unsafe version since we already hold the lock.
+            if (reserve_memory_unsafe(estimated_mem_size)) {
+                // Successfully reserved, we can proceed
+                reservation_done = true;
+                break;
+            }
+        } else if (estimated_mem_size <= 0 && !_hard_limit_reached()) {
+            // No reservation needed and not at the hard limit, can proceed
+            break;
+        }
+
         int64_t need_flush = _need_flush();
-        if (need_flush > 0) {
+        if (need_flush > 0 || (estimated_mem_size > 0 && !reservation_done)) {
             auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
             LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? "hard" : "soft")
                       << ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
                       << ", " << GlobalMemoryArbitrator::sys_mem_available_details_str()
                       << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
+                      << ", reserved: " << PrettyPrinter::print_bytes(_reserved_mem_usage)
                       << ", memtable writers num: " << _writers.size()
                       << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
                       << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
@@ -176,9 +209,15 @@ void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_check) {
            } else {
                // will not reach here
            }
-            _flush_active_memtables(need_flush);
+            int64_t flush_size = std::max(need_flush, estimated_mem_size);
+            _flush_active_memtables(flush_size);
         }
-    } while (_hard_limit_reached() && !_load_usage_low());
+    // Continue waiting if:
+    // 1. we still need to reserve memory but have not succeeded yet, OR
+    // 2. the hard limit is still reached
+    } while ((estimated_mem_size > 0 && !reservation_done) ||
+             (_hard_limit_reached() && !_load_usage_low()));
+
     g_memtable_memory_limit_waiting_threads << -1;
     timer.stop();
     int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
@@ -189,6 +228,7 @@ void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_check) {
                   << ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
                   << ", " << GlobalMemoryArbitrator::sys_mem_available_details_str()
                   << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
+                  << ", reserved: " << PrettyPrinter::print_bytes(_reserved_mem_usage)
                   << ", memtable writers num: " << _writers.size()
                   << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
                   << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
@@ -249,6 +289,60 @@ int64_t MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
     return mem_flushed;
 }
 
+// Thread-safe version - acquires the lock internally
+bool MemTableMemoryLimiter::reserve_memory(int64_t size) {
+    if (size <= 0) {
+        return true;
+    }
+    std::lock_guard<std::mutex> l(_lock);
+    return reserve_memory_unsafe(size);
+}
+
+// Thread-safe version - acquires the lock internally
+void MemTableMemoryLimiter::release_memory(int64_t size) {
+    if (size <= 0) {
+        return;
+    }
+    std::lock_guard<std::mutex> l(_lock);
+    release_memory_unsafe(size);
+}
+
+// Unsafe version - must be called with _lock already held
+bool MemTableMemoryLimiter::reserve_memory_unsafe(int64_t size) {
+    if (size <= 0) {
+        return true;
+    }
+    int64_t total_after_reserve = _mem_tracker->consumption() + _reserved_mem_usage + size;
+    // Also check the process hard limit to prevent allocation failures
+    int64_t process_hard_limit_exceeded =
+            GlobalMemoryArbitrator::process_memory_usage() + size - MemInfo::mem_limit();
+    if (total_after_reserve > _load_hard_mem_limit ||
+        _sys_avail_mem_less_than_warning_water_mark() > 0 ||
+        _process_used_mem_more_than_soft_mem_limit() > 0 ||
+        process_hard_limit_exceeded > 0) {
+        return false;
+    }
+    _reserved_mem_usage += size;
+    return true;
+}
+
+// Unsafe version - must be called with _lock already held
+void MemTableMemoryLimiter::release_memory_unsafe(int64_t size) {
+    if (size <= 0) {
+        return;
+    }
+    _reserved_mem_usage -= size;
+    if (_reserved_mem_usage < 0) {
+        LOG(ERROR) << "reserved memory usage is negative: " << _reserved_mem_usage
+                   << ", this indicates a bug in reservation tracking, resetting to 0";
+        _reserved_mem_usage = 0;
+    }
+    // Notify waiting threads that memory has been released
+    if (!_hard_limit_reached()) {
+        _hard_limit_end_cond.notify_all();
+    }
+}
+
 void MemTableMemoryLimiter::refresh_mem_tracker() {
     std::lock_guard<std::mutex> l(_lock);
     _refresh_mem_tracker();
@@ -271,6 +365,7 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
     _log_timer.reset();
     LOG(INFO) << ss.str()
               << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
+              << ", reserved: " << PrettyPrinter::print_bytes(_reserved_mem_usage)
              << ", memtable writers num: " << _writers.size()
              << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
              << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
Reset to 0"; + _reserved_mem_usage = 0; + } + // Notify waiting threads that memory has been released + if (!_hard_limit_reached()) { + _hard_limit_end_cond.notify_all(); + } +} + void MemTableMemoryLimiter::refresh_mem_tracker() { std::lock_guard l(_lock); _refresh_mem_tracker(); @@ -271,6 +365,7 @@ void MemTableMemoryLimiter::refresh_mem_tracker() { _log_timer.reset(); LOG(INFO) << ss.str() << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) + << ", reserved: " << PrettyPrinter::print_bytes(_reserved_mem_usage) << ", memtable writers num: " << _writers.size() << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage) << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage) diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h index 34dcb2b06b4cb9..170e53408ebe3a 100644 --- a/be/src/olap/memtable_memory_limiter.h +++ b/be/src/olap/memtable_memory_limiter.h @@ -44,7 +44,25 @@ class MemTableMemoryLimiter { // If yes, it will flush memtable to try to reduce memory consumption. // Every write operation will call this API to check if need flush memtable OR hang // when memory is not available. - void handle_memtable_flush(std::function cancel_check); + // estimated_mem_size: estimated memory size that will be allocated after this call. + // If > 0, will reserve this amount before returning to ensure + // strict memory limit enforcement. + void handle_memtable_flush(std::function cancel_check, int64_t estimated_mem_size = 0); + + // Reserve memory space. Returns true if successful, false if would exceed limits. + // Thread-safe version that acquires the lock internally. + bool reserve_memory(int64_t size); + + // Release reserved memory. + // Thread-safe version that acquires the lock internally. + void release_memory(int64_t size); + + // Reserve memory space without acquiring lock. MUST be called with _lock held. + // Returns true if successful, false if would exceed limits. + bool reserve_memory_unsafe(int64_t size); + + // Release reserved memory without acquiring lock. MUST be called with _lock held. + void release_memory_unsafe(int64_t size); void register_writer(std::weak_ptr writer); @@ -54,7 +72,43 @@ class MemTableMemoryLimiter { int64_t mem_usage() const { return _mem_usage; } + // RAII guard for memory reservation + // This guard automatically releases reserved memory when it goes out of scope. + // The reservation must be done externally (e.g., in handle_memtable_flush). + class MemoryReservationGuard { + public: + MemoryReservationGuard(MemTableMemoryLimiter* limiter, int64_t size) + : _limiter(limiter), _size(size), _reserved(false) {} + + ~MemoryReservationGuard() { + if (_reserved && _limiter != nullptr && _size > 0) { + // Use thread-safe version that acquires lock internally + _limiter->release_memory(_size); + } + } + + // Mark as reserved. Should be called after successful reservation in handle_memtable_flush. 
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index ec59a13fdc241d..82bfed40deb614 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -165,14 +165,48 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request,
         // If this is a high priority load task, do not handle this.
         // because this may block for a while, which may lead to rpc timeout.
         SCOPED_TIMER(channel->get_handle_mem_limit_timer());
-        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
-                [channel]() { return channel->is_cancelled(); });
+
+        // Estimate memory size: use the protobuf message size * 3 as a conservative estimate.
+        // The actual deserialized block typically uses 1.5-3x the compressed protobuf size.
+        int64_t estimated_mem = 0;
+        if (request.has_block()) {
+            estimated_mem = request.block().ByteSizeLong() * 3;
+        }
+
+        auto mem_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
+        // handle_memtable_flush will reserve the memory for us
+        mem_limiter->handle_memtable_flush([channel]() { return channel->is_cancelled(); },
+                                           estimated_mem);
         if (channel->is_cancelled()) {
+            // Note: if cancelled, handle_memtable_flush may not have reserved memory,
+            // so we don't release anything here
            return Status::Cancelled("LoadChannel has been cancelled: {}.", load_id.to_string());
         }
+
+        // Use an RAII guard to ensure the memory reservation is released after add_batch.
+        // The reservation was already done in handle_memtable_flush, so we just track it for release.
+        MemTableMemoryLimiter::MemoryReservationGuard guard(mem_limiter, estimated_mem);
+        // Only mark as reserved if we actually have memory to reserve
+        guard.set_reserved(estimated_mem > 0);
+
+        // 3. add batch to load channel
+        // batch may not exist in request(eg: eos request without batch),
+        // this case will be handled in load channel's add batch method.
+        Status st = channel->add_batch(request, response);
+        // The guard automatically releases the reservation when it goes out of scope
+        if (UNLIKELY(!st.ok())) {
+            RETURN_IF_ERROR(channel->cancel());
+            return st;
+        }
+
+        // 4. handle finish
+        if (channel->is_finished()) {
+            _finish_load_channel(load_id);
+        }
+        return Status::OK();
     }
-    // 3. add batch to load channel
+    // 3. add batch to load channel (high priority path without memory limit check)
     // batch may not exist in request(eg: eos request without batch),
     // this case will be handled in load channel's add batch method.
     Status st = channel->add_batch(request, response);
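On the estimate itself: ByteSizeLong() returns the serialized size of the
PBlock, so a request carrying a 10 MB block reserves 30 MB here. The factor of
3 is a heuristic upper bound on the deserialization blow-up, not a measured
constant; over-estimating merely delays the writer, while under-estimating
would let real usage overshoot the hard limit. Restated as a standalone helper
for clarity (hypothetical helper, not part of the patch; assumes the
PTabletWriterAddBlockRequest proto header is in scope):

int64_t estimate_add_block_mem(const PTabletWriterAddBlockRequest& request) {
    // eos-only requests carry no block and need no reservation
    if (!request.has_block()) {
        return 0;
    }
    return static_cast<int64_t>(request.block().ByteSizeLong()) * 3;
}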
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 6bc7bd6895cff7..a55bc64cd95bc6 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -579,14 +579,27 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
     }
     {
         SCOPED_TIMER(_wait_mem_limit_timer);
-        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
-                [state = _state]() { return state->is_cancelled(); });
+        // Estimate memory size: use the block's allocated bytes as a conservative estimate.
+        // This accounts for the memory that will be allocated during the write operation.
+        int64_t estimated_mem = block->allocated_bytes();
+        auto mem_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
+        // handle_memtable_flush will reserve the memory for us
+        mem_limiter->handle_memtable_flush([state = _state]() { return state->is_cancelled(); },
+                                           estimated_mem);
         if (_state->is_cancelled()) {
             return _state->cancel_reason();
         }
+
+        // Use an RAII guard to ensure the memory reservation is released after the write.
+        // The reservation was already done in handle_memtable_flush, so we just track it for release.
+        MemTableMemoryLimiter::MemoryReservationGuard guard(mem_limiter, estimated_mem);
+        // Only mark as reserved if we actually have memory to reserve
+        guard.set_reserved(estimated_mem > 0);
+
+        SCOPED_TIMER(_write_memtable_timer);
+        st = delta_writer->write(block.get(), rows.row_idxes);
+        // The guard automatically releases the reservation when it goes out of scope
     }
-    SCOPED_TIMER(_write_memtable_timer);
-    st = delta_writer->write(block.get(), rows.row_idxes);
     return st;
 }
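Note that the two call sites use different estimators: the RPC path in
load_channel_mgr.cpp can only see the serialized PBlock, so it scales
ByteSizeLong() by 3, while this writer path already holds the deserialized
block and can use block->allocated_bytes() directly. Also, since moving the
write into the _wait_mem_limit_timer scope keeps the guard alive across it, a
caller that wants to return the budget as soon as the memtable owns the data
could use the guard's explicit release() instead (hypothetical usage, assuming
the same surrounding code as the hunk above):

MemTableMemoryLimiter::MemoryReservationGuard guard(mem_limiter, estimated_mem);
guard.set_reserved(estimated_mem > 0);
st = delta_writer->write(block.get(), rows.row_idxes);
guard.release(); // safe: the destructor will not release a second time
return st;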