7 changes: 4 additions & 3 deletions ydb/core/protos/counters_statistics_aggregator.proto
@@ -7,7 +7,8 @@ option java_package = "ru.yandex.kikimr.proto";
option (TabletTypeName) = "StatisticsAggregator";

enum ETxTypes {
TXTYPE_INIT_SCHEMA = 0 [(TxTypeOpts) = {Name: "TxInitSchema"}];
TXTYPE_INIT = 1 [(TxTypeOpts) = {Name: "TxInit"}];
TXTYPE_CONFIGURE = 2 [(TxTypeOpts) = {Name: "TxConfigure"}];
TXTYPE_SCHEMESHARD_STATS = 3 [(TxTypeOpts) = {Name: "TxSchemeShardStats"}];
}
56 changes: 45 additions & 11 deletions ydb/core/protos/statistics.proto
@@ -4,21 +4,55 @@ package NKikimrStat;

option java_package = "ru.yandex.kikimr.proto";

-message TEvBroadcastStatistics {
-    message TEntry {
-        optional NKikimrProto.TPathID PathId = 1;
-        optional uint64 RowCount = 2;
-        optional uint64 BytesSize = 3;
+message TEvConfigureAggregator {
+    optional string Database = 1;
+}
+
+message TPathEntry {
+    optional NKikimrProto.TPathID PathId = 1;
+    optional uint64 RowCount = 2;
+    optional uint64 BytesSize = 3;
+}
+
+message TSchemeShardStats {
+    repeated TPathEntry Entries = 1;
+}
+
+// SS -> SA
+message TEvConnectSchemeShard {
+    optional fixed64 SchemeShardId = 1;
+}
+
+// SS -> SA
+message TEvSchemeShardStats {
+    optional fixed64 SchemeShardId = 1;
+    optional bytes Stats = 2; // serialized TSchemeShardStats
+}
+
+// nodes -> SA
+message TEvConnectNode {
+    optional uint32 NodeId = 1;
+    repeated fixed64 NeedSchemeShards = 2;
+    message THaveEntry {
+        optional fixed64 SchemeShardId = 1;
+        optional uint64 Timestamp = 2;
    }
-    repeated uint32 NodeIds = 1;
-    repeated TEntry Entries = 2;
+    repeated THaveEntry HaveSchemeShards = 3;
}

-message TEvRegisterNode {
+// nodes -> SA
+message TEvRequestStats {
    optional uint32 NodeId = 1;
-    optional bool HasStatistics = 2;
+    repeated fixed64 NeedSchemeShards = 2;
}

-message TEvConfigureAggregator {
-    optional string Database = 1;
+// SA -> nodes
+message TEvPropagateStatistics {
+    repeated uint32 NodeIds = 1; // hierarchical propagation
+    message TStatsEntry {
+        optional fixed64 SchemeShardId = 1;
+        optional bytes Stats = 2; // serialized TSchemeShardStats
+        optional uint64 Timestamp = 3;
+    }
+    repeated TStatsEntry Entries = 2;
}
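
The NodeIds field of TEvPropagateStatistics carries the "hierarchical propagation" noted in the comment: the aggregator sends the event to a single leading node and lists the remaining node ids in NodeIds instead of contacting every node itself (see PropagateStatisticsImpl in aggregator_impl.cpp below). A minimal sender-side sketch mirroring that code — variable names such as leadingNodeId, otherNodeIds, schemeShardId and serializedStats are illustrative only:

// Sketch of building one TEvPropagateStatistics message inside an actor method;
// it mirrors PropagateStatisticsImpl() below, with illustrative variable names.
auto propagate = std::make_unique<TEvStatistics::TEvPropagateStatistics>();
auto* record = propagate->MutableRecord();
for (ui32 nodeId : otherNodeIds) {
    record->AddNodeIds(nodeId);      // nodes the leading node should cover next
}
auto* entry = record->AddEntries();
entry->SetSchemeShardId(schemeShardId);
entry->SetStats(serializedStats);    // serialized TSchemeShardStats
Send(NStat::MakeStatServiceID(leadingNodeId), propagate.release());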
6 changes: 5 additions & 1 deletion ydb/core/statistics/aggregator/aggregator.cpp
@@ -5,7 +5,11 @@
namespace NKikimr::NStat {

IActor* CreateStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info) {
-    return new TStatisticsAggregator(tablet, info);
+    return new TStatisticsAggregator(tablet, info, false);
}

IActor* CreateStatisticsAggregatorForTests(const NActors::TActorId& tablet, TTabletStorageInfo* info) {
return new TStatisticsAggregator(tablet, info, true);
}

} // NKikimr::NStat
2 changes: 2 additions & 0 deletions ydb/core/statistics/aggregator/aggregator.h
@@ -7,4 +7,6 @@ namespace NKikimr::NStat {

IActor* CreateStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info);

IActor* CreateStatisticsAggregatorForTests(const NActors::TActorId& tablet, TTabletStorageInfo* info);

} // NKikimr::NStat
247 changes: 243 additions & 4 deletions ydb/core/statistics/aggregator/aggregator_impl.cpp
@@ -1,15 +1,21 @@
#include "aggregator_impl.h"

#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
#include <ydb/core/statistics/stat_service.h>

#include <library/cpp/monlib/service/pages/templates.h>

namespace NKikimr::NStat {

-TStatisticsAggregator::TStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info)
+TStatisticsAggregator::TStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info, bool forTests)
    : TActor(&TThis::StateInit)
    , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
-{}
+{
+    PropagateInterval = forTests ? TDuration::Seconds(5) : TDuration::Minutes(3);
+
+    auto seed = std::random_device{}();
+    RandomGenerator.seed(seed);
+}

void TStatisticsAggregator::OnDetach(const TActorContext& ctx) {
Die(ctx);
@@ -29,8 +35,241 @@ void TStatisticsAggregator::DefaultSignalTabletActive(const TActorContext& ctx)
Y_UNUSED(ctx);
}

-void TStatisticsAggregator::Handle(TEvPrivate::TEvProcess::TPtr&) {
-    SA_LOG_D("[" << TabletID() << "] Handle TEvPrivate::TEvProcess");
void TStatisticsAggregator::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev) {
auto pipeServerId = ev->Get()->ServerId;

SA_LOG_D("[" << TabletID() << "] EvServerConnected"
<< ", pipe server id = " << pipeServerId);
}

Review comment (Member):
All SA_LOG_D calls start with ("[" << TabletID() << "] — it seems worth making that part of the macro.

Reply (Collaborator, author):
Better to fix that in a separate commit.
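
A possible shape for the macro change suggested in the review comment above — a sketch only, since the actual SA_LOG_D definition is not part of this diff; the underlying LOG_DEBUG_S macro, the NKikimrServices::STATISTICS component, and the use of TlsActivationContext are assumptions:

// Sketch (assumed base macro and log component); intended for use inside
// TStatisticsAggregator methods, where TabletID() is available.
#define SA_LOG_D(stream) \
    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::STATISTICS, \
        "[" << TabletID() << "] " << stream)

// Call sites could then drop the repeated prefix, e.g.:
//     SA_LOG_D("EvServerConnected, pipe server id = " << pipeServerId);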

void TStatisticsAggregator::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev) {
auto pipeServerId = ev->Get()->ServerId;

SA_LOG_D("[" << TabletID() << "] EvServerDisconnected"
<< ", pipe server id = " << pipeServerId);

auto itNodeServer = NodePipes.find(pipeServerId);
if (itNodeServer != NodePipes.end()) {
auto nodeId = itNodeServer->second;
auto itNode = Nodes.find(nodeId);
if (itNode != Nodes.end()) {
--itNode->second;
if (itNode->second == 0) {
Nodes.erase(itNode);
}
}
NodePipes.erase(itNodeServer);
return;
}

auto itShardServer = SchemeShardPipes.find(pipeServerId);
if (itShardServer != SchemeShardPipes.end()) {
auto ssId = itShardServer->second;
auto itShard = SchemeShards.find(ssId);
if (itShard != SchemeShards.end()) {
--itShard->second;
if (itShard->second == 0) {
SchemeShards.erase(itShard);
}
}
SchemeShardPipes.erase(itShardServer);
return;
}
}

void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectNode::TPtr& ev) {
const auto& record = ev->Get()->Record;
const TNodeId nodeId = record.GetNodeId();
auto pipeServerId = ev->Recipient;

SA_LOG_D("[" << TabletID() << "] EvConnectNode"
<< ", pipe server id = " << pipeServerId
<< ", node id = " << nodeId
<< ", have schemeshards count = " << record.HaveSchemeShardsSize()
<< ", need schemeshards count = " << record.NeedSchemeShardsSize());

if (NodePipes.find(pipeServerId) == NodePipes.end()) {
NodePipes[pipeServerId] = nodeId;
++Nodes[nodeId];
}

for (const auto& ssEntry : record.GetHaveSchemeShards()) {
RequestedSchemeShards.insert(ssEntry.GetSchemeShardId());
}

if (!IsPropagateInFlight) {
Schedule(PropagateInterval, new TEvPrivate::TEvPropagate());
IsPropagateInFlight = true;
}

std::vector<TSSId> ssIds;
ssIds.reserve(record.NeedSchemeShardsSize());
for (const auto& ssId : record.GetNeedSchemeShards()) {
ssIds.push_back(ssId);
RequestedSchemeShards.insert(ssId);
}

ProcessRequests(nodeId, ssIds);
}

void TStatisticsAggregator::Handle(TEvStatistics::TEvRequestStats::TPtr& ev) {
const auto& record = ev->Get()->Record;
const auto nodeId = record.GetNodeId();

SA_LOG_D("[" << TabletID() << "] EvRequestStats"
<< ", node id = " << nodeId
<< ", schemeshard count = " << record.NeedSchemeShardsSize());

std::vector<TSSId> ssIds;
ssIds.reserve(record.NeedSchemeShardsSize());
for (const auto& ssId : record.GetNeedSchemeShards()) {
ssIds.push_back(ssId);
}

ProcessRequests(nodeId, ssIds);
}

void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectSchemeShard::TPtr& ev) {
const auto& record = ev->Get()->Record;
const TSSId schemeShardId = record.GetSchemeShardId();
auto pipeServerId = ev->Recipient;

if (SchemeShardPipes.find(pipeServerId) == SchemeShardPipes.end()) {
SchemeShardPipes[pipeServerId] = schemeShardId;
++SchemeShards[schemeShardId];
}

SA_LOG_D("[" << TabletID() << "] EvConnectSchemeShard"
<< ", pipe server id = " << pipeServerId
<< ", schemeshard id = " << schemeShardId);
}

void TStatisticsAggregator::Handle(TEvPrivate::TEvFastPropagateCheck::TPtr&) {
SA_LOG_D("[" << TabletID() << "] EvFastPropagateCheck");

PropagateFastStatistics();

FastCheckInFlight = false;
FastCounter = StatsOptimizeFirstNodesCount;
FastNodes.clear();
FastSchemeShards.clear();
}

void TStatisticsAggregator::Handle(TEvPrivate::TEvPropagate::TPtr&) {
SA_LOG_D("[" << TabletID() << "] EvPropagate");

PropagateStatistics();

Schedule(PropagateInterval, new TEvPrivate::TEvPropagate());
}

void TStatisticsAggregator::ProcessRequests(TNodeId nodeId, const std::vector<TSSId>& ssIds) {
if (FastCounter > 0) {
--FastCounter;
SendStatisticsToNode(nodeId, ssIds);
} else {
FastNodes.insert(nodeId);
for (const auto& ssId : ssIds) {
FastSchemeShards.insert(ssId);
}
if (!FastCheckInFlight) {
Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvFastPropagateCheck());
FastCheckInFlight = true;
}
}
}

void TStatisticsAggregator::SendStatisticsToNode(TNodeId nodeId, const std::vector<TSSId>& ssIds) {
SA_LOG_D("[" << TabletID() << "] SendStatisticsToNode()"
<< ", node id = " << nodeId
<< ", schemeshard count = " << ssIds.size());

std::vector<TNodeId> nodeIds;
nodeIds.push_back(nodeId);

PropagateStatisticsImpl(nodeIds, ssIds);
}

void TStatisticsAggregator::PropagateStatistics() {
SA_LOG_D("[" << TabletID() << "] PropagateStatistics()"
<< ", node count = " << Nodes.size()
<< ", schemeshard count = " << RequestedSchemeShards.size());

if (Nodes.empty() || RequestedSchemeShards.empty()) {
return;
}

std::vector<TNodeId> nodeIds;
nodeIds.reserve(Nodes.size());
for (const auto& [nodeId, _] : Nodes) {
nodeIds.push_back(nodeId);
}
std::shuffle(std::begin(nodeIds), std::end(nodeIds), RandomGenerator);

std::vector<TSSId> ssIds;
ssIds.reserve(RequestedSchemeShards.size());
for (const auto& ssId : RequestedSchemeShards) {
ssIds.push_back(ssId);
}

PropagateStatisticsImpl(nodeIds, ssIds);
}

void TStatisticsAggregator::PropagateFastStatistics() {
SA_LOG_D("[" << TabletID() << "] PropagateFastStatistics()"
<< ", node count = " << FastNodes.size()
<< ", schemeshard count = " << FastSchemeShards.size());

if (FastNodes.empty() || FastSchemeShards.empty()) {
return;
}

std::vector<TNodeId> nodeIds;
nodeIds.reserve(FastNodes.size());
for (const auto& nodeId : FastNodes) {
nodeIds.push_back(nodeId);
}
std::shuffle(std::begin(nodeIds), std::end(nodeIds), RandomGenerator);

std::vector<TSSId> ssIds;
ssIds.reserve(FastSchemeShards.size());
for (const auto& ssId : FastSchemeShards) {
ssIds.push_back(ssId);
}

PropagateStatisticsImpl(nodeIds, ssIds);
}

void TStatisticsAggregator::PropagateStatisticsImpl(
const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds)
{
TNodeId leadingNodeId = nodeIds[0];

for (size_t index = 0; index < ssIds.size(); ) {
auto propagate = std::make_unique<TEvStatistics::TEvPropagateStatistics>();
auto* record = propagate->MutableRecord();
record->MutableNodeIds()->Reserve(nodeIds.size() - 1);
for (size_t i = 1; i < nodeIds.size(); ++i) {
record->AddNodeIds(nodeIds[i]);
}
for (size_t size = 0; index < ssIds.size(); ++index) {
auto ssId = ssIds[index];
auto* entry = record->AddEntries();
entry->SetSchemeShardId(ssId);
auto itStats = BaseStats.find(ssId);
if (itStats != BaseStats.end()) {
entry->SetStats(itStats->second);
size += itStats->second.size();
} else {
entry->SetStats(TString()); // stats are not sent from SA yet
}
if (size >= StatsSizeLimitBytes) {
++index;
break;
}
}
Send(NStat::MakeStatServiceID(leadingNodeId), propagate.release());
}
}

void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) {