KVStore: Expand the usage of thread-wise alloc/dealloc trace #9003

Merged: 31 commits, May 10, 2024
Commits
438168b
use alloc and dealloc
CalvinNeo Apr 29, 2024
edb381b
introduce JointThreadAllocInfo
CalvinNeo Apr 29, 2024
a83a2a7
introduce JointThreadAllocInfo
CalvinNeo Apr 29, 2024
a0592ec
fmt
CalvinNeo Apr 29, 2024
2ffec8f
a
CalvinNeo Apr 29, 2024
45f4c28
a
CalvinNeo Apr 29, 2024
ebe343f
a
CalvinNeo Apr 29, 2024
33b3f45
Merge branch 'master' into monitor-memory-of-tiflash
purelind Apr 30, 2024
02f142e
z
CalvinNeo Apr 30, 2024
cc4015e
a
CalvinNeo Apr 30, 2024
8956376
Merge branch 'monitor-memory-of-tiflash' of github.com:CalvinNeo/tics…
CalvinNeo Apr 30, 2024
aaf6025
a
CalvinNeo Apr 30, 2024
14ac9d1
aaaa
CalvinNeo Apr 30, 2024
be868dc
fix some
CalvinNeo Apr 30, 2024
2828e95
ffff
CalvinNeo Apr 30, 2024
2ddc812
ffff
CalvinNeo Apr 30, 2024
977f4af
Update dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp
CalvinNeo Apr 30, 2024
adeb38b
a
CalvinNeo Apr 30, 2024
ae0fb60
Merge branch 'monitor-memory-of-tiflash' of github.com:CalvinNeo/tics…
CalvinNeo Apr 30, 2024
b701584
a
CalvinNeo May 9, 2024
6651de5
Merge branch 'master' into monitor-memory-of-tiflash
CalvinNeo May 9, 2024
0bee517
all
CalvinNeo May 9, 2024
ad9b465
Merge branch 'monitor-memory-of-tiflash' of github.com:CalvinNeo/tics…
CalvinNeo May 9, 2024
6b0ad11
aaaaa
CalvinNeo May 10, 2024
ce9aa6c
aaaaa
CalvinNeo May 10, 2024
7537279
Merge branch 'master' into monitor-memory-of-tiflash
CalvinNeo May 10, 2024
5b218f4
a
CalvinNeo May 10, 2024
b2b9ff7
Merge branch 'monitor-memory-of-tiflash' of github.com:CalvinNeo/tics…
CalvinNeo May 10, 2024
7ae1793
nullptr
CalvinNeo May 10, 2024
3aaf244
Update dbms/src/Common/TiFlashMetrics.cpp
CalvinNeo May 10, 2024
64dfe3c
Update dbms/src/Common/TiFlashMetrics.cpp
CalvinNeo May 10, 2024
44 changes: 35 additions & 9 deletions dbms/src/Common/TiFlashMetrics.cpp
@@ -99,33 +99,59 @@ void TiFlashMetrics::removeReplicaSyncRUCounter(UInt32 keyspace_id)
registered_keyspace_sync_replica_ru.erase(itr);
}

double TiFlashMetrics::getProxyThreadMemory(const std::string & k)
static std::string genPrefix(TiFlashMetrics::MemoryAllocType type, const std::string & k)
{
if (type == TiFlashMetrics::MemoryAllocType::Alloc)
{
return "alloc_" + k;
}
else
{
return "dealloc_" + k;
}
}

double TiFlashMetrics::getProxyThreadMemory(TiFlashMetrics::MemoryAllocType type, const std::string & k)
{
std::shared_lock lock(proxy_thread_report_mtx);
auto it = registered_raft_proxy_thread_memory_usage_metrics.find(k);

auto it = registered_raft_proxy_thread_memory_usage_metrics.find(genPrefix(type, k));
RUNTIME_CHECK(it != registered_raft_proxy_thread_memory_usage_metrics.end(), k);
return it->second->Value();
}

void TiFlashMetrics::setProxyThreadMemory(const std::string & k, Int64 v)
void TiFlashMetrics::setProxyThreadMemory(TiFlashMetrics::MemoryAllocType type, const std::string & k, Int64 v)
{
std::shared_lock lock(proxy_thread_report_mtx);
if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.count(k))
auto it = registered_raft_proxy_thread_memory_usage_metrics.find(genPrefix(type, k));
if unlikely (it == registered_raft_proxy_thread_memory_usage_metrics.end())
{
// New metrics added through `Reset`.
return;
}
registered_raft_proxy_thread_memory_usage_metrics[k]->Set(v);
it->second->Set(v);
}

void TiFlashMetrics::registerProxyThreadMemory(const std::string & k)
{
std::unique_lock lock(proxy_thread_report_mtx);
if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.count(k))
{
registered_raft_proxy_thread_memory_usage_metrics.emplace(
k,
&registered_raft_proxy_thread_memory_usage_family->Add({{"type", k}}));
auto prefix = genPrefix(TiFlashMetrics::MemoryAllocType::Alloc, k);
if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.contains(prefix))
{
registered_raft_proxy_thread_memory_usage_metrics.emplace(
prefix,
&registered_raft_proxy_thread_memory_usage_family->Add({{"type", prefix}}));
}
}
{
auto prefix = genPrefix(TiFlashMetrics::MemoryAllocType::Dealloc, k);
if unlikely (!registered_raft_proxy_thread_memory_usage_metrics.contains(prefix))
{
registered_raft_proxy_thread_memory_usage_metrics.emplace(
prefix,
&registered_raft_proxy_thread_memory_usage_family->Add({{"type", prefix}}));
}
}
}
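
Note (illustrative sketch, not code from this PR): the hunk above stores two gauges per thread group, keyed by an "alloc_" or "dealloc_" prefix, and lets setters skip keys that were never registered. The class below shows that pattern in isolation. `ThreadMemoryGauges`, `makeKey`, and `registerGroup` are hypothetical names, and a plain `std::atomic<int64_t>` stands in for the metrics-library gauge that TiFlashMetrics actually uses.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <unordered_map>

enum class MemoryAllocType { Alloc = 1, Dealloc = 2 };

// Hypothetical stand-in for the gauge map kept by TiFlashMetrics.
class ThreadMemoryGauges
{
public:
    // Same idea as genPrefix in the diff: one key per (type, thread group).
    static std::string makeKey(MemoryAllocType type, const std::string & k)
    {
        return std::string(type == MemoryAllocType::Alloc ? "alloc_" : "dealloc_") + k;
    }

    // Registers both gauges for a group; existing gauges are left untouched.
    void registerGroup(const std::string & k)
    {
        assert(!k.empty()); // assumption: callers filter out unnamed threads first
        std::unique_lock lock(mtx);
        gauges.try_emplace(makeKey(MemoryAllocType::Alloc, k), 0);
        gauges.try_emplace(makeKey(MemoryAllocType::Dealloc, k), 0);
    }

    // A shared lock is enough: the map is not modified, only an atomic value.
    // Returns false for an unregistered key instead of inserting one.
    bool set(MemoryAllocType type, const std::string & k, int64_t v)
    {
        std::shared_lock lock(mtx);
        auto it = gauges.find(makeKey(type, k));
        if (it == gauges.end())
            return false;
        it->second.store(v);
        return true;
    }

    // Returns std::nullopt for an unregistered key (the diff uses RUNTIME_CHECK here).
    std::optional<int64_t> get(MemoryAllocType type, const std::string & k) const
    {
        std::shared_lock lock(mtx);
        auto it = gauges.find(makeKey(type, k));
        if (it == gauges.end())
            return std::nullopt;
        return it->second.load();
    }

private:
    mutable std::shared_mutex mtx;
    std::unordered_map<std::string, std::atomic<int64_t>> gauges;
};
```

Using a single `find` followed by a write through the iterator, as the new `setProxyThreadMemory` does, also avoids `operator[]`, which would need exclusive access because it can insert.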

9 changes: 7 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
@@ -1114,8 +1114,13 @@ class TiFlashMetrics

void addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru);
UInt64 debugQueryReplicaSyncRU(UInt32 keyspace_id);
void setProxyThreadMemory(const std::string & k, Int64 v);
double getProxyThreadMemory(const std::string & k);
enum class MemoryAllocType
{
Alloc = 1,
Dealloc = 2,
};
void setProxyThreadMemory(MemoryAllocType type, const std::string & k, Int64 v);
double getProxyThreadMemory(MemoryAllocType type, const std::string & k);
void registerProxyThreadMemory(const std::string & k);

private:
21 changes: 20 additions & 1 deletion dbms/src/Interpreters/Context.cpp
@@ -32,7 +32,6 @@
#include <IO/BaseFile/fwd.h>
#include <IO/Buffer/ReadBufferFromFile.h>
#include <IO/FileProvider/FileProvider.h>
#include <Interpreters/Context.h>
Review comment (Contributor):
Why was this line removed?

Reply (Member Author):
I don't know; I will add it back. To my surprise, though, it still compiles without it.

#include <Interpreters/ISecurityManager.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryLog.h>
@@ -60,6 +59,7 @@
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
#include <Storages/IStorage.h>
#include <Storages/KVStore/BackgroundService.h>
#include <Storages/KVStore/FFI/JointThreadAllocInfo.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/MarkCache.h>
#include <Storages/Page/PageConstants.h>
@@ -180,6 +180,8 @@ struct ContextShared

/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.

JointThreadInfoJeallocMapPtr joint_memory_allocation_map; /// Joint thread-wise alloc/dealloc map

class SessionKeyHash
{
public:
@@ -278,6 +280,11 @@ struct ContextShared
tmt_context->shutdown();
}

if (joint_memory_allocation_map)
{
joint_memory_allocation_map->stopThreadAllocInfo();
}

if (schema_sync_service)
{
schema_sync_service = nullptr;
@@ -1753,6 +1760,18 @@ DM::GlobalStoragePoolPtr Context::getGlobalStoragePool() const
return shared->global_storage_pool;
}


void Context::initializeJointThreadInfoJeallocMap()
{
auto lock = getLock();
shared->joint_memory_allocation_map = std::make_shared<JointThreadInfoJeallocMap>();
}

JointThreadInfoJeallocMapPtr Context::getJointThreadInfoJeallocMap() const
{
return shared->joint_memory_allocation_map;
}

/**
* This PageStorage is initialized in two cases:
* 1. Not in disaggregated mode.
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/Context.h
@@ -100,6 +100,8 @@ using MockMPPServerInfo = DB::tests::MockMPPServerInfo;
class TiFlashSecurityConfig;
using TiFlashSecurityConfigPtr = std::shared_ptr<TiFlashSecurityConfig>;
class MockStorage;
struct JointThreadInfoJeallocMap;
using JointThreadInfoJeallocMapPtr = std::shared_ptr<JointThreadInfoJeallocMap>;

enum class PageStorageRunMode : UInt8;
namespace DM
@@ -458,6 +460,9 @@ class Context
bool tryUploadAllDataToRemoteStore() const;
void tryReleaseWriteNodePageStorageForTest();

void initializeJointThreadInfoJeallocMap();
JointThreadInfoJeallocMapPtr getJointThreadInfoJeallocMap() const;

SharedContextDisaggPtr getSharedContextDisagg() const;

/// Call after initialization before using system logs. Call for global context.
5 changes: 3 additions & 2 deletions dbms/src/Server/Server.cpp
@@ -1089,6 +1089,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getSharedContextDisagg()->disaggregated_mode = disaggregated_mode;
global_context->getSharedContextDisagg()->use_autoscaler = use_autoscaler;

// Must init this before KVStore.
global_context->initializeJointThreadInfoJeallocMap();

/// Init File Provider
if (proxy_conf.is_proxy_runnable)
{
@@ -1717,8 +1720,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_ERROR(log, "Current status of engine-store is NOT Running, should not happen");
exit(-1);
}
LOG_INFO(log, "Stop collecting thread alloc metrics");
tmt_context.getKVStore()->stopThreadAllocInfo();
LOG_INFO(log, "Set store context status Stopping");
tmt_context.setStatusStopping();
{
192 changes: 192 additions & 0 deletions dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp
@@ -0,0 +1,192 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashMetrics.h>
#include <Common/setThreadName.h>
#include <Storages/KVStore/FFI/JointThreadAllocInfo.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>

#include <mutex>
#include <thread>
#include <unordered_set>

namespace DB
{

JointThreadInfoJeallocMap::JointThreadInfoJeallocMap()
{
monitoring_thread = new std::thread([&]() {
setThreadName("ThreadMemoryTracer");
while (true)
{
using namespace std::chrono_literals;
std::unique_lock l(monitoring_mut);
monitoring_cv.wait_for(l, 5000ms, [&]() { return is_terminated; });
if (is_terminated)
return;
recordThreadAllocInfo();
}
});
}

void JointThreadInfoJeallocMap::recordThreadAllocInfo()
{
recordThreadAllocInfoForKVStore();
}

JointThreadInfoJeallocMap::~JointThreadInfoJeallocMap()
{
stopThreadAllocInfo();
}

static std::string getThreadNameAggPrefix(const std::string_view & s)
{
if (auto pos = s.find_last_of('-'); pos != std::string::npos)
{
return std::string(s.begin(), s.begin() + pos);
}
return std::string(s.begin(), s.end());
}

void JointThreadInfoJeallocMap::reportThreadAllocInfoImpl(
std::unordered_map<std::string, ThreadInfoJealloc> & m,
std::string_view thdname,
ReportThreadAllocateInfoType type,
uint64_t value)
{
// Many threads have empty name, better just not handle.
if (thdname.empty())
return;
std::string tname(thdname.begin(), thdname.end());
switch (type)
{
case ReportThreadAllocateInfoType::Reset:
{
auto & metrics = TiFlashMetrics::instance();
metrics.registerProxyThreadMemory(getThreadNameAggPrefix(tname));
{
std::unique_lock l(memory_allocation_mut);
m.insert_or_assign(tname, ThreadInfoJealloc());
}
break;
}
case ReportThreadAllocateInfoType::Remove:
{
std::unique_lock l(memory_allocation_mut);
m.erase(tname);
break;
}
case ReportThreadAllocateInfoType::AllocPtr:
{
std::shared_lock l(memory_allocation_mut);
if (value == 0)
return;
auto it = m.find(tname);
if unlikely (it == m.end())
{
return;
}
it->second.allocated_ptr = value;
break;
}
case ReportThreadAllocateInfoType::DeallocPtr:
{
std::shared_lock l(memory_allocation_mut);
if (value == 0)
return;
auto it = m.find(tname);
if unlikely (it == m.end())
{
return;
}
it->second.deallocated_ptr = value;
break;
}
}
}

void JointThreadInfoJeallocMap::reportThreadAllocInfoForKVStore(
std::string_view thdname,
ReportThreadAllocateInfoType type,
uint64_t value)
{
reportThreadAllocInfoImpl(kvstore_map, thdname, type, value);
}

static const std::unordered_set<std::string> PROXY_RECORD_WHITE_LIST_THREAD_PREFIX = {"ReadIndexWkr"};

void JointThreadInfoJeallocMap::recordThreadAllocInfoForKVStore()
{
std::shared_lock l(memory_allocation_mut);
std::unordered_map<std::string, uint64_t> agg_allocate;
std::unordered_map<std::string, uint64_t> agg_deallocate;
for (const auto & [k, v] : kvstore_map)
{
auto agg_thread_name = getThreadNameAggPrefix(k);
// Some threads are short-lived, so this timed task cannot be relied on to update their metrics.
if (PROXY_RECORD_WHITE_LIST_THREAD_PREFIX.contains(agg_thread_name) && v.has_ptr())
{
agg_allocate[agg_thread_name] += v.allocated();
agg_deallocate[agg_thread_name] += v.deallocated();
}
}
for (const auto & [k, v] : agg_allocate)
{
auto & tiflash_metrics = TiFlashMetrics::instance();
tiflash_metrics.setProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, k, v);
}
for (const auto & [k, v] : agg_deallocate)
{
auto & tiflash_metrics = TiFlashMetrics::instance();
tiflash_metrics.setProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Dealloc, k, v);
}
}
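
Note (illustrative sketch, not code from this PR): the function above groups per-thread counters by the thread name with its last "-suffix" removed, and only reports groups on an allow list. The standalone version below shows that aggregation on plain data. `ThreadSample`, `Totals`, `aggPrefix`, and `aggregate` are hypothetical names, and the `has_ptr()` check from the diff is omitted because the samples here already carry values rather than pointers.

```cpp
#include <cstdint>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Same rule as getThreadNameAggPrefix in the diff: drop everything from the last '-'.
std::string aggPrefix(std::string_view name)
{
    auto pos = name.find_last_of('-');
    return std::string(pos == std::string_view::npos ? name : name.substr(0, pos));
}

// Hypothetical per-thread sample: cumulative bytes allocated and deallocated.
struct ThreadSample
{
    std::string thread_name;
    uint64_t allocated;
    uint64_t deallocated;
};

struct Totals
{
    uint64_t allocated = 0;
    uint64_t deallocated = 0;
};

// Sums samples per thread group, keeping only groups named in allow_list.
std::unordered_map<std::string, Totals> aggregate(
    const std::vector<ThreadSample> & samples,
    const std::unordered_set<std::string> & allow_list)
{
    std::unordered_map<std::string, Totals> out;
    for (const auto & s : samples)
    {
        auto group = aggPrefix(s.thread_name);
        if (allow_list.count(group) == 0)
            continue;
        auto & totals = out[group];
        totals.allocated += s.allocated;
        totals.deallocated += s.deallocated;
    }
    return out;
}
```

With this rule, threads named "ReadIndexWkr-0" and "ReadIndexWkr-1" both fall into the "ReadIndexWkr" group, which is the only prefix on the allow list in the diff.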

void JointThreadInfoJeallocMap::reportThreadAllocBatchForKVStore(
std::string_view name,
ReportThreadAllocateInfoBatch data)
{
// Many threads have empty name, better just not handle.
if (name.empty())
return;
// TODO(jemalloc-trace) Could be costly.
auto k = getThreadNameAggPrefix(name);
auto & tiflash_metrics = TiFlashMetrics::instance();
tiflash_metrics.setProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Alloc, k, data.alloc);
tiflash_metrics.setProxyThreadMemory(TiFlashMetrics::MemoryAllocType::Dealloc, k, data.dealloc);
}

void JointThreadInfoJeallocMap::stopThreadAllocInfo()
Review comment (Contributor):
We should call this explicitly in ContextShared::shutdown to make sure monitoring_thread is stopped before TiFlashMetrics::instance is released.

Reply (Member Author):
Then shall I move joint_memory_allocation_map from Context to ContextShared?

Reply (Member Author):
TiFlashMetrics::instance() seems to be a singleton, so I think it may actually outlive Context:

TiFlashMetrics & TiFlashMetrics::instance()
{
    static TiFlashMetrics inst; // Instantiated on first use.
    return inst;
}

Reply (Contributor), quoting "Then shall I move joint_memory_allocation_map from Context to ContextShared?":
Yes. Context is a session-level instance; long-lived instances should be in ContextShared.

{
LOG_INFO(DB::Logger::get(), "Stop collecting thread alloc metrics");
{
std::unique_lock lk(monitoring_mut);
// Only one caller can successfully stop the thread.
if (is_terminated)
return;
if (monitoring_thread == nullptr)
Review comment (Contributor):
Under what circumstances will this pointer be null?

Reply (Contributor, JaySon-Huang, May 10, 2024):
When stopThreadAllocInfo is called more than once.

return;
is_terminated = true;
monitoring_cv.notify_all();
}
LOG_INFO(DB::Logger::get(), "JointThreadInfoJeallocMap shutdown, wait thread alloc monitor join");
monitoring_thread->join();
{
std::unique_lock lk(monitoring_mut);
delete monitoring_thread;
monitoring_thread = nullptr;
}
}

} // namespace DB
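
Note (illustrative sketch, not code from this PR): the constructor and stopThreadAllocInfo above follow a common pattern: a background thread sleeps on a condition variable with a timeout, and a stop function that is safe to call more than once sets a flag, wakes the thread, and joins it. The class below shows that pattern in isolation. `PeriodicWorker` and its parameters are hypothetical names, and it holds a `std::thread` member instead of the raw `std::thread *` used in the diff.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical helper: runs `task` roughly every `period` until stop() is called.
class PeriodicWorker
{
public:
    PeriodicWorker(std::chrono::milliseconds period, std::function<void()> task)
    {
        assert(task != nullptr);
        // Started in the constructor body, so every other member already exists.
        worker = std::thread([this, period, task = std::move(task)] {
            std::unique_lock lock(mut);
            while (true)
            {
                // Returns true as soon as stop() sets the flag, false on timeout.
                if (cv.wait_for(lock, period, [this] { return terminated; }))
                    return;
                task();
            }
        });
    }

    ~PeriodicWorker() { stop(); }

    PeriodicWorker(const PeriodicWorker &) = delete;
    PeriodicWorker & operator=(const PeriodicWorker &) = delete;

    // Safe to call repeatedly: only the first caller flips the flag and joins.
    void stop()
    {
        {
            std::lock_guard lock(mut);
            if (terminated)
                return;
            terminated = true;
        }
        cv.notify_all();
        worker.join();
    }

private:
    std::mutex mut;
    std::condition_variable cv;
    bool terminated = false;
    std::thread worker;
};
```

Calling `stop()` explicitly during shutdown, as the review thread above requests for `ContextShared::shutdown`, fixes the point at which the thread ends, and the destructor call then becomes a no-op.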