Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Track memory for PageDirectory #9134

Merged
merged 18 commits into from
Jun 13, 2024
5 changes: 5 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"The freshness of tiflash data with tikv data", \
Histogram, \
F(type_syncing_data_freshness, {{"type", "data_freshness"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_memory_usage_by_class, \
"TiFlash memory consumes by class", \
Gauge, \
F(type_uni_page_ids, {"type", "uni_page_ids"}), \
F(type_versioned_entries, {"type", "versioned_entries"})) \
M(tiflash_storage_read_tasks_count, "Total number of storage engine read tasks", Counter) \
M(tiflash_storage_command_count, \
"Total number of storage's command, such as delete range / shutdown /startup", \
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <Storages/Page/FileUsage.h>
#include <Storages/Page/PageConstants.h>
#include <Storages/Page/PageStorage.h>
#include <Storages/Page/PageStorageMemorySummary.h>
#include <Storages/Page/V3/Universal/UniversalPageStorageService.h>
#include <Storages/StorageDeltaMerge.h>
#include <common/config_common.h>
Expand Down Expand Up @@ -300,6 +301,7 @@ void AsynchronousMetrics::update()
set("LogNums", usage.total_log_file_num);
set("LogDiskBytes", usage.total_log_disk_size);
set("PagesInMem", usage.num_pages);
set("VersionedEntries", DB::PS::PageStorageMemorySummary::versioned_entry_or_delete_count.load());
}

if (context.getSharedContextDisagg()->isDisaggregatedStorageMode())
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Common/setThreadName.h>
#include <Storages/KVStore/FFI/JointThreadAllocInfo.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/Page/V3/PageDirectory.h>

#include <magic_enum.hpp>
#include <mutex>
Expand Down Expand Up @@ -47,6 +48,7 @@ void JointThreadInfoJeallocMap::recordThreadAllocInfo()
{
recordThreadAllocInfoForProxy();
recordThreadAllocInfoForStorage();
recordClassdAlloc();
}

JointThreadInfoJeallocMap::~JointThreadInfoJeallocMap()
Expand Down Expand Up @@ -269,4 +271,13 @@ void JointThreadInfoJeallocMap::accessStorageMap(std::function<void(const AllocM
f(storage_map);
}

/// Publish the byte counters kept in `PS::PageStorageMemorySummary` to the
/// `tiflash_memory_usage_by_class` metric, one label per counter.
void JointThreadInfoJeallocMap::recordClassdAlloc()
{
    // Bytes accounted for universal page ids.
    const auto uni_page_id_bytes = PS::PageStorageMemorySummary::uni_page_id_bytes.load();
    GET_METRIC(tiflash_memory_usage_by_class, type_uni_page_ids).Set(uni_page_id_bytes);

    // Bytes accounted for versioned entries / deletes.
    const auto versioned_entry_bytes = PS::PageStorageMemorySummary::versioned_entry_or_delete_bytes.load();
    GET_METRIC(tiflash_memory_usage_by_class, type_versioned_entries).Set(versioned_entry_bytes);
}


} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class JointThreadInfoJeallocMap
uint64_t value,
char aggregate_delimer);

static void recordClassdAlloc();

private:
mutable std::shared_mutex memory_allocation_mut;
AllocMap proxy_map;
Expand Down
46 changes: 27 additions & 19 deletions dbms/src/Storages/KVStore/FFI/SSTReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class MonoSSTReader : public SSTReader
BaseBuffView keyView() const override;
BaseBuffView valueView() const override;
void next() override;
SSTFormatKind sst_format_kind() const { return kind; };
SSTFormatKind sstFormatKind() const { return kind; };
size_t approxSize() const override;
std::vector<std::string> findSplitKeys(uint64_t splits_count) const override;
void seek(BaseBuffView && view) const override;
Expand Down Expand Up @@ -147,20 +147,27 @@ class MultiSSTReader : public SSTReader
}
size_t getSplitId() const override { return split_id; }

// Switch to next mono reader if current is drained,
// Switch to next mono reader if current SST is drained,
// and we have a next sst file to read.
void maybeNextReader() const
void maybeNextReader()
{
if (!mono->remained())
if (likely(mono->remained()))
return;

sst_idx++;
if (sst_idx < args.size())
{
current++;
if (current < args.size())
{
// We don't drop if mono is the last instance for safety,
// and it will be dropped as MultiSSTReader is dropped.
LOG_INFO(log, "Open sst file {}", buffToStrView(args[current].path));
mono = initer(proxy_helper, args[current], range, split_id);
}
// We don't drop if mono is the last instance for safety,
// and it will be dropped as MultiSSTReader is dropped.
LOG_INFO(
log,
"Open sst file {}, range={} sst_idx={} sst_tot={} split_id={}",
buffToStrView(args[sst_idx].path),
range->toDebugString(),
sst_idx,
args.size(),
split_id);
mono = initer(proxy_helper, args[sst_idx], range, split_id);
}
}

Expand All @@ -177,18 +184,19 @@ class MultiSSTReader : public SSTReader
, type(type_)
, initer(initer_)
, args(args_)
, current(0)
, sst_idx(0)
, range(range_)
, split_id(split_id_)
{
assert(args.size() > 0);
LOG_INFO(
log,
"Open sst file first {} range {} split_id={}",
buffToStrView(args[current].path),
"Open sst file first {}, range={} sst_tot={} split_id={}",
buffToStrView(args[sst_idx].path),
range->toDebugString(),
args.size(),
split_id);
mono = initer(proxy_helper, args[current], range, split_id);
mono = initer(proxy_helper, args[sst_idx], range, split_id);
}

~MultiSSTReader() override
Expand All @@ -202,12 +210,12 @@ class MultiSSTReader : public SSTReader
/// The instance is ill-formed if the size of `args` is zero.
mutable std::unique_ptr<R> mono;
const TiFlashRaftProxyHelper * proxy_helper;
ColumnFamilyType type;
const ColumnFamilyType type;
Initer initer;
std::vector<E> args;
mutable size_t current;
size_t sst_idx;
RegionRangeFilter range;
size_t split_id;
const size_t split_id;
};

} // namespace DB
25 changes: 12 additions & 13 deletions dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void PreHandlingTrace::waitForSubtaskResources(uint64_t region_id, size_t parall
{
LOG_DEBUG(
log,
"Prehandle resource meet, limit={}, current={}, region_id={}",
"Prehandle resource meet, limit={} current={} region_id={}",
parallel_subtask_limit,
ongoing_prehandle_subtask_count.load(),
region_id);
Expand Down Expand Up @@ -398,8 +398,7 @@ static inline std::pair<std::vector<std::string>, size_t> getSplitKey(
LOG_INFO(
log,
"getSplitKey result {}, total_concurrency={} ongoing={} total_split_parts={} split_keys={} "
"region_range={} approx_bytes={} "
"region_id={}",
"region_range={} approx_bytes={} region_id={}",
fmt_buf.toString(),
total_concurrency,
ongoing_count,
Expand Down Expand Up @@ -448,7 +447,7 @@ static void runInParallel(
= executeTransform(log, prehandle_ctx, part_new_region, part_sst_stream);
LOG_INFO(
log,
"Finished extra parallel prehandle task limit {} write_cf={} lock_cf={} default_cf={} dmfiles={} error={}, "
"Finished extra parallel prehandle task limit {} write_cf={} lock_cf={} default_cf={} dmfiles={} error={} "
"split_id={} region_id={}",
limit_tag,
part_prehandle_result.stats.write_cf_keys,
Expand Down Expand Up @@ -476,8 +475,7 @@ static void runInParallel(
LOG_INFO(
log,
"Parallel prehandling error {}"
" write_cf_off={}"
" split_id={} region_id={}",
" write_cf_off={} split_id={} region_id={}",
e.message(),
processed_keys.write_cf,
extra_id,
Expand Down Expand Up @@ -509,10 +507,11 @@ void executeParallelTransform(
split_key_count);
LOG_INFO(
log,
"Parallel prehandling for single big region, range={}, split keys={}, region_id={}",
"Parallel prehandling for single big region, range={} split_keys={} region_id={} snaps={}",
new_region->getRange()->toDebugString(),
split_key_count,
new_region->id());
new_region->id(),
snaps.len);
Stopwatch watch;
// Make sure the queue is bigger than `split_key_count`, otherwise `addTask` may fail.
auto async_tasks = SingleSnapshotAsyncTasks(split_key_count, split_key_count, split_key_count + 5);
Expand Down Expand Up @@ -550,9 +549,8 @@ void executeParallelTransform(
auto [head_result, head_prehandle_result] = executeTransform(log, prehandle_ctx, new_region, sst_stream);
LOG_INFO(
log,
"Finished extra parallel prehandle task limit={} write_cf {} lock_cf={} default_cf={} dmfiles={} "
"error={}, split_id={}, "
"region_id={}",
"Finished extra parallel prehandle task, limit={} write_cf={} lock_cf={} default_cf={} dmfiles={} "
"error={} split_id={} region_id={}",
sst_stream->getSoftLimit()->toDebugString(),
head_prehandle_result.stats.write_cf_keys,
head_prehandle_result.stats.lock_cf_keys,
Expand Down Expand Up @@ -714,9 +712,10 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
{
LOG_INFO(
log,
"Single threaded prehandling for single region, range={} region_id={}",
"Single threaded prehandling for single region, range={} region_id={} snaps={}",
new_region->getRange()->toDebugString(),
new_region->id());
new_region->id(),
snaps.len);
std::tie(result, prehandle_result) = executeTransform(log, prehandle_ctx, new_region, sst_stream);
}
else
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
#include <Storages/KVStore/Region.h>
#include <Storages/KVStore/Utils/AsyncTasks.h>
#include <Storages/KVStore/tests/region_kvstore_test.h>
#include <Storages/Page/V3/PageDefines.h>
#include <Storages/Page/V3/PageDirectory.h>
#include <Storages/Page/V3/PageDirectoryFactory.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/Universal/UniversalPageIdFormatImpl.h>
#include <Storages/RegionQueryInfo.h>
#include <TiDB/Schema/SchemaSyncService.h>
#include <TiDB/Schema/TiDBSchemaManager.h>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/tests/region_kvstore_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ inline void validateSSTGeneration(
size_t split_id) -> std::unique_ptr<MonoSSTReader> {
auto parsed_kind = MockSSTGenerator::parseSSTViewKind(buffToStrView(snap.path));
auto reader = std::make_unique<MonoSSTReader>(proxy_helper, snap, range, split_id);
assert(reader->sst_format_kind() == parsed_kind);
assert(reader->sstFormatKind() == parsed_kind);
return reader;
};
MultiSSTReader<MonoSSTReader, SSTView> reader{
Expand Down
28 changes: 28 additions & 0 deletions dbms/src/Storages/Page/PageStorageMemorySummary.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <atomic>

namespace DB::PS
{
struct PageStorageMemorySummary
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The names here are a little verbose.

  • The name PageStorageMemorySummary slightly duplicates the namespace PS.

  • The prefix mem_sum_ in mem_sum_uni_page_ids and mem_sum_versioned_entry_or_delete duplicates the struct name PageStorageMemorySummary. I suggest changing these names to:

    • mem_sum_uni_page_ids => uni_page_id_bytes.
    • mem_sum_versioned_entry_or_delete => versioned_entry_or_delete_bytes.
    • num_versioned_entry_or_delete => versioned_entry_or_delete_count.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

{
static inline std::atomic_int64_t uni_page_id_bytes{0};
static inline std::atomic_int64_t versioned_entry_or_delete_bytes{0};
static inline std::atomic_int64_t versioned_entry_or_delete_count{0};
};

} // namespace DB::PS
48 changes: 32 additions & 16 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,31 +222,47 @@ struct EntryOrDelete
MultiVersionRefCount being_ref_count;
std::optional<PageEntryV3> entry;

static EntryOrDelete newDelete()
EntryOrDelete(const EntryOrDelete & other)
: being_ref_count(other.being_ref_count)
, entry(other.entry)
{
return EntryOrDelete{
.entry = std::nullopt,
};
};
static EntryOrDelete newNormalEntry(const PageEntryV3 & entry)
PageStorageMemorySummary::versioned_entry_or_delete_count.fetch_add(1);
if (entry)
PageStorageMemorySummary::versioned_entry_or_delete_bytes.fetch_add(sizeof(PageEntryV3));
}
/// Default construction: the instance holds no entry, so only the instance
/// counter is bumped; the byte counter is left untouched.
EntryOrDelete()
{
    PageStorageMemorySummary::versioned_entry_or_delete_count.fetch_add(1);
}
/// Construct from an optional entry, taking ownership of `entry_`.
/// Every instance adds 1 to the instance counter; `sizeof(PageEntryV3)` is
/// added to the byte counter only when an entry is actually held.
EntryOrDelete(std::optional<PageEntryV3> entry_)
    : entry(std::move(entry_))
{
    PageStorageMemorySummary::versioned_entry_or_delete_count.fetch_add(1);
    if (entry.has_value())
        PageStorageMemorySummary::versioned_entry_or_delete_bytes.fetch_add(sizeof(PageEntryV3));
}
EntryOrDelete(MultiVersionRefCount being_ref_count_, std::optional<PageEntryV3> entry_)
: being_ref_count(being_ref_count_)
, entry(std::move(entry_))
{
return EntryOrDelete{
.entry = entry,
};
PageStorageMemorySummary::versioned_entry_or_delete_count.fetch_add(1);
if (entry)
PageStorageMemorySummary::versioned_entry_or_delete_bytes.fetch_add(sizeof(PageEntryV3));
}
/// Undo the accounting performed by the constructors: drop one instance from
/// the counter, and subtract `sizeof(PageEntryV3)` from the byte counter when
/// this instance holds an entry at destruction time.
~EntryOrDelete()
{
    PageStorageMemorySummary::versioned_entry_or_delete_count.fetch_sub(1);
    if (entry.has_value())
        PageStorageMemorySummary::versioned_entry_or_delete_bytes.fetch_sub(sizeof(PageEntryV3));
}

/// Create an instance that holds no entry (`entry` is `std::nullopt`).
static EntryOrDelete newDelete()
{
    return EntryOrDelete(std::optional<PageEntryV3>{});
}
/// Create an instance that holds a copy of `entry`; `being_ref_count` is
/// default-initialized.
static EntryOrDelete newNormalEntry(const PageEntryV3 & entry)
{
    return EntryOrDelete(std::optional<PageEntryV3>(entry));
}
static EntryOrDelete newReplacingEntry(const EntryOrDelete & ori_entry, const PageEntryV3 & entry)
{
return EntryOrDelete{
.being_ref_count = ori_entry.being_ref_count,
.entry = entry,
};
return EntryOrDelete(ori_entry.being_ref_count, entry);
}

static EntryOrDelete newFromRestored(PageEntryV3 entry, const PageVersion & ver, Int64 being_ref_count)
{
auto result = EntryOrDelete{
.entry = entry,
};
auto result = EntryOrDelete(std::move(entry));
result.being_ref_count.restoreFrom(ver, being_ref_count);
return result;
}
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory/PageIdTrait.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct PageIdTrait
static inline PageIdU64 getU64ID(const PageId & page_id) { return page_id.low; }
static inline Prefix getPrefix(const PageId & page_id) { return page_id.high; }
static inline PageIdU64 getPageMapKey(const PageId & page_id) { return page_id.low; }
/// Size in bytes of the page id object itself (`sizeof` of the `PageId` type);
/// the value does not depend on the id's contents.
static inline size_t getPageIDSize(const PageId & page_id)
{
    return sizeof(page_id);
}
};
} // namespace u128
namespace universal
Expand All @@ -45,6 +46,8 @@ struct PageIdTrait
static Prefix getPrefix(const PageId & page_id);

static inline PageId getPageMapKey(const PageId & page_id) { return page_id; }

/// Size reported by the page id itself via `page_id.size()`.
/// NOTE(review): presumably `PageId` is string-like here, so this is the id's
/// length in bytes and excludes any container overhead — confirm.
static inline size_t getPageIDSize(const PageId & page_id)
{
    return page_id.size();
}
};
} // namespace universal
} // namespace DB::PS::V3
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/PageEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Common/Exception.h>
#include <Storages/Page/PageStorageMemorySummary.h>
#include <Storages/Page/V3/PageDefines.h>
#include <Storages/Page/V3/PageEntryCheckpointInfo.h>
#include <fmt/format.h>
Expand Down
Loading