From 5987e73f1107b14fb6be44bc66382b5deae074da Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Wed, 13 Mar 2024 18:27:17 +0800 Subject: [PATCH 01/37] z Signed-off-by: CalvinNeo --- dbms/src/Common/TiFlashMetrics.h | 6 +++ dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp | 20 +++++++ dbms/src/Storages/KVStore/FFI/ProxyFFI.h | 5 +- dbms/src/Storages/KVStore/KVStore.cpp | 61 ++++++++++++++++++++++ dbms/src/Storages/KVStore/KVStore.h | 28 ++++++++++ 5 files changed, 119 insertions(+), 1 deletion(-) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 47d401b149a..1943d982e61 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -188,6 +188,12 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva Histogram, \ F(type_sync_schema_apply_duration, {{"type", "sync_schema_duration"}}, ExpBuckets{0.001, 2, 20}), \ F(type_sync_table_schema_apply_duration, {{"type", "sync_table_schema_duration"}}, ExpBuckets{0.001, 2, 20})) \ + M(tiflash_raft_proxy_thread_memory_usage, "Memory Usage of Proxy by thread", Gauge, \ + F(type_raftstore, {"type", "raftstore"}), \ + F(type_apply_low, {"type", "apply_low"}), \ + F(type_sst_importer, {"type", "sst_importer"}), \ + F(type_region_task, {"type", "region_task"}), \ + F(type_apply, {"type", "apply"})) \ M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \ M(tiflash_stale_read_count, "Total number of stale read", Counter) \ M(tiflash_raft_read_index_duration_seconds, \ diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index 05c60dc3ab9..e103abddc1e 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -1030,4 +1030,24 @@ BaseBuffView GetLockByKey(const EngineStoreServerWrap * server, uint64_t region_ } } +void ReportThreadAllocateInfo(EngineStoreServerWrap * server, BaseBuffView name, uint64_t type, uint64_t value) +{ + try + { + LOG_INFO(DB::Logger::get(), "!!!!!!! RRR report 1"); + if(server == nullptr) return; + LOG_INFO(DB::Logger::get(), "!!!!!!! RRR report 2"); + if(server->tmt == nullptr) return; + LOG_INFO(DB::Logger::get(), "!!!!!!! RRR report 3"); + if(server->tmt->getKVStore() == nullptr) return; + LOG_INFO(DB::Logger::get(), "!!!!!!! RRR report 4"); + server->tmt->getKVStore()->registerThreadAllocInfo(buffToStrView(name), type, value); + } + catch (...) + { + tryLogCurrentFatalException(__PRETTY_FUNCTION__); + exit(-1); + } +} + } // namespace DB diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.h b/dbms/src/Storages/KVStore/FFI/ProxyFFI.h index fa87b53b97e..85998207ba6 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.h +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.h @@ -206,6 +206,9 @@ FapSnapshotState QueryFapSnapshotState( uint64_t term); void ClearFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id); bool KvstoreRegionExists(EngineStoreServerWrap * server, uint64_t region_id); +void ReportThreadAllocateInfo(EngineStoreServerWrap *, + BaseBuffView name, uint64_t type, + uint64_t value); } inline EngineStoreServerHelper GetEngineStoreServerHelper(EngineStoreServerWrap * tiflash_instance_wrap) @@ -257,7 +260,7 @@ inline EngineStoreServerHelper GetEngineStoreServerHelper(EngineStoreServerWrap .fn_fast_add_peer = FastAddPeer, .fn_query_fap_snapshot_state = QueryFapSnapshotState, .fn_clear_fap_snapshot = ClearFapSnapshot, - .fn_kvstore_region_exists = KvstoreRegionExists, + .fn_report_thread_allocate_info = ReportThreadAllocateInfo, }; } diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 2bf4b1cfcd4..0956582b6ff 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -68,6 +68,17 @@ KVStore::KVStore(Context & context) { // default config about compact-log: rows 40k, bytes 32MB, gap 200. LOG_INFO(log, "KVStore inited, eager_raft_log_gc_enabled={}", eager_raft_log_gc_enabled); + using namespace std::chrono_literals; + monitoring_thread = new std::thread([&](){ + while(true) { + std::unique_lock l(monitoring_mut); + monitoring_cv.wait_for(l, 5000ms, [&](){ + return is_terminated; + }); + if (is_terminated) return; + reportThreadAllocInfo(); + } + }); } void KVStore::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy_helper) @@ -425,6 +436,13 @@ void KVStore::StoreMeta::update(Base && base_) KVStore::~KVStore() { LOG_INFO(log, "Destroy KVStore"); + { + std::unique_lock lk(monitoring_mut); + is_terminated = true; + monitoring_cv.notify_all(); + } + monitoring_thread->join(); + delete monitoring_thread; releaseReadIndexWorkers(); } @@ -483,4 +501,47 @@ RegionTaskLock KVStore::genRegionTaskLock(UInt64 region_id) const return region_manager.genRegionTaskLock(region_id); } +static std::string getThreadNameAggPrefix(const std::string & s) { + if(auto pos = s.find_last_of('-'); pos != std::string::npos) { + return s.substr(0, pos); + } + return s; +} + +void KVStore::registerThreadAllocInfo(std::string_view thdname, uint64_t type, uint64_t value) { + if (value == 0) return; + std::unique_lock l(memory_allocation_mut); + LOG_INFO(DB::Logger::get(), "!!!!!!! RRR register"); + auto [it, ok] = memory_allocation_map.emplace(thdname, ThreadInfoJealloc()); + if (type == 0) { + it->second.allocated_ptr = value; + } else if (type == 1) { + it->second.deallocated_ptr = value; + } +} + +void KVStore::reportThreadAllocInfo() { + std::shared_lock l(memory_allocation_mut); + std::unordered_map agg_remaining; + for (const auto & [k, v]: memory_allocation_map) { + auto agg_thread_name = getThreadNameAggPrefix(k); + auto [it, ok] = agg_remaining.emplace(agg_thread_name, 0); + it->second += v.remaining(); + } + + for (const auto & [k, v]: agg_remaining) { + if(k == "raftstore") { + GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_raftstore).Set(v); + } else if(k == "apply") { + GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_apply).Set(v); + } else if(k == "apply-low") { + GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_apply_low).Set(v); + } else if(k == "sst-importer") { + GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_sst_importer).Set(v); + } else if(k == "region-task") { + GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_region_task).Set(v); + } + } +} + } // namespace DB diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index 4efe6e28d4c..1a678aaf50e 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -119,6 +119,24 @@ struct ProxyConfigSummary size_t snap_handle_pool_size = 0; }; +struct ThreadInfoJealloc { + uint64_t allocated_ptr{0}; + uint64_t deallocated_ptr{0}; + + uint64_t allocated() const { + if(allocated_ptr == 0) return 0; + return *reinterpret_cast(allocated_ptr); + } + uint64_t deallocated() const { + if(deallocated_ptr == 0) return 0; + return *reinterpret_cast(deallocated_ptr); + } + int64_t remaining() const { + RUNTIME_CHECK(allocated_ptr != 0); + return static_cast(allocated()) - static_cast(deallocated()); + } +}; + /// KVStore manages raft replication and transactions. /// - Holds all regions in this TiFlash store. /// - Manages region -> table mapping. @@ -147,6 +165,8 @@ class KVStore final : private boost::noncopyable FileUsageStatistics getFileUsageStatistics() const; // Proxy will validate and refit the config items from the toml file. const ProxyConfigSummary & getProxyConfigSummay() const { return proxy_config_summary; } + void registerThreadAllocInfo(std::string_view, uint64_t type, uint64_t value); + void reportThreadAllocInfo(); public: // Region Management void restore(PathPool & path_pool, const TiFlashRaftProxyHelper *); @@ -410,6 +430,14 @@ class KVStore final : private boost::noncopyable // we can't have access to these codes though. std::atomic ongoing_prehandle_task_count{0}; ProxyConfigSummary proxy_config_summary; + + mutable std::shared_mutex memory_allocation_mut; + std::unordered_map memory_allocation_map; + + bool is_terminated{false}; + mutable std::mutex monitoring_mut; + std::condition_variable monitoring_cv; + std::thread * monitoring_thread{nullptr}; }; /// Encapsulation of lock guard of task mutex in KVStore From 79d847c66c3a57c1763ec3d92e84f5e76251cc63 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 14 Mar 2024 14:10:03 +0800 Subject: [PATCH 02/37] instant Signed-off-by: CalvinNeo --- dbms/src/Common/TiFlashMetrics.cpp | 2 + dbms/src/Common/TiFlashMetrics.h | 12 +- dbms/src/Server/Server.cpp | 2 + dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp | 38 ++++-- dbms/src/Storages/KVStore/FFI/ProxyFFI.h | 10 +- dbms/src/Storages/KVStore/KVStore.cpp | 135 +++++++++++++++------ dbms/src/Storages/KVStore/KVStore.h | 32 +++-- 7 files changed, 170 insertions(+), 61 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.cpp b/dbms/src/Common/TiFlashMetrics.cpp index 2747df404ce..e85cb569413 100644 --- a/dbms/src/Common/TiFlashMetrics.cpp +++ b/dbms/src/Common/TiFlashMetrics.cpp @@ -58,6 +58,8 @@ TiFlashMetrics::TiFlashMetrics() .Name("tiflash_storage_sync_replica_ru") .Help("RU for synchronous replica of keyspace") .Register(*registry); + registered_raft_proxy_thread_memory_usage_family + = &prometheus::BuildGauge().Name(raft_proxy_thread_memory_usage).Help("").Register(*registry); } void TiFlashMetrics::addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 1943d982e61..fad1c3ae86f 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -34,6 +34,7 @@ namespace DB { +class KVStore; constexpr size_t RAFT_REGION_BIG_WRITE_THRES = 2 * 1024; constexpr size_t RAFT_REGION_BIG_WRITE_MAX = 4 * 1024 * 1024; // raft-entry-max-size = 8MiB static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Invalid RAFT_REGION_BIG_WRITE_THRES"); @@ -188,12 +189,6 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva Histogram, \ F(type_sync_schema_apply_duration, {{"type", "sync_schema_duration"}}, ExpBuckets{0.001, 2, 20}), \ F(type_sync_table_schema_apply_duration, {{"type", "sync_table_schema_duration"}}, ExpBuckets{0.001, 2, 20})) \ - M(tiflash_raft_proxy_thread_memory_usage, "Memory Usage of Proxy by thread", Gauge, \ - F(type_raftstore, {"type", "raftstore"}), \ - F(type_apply_low, {"type", "apply_low"}), \ - F(type_sst_importer, {"type", "sst_importer"}), \ - F(type_region_task, {"type", "region_task"}), \ - F(type_apply, {"type", "apply"})) \ M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \ M(tiflash_stale_read_count, "Total number of stale read", Counter) \ M(tiflash_raft_read_index_duration_seconds, \ @@ -1096,6 +1091,7 @@ class TiFlashMetrics UInt64 debugQueryReplicaSyncRU(UInt32 keyspace_id); private: + friend class KVStore; TiFlashMetrics(); prometheus::Counter * getReplicaSyncRUCounter(UInt32 keyspace_id, std::unique_lock &); @@ -1104,6 +1100,7 @@ class TiFlashMetrics static constexpr auto profile_events_prefix = "tiflash_system_profile_event_"; static constexpr auto current_metrics_prefix = "tiflash_system_current_metric_"; static constexpr auto async_metrics_prefix = "tiflash_system_asynchronous_metric_"; + static constexpr auto raft_proxy_thread_memory_usage = "tiflash_raft_proxy_thread_memory_usage"; std::shared_ptr registry = std::make_shared(); // Here we add a ProcessCollector to collect cpu/rss/vsize/start_time information. @@ -1124,6 +1121,9 @@ class TiFlashMetrics std::mutex replica_sync_ru_mtx; std::unordered_map registered_keyspace_sync_replica_ru; + prometheus::Family * registered_raft_proxy_thread_memory_usage_family; + std::unordered_map registered_raft_proxy_thread_memory_usage_metrics; + public: #define MAKE_METRIC_MEMBER_M(family_name, help, type, ...) \ MetricFamily family_name \ diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 25c61a4984c..0e4b3add9ac 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1708,6 +1708,8 @@ int Server::main(const std::vector & /*args*/) LOG_ERROR(log, "Current status of engine-store is NOT Running, should not happen"); exit(-1); } + LOG_INFO(log, "Stop collecting metrics"); + tmt_context.getKVStore()->stopThreadAllocInfo(); LOG_INFO(log, "Set store context status Stopping"); tmt_context.setStatusStopping(); { diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index e103abddc1e..e37673855c2 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -1030,17 +1030,20 @@ BaseBuffView GetLockByKey(const EngineStoreServerWrap * server, uint64_t region_ } } -void ReportThreadAllocateInfo(EngineStoreServerWrap * server, BaseBuffView name, uint64_t type, uint64_t value) +void ReportThreadAllocateInfo( + EngineStoreServerWrap * server, + BaseBuffView name, + ReportThreadAllocateInfoType type, + uint64_t value) { try { - LOG_INFO(DB::Logger::get(), "!!!!!!! RRR report 1"); - if(server == nullptr) return; - LOG_INFO(DB::Logger::get(), "!!!!!!! RRR report 2"); - if(server->tmt == nullptr) return; - LOG_INFO(DB::Logger::get(), "!!!!!!! RRR report 3"); - if(server->tmt->getKVStore() == nullptr) return; - LOG_INFO(DB::Logger::get(), "!!!!!!! RRR report 4"); + if (server == nullptr) + return; + if (server->tmt == nullptr) + return; + if (server->tmt->getKVStore() == nullptr) + return; server->tmt->getKVStore()->registerThreadAllocInfo(buffToStrView(name), type, value); } catch (...) @@ -1050,4 +1053,23 @@ void ReportThreadAllocateInfo(EngineStoreServerWrap * server, BaseBuffView name, } } +void ReportThreadAllocateBatch(EngineStoreServerWrap * server, BaseBuffView name, ReportThreadAllocateInfoBatch data) +{ + try + { + if (server == nullptr) + return; + if (server->tmt == nullptr) + return; + if (server->tmt->getKVStore() == nullptr) + return; + server->tmt->getKVStore()->registerThreadAllocBatch(buffToStrView(name), data); + } + catch (...) + { + tryLogCurrentFatalException(__PRETTY_FUNCTION__); + exit(-1); + } +} + } // namespace DB diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.h b/dbms/src/Storages/KVStore/FFI/ProxyFFI.h index 85998207ba6..86ea62c2d21 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.h +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.h @@ -206,9 +206,12 @@ FapSnapshotState QueryFapSnapshotState( uint64_t term); void ClearFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id); bool KvstoreRegionExists(EngineStoreServerWrap * server, uint64_t region_id); -void ReportThreadAllocateInfo(EngineStoreServerWrap *, - BaseBuffView name, uint64_t type, - uint64_t value); +void ReportThreadAllocateInfo( + EngineStoreServerWrap *, + BaseBuffView name, + ReportThreadAllocateInfoType type, + uint64_t value); +void ReportThreadAllocateBatch(EngineStoreServerWrap *, BaseBuffView name, ReportThreadAllocateInfoBatch data); } inline EngineStoreServerHelper GetEngineStoreServerHelper(EngineStoreServerWrap * tiflash_instance_wrap) @@ -261,6 +264,7 @@ inline EngineStoreServerHelper GetEngineStoreServerHelper(EngineStoreServerWrap .fn_query_fap_snapshot_state = QueryFapSnapshotState, .fn_clear_fap_snapshot = ClearFapSnapshot, .fn_report_thread_allocate_info = ReportThreadAllocateInfo, + .fn_report_thread_allocate_batch = ReportThreadAllocateBatch, }; } diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 0956582b6ff..297c5936076 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -69,13 +69,13 @@ KVStore::KVStore(Context & context) // default config about compact-log: rows 40k, bytes 32MB, gap 200. LOG_INFO(log, "KVStore inited, eager_raft_log_gc_enabled={}", eager_raft_log_gc_enabled); using namespace std::chrono_literals; - monitoring_thread = new std::thread([&](){ - while(true) { + monitoring_thread = new std::thread([&]() { + while (true) + { std::unique_lock l(monitoring_mut); - monitoring_cv.wait_for(l, 5000ms, [&](){ - return is_terminated; - }); - if (is_terminated) return; + monitoring_cv.wait_for(l, 5000ms, [&]() { return is_terminated; }); + if (is_terminated) + return; reportThreadAllocInfo(); } }); @@ -436,13 +436,7 @@ void KVStore::StoreMeta::update(Base && base_) KVStore::~KVStore() { LOG_INFO(log, "Destroy KVStore"); - { - std::unique_lock lk(monitoring_mut); - is_terminated = true; - monitoring_cv.notify_all(); - } - monitoring_thread->join(); - delete monitoring_thread; + stopThreadAllocInfo(); releaseReadIndexWorkers(); } @@ -501,47 +495,118 @@ RegionTaskLock KVStore::genRegionTaskLock(UInt64 region_id) const return region_manager.genRegionTaskLock(region_id); } -static std::string getThreadNameAggPrefix(const std::string & s) { - if(auto pos = s.find_last_of('-'); pos != std::string::npos) { +static std::string getThreadNameAggPrefix(const std::string & s) +{ + if (auto pos = s.find_last_of('-'); pos != std::string::npos) + { return s.substr(0, pos); } return s; } -void KVStore::registerThreadAllocInfo(std::string_view thdname, uint64_t type, uint64_t value) { - if (value == 0) return; +static std::string getThreadNameAggPrefix(const std::string_view & s) +{ + if (auto pos = s.find_last_of('-'); pos != std::string::npos) + { + return std::string(s.begin(), s.begin() + pos); + } + return std::string(s.begin(), s.end()); +} + +void KVStore::registerThreadAllocInfo(std::string_view thdname, ReportThreadAllocateInfoType type, uint64_t value) +{ + if (value == 0) + return; std::unique_lock l(memory_allocation_mut); - LOG_INFO(DB::Logger::get(), "!!!!!!! RRR register"); - auto [it, ok] = memory_allocation_map.emplace(thdname, ThreadInfoJealloc()); - if (type == 0) { + std::string tname(thdname.begin(), thdname.end()); + if (type == ReportThreadAllocateInfoType::Reset) + { + memory_allocation_map.insert_or_assign(tname, ThreadInfoJealloc()); + } + else if (type == ReportThreadAllocateInfoType::AllocPtr) + { + auto it = memory_allocation_map.find(tname); + if unlikely (it == memory_allocation_map.end()) + { + return; + } it->second.allocated_ptr = value; - } else if (type == 1) { + } + else if (type == ReportThreadAllocateInfoType::DeallocPtr) + { + auto it = memory_allocation_map.find(tname); + if unlikely (it == memory_allocation_map.end()) + { + return; + } it->second.deallocated_ptr = value; } } -void KVStore::reportThreadAllocInfo() { +void KVStore::reportThreadAllocInfo() +{ std::shared_lock l(memory_allocation_mut); std::unordered_map agg_remaining; - for (const auto & [k, v]: memory_allocation_map) { + for (const auto & [k, v] : memory_allocation_map) + { auto agg_thread_name = getThreadNameAggPrefix(k); auto [it, ok] = agg_remaining.emplace(agg_thread_name, 0); it->second += v.remaining(); } - for (const auto & [k, v]: agg_remaining) { - if(k == "raftstore") { - GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_raftstore).Set(v); - } else if(k == "apply") { - GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_apply).Set(v); - } else if(k == "apply-low") { - GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_apply_low).Set(v); - } else if(k == "sst-importer") { - GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_sst_importer).Set(v); - } else if(k == "region-task") { - GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_region_task).Set(v); - } + // auto & tiflash_metrics = TiFlashMetrics::instance(); + // for (const auto & [k, v]: agg_remaining) { + // if(startsWith(k, "raftstore")) { + // GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_raftstore).Set(v); + // } else if(startsWith(k, "apply-low")) { + // GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_apply_low).Set(v); + // } else if(startsWith(k, "apply")) { + // GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_apply).Set(v); + // } else if(startsWith(k, "sst-importer")) { + // GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_sst_importer).Set(v); + // } else if(startsWith(k, "region-task")) { + // GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_region_task).Set(v); + // } + // if (!tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.count(k)) + // { + // // Add new keyspace store usage metric + // tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.emplace( + // k, + // &tiflash_metrics.registered_raft_proxy_thread_memory_usage_family->Add( + // {{"type", k}})); + // } + // tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics[k]->Set(v); + // } +} + + +void KVStore::registerThreadAllocBatch(std::string_view name, ReportThreadAllocateInfoBatch data) +{ + auto & tiflash_metrics = TiFlashMetrics::instance(); + auto k = getThreadNameAggPrefix(name); + int64_t v = static_cast(data.alloc) - static_cast(data.dealloc); + if unlikely (!tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.count(k)) + { + // Add new keyspace store usage metric + tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.emplace( + k, + &tiflash_metrics.registered_raft_proxy_thread_memory_usage_family->Add({{"type", k}})); } + tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics[k]->Set(v); +} + +void KVStore::stopThreadAllocInfo() +{ + { + std::unique_lock lk(monitoring_mut); + if (monitoring_thread == nullptr) + return; + is_terminated = true; + monitoring_cv.notify_all(); + } + monitoring_thread->join(); + delete monitoring_thread; + monitoring_thread = nullptr; } } // namespace DB diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index 1a678aaf50e..5d3041dee71 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -48,6 +48,9 @@ namespace tests class KVStoreTestBase; } // namespace tests +enum class ReportThreadAllocateInfoType : uint64_t; +struct ReportThreadAllocateInfoBatch; + class IAST; using ASTPtr = std::shared_ptr; using ASTs = std::vector; @@ -119,20 +122,29 @@ struct ProxyConfigSummary size_t snap_handle_pool_size = 0; }; -struct ThreadInfoJealloc { +struct ThreadInfoJealloc +{ uint64_t allocated_ptr{0}; uint64_t deallocated_ptr{0}; - uint64_t allocated() const { - if(allocated_ptr == 0) return 0; + uint64_t allocated() const + { + if (allocated_ptr == 0) + return 0; return *reinterpret_cast(allocated_ptr); } - uint64_t deallocated() const { - if(deallocated_ptr == 0) return 0; + uint64_t deallocated() const + { + if (deallocated_ptr == 0) + return 0; return *reinterpret_cast(deallocated_ptr); } - int64_t remaining() const { - RUNTIME_CHECK(allocated_ptr != 0); + int64_t remaining() const + { + if (deallocated_ptr == 0) + return 0; + if (allocated_ptr == 0) + return 0; return static_cast(allocated()) - static_cast(deallocated()); } }; @@ -165,8 +177,10 @@ class KVStore final : private boost::noncopyable FileUsageStatistics getFileUsageStatistics() const; // Proxy will validate and refit the config items from the toml file. const ProxyConfigSummary & getProxyConfigSummay() const { return proxy_config_summary; } - void registerThreadAllocInfo(std::string_view, uint64_t type, uint64_t value); + void registerThreadAllocInfo(std::string_view, ReportThreadAllocateInfoType type, uint64_t value); + void registerThreadAllocBatch(std::string_view, ReportThreadAllocateInfoBatch data); void reportThreadAllocInfo(); + void stopThreadAllocInfo(); public: // Region Management void restore(PathPool & path_pool, const TiFlashRaftProxyHelper *); @@ -433,7 +447,7 @@ class KVStore final : private boost::noncopyable mutable std::shared_mutex memory_allocation_mut; std::unordered_map memory_allocation_map; - + bool is_terminated{false}; mutable std::mutex monitoring_mut; std::condition_variable monitoring_cv; From 279e7a91523fa7ff8fce1805f07a49216bc3cefa Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 14 Mar 2024 15:07:38 +0800 Subject: [PATCH 03/37] fix Signed-off-by: CalvinNeo --- contrib/tiflash-proxy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index cafa770ba0e..24ce15aebec 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit cafa770ba0ed490b0d6354df4b7c30dbd862f0c2 +Subproject commit 24ce15aebec4cb5626468632b5c2d85bf37b9952 From 51aa25bf007fc8be033b53782f813df74a4a161c Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 14 Mar 2024 15:07:58 +0800 Subject: [PATCH 04/37] fix Signed-off-by: CalvinNeo --- contrib/tiflash-proxy-cmake/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/contrib/tiflash-proxy-cmake/CMakeLists.txt b/contrib/tiflash-proxy-cmake/CMakeLists.txt index d48ef1601d2..374e6269637 100644 --- a/contrib/tiflash-proxy-cmake/CMakeLists.txt +++ b/contrib/tiflash-proxy-cmake/CMakeLists.txt @@ -14,10 +14,10 @@ if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG" OR SAN_DEBUG) set(_TIFLASH_PROXY_BUILD_PROFILE "debug") - set(_TIFLASH_PROXY_MAKE_COMMAND make debug) + set(_TIFLASH_PROXY_MAKE_COMMAND ENABLE_FEATURES="external-jemalloc" make debug) else() set(_TIFLASH_PROXY_BUILD_PROFILE "release") - set(_TIFLASH_PROXY_MAKE_COMMAND make release) + set(_TIFLASH_PROXY_MAKE_COMMAND ENABLE_FEATURES="external-jemalloc" make release) endif() set(_TIFLASH_PROXY_SOURCE_DIR "${TiFlash_SOURCE_DIR}/contrib/tiflash-proxy") From d8ea43bf7ce1248eac6b0caeb8c6b30bcba56fec Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 14 Mar 2024 18:44:55 +0800 Subject: [PATCH 05/37] zzzzz Signed-off-by: CalvinNeo --- CMakeLists.txt | 4 ++ contrib/tiflash-proxy | 2 +- contrib/tiflash-proxy-cmake/CMakeLists.txt | 12 +++- dbms/src/Common/MemoryTrace.h | 36 ++++++++++ dbms/src/Server/Server.cpp | 6 ++ dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp | 9 ++- dbms/src/Storages/KVStore/FFI/ProxyFFI.h | 3 +- dbms/src/Storages/KVStore/KVStore.cpp | 71 +++++++++---------- dbms/src/Storages/KVStore/KVStore.h | 10 +-- .../KVStore/MultiRaft/Disagg/FastAddPeer.cpp | 2 +- .../Storages/KVStore/Read/ReadIndexWorker.cpp | 19 ++++- .../Storages/KVStore/Read/ReadIndexWorker.h | 7 ++ 12 files changed, 128 insertions(+), 53 deletions(-) create mode 100644 dbms/src/Common/MemoryTrace.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 4c1ec82990c..477661aa4d3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -373,6 +373,10 @@ else () message (STATUS "Set jemalloc narenas ${JEMALLOC_NARENAS}") endif () +if (ENABLE_JEMALLOC) + add_compile_definitions(WITH_JEMALLOC) +endif () + option (TEST_LLVM_COVERAGE "Enables flags for test coverage" OFF) if (TEST_LLVM_COVERAGE AND CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG") set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fprofile-instr-generate -fcoverage-mapping -DTIFLASH_LLVM_COVERAGE=1") diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index 24ce15aebec..c58aa770b59 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit 24ce15aebec4cb5626468632b5c2d85bf37b9952 +Subproject commit c58aa770b59ae446131fa73e5f0c01dfffc8f5f4 diff --git a/contrib/tiflash-proxy-cmake/CMakeLists.txt b/contrib/tiflash-proxy-cmake/CMakeLists.txt index 374e6269637..107ffc3c0a6 100644 --- a/contrib/tiflash-proxy-cmake/CMakeLists.txt +++ b/contrib/tiflash-proxy-cmake/CMakeLists.txt @@ -14,10 +14,18 @@ if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG" OR SAN_DEBUG) set(_TIFLASH_PROXY_BUILD_PROFILE "debug") - set(_TIFLASH_PROXY_MAKE_COMMAND ENABLE_FEATURES="external-jemalloc" make debug) + if (ENABLE_JEMALLOC) + set(_TIFLASH_PROXY_MAKE_COMMAND ENABLE_FEATURES="external-jemalloc" make debug) + else() + set(_TIFLASH_PROXY_MAKE_COMMAND make debug) + endif() else() set(_TIFLASH_PROXY_BUILD_PROFILE "release") - set(_TIFLASH_PROXY_MAKE_COMMAND ENABLE_FEATURES="external-jemalloc" make release) + if (ENABLE_JEMALLOC) + set(_TIFLASH_PROXY_MAKE_COMMAND ENABLE_FEATURES="external-jemalloc" make release) + else() + set(_TIFLASH_PROXY_MAKE_COMMAND make release) + endif() endif() set(_TIFLASH_PROXY_SOURCE_DIR "${TiFlash_SOURCE_DIR}/contrib/tiflash-proxy") diff --git a/dbms/src/Common/MemoryTrace.h b/dbms/src/Common/MemoryTrace.h new file mode 100644 index 00000000000..545ceb6303a --- /dev/null +++ b/dbms/src/Common/MemoryTrace.h @@ -0,0 +1,36 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#ifdef WITH_JEMALLOC +#include +#endif + +namespace DB { +#ifdef WITH_JEMALLOC +std::tuple getAllocDeallocPtr() { + uint64_t * ptr1 = nullptr; + uint64_t size1 = sizeof ptr1; + mallctl("thread.allocatedp", (void*)&ptr1, &size1, NULL, 0); + uint64_t * ptr2 = nullptr; + uint64_t size2 = sizeof ptr2; + mallctl("thread.deallocatedp", (void*)&ptr2, &size2, NULL, 0); + return std::make_tuple(ptr1, ptr2); +} +#else +std::tuple getAllocDeallocPtr() { + return std::make_tuple(nullptr, nullptr); +} +#endif +} \ No newline at end of file diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 0e4b3add9ac..145145228b4 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1029,6 +1029,12 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "UniPS is not enabled for proxy, page_version={}", STORAGE_FORMAT_CURRENT.page); } + #ifdef WITH_JEMALLOC + LOG_INFO(log, "Using Jemalloc for TiFlash"); + #else + LOG_INFO(log, "Not using Jemalloc for TiFlash"); + #endif + RaftStoreProxyRunner proxy_runner(RaftStoreProxyRunner::RunRaftStoreProxyParms{&helper, proxy_conf}, log); if (proxy_conf.is_proxy_runnable) diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index e37673855c2..a65b1af0d76 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -1032,19 +1032,21 @@ BaseBuffView GetLockByKey(const EngineStoreServerWrap * server, uint64_t region_ void ReportThreadAllocateInfo( EngineStoreServerWrap * server, + uint64_t tid, BaseBuffView name, ReportThreadAllocateInfoType type, uint64_t value) { try { + UNUSED(tid); if (server == nullptr) return; if (server->tmt == nullptr) return; if (server->tmt->getKVStore() == nullptr) return; - server->tmt->getKVStore()->registerThreadAllocInfo(buffToStrView(name), type, value); + server->tmt->getKVStore()->reportThreadAllocInfo(buffToStrView(name), type, value); } catch (...) { @@ -1053,17 +1055,18 @@ void ReportThreadAllocateInfo( } } -void ReportThreadAllocateBatch(EngineStoreServerWrap * server, BaseBuffView name, ReportThreadAllocateInfoBatch data) +void ReportThreadAllocateBatch(EngineStoreServerWrap * server, uint64_t tid, BaseBuffView name, ReportThreadAllocateInfoBatch data) { try { + UNUSED(tid); if (server == nullptr) return; if (server->tmt == nullptr) return; if (server->tmt->getKVStore() == nullptr) return; - server->tmt->getKVStore()->registerThreadAllocBatch(buffToStrView(name), data); + server->tmt->getKVStore()->reportThreadAllocBatch(buffToStrView(name), data); } catch (...) { diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.h b/dbms/src/Storages/KVStore/FFI/ProxyFFI.h index 86ea62c2d21..68da7b2ee48 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.h +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.h @@ -208,10 +208,11 @@ void ClearFapSnapshot(EngineStoreServerWrap * server, uint64_t region_id); bool KvstoreRegionExists(EngineStoreServerWrap * server, uint64_t region_id); void ReportThreadAllocateInfo( EngineStoreServerWrap *, + uint64_t tid, BaseBuffView name, ReportThreadAllocateInfoType type, uint64_t value); -void ReportThreadAllocateBatch(EngineStoreServerWrap *, BaseBuffView name, ReportThreadAllocateInfoBatch data); +void ReportThreadAllocateBatch(EngineStoreServerWrap *, uint64_t tid, BaseBuffView name, ReportThreadAllocateInfoBatch data); } inline EngineStoreServerHelper GetEngineStoreServerHelper(EngineStoreServerWrap * tiflash_instance_wrap) diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 297c5936076..09b65075fc6 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -76,7 +76,7 @@ KVStore::KVStore(Context & context) monitoring_cv.wait_for(l, 5000ms, [&]() { return is_terminated; }); if (is_terminated) return; - reportThreadAllocInfo(); + recordThreadAllocInfo(); } }); } @@ -513,18 +513,22 @@ static std::string getThreadNameAggPrefix(const std::string_view & s) return std::string(s.begin(), s.end()); } -void KVStore::registerThreadAllocInfo(std::string_view thdname, ReportThreadAllocateInfoType type, uint64_t value) +void KVStore::reportThreadAllocInfo(std::string_view thdname, ReportThreadAllocateInfoType type, uint64_t value) { - if (value == 0) - return; std::unique_lock l(memory_allocation_mut); std::string tname(thdname.begin(), thdname.end()); if (type == ReportThreadAllocateInfoType::Reset) { memory_allocation_map.insert_or_assign(tname, ThreadInfoJealloc()); } + else if (type == ReportThreadAllocateInfoType::Remove) + { + memory_allocation_map.erase(tname); + } else if (type == ReportThreadAllocateInfoType::AllocPtr) { + if (value == 0) + return; auto it = memory_allocation_map.find(tname); if unlikely (it == memory_allocation_map.end()) { @@ -534,6 +538,8 @@ void KVStore::registerThreadAllocInfo(std::string_view thdname, ReportThreadAllo } else if (type == ReportThreadAllocateInfoType::DeallocPtr) { + if (value == 0) + return; auto it = memory_allocation_map.find(tname); if unlikely (it == memory_allocation_map.end()) { @@ -543,48 +549,41 @@ void KVStore::registerThreadAllocInfo(std::string_view thdname, ReportThreadAllo } } -void KVStore::reportThreadAllocInfo() +static const std::unordered_set WHITE_LIST_THREAD_PREFIX = {"ReadIndexWkr"}; + +/// For those everlasting threads, we can directly access their allocatedp/allocatedp. +void KVStore::recordThreadAllocInfo() { std::shared_lock l(memory_allocation_mut); - std::unordered_map agg_remaining; + std::unordered_map agg_remaining; for (const auto & [k, v] : memory_allocation_map) { auto agg_thread_name = getThreadNameAggPrefix(k); - auto [it, ok] = agg_remaining.emplace(agg_thread_name, 0); - it->second += v.remaining(); + // Some thread may have shorter lifetime, we can't use this timed task here to upgrade. + if (WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name)) { + auto [it, ok] = agg_remaining.emplace(agg_thread_name, 0); + it->second += v.remaining(); + } } + for (const auto & [k, v] : agg_remaining) { + auto & tiflash_metrics = TiFlashMetrics::instance(); + if unlikely (!tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.count(k)) + { + // Add new keyspace store usage metric + tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.emplace( + k, + &tiflash_metrics.registered_raft_proxy_thread_memory_usage_family->Add({{"type", k}})); + } + tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics[k]->Set(v); + } +} - // auto & tiflash_metrics = TiFlashMetrics::instance(); - // for (const auto & [k, v]: agg_remaining) { - // if(startsWith(k, "raftstore")) { - // GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_raftstore).Set(v); - // } else if(startsWith(k, "apply-low")) { - // GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_apply_low).Set(v); - // } else if(startsWith(k, "apply")) { - // GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_apply).Set(v); - // } else if(startsWith(k, "sst-importer")) { - // GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_sst_importer).Set(v); - // } else if(startsWith(k, "region-task")) { - // GET_METRIC(tiflash_raft_proxy_thread_memory_usage, type_region_task).Set(v); - // } - // if (!tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.count(k)) - // { - // // Add new keyspace store usage metric - // tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.emplace( - // k, - // &tiflash_metrics.registered_raft_proxy_thread_memory_usage_family->Add( - // {{"type", k}})); - // } - // tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics[k]->Set(v); - // } -} - - -void KVStore::registerThreadAllocBatch(std::string_view name, ReportThreadAllocateInfoBatch data) +/// For those threads with shorter life, we must only update in their call chain. +void KVStore::reportThreadAllocBatch(std::string_view name, ReportThreadAllocateInfoBatch data) { - auto & tiflash_metrics = TiFlashMetrics::instance(); auto k = getThreadNameAggPrefix(name); int64_t v = static_cast(data.alloc) - static_cast(data.dealloc); + auto & tiflash_metrics = TiFlashMetrics::instance(); if unlikely (!tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.count(k)) { // Add new keyspace store usage metric diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index 5d3041dee71..a3684aaec1e 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -141,10 +141,6 @@ struct ThreadInfoJealloc } int64_t remaining() const { - if (deallocated_ptr == 0) - return 0; - if (allocated_ptr == 0) - return 0; return static_cast(allocated()) - static_cast(deallocated()); } }; @@ -177,9 +173,9 @@ class KVStore final : private boost::noncopyable FileUsageStatistics getFileUsageStatistics() const; // Proxy will validate and refit the config items from the toml file. const ProxyConfigSummary & getProxyConfigSummay() const { return proxy_config_summary; } - void registerThreadAllocInfo(std::string_view, ReportThreadAllocateInfoType type, uint64_t value); - void registerThreadAllocBatch(std::string_view, ReportThreadAllocateInfoBatch data); - void reportThreadAllocInfo(); + void reportThreadAllocInfo(std::string_view, ReportThreadAllocateInfoType type, uint64_t value); + void reportThreadAllocBatch(std::string_view, ReportThreadAllocateInfoBatch data); + void recordThreadAllocInfo(); void stopThreadAllocInfo(); public: // Region Management diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp index 21a3b9b94fc..2e6dbd45e72 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp @@ -630,7 +630,7 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u GET_METRIC(tiflash_fap_task_state, type_queueing_stage).Increment(); auto job_func = [server, region_id, new_peer_id, fap_ctx, current_time]() { std::string origin_name = getThreadName(); - SCOPE_EXIT({ setThreadName(origin_name.c_str()); }); + SCOPE_EXIT({setThreadName(origin_name.c_str()); }); setThreadName("fap-builder"); return FastAddPeerImpl( fap_ctx, diff --git a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp index 214c7479abf..067bef0e387 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -774,9 +775,11 @@ void ReadIndexWorker::runOneRound(SteadyClock::duration min_dur) ReadIndexWorker::ReadIndexWorker( const TiFlashRaftProxyHelper & proxy_helper_, + KVStore & kvstore_, size_t id_, AsyncWaker::NotifierPtr notifier_) : proxy_helper(proxy_helper_) + , kvstore(kvstore_) , id(id_) , read_index_notify_ctrl(std::make_shared(notifier_)) {} @@ -798,16 +801,19 @@ ReadIndexWorker & ReadIndexWorkerManager::getWorkerByRegion(RegionID region_id) ReadIndexWorkerManager::ReadIndexWorkerManager( const TiFlashRaftProxyHelper & proxy_helper_, + KVStore & kvstore_, size_t workers_cnt, ReadIndexWorkerManager::FnGetTickTime && fn_min_dur_handle_region, size_t runner_cnt) : proxy_helper(proxy_helper_) + , kvstore(kvstore_) , logger(Logger::get("ReadIndexWorkers")) { for (size_t i = 0; i < runner_cnt; ++i) runners.emplace_back(std::make_unique( i, runner_cnt, + kvstore, workers, logger, fn_min_dur_handle_region, @@ -822,7 +828,7 @@ ReadIndexWorkerManager::ReadIndexWorkerManager( { for (size_t wid = rid; wid < workers_cnt; wid += runner_cnt) { - workers[wid] = std::make_unique(proxy_helper, wid, runners[rid]->global_notifier); + workers[wid] = std::make_unique(proxy_helper, kvstore, wid, runners[rid]->global_notifier); } } } @@ -946,6 +952,7 @@ BatchReadIndexRes KVStore::batchReadIndex(const std::vector ReadIndexWorkerManager::newReadIndexWorkerManager( const TiFlashRaftProxyHelper & proxy_helper, + KVStore & kvstore, size_t cap, ReadIndexWorkerManager::FnGetTickTime && fn_min_dur_handle_region, size_t runner_cnt) @@ -953,7 +960,7 @@ std::unique_ptr ReadIndexWorkerManager::newReadIndexWork #ifdef ADD_TEST_DEBUG_LOG_FMT global_logger_for_test = &Poco::Logger::get("TestReadIndexWork"); #endif - return std::make_unique(proxy_helper, cap, std::move(fn_min_dur_handle_region), runner_cnt); + return std::make_unique(proxy_helper, kvstore, cap, std::move(fn_min_dur_handle_region), runner_cnt); } void KVStore::initReadIndexWorkers( @@ -970,6 +977,7 @@ void KVStore::initReadIndexWorkers( LOG_INFO(log, "Start to initialize read-index workers: worker count {}, runner count {}", worker_cnt, runner_cnt); auto * ptr = ReadIndexWorkerManager::newReadIndexWorkerManager( *proxy_helper, + *this, worker_cnt, std::move(fn_min_dur_handle_region), runner_cnt) @@ -1041,6 +1049,10 @@ void ReadIndexWorkerManager::ReadIndexRunner::asyncRun() work_thread = std::make_unique([this]() { std::string name = fmt::format("ReadIndexWkr-{}", id); setThreadName(name.data()); + auto [ptr_a, ptr_d] = getAllocDeallocPtr(); + kvstore.reportThreadAllocInfo(name, ReportThreadAllocateInfoType::Reset, 0); + kvstore.reportThreadAllocInfo(name, ReportThreadAllocateInfoType::AllocPtr, reinterpret_cast(ptr_a)); + kvstore.reportThreadAllocInfo(name, ReportThreadAllocateInfoType::DeallocPtr, reinterpret_cast(ptr_d)); LOG_INFO(logger, "Start read-index runner {}", id); while (true) { @@ -1050,6 +1062,7 @@ void ReadIndexWorkerManager::ReadIndexRunner::asyncRun() if (state.load(std::memory_order_acquire) != State::Running) break; } + kvstore.reportThreadAllocInfo(name, ReportThreadAllocateInfoType::Remove, 0); LOG_INFO(logger, "Start to stop read-index runner {}", id); }); } @@ -1057,12 +1070,14 @@ void ReadIndexWorkerManager::ReadIndexRunner::asyncRun() ReadIndexWorkerManager::ReadIndexRunner::ReadIndexRunner( size_t id_, size_t runner_cnt_, + KVStore & kvstore_, ReadIndexWorkers & workers_, LoggerPtr logger_, FnGetTickTime fn_min_dur_handle_region_, AsyncWaker::NotifierPtr global_notifier_) : id(id_) , runner_cnt(runner_cnt_) + , kvstore(kvstore_) , workers(workers_) , logger(std::move(logger_)) , fn_min_dur_handle_region(std::move(fn_min_dur_handle_region_)) diff --git a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.h b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.h index 538fd8a9828..724250fe998 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.h +++ b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.h @@ -85,6 +85,7 @@ class ReadIndexWorkerManager : boost::noncopyable explicit ReadIndexWorkerManager( const TiFlashRaftProxyHelper & proxy_helper_, + KVStore & kvstore_, size_t workers_cnt, FnGetTickTime && fn_min_dur_handle_region_, size_t runner_cnt); @@ -100,6 +101,7 @@ class ReadIndexWorkerManager : boost::noncopyable static std::unique_ptr newReadIndexWorkerManager( const TiFlashRaftProxyHelper & proxy_helper, + KVStore & kvstore, size_t cap, FnGetTickTime && fn_min_dur_handle_region, size_t runner_cnt = 1); @@ -135,6 +137,7 @@ class ReadIndexWorkerManager : boost::noncopyable ReadIndexRunner( size_t id_, size_t runner_cnt_, + KVStore & kvstore_, ReadIndexWorkers & workers_, LoggerPtr logger_, FnGetTickTime fn_min_dur_handle_region_, @@ -142,6 +145,7 @@ class ReadIndexWorkerManager : boost::noncopyable const size_t id; const size_t runner_cnt; + KVStore & kvstore; ReadIndexWorkers & workers; LoggerPtr logger; const FnGetTickTime fn_min_dur_handle_region; @@ -153,6 +157,7 @@ class ReadIndexWorkerManager : boost::noncopyable private: const TiFlashRaftProxyHelper & proxy_helper; + KVStore & kvstore; /// Each runner is mapped to a part of workers(worker_id % runner_cnt == runner_id). std::vector> runners; /// Each worker controls read-index process of region(region_id % worker_cnt == worker_id). @@ -284,6 +289,7 @@ struct ReadIndexWorker explicit ReadIndexWorker( const TiFlashRaftProxyHelper & proxy_helper_, + KVStore & kvstore_, size_t id_, AsyncWaker::NotifierPtr notifier_); @@ -311,6 +317,7 @@ struct ReadIndexWorker // static std::atomic max_read_index_history; const TiFlashRaftProxyHelper & proxy_helper; + KVStore & kvstore; const size_t id; DataMap data_map; From e0705b1770e76c3ee73b415fe9bc1e9a6941fb45 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 14 Mar 2024 23:19:26 +0800 Subject: [PATCH 06/37] fmt Signed-off-by: CalvinNeo --- dbms/src/Common/MemoryTrace.h | 15 +++++++++------ dbms/src/Server/Server.cpp | 10 +++++----- dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp | 6 +++++- dbms/src/Storages/KVStore/FFI/ProxyFFI.h | 6 +++++- dbms/src/Storages/KVStore/KVStore.cpp | 6 ++++-- dbms/src/Storages/KVStore/KVStore.h | 5 +---- .../KVStore/MultiRaft/Disagg/FastAddPeer.cpp | 2 +- .../Storages/KVStore/Read/ReadIndexWorker.cpp | 17 +++++++++++++---- 8 files changed, 43 insertions(+), 24 deletions(-) diff --git a/dbms/src/Common/MemoryTrace.h b/dbms/src/Common/MemoryTrace.h index 545ceb6303a..67b1f19a902 100644 --- a/dbms/src/Common/MemoryTrace.h +++ b/dbms/src/Common/MemoryTrace.h @@ -17,20 +17,23 @@ #include #endif -namespace DB { +namespace DB +{ #ifdef WITH_JEMALLOC -std::tuple getAllocDeallocPtr() { +std::tuple getAllocDeallocPtr() +{ uint64_t * ptr1 = nullptr; uint64_t size1 = sizeof ptr1; - mallctl("thread.allocatedp", (void*)&ptr1, &size1, NULL, 0); + mallctl("thread.allocatedp", (void *)&ptr1, &size1, NULL, 0); uint64_t * ptr2 = nullptr; uint64_t size2 = sizeof ptr2; - mallctl("thread.deallocatedp", (void*)&ptr2, &size2, NULL, 0); + mallctl("thread.deallocatedp", (void *)&ptr2, &size2, NULL, 0); return std::make_tuple(ptr1, ptr2); } #else -std::tuple getAllocDeallocPtr() { +std::tuple getAllocDeallocPtr() +{ return std::make_tuple(nullptr, nullptr); } #endif -} \ No newline at end of file +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 145145228b4..25a0e4d6f36 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1029,11 +1029,11 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "UniPS is not enabled for proxy, page_version={}", STORAGE_FORMAT_CURRENT.page); } - #ifdef WITH_JEMALLOC - LOG_INFO(log, "Using Jemalloc for TiFlash"); - #else - LOG_INFO(log, "Not using Jemalloc for TiFlash"); - #endif +#ifdef WITH_JEMALLOC + LOG_INFO(log, "Using Jemalloc for TiFlash"); +#else + LOG_INFO(log, "Not using Jemalloc for TiFlash"); +#endif RaftStoreProxyRunner proxy_runner(RaftStoreProxyRunner::RunRaftStoreProxyParms{&helper, proxy_conf}, log); diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index a65b1af0d76..f6624bdf795 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -1055,7 +1055,11 @@ void ReportThreadAllocateInfo( } } -void ReportThreadAllocateBatch(EngineStoreServerWrap * server, uint64_t tid, BaseBuffView name, ReportThreadAllocateInfoBatch data) +void ReportThreadAllocateBatch( + EngineStoreServerWrap * server, + uint64_t tid, + BaseBuffView name, + ReportThreadAllocateInfoBatch data) { try { diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.h b/dbms/src/Storages/KVStore/FFI/ProxyFFI.h index 68da7b2ee48..1ccffab631a 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.h +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.h @@ -212,7 +212,11 @@ void ReportThreadAllocateInfo( BaseBuffView name, ReportThreadAllocateInfoType type, uint64_t value); -void ReportThreadAllocateBatch(EngineStoreServerWrap *, uint64_t tid, BaseBuffView name, ReportThreadAllocateInfoBatch data); +void ReportThreadAllocateBatch( + EngineStoreServerWrap *, + uint64_t tid, + BaseBuffView name, + ReportThreadAllocateInfoBatch data); } inline EngineStoreServerHelper GetEngineStoreServerHelper(EngineStoreServerWrap * tiflash_instance_wrap) diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 09b65075fc6..7b88b8372fd 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -560,12 +560,14 @@ void KVStore::recordThreadAllocInfo() { auto agg_thread_name = getThreadNameAggPrefix(k); // Some thread may have shorter lifetime, we can't use this timed task here to upgrade. - if (WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name)) { + if (WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name)) + { auto [it, ok] = agg_remaining.emplace(agg_thread_name, 0); it->second += v.remaining(); } } - for (const auto & [k, v] : agg_remaining) { + for (const auto & [k, v] : agg_remaining) + { auto & tiflash_metrics = TiFlashMetrics::instance(); if unlikely (!tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.count(k)) { diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index a3684aaec1e..ea08f65c2e9 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -139,10 +139,7 @@ struct ThreadInfoJealloc return 0; return *reinterpret_cast(deallocated_ptr); } - int64_t remaining() const - { - return static_cast(allocated()) - static_cast(deallocated()); - } + int64_t remaining() const { return static_cast(allocated()) - static_cast(deallocated()); } }; /// KVStore manages raft replication and transactions. diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp index 2e6dbd45e72..21a3b9b94fc 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp @@ -630,7 +630,7 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u GET_METRIC(tiflash_fap_task_state, type_queueing_stage).Increment(); auto job_func = [server, region_id, new_peer_id, fap_ctx, current_time]() { std::string origin_name = getThreadName(); - SCOPE_EXIT({setThreadName(origin_name.c_str()); }); + SCOPE_EXIT({ setThreadName(origin_name.c_str()); }); setThreadName("fap-builder"); return FastAddPeerImpl( fap_ctx, diff --git a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp index 067bef0e387..54c015f49cc 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include +#include #include #include #include @@ -828,7 +828,8 @@ ReadIndexWorkerManager::ReadIndexWorkerManager( { for (size_t wid = rid; wid < workers_cnt; wid += runner_cnt) { - workers[wid] = std::make_unique(proxy_helper, kvstore, wid, runners[rid]->global_notifier); + workers[wid] + = std::make_unique(proxy_helper, kvstore, wid, runners[rid]->global_notifier); } } } @@ -960,7 +961,12 @@ std::unique_ptr ReadIndexWorkerManager::newReadIndexWork #ifdef ADD_TEST_DEBUG_LOG_FMT global_logger_for_test = &Poco::Logger::get("TestReadIndexWork"); #endif - return std::make_unique(proxy_helper, kvstore, cap, std::move(fn_min_dur_handle_region), runner_cnt); + return std::make_unique( + proxy_helper, + kvstore, + cap, + std::move(fn_min_dur_handle_region), + runner_cnt); } void KVStore::initReadIndexWorkers( @@ -1052,7 +1058,10 @@ void ReadIndexWorkerManager::ReadIndexRunner::asyncRun() auto [ptr_a, ptr_d] = getAllocDeallocPtr(); kvstore.reportThreadAllocInfo(name, ReportThreadAllocateInfoType::Reset, 0); kvstore.reportThreadAllocInfo(name, ReportThreadAllocateInfoType::AllocPtr, reinterpret_cast(ptr_a)); - kvstore.reportThreadAllocInfo(name, ReportThreadAllocateInfoType::DeallocPtr, reinterpret_cast(ptr_d)); + kvstore.reportThreadAllocInfo( + name, + ReportThreadAllocateInfoType::DeallocPtr, + reinterpret_cast(ptr_d)); LOG_INFO(logger, "Start read-index runner {}", id); while (true) { From c73606a793f8ea51125b370eaa1e1b2939bc99fc Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 00:04:12 +0800 Subject: [PATCH 07/37] h Signed-off-by: CalvinNeo --- dbms/src/Common/MemoryTrace.h | 1 + dbms/src/Storages/KVStore/KVStore.cpp | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/dbms/src/Common/MemoryTrace.h b/dbms/src/Common/MemoryTrace.h index 67b1f19a902..92c9b838df4 100644 --- a/dbms/src/Common/MemoryTrace.h +++ b/dbms/src/Common/MemoryTrace.h @@ -13,6 +13,7 @@ // limitations under the License. +#include #ifdef WITH_JEMALLOC #include #endif diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 7b88b8372fd..998f5c6a6a9 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -515,6 +515,8 @@ static std::string getThreadNameAggPrefix(const std::string_view & s) void KVStore::reportThreadAllocInfo(std::string_view thdname, ReportThreadAllocateInfoType type, uint64_t value) { + // Many threads have empty name, better just not handle. + if (thdname.empty()) return; std::unique_lock l(memory_allocation_mut); std::string tname(thdname.begin(), thdname.end()); if (type == ReportThreadAllocateInfoType::Reset) @@ -583,6 +585,9 @@ void KVStore::recordThreadAllocInfo() /// For those threads with shorter life, we must only update in their call chain. void KVStore::reportThreadAllocBatch(std::string_view name, ReportThreadAllocateInfoBatch data) { + // Many threads have empty name, better just not handle. + if (name.empty()) return; + // TODO(o11y) Could be costy. auto k = getThreadNameAggPrefix(name); int64_t v = static_cast(data.alloc) - static_cast(data.dealloc); auto & tiflash_metrics = TiFlashMetrics::instance(); From aede44c6f7eb5b2cc485311e0d3c8f8eef17cfd6 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 00:09:48 +0800 Subject: [PATCH 08/37] f Signed-off-by: CalvinNeo --- dbms/src/Storages/KVStore/KVStore.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 998f5c6a6a9..b3223aa9f16 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -516,7 +516,8 @@ static std::string getThreadNameAggPrefix(const std::string_view & s) void KVStore::reportThreadAllocInfo(std::string_view thdname, ReportThreadAllocateInfoType type, uint64_t value) { // Many threads have empty name, better just not handle. - if (thdname.empty()) return; + if (thdname.empty()) + return; std::unique_lock l(memory_allocation_mut); std::string tname(thdname.begin(), thdname.end()); if (type == ReportThreadAllocateInfoType::Reset) @@ -586,7 +587,8 @@ void KVStore::recordThreadAllocInfo() void KVStore::reportThreadAllocBatch(std::string_view name, ReportThreadAllocateInfoBatch data) { // Many threads have empty name, better just not handle. - if (name.empty()) return; + if (name.empty()) + return; // TODO(o11y) Could be costy. auto k = getThreadNameAggPrefix(name); int64_t v = static_cast(data.alloc) - static_cast(data.dealloc); From 33c06a978689eace1586a068e35d633f94880cde Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 10:50:08 +0800 Subject: [PATCH 09/37] co Signed-off-by: CalvinNeo --- dbms/src/Common/MemoryTrace.h | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/dbms/src/Common/MemoryTrace.h b/dbms/src/Common/MemoryTrace.h index 92c9b838df4..b44bfd227c4 100644 --- a/dbms/src/Common/MemoryTrace.h +++ b/dbms/src/Common/MemoryTrace.h @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#pragma once #include #ifdef WITH_JEMALLOC @@ -20,9 +21,9 @@ namespace DB { -#ifdef WITH_JEMALLOC std::tuple getAllocDeallocPtr() { +#ifdef WITH_JEMALLOC uint64_t * ptr1 = nullptr; uint64_t size1 = sizeof ptr1; mallctl("thread.allocatedp", (void *)&ptr1, &size1, NULL, 0); @@ -30,11 +31,8 @@ std::tuple getAllocDeallocPtr() uint64_t size2 = sizeof ptr2; mallctl("thread.deallocatedp", (void *)&ptr2, &size2, NULL, 0); return std::make_tuple(ptr1, ptr2); -} #else -std::tuple getAllocDeallocPtr() -{ return std::make_tuple(nullptr, nullptr); -} #endif +} } // namespace DB \ No newline at end of file From c4336995cd4e32ae457124247a74297746064261 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Fri, 15 Mar 2024 11:04:43 +0800 Subject: [PATCH 10/37] Update dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp Co-authored-by: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> --- dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index f6624bdf795..35a460d9114 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -1064,11 +1064,7 @@ void ReportThreadAllocateBatch( try { UNUSED(tid); - if (server == nullptr) - return; - if (server->tmt == nullptr) - return; - if (server->tmt->getKVStore() == nullptr) + if (!server || !server->tmt || !server->tmt->getKVStore()) return; server->tmt->getKVStore()->reportThreadAllocBatch(buffToStrView(name), data); } From c2a78f725400bc1df8db9756cbacaabb4c525d57 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 11:09:02 +0800 Subject: [PATCH 11/37] fix Signed-off-by: CalvinNeo --- dbms/src/Storages/KVStore/KVStore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index b3223aa9f16..42ae171caeb 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -589,7 +589,7 @@ void KVStore::reportThreadAllocBatch(std::string_view name, ReportThreadAllocate // Many threads have empty name, better just not handle. if (name.empty()) return; - // TODO(o11y) Could be costy. + // TODO(jemalloc-trace) Could be costy. auto k = getThreadNameAggPrefix(name); int64_t v = static_cast(data.alloc) - static_cast(data.dealloc); auto & tiflash_metrics = TiFlashMetrics::instance(); From dc8159d1bc8e3d22250d1dba817139194070de50 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 11:23:00 +0800 Subject: [PATCH 12/37] t1 Signed-off-by: CalvinNeo --- dbms/src/Common/MemoryTrace.h | 42 +++++++++---------- .../Storages/KVStore/Read/ReadIndexWorker.cpp | 19 +++++++++ 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/dbms/src/Common/MemoryTrace.h b/dbms/src/Common/MemoryTrace.h index b44bfd227c4..fd70342229f 100644 --- a/dbms/src/Common/MemoryTrace.h +++ b/dbms/src/Common/MemoryTrace.h @@ -12,27 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. -#pragma once +// #pragma once -#include -#ifdef WITH_JEMALLOC -#include -#endif +// #include +// #ifdef WITH_JEMALLOC +// #include +// #endif -namespace DB -{ -std::tuple getAllocDeallocPtr() -{ -#ifdef WITH_JEMALLOC - uint64_t * ptr1 = nullptr; - uint64_t size1 = sizeof ptr1; - mallctl("thread.allocatedp", (void *)&ptr1, &size1, NULL, 0); - uint64_t * ptr2 = nullptr; - uint64_t size2 = sizeof ptr2; - mallctl("thread.deallocatedp", (void *)&ptr2, &size2, NULL, 0); - return std::make_tuple(ptr1, ptr2); -#else - return std::make_tuple(nullptr, nullptr); -#endif -} +// namespace DB +// { +// std::tuple getAllocDeallocPtr() +// { +// #ifdef WITH_JEMALLOC +// uint64_t * ptr1 = nullptr; +// uint64_t size1 = sizeof ptr1; +// mallctl("thread.allocatedp", (void *)&ptr1, &size1, NULL, 0); +// uint64_t * ptr2 = nullptr; +// uint64_t size2 = sizeof ptr2; +// mallctl("thread.deallocatedp", (void *)&ptr2, &size2, NULL, 0); +// return std::make_tuple(ptr1, ptr2); +// #else +// return std::make_tuple(nullptr, nullptr); +// #endif +// } } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp index 54c015f49cc..d4085f10735 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp @@ -30,6 +30,25 @@ #include +#ifdef WITH_JEMALLOC +#include +#endif + +std::tuple getAllocDeallocPtr() +{ +#ifdef WITH_JEMALLOC + uint64_t * ptr1 = nullptr; + uint64_t size1 = sizeof ptr1; + mallctl("thread.allocatedp", (void *)&ptr1, &size1, NULL, 0); + uint64_t * ptr2 = nullptr; + uint64_t size2 = sizeof ptr2; + mallctl("thread.deallocatedp", (void *)&ptr2, &size2, NULL, 0); + return std::make_tuple(ptr1, ptr2); +#else + return std::make_tuple(nullptr, nullptr); +#endif +} + namespace DB { // #define ADD_TEST_DEBUG_LOG_FMT From 8232c978f71bd4899f17cdce4d12f298a3e53f24 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 11:36:51 +0800 Subject: [PATCH 13/37] t1 Signed-off-by: CalvinNeo --- dbms/src/Common/MemoryTrace.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Common/MemoryTrace.h b/dbms/src/Common/MemoryTrace.h index fd70342229f..35f2e0c8233 100644 --- a/dbms/src/Common/MemoryTrace.h +++ b/dbms/src/Common/MemoryTrace.h @@ -35,4 +35,4 @@ // return std::make_tuple(nullptr, nullptr); // #endif // } -} // namespace DB \ No newline at end of file +// } // namespace DB \ No newline at end of file From b662e97537725ab6d1b68024f2cfc602089c135f Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 12:05:40 +0800 Subject: [PATCH 14/37] reset Signed-off-by: CalvinNeo --- dbms/src/Common/MemoryTrace.cpp | 34 +++++++++++++++++++ dbms/src/Common/MemoryTrace.h | 28 ++++----------- dbms/src/Server/Server.cpp | 2 +- .../Storages/KVStore/Read/ReadIndexWorker.cpp | 18 ---------- 4 files changed, 41 insertions(+), 41 deletions(-) create mode 100644 dbms/src/Common/MemoryTrace.cpp diff --git a/dbms/src/Common/MemoryTrace.cpp b/dbms/src/Common/MemoryTrace.cpp new file mode 100644 index 00000000000..ee2787b8a14 --- /dev/null +++ b/dbms/src/Common/MemoryTrace.cpp @@ -0,0 +1,34 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#ifdef USE_JEMALLOC +#include +#endif + +std::tuple getAllocDeallocPtr() +{ +#ifdef USE_JEMALLOC + uint64_t * ptr1 = nullptr; + uint64_t size1 = sizeof ptr1; + je_mallctl("thread.allocatedp", (void *)&ptr1, &size1, nullptr, 0); + uint64_t * ptr2 = nullptr; + uint64_t size2 = sizeof ptr2; + je_mallctl("thread.deallocatedp", (void *)&ptr2, &size2, nullptr, 0); + return std::make_tuple(ptr1, ptr2); +#else + return std::make_tuple(nullptr, nullptr); +#endif +} diff --git a/dbms/src/Common/MemoryTrace.h b/dbms/src/Common/MemoryTrace.h index 35f2e0c8233..fc6303ac3b5 100644 --- a/dbms/src/Common/MemoryTrace.h +++ b/dbms/src/Common/MemoryTrace.h @@ -12,27 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -// #pragma once +#pragma once -// #include -// #ifdef WITH_JEMALLOC -// #include -// #endif +#include -// namespace DB -// { -// std::tuple getAllocDeallocPtr() -// { -// #ifdef WITH_JEMALLOC -// uint64_t * ptr1 = nullptr; -// uint64_t size1 = sizeof ptr1; -// mallctl("thread.allocatedp", (void *)&ptr1, &size1, NULL, 0); -// uint64_t * ptr2 = nullptr; -// uint64_t size2 = sizeof ptr2; -// mallctl("thread.deallocatedp", (void *)&ptr2, &size2, NULL, 0); -// return std::make_tuple(ptr1, ptr2); -// #else -// return std::make_tuple(nullptr, nullptr); -// #endif -// } -// } // namespace DB \ No newline at end of file +namespace DB +{ +std::tuple getAllocDeallocPtr(); +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 25a0e4d6f36..3adf764b567 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1029,7 +1029,7 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "UniPS is not enabled for proxy, page_version={}", STORAGE_FORMAT_CURRENT.page); } -#ifdef WITH_JEMALLOC +#ifdef USE_JEMALLOC LOG_INFO(log, "Using Jemalloc for TiFlash"); #else LOG_INFO(log, "Not using Jemalloc for TiFlash"); diff --git a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp index d4085f10735..54d56ed1100 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp @@ -30,24 +30,6 @@ #include -#ifdef WITH_JEMALLOC -#include -#endif - -std::tuple getAllocDeallocPtr() -{ -#ifdef WITH_JEMALLOC - uint64_t * ptr1 = nullptr; - uint64_t size1 = sizeof ptr1; - mallctl("thread.allocatedp", (void *)&ptr1, &size1, NULL, 0); - uint64_t * ptr2 = nullptr; - uint64_t size2 = sizeof ptr2; - mallctl("thread.deallocatedp", (void *)&ptr2, &size2, NULL, 0); - return std::make_tuple(ptr1, ptr2); -#else - return std::make_tuple(nullptr, nullptr); -#endif -} namespace DB { From c3fb1a45c10219cf17ddeff18b75fac68b8cd760 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 12:17:19 +0800 Subject: [PATCH 15/37] fix Signed-off-by: CalvinNeo --- CMakeLists.txt | 4 ---- dbms/src/Common/MemoryTrace.cpp | 3 +++ 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 477661aa4d3..4c1ec82990c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -373,10 +373,6 @@ else () message (STATUS "Set jemalloc narenas ${JEMALLOC_NARENAS}") endif () -if (ENABLE_JEMALLOC) - add_compile_definitions(WITH_JEMALLOC) -endif () - option (TEST_LLVM_COVERAGE "Enables flags for test coverage" OFF) if (TEST_LLVM_COVERAGE AND CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG") set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fprofile-instr-generate -fcoverage-mapping -DTIFLASH_LLVM_COVERAGE=1") diff --git a/dbms/src/Common/MemoryTrace.cpp b/dbms/src/Common/MemoryTrace.cpp index ee2787b8a14..24368e68559 100644 --- a/dbms/src/Common/MemoryTrace.cpp +++ b/dbms/src/Common/MemoryTrace.cpp @@ -18,6 +18,8 @@ #include #endif +namespace DB +{ std::tuple getAllocDeallocPtr() { #ifdef USE_JEMALLOC @@ -32,3 +34,4 @@ std::tuple getAllocDeallocPtr() return std::make_tuple(nullptr, nullptr); #endif } +} // namespace DB \ No newline at end of file From f8f50395ecf4b45b3d9b1b9e44e45222cd8389f6 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 12:26:13 +0800 Subject: [PATCH 16/37] z Signed-off-by: CalvinNeo --- dbms/src/Storages/KVStore/KVStore.cpp | 52 ++++++++++++++------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 42ae171caeb..cf2a4c1d9bf 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -520,35 +520,37 @@ void KVStore::reportThreadAllocInfo(std::string_view thdname, ReportThreadAlloca return; std::unique_lock l(memory_allocation_mut); std::string tname(thdname.begin(), thdname.end()); - if (type == ReportThreadAllocateInfoType::Reset) - { - memory_allocation_map.insert_or_assign(tname, ThreadInfoJealloc()); - } - else if (type == ReportThreadAllocateInfoType::Remove) - { - memory_allocation_map.erase(tname); - } - else if (type == ReportThreadAllocateInfoType::AllocPtr) - { - if (value == 0) - return; - auto it = memory_allocation_map.find(tname); - if unlikely (it == memory_allocation_map.end()) + switch (type) { + case ReportThreadAllocateInfoType::Reset: + memory_allocation_map.insert_or_assign(tname, ThreadInfoJealloc()); + break; + case ReportThreadAllocateInfoType::Remove: + memory_allocation_map.erase(tname); + break; + case ReportThreadAllocateInfoType::AllocPtr: { - return; + if (value == 0) + return; + auto it = memory_allocation_map.find(tname); + if unlikely (it == memory_allocation_map.end()) + { + return; + } + it->second.allocated_ptr = value; + break; } - it->second.allocated_ptr = value; - } - else if (type == ReportThreadAllocateInfoType::DeallocPtr) - { - if (value == 0) - return; - auto it = memory_allocation_map.find(tname); - if unlikely (it == memory_allocation_map.end()) + case ReportThreadAllocateInfoType::DeallocPtr: { - return; + if (value == 0) + return; + auto it = memory_allocation_map.find(tname); + if unlikely (it == memory_allocation_map.end()) + { + return; + } + it->second.deallocated_ptr = value; + break; } - it->second.deallocated_ptr = value; } } From 22f54b2797053849f2e98c73425239355b34970c Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 12:29:38 +0800 Subject: [PATCH 17/37] update proxy Signed-off-by: CalvinNeo --- contrib/tiflash-proxy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index c58aa770b59..5f1958ea684 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit c58aa770b59ae446131fa73e5f0c01dfffc8f5f4 +Subproject commit 5f1958ea6840550e2ea6ddb190b95814af692ce7 From 54635f6857209b2d88065f23e9373c39fbf303a3 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Fri, 15 Mar 2024 15:33:56 +0800 Subject: [PATCH 18/37] Update dbms/src/Server/Server.cpp Co-authored-by: JaySon --- dbms/src/Server/Server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 3adf764b567..fa62eee5c3e 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1714,7 +1714,7 @@ int Server::main(const std::vector & /*args*/) LOG_ERROR(log, "Current status of engine-store is NOT Running, should not happen"); exit(-1); } - LOG_INFO(log, "Stop collecting metrics"); + LOG_INFO(log, "Stop collecting thread alloc metrics"); tmt_context.getKVStore()->stopThreadAllocInfo(); LOG_INFO(log, "Set store context status Stopping"); tmt_context.setStatusStopping(); From 1f5b6b19c25d8038847eec25eb2803b2a0889814 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 15:46:08 +0800 Subject: [PATCH 19/37] add tests Signed-off-by: CalvinNeo --- dbms/src/Common/TiFlashMetrics.cpp | 6 ++ dbms/src/Common/TiFlashMetrics.h | 1 + dbms/src/Storages/KVStore/KVStore.cpp | 66 ++++++++----------- .../KVStore/tests/gtest_new_kvstore.cpp | 46 +++++++++++++ .../KVStore/tests/gtest_read_index_worker.cpp | 13 ++-- 5 files changed, 91 insertions(+), 41 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.cpp b/dbms/src/Common/TiFlashMetrics.cpp index e85cb569413..df25bf8f381 100644 --- a/dbms/src/Common/TiFlashMetrics.cpp +++ b/dbms/src/Common/TiFlashMetrics.cpp @@ -98,4 +98,10 @@ void TiFlashMetrics::removeReplicaSyncRUCounter(UInt32 keyspace_id) registered_keyspace_sync_replica_ru_family->Remove(itr->second); registered_keyspace_sync_replica_ru.erase(itr); } + +double TiFlashMetrics::getProxyThreadMemory(const std::string & k) +{ + return registered_raft_proxy_thread_memory_usage_metrics[k]->Value(); +} + } // namespace DB diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index fad1c3ae86f..748c54900ea 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -1089,6 +1089,7 @@ class TiFlashMetrics void addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru); UInt64 debugQueryReplicaSyncRU(UInt32 keyspace_id); + double getProxyThreadMemory(const std::string & k); private: friend class KVStore; diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index cf2a4c1d9bf..7d9edf632c0 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -495,15 +495,6 @@ RegionTaskLock KVStore::genRegionTaskLock(UInt64 region_id) const return region_manager.genRegionTaskLock(region_id); } -static std::string getThreadNameAggPrefix(const std::string & s) -{ - if (auto pos = s.find_last_of('-'); pos != std::string::npos) - { - return s.substr(0, pos); - } - return s; -} - static std::string getThreadNameAggPrefix(const std::string_view & s) { if (auto pos = s.find_last_of('-'); pos != std::string::npos) @@ -520,37 +511,38 @@ void KVStore::reportThreadAllocInfo(std::string_view thdname, ReportThreadAlloca return; std::unique_lock l(memory_allocation_mut); std::string tname(thdname.begin(), thdname.end()); - switch (type) { - case ReportThreadAllocateInfoType::Reset: - memory_allocation_map.insert_or_assign(tname, ThreadInfoJealloc()); - break; - case ReportThreadAllocateInfoType::Remove: - memory_allocation_map.erase(tname); - break; - case ReportThreadAllocateInfoType::AllocPtr: + switch (type) + { + case ReportThreadAllocateInfoType::Reset: + memory_allocation_map.insert_or_assign(tname, ThreadInfoJealloc()); + break; + case ReportThreadAllocateInfoType::Remove: + memory_allocation_map.erase(tname); + break; + case ReportThreadAllocateInfoType::AllocPtr: + { + if (value == 0) + return; + auto it = memory_allocation_map.find(tname); + if unlikely (it == memory_allocation_map.end()) { - if (value == 0) - return; - auto it = memory_allocation_map.find(tname); - if unlikely (it == memory_allocation_map.end()) - { - return; - } - it->second.allocated_ptr = value; - break; + return; } - case ReportThreadAllocateInfoType::DeallocPtr: + it->second.allocated_ptr = value; + break; + } + case ReportThreadAllocateInfoType::DeallocPtr: + { + if (value == 0) + return; + auto it = memory_allocation_map.find(tname); + if unlikely (it == memory_allocation_map.end()) { - if (value == 0) - return; - auto it = memory_allocation_map.find(tname); - if unlikely (it == memory_allocation_map.end()) - { - return; - } - it->second.deallocated_ptr = value; - break; + return; } + it->second.deallocated_ptr = value; + break; + } } } @@ -563,7 +555,7 @@ void KVStore::recordThreadAllocInfo() std::unordered_map agg_remaining; for (const auto & [k, v] : memory_allocation_map) { - auto agg_thread_name = getThreadNameAggPrefix(k); + auto agg_thread_name = getThreadNameAggPrefix(std::string_view(k.begin(), k.end())); // Some thread may have shorter lifetime, we can't use this timed task here to upgrade. if (WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name)) { diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index cddfd30c894..6ff4429e662 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -902,4 +902,50 @@ try } CATCH +TEST_F(RegionKVStoreTest, MemoryTrace) +try +{ + KVStore & kvs = getKVS(); + std::string name = "test1-1"; + kvs.reportThreadAllocBatch( + std::string_view(name.begin(), name.end()), + ReportThreadAllocateInfoBatch{.alloc = 1, .dealloc = 2}); + auto & tiflash_metrics = TiFlashMetrics::instance(); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory("test1"), -1); + std::thread t([&]() { + kvs.reportThreadAllocInfo(std::string_view(name.begin(), name.end()), ReportThreadAllocateInfoType::Reset, 0); + uint64_t mock = 999; + uint64_t alloc_ptr = reinterpret_cast(&mock); + kvs.reportThreadAllocInfo( + std::string_view(name.begin(), name.end()), + ReportThreadAllocateInfoType::AllocPtr, + alloc_ptr); + kvs.recordThreadAllocInfo(); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory("test1"), -1); + + std::string name2 = "ReadIndexWkr-1"; + kvs.reportThreadAllocInfo(std::string_view(name2.begin(), name2.end()), ReportThreadAllocateInfoType::Reset, 0); + kvs.reportThreadAllocInfo( + std::string_view(name2.begin(), name2.end()), + ReportThreadAllocateInfoType::AllocPtr, + alloc_ptr); + kvs.recordThreadAllocInfo(); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory("ReadIndexWkr"), 999); + uint64_t mock2 = 998; + uint64_t dealloc_ptr = reinterpret_cast(&mock2); + kvs.reportThreadAllocInfo( + std::string_view(name2.begin(), name2.end()), + ReportThreadAllocateInfoType::DeallocPtr, + dealloc_ptr); + kvs.recordThreadAllocInfo(); + ASSERT_EQ(tiflash_metrics.getProxyThreadMemory("ReadIndexWkr"), 1); + kvs.reportThreadAllocInfo( + std::string_view(name2.begin(), name2.end()), + ReportThreadAllocateInfoType::Remove, + 0); + }); + t.join(); +} +CATCH + } // namespace DB::tests diff --git a/dbms/src/Storages/KVStore/tests/gtest_read_index_worker.cpp b/dbms/src/Storages/KVStore/tests/gtest_read_index_worker.cpp index aafc92ba48b..83ef7c29646 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_read_index_worker.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_read_index_worker.cpp @@ -53,14 +53,15 @@ class ReadIndexTest : public ::testing::Test void ReadIndexTest::testError() { // test error - + auto & ctx = TiFlashTestEnv::getGlobalContext(); + KVStore kvs = KVStore{ctx}; MockRaftStoreProxy proxy_instance; TiFlashRaftProxyHelper proxy_helper; { proxy_helper = MockRaftStoreProxy::setRaftStoreProxyFFIHelper(RaftStoreProxyPtr{&proxy_instance}); proxy_instance.init(10); } - auto manager = ReadIndexWorkerManager::newReadIndexWorkerManager(proxy_helper, 5, [&]() { + auto manager = ReadIndexWorkerManager::newReadIndexWorkerManager(proxy_helper, kvs, 5, [&]() { return std::chrono::milliseconds(10); }); { @@ -274,7 +275,8 @@ void ReadIndexTest::testBasic() void ReadIndexTest::testNormal() { // test normal - + auto & ctx = TiFlashTestEnv::getGlobalContext(); + KVStore kvs = KVStore{ctx}; MockRaftStoreProxy proxy_instance; TiFlashRaftProxyHelper proxy_helper; { @@ -283,6 +285,7 @@ void ReadIndexTest::testNormal() } auto manager = ReadIndexWorkerManager::newReadIndexWorkerManager( proxy_helper, + kvs, 5, [&]() { return std::chrono::milliseconds(10); }, 3); @@ -461,13 +464,15 @@ void ReadIndexTest::testNormal() void ReadIndexTest::testBatch() { // test batch + auto & ctx = TiFlashTestEnv::getGlobalContext(); + KVStore kvs = KVStore{ctx}; MockRaftStoreProxy proxy_instance; TiFlashRaftProxyHelper proxy_helper; { proxy_helper = MockRaftStoreProxy::setRaftStoreProxyFFIHelper(RaftStoreProxyPtr{&proxy_instance}); proxy_instance.init(10); } - auto manager = ReadIndexWorkerManager::newReadIndexWorkerManager(proxy_helper, 5, [&]() { + auto manager = ReadIndexWorkerManager::newReadIndexWorkerManager(proxy_helper, kvs, 5, [&]() { return std::chrono::milliseconds(10); }); // DO NOT run manager and mock proxy in other threads. From c99a30633c52b3e3c1dd7561a8b0a996f5fea6d5 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 16:27:18 +0800 Subject: [PATCH 20/37] fix Signed-off-by: CalvinNeo --- dbms/src/Common/TiFlashMetrics.cpp | 18 +++++++++++++++++- dbms/src/Common/TiFlashMetrics.h | 4 ++-- dbms/src/Storages/KVStore/KVStore.cpp | 18 ++---------------- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.cpp b/dbms/src/Common/TiFlashMetrics.cpp index df25bf8f381..5b3a05ed8a2 100644 --- a/dbms/src/Common/TiFlashMetrics.cpp +++ b/dbms/src/Common/TiFlashMetrics.cpp @@ -101,7 +101,23 @@ void TiFlashMetrics::removeReplicaSyncRUCounter(UInt32 keyspace_id) double TiFlashMetrics::getProxyThreadMemory(const std::string & k) { - return registered_raft_proxy_thread_memory_usage_metrics[k]->Value(); + std::unique_lock lock(proxy_thread_ru_mtx); + auto it = registered_raft_proxy_thread_memory_usage_metrics.find(k); + RUNTIME_CHECK(it != registered_raft_proxy_thread_memory_usage_metrics.end()); + return it->second->Value(); +} + +void TiFlashMetrics::setProxyThreadMemory(const std::string & k, Int64 v) +{ + std::unique_lock lock(proxy_thread_ru_mtx); + if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.count(k)) + { + // Add new keyspace store usage metric + registered_raft_proxy_thread_memory_usage_metrics.emplace( + k, + ®istered_raft_proxy_thread_memory_usage_family->Add({{"type", k}})); + } + registered_raft_proxy_thread_memory_usage_metrics[k]->Set(v); } } // namespace DB diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 748c54900ea..35f8d8d7e46 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -34,7 +34,6 @@ namespace DB { -class KVStore; constexpr size_t RAFT_REGION_BIG_WRITE_THRES = 2 * 1024; constexpr size_t RAFT_REGION_BIG_WRITE_MAX = 4 * 1024 * 1024; // raft-entry-max-size = 8MiB static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Invalid RAFT_REGION_BIG_WRITE_THRES"); @@ -1089,10 +1088,10 @@ class TiFlashMetrics void addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru); UInt64 debugQueryReplicaSyncRU(UInt32 keyspace_id); + void setProxyThreadMemory(const std::string & k, Int64 v); double getProxyThreadMemory(const std::string & k); private: - friend class KVStore; TiFlashMetrics(); prometheus::Counter * getReplicaSyncRUCounter(UInt32 keyspace_id, std::unique_lock &); @@ -1123,6 +1122,7 @@ class TiFlashMetrics std::unordered_map registered_keyspace_sync_replica_ru; prometheus::Family * registered_raft_proxy_thread_memory_usage_family; + std::mutex proxy_thread_ru_mtx; std::unordered_map registered_raft_proxy_thread_memory_usage_metrics; public: diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 7d9edf632c0..9b057c1f5dc 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -566,14 +566,7 @@ void KVStore::recordThreadAllocInfo() for (const auto & [k, v] : agg_remaining) { auto & tiflash_metrics = TiFlashMetrics::instance(); - if unlikely (!tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.count(k)) - { - // Add new keyspace store usage metric - tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.emplace( - k, - &tiflash_metrics.registered_raft_proxy_thread_memory_usage_family->Add({{"type", k}})); - } - tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics[k]->Set(v); + tiflash_metrics.setProxyThreadMemory(k, v); } } @@ -587,14 +580,7 @@ void KVStore::reportThreadAllocBatch(std::string_view name, ReportThreadAllocate auto k = getThreadNameAggPrefix(name); int64_t v = static_cast(data.alloc) - static_cast(data.dealloc); auto & tiflash_metrics = TiFlashMetrics::instance(); - if unlikely (!tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.count(k)) - { - // Add new keyspace store usage metric - tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics.emplace( - k, - &tiflash_metrics.registered_raft_proxy_thread_memory_usage_family->Add({{"type", k}})); - } - tiflash_metrics.registered_raft_proxy_thread_memory_usage_metrics[k]->Set(v); + tiflash_metrics.setProxyThreadMemory(k, v); } void KVStore::stopThreadAllocInfo() From 41c865d2797e42c6f8478d9c81e4089011c598a8 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Fri, 15 Mar 2024 17:37:00 +0800 Subject: [PATCH 21/37] Update dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp Co-authored-by: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> --- dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index 35a460d9114..c159a7eaa35 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -1040,12 +1040,8 @@ void ReportThreadAllocateInfo( try { UNUSED(tid); - if (server == nullptr) - return; - if (server->tmt == nullptr) - return; - if (server->tmt->getKVStore() == nullptr) - return; + if (!server || !server->tmt || !server->tmt->getKVStore()) + return; server->tmt->getKVStore()->reportThreadAllocInfo(buffToStrView(name), type, value); } catch (...) From f9c4ed3dc5c1def98ee5593e986e3f03277e6a48 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 18:40:33 +0800 Subject: [PATCH 22/37] f Signed-off-by: CalvinNeo --- dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index c159a7eaa35..5b66709bff2 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -1041,7 +1041,7 @@ void ReportThreadAllocateInfo( { UNUSED(tid); if (!server || !server->tmt || !server->tmt->getKVStore()) - return; + return; server->tmt->getKVStore()->reportThreadAllocInfo(buffToStrView(name), type, value); } catch (...) From 1050cdfb68538cd8bed94e04028f888a6914567e Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 22:05:54 +0800 Subject: [PATCH 23/37] f Signed-off-by: CalvinNeo --- dbms/src/Storages/KVStore/KVStore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 9b057c1f5dc..a95d9cb6714 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -555,7 +555,7 @@ void KVStore::recordThreadAllocInfo() std::unordered_map agg_remaining; for (const auto & [k, v] : memory_allocation_map) { - auto agg_thread_name = getThreadNameAggPrefix(std::string_view(k.begin(), k.end())); + auto agg_thread_name = getThreadNameAggPrefix(std::string_view(k.data(), k.size())); // Some thread may have shorter lifetime, we can't use this timed task here to upgrade. if (WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name)) { From 0f39d2488e2babf65526aaf0c1cc3f665103a924 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Mar 2024 22:42:33 +0800 Subject: [PATCH 24/37] f Signed-off-by: CalvinNeo --- dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp | 4 +--- dbms/src/Storages/KVStore/KVStore.h | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index 5b66709bff2..6cc8a85943a 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -1060,9 +1060,7 @@ void ReportThreadAllocateBatch( try { UNUSED(tid); - if (!server || !server->tmt || !server->tmt->getKVStore()) - return; - server->tmt->getKVStore()->reportThreadAllocBatch(buffToStrView(name), data); + KVStore::reportThreadAllocBatch(buffToStrView(name), data); } catch (...) { diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index ea08f65c2e9..599e788b57f 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -171,7 +171,7 @@ class KVStore final : private boost::noncopyable // Proxy will validate and refit the config items from the toml file. const ProxyConfigSummary & getProxyConfigSummay() const { return proxy_config_summary; } void reportThreadAllocInfo(std::string_view, ReportThreadAllocateInfoType type, uint64_t value); - void reportThreadAllocBatch(std::string_view, ReportThreadAllocateInfoBatch data); + static void reportThreadAllocBatch(std::string_view, ReportThreadAllocateInfoBatch data); void recordThreadAllocInfo(); void stopThreadAllocInfo(); From 9a5a426181b7d8eae7a162c38287fd661bceb1d6 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Mar 2024 10:45:53 +0800 Subject: [PATCH 25/37] x Signed-off-by: CalvinNeo --- dbms/src/Common/TiFlashMetrics.cpp | 12 +++++------- dbms/src/Storages/KVStore/KVStore.cpp | 10 ++++++++++ dbms/src/Storages/KVStore/KVStore.h | 1 + 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.cpp b/dbms/src/Common/TiFlashMetrics.cpp index 5b3a05ed8a2..80e780ec89a 100644 --- a/dbms/src/Common/TiFlashMetrics.cpp +++ b/dbms/src/Common/TiFlashMetrics.cpp @@ -101,7 +101,7 @@ void TiFlashMetrics::removeReplicaSyncRUCounter(UInt32 keyspace_id) double TiFlashMetrics::getProxyThreadMemory(const std::string & k) { - std::unique_lock lock(proxy_thread_ru_mtx); + std::shared_lock lock(proxy_thread_ru_mtx); auto it = registered_raft_proxy_thread_memory_usage_metrics.find(k); RUNTIME_CHECK(it != registered_raft_proxy_thread_memory_usage_metrics.end()); return it->second->Value(); @@ -109,15 +109,13 @@ double TiFlashMetrics::getProxyThreadMemory(const std::string & k) void TiFlashMetrics::setProxyThreadMemory(const std::string & k, Int64 v) { - std::unique_lock lock(proxy_thread_ru_mtx); + std::shared_lock lock(proxy_thread_ru_mtx); if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.count(k)) { - // Add new keyspace store usage metric - registered_raft_proxy_thread_memory_usage_metrics.emplace( - k, - ®istered_raft_proxy_thread_memory_usage_family->Add({{"type", k}})); + // New metrics added through `Reset`. + return; } registered_raft_proxy_thread_memory_usage_metrics[k]->Set(v); } -} // namespace DB +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index a95d9cb6714..936648c7d4d 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -514,6 +514,16 @@ void KVStore::reportThreadAllocInfo(std::string_view thdname, ReportThreadAlloca switch (type) { case ReportThreadAllocateInfoType::Reset: + { + std::unique_lock lock(proxy_thread_ru_mtx); + if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.count(k)) + { + // Add new keyspace store usage metric + registered_raft_proxy_thread_memory_usage_metrics.emplace( + k, + ®istered_raft_proxy_thread_memory_usage_family->Add({{"type", k}})); + } + } memory_allocation_map.insert_or_assign(tname, ThreadInfoJealloc()); break; case ReportThreadAllocateInfoType::Remove: diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index 599e788b57f..0091d643dca 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -438,6 +438,7 @@ class KVStore final : private boost::noncopyable std::atomic ongoing_prehandle_task_count{0}; ProxyConfigSummary proxy_config_summary; + // TODO: Use CAS+HazPtr to remove the lock. mutable std::shared_mutex memory_allocation_mut; std::unordered_map memory_allocation_map; From d9d141dcfe39d2263aed8743ba2b60fa665d2c98 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Mar 2024 10:48:24 +0800 Subject: [PATCH 26/37] x Signed-off-by: CalvinNeo --- dbms/src/Common/TiFlashMetrics.h | 2 +- dbms/src/Storages/KVStore/KVStore.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 35f8d8d7e46..dedac008ee3 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -1122,7 +1122,7 @@ class TiFlashMetrics std::unordered_map registered_keyspace_sync_replica_ru; prometheus::Family * registered_raft_proxy_thread_memory_usage_family; - std::mutex proxy_thread_ru_mtx; + std::shared_mutex proxy_thread_ru_mtx; std::unordered_map registered_raft_proxy_thread_memory_usage_metrics; public: diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index 0091d643dca..6fb6c339fd7 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -438,7 +438,7 @@ class KVStore final : private boost::noncopyable std::atomic ongoing_prehandle_task_count{0}; ProxyConfigSummary proxy_config_summary; - // TODO: Use CAS+HazPtr to remove the lock. + // TODO: Use CAS+HazPtr to remove the lock and proxy_thread_ru_mtx. mutable std::shared_mutex memory_allocation_mut; std::unordered_map memory_allocation_map; From b15004a60306a7d3b226254c9be64ba0f5707c32 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Mar 2024 11:00:14 +0800 Subject: [PATCH 27/37] x Signed-off-by: CalvinNeo --- dbms/src/Common/TiFlashMetrics.h | 1 + dbms/src/Storages/KVStore/KVStore.cpp | 28 ++++++++++++++++++--------- dbms/src/Storages/KVStore/KVStore.h | 1 - 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index dedac008ee3..bcf188018e9 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -1121,6 +1121,7 @@ class TiFlashMetrics std::mutex replica_sync_ru_mtx; std::unordered_map registered_keyspace_sync_replica_ru; + // TODO: Use CAS+HazPtr to remove proxy_thread_ru_mtx. prometheus::Family * registered_raft_proxy_thread_memory_usage_family; std::shared_mutex proxy_thread_ru_mtx; std::unordered_map registered_raft_proxy_thread_memory_usage_metrics; diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 936648c7d4d..0f0092bfb56 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -509,28 +509,37 @@ void KVStore::reportThreadAllocInfo(std::string_view thdname, ReportThreadAlloca // Many threads have empty name, better just not handle. if (thdname.empty()) return; - std::unique_lock l(memory_allocation_mut); std::string tname(thdname.begin(), thdname.end()); switch (type) { case ReportThreadAllocateInfoType::Reset: { - std::unique_lock lock(proxy_thread_ru_mtx); - if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.count(k)) { - // Add new keyspace store usage metric - registered_raft_proxy_thread_memory_usage_metrics.emplace( - k, - ®istered_raft_proxy_thread_memory_usage_family->Add({{"type", k}})); + auto & metrics = TiFlashMetrics::instance(); + std::unique_lock lock(metrics.proxy_thread_ru_mtx); + if unlikely (!metrics.registered_raft_proxy_thread_memory_usage_metrics.count(k)) + { + // Add new keyspace store usage metric + metrics.registered_raft_proxy_thread_memory_usage_metrics.emplace( + k, + &metrics.registered_raft_proxy_thread_memory_usage_family->Add({{"type", k}})); + } + } + { + std::unique_lock l(memory_allocation_mut); + memory_allocation_map.insert_or_assign(tname, ThreadInfoJealloc()); } - } - memory_allocation_map.insert_or_assign(tname, ThreadInfoJealloc()); break; + } case ReportThreadAllocateInfoType::Remove: + { + std::unique_lock l(memory_allocation_mut); memory_allocation_map.erase(tname); break; + } case ReportThreadAllocateInfoType::AllocPtr: { + std::shared_lock l(memory_allocation_mut); if (value == 0) return; auto it = memory_allocation_map.find(tname); @@ -543,6 +552,7 @@ void KVStore::reportThreadAllocInfo(std::string_view thdname, ReportThreadAlloca } case ReportThreadAllocateInfoType::DeallocPtr: { + std::shared_lock l(memory_allocation_mut); if (value == 0) return; auto it = memory_allocation_map.find(tname); diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index 6fb6c339fd7..599e788b57f 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -438,7 +438,6 @@ class KVStore final : private boost::noncopyable std::atomic ongoing_prehandle_task_count{0}; ProxyConfigSummary proxy_config_summary; - // TODO: Use CAS+HazPtr to remove the lock and proxy_thread_ru_mtx. mutable std::shared_mutex memory_allocation_mut; std::unordered_map memory_allocation_map; From a97525b53e829b56a4eda454cd08a9d5075bef1d Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Mon, 18 Mar 2024 11:27:41 +0800 Subject: [PATCH 28/37] Update dbms/src/Common/TiFlashMetrics.cpp Co-authored-by: JaySon --- dbms/src/Common/TiFlashMetrics.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Common/TiFlashMetrics.cpp b/dbms/src/Common/TiFlashMetrics.cpp index 80e780ec89a..aa4632ba113 100644 --- a/dbms/src/Common/TiFlashMetrics.cpp +++ b/dbms/src/Common/TiFlashMetrics.cpp @@ -103,7 +103,7 @@ double TiFlashMetrics::getProxyThreadMemory(const std::string & k) { std::shared_lock lock(proxy_thread_ru_mtx); auto it = registered_raft_proxy_thread_memory_usage_metrics.find(k); - RUNTIME_CHECK(it != registered_raft_proxy_thread_memory_usage_metrics.end()); + RUNTIME_CHECK(it != registered_raft_proxy_thread_memory_usage_metrics.end(), k); return it->second->Value(); } From b45c47ae6783d1bb34ff8333d513b47b0ffa7e7a Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Mar 2024 11:40:36 +0800 Subject: [PATCH 29/37] x Signed-off-by: CalvinNeo --- dbms/src/Common/TiFlashMetrics.cpp | 15 +++++++++++++-- dbms/src/Common/TiFlashMetrics.h | 5 +++-- dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp | 1 + dbms/src/Storages/KVStore/KVStore.cpp | 13 ++----------- 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/dbms/src/Common/TiFlashMetrics.cpp b/dbms/src/Common/TiFlashMetrics.cpp index 80e780ec89a..96b542a061c 100644 --- a/dbms/src/Common/TiFlashMetrics.cpp +++ b/dbms/src/Common/TiFlashMetrics.cpp @@ -101,7 +101,7 @@ void TiFlashMetrics::removeReplicaSyncRUCounter(UInt32 keyspace_id) double TiFlashMetrics::getProxyThreadMemory(const std::string & k) { - std::shared_lock lock(proxy_thread_ru_mtx); + std::shared_lock lock(proxy_thread_report_mtx); auto it = registered_raft_proxy_thread_memory_usage_metrics.find(k); RUNTIME_CHECK(it != registered_raft_proxy_thread_memory_usage_metrics.end()); return it->second->Value(); @@ -109,7 +109,7 @@ double TiFlashMetrics::getProxyThreadMemory(const std::string & k) void TiFlashMetrics::setProxyThreadMemory(const std::string & k, Int64 v) { - std::shared_lock lock(proxy_thread_ru_mtx); + std::shared_lock lock(proxy_thread_report_mtx); if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.count(k)) { // New metrics added through `Reset`. @@ -118,4 +118,15 @@ void TiFlashMetrics::setProxyThreadMemory(const std::string & k, Int64 v) registered_raft_proxy_thread_memory_usage_metrics[k]->Set(v); } +void TiFlashMetrics::registerProxyThreadMemory(const std::string & k) +{ + std::unique_lock lock(proxy_thread_report_mtx); + if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.count(k)) + { + registered_raft_proxy_thread_memory_usage_metrics.emplace( + k, + ®istered_raft_proxy_thread_memory_usage_family->Add({{"type", k}})); + } +} + } // namespace DB \ No newline at end of file diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index bcf188018e9..27ed6fa60ca 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -1090,6 +1090,7 @@ class TiFlashMetrics UInt64 debugQueryReplicaSyncRU(UInt32 keyspace_id); void setProxyThreadMemory(const std::string & k, Int64 v); double getProxyThreadMemory(const std::string & k); + void registerProxyThreadMemory(const std::string & k); private: TiFlashMetrics(); @@ -1121,9 +1122,9 @@ class TiFlashMetrics std::mutex replica_sync_ru_mtx; std::unordered_map registered_keyspace_sync_replica_ru; - // TODO: Use CAS+HazPtr to remove proxy_thread_ru_mtx. + // TODO: Use CAS+HazPtr to remove proxy_thread_report_mtx, or hash some slots here. prometheus::Family * registered_raft_proxy_thread_memory_usage_family; - std::shared_mutex proxy_thread_ru_mtx; + std::shared_mutex proxy_thread_report_mtx; std::unordered_map registered_raft_proxy_thread_memory_usage_metrics; public: diff --git a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp index 6cc8a85943a..f979acce1dc 100644 --- a/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp +++ b/dbms/src/Storages/KVStore/FFI/ProxyFFI.cpp @@ -1059,6 +1059,7 @@ void ReportThreadAllocateBatch( { try { + UNUSED(server); UNUSED(tid); KVStore::reportThreadAllocBatch(buffToStrView(name), data); } diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 0f0092bfb56..e113aa3267c 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -514,17 +514,8 @@ void KVStore::reportThreadAllocInfo(std::string_view thdname, ReportThreadAlloca { case ReportThreadAllocateInfoType::Reset: { - { - auto & metrics = TiFlashMetrics::instance(); - std::unique_lock lock(metrics.proxy_thread_ru_mtx); - if unlikely (!metrics.registered_raft_proxy_thread_memory_usage_metrics.count(k)) - { - // Add new keyspace store usage metric - metrics.registered_raft_proxy_thread_memory_usage_metrics.emplace( - k, - &metrics.registered_raft_proxy_thread_memory_usage_family->Add({{"type", k}})); - } - } + auto & metrics = TiFlashMetrics::instance(); + metrics.registerProxyThreadMemory(tname); { std::unique_lock l(memory_allocation_mut); memory_allocation_map.insert_or_assign(tname, ThreadInfoJealloc()); From 5fa20a1ccb74fbc98c083ee9a0a47d3c13fdbb56 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Mar 2024 12:07:19 +0800 Subject: [PATCH 30/37] df Signed-off-by: CalvinNeo --- dbms/src/Common/{MemoryTrace.h => MemoryAllocTrace.h} | 0 dbms/src/Common/{MemoryTrace.cpp => MemoryAllocrace.cpp} | 2 +- dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename dbms/src/Common/{MemoryTrace.h => MemoryAllocTrace.h} (100%) rename dbms/src/Common/{MemoryTrace.cpp => MemoryAllocrace.cpp} (96%) diff --git a/dbms/src/Common/MemoryTrace.h b/dbms/src/Common/MemoryAllocTrace.h similarity index 100% rename from dbms/src/Common/MemoryTrace.h rename to dbms/src/Common/MemoryAllocTrace.h diff --git a/dbms/src/Common/MemoryTrace.cpp b/dbms/src/Common/MemoryAllocrace.cpp similarity index 96% rename from dbms/src/Common/MemoryTrace.cpp rename to dbms/src/Common/MemoryAllocrace.cpp index 24368e68559..be6b6abeea3 100644 --- a/dbms/src/Common/MemoryTrace.cpp +++ b/dbms/src/Common/MemoryAllocrace.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include #ifdef USE_JEMALLOC #include diff --git a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp index 54d56ed1100..443d96320cd 100644 --- a/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/ReadIndexWorker.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include #include #include #include From b384af101d7f3151204ff35ee1fec98657aad3a9 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Mar 2024 13:31:23 +0800 Subject: [PATCH 31/37] df2 Signed-off-by: CalvinNeo --- .../src/Storages/KVStore/tests/gtest_new_kvstore.cpp | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index 6ff4429e662..a11e56d62c1 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -908,10 +908,16 @@ try KVStore & kvs = getKVS(); std::string name = "test1-1"; kvs.reportThreadAllocBatch( - std::string_view(name.begin(), name.end()), + std::string_view(name.data(), name.size()), ReportThreadAllocateInfoBatch{.alloc = 1, .dealloc = 2}); auto & tiflash_metrics = TiFlashMetrics::instance(); ASSERT_EQ(tiflash_metrics.getProxyThreadMemory("test1"), -1); + std::string namee = ""; + kvs.reportThreadAllocBatch( + std::string_view(namee.data(), namee.size()), + ReportThreadAllocateInfoBatch{.alloc = 1, .dealloc = 2}); + auto & tiflash_metrics = TiFlashMetrics::instance(); + EXPECT_ANY_THROW(tiflash_metrics.getProxyThreadMemory("")); std::thread t([&]() { kvs.reportThreadAllocInfo(std::string_view(name.begin(), name.end()), ReportThreadAllocateInfoType::Reset, 0); uint64_t mock = 999; @@ -926,7 +932,7 @@ try std::string name2 = "ReadIndexWkr-1"; kvs.reportThreadAllocInfo(std::string_view(name2.begin(), name2.end()), ReportThreadAllocateInfoType::Reset, 0); kvs.reportThreadAllocInfo( - std::string_view(name2.begin(), name2.end()), + std::string_view(name2.data(), name2.size()), ReportThreadAllocateInfoType::AllocPtr, alloc_ptr); kvs.recordThreadAllocInfo(); @@ -934,7 +940,7 @@ try uint64_t mock2 = 998; uint64_t dealloc_ptr = reinterpret_cast(&mock2); kvs.reportThreadAllocInfo( - std::string_view(name2.begin(), name2.end()), + std::string_view(name2.data(), name2.size()), ReportThreadAllocateInfoType::DeallocPtr, dealloc_ptr); kvs.recordThreadAllocInfo(); From 0759459b7c3d957d343f7d29d700c1606a11b0dc Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Mar 2024 13:38:10 +0800 Subject: [PATCH 32/37] fix Signed-off-by: CalvinNeo --- dbms/src/Storages/KVStore/KVStore.cpp | 4 ++-- dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index e113aa3267c..aaf654cce11 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -557,7 +557,7 @@ void KVStore::reportThreadAllocInfo(std::string_view thdname, ReportThreadAlloca } } -static const std::unordered_set WHITE_LIST_THREAD_PREFIX = {"ReadIndexWkr"}; +static const std::unordered_set RECORD_WHITE_LIST_THREAD_PREFIX = {"ReadIndexWkr"}; /// For those everlasting threads, we can directly access their allocatedp/allocatedp. void KVStore::recordThreadAllocInfo() @@ -568,7 +568,7 @@ void KVStore::recordThreadAllocInfo() { auto agg_thread_name = getThreadNameAggPrefix(std::string_view(k.data(), k.size())); // Some thread may have shorter lifetime, we can't use this timed task here to upgrade. - if (WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name)) + if (RECORD_WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name)) { auto [it, ok] = agg_remaining.emplace(agg_thread_name, 0); it->second += v.remaining(); diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index a11e56d62c1..4743ba52fc2 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -916,7 +916,6 @@ try kvs.reportThreadAllocBatch( std::string_view(namee.data(), namee.size()), ReportThreadAllocateInfoBatch{.alloc = 1, .dealloc = 2}); - auto & tiflash_metrics = TiFlashMetrics::instance(); EXPECT_ANY_THROW(tiflash_metrics.getProxyThreadMemory("")); std::thread t([&]() { kvs.reportThreadAllocInfo(std::string_view(name.begin(), name.end()), ReportThreadAllocateInfoType::Reset, 0); From 25170e056ea26ec03e3af12423b219523389076c Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Mar 2024 14:44:41 +0800 Subject: [PATCH 33/37] proxy Signed-off-by: CalvinNeo --- contrib/tiflash-proxy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index 5f1958ea684..15b1f5f50a3 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit 5f1958ea6840550e2ea6ddb190b95814af692ce7 +Subproject commit 15b1f5f50a3acca291169551bdd67af710b00312 From d3356acf4e1bf1575fd63f71f0b6080bc4197089 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Mar 2024 15:36:50 +0800 Subject: [PATCH 34/37] fix2 Signed-off-by: CalvinNeo --- dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index 4743ba52fc2..1edddceb812 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -918,7 +918,7 @@ try ReportThreadAllocateInfoBatch{.alloc = 1, .dealloc = 2}); EXPECT_ANY_THROW(tiflash_metrics.getProxyThreadMemory("")); std::thread t([&]() { - kvs.reportThreadAllocInfo(std::string_view(name.begin(), name.end()), ReportThreadAllocateInfoType::Reset, 0); + kvs.reportThreadAllocInfo(std::string_view(name.data(), name.size()), ReportThreadAllocateInfoType::Reset, 0); uint64_t mock = 999; uint64_t alloc_ptr = reinterpret_cast(&mock); kvs.reportThreadAllocInfo( @@ -929,7 +929,7 @@ try ASSERT_EQ(tiflash_metrics.getProxyThreadMemory("test1"), -1); std::string name2 = "ReadIndexWkr-1"; - kvs.reportThreadAllocInfo(std::string_view(name2.begin(), name2.end()), ReportThreadAllocateInfoType::Reset, 0); + kvs.reportThreadAllocInfo(std::string_view(name2.data(), name2.size()), ReportThreadAllocateInfoType::Reset, 0); kvs.reportThreadAllocInfo( std::string_view(name2.data(), name2.size()), ReportThreadAllocateInfoType::AllocPtr, @@ -945,7 +945,7 @@ try kvs.recordThreadAllocInfo(); ASSERT_EQ(tiflash_metrics.getProxyThreadMemory("ReadIndexWkr"), 1); kvs.reportThreadAllocInfo( - std::string_view(name2.begin(), name2.end()), + std::string_view(name2.data(), name2.size()), ReportThreadAllocateInfoType::Remove, 0); }); From 2eead9825934666371b62d392aca1cfbed5a9002 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Mar 2024 16:55:00 +0800 Subject: [PATCH 35/37] fix2 Signed-off-by: CalvinNeo --- dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index 1edddceb812..aa84737ddfc 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -922,7 +922,7 @@ try uint64_t mock = 999; uint64_t alloc_ptr = reinterpret_cast(&mock); kvs.reportThreadAllocInfo( - std::string_view(name.begin(), name.end()), + std::string_view(name.data(), name.size()), ReportThreadAllocateInfoType::AllocPtr, alloc_ptr); kvs.recordThreadAllocInfo(); From b41346b0f4692507301350a77d3eb967b10e4267 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Mar 2024 18:31:08 +0800 Subject: [PATCH 36/37] ft Signed-off-by: CalvinNeo --- dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index aa84737ddfc..d483c038503 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -907,6 +907,7 @@ try { KVStore & kvs = getKVS(); std::string name = "test1-1"; + kvs.reportThreadAllocInfo(std::string_view(name.data(), name.size()), ReportThreadAllocateInfoType::Reset, 0); kvs.reportThreadAllocBatch( std::string_view(name.data(), name.size()), ReportThreadAllocateInfoBatch{.alloc = 1, .dealloc = 2}); From a9aaafac6f0c21c1a50a9f84b9fec10dd380e789 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Mar 2024 18:46:48 +0800 Subject: [PATCH 37/37] ft Signed-off-by: CalvinNeo --- dbms/src/Storages/KVStore/KVStore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index aaf654cce11..364aa30e149 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -515,7 +515,7 @@ void KVStore::reportThreadAllocInfo(std::string_view thdname, ReportThreadAlloca case ReportThreadAllocateInfoType::Reset: { auto & metrics = TiFlashMetrics::instance(); - metrics.registerProxyThreadMemory(tname); + metrics.registerProxyThreadMemory(getThreadNameAggPrefix(tname)); { std::unique_lock l(memory_allocation_mut); memory_allocation_map.insert_or_assign(tname, ThreadInfoJealloc());