Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions ydb/core/protos/sys_view.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ message TPartitionStats {

optional uint64 ByKeyFilterSize = 21;
optional uint32 FollowerId = 22;

optional uint64 LocksAcquired = 23;
optional uint64 LocksWholeShard = 24;
optional uint64 LocksBroken = 25;
}

message TPartitionStatsResult {
Expand Down Expand Up @@ -133,8 +137,10 @@ enum EStatsType {
METRICS_ONE_HOUR = 8;
TOP_REQUEST_UNITS_ONE_MINUTE = 9;
TOP_REQUEST_UNITS_ONE_HOUR = 10;
TOP_PARTITIONS_ONE_MINUTE = 11;
TOP_PARTITIONS_ONE_HOUR = 12;
TOP_PARTITIONS_BY_CPU_ONE_MINUTE = 11;
TOP_PARTITIONS_BY_CPU_ONE_HOUR = 12;
TOP_PARTITIONS_BY_TLI_ONE_MINUTE = 13;
TOP_PARTITIONS_BY_TLI_ONE_HOUR = 14;
}

message TEvGetQueryStats {
Expand Down Expand Up @@ -608,6 +614,9 @@ message TTopPartitionsInfo {
optional uint64 IndexSize = 8;
optional uint32 InFlightTxCount = 9;
optional uint32 FollowerId = 10;
optional uint64 LocksAcquired = 11;
optional uint64 LocksWholeShard = 12;
optional uint64 LocksBroken = 13;
}

message TTopPartitionsEntry {
Expand All @@ -634,6 +643,7 @@ message TEvGetTopPartitionsResponse {

// partitions stats collector -> SVP
message TEvSendTopPartitions {
repeated TTopPartitionsInfo Partitions = 1;
repeated TTopPartitionsInfo PartitionsByCpu = 1;
repeated TTopPartitionsInfo PartitionsByTli = 3;
optional uint64 TimeUs = 2;
}
6 changes: 4 additions & 2 deletions ydb/core/sys_view/common/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,10 @@ class TSystemViewResolver : public ISystemViewResolver {
RegisterColumnTableSystemView<Schema::PrimaryIndexGranuleStats>(TablePrimaryIndexGranuleStatsName);
RegisterColumnTableSystemView<Schema::PrimaryIndexOptimizerStats>(TablePrimaryIndexOptimizerStatsName);

RegisterSystemView<Schema::TopPartitions>(TopPartitions1MinuteName);
RegisterSystemView<Schema::TopPartitions>(TopPartitions1HourName);
RegisterSystemView<Schema::TopPartitions>(TopPartitionsByCpu1MinuteName);
RegisterSystemView<Schema::TopPartitions>(TopPartitionsByCpu1HourName);
RegisterSystemView<Schema::TopPartitionsTli>(TopPartitionsByTli1MinuteName);
RegisterSystemView<Schema::TopPartitionsTli>(TopPartitionsByTli1HourName);

RegisterPgTablesSystemViews();

Expand Down
46 changes: 43 additions & 3 deletions ydb/core/sys_view/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ constexpr TStringBuf TablePrimaryIndexPortionStatsName = "primary_index_portion_
constexpr TStringBuf TablePrimaryIndexGranuleStatsName = "primary_index_granule_stats";
constexpr TStringBuf TablePrimaryIndexOptimizerStatsName = "primary_index_optimizer_stats";

constexpr TStringBuf TopPartitions1MinuteName = "top_partitions_one_minute";
constexpr TStringBuf TopPartitions1HourName = "top_partitions_one_hour";
constexpr TStringBuf TopPartitionsByCpu1MinuteName = "top_partitions_one_minute";
constexpr TStringBuf TopPartitionsByCpu1HourName = "top_partitions_one_hour";

constexpr TStringBuf TopPartitionsByTli1MinuteName = "top_partitions_by_tli_one_minute";
constexpr TStringBuf TopPartitionsByTli1HourName = "top_partitions_by_tli_one_hour";

constexpr TStringBuf PgTablesName = "pg_tables";
constexpr TStringBuf InformationSchemaTablesName = "tables";
Expand Down Expand Up @@ -93,6 +96,9 @@ struct Schema : NIceDb::Schema {
struct LastTtlRowsProcessed : Column<25, NScheme::NTypeIds::Uint64> {};
struct LastTtlRowsErased : Column<26, NScheme::NTypeIds::Uint64> {};
struct FollowerId : Column<27, NScheme::NTypeIds::Uint32> {};
struct LocksAcquired : Column<28, NScheme::NTypeIds::Uint64> {};
struct LocksWholeShard : Column<29, NScheme::NTypeIds::Uint64> {};
struct LocksBroken : Column<30, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<OwnerId, PathId, PartIdx, FollowerId>;
using TColumns = TableColumns<
Expand Down Expand Up @@ -122,7 +128,11 @@ struct Schema : NIceDb::Schema {
LastTtlRunTime,
LastTtlRowsProcessed,
LastTtlRowsErased,
FollowerId>;
FollowerId,
LocksAcquired,
LocksWholeShard,
LocksBroken
>;
};

struct Nodes : Table<2> {
Expand Down Expand Up @@ -757,6 +767,36 @@ struct Schema : NIceDb::Schema {
QueryCpuLimitPercentPerNode,
QueryMemoryLimitPercentPerNode>;
};

struct TopPartitionsTli : Table<23> {
struct IntervalEnd : Column<1, NScheme::NTypeIds::Timestamp> {};
struct Rank : Column<2, NScheme::NTypeIds::Uint32> {};
struct TabletId : Column<3, NScheme::NTypeIds::Uint64> {};
struct Path : Column<4, NScheme::NTypeIds::Utf8> {};
struct LocksAcquired : Column<5, NScheme::NTypeIds::Uint64> {};
struct LocksWholeShard : Column<6, NScheme::NTypeIds::Uint64> {};
struct LocksBroken : Column<7, NScheme::NTypeIds::Uint64> {};
struct NodeId : Column<8, NScheme::NTypeIds::Uint32> {};
struct DataSize : Column<9, NScheme::NTypeIds::Uint64> {};
struct RowCount : Column<10, NScheme::NTypeIds::Uint64> {};
struct IndexSize : Column<11, NScheme::NTypeIds::Uint64> {};
struct FollowerId : Column<12, NScheme::NTypeIds::Uint32> {};

using TKey = TableKey<IntervalEnd, Rank>;
using TColumns = TableColumns<
IntervalEnd,
Rank,
TabletId,
Path,
LocksAcquired,
LocksWholeShard,
LocksBroken,
NodeId,
DataSize,
RowCount,
IndexSize,
FollowerId>;
};
};

bool MaybeSystemViewPath(const TVector<TString>& path);
Expand Down
134 changes: 104 additions & 30 deletions ydb/core/sys_view/partition_stats/partition_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
SVLOG_D("NSysView::TPartitionStatsCollector bootstrapped");

if (AppData()->UsePartitionStatsCollectorForTests) {
OverloadedPartitionBound = 0.0;
OverloadedByCpuPartitionBound = 0.0;
OverloadedByTliPartitionBound = 0;
ProcessOverloadedInterval = TDuration::Seconds(1);
}

Expand Down Expand Up @@ -88,25 +89,32 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec

auto& oldPartitions = table.Partitions;
std::unordered_map<TShardIdx, TPartitionStats> newPartitions;
std::set<TOverloadedFollower> overloaded;
std::set<TFollowerStats> overloadedByCpu, overloadedByTli;

for (auto shardIdx : ev->Get()->ShardIndices) {
auto old = oldPartitions.find(shardIdx);
if (old != oldPartitions.end()) {
newPartitions[shardIdx] = old->second;

for (const auto& followerStat: old->second.FollowerStats) {
if (IsPartitionOverloaded(followerStat.second))
overloaded.insert({shardIdx, followerStat.first});
if (IsPartitionOverloadedByCpu(followerStat.second))
overloadedByCpu.insert({shardIdx, followerStat.first});
if (IsPartitionOverloadedByTli(followerStat.second))
overloadedByTli.insert({shardIdx, followerStat.first});
}
}
}

if (!overloaded.empty()) {
tables.Overloaded[pathId].swap(overloaded);
if (!overloadedByCpu.empty()) {
tables.OverloadedByCpu[pathId].swap(overloadedByCpu);
} else {
tables.Overloaded.erase(pathId);
tables.OverloadedByCpu.erase(pathId);
}
if (!overloadedByTli.empty()) {
tables.OverloadedByTli[pathId].swap(overloadedByTli);
} else {
tables.OverloadedByTli.erase(pathId);
}

oldPartitions.swap(newPartitions);
table.ShardIndices.swap(ev->Get()->ShardIndices);
Expand All @@ -125,7 +133,8 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec

auto& tables = DomainTables[domainKey];
tables.Stats.erase(pathId);
tables.Overloaded.erase(pathId);
tables.OverloadedByCpu.erase(pathId);
tables.OverloadedByTli.erase(pathId);
}

void Handle(TEvSysView::TEvSendPartitionStats::TPtr& ev) {
Expand Down Expand Up @@ -153,18 +162,29 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec

auto& followerStats = partitionStats.FollowerStats[followerId];

TOverloadedFollower overloadedFollower = {shardIdx, followerId};
if (IsPartitionOverloaded(newStats)) {
tables.Overloaded[pathId].insert(overloadedFollower);
TFollowerStats overloadedFollower = {shardIdx, followerId};
if (IsPartitionOverloadedByCpu(newStats)) {
tables.OverloadedByCpu[pathId].insert(overloadedFollower);
} else {
auto overloadedFound = tables.Overloaded.find(pathId);
if (overloadedFound != tables.Overloaded.end()) {
auto overloadedFound = tables.OverloadedByCpu.find(pathId);
if (overloadedFound != tables.OverloadedByCpu.end()) {
overloadedFound->second.erase(overloadedFollower);
if (overloadedFound->second.empty()) {
tables.Overloaded.erase(pathId);
tables.OverloadedByCpu.erase(pathId);
}
}
}
if (IsPartitionOverloadedByTli(newStats)) {
tables.OverloadedByTli[pathId].insert(overloadedFollower);
} else {
auto overloadedFound = tables.OverloadedByTli.find(pathId);
if (overloadedFound != tables.OverloadedByTli.end()) {
overloadedFound->second.erase(overloadedFollower);
if (overloadedFound->second.empty()) {
tables.OverloadedByTli.erase(pathId);
}
}
}

if (followerStats.HasTtlStats()) {
newStats.MutableTtlStats()->Swap(followerStats.MutableTtlStats());
Expand Down Expand Up @@ -396,37 +416,54 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
}
auto& domainTables = domainFound->second;

struct TPartition {
struct TPartitionByCpu {
TPathId PathId;
TShardIdx ShardIdx;
ui32 FollowerId;
double CPUCores;
};
std::vector<TPartition> sorted;
std::vector<TPartitionByCpu> sortedByCpu;

for (const auto& [pathId, overloadedFollowers] : domainTables.Overloaded) {
for (const TOverloadedFollower& overloadedFollower : overloadedFollowers) {
struct TPartitionByTli {
TPathId PathId;
TShardIdx ShardIdx;
ui32 FollowerId;
ui64 LocksBroken;
};
std::vector<TPartitionByTli> sortedByTli;

for (const auto& [pathId, overloadedFollowers] : domainTables.OverloadedByCpu) {
for (const TFollowerStats& overloadedFollower : overloadedFollowers) {
const auto& table = domainTables.Stats[pathId];
const auto& partition = table.Partitions.at(overloadedFollower.ShardIdx).FollowerStats.at(overloadedFollower.FollowerId);
sorted.emplace_back(TPartition{pathId, overloadedFollower.ShardIdx, overloadedFollower.FollowerId, partition.GetCPUCores()});
sortedByCpu.emplace_back(TPartitionByCpu{pathId, overloadedFollower.ShardIdx, overloadedFollower.FollowerId, partition.GetCPUCores()});
}
}
for (const auto& [pathId, overloadedFollowers] : domainTables.OverloadedByTli) {
for (const TFollowerStats& overloadedFollower : overloadedFollowers) {
const auto& table = domainTables.Stats[pathId];
const auto& partition = table.Partitions.at(overloadedFollower.ShardIdx).FollowerStats.at(overloadedFollower.FollowerId);
sortedByTli.emplace_back(TPartitionByTli{pathId, overloadedFollower.ShardIdx, overloadedFollower.FollowerId, partition.GetLocksBroken()});
}
}

std::sort(sorted.begin(), sorted.end(),
std::sort(sortedByCpu.begin(), sortedByCpu.end(),
[] (const auto& l, const auto& r) { return l.CPUCores > r.CPUCores; });
std::sort(sortedByTli.begin(), sortedByTli.end(),
[] (const auto& l, const auto& r) { return l.LocksBroken > r.LocksBroken; });

auto now = TActivationContext::Now();
auto nowUs = now.MicroSeconds();

size_t count = 0;
auto sendEvent = MakeHolder<TEvSysView::TEvSendTopPartitions>();
for (const auto& entry : sorted) {
for (const auto& entry : sortedByCpu) {
const auto& table = domainTables.Stats[entry.PathId];
const auto& followerStats = table.Partitions.at(entry.ShardIdx).FollowerStats;
const auto& partition = followerStats.at(entry.FollowerId);
const auto& leaderPartition = followerStats.at(0);

auto* result = sendEvent->Record.AddPartitions();
auto* result = sendEvent->Record.AddPartitionsByCpu();
result->SetTabletId(partition.GetTabletId());
result->SetPath(table.Path);
result->SetPeakTimeUs(nowUs);
Expand All @@ -442,11 +479,34 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
break;
}
}
for (const auto& entry : sortedByTli) {
const auto& table = domainTables.Stats[entry.PathId];
const auto& followerStats = table.Partitions.at(entry.ShardIdx).FollowerStats;
const auto& partition = followerStats.at(entry.FollowerId);
const auto& leaderPartition = followerStats.at(0);

auto* result = sendEvent->Record.AddPartitionsByTli();
result->SetTabletId(partition.GetTabletId());
result->SetPath(table.Path);
result->SetLocksAcquired(partition.GetLocksAcquired());
result->SetLocksWholeShard(partition.GetLocksWholeShard());
result->SetLocksBroken(partition.GetLocksBroken());
result->SetNodeId(partition.GetNodeId());
result->SetDataSize(leaderPartition.GetDataSize());
result->SetRowCount(leaderPartition.GetRowCount());
result->SetIndexSize(leaderPartition.GetIndexSize());
result->SetFollowerId(partition.GetFollowerId());

if (++count == TOP_PARTITIONS_COUNT) {
break;
}
}

sendEvent->Record.SetTimeUs(nowUs);

SVLOG_D("NSysView::TPartitionStatsCollector: TEvProcessOverloaded "
<< "top size# " << sorted.size()
<< ", top size by CPU # " << sortedByCpu.size()
<< ", top size by TLI # " << sortedByTli.size()
<< ", time# " << now);

Send(MakePipePerNodeCacheID(false),
Expand All @@ -467,8 +527,11 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
TBase::PassAway();
}

bool IsPartitionOverloaded(const NKikimrSysView::TPartitionStats& stats) const {
return stats.GetCPUCores() >= OverloadedPartitionBound;
bool IsPartitionOverloadedByCpu(const NKikimrSysView::TPartitionStats& stats) const {
return stats.GetCPUCores() >= OverloadedByCpuPartitionBound;
}
bool IsPartitionOverloadedByTli(const NKikimrSysView::TPartitionStats& stats) const {
return stats.GetLocksBroken() >= OverloadedByTliPartitionBound;
}

private:
Expand All @@ -478,7 +541,8 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
TPathId DomainKey;
ui64 SysViewProcessorId = 0;

double OverloadedPartitionBound = 0.7;
double OverloadedByCpuPartitionBound = 0.7;
ui64 OverloadedByTliPartitionBound = 1;
TDuration ProcessOverloadedInterval = TDuration::Seconds(15);

typedef ui32 TFollowerId;
Expand All @@ -493,22 +557,23 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
TString Path;
};

struct TOverloadedFollower {
struct TFollowerStats {
TShardIdx ShardIdx;
TFollowerId FollowerId;

bool operator<(const TOverloadedFollower &other) const {
bool operator<(const TFollowerStats &other) const {
return std::tie(ShardIdx, FollowerId) < std::tie(other.ShardIdx, other.FollowerId);
}

bool operator==(const TOverloadedFollower &other) const {
bool operator==(const TFollowerStats &other) const {
return std::tie(ShardIdx, FollowerId) == std::tie(other.ShardIdx, other.FollowerId);
}
};

struct TDomainTables {
std::map<TPathId, TTableStats> Stats;
std::unordered_map<TPathId, std::set<TOverloadedFollower>> Overloaded;
std::unordered_map<TPathId, std::set<TFollowerStats>> OverloadedByCpu;
std::unordered_map<TPathId, std::set<TFollowerStats>> OverloadedByTli;
};
std::unordered_map<TPathId, TDomainTables> DomainTables;

Expand Down Expand Up @@ -714,6 +779,15 @@ class TPartitionStatsScan : public TScanActorBase<TPartitionStatsScan> {
insert({TSchema::FollowerId::ColumnId, [] (const TPartitionStatsResult&, const TPartitionStats&, const TPartitionStats& stats) {
return TCell::Make<ui32>(stats.GetFollowerId());
}});
insert({TSchema::LocksAcquired::ColumnId, [] (const TPartitionStatsResult&, const TPartitionStats&, const TPartitionStats& stats) {
return TCell::Make<ui64>(stats.GetLocksAcquired());
}});
insert({TSchema::LocksWholeShard::ColumnId, [] (const TPartitionStatsResult&, const TPartitionStats&, const TPartitionStats& stats) {
return TCell::Make<ui64>(stats.GetLocksWholeShard());
}});
insert({TSchema::LocksBroken::ColumnId, [] (const TPartitionStatsResult&, const TPartitionStats&, const TPartitionStats& stats) {
return TCell::Make<ui64>(stats.GetLocksBroken());
}});
}
};
static TExtractorsMap extractors;
Expand Down
Loading
Loading