Skip to content

Commit

Permalink
Enhance metrics for KVStore write (#8472)
Browse files Browse the repository at this point in the history
close #8471
  • Loading branch information
CalvinNeo authored Dec 21, 2023
1 parent 7449d0e commit cc76e87
Show file tree
Hide file tree
Showing 19 changed files with 772 additions and 64 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
M(OpenFileForReadWrite) \
M(MemoryTracking) \
M(MemoryTrackingInBackgroundProcessingPool) \
M(MemoryTrackingKVStore) \
M(LogicalCPUCores) \
M(MemoryCapacity) \
M(PSMVCCNumSnapshots) \
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace CurrentMetrics
extern const Metric MemoryTrackingQueryStorageTask;
extern const Metric MemoryTrackingFetchPages;
extern const Metric MemoryTrackingSharedColumnData;
extern const Metric MemoryTrackingKVStore;
} // namespace CurrentMetrics

std::atomic<Int64> real_rss{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0};
Expand Down Expand Up @@ -76,13 +77,17 @@ static String storageMemoryUsageDetail()
{
return fmt::format(
"non-query: peak={}, amount={}; "
"kvstore: peak={}, amount={}; "
"query-storage-task: peak={}, amount={}; "
"fetch-pages: peak={}, amount={}; "
"shared-column-data: peak={}, amount={}.",
root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->getPeak())
: "0",
root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->get())
: "0",
root_of_kvstore_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_kvstore_mem_trackers->getPeak())
: "0",
root_of_kvstore_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_kvstore_mem_trackers->get()) : "0",
sub_root_of_query_storage_task_mem_trackers
? formatReadableSizeWithBinarySuffix(sub_root_of_query_storage_task_mem_trackers->getPeak())
: "0",
Expand Down Expand Up @@ -293,6 +298,7 @@ thread_local MemoryTracker * current_memory_tracker = nullptr;

std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers = MemoryTracker::createGlobalRoot();
std::shared_ptr<MemoryTracker> root_of_query_mem_trackers = MemoryTracker::createGlobalRoot();
std::shared_ptr<MemoryTracker> root_of_kvstore_mem_trackers = MemoryTracker::createGlobalRoot();

std::shared_ptr<MemoryTracker> sub_root_of_query_storage_task_mem_trackers;
std::shared_ptr<MemoryTracker> fetch_pages_mem_tracker;
Expand Down Expand Up @@ -323,6 +329,8 @@ void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit)
shared_column_data_mem_tracker
= MemoryTracker::create(0, sub_root_of_query_storage_task_mem_trackers.get(), log_in_destructor);
shared_column_data_mem_tracker->setAmountMetric(CurrentMetrics::MemoryTrackingSharedColumnData);

root_of_kvstore_mem_trackers->setAmountMetric(CurrentMetrics::MemoryTrackingKVStore);
}

namespace CurrentMemoryTracker
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ extern thread_local MemoryTracker * current_memory_tracker;

extern std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers;
extern std::shared_ptr<MemoryTracker> root_of_query_mem_trackers;
extern std::shared_ptr<MemoryTracker> root_of_kvstore_mem_trackers;

// Initialize in `initStorageMemoryTracker`.
// If a memory tracker of storage tasks is driven by query, it should inherit `sub_root_of_query_storage_task_mem_trackers`.
Expand Down
22 changes: 20 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@

namespace DB
{
// Byte-size bounds for reporting "big" raft writes to a region (2 KiB and 4 MiB).
// Both constants are passed to the `ExpBucketsWithRange` of the `type_big_write_to_region`
// histogram in `tiflash_raft_write_flow_bytes` below.
// NOTE(review): presumably they act as the first and last bucket boundaries -- confirm against ExpBucketsWithRange.
constexpr size_t RAFT_REGION_BIG_WRITE_THRES = 2 * 1024;
constexpr size_t RAFT_REGION_BIG_WRITE_MAX = 4 * 1024 * 1024; // raft-entry-max-size = 8MiB
// Compile-time sanity check: four times the threshold must stay strictly below the maximum.
static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Invalid RAFT_REGION_BIG_WRITE_THRES");
/// Central place to define metrics across all subsystems.
/// Refer to gtest_tiflash_metrics.cpp for more sample defines.
/// Usage:
Expand Down Expand Up @@ -485,12 +488,13 @@ namespace DB
M(tiflash_raft_raft_frequent_events_count, \
"Raft frequent event counter", \
Counter, \
F(type_write_commit, {{"type", "write_commit"}}), \
F(type_write, {{"type", "write"}})) \
M(tiflash_raft_region_flush_bytes, \
"Bucketed histogram of region flushed bytes", \
Histogram, \
F(type_flushed, {{"type", "flushed"}}, ExpBuckets{32, 2, 21}), \
F(type_unflushed, {{"type", "unflushed"}}, ExpBuckets{32, 2, 21})) \
F(type_flushed, {{"type", "flushed"}}, ExpBucketsWithRange{32, 4, 32 * 1024 * 1024}), \
F(type_unflushed, {{"type", "unflushed"}}, ExpBucketsWithRange{32, 4, 32 * 1024 * 1024})) \
M(tiflash_raft_entry_size, \
"Bucketed histogram entry size", \
Histogram, \
Expand All @@ -501,6 +505,20 @@ namespace DB
F(type_raft_snapshot, {{"type", "raft_snapshot"}}), \
F(type_dt_on_disk, {{"type", "dt_on_disk"}}), \
F(type_dt_total, {{"type", "dt_total"}})) \
M(tiflash_raft_throughput_bytes, \
"Raft handled bytes in global", \
Counter, \
F(type_write, {{"type", "write"}}), \
F(type_write_committed, {{"type", "write_committed"}})) \
M(tiflash_raft_write_flow_bytes, \
"Bucketed histogram of bytes for each write", \
Histogram, \
F(type_ingest_uncommitted, {{"type", "ingest_uncommitted"}}, ExpBucketsWithRange{16, 4, 64 * 1024}), \
F(type_snapshot_uncommitted, {{"type", "snapshot_uncommitted"}}, ExpBucketsWithRange{16, 4, 1024 * 1024}), \
F(type_write_committed, {{"type", "write_committed"}}, ExpBucketsWithRange{16, 2, 1024 * 1024}), \
F(type_big_write_to_region, \
{{"type", "big_write_to_region"}}, \
ExpBucketsWithRange{RAFT_REGION_BIG_WRITE_THRES, 4, RAFT_REGION_BIG_WRITE_MAX})) \
M(tiflash_raft_snapshot_total_bytes, \
"Bucketed snapshot total size", \
Histogram, \
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/KVStore/Decode/DecodedTiKVKeyValue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Storages/KVStore/Decode/DecodedTiKVKeyValue.h>
#include <Storages/KVStore/TiKVHelpers/TiKVKeyspaceIDImpl.h>
#include <Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h>

namespace DB
{
Expand All @@ -37,4 +38,10 @@ std::string DecodedTiKVKey::makeKeyspacePrefix(KeyspaceID keyspace_id)
{
return TiKVKeyspaceID::makeKeyspacePrefix(keyspace_id);
}

/// Returns the handle id stored in this raw primary key.
/// A `UInt64` is read from the start of the key's data buffer and then
/// converted with `RecordKVFormat::decodeInt64`.
HandleID RawTiDBPK::getHandleID() const
{
    const auto encoded_handle = RecordKVFormat::read<UInt64>((*this)->data());
    return RecordKVFormat::decodeInt64(encoded_handle);
}
} // namespace DB
11 changes: 9 additions & 2 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,16 @@ DM::WriteResult RegionTable::writeBlockByRegion(

reportUpstreamLatency(*data_list_read);
auto write_result = writeRegionDataToStorage(context, region, *data_list_read, log);

auto prev_region_size = region->dataSize();
RemoveRegionCommitCache(region, *data_list_read, lock_region);

auto new_region_size = region->dataSize();
if likely (new_region_size <= prev_region_size)
{
auto committed_bytes = prev_region_size - new_region_size;
GET_METRIC(tiflash_raft_write_flow_bytes, type_write_committed).Observe(committed_bytes);
GET_METRIC(tiflash_raft_throughput_bytes, type_write_committed).Increment(committed_bytes);
GET_METRIC(tiflash_raft_raft_frequent_events_count, type_write_commit).Increment(1);
}
/// Save removed data to outer.
data_list_to_remove = std::move(*data_list_read);
return write_result;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ void KVStore::StoreMeta::update(Base && base_)

// Destructor: writes an info-level log line, then calls `releaseReadIndexWorkers()`.
KVStore::~KVStore()
{
// Log first so the destruction is visible in the log before the workers are released.
LOG_INFO(log, "Destroy KVStore");
releaseReadIndexWorkers();
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ void KVStore::onSnapshot(
manage_lock.index.add(new_region);
}

GET_METRIC(tiflash_raft_write_flow_bytes, type_snapshot_uncommitted).Observe(new_region->dataSize());
persistRegion(*new_region, &region_lock, PersistRegionReason::ApplySnapshotCurRegion, "");

tmt.getRegionTable().shrinkRegionRange(*new_region);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/IngestSST.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ void Region::finishIngestSSTByDTFile(RegionPtr && temp_region, UInt64 index, UIn
{
std::unique_lock<std::shared_mutex> lock(mutex);

auto uncommitted_ingest = temp_region->dataSize();
GET_METRIC(tiflash_raft_write_flow_bytes, type_ingest_uncommitted).Observe(uncommitted_ingest);
if (temp_region)
{
// Merge the uncommitted data from `temp_region`.
Expand Down
12 changes: 10 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(

size_t put_key_count = 0;
size_t del_key_count = 0;
// How many bytes have been written to KVStore (and may later be moved to the underlying DeltaTree).
// We don't count DEL because it is only used to delete LOCK entries, which are small and are not counted by doInsert.
size_t write_size = 0;
size_t prev_size = dataSize();

SCOPE_EXIT({
GET_METRIC(tiflash_raft_apply_write_command_duration_seconds, type_write).Observe(watch.elapsedSeconds());
Expand All @@ -322,6 +326,10 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
GET_METRIC(tiflash_raft_raft_frequent_events_count, type_write).Increment(1);
GET_METRIC(tiflash_raft_process_keys, type_write_put).Increment(put_key_count);
GET_METRIC(tiflash_raft_process_keys, type_write_del).Increment(del_key_count);
auto after_size = dataSize();
if (after_size > prev_size + RAFT_REGION_BIG_WRITE_THRES)
GET_METRIC(tiflash_raft_write_flow_bytes, type_big_write_to_region).Observe(after_size - prev_size);
GET_METRIC(tiflash_raft_throughput_bytes, type_write).Increment(write_size);
});

if (cmds.len)
Expand Down Expand Up @@ -349,11 +357,11 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
if (is_v2)
{
// There may be orphan default key in a snapshot.
doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::AllowSame);
write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::AllowSame);
}
else
{
doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::Deny);
write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::Deny);
}
}
catch (Exception & e)
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ RegionDataRes RegionCFDataBase<Trait>::insert(TiKVKey && key, TiKVValue && value
{
const auto & raw_key = RecordKVFormat::decodeTiKVKey(key);
auto kv_pair = Trait::genKVPair(std::move(key), raw_key, std::move(value));

if (!kv_pair)
return 0;

Expand All @@ -58,6 +57,7 @@ RegionDataRes RegionCFDataBase<RegionLockCFDataTrait>::insert(TiKVKey && key, Ti
Pair kv_pair = RegionLockCFDataTrait::genKVPair(std::move(key), std::move(value));
// according to the process of pessimistic lock, just overwrite.
data.insert_or_assign(std::move(kv_pair.first), std::move(kv_pair.second));
// The lock CF is not counted in the size of RegionData, so always return 0.
return 0;
}

Expand Down Expand Up @@ -94,6 +94,8 @@ RegionDataRes RegionCFDataBase<Trait>::insert(std::pair<Key, Value> && kv_pair,
+ " new_val: " + prev_value.toDebugString(),
ErrorCodes::LOGICAL_ERROR);
}
// duplicated key is ignored
return 0;
}
else
{
Expand Down Expand Up @@ -122,7 +124,9 @@ size_t RegionCFDataBase<Trait>::calcTiKVKeyValueSize(const TiKVKey & key, const
return 0;
}
else
{
return key.dataSize() + value.dataSize();
}
}


Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct RegionCFDataBase

size_t getSize() const;

RegionCFDataBase() {}
RegionCFDataBase() = default;
RegionCFDataBase(RegionCFDataBase && region);
RegionCFDataBase & operator=(RegionCFDataBase && region);

Expand Down
59 changes: 45 additions & 14 deletions dbms/src/Storages/KVStore/MultiRaft/RegionData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,51 @@ extern const int LOGICAL_ERROR;
extern const int ILLFORMAT_RAFT_ROW;
} // namespace ErrorCodes

HandleID RawTiDBPK::getHandleID() const
void RegionData::reportAlloc(size_t delta)
{
const auto & pk = *this;
return RecordKVFormat::decodeInt64(RecordKVFormat::read<UInt64>(pk->data()));
root_of_kvstore_mem_trackers->alloc(delta, false);
}

void RegionData::insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value, DupCheck mode)
// Reports that `delta` bytes were released by forwarding to `free(delta)`
// on the global `root_of_kvstore_mem_trackers`.
void RegionData::reportDealloc(size_t delta)
{
root_of_kvstore_mem_trackers->free(delta);
}

/// Reports the change from `prev` bytes to `current` bytes to the global
/// `root_of_kvstore_mem_trackers`.
/// A shrink is reported through `free(prev - current)`; otherwise (growth or
/// no change) `alloc(current - prev, false)` is called.
void RegionData::reportDelta(size_t prev, size_t current)
{
    if (current < prev)
    {
        // Subtract in this order so the unsigned difference cannot wrap.
        root_of_kvstore_mem_trackers->free(prev - current);
        return;
    }

    root_of_kvstore_mem_trackers->alloc(current - prev, false);
}

size_t RegionData::insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value, DupCheck mode)
{
switch (cf)
{
case ColumnFamilyType::Write:
{
cf_data_size += write_cf.insert(std::move(key), std::move(value), mode);
return;
auto delta = write_cf.insert(std::move(key), std::move(value), mode);
cf_data_size += delta;
reportAlloc(delta);
return delta;
}
case ColumnFamilyType::Default:
{
cf_data_size += default_cf.insert(std::move(key), std::move(value), mode);
return;
auto delta = default_cf.insert(std::move(key), std::move(value), mode);
cf_data_size += delta;
reportAlloc(delta);
return delta;
}
case ColumnFamilyType::Lock:
{
// lock cf is not count into the size of RegionData
lock_cf.insert(std::move(key), std::move(value), mode);
return;
return 0;
}
}
}
Expand All @@ -64,7 +85,9 @@ void RegionData::remove(ColumnFamilyType cf, const TiKVKey & key)
auto pk = RecordKVFormat::getRawTiDBPK(raw_key);
Timestamp ts = RecordKVFormat::getTs(key);
// removed by gc, may not exist.
cf_data_size -= write_cf.remove(RegionWriteCFData::Key{pk, ts}, true);
auto delta = write_cf.remove(RegionWriteCFData::Key{pk, ts}, true);
cf_data_size -= delta;
reportDealloc(delta);
return;
}
case ColumnFamilyType::Default:
Expand All @@ -73,7 +96,9 @@ void RegionData::remove(ColumnFamilyType cf, const TiKVKey & key)
auto pk = RecordKVFormat::getRawTiDBPK(raw_key);
Timestamp ts = RecordKVFormat::getTs(key);
// removed by gc, may not exist.
cf_data_size -= default_cf.remove(RegionDefaultCFData::Key{pk, ts}, true);
auto delta = default_cf.remove(RegionDefaultCFData::Key{pk, ts}, true);
cf_data_size -= delta;
reportDealloc(delta);
return;
}
case ColumnFamilyType::Lock:
Expand All @@ -99,12 +124,16 @@ RegionData::WriteCFIter RegionData::removeDataByWriteIt(const WriteCFIter & writ

if (auto data_it = map.find({pk, decoded_val.prewrite_ts}); data_it != map.end())
{
cf_data_size -= RegionDefaultCFData::calcTiKVKeyValueSize(data_it->second);
auto delta = RegionDefaultCFData::calcTiKVKeyValueSize(data_it->second);
cf_data_size -= delta;
map.erase(data_it);
reportDealloc(delta);
}
}

cf_data_size -= RegionWriteCFData::calcTiKVKeyValueSize(write_it->second);
auto delta = RegionWriteCFData::calcTiKVKeyValueSize(write_it->second);
cf_data_size -= delta;
reportDealloc(delta);

return write_cf.getDataMut().erase(write_it);
}
Expand Down Expand Up @@ -238,6 +267,7 @@ void RegionData::splitInto(const RegionRange & range, RegionData & new_region_da
size_t size_changed = 0;
size_changed += default_cf.splitInto(range, new_region_data.default_cf);
size_changed += write_cf.splitInto(range, new_region_data.write_cf);
// reportAlloc: Remember to track memory here if we have a region-wise metrics later.
size_changed += lock_cf.splitInto(range, new_region_data.lock_cf);
cf_data_size -= size_changed;
new_region_data.cf_data_size += size_changed;
Expand All @@ -248,6 +278,7 @@ void RegionData::mergeFrom(const RegionData & ori_region_data)
size_t size_changed = 0;
size_changed += default_cf.mergeFrom(ori_region_data.default_cf);
size_changed += write_cf.mergeFrom(ori_region_data.write_cf);
// reportAlloc: Remember to track memory here if we have a region-wise metrics later.
size_changed += lock_cf.mergeFrom(ori_region_data.lock_cf);
cf_data_size += size_changed;
}
Expand Down Expand Up @@ -328,6 +359,7 @@ RegionData & RegionData::operator=(RegionData && rhs)
write_cf = std::move(rhs.write_cf);
default_cf = std::move(rhs.default_cf);
lock_cf = std::move(rhs.lock_cf);
reportDelta(cf_data_size, rhs.cf_data_size.load());
cf_data_size = rhs.cf_data_size.load();
return *this;
}
Expand Down Expand Up @@ -360,7 +392,6 @@ uint64_t RegionData::OrphanKeysInfo::remainedKeyCount() const
return remained_keys.size();
}


void RegionData::OrphanKeysInfo::mergeFrom(const RegionData::OrphanKeysInfo & other)
{
// TODO support move.
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/RegionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ class RegionData
using WriteCFIter = RegionWriteCFData::Map::iterator;
using ConstWriteCFIter = RegionWriteCFData::Map::const_iterator;

void insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny);
static void reportAlloc(size_t delta);
static void reportDealloc(size_t delta);
static void reportDelta(size_t prev, size_t current);

size_t insert(ColumnFamilyType cf, TiKVKey && key, TiKVValue && value, DupCheck mode = DupCheck::Deny);
void remove(ColumnFamilyType cf, const TiKVKey & key);

WriteCFIter removeDataByWriteIt(const WriteCFIter & write_it);
Expand Down
Loading

0 comments on commit cc76e87

Please sign in to comment.