diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp index 5b680d9e054fc6..7e1129ba4de6bc 100644 --- a/be/src/olap/page_cache.cpp +++ b/be/src/olap/page_cache.cpp @@ -33,7 +33,8 @@ MemoryTrackedPageBase::MemoryTrackedPageBase(size_t size, bool use_cache, if (use_cache) { _mem_tracker_by_allocator = StoragePageCache::instance()->mem_tracker(page_type); } else { - _mem_tracker_by_allocator = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + _mem_tracker_by_allocator = + thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr(); } } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index d0f9482d7c2846..3339fd680ab171 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -30,17 +30,18 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( CHECK(init()); flush_untracked_mem(); _last_attach_snapshots_stack.push_back( - {_limiter_tracker, _wg_wptr, _reserved_mem, _consumer_tracker_stack}); + {_limiter_tracker_sptr, _wg_wptr, _reserved_mem, _consumer_tracker_stack}); if (_reserved_mem != 0) { // _untracked_mem temporary store bytes that not synchronized to process reserved memory, // but bytes have been subtracted from thread _reserved_mem. doris::GlobalMemoryArbitrator::shrink_process_reserved(_untracked_mem); - _limiter_tracker->shrink_reserved(_untracked_mem); + _limiter_tracker_sptr->shrink_reserved(_untracked_mem); _reserved_mem = 0; _untracked_mem = 0; } _consumer_tracker_stack.clear(); - _limiter_tracker = mem_tracker; + _limiter_tracker_sptr = mem_tracker; + _limiter_tracker = _limiter_tracker_sptr.get(); } void ThreadMemTrackerMgr::attach_limiter_tracker( @@ -55,7 +56,8 @@ void ThreadMemTrackerMgr::detach_limiter_tracker() { flush_untracked_mem(); shrink_reserved(); DCHECK(!_last_attach_snapshots_stack.empty()); - _limiter_tracker = _last_attach_snapshots_stack.back().limiter_tracker; + _limiter_tracker_sptr = _last_attach_snapshots_stack.back().limiter_tracker; + _limiter_tracker = _limiter_tracker_sptr.get(); _wg_wptr = _last_attach_snapshots_stack.back().wg_wptr; _reserved_mem = _last_attach_snapshots_stack.back().reserved_mem; _consumer_tracker_stack = _last_attach_snapshots_stack.back().consumer_tracker_stack; diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 62a07c69e21fc6..ecffc954925041 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -89,11 +89,19 @@ class ThreadMemTrackerMgr { void shrink_reserved(); - std::shared_ptr limiter_mem_tracker() { + MemTrackerLimiter* limiter_mem_tracker() { CHECK(init()); return _limiter_tracker; } + // Prefer use `limiter_mem_tracker`, which is faster than `limiter_mem_tracker_sptr`. + // when multiple threads hold the same `std::shared_ptr` at the same time, + // modifying the `std::shared_ptr` reference count will be expensive when there is high concurrency. + std::shared_ptr limiter_mem_tracker_sptr() { + CHECK(init()); + return _limiter_tracker_sptr; + } + void enable_wait_gc() { _wait_gc = true; } void disable_wait_gc() { _wait_gc = false; } [[nodiscard]] bool wait_gc() const { return _wait_gc; } @@ -106,7 +114,7 @@ class ThreadMemTrackerMgr { return fmt::format( "ThreadMemTrackerMgr debug, _untracked_mem:{}, " "_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>", - std::to_string(_untracked_mem), limiter_mem_tracker()->make_profile_str(), + std::to_string(_untracked_mem), _limiter_tracker->make_profile_str(), fmt::to_string(consumer_tracker_buf)); } @@ -149,7 +157,8 @@ class ThreadMemTrackerMgr { // A thread of query/load will only wait once during execution. bool _wait_gc = false; - std::shared_ptr _limiter_tracker {nullptr}; + std::shared_ptr _limiter_tracker_sptr {nullptr}; + MemTrackerLimiter* _limiter_tracker {nullptr}; std::vector _consumer_tracker_stack; std::weak_ptr _wg_wptr; @@ -164,7 +173,8 @@ inline bool ThreadMemTrackerMgr::init() { return true; } if (ExecEnv::GetInstance()->orphan_mem_tracker() != nullptr) { - _limiter_tracker = ExecEnv::GetInstance()->orphan_mem_tracker(); + _limiter_tracker_sptr = ExecEnv::GetInstance()->orphan_mem_tracker(); + _limiter_tracker = _limiter_tracker_sptr.get(); _wait_gc = true; _init = true; return true; diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 206272ec675c7a..6a8457bfe8b134 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -93,7 +93,7 @@ SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter( const std::shared_ptr& mem_tracker) { DCHECK(mem_tracker); doris::ThreadLocalHandle::create_thread_local_if_not_exits(); - if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr()) { thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); is_switched_ = true; } diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index 17764b9e4f6ec1..474a50339dcd68 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -73,7 +73,8 @@ struct ByteBuffer : private Allocator { : pos(0), limit(capacity_), capacity(capacity_), - mem_tracker_(doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + mem_tracker_( + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr()) { ptr = reinterpret_cast(Allocator::alloc(capacity_)); }