diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp index 1f0556f4642110..b386da4d7c6c71 100644 --- a/be/src/olap/page_cache.cpp +++ b/be/src/olap/page_cache.cpp @@ -30,7 +30,8 @@ PageBase::PageBase(size_t b, bool use_cache, segment_v2::PageTypePB if (use_cache) { _mem_tracker_by_allocator = StoragePageCache::instance()->mem_tracker(page_type); } else { - _mem_tracker_by_allocator = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + _mem_tracker_by_allocator = + thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr(); } { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator); diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 3b40426f6ef12a..dcdf4b1a4b4293 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -51,12 +51,13 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( // _untracked_mem temporary store bytes that not synchronized to process reserved memory, // but bytes have been subtracted from thread _reserved_mem. doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem); - _limiter_tracker->release_reserved(_untracked_mem); + _limiter_tracker_sptr->release_reserved(_untracked_mem); _reserved_mem = 0; _untracked_mem = 0; } _consumer_tracker_stack.clear(); - _limiter_tracker = mem_tracker; + _limiter_tracker_sptr = mem_tracker; + _limiter_tracker = _limiter_tracker_sptr.get(); } void ThreadMemTrackerMgr::detach_limiter_tracker( @@ -68,7 +69,8 @@ void ThreadMemTrackerMgr::detach_limiter_tracker( _reserved_mem = _last_attach_snapshots_stack.back().reserved_mem; _consumer_tracker_stack = _last_attach_snapshots_stack.back().consumer_tracker_stack; _last_attach_snapshots_stack.pop_back(); - _limiter_tracker = old_mem_tracker; + _limiter_tracker_sptr = old_mem_tracker; + _limiter_tracker = _limiter_tracker_sptr.get(); } void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) { diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index db3b32a6298820..e3a1409ddfc63f 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -93,11 +93,19 @@ class ThreadMemTrackerMgr { void reset_query_cancelled_flag(bool new_val) { _is_query_cancelled = new_val; } - std::shared_ptr limiter_mem_tracker() { + MemTrackerLimiter* limiter_mem_tracker() { CHECK(init()); return _limiter_tracker; } + // Prefer use `limiter_mem_tracker`, which is faster than `limiter_mem_tracker_sptr`. + // when multiple threads hold the same `std::shared_ptr` at the same time, + // modifying the `std::shared_ptr` reference count will be expensive when there is high concurrency. + std::shared_ptr limiter_mem_tracker_sptr() { + CHECK(init()); + return _limiter_tracker_sptr; + } + void enable_wait_gc() { _wait_gc = true; } void disable_wait_gc() { _wait_gc = false; } [[nodiscard]] bool wait_gc() const { return _wait_gc; } @@ -141,7 +149,8 @@ class ThreadMemTrackerMgr { // A thread of query/load will only wait once during execution. bool _wait_gc = false; - std::shared_ptr _limiter_tracker; + std::shared_ptr _limiter_tracker_sptr {nullptr}; + MemTrackerLimiter* _limiter_tracker {nullptr}; std::vector _consumer_tracker_stack; std::weak_ptr _wg_wptr; @@ -156,7 +165,8 @@ inline bool ThreadMemTrackerMgr::init() { // 2. ExecEnv not initialized when thread start, initialized in limiter_mem_tracker(). if (_init) return true; if (ExecEnv::GetInstance()->orphan_mem_tracker() != nullptr) { - _limiter_tracker = ExecEnv::GetInstance()->orphan_mem_tracker(); + _limiter_tracker_sptr = ExecEnv::GetInstance()->orphan_mem_tracker(); + _limiter_tracker = _limiter_tracker_sptr.get(); _wait_gc = true; _init = true; return true; diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index c89f532e5927a6..2aee48819c6998 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -28,7 +28,7 @@ class MemTracker; QueryThreadContext ThreadContext::query_thread_context() { DCHECK(doris::pthread_context_ptr_init); ORPHAN_TRACKER_CHECK(); - return {_task_id, thread_mem_tracker_mgr->limiter_mem_tracker(), _wg_wptr}; + return {_task_id, thread_mem_tracker_mgr->limiter_mem_tracker_sptr(), _wg_wptr}; } void AttachTask::init(const QueryThreadContext& query_thread_context) { diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index e0a44af69c1d66..30871399eedbba 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -235,7 +235,7 @@ class ThreadContext { // to nullptr, but the object it points to is not initialized. At this time, when the memory // is released somewhere, the hook is triggered to cause the crash. std::unique_ptr thread_mem_tracker_mgr; - [[nodiscard]] std::shared_ptr thread_mem_tracker() const { + [[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const { return thread_mem_tracker_mgr->limiter_mem_tracker(); } @@ -402,7 +402,8 @@ class QueryThreadContext { #ifndef BE_TEST ORPHAN_TRACKER_CHECK(); query_id = doris::thread_context()->task_id(); - query_mem_tracker = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + query_mem_tracker = + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr(); wg_wptr = doris::thread_context()->workload_group(); #else query_id = TUniqueId(); @@ -468,8 +469,8 @@ class SwitchThreadMemTrackerLimiter { const std::shared_ptr& mem_tracker) { DCHECK(mem_tracker); doris::ThreadLocalHandle::create_thread_local_if_not_exits(); - if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr()) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); } } @@ -480,8 +481,8 @@ class SwitchThreadMemTrackerLimiter { query_thread_context.query_id); // workload group alse not change DCHECK(query_thread_context.query_mem_tracker); if (query_thread_context.query_mem_tracker != - thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr()) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr(); thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( query_thread_context.query_mem_tracker); } diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index 17764b9e4f6ec1..474a50339dcd68 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -73,7 +73,8 @@ struct ByteBuffer : private Allocator { : pos(0), limit(capacity_), capacity(capacity_), - mem_tracker_(doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + mem_tracker_( + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr()) { ptr = reinterpret_cast(Allocator::alloc(capacity_)); }