diff --git a/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp b/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp index b0d860b1ae3..b1fd26eb16b 100644 --- a/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp +++ b/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp @@ -149,7 +149,7 @@ void JointThreadInfoJeallocMap::recordThreadAllocInfoForProxy() { auto agg_thread_name = getThreadNameAggPrefix(k, '-'); // Some thread may have shorter lifetime, we can't use this timed task here to upgrade. - if (PROXY_RECORD_WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name) && v.has_ptr()) + if (PROXY_RECORD_WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name) && v.hasPtr()) { agg_allocate[agg_thread_name] += v.allocated(); agg_deallocate[agg_thread_name] += v.deallocated(); @@ -157,7 +157,6 @@ void JointThreadInfoJeallocMap::recordThreadAllocInfoForProxy() } } - // TODO: maybe we should move the following code to `recordThreadAllocInfo` auto & tiflash_metrics = TiFlashMetrics::instance(); for (const auto & [k, v] : agg_allocate) { @@ -236,27 +235,30 @@ void JointThreadInfoJeallocMap::reportThreadAllocInfoForStorage( void JointThreadInfoJeallocMap::recordThreadAllocInfoForStorage() { - std::shared_lock l(memory_allocation_mut); std::unordered_map agg_allocate; std::unordered_map agg_deallocate; - for (const auto & [k, v] : storage_map) + { - auto agg_thread_name = getThreadNameAggPrefix(k, v.aggregate_delimer); - // Some thread may have shorter lifetime, we can't use this timed task here to upgrade. - if (v.has_ptr()) + std::shared_lock l(memory_allocation_mut); + for (const auto & [k, v] : storage_map) { - agg_allocate[agg_thread_name] += v.allocated(); - agg_deallocate[agg_thread_name] += v.deallocated(); + auto agg_thread_name = getThreadNameAggPrefix(k, v.aggregate_delimer); + // Some thread may have shorter lifetime, we can't use this timed task here to upgrade. + if (v.hasPtr()) + { + agg_allocate[agg_thread_name] += v.allocated(); + agg_deallocate[agg_thread_name] += v.deallocated(); + } } } + + auto & tiflash_metrics = TiFlashMetrics::instance(); for (const auto & [k, v] : agg_allocate) { - auto & tiflash_metrics = TiFlashMetrics::instance(); tiflash_metrics.setStorageThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, k, v); } for (const auto & [k, v] : agg_deallocate) { - auto & tiflash_metrics = TiFlashMetrics::instance(); tiflash_metrics.setStorageThreadMemory(TiFlashMetrics::MemoryAllocType::Dealloc, k, v); } } diff --git a/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h b/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h index ab921259b04..7d4a500a4e2 100644 --- a/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h +++ b/dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include namespace DB @@ -30,14 +31,14 @@ struct ReportThreadAllocateInfoBatch; struct ThreadInfoJealloc { - ThreadInfoJealloc(char aggregate_delimer_) + explicit ThreadInfoJealloc(char aggregate_delimer_) : aggregate_delimer(aggregate_delimer_) {} char aggregate_delimer = '-'; uint64_t allocated_ptr{0}; uint64_t deallocated_ptr{0}; - bool has_ptr() const { return allocated_ptr != 0 && deallocated_ptr != 0; } + bool hasPtr() const { return allocated_ptr != 0 && deallocated_ptr != 0; } uint64_t allocated() const { diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index 66fe28c0832..f39ab86f8a8 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -23,6 +23,7 @@ #include #include #include +#include // Included for `USE_JEMALLOC` #include @@ -1056,6 +1057,7 @@ try } CATCH +#if USE_JEMALLOC // following tests depends on jemalloc TEST(FFIJemallocTest, JemallocThread) try { @@ -1063,16 +1065,16 @@ try char * a = new char[888888]; std::thread t1([&]() { auto [allocated, deallocated] = JointThreadInfoJeallocMap::getPtrs(); - ASSERT(allocated != 0); + ASSERT_TRUE(allocated != nullptr); ASSERT_EQ(*allocated, 0); - ASSERT(deallocated != 0); + ASSERT_TRUE(deallocated != nullptr); ASSERT_EQ(*deallocated, 0); }); t1.join(); auto [allocated, deallocated] = JointThreadInfoJeallocMap::getPtrs(); - ASSERT(allocated != 0); + ASSERT_TRUE(allocated != nullptr); ASSERT_GE(*allocated, 888888); - ASSERT(deallocated != 0); + ASSERT_TRUE(deallocated != nullptr); delete[] a; }); t2.join(); @@ -1084,37 +1086,43 @@ try { using namespace std::chrono_literals; auto & ctx = TiFlashTestEnv::getGlobalContext(); - auto size = TiFlashTestEnv::DEFAULT_BG_POOL_SIZE; - std::atomic_bool b; auto & pool = ctx.getBackgroundPool(); + const auto size = TiFlashTestEnv::DEFAULT_BG_POOL_SIZE; + std::atomic_bool b = false; auto t = pool.addTask( [&]() { auto * x = new int[1000]; + LOG_INFO(Logger::get(), "allocated"); while (!b.load()) { std::this_thread::sleep_for(1500ms); } delete[] x; + LOG_INFO(Logger::get(), "released"); return false; }, false, 5 * 60 * 1000); std::this_thread::sleep_for(500ms); + JointThreadInfoJeallocMap & jm = *ctx.getJointThreadInfoJeallocMap(); jm.recordThreadAllocInfo(); - LOG_INFO(DB::Logger::get(), "size {}", size); + LOG_INFO(DB::Logger::get(), "bg pool size={}", size); UInt64 r = TiFlashMetrics::instance().getStorageThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "bg"); ASSERT_GE(r, sizeof(int) * 1000); jm.accessStorageMap([size](const JointThreadInfoJeallocMap::AllocMap & m) { // There are some other bg thread pools - ASSERT_GE(m.size(), size); + ASSERT_GE(m.size(), size) << m.size(); }); jm.accessProxyMap([](const JointThreadInfoJeallocMap::AllocMap & m) { ASSERT_EQ(m.size(), 0); }); + b.store(true); + ctx.getBackgroundPool().removeTask(t); } CATCH +#endif TEST(ProxyMode, Normal) try diff --git a/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp b/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp index 02a67dda4ef..e905ac701e2 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp @@ -956,7 +956,7 @@ try while (kvs.getOngoingPrehandleTaskCount() != 3) { loop += 1; - ASSERT(loop < 30); + ASSERT_TRUE(loop < 30); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } ASSERT_EQ(kvs.prehandling_trace.ongoing_prehandle_subtask_count.load(), 3);