From 22417fda64c39524a3e4dd22dee98d2849d8d627 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Fri, 10 May 2024 19:17:40 +0800 Subject: [PATCH] KVStore: Expand the usage of thread-wise alloc/dealloc trace (#9003) close pingcap/tiflash#8835 --- dbms/src/Common/TiFlashMetrics.cpp | 44 +++- dbms/src/Common/TiFlashMetrics.h | 9 +- dbms/src/Interpreters/Context.cpp | 21 +- dbms/src/Interpreters/Context.h | 5 + dbms/src/Server/Server.cpp | 5 +- .../KVStore/FFI/JointThreadAllocInfo.cpp | 192 ++++++++++++++++++ .../KVStore/FFI/JointThreadAllocInfo.h | 99 +++++++++ dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp | 5 +- dbms/src/Storages/KVStore/KVStore.cpp | 136 +------------ dbms/src/Storages/KVStore/KVStore.h | 37 +--- .../KVStore/tests/gtest_new_kvstore.cpp | 41 +++- .../Storages/KVStore/tests/kvstore_helper.h | 1 + dbms/src/TestUtils/TiFlashTestEnv.cpp | 2 + 13 files changed, 411 insertions(+), 186 deletions(-) create mode 100644 dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp create mode 100644 dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h diff --git a/dbms/src/Common/TiFlashMetrics.cpp b/dbms/src/Common/TiFlashMetrics.cpp index 9913b18e640..7a2b6aa835e 100644 --- a/dbms/src/Common/TiFlashMetrics.cpp +++ b/dbms/src/Common/TiFlashMetrics.cpp @@ -99,33 +99,59 @@ void TiFlashMetrics::removeReplicaSyncRUCounter(UInt32 keyspace_id) registered_keyspace_sync_replica_ru.erase(itr); } -double TiFlashMetrics::getProxyThreadMemory(const std::string & k) +static std::string genPrefix(TiFlashMetrics::MemoryAllocType type, const std::string & k) +{ + if (type == TiFlashMetrics::MemoryAllocType::Alloc) + { + return "alloc_" + k; + } + else + { + return "dealloc_" + k; + } +} + +double TiFlashMetrics::getProxyThreadMemory(TiFlashMetrics::MemoryAllocType type, const std::string & k) { std::shared_lock lock(proxy_thread_report_mtx); - auto it = registered_raft_proxy_thread_memory_usage_metrics.find(k); + + auto it = 
registered_raft_proxy_thread_memory_usage_metrics.find(genPrefix(type, k)); RUNTIME_CHECK(it != registered_raft_proxy_thread_memory_usage_metrics.end(), k); return it->second->Value(); } -void TiFlashMetrics::setProxyThreadMemory(const std::string & k, Int64 v) +void TiFlashMetrics::setProxyThreadMemory(TiFlashMetrics::MemoryAllocType type, const std::string & k, Int64 v) { std::shared_lock lock(proxy_thread_report_mtx); - if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.count(k)) + auto it = registered_raft_proxy_thread_memory_usage_metrics.find(genPrefix(type, k)); + if unlikely (it == registered_raft_proxy_thread_memory_usage_metrics.end()) { // New metrics added through `Reset`. return; } - registered_raft_proxy_thread_memory_usage_metrics[k]->Set(v); + it->second->Set(v); } void TiFlashMetrics::registerProxyThreadMemory(const std::string & k) { std::unique_lock lock(proxy_thread_report_mtx); - if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.count(k)) { - registered_raft_proxy_thread_memory_usage_metrics.emplace( - k, - &registered_raft_proxy_thread_memory_usage_family->Add({{"type", k}})); + auto prefix = genPrefix(TiFlashMetrics::MemoryAllocType::Alloc, k); + if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.contains(prefix)) + { + registered_raft_proxy_thread_memory_usage_metrics.emplace( + prefix, + &registered_raft_proxy_thread_memory_usage_family->Add({{"type", prefix}})); + } + } + { + auto prefix = genPrefix(TiFlashMetrics::MemoryAllocType::Dealloc, k); + if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.contains(prefix)) + { + registered_raft_proxy_thread_memory_usage_metrics.emplace( + prefix, + &registered_raft_proxy_thread_memory_usage_family->Add({{"type", prefix}})); + } } } diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 99ed06ac1f3..1a987005627 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -1114,8 +1114,13 @@ class 
TiFlashMetrics void addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru); UInt64 debugQueryReplicaSyncRU(UInt32 keyspace_id); - void setProxyThreadMemory(const std::string & k, Int64 v); - double getProxyThreadMemory(const std::string & k); + enum class MemoryAllocType + { + Alloc = 1, + Dealloc = 2, + }; + void setProxyThreadMemory(MemoryAllocType type, const std::string & k, Int64 v); + double getProxyThreadMemory(MemoryAllocType type, const std::string & k); void registerProxyThreadMemory(const std::string & k); private: diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 1ba7e55c81d..0a9a47e2560 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -32,7 +32,6 @@ #include #include #include -#include #include #include #include @@ -60,6 +59,7 @@ #include #include #include +#include #include #include #include @@ -180,6 +180,8 @@ struct ContextShared /// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests. + JointThreadInfoJeallocMapPtr joint_memory_allocation_map; /// Joint thread-wise alloc/dealloc map + class SessionKeyHash { public: @@ -278,6 +280,11 @@ struct ContextShared tmt_context->shutdown(); } + if (joint_memory_allocation_map) + { + joint_memory_allocation_map->stopThreadAllocInfo(); + } + if (schema_sync_service) { schema_sync_service = nullptr; @@ -1753,6 +1760,18 @@ DM::GlobalStoragePoolPtr Context::getGlobalStoragePool() const return shared->global_storage_pool; } + +void Context::initializeJointThreadInfoJeallocMap() +{ + auto lock = getLock(); + shared->joint_memory_allocation_map = std::make_shared(); +} + +JointThreadInfoJeallocMapPtr Context::getJointThreadInfoJeallocMap() const +{ + return shared->joint_memory_allocation_map; +} + /** * This PageStorage is initialized in two cases: * 1. Not in disaggregated mode. 
diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 610ef892a10..59d9ff90de3 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -100,6 +100,8 @@ using MockMPPServerInfo = DB::tests::MockMPPServerInfo; class TiFlashSecurityConfig; using TiFlashSecurityConfigPtr = std::shared_ptr; class MockStorage; +struct JointThreadInfoJeallocMap; +using JointThreadInfoJeallocMapPtr = std::shared_ptr; enum class PageStorageRunMode : UInt8; namespace DM @@ -458,6 +460,9 @@ class Context bool tryUploadAllDataToRemoteStore() const; void tryReleaseWriteNodePageStorageForTest(); + void initializeJointThreadInfoJeallocMap(); + JointThreadInfoJeallocMapPtr getJointThreadInfoJeallocMap() const; + SharedContextDisaggPtr getSharedContextDisagg() const; /// Call after initialization before using system logs. Call for global context. diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 3da05ad9ba3..6162fe68968 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1089,6 +1089,9 @@ int Server::main(const std::vector & /*args*/) global_context->getSharedContextDisagg()->disaggregated_mode = disaggregated_mode; global_context->getSharedContextDisagg()->use_autoscaler = use_autoscaler; + // Must init this before KVStore. 
+ global_context->initializeJointThreadInfoJeallocMap(); + /// Init File Provider if (proxy_conf.is_proxy_runnable) { @@ -1717,8 +1720,6 @@ int Server::main(const std::vector & /*args*/) LOG_ERROR(log, "Current status of engine-store is NOT Running, should not happen"); exit(-1); } - LOG_INFO(log, "Stop collecting thread alloc metrics"); - tmt_context.getKVStore()->stopThreadAllocInfo(); LOG_INFO(log, "Set store context status Stopping"); tmt_context.setStatusStopping(); { diff --git a/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp b/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp new file mode 100644 index 00000000000..36ae4ffde2c --- /dev/null +++ b/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp @@ -0,0 +1,192 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include + +#include +#include +#include + +namespace DB +{ + +JointThreadInfoJeallocMap::JointThreadInfoJeallocMap() +{ + monitoring_thread = new std::thread([&]() { + setThreadName("ThreadMemoryTracer"); + while (true) + { + using namespace std::chrono_literals; + std::unique_lock l(monitoring_mut); + monitoring_cv.wait_for(l, 5000ms, [&]() { return is_terminated; }); + if (is_terminated) + return; + recordThreadAllocInfo(); + } + }); +} + +void JointThreadInfoJeallocMap::recordThreadAllocInfo() +{ + recordThreadAllocInfoForKVStore(); +} + +JointThreadInfoJeallocMap::~JointThreadInfoJeallocMap() +{ + stopThreadAllocInfo(); +} + +static std::string getThreadNameAggPrefix(const std::string_view & s) +{ + if (auto pos = s.find_last_of('-'); pos != std::string::npos) + { + return std::string(s.begin(), s.begin() + pos); + } + return std::string(s.begin(), s.end()); +} + +void JointThreadInfoJeallocMap::reportThreadAllocInfoImpl( + std::unordered_map & m, + std::string_view thdname, + ReportThreadAllocateInfoType type, + uint64_t value) +{ + // Many threads have empty name, better just not handle. 
+ if (thdname.empty()) + return; + std::string tname(thdname.begin(), thdname.end()); + switch (type) + { + case ReportThreadAllocateInfoType::Reset: + { + auto & metrics = TiFlashMetrics::instance(); + metrics.registerProxyThreadMemory(getThreadNameAggPrefix(tname)); + { + std::unique_lock l(memory_allocation_mut); + m.insert_or_assign(tname, ThreadInfoJealloc()); + } + break; + } + case ReportThreadAllocateInfoType::Remove: + { + std::unique_lock l(memory_allocation_mut); + m.erase(tname); + break; + } + case ReportThreadAllocateInfoType::AllocPtr: + { + std::shared_lock l(memory_allocation_mut); + if (value == 0) + return; + auto it = m.find(tname); + if unlikely (it == m.end()) + { + return; + } + it->second.allocated_ptr = value; + break; + } + case ReportThreadAllocateInfoType::DeallocPtr: + { + std::shared_lock l(memory_allocation_mut); + if (value == 0) + return; + auto it = m.find(tname); + if unlikely (it == m.end()) + { + return; + } + it->second.deallocated_ptr = value; + break; + } + } +} + +void JointThreadInfoJeallocMap::reportThreadAllocInfoForKVStore( + std::string_view thdname, + ReportThreadAllocateInfoType type, + uint64_t value) +{ + reportThreadAllocInfoImpl(kvstore_map, thdname, type, value); +} + +static const std::unordered_set PROXY_RECORD_WHITE_LIST_THREAD_PREFIX = {"ReadIndexWkr"}; + +void JointThreadInfoJeallocMap::recordThreadAllocInfoForKVStore() +{ + std::shared_lock l(memory_allocation_mut); + std::unordered_map agg_allocate; + std::unordered_map agg_deallocate; + for (const auto & [k, v] : kvstore_map) + { + auto agg_thread_name = getThreadNameAggPrefix(k); + // Some thread may have shorter lifetime, we can't use this timed task here to upgrade. 
+ if (PROXY_RECORD_WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name) && v.has_ptr()) + { + agg_allocate[agg_thread_name] += v.allocated(); + agg_deallocate[agg_thread_name] += v.deallocated(); + } + } + for (const auto & [k, v] : agg_allocate) + { + auto & tiflash_metrics = TiFlashMetrics::instance(); + tiflash_metrics.setProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, k, v); + } + for (const auto & [k, v] : agg_deallocate) + { + auto & tiflash_metrics = TiFlashMetrics::instance(); + tiflash_metrics.setProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Dealloc, k, v); + } +} + +void JointThreadInfoJeallocMap::reportThreadAllocBatchForKVStore( + std::string_view name, + ReportThreadAllocateInfoBatch data) +{ + // Many threads have empty name, better just not handle. + if (name.empty()) + return; + // TODO(jemalloc-trace) Could be costly. + auto k = getThreadNameAggPrefix(name); + auto & tiflash_metrics = TiFlashMetrics::instance(); + tiflash_metrics.setProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, k, data.alloc); + tiflash_metrics.setProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Dealloc, k, data.dealloc); +} + +void JointThreadInfoJeallocMap::stopThreadAllocInfo() +{ + LOG_INFO(DB::Logger::get(), "Stop collecting thread alloc metrics"); + { + std::unique_lock lk(monitoring_mut); + // Only one caller can successfully stop the thread. 
+ if (is_terminated) + return; + if (monitoring_thread == nullptr) + return; + is_terminated = true; + monitoring_cv.notify_all(); + } + LOG_INFO(DB::Logger::get(), "JointThreadInfoJeallocMap shutdown, wait thread alloc monitor join"); + monitoring_thread->join(); + { + std::unique_lock lk(monitoring_mut); + delete monitoring_thread; + monitoring_thread = nullptr; + } +} + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h b/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h new file mode 100644 index 00000000000..62ccb661f11 --- /dev/null +++ b/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h @@ -0,0 +1,99 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include + +namespace DB +{ + +enum class ReportThreadAllocateInfoType : uint64_t; +struct ReportThreadAllocateInfoBatch; + +struct ThreadInfoJealloc +{ + uint64_t allocated_ptr{0}; + uint64_t deallocated_ptr{0}; + + bool has_ptr() const { return allocated_ptr != 0 && deallocated_ptr != 0; } + + uint64_t allocated() const + { + if (allocated_ptr == 0) + return 0; + return *reinterpret_cast(allocated_ptr); + } + uint64_t deallocated() const + { + if (deallocated_ptr == 0) + return 0; + return *reinterpret_cast(deallocated_ptr); + } + int64_t remaining() const + { + uint64_t a = allocated(); + uint64_t d = deallocated(); + if (a > d) + { + return static_cast(a - d); + } + else + { + return -static_cast(d - a); + } + } +}; + +/// Works in two different ways: +/// NOTE in both ways, call reportThreadAllocInfo to register by `Reset` for every thread to be monitored. +/// And call reportThreadAllocInfo to deregister by `Remove` for every thread that is guaranteed to no longer be monitored. +/// - Register by reportThreadAllocInfo with AllocPtr/DeallocPtr +/// In this way, by recordThreadAllocInfo the routine thread will update the allocation info. +/// One must guarantee that the thread must exist before `Remove`. +/// - Directly report by reportThreadAllocBatch +/// The call will immediately update the alloc info of the specified thread. +struct JointThreadInfoJeallocMap +{ + JointThreadInfoJeallocMap(); + ~JointThreadInfoJeallocMap(); + /// Called by a bg thread as a routine work. + void recordThreadAllocInfoForKVStore(); + void recordThreadAllocInfo(); + void stopThreadAllocInfo(); + + /// For those everlasting threads, we can directly access their allocatedp/deallocatedp. + void reportThreadAllocInfoForKVStore(std::string_view, ReportThreadAllocateInfoType type, uint64_t value); + /// For those threads with shorter life, we can only update in their call chain. 
+ static void reportThreadAllocBatchForKVStore(std::string_view, ReportThreadAllocateInfoBatch data); + + mutable std::shared_mutex memory_allocation_mut; + std::unordered_map kvstore_map; + +private: + void reportThreadAllocInfoImpl( + std::unordered_map &, + std::string_view, + ReportThreadAllocateInfoType type, + uint64_t value); + bool is_terminated{false}; + mutable std::mutex monitoring_mut; + std::condition_variable monitoring_cv; + std::thread * monitoring_thread{nullptr}; +}; + +using JointThreadInfoJeallocMapPtr = std::shared_ptr; + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index f979acce1dc..5b66709bff2 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -1059,9 +1059,10 @@ void ReportThreadAllocateBatch( { try { - UNUSED(server); UNUSED(tid); - KVStore::reportThreadAllocBatch(buffToStrView(name), data); + if (!server || !server->tmt || !server->tmt->getKVStore()) + return; + server->tmt->getKVStore()->reportThreadAllocBatch(buffToStrView(name), data); } catch (...) { diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index b16499c4dee..e5038fe1fc9 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -68,17 +68,11 @@ KVStore::KVStore(Context & context) { // default config about compact-log: rows 40k, bytes 32MB, gap 200. 
LOG_INFO(log, "KVStore inited, eager_raft_log_gc_enabled={}", eager_raft_log_gc_enabled); - using namespace std::chrono_literals; - monitoring_thread = new std::thread([&]() { - while (true) - { - std::unique_lock l(monitoring_mut); - monitoring_cv.wait_for(l, 5000ms, [&]() { return is_terminated; }); - if (is_terminated) - return; - recordThreadAllocInfo(); - } - }); + joint_memory_allocation_map = context.getJointThreadInfoJeallocMap(); + if (joint_memory_allocation_map == nullptr) + { + LOG_WARNING(log, "JointThreadInfoJeallocMap is not inited from context"); + } } void KVStore::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy_helper) @@ -436,7 +430,6 @@ void KVStore::StoreMeta::update(Base && base_) KVStore::~KVStore() { LOG_INFO(log, "Destroy KVStore"); - stopThreadAllocInfo(); releaseReadIndexWorkers(); LOG_INFO(log, "Destroy KVStore Finished"); } @@ -496,126 +489,15 @@ RegionTaskLock KVStore::genRegionTaskLock(UInt64 region_id) const return region_manager.genRegionTaskLock(region_id); } -static std::string getThreadNameAggPrefix(const std::string_view & s) -{ - if (auto pos = s.find_last_of('-'); pos != std::string::npos) - { - return std::string(s.begin(), s.begin() + pos); - } - return std::string(s.begin(), s.end()); -} - -void KVStore::reportThreadAllocInfo(std::string_view thdname, ReportThreadAllocateInfoType type, uint64_t value) -{ - // Many threads have empty name, better just not handle. 
- if (thdname.empty()) - return; - std::string tname(thdname.begin(), thdname.end()); - switch (type) - { - case ReportThreadAllocateInfoType::Reset: - { - auto & metrics = TiFlashMetrics::instance(); - metrics.registerProxyThreadMemory(getThreadNameAggPrefix(tname)); - { - std::unique_lock l(memory_allocation_mut); - memory_allocation_map.insert_or_assign(tname, ThreadInfoJealloc()); - } - break; - } - case ReportThreadAllocateInfoType::Remove: - { - std::unique_lock l(memory_allocation_mut); - memory_allocation_map.erase(tname); - break; - } - case ReportThreadAllocateInfoType::AllocPtr: - { - std::shared_lock l(memory_allocation_mut); - if (value == 0) - return; - auto it = memory_allocation_map.find(tname); - if unlikely (it == memory_allocation_map.end()) - { - return; - } - it->second.allocated_ptr = value; - break; - } - case ReportThreadAllocateInfoType::DeallocPtr: - { - std::shared_lock l(memory_allocation_mut); - if (value == 0) - return; - auto it = memory_allocation_map.find(tname); - if unlikely (it == memory_allocation_map.end()) - { - return; - } - it->second.deallocated_ptr = value; - break; - } - } -} - -static const std::unordered_set RECORD_WHITE_LIST_THREAD_PREFIX = {"ReadIndexWkr"}; -/// For those everlasting threads, we can directly access their allocatedp/allocatedp. -void KVStore::recordThreadAllocInfo() +void KVStore::reportThreadAllocInfo(std::string_view v, ReportThreadAllocateInfoType type, uint64_t value) { - std::shared_lock l(memory_allocation_mut); - std::unordered_map agg_remaining; - for (const auto & [k, v] : memory_allocation_map) - { - auto agg_thread_name = getThreadNameAggPrefix(std::string_view(k.data(), k.size())); - // Some thread may have shorter lifetime, we can't use this timed task here to upgrade. 
- if (RECORD_WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name)) - { - auto [it, ok] = agg_remaining.emplace(agg_thread_name, 0); - it->second += v.remaining(); - } - } - for (const auto & [k, v] : agg_remaining) - { - auto & tiflash_metrics = TiFlashMetrics::instance(); - tiflash_metrics.setProxyThreadMemory(k, v); - } + joint_memory_allocation_map->reportThreadAllocInfoForKVStore(v, type, value); } -/// For those threads with shorter life, we must only update in their call chain. -void KVStore::reportThreadAllocBatch(std::string_view name, ReportThreadAllocateInfoBatch data) +void KVStore::reportThreadAllocBatch(std::string_view v, ReportThreadAllocateInfoBatch data) { - // Many threads have empty name, better just not handle. - if (name.empty()) - return; - // TODO(jemalloc-trace) Could be costy. - auto k = getThreadNameAggPrefix(name); - int64_t v = 0; - if (data.alloc > data.dealloc) - { - v = data.alloc - data.dealloc; - } - else - { - v = -(data.dealloc - data.alloc); - } - auto & tiflash_metrics = TiFlashMetrics::instance(); - tiflash_metrics.setProxyThreadMemory(k, v); -} - -void KVStore::stopThreadAllocInfo() -{ - { - std::unique_lock lk(monitoring_mut); - if (monitoring_thread == nullptr) - return; - is_terminated = true; - monitoring_cv.notify_all(); - } - LOG_INFO(log, "KVStore shutdown, wait thread alloc monitor join"); - monitoring_thread->join(); - delete monitoring_thread; - monitoring_thread = nullptr; + joint_memory_allocation_map->reportThreadAllocBatchForKVStore(v, data); } } // namespace DB diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index 599e788b57f..162dbb21d22 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -48,9 +49,6 @@ namespace tests class KVStoreTestBase; } // namespace tests -enum class ReportThreadAllocateInfoType : uint64_t; -struct ReportThreadAllocateInfoBatch; 
- class IAST; using ASTPtr = std::shared_ptr; using ASTs = std::vector; @@ -122,26 +120,6 @@ struct ProxyConfigSummary size_t snap_handle_pool_size = 0; }; -struct ThreadInfoJealloc -{ - uint64_t allocated_ptr{0}; - uint64_t deallocated_ptr{0}; - - uint64_t allocated() const - { - if (allocated_ptr == 0) - return 0; - return *reinterpret_cast(allocated_ptr); - } - uint64_t deallocated() const - { - if (deallocated_ptr == 0) - return 0; - return *reinterpret_cast(deallocated_ptr); - } - int64_t remaining() const { return static_cast(allocated()) - static_cast(deallocated()); } -}; - /// KVStore manages raft replication and transactions. /// - Holds all regions in this TiFlash store. /// - Manages region -> table mapping. @@ -171,9 +149,8 @@ class KVStore final : private boost::noncopyable // Proxy will validate and refit the config items from the toml file. const ProxyConfigSummary & getProxyConfigSummay() const { return proxy_config_summary; } void reportThreadAllocInfo(std::string_view, ReportThreadAllocateInfoType type, uint64_t value); - static void reportThreadAllocBatch(std::string_view, ReportThreadAllocateInfoBatch data); - void recordThreadAllocInfo(); - void stopThreadAllocInfo(); + void reportThreadAllocBatch(std::string_view, ReportThreadAllocateInfoBatch data); + JointThreadInfoJeallocMapPtr getJointThreadInfoJeallocMap() const { return joint_memory_allocation_map; } public: // Region Management void restore(PathPool & path_pool, const TiFlashRaftProxyHelper *); @@ -438,13 +415,7 @@ class KVStore final : private boost::noncopyable std::atomic ongoing_prehandle_task_count{0}; ProxyConfigSummary proxy_config_summary; - mutable std::shared_mutex memory_allocation_mut; - std::unordered_map memory_allocation_map; - - bool is_terminated{false}; - mutable std::mutex monitoring_mut; - std::condition_variable monitoring_cv; - std::thread * monitoring_thread{nullptr}; + JointThreadInfoJeallocMapPtr joint_memory_allocation_map; }; /// Encapsulation of lock guard 
of task mutex in KVStore diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index d483c038503..7d36094e3d5 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -912,43 +912,64 @@ try std::string_view(name.data(), name.size()), ReportThreadAllocateInfoBatch{.alloc = 1, .dealloc = 2}); auto & tiflash_metrics = TiFlashMetrics::instance(); - ASSERT_EQ(tiflash_metrics.getProxyThreadMemory("test1"), -1); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "test1"), 1); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Dealloc, "test1"), 2); std::string namee = ""; kvs.reportThreadAllocBatch( std::string_view(namee.data(), namee.size()), ReportThreadAllocateInfoBatch{.alloc = 1, .dealloc = 2}); - EXPECT_ANY_THROW(tiflash_metrics.getProxyThreadMemory("")); + EXPECT_ANY_THROW(tiflash_metrics.getProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "")); std::thread t([&]() { - kvs.reportThreadAllocInfo(std::string_view(name.data(), name.size()), ReportThreadAllocateInfoType::Reset, 0); + // For names not in `PROXY_RECORD_WHITE_LIST_THREAD_PREFIX`, the record operation will not update. 
+ std::string name1 = "test11-1"; + kvs.reportThreadAllocInfo(std::string_view(name1.data(), name1.size()), ReportThreadAllocateInfoType::Reset, 0); uint64_t mock = 999; uint64_t alloc_ptr = reinterpret_cast(&mock); kvs.reportThreadAllocInfo( - std::string_view(name.data(), name.size()), + std::string_view(name1.data(), name1.size()), ReportThreadAllocateInfoType::AllocPtr, alloc_ptr); - kvs.recordThreadAllocInfo(); - ASSERT_EQ(tiflash_metrics.getProxyThreadMemory("test1"), -1); + kvs.joint_memory_allocation_map->recordThreadAllocInfoForKVStore(); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "test11"), 0); + // recordThreadAllocInfoForKVStore can't override if not all alloc/dealloc are provided. std::string name2 = "ReadIndexWkr-1"; kvs.reportThreadAllocInfo(std::string_view(name2.data(), name2.size()), ReportThreadAllocateInfoType::Reset, 0); + kvs.reportThreadAllocBatch( + std::string_view(name2.data(), name2.size()), + ReportThreadAllocateInfoBatch{.alloc = 111, .dealloc = 222}); kvs.reportThreadAllocInfo( std::string_view(name2.data(), name2.size()), ReportThreadAllocateInfoType::AllocPtr, alloc_ptr); - kvs.recordThreadAllocInfo(); - ASSERT_EQ(tiflash_metrics.getProxyThreadMemory("ReadIndexWkr"), 999); + kvs.joint_memory_allocation_map->recordThreadAllocInfoForKVStore(); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "ReadIndexWkr"), 111); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Dealloc, "ReadIndexWkr"), 222); + + // recordThreadAllocInfoForKVStore will override if all alloc/dealloc are both provided, + // because the information from pointer is always the newest. 
uint64_t mock2 = 998; uint64_t dealloc_ptr = reinterpret_cast(&mock2); kvs.reportThreadAllocInfo( std::string_view(name2.data(), name2.size()), ReportThreadAllocateInfoType::DeallocPtr, dealloc_ptr); - kvs.recordThreadAllocInfo(); - ASSERT_EQ(tiflash_metrics.getProxyThreadMemory("ReadIndexWkr"), 1); + kvs.joint_memory_allocation_map->recordThreadAllocInfoForKVStore(); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "ReadIndexWkr"), 999); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Dealloc, "ReadIndexWkr"), 998); kvs.reportThreadAllocInfo( std::string_view(name2.data(), name2.size()), ReportThreadAllocateInfoType::Remove, 0); + + + kvs.reportThreadAllocInfo(std::string_view(name2.data(), name2.size()), ReportThreadAllocateInfoType::Reset, 0); + kvs.reportThreadAllocBatch( + std::string_view(name2.data(), name2.size()), + ReportThreadAllocateInfoBatch{.alloc = 111, .dealloc = 222}); + kvs.joint_memory_allocation_map->recordThreadAllocInfoForKVStore(); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "ReadIndexWkr"), 111); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Dealloc, "ReadIndexWkr"), 222); }); t.join(); } diff --git a/dbms/src/Storages/KVStore/tests/kvstore_helper.h b/dbms/src/Storages/KVStore/tests/kvstore_helper.h index c6af95f0c62..af7e3102ec1 100644 --- a/dbms/src/Storages/KVStore/tests/kvstore_helper.h +++ b/dbms/src/Storages/KVStore/tests/kvstore_helper.h @@ -111,6 +111,7 @@ class KVStoreTestBase : public ::testing::Test auto & global_ctx = TiFlashTestEnv::getGlobalContext(); global_ctx.tryReleaseWriteNodePageStorageForTest(); global_ctx.initializeWriteNodePageStorageIfNeed(*path_pool); + global_ctx.initializeJointThreadInfoJeallocMap(); } KVStore & reloadKVSFromDisk(bool with_reset = true) { diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index 
a906a48c3c1..6a937f47031 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -158,6 +158,7 @@ void TiFlashTestEnv::addGlobalContext( global_context->initializeGlobalPageIdAllocator(); global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); global_context->initializeWriteNodePageStorageIfNeed(global_context->getPathPool()); + global_context->initializeJointThreadInfoJeallocMap(); LOG_INFO(Logger::get(), "Storage mode : {}", static_cast(global_context->getPageStorageRunMode())); TiFlashRaftConfig raft_config; @@ -199,6 +200,7 @@ ContextPtr TiFlashTestEnv::getContext(const DB::Settings & settings, Strings tes global_contexts[0]->initializeGlobalStoragePoolIfNeed(context.getPathPool()); global_contexts[0]->tryReleaseWriteNodePageStorageForTest(); global_contexts[0]->initializeWriteNodePageStorageIfNeed(context.getPathPool()); + global_contexts[0]->initializeJointThreadInfoJeallocMap(); context.getSettingsRef() = settings; return std::make_shared(context); }