Skip to content

Commit

Permalink
KVStore: Expand the usage of thread-wise alloc/dealloc trace (#9003)
Browse files Browse the repository at this point in the history
close #8835
  • Loading branch information
CalvinNeo authored May 10, 2024
1 parent bddd270 commit 22417fd
Show file tree
Hide file tree
Showing 13 changed files with 411 additions and 186 deletions.
44 changes: 35 additions & 9 deletions dbms/src/Common/TiFlashMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,33 +99,59 @@ void TiFlashMetrics::removeReplicaSyncRUCounter(UInt32 keyspace_id)
registered_keyspace_sync_replica_ru.erase(itr);
}

double TiFlashMetrics::getProxyThreadMemory(const std::string & k)
// Builds the metric label for thread-name prefix `k`:
// "alloc_" + k for MemoryAllocType::Alloc, "dealloc_" + k for any other value.
static std::string genPrefix(TiFlashMetrics::MemoryAllocType type, const std::string & k)
{
    const char * tag = (type == TiFlashMetrics::MemoryAllocType::Alloc) ? "alloc_" : "dealloc_";
    return tag + k;
}

// Returns the current value of the alloc or dealloc metric (selected by `type`)
// registered for thread-name prefix `k`.
//
// The lookup key is the prefixed label produced by genPrefix(type, k); the bare
// key `k` is never registered on its own, so it must not be used for the lookup.
// The metric is required to exist: a missing entry trips RUNTIME_CHECK, which is
// given `k` as its message argument.
double TiFlashMetrics::getProxyThreadMemory(TiFlashMetrics::MemoryAllocType type, const std::string & k)
{
    // Read-only access to the registry, so a shared lock is sufficient.
    std::shared_lock lock(proxy_thread_report_mtx);

    auto it = registered_raft_proxy_thread_memory_usage_metrics.find(genPrefix(type, k));
    RUNTIME_CHECK(it != registered_raft_proxy_thread_memory_usage_metrics.end(), k);
    return it->second->Value();
}

void TiFlashMetrics::setProxyThreadMemory(const std::string & k, Int64 v)
// Sets the alloc or dealloc metric (selected by `type`) registered for
// thread-name prefix `k` to `v`.
//
// The lookup key is the prefixed label produced by genPrefix(type, k). If no
// metric is registered under that label the call is a silent no-op.
// A single `find` is used instead of `count` + `operator[]`: `operator[]` would
// insert a null entry for an unknown key, which is not allowed while holding only
// a shared lock and would be dereferenced immediately afterwards.
void TiFlashMetrics::setProxyThreadMemory(TiFlashMetrics::MemoryAllocType type, const std::string & k, Int64 v)
{
    std::shared_lock lock(proxy_thread_report_mtx);
    auto it = registered_raft_proxy_thread_memory_usage_metrics.find(genPrefix(type, k));
    if unlikely (it == registered_raft_proxy_thread_memory_usage_metrics.end())
    {
        // New metrics added through `Reset`.
        return;
    }
    it->second->Set(v);
}

void TiFlashMetrics::registerProxyThreadMemory(const std::string & k)
{
std::unique_lock lock(proxy_thread_report_mtx);
if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.count(k))
{
registered_raft_proxy_thread_memory_usage_metrics.emplace(
k,
&registered_raft_proxy_thread_memory_usage_family->Add({{"type", k}}));
auto prefix = genPrefix(TiFlashMetrics::MemoryAllocType::Alloc, k);
if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.contains(prefix))
{
registered_raft_proxy_thread_memory_usage_metrics.emplace(
prefix,
&registered_raft_proxy_thread_memory_usage_family->Add({{"type", prefix}}));
}
}
{
auto prefix = genPrefix(TiFlashMetrics::MemoryAllocType::Dealloc, k);
if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.contains(prefix))
{
registered_raft_proxy_thread_memory_usage_metrics.emplace(
prefix,
&registered_raft_proxy_thread_memory_usage_family->Add({{"type", prefix}}));
}
}
}

Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -1114,8 +1114,13 @@ class TiFlashMetrics

void addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru);
UInt64 debugQueryReplicaSyncRU(UInt32 keyspace_id);
void setProxyThreadMemory(const std::string & k, Int64 v);
double getProxyThreadMemory(const std::string & k);
/// Selects which of the two per-thread-prefix memory metrics an operation targets.
/// The metric label is built from this value: "alloc_" for `Alloc`, "dealloc_" otherwise.
enum class MemoryAllocType
{
/// Metric labelled with the "alloc_" prefix.
Alloc = 1,
/// Metric labelled with the "dealloc_" prefix.
Dealloc = 2,
};
/// Sets the alloc/dealloc metric (chosen by `type`) of thread-name prefix `k` to `v`.
/// Does nothing when no such metric has been registered.
void setProxyThreadMemory(MemoryAllocType type, const std::string & k, Int64 v);
/// Returns the value of the alloc/dealloc metric (chosen by `type`) of thread-name prefix `k`.
/// The metric must have been registered; this is enforced with RUNTIME_CHECK.
double getProxyThreadMemory(MemoryAllocType type, const std::string & k);
/// Registers both the alloc and the dealloc metric for thread-name prefix `k`,
/// skipping any that already exist.
void registerProxyThreadMemory(const std::string & k);

private:
Expand Down
21 changes: 20 additions & 1 deletion dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include <IO/BaseFile/fwd.h>
#include <IO/Buffer/ReadBufferFromFile.h>
#include <IO/FileProvider/FileProvider.h>
#include <Interpreters/Context.h>
#include <Interpreters/ISecurityManager.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryLog.h>
Expand Down Expand Up @@ -60,6 +59,7 @@
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/IStorage.h>
#include <Storages/KVStore/BackgroundService.h>
#include <Storages/KVStore/FFI/JointThreadAllocInfo.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/MarkCache.h>
#include <Storages/Page/PageConstants.h>
Expand Down Expand Up @@ -180,6 +180,8 @@ struct ContextShared

/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.

JointThreadInfoJeallocMapPtr joint_memory_allocation_map; /// Joint thread-wise alloc/dealloc map

class SessionKeyHash
{
public:
Expand Down Expand Up @@ -278,6 +280,11 @@ struct ContextShared
tmt_context->shutdown();
}

if (joint_memory_allocation_map)
{
joint_memory_allocation_map->stopThreadAllocInfo();
}

if (schema_sync_service)
{
schema_sync_service = nullptr;
Expand Down Expand Up @@ -1753,6 +1760,18 @@ DM::GlobalStoragePoolPtr Context::getGlobalStoragePool() const
return shared->global_storage_pool;
}


/// Creates a new JointThreadInfoJeallocMap and stores it in the shared context.
/// The assignment happens while the lock returned by getLock() is held; any map
/// stored previously is replaced.
void Context::initializeJointThreadInfoJeallocMap()
{
    auto context_guard = getLock();
    shared->joint_memory_allocation_map = std::make_shared<JointThreadInfoJeallocMap>();
}

/// Returns the shared joint thread-wise alloc/dealloc map.
/// The pointer is read without taking the context lock.
/// NOTE(review): presumably null until initializeJointThreadInfoJeallocMap() has run — confirm callers check for it.
JointThreadInfoJeallocMapPtr Context::getJointThreadInfoJeallocMap() const
{
return shared->joint_memory_allocation_map;
}

/**
* This PageStorage is initialized in two cases:
* 1. Not in disaggregated mode.
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ using MockMPPServerInfo = DB::tests::MockMPPServerInfo;
class TiFlashSecurityConfig;
using TiFlashSecurityConfigPtr = std::shared_ptr<TiFlashSecurityConfig>;
class MockStorage;
struct JointThreadInfoJeallocMap;
using JointThreadInfoJeallocMapPtr = std::shared_ptr<JointThreadInfoJeallocMap>;

enum class PageStorageRunMode : UInt8;
namespace DM
Expand Down Expand Up @@ -458,6 +460,9 @@ class Context
bool tryUploadAllDataToRemoteStore() const;
void tryReleaseWriteNodePageStorageForTest();

void initializeJointThreadInfoJeallocMap();
JointThreadInfoJeallocMapPtr getJointThreadInfoJeallocMap() const;

SharedContextDisaggPtr getSharedContextDisagg() const;

/// Call after initialization before using system logs. Call for global context.
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getSharedContextDisagg()->disaggregated_mode = disaggregated_mode;
global_context->getSharedContextDisagg()->use_autoscaler = use_autoscaler;

// Must init this before KVStore.
global_context->initializeJointThreadInfoJeallocMap();

/// Init File Provider
if (proxy_conf.is_proxy_runnable)
{
Expand Down Expand Up @@ -1717,8 +1720,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_ERROR(log, "Current status of engine-store is NOT Running, should not happen");
exit(-1);
}
LOG_INFO(log, "Stop collecting thread alloc metrics");
tmt_context.getKVStore()->stopThreadAllocInfo();
LOG_INFO(log, "Set store context status Stopping");
tmt_context.setStatusStopping();
{
Expand Down
192 changes: 192 additions & 0 deletions dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashMetrics.h>
#include <Common/setThreadName.h>
#include <Storages/KVStore/FFI/JointThreadAllocInfo.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>

#include <mutex>
#include <thread>
#include <unordered_set>

namespace DB
{

// Starts the background thread named "ThreadMemoryTracer".
//
// The thread wakes up every 5000ms, or earlier when `monitoring_cv` is notified,
// and calls recordThreadAllocInfo() unless `is_terminated` has been set, in which
// case it exits. `monitoring_mut` is held for the whole iteration, including the
// recordThreadAllocInfo() call.
JointThreadInfoJeallocMap::JointThreadInfoJeallocMap()
{
    monitoring_thread = new std::thread([this] {
        setThreadName("ThreadMemoryTracer");
        constexpr std::chrono::milliseconds report_interval{5000};
        for (;;)
        {
            std::unique_lock guard(monitoring_mut);
            // wait_for returns the predicate's latest result, i.e. whether
            // termination was requested while waiting or before the wait began.
            const bool terminated
                = monitoring_cv.wait_for(guard, report_interval, [this] { return is_terminated; });
            if (terminated)
                return;
            recordThreadAllocInfo();
        }
    });
}

// Periodic recording step run by the monitoring thread.
// Currently it only publishes the aggregated KVStore thread statistics.
void JointThreadInfoJeallocMap::recordThreadAllocInfo()
{
recordThreadAllocInfoForKVStore();
}

// Stops the monitoring thread on destruction. stopThreadAllocInfo() returns early
// if the thread has already been stopped, so an earlier explicit stop is fine.
JointThreadInfoJeallocMap::~JointThreadInfoJeallocMap()
{
stopThreadAllocInfo();
}

// Returns the aggregation prefix of a thread name: everything before the last
// '-' (e.g. "ReadIndexWkr-1" -> "ReadIndexWkr"). A name without any '-' is
// returned unchanged.
static std::string getThreadNameAggPrefix(const std::string_view & s)
{
    // rfind yields npos when there is no '-'; substr(0, npos) is then the whole name.
    const auto last_dash = s.rfind('-');
    return std::string(s.substr(0, last_dash));
}

// Applies one per-thread allocation report to the map `m`, keyed by thread name.
//
// - Reset:      registers the metrics for the thread's aggregated name prefix,
//               then (re)creates the thread's entry with a default ThreadInfoJealloc.
// - Remove:     erases the thread's entry.
// - AllocPtr /  stores `value` into the entry's `allocated_ptr` / `deallocated_ptr`;
//   DeallocPtr: ignored when `value` is 0 or the thread has no entry.
//
// Reports carrying an empty thread name are dropped.
void JointThreadInfoJeallocMap::reportThreadAllocInfoImpl(
    std::unordered_map<std::string, ThreadInfoJealloc> & m,
    std::string_view thdname,
    ReportThreadAllocateInfoType type,
    uint64_t value)
{
    // Many threads have empty name, better just not handle.
    if (thdname.empty())
        return;

    const std::string thread_key(thdname);
    switch (type)
    {
    case ReportThreadAllocateInfoType::Reset:
    {
        // Metric registration takes its own lock, so do it before locking the map.
        TiFlashMetrics::instance().registerProxyThreadMemory(getThreadNameAggPrefix(thread_key));
        std::unique_lock guard(memory_allocation_mut);
        m.insert_or_assign(thread_key, ThreadInfoJealloc());
        break;
    }
    case ReportThreadAllocateInfoType::Remove:
    {
        std::unique_lock guard(memory_allocation_mut);
        m.erase(thread_key);
        break;
    }
    case ReportThreadAllocateInfoType::AllocPtr:
    case ReportThreadAllocateInfoType::DeallocPtr:
    {
        // The map structure is not modified here, only an existing entry's field.
        // NOTE(review): the field is written under a shared lock while the recording
        // task reads entries under a shared lock too — confirm the fields tolerate this.
        std::shared_lock guard(memory_allocation_mut);
        if (value == 0)
            return;
        const auto entry = m.find(thread_key);
        if unlikely (entry == m.end())
            return;
        if (type == ReportThreadAllocateInfoType::AllocPtr)
            entry->second.allocated_ptr = value;
        else
            entry->second.deallocated_ptr = value;
        break;
    }
    }
}

// Applies a per-thread allocation report (`type`, `value`) for thread `thdname`
// to the KVStore thread map by forwarding to reportThreadAllocInfoImpl.
void JointThreadInfoJeallocMap::reportThreadAllocInfoForKVStore(
std::string_view thdname,
ReportThreadAllocateInfoType type,
uint64_t value)
{
reportThreadAllocInfoImpl(kvstore_map, thdname, type, value);
}

// Aggregated thread-name prefixes whose counters are published by the periodic
// recordThreadAllocInfoForKVStore task; other prefixes are skipped there.
static const std::unordered_set<std::string> PROXY_RECORD_WHITE_LIST_THREAD_PREFIX = {"ReadIndexWkr"};

// Sums allocated/deallocated bytes of the KVStore threads per aggregated name
// prefix and publishes the sums as the alloc/dealloc metrics of that prefix.
//
// Only prefixes listed in PROXY_RECORD_WHITE_LIST_THREAD_PREFIX are considered,
// and only entries for which has_ptr() is true. The map is read under a shared
// lock on `memory_allocation_mut`, held until the metrics have been set.
void JointThreadInfoJeallocMap::recordThreadAllocInfoForKVStore()
{
    struct Totals
    {
        uint64_t allocated = 0;
        uint64_t deallocated = 0;
    };

    std::shared_lock guard(memory_allocation_mut);
    std::unordered_map<std::string, Totals> totals_by_prefix;
    for (const auto & [thread_name, info] : kvstore_map)
    {
        auto prefix = getThreadNameAggPrefix(thread_name);
        // Short-lived threads cannot rely on this periodic task to publish their
        // numbers, so only the whitelisted prefixes are aggregated here.
        if (!PROXY_RECORD_WHITE_LIST_THREAD_PREFIX.contains(prefix) || !info.has_ptr())
            continue;
        auto & totals = totals_by_prefix[prefix];
        totals.allocated += info.allocated();
        totals.deallocated += info.deallocated();
    }

    auto & tiflash_metrics = TiFlashMetrics::instance();
    // Publish every alloc total first, then every dealloc total.
    for (const auto & [prefix, totals] : totals_by_prefix)
        tiflash_metrics.setProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, prefix, totals.allocated);
    for (const auto & [prefix, totals] : totals_by_prefix)
        tiflash_metrics.setProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Dealloc, prefix, totals.deallocated);
}

// Publishes a batched report for thread `name`: `data.alloc` and `data.dealloc`
// are written directly to the alloc/dealloc metrics of the thread's aggregated
// name prefix. Reports carrying an empty thread name are dropped.
void JointThreadInfoJeallocMap::reportThreadAllocBatchForKVStore(
    std::string_view name,
    ReportThreadAllocateInfoBatch data)
{
    // Many threads have empty name, better just not handle.
    if (name.empty())
        return;

    // TODO(jemalloc-trace) Could be costly.
    const auto agg_prefix = getThreadNameAggPrefix(name);
    auto & metrics = TiFlashMetrics::instance();
    metrics.setProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, agg_prefix, data.alloc);
    metrics.setProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Dealloc, agg_prefix, data.dealloc);
}

// Asks the monitoring thread to terminate, waits for it to exit, then frees it.
//
// Safe to call more than once: only the first caller that finds the thread still
// running performs the join; later calls return right after the first log line.
void JointThreadInfoJeallocMap::stopThreadAllocInfo()
{
    LOG_INFO(DB::Logger::get(), "Stop collecting thread alloc metrics");
    {
        std::unique_lock guard(monitoring_mut);
        // Only one caller can successfully stop the thread.
        if (is_terminated || monitoring_thread == nullptr)
            return;
        is_terminated = true;
        monitoring_cv.notify_all();
    }

    // Join without holding the mutex: the monitoring thread needs it to observe
    // the termination flag and leave its loop.
    LOG_INFO(DB::Logger::get(), "JointThreadInfoJeallocMap shutdown, wait thread alloc monitor join");
    monitoring_thread->join();

    std::unique_lock guard(monitoring_mut);
    delete monitoring_thread;
    monitoring_thread = nullptr;
}

} // namespace DB
Loading

0 comments on commit 22417fd

Please sign in to comment.