Skip to content

Commit

Permalink
tests: Disable some tests when jemalloc is not enabled (#9093)
Browse files Browse the repository at this point in the history
ref #8835, close #9090

Disable some tests when jemalloc is not enabled
  • Loading branch information
JaySon-Huang authored May 27, 2024
1 parent a4fe694 commit 7fd8d05
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 22 deletions.
24 changes: 13 additions & 11 deletions dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,14 @@ void JointThreadInfoJeallocMap::recordThreadAllocInfoForProxy()
{
auto agg_thread_name = getThreadNameAggPrefix(k, '-');
// Some thread may have shorter lifetime, we can't use this timed task here to upgrade.
if (PROXY_RECORD_WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name) && v.has_ptr())
if (PROXY_RECORD_WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name) && v.hasPtr())
{
agg_allocate[agg_thread_name] += v.allocated();
agg_deallocate[agg_thread_name] += v.deallocated();
}
}
}

// TODO: maybe we should move the following code to `recordThreadAllocInfo`
auto & tiflash_metrics = TiFlashMetrics::instance();
for (const auto & [k, v] : agg_allocate)
{
Expand Down Expand Up @@ -236,27 +235,30 @@ void JointThreadInfoJeallocMap::reportThreadAllocInfoForStorage(

void JointThreadInfoJeallocMap::recordThreadAllocInfoForStorage()
{
std::shared_lock l(memory_allocation_mut);
std::unordered_map<std::string, uint64_t> agg_allocate;
std::unordered_map<std::string, uint64_t> agg_deallocate;
for (const auto & [k, v] : storage_map)

{
auto agg_thread_name = getThreadNameAggPrefix(k, v.aggregate_delimer);
// Some thread may have shorter lifetime, we can't use this timed task here to upgrade.
if (v.has_ptr())
std::shared_lock l(memory_allocation_mut);
for (const auto & [k, v] : storage_map)
{
agg_allocate[agg_thread_name] += v.allocated();
agg_deallocate[agg_thread_name] += v.deallocated();
auto agg_thread_name = getThreadNameAggPrefix(k, v.aggregate_delimer);
// Some thread may have shorter lifetime, we can't use this timed task here to upgrade.
if (v.hasPtr())
{
agg_allocate[agg_thread_name] += v.allocated();
agg_deallocate[agg_thread_name] += v.deallocated();
}
}
}

auto & tiflash_metrics = TiFlashMetrics::instance();
for (const auto & [k, v] : agg_allocate)
{
auto & tiflash_metrics = TiFlashMetrics::instance();
tiflash_metrics.setStorageThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, k, v);
}
for (const auto & [k, v] : agg_deallocate)
{
auto & tiflash_metrics = TiFlashMetrics::instance();
tiflash_metrics.setStorageThreadMemory(TiFlashMetrics::MemoryAllocType::Dealloc, k, v);
}
}
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <shared_mutex>
#include <thread>
#include <unordered_map>

namespace DB
Expand All @@ -30,14 +31,14 @@ struct ReportThreadAllocateInfoBatch;

struct ThreadInfoJealloc
{
ThreadInfoJealloc(char aggregate_delimer_)
explicit ThreadInfoJealloc(char aggregate_delimer_)
: aggregate_delimer(aggregate_delimer_)
{}
char aggregate_delimer = '-';
uint64_t allocated_ptr{0};
uint64_t deallocated_ptr{0};

bool has_ptr() const { return allocated_ptr != 0 && deallocated_ptr != 0; }
bool hasPtr() const { return allocated_ptr != 0 && deallocated_ptr != 0; }

uint64_t allocated() const
{
Expand Down
24 changes: 16 additions & 8 deletions dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Storages/RegionQueryInfo.h>
#include <TiDB/Schema/SchemaSyncService.h>
#include <TiDB/Schema/TiDBSchemaManager.h>
#include <common/config_common.h> // Included for `USE_JEMALLOC`

#include <limits>

Expand Down Expand Up @@ -1056,23 +1057,24 @@ try
}
CATCH

#if USE_JEMALLOC // following tests depends on jemalloc
TEST(FFIJemallocTest, JemallocThread)
try
{
std::thread t2([&]() {
char * a = new char[888888];
std::thread t1([&]() {
auto [allocated, deallocated] = JointThreadInfoJeallocMap::getPtrs();
ASSERT(allocated != 0);
ASSERT_TRUE(allocated != nullptr);
ASSERT_EQ(*allocated, 0);
ASSERT(deallocated != 0);
ASSERT_TRUE(deallocated != nullptr);
ASSERT_EQ(*deallocated, 0);
});
t1.join();
auto [allocated, deallocated] = JointThreadInfoJeallocMap::getPtrs();
ASSERT(allocated != 0);
ASSERT_TRUE(allocated != nullptr);
ASSERT_GE(*allocated, 888888);
ASSERT(deallocated != 0);
ASSERT_TRUE(deallocated != nullptr);
delete[] a;
});
t2.join();
Expand All @@ -1084,37 +1086,43 @@ try
{
using namespace std::chrono_literals;
auto & ctx = TiFlashTestEnv::getGlobalContext();
auto size = TiFlashTestEnv::DEFAULT_BG_POOL_SIZE;
std::atomic_bool b;
auto & pool = ctx.getBackgroundPool();
const auto size = TiFlashTestEnv::DEFAULT_BG_POOL_SIZE;
std::atomic_bool b = false;
auto t = pool.addTask(
[&]() {
auto * x = new int[1000];
LOG_INFO(Logger::get(), "allocated");
while (!b.load())
{
std::this_thread::sleep_for(1500ms);
}
delete[] x;
LOG_INFO(Logger::get(), "released");
return false;
},
false,
5 * 60 * 1000);
std::this_thread::sleep_for(500ms);

JointThreadInfoJeallocMap & jm = *ctx.getJointThreadInfoJeallocMap();
jm.recordThreadAllocInfo();

LOG_INFO(DB::Logger::get(), "size {}", size);
LOG_INFO(DB::Logger::get(), "bg pool size={}", size);
UInt64 r = TiFlashMetrics::instance().getStorageThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, "bg");
ASSERT_GE(r, sizeof(int) * 1000);
jm.accessStorageMap([size](const JointThreadInfoJeallocMap::AllocMap & m) {
// There are some other bg thread pools
ASSERT_GE(m.size(), size);
ASSERT_GE(m.size(), size) << m.size();
});
jm.accessProxyMap([](const JointThreadInfoJeallocMap::AllocMap & m) { ASSERT_EQ(m.size(), 0); });

b.store(true);

ctx.getBackgroundPool().removeTask(t);
}
CATCH
#endif

TEST(ProxyMode, Normal)
try
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ try
while (kvs.getOngoingPrehandleTaskCount() != 3)
{
loop += 1;
ASSERT(loop < 30);
ASSERT_TRUE(loop < 30);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
ASSERT_EQ(kvs.prehandling_trace.ongoing_prehandle_subtask_count.load(), 3);
Expand Down

0 comments on commit 7fd8d05

Please sign in to comment.