diff --git a/ydb/core/protos/counters_statistics_aggregator.proto b/ydb/core/protos/counters_statistics_aggregator.proto index ba44b3a24590..d1def622c413 100644 --- a/ydb/core/protos/counters_statistics_aggregator.proto +++ b/ydb/core/protos/counters_statistics_aggregator.proto @@ -7,7 +7,8 @@ option java_package = "ru.yandex.kikimr.proto"; option (TabletTypeName) = "StatisticsAggregator"; enum ETxTypes { - TXTYPE_INIT_SCHEMA = 0 [(TxTypeOpts) = {Name: "TxInitSchema"}]; - TXTYPE_INIT = 1 [(TxTypeOpts) = {Name: "TxInit"}]; - TXTYPE_CONFIGURE = 2 [(TxTypeOpts) = {Name: "TxConfigure"}]; + TXTYPE_INIT_SCHEMA = 0 [(TxTypeOpts) = {Name: "TxInitSchema"}]; + TXTYPE_INIT = 1 [(TxTypeOpts) = {Name: "TxInit"}]; + TXTYPE_CONFIGURE = 2 [(TxTypeOpts) = {Name: "TxConfigure"}]; + TXTYPE_SCHEMESHARD_STATS = 3 [(TxTypeOpts) = {Name: "TxSchemeShardStats"}]; } diff --git a/ydb/core/protos/statistics.proto b/ydb/core/protos/statistics.proto index 05e71ed292d5..b759cf34f39e 100644 --- a/ydb/core/protos/statistics.proto +++ b/ydb/core/protos/statistics.proto @@ -4,21 +4,55 @@ package NKikimrStat; option java_package = "ru.yandex.kikimr.proto"; -message TEvBroadcastStatistics { - message TEntry { - optional NKikimrProto.TPathID PathId = 1; - optional uint64 RowCount = 2; - optional uint64 BytesSize = 3; +message TEvConfigureAggregator { + optional string Database = 1; +} + +message TPathEntry { + optional NKikimrProto.TPathID PathId = 1; + optional uint64 RowCount = 2; + optional uint64 BytesSize = 3; +} + +message TSchemeShardStats { + repeated TPathEntry Entries = 1; +} + +// SS -> SA +message TEvConnectSchemeShard { + optional fixed64 SchemeShardId = 1; +} + +// SS -> SA +message TEvSchemeShardStats { + optional fixed64 SchemeShardId = 1; + optional bytes Stats = 2; // serialized TSchemeShardStats +} + +// nodes -> SA +message TEvConnectNode { + optional uint32 NodeId = 1; + repeated fixed64 NeedSchemeShards = 2; + message THaveEntry { + optional fixed64 SchemeShardId = 1; + optional uint64 Timestamp = 2; } - repeated uint32 NodeIds = 1; - repeated TEntry Entries = 2; + repeated THaveEntry HaveSchemeShards = 3; } -message TEvRegisterNode { +// nodes -> SA +message TEvRequestStats { optional uint32 NodeId = 1; - optional bool HasStatistics = 2; + repeated fixed64 NeedSchemeShards = 2; } -message TEvConfigureAggregator { - optional string Database = 1; +// SA -> nodes +message TEvPropagateStatistics { + repeated uint32 NodeIds = 1; // hierarchical propagation + message TStatsEntry { + optional fixed64 SchemeShardId = 1; + optional bytes Stats = 2; // serialized TSchemeShardStats + optional uint64 Timestamp = 3; + } + repeated TStatsEntry Entries = 2; } diff --git a/ydb/core/statistics/aggregator/aggregator.cpp b/ydb/core/statistics/aggregator/aggregator.cpp index 9ec84bfe9919..2d0ef573d8fd 100644 --- a/ydb/core/statistics/aggregator/aggregator.cpp +++ b/ydb/core/statistics/aggregator/aggregator.cpp @@ -5,7 +5,11 @@ namespace NKikimr::NStat { IActor* CreateStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info) { - return new TStatisticsAggregator(tablet, info); + return new TStatisticsAggregator(tablet, info, false); +} + +IActor* CreateStatisticsAggregatorForTests(const NActors::TActorId& tablet, TTabletStorageInfo* info) { + return new TStatisticsAggregator(tablet, info, true); } } // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/aggregator.h b/ydb/core/statistics/aggregator/aggregator.h index 414c6195c802..aae2fc2a1988 100644 --- a/ydb/core/statistics/aggregator/aggregator.h +++ b/ydb/core/statistics/aggregator/aggregator.h @@ -7,4 +7,6 @@ namespace NKikimr::NStat { IActor* CreateStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info); +IActor* CreateStatisticsAggregatorForTests(const NActors::TActorId& tablet, TTabletStorageInfo* info); + } // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp index f1556276f434..e5d50e69c333 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.cpp +++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp @@ -1,15 +1,21 @@ #include "aggregator_impl.h" #include +#include #include namespace NKikimr::NStat { -TStatisticsAggregator::TStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info) +TStatisticsAggregator::TStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info, bool forTests) : TActor(&TThis::StateInit) , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) -{} +{ + PropagateInterval = forTests ? TDuration::Seconds(5) : TDuration::Minutes(3); + + auto seed = std::random_device{}(); + RandomGenerator.seed(seed); +} void TStatisticsAggregator::OnDetach(const TActorContext& ctx) { Die(ctx); @@ -29,8 +35,241 @@ void TStatisticsAggregator::DefaultSignalTabletActive(const TActorContext& ctx) Y_UNUSED(ctx); } -void TStatisticsAggregator::Handle(TEvPrivate::TEvProcess::TPtr&) { - SA_LOG_D("[" << TabletID() << "] Handle TEvPrivate::TEvProcess"); +void TStatisticsAggregator::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev) { + auto pipeServerId = ev->Get()->ServerId; + + SA_LOG_D("[" << TabletID() << "] EvServerConnected" + << ", pipe server id = " << pipeServerId); +} + +void TStatisticsAggregator::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev) { + auto pipeServerId = ev->Get()->ServerId; + + SA_LOG_D("[" << TabletID() << "] EvServerDisconnected" + << ", pipe server id = " << pipeServerId); + + auto itNodeServer = NodePipes.find(pipeServerId); + if (itNodeServer != NodePipes.end()) { + auto nodeId = itNodeServer->second; + auto itNode = Nodes.find(nodeId); + if (itNode != Nodes.end()) { + --itNode->second; + if (itNode->second == 0) { + Nodes.erase(itNode); + } + } + NodePipes.erase(itNodeServer); + return; + } + + auto itShardServer = SchemeShardPipes.find(pipeServerId); + if (itShardServer != SchemeShardPipes.end()) { + auto ssId = itShardServer->second; + auto itShard = SchemeShards.find(ssId); + if (itShard != SchemeShards.end()) { + --itShard->second; + if (itShard->second == 0) { + SchemeShards.erase(itShard); + } + } + SchemeShardPipes.erase(itShardServer); + return; + } +} + +void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectNode::TPtr& ev) { + const auto& record = ev->Get()->Record; + const TNodeId nodeId = record.GetNodeId(); + auto pipeServerId = ev->Recipient; + + SA_LOG_D("[" << TabletID() << "] EvConnectNode" + << ", pipe server id = " << pipeServerId + << ", node id = " << nodeId + << ", have schemeshards count = " << record.HaveSchemeShardsSize() + << ", need schemeshards count = " << record.NeedSchemeShardsSize()); + + if (NodePipes.find(pipeServerId) == NodePipes.end()) { + NodePipes[pipeServerId] = nodeId; + ++Nodes[nodeId]; + } + + for (const auto& ssEntry : record.GetHaveSchemeShards()) { + RequestedSchemeShards.insert(ssEntry.GetSchemeShardId()); + } + + if (!IsPropagateInFlight) { + Schedule(PropagateInterval, new TEvPrivate::TEvPropagate()); + IsPropagateInFlight = true; + } + + std::vector ssIds; + ssIds.reserve(record.NeedSchemeShardsSize()); + for (const auto& ssId : record.GetNeedSchemeShards()) { + ssIds.push_back(ssId); + RequestedSchemeShards.insert(ssId); + } + + ProcessRequests(nodeId, ssIds); +} + +void TStatisticsAggregator::Handle(TEvStatistics::TEvRequestStats::TPtr& ev) { + const auto& record = ev->Get()->Record; + const auto nodeId = record.GetNodeId(); + + SA_LOG_D("[" << TabletID() << "] EvRequestStats" + << ", node id = " << nodeId + << ", schemeshard count = " << record.NeedSchemeShardsSize()); + + std::vector ssIds; + ssIds.reserve(record.NeedSchemeShardsSize()); + for (const auto& ssId : record.GetNeedSchemeShards()) { + ssIds.push_back(ssId); + } + + ProcessRequests(nodeId, ssIds); +} + +void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectSchemeShard::TPtr& ev) { + const auto& record = ev->Get()->Record; + const TSSId schemeShardId = record.GetSchemeShardId(); + auto pipeServerId = ev->Recipient; + + if (SchemeShardPipes.find(pipeServerId) == SchemeShardPipes.end()) { + SchemeShardPipes[pipeServerId] = schemeShardId; + ++SchemeShards[schemeShardId]; + } + + SA_LOG_D("[" << TabletID() << "] EvConnectSchemeShard" + << ", pipe server id = " << pipeServerId + << ", schemeshard id = " << schemeShardId); +} + +void TStatisticsAggregator::Handle(TEvPrivate::TEvFastPropagateCheck::TPtr&) { + SA_LOG_D("[" << TabletID() << "] EvFastPropagateCheck"); + + PropagateFastStatistics(); + + FastCheckInFlight = false; + FastCounter = StatsOptimizeFirstNodesCount; + FastNodes.clear(); + FastSchemeShards.clear(); +} + +void TStatisticsAggregator::Handle(TEvPrivate::TEvPropagate::TPtr&) { + SA_LOG_D("[" << TabletID() << "] EvPropagate"); + + PropagateStatistics(); + + Schedule(PropagateInterval, new TEvPrivate::TEvPropagate()); +} + +void TStatisticsAggregator::ProcessRequests(TNodeId nodeId, const std::vector& ssIds) { + if (FastCounter > 0) { + --FastCounter; + SendStatisticsToNode(nodeId, ssIds); + } else { + FastNodes.insert(nodeId); + for (const auto& ssId : ssIds) { + FastSchemeShards.insert(ssId); + } + if (!FastCheckInFlight) { + Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvFastPropagateCheck()); + FastCheckInFlight = true; + } + } +} + +void TStatisticsAggregator::SendStatisticsToNode(TNodeId nodeId, const std::vector& ssIds) { + SA_LOG_D("[" << TabletID() << "] SendStatisticsToNode()" + << ", node id = " << nodeId + << ", schemeshard count = " << ssIds.size()); + + std::vector nodeIds; + nodeIds.push_back(nodeId); + + PropagateStatisticsImpl(nodeIds, ssIds); +} + +void TStatisticsAggregator::PropagateStatistics() { + SA_LOG_D("[" << TabletID() << "] PropagateStatistics()" + << ", node count = " << Nodes.size() + << ", schemeshard count = " << RequestedSchemeShards.size()); + + if (Nodes.empty() || RequestedSchemeShards.empty()) { + return; + } + + std::vector nodeIds; + nodeIds.reserve(Nodes.size()); + for (const auto& [nodeId, _] : Nodes) { + nodeIds.push_back(nodeId); + } + std::shuffle(std::begin(nodeIds), std::end(nodeIds), RandomGenerator); + + std::vector ssIds; + ssIds.reserve(RequestedSchemeShards.size()); + for (const auto& ssId : RequestedSchemeShards) { + ssIds.push_back(ssId); + } + + PropagateStatisticsImpl(nodeIds, ssIds); +} + +void TStatisticsAggregator::PropagateFastStatistics() { + SA_LOG_D("[" << TabletID() << "] PropagateFastStatistics()" + << ", node count = " << FastNodes.size() + << ", schemeshard count = " << FastSchemeShards.size()); + + if (FastNodes.empty() || FastSchemeShards.empty()) { + return; + } + + std::vector nodeIds; + nodeIds.reserve(FastNodes.size()); + for (const auto& nodeId : FastNodes) { + nodeIds.push_back(nodeId); + } + std::shuffle(std::begin(nodeIds), std::end(nodeIds), RandomGenerator); + + std::vector ssIds; + ssIds.reserve(FastSchemeShards.size()); + for (const auto& ssId : FastSchemeShards) { + ssIds.push_back(ssId); + } + + PropagateStatisticsImpl(nodeIds, ssIds); +} + +void TStatisticsAggregator::PropagateStatisticsImpl( + const std::vector& nodeIds, const std::vector& ssIds) +{ + TNodeId leadingNodeId = nodeIds[0]; + + for (size_t index = 0; index < ssIds.size(); ) { + auto propagate = std::make_unique(); + auto* record = propagate->MutableRecord(); + record->MutableNodeIds()->Reserve(nodeIds.size() - 1); + for (size_t i = 1; i < nodeIds.size(); ++i) { + record->AddNodeIds(nodeIds[i]); + } + for (size_t size = 0; index < ssIds.size(); ++index) { + auto ssId = ssIds[index]; + auto* entry = record->AddEntries(); + entry->SetSchemeShardId(ssId); + auto itStats = BaseStats.find(ssId); + if (itStats != BaseStats.end()) { + entry->SetStats(itStats->second); + size += itStats->second.size(); + } else { + entry->SetStats(TString()); // stats are not sent from SA yet + } + if (size >= StatsSizeLimitBytes) { + ++index; + break; + } + } + Send(NStat::MakeStatServiceID(leadingNodeId), propagate.release()); + } } void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) { diff --git a/ydb/core/statistics/aggregator/aggregator_impl.h b/ydb/core/statistics/aggregator/aggregator_impl.h index 18460fb4486d..5597fffb9b8a 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.h +++ b/ydb/core/statistics/aggregator/aggregator_impl.h @@ -11,6 +11,8 @@ #include +#include + namespace NKikimr::NStat { class TStatisticsAggregator : public TActor, public NTabletFlatExecutor::TTabletExecutedFlat { @@ -19,24 +21,30 @@ class TStatisticsAggregator : public TActor, public NTabl return NKikimrServices::TActivity::STATISTICS_AGGREGATOR; } - TStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info); + TStatisticsAggregator(const NActors::TActorId& tablet, TTabletStorageInfo* info, bool forTests); private: + using TSSId = ui64; + using TNodeId = ui32; + using Schema = TAggregatorSchema; using TTxBase = NTabletFlatExecutor::TTransactionBase; struct TTxInitSchema; struct TTxInit; struct TTxConfigure; + struct TTxSchemeShardStats; struct TEvPrivate { enum EEv { - EvProcess = EventSpaceBegin(TEvents::ES_PRIVATE), + EvPropagate = EventSpaceBegin(TEvents::ES_PRIVATE), + EvFastPropagateCheck, EvEnd }; - struct TEvProcess : public TEventLocal {}; + struct TEvPropagate : public TEventLocal {}; + struct TEvFastPropagateCheck : public TEventLocal {}; }; private: @@ -50,26 +58,38 @@ class TStatisticsAggregator : public TActor, public NTabl NTabletFlatExecutor::ITransaction* CreateTxInit(); void Handle(TEvStatistics::TEvConfigureAggregator::TPtr& ev); - void Handle(TEvPrivate::TEvProcess::TPtr& ev); + void Handle(TEvStatistics::TEvSchemeShardStats::TPtr& ev); + void Handle(TEvPrivate::TEvPropagate::TPtr& ev); + void Handle(TEvStatistics::TEvConnectNode::TPtr& ev); + void Handle(TEvStatistics::TEvRequestStats::TPtr& ev); + void Handle(TEvStatistics::TEvConnectSchemeShard::TPtr& ev); + void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev); + void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev); + void Handle(TEvPrivate::TEvFastPropagateCheck::TPtr& ev); + + void ProcessRequests(TNodeId nodeId, const std::vector& ssIds); + void SendStatisticsToNode(TNodeId nodeId, const std::vector& ssIds); + void PropagateStatistics(); + void PropagateFastStatistics(); + void PropagateStatisticsImpl(const std::vector& nodeIds, const std::vector& ssIds); void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value); STFUNC(StateInit) { - switch(ev->GetTypeRewrite()) { - hFunc(TEvStatistics::TEvConfigureAggregator, Handle); - IgnoreFunc(TEvPrivate::TEvProcess); - default: - if (!HandleDefaultEvents(ev, SelfId())) { - LOG_CRIT(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "TStatisticsAggregator StateInit unexpected event 0x%08" PRIx32, ev->GetTypeRewrite()); - } - } + StateInitImpl(ev,SelfId()); } STFUNC(StateWork) { switch(ev->GetTypeRewrite()) { hFunc(TEvStatistics::TEvConfigureAggregator, Handle); - hFunc(TEvPrivate::TEvProcess, Handle); + hFunc(TEvStatistics::TEvSchemeShardStats, Handle); + hFunc(TEvPrivate::TEvPropagate, Handle); + hFunc(TEvStatistics::TEvConnectNode, Handle); + hFunc(TEvStatistics::TEvRequestStats, Handle); + hFunc(TEvStatistics::TEvConnectSchemeShard, Handle); + hFunc(TEvTabletPipe::TEvServerConnected, Handle); + hFunc(TEvTabletPipe::TEvServerDisconnected, Handle); + hFunc(TEvPrivate::TEvFastPropagateCheck, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { LOG_CRIT(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, @@ -80,6 +100,29 @@ class TStatisticsAggregator : public TActor, public NTabl private: TString Database; + + std::mt19937_64 RandomGenerator; + + static constexpr size_t StatsOptimizeFirstNodesCount = 3; // optimize first nodes - fast propagation + static constexpr size_t StatsSizeLimitBytes = 2 << 20; // limit for stats size in one message + + TDuration PropagateInterval = TDuration::Minutes(3); + bool IsPropagateInFlight = false; // is slow propagation started + + std::unordered_map BaseStats; // schemeshard id -> serialized stats for all paths + + std::unordered_map SchemeShards; // all connected schemeshards + std::unordered_map SchemeShardPipes; // schemeshard pipe servers + + std::unordered_map Nodes; // all connected nodes + std::unordered_map NodePipes; // node pipe servers + + std::unordered_set RequestedSchemeShards; // all schemeshards that were requested from all nodes + + size_t FastCounter = StatsOptimizeFirstNodesCount; + bool FastCheckInFlight = false; + std::unordered_set FastNodes; // nodes for fast propagation + std::unordered_set FastSchemeShards; // schemeshards for fast propagation }; } // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/schema.h b/ydb/core/statistics/aggregator/schema.h index a0a616746dd7..61385262a658 100644 --- a/ydb/core/statistics/aggregator/schema.h +++ b/ydb/core/statistics/aggregator/schema.h @@ -13,8 +13,17 @@ struct TAggregatorSchema : NIceDb::Schema { using TColumns = TableColumns; }; + struct BaseStats : Table<2> { + struct SchemeShardId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct Stats : Column<2, NScheme::NTypeIds::String> {}; + + using TKey = TableKey; + using TColumns = TableColumns; + }; + using TTables = SchemaTables< - SysParams + SysParams, + BaseStats >; using TSettings = SchemaSettings< diff --git a/ydb/core/statistics/aggregator/tx_init.cpp b/ydb/core/statistics/aggregator/tx_init.cpp index c4de87c000fa..8cfb57b8cc3f 100644 --- a/ydb/core/statistics/aggregator/tx_init.cpp +++ b/ydb/core/statistics/aggregator/tx_init.cpp @@ -16,8 +16,11 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { { // precharge auto sysParamsRowset = db.Table().Range().Select(); + auto baseStatsRowset = db.Table().Range().Select(); - if (!sysParamsRowset.IsReady()) { + if (!sysParamsRowset.IsReady() || + !baseStatsRowset.IsReady()) + { return false; } } @@ -48,6 +51,30 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { } } + // BaseStats + { + Self->BaseStats.clear(); + + auto rowset = db.Table().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + ui64 schemeShardId = rowset.GetValue(); + TString stats = rowset.GetValue(); + + Self->BaseStats[schemeShardId] = stats; + + if (!rowset.Next()) { + return false; + } + } + + SA_LOG_D("[" << Self->TabletID() << "] Loading base stats: " + << "schemeshard count# " << Self->BaseStats.size()); + } + return true; } diff --git a/ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp b/ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp new file mode 100644 index 000000000000..fff0d67f2dbd --- /dev/null +++ b/ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp @@ -0,0 +1,43 @@ +#include "aggregator_impl.h" + +namespace NKikimr::NStat { + +struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase { + NKikimrStat::TEvSchemeShardStats Record; + + TTxSchemeShardStats(TSelf* self, NKikimrStat::TEvSchemeShardStats&& record) + : TTxBase(self) + , Record(std::move(record)) + {} + + TTxType GetTxType() const override { return TXTYPE_SCHEMESHARD_STATS; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + ui64 schemeShardId = Record.GetSchemeShardId(); + const auto& stats = Record.GetStats(); + + SA_LOG_D("[" << Self->TabletID() << "] TTxSchemeShardStats::Execute: " + << "schemeshard id# " << schemeShardId + << ", stats size# " << stats.size()); + + NIceDb::TNiceDb db(txc.DB); + db.Table().Key(schemeShardId).Update( + NIceDb::TUpdate(stats)); + + Self->BaseStats[schemeShardId] = stats; + + return true; + } + + void Complete(const TActorContext&) override { + SA_LOG_D("[" << Self->TabletID() << "] TTxSchemeShardStats::Complete"); + } +}; + +void TStatisticsAggregator::Handle(TEvStatistics::TEvSchemeShardStats::TPtr& ev) { + auto& record = ev->Get()->Record; + Execute(new TTxSchemeShardStats(this, std::move(record)), + TActivationContext::AsActorContext()); +} + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/ya.make b/ydb/core/statistics/aggregator/ya.make index 1dec5b495387..f8db9f8e1271 100644 --- a/ydb/core/statistics/aggregator/ya.make +++ b/ydb/core/statistics/aggregator/ya.make @@ -1,5 +1,10 @@ LIBRARY() +OWNER( + monster + g:kikimr +) + SRCS( aggregator.h aggregator.cpp @@ -10,6 +15,7 @@ SRCS( tx_configure.cpp tx_init.cpp tx_init_schema.cpp + tx_schemeshard_stats.cpp ) PEERDIR( diff --git a/ydb/core/statistics/events.h b/ydb/core/statistics/events.h index f867ffd3a549..055e8c0cafa6 100644 --- a/ydb/core/statistics/events.h +++ b/ydb/core/statistics/events.h @@ -44,11 +44,17 @@ struct TEvStatistics { EvGetStatisticsFromSS, // deprecated EvGetStatisticsFromSSResult, // deprecated - EvBroadcastStatistics, - EvRegisterNode, + EvBroadcastStatistics, // deprecated + EvRegisterNode, // deprecated EvConfigureAggregator, + EvConnectSchemeShard, + EvSchemeShardStats, + EvConnectNode, + EvRequestStats, + EvPropagateStatistics, + EvEnd }; @@ -60,19 +66,7 @@ struct TEvStatistics { bool Success = true; std::vector StatResponses; }; - - struct TEvBroadcastStatistics : public TEventPreSerializedPB< - TEvBroadcastStatistics, - NKikimrStat::TEvBroadcastStatistics, - EvBroadcastStatistics> - {}; - - struct TEvRegisterNode : public TEventPB< - TEvRegisterNode, - NKikimrStat::TEvRegisterNode, - EvRegisterNode> - {}; - + struct TEvConfigureAggregator : public TEventPB< TEvConfigureAggregator, NKikimrStat::TEvConfigureAggregator, @@ -85,6 +79,35 @@ struct TEvStatistics { } }; + struct TEvConnectSchemeShard : public TEventPB< + TEvConnectSchemeShard, + NKikimrStat::TEvConnectSchemeShard, + EvConnectSchemeShard> + {}; + + struct TEvSchemeShardStats : public TEventPB< + TEvSchemeShardStats, + NKikimrStat::TEvSchemeShardStats, + EvSchemeShardStats> + {}; + + struct TEvConnectNode : public TEventPB< + TEvConnectNode, + NKikimrStat::TEvConnectNode, + EvConnectNode> + {}; + + struct TEvRequestStats : public TEventPB< + TEvRequestStats, + NKikimrStat::TEvRequestStats, + EvRequestStats> + {}; + + struct TEvPropagateStatistics : public TEventPreSerializedPB< + TEvPropagateStatistics, + NKikimrStat::TEvPropagateStatistics, + EvPropagateStatistics> + {}; }; } // NStat diff --git a/ydb/core/statistics/stat_service.cpp b/ydb/core/statistics/stat_service.cpp index 76fd9e2f1337..9a4c802ca8b6 100644 --- a/ydb/core/statistics/stat_service.cpp +++ b/ydb/core/statistics/stat_service.cpp @@ -36,13 +36,13 @@ class TStatService : public TActorBootstrapped { STFUNC(StateWork) { switch(ev->GetTypeRewrite()) { + hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, HandleConfig) + hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, HandleConfig) hFunc(TEvStatistics::TEvGetStatistics, Handle); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - hFunc(TEvStatistics::TEvBroadcastStatistics, Handle); + hFunc(TEvStatistics::TEvPropagateStatistics, Handle); hFunc(TEvTabletPipe::TEvClientConnected, Handle); hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); - hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, HandleConfig) - hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, HandleConfig) cFunc(TEvents::TEvPoison::EventType, PassAway); default: LOG_CRIT_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, @@ -51,6 +51,24 @@ class TStatService : public TActorBootstrapped { } private: + bool IsSAUnavailable() { + return ResolveSAStage == RSA_FINISHED && StatisticsAggregatorId == 0; + } + + void HandleConfig(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) { + LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, + "Subscribed for config changes"); + } + + void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) { + const auto& record = ev->Get()->Record; + const auto& featureFlags = record.GetConfig().GetFeatureFlags(); + EnableStatistics = featureFlags.GetEnableStatistics(); + + auto response = std::make_unique(record); + Send(ev->Sender, response.release(), 0, ev->Cookie); + } + void Handle(TEvStatistics::TEvGetStatistics::TPtr& ev) { ui64 requestId = NextRequestId++; @@ -59,95 +77,167 @@ class TStatService : public TActorBootstrapped { request.EvCookie = ev->Cookie; request.StatRequests.swap(ev->Get()->StatRequests); - if (!EnableStatistics) { - ReplyFailed(requestId); + if (!EnableStatistics || IsSAUnavailable()) { + ReplyFailed(requestId, true); return; } using TNavigate = NSchemeCache::TSchemeCacheNavigate; auto navigate = std::make_unique(); for (const auto& req : request.StatRequests) { - TNavigate::TEntry entry; + auto& entry = navigate->ResultSet.emplace_back(); entry.TableId = TTableId(req.PathId.OwnerId, req.PathId.LocalPathId); entry.Operation = TNavigate::EOp::OpPath; entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId; - entry.RedirectRequired = false; - navigate->ResultSet.push_back(entry); } - navigate->Cookie = requestId; - Send(MakeSchemeCacheID(), - new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release())); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release())); } void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { using TNavigate = NSchemeCache::TSchemeCacheNavigate; std::unique_ptr navigate(ev->Get()->Request.Release()); - ui64 requestId = navigate->Cookie; + auto cookie = navigate->Cookie; + + if (cookie == ResolveSACookie) { + Y_ABORT_UNLESS(navigate->ResultSet.size() == 1); + auto& entry = navigate->ResultSet.back(); + if (entry.Status != TNavigate::EStatus::Ok) { + StatisticsAggregatorId = 0; + } else { + StatisticsAggregatorId = entry.DomainInfo->Params.GetStatisticsAggregator(); + } + ResolveSAStage = RSA_FINISHED; + + if (StatisticsAggregatorId != 0) { + ConnectToSA(); + SyncNode(); + } else { + ReplyAllFailed(); + } + return; + } + + ui64 requestId = cookie; auto itRequest = InFlight.find(requestId); if (itRequest == InFlight.end()) { return; } + auto& request = itRequest->second; - if (!EnableStatistics) { - ReplyFailed(requestId); + if (!EnableStatistics || IsSAUnavailable()) { + ReplyFailed(requestId, true); return; } std::unordered_set ssIds; bool isServerless = false; + ui64 aggregatorId = 0; + TPathId resourcesDomainKey; for (const auto& entry : navigate->ResultSet) { if (entry.Status != TNavigate::EStatus::Ok) { continue; } - ssIds.insert(entry.DomainInfo->ExtractSchemeShard()); - isServerless = entry.DomainInfo->IsServerless(); + auto& domainInfo = entry.DomainInfo; + ssIds.insert(domainInfo->ExtractSchemeShard()); + aggregatorId = domainInfo->Params.GetStatisticsAggregator(); + isServerless = domainInfo->IsServerless(); + resourcesDomainKey = domainInfo->ResourcesDomainKey; } - if (ssIds.size() != 1 || isServerless) { - ReplyFailed(requestId); + if (ssIds.size() != 1) { + ReplyFailed(requestId, true); return; } + request.SchemeShardId = *ssIds.begin(); - if (SchemeShardId) { - if (SchemeShardId != *ssIds.begin()) { - ReplyFailed(requestId); + if (Statistics.find(request.SchemeShardId) != Statistics.end()) { + ReplySuccess(requestId, true); + return; + } + + switch (ResolveSAStage) { + case RSA_NOT_RUN: + if (!isServerless) { + StatisticsAggregatorId = aggregatorId; + ResolveSAStage = RSA_FINISHED; + } else { + using TNavigate = NSchemeCache::TSchemeCacheNavigate; + auto navigate = std::make_unique(); + auto& entry = navigate->ResultSet.emplace_back(); + entry.TableId = TTableId(resourcesDomainKey.OwnerId, resourcesDomainKey.LocalPathId); + entry.Operation = TNavigate::EOp::OpPath; + entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId; + entry.RedirectRequired = false; + navigate->Cookie = ResolveSACookie; + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release())); + ResolveSAStage = RSA_IN_FLIGHT; return; } - } else { - SchemeShardId = *ssIds.begin(); - EstablishPipe(); - RegisterNode(); + break; + case RSA_IN_FLIGHT: + return; + default: + break; } - if (!HasStatistics) { - return; // reply on incoming broadcast + if (IsSAUnavailable()) { + ReplyFailed(requestId, true); + return; } - ReplySuccess(requestId, true); + if (!SAPipeClientId) { + ConnectToSA(); + SyncNode(); + } else { + auto requestStats = std::make_unique(); + requestStats->Record.SetNodeId(SelfId().NodeId()); + requestStats->Record.AddNeedSchemeShards(request.SchemeShardId); + NTabletPipe::SendData(SelfId(), SAPipeClientId, requestStats.release()); + } } - void Handle(TEvStatistics::TEvBroadcastStatistics::TPtr& ev) { + void Handle(TEvStatistics::TEvPropagateStatistics::TPtr& ev) { LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Handle TEvBroadcastStatistics, node id = " << SelfId().NodeId()); - - StatisticsMap.clear(); + "EvPropagateStatistics, node id = " << SelfId().NodeId()); auto* record = ev->Get()->MutableRecord(); for (const auto& entry : record->GetEntries()) { - TPathId pathId(entry.GetPathId().GetOwnerId(), entry.GetPathId().GetLocalId()); - auto& mapEntry = StatisticsMap[pathId]; - mapEntry.RowCount = entry.GetRowCount(); - mapEntry.BytesSize = entry.GetBytesSize(); - } + ui64 schemeShardId = entry.GetSchemeShardId(); + auto& statisticsState = Statistics[schemeShardId]; - HasStatistics = true; + if (entry.GetStats().empty()) { + continue; // stats are not ready in SA, wait for next cycle + } - for (const auto& [requestId, _] : InFlight) { - ReplySuccess(requestId, false); + statisticsState.Map.clear(); + + NKikimrStat::TSchemeShardStats statRecord; + Y_PROTOBUF_SUPPRESS_NODISCARD statRecord.ParseFromString(entry.GetStats()); + + for (const auto& pathEntry : statRecord.GetEntries()) { + TPathId pathId(pathEntry.GetPathId().GetOwnerId(), pathEntry.GetPathId().GetLocalId()); + auto& mapEntry = statisticsState.Map[pathId]; + mapEntry.RowCount = pathEntry.GetRowCount(); + mapEntry.BytesSize = pathEntry.GetBytesSize(); + } + } + + for (auto itReq = InFlight.begin(); itReq != InFlight.end(); ) { + auto requestId = itReq->first; + auto requestState = itReq->second; + if (requestState.SchemeShardId == 0) { + ++itReq; + continue; + } + if (Statistics.find(requestState.SchemeShardId) != Statistics.end()) { + ReplySuccess(requestId, false); + itReq = InFlight.erase(itReq); + } else { + ++itReq; + } } - InFlight.clear(); if (record->NodeIdsSize() == 0) { return; @@ -160,15 +250,15 @@ class TStatService : public TActorBootstrapped { } size_t step = 0; - if (nodeIds.size() <= STAT_FAN_OUT + 1) { + if (nodeIds.size() <= StatFanOut + 1) { step = 0; - } else if (nodeIds.size() <= STAT_FAN_OUT * (STAT_FAN_OUT + 1)) { - step = STAT_FAN_OUT; + } else if (nodeIds.size() <= StatFanOut * (StatFanOut + 1)) { + step = StatFanOut; } else { - step = nodeIds.size() / STAT_FAN_OUT; + step = nodeIds.size() / StatFanOut; } - auto serialized = std::make_unique(); + auto serialized = std::make_unique(); serialized->MutableRecord()->MutableEntries()->Swap(record->MutableEntries()); TString preSerializedStats; Y_PROTOBUF_SUPPRESS_NODISCARD serialized->GetRecord().SerializeToString(&preSerializedStats); @@ -176,83 +266,81 @@ class TStatService : public TActorBootstrapped { for (size_t i = 0; i < nodeIds.size(); ) { ui32 leadingNodeId = nodeIds[i++]; - auto broadcast = std::make_unique(); - broadcast->MutableRecord()->MutableNodeIds()->Reserve(step); + auto propagate = std::make_unique(); + propagate->MutableRecord()->MutableNodeIds()->Reserve(step); for (size_t j = 0; i < nodeIds.size() && j < step; ++i, ++j) { - broadcast->MutableRecord()->AddNodeIds(nodeIds[i]); + propagate->MutableRecord()->AddNodeIds(nodeIds[i]); } - broadcast->PreSerializedData = preSerializedStats; - Send(MakeStatServiceID(leadingNodeId), broadcast.release()); + propagate->PreSerializedData = preSerializedStats; + Send(MakeStatServiceID(leadingNodeId), propagate.release()); } } void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "ClientConnected" + "EvClientConnected" << ", node id = " << ev->Get()->ClientId.NodeId() << ", client id = " << ev->Get()->ClientId << ", server id = " << ev->Get()->ServerId << ", status = " << ev->Get()->Status); if (ev->Get()->Status != NKikimrProto::OK) { - SchemeShardPipeClient = TActorId(); - EstablishPipe(); - RegisterNode(); - } + SAPipeClientId = TActorId(); + ConnectToSA(); + SyncNode(); + } } void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) { LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "ClientDestroyed" + "EvClientDestroyed" << ", node id = " << ev->Get()->ClientId.NodeId() << ", client id = " << ev->Get()->ClientId << ", server id = " << ev->Get()->ServerId); - SchemeShardPipeClient = TActorId(); - EstablishPipe(); - RegisterNode(); - } - - void HandleConfig(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) { - LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Subscribed for config changes"); - } - - void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) { - const auto& record = ev->Get()->Record; - const auto& featureFlags = record.GetConfig().GetFeatureFlags(); - EnableStatistics = featureFlags.GetEnableStatistics(); - - auto response = std::make_unique(record); - Send(ev->Sender, response.release(), 0, ev->Cookie); + SAPipeClientId = TActorId(); + ConnectToSA(); + SyncNode(); } - void EstablishPipe() { - if (!SchemeShardPipeClient && SchemeShardId) { - auto policy = NTabletPipe::TClientRetryPolicy::WithRetries(); - NTabletPipe::TClientConfig pipeConfig{policy}; - SchemeShardPipeClient = Register(NTabletPipe::CreateClient(SelfId(), SchemeShardId, pipeConfig)); - - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "EstablishPipe, pipe client id = " << SchemeShardPipeClient); + void ConnectToSA() { + if (SAPipeClientId || !StatisticsAggregatorId) { + return; } + auto policy = NTabletPipe::TClientRetryPolicy::WithRetries(); + NTabletPipe::TClientConfig pipeConfig{policy}; + SAPipeClientId = Register(NTabletPipe::CreateClient(SelfId(), StatisticsAggregatorId, pipeConfig)); + + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, + "ConnectToSA(), pipe client id = " << SAPipeClientId); } - void RegisterNode() { - if (!SchemeShardPipeClient) { + void SyncNode() { + if (!SAPipeClientId || !StatisticsAggregatorId) { return; } - - auto registerNode = std::make_unique(); - registerNode->Record.SetNodeId(SelfId().NodeId()); - registerNode->Record.SetHasStatistics(HasStatistics); - - NTabletPipe::SendData(SelfId(), SchemeShardPipeClient, registerNode.release()); + auto connect = std::make_unique(); + auto& record = connect->Record; + + record.SetNodeId(SelfId().NodeId()); + for (const auto& [ssId, ssState] : Statistics) { + auto* entry = record.AddHaveSchemeShards(); + entry->SetSchemeShardId(ssId); + entry->SetTimestamp(ssState.Timestamp); + } + std::unordered_set ssIds; + for (const auto& [reqId, reqState] : InFlight) { + if (reqState.SchemeShardId != 0) { + ssIds.insert(reqState.SchemeShardId); + } + } + for (const auto& ssId : ssIds) { + record.AddNeedSchemeShards(ssId); + } + NTabletPipe::SendData(SelfId(), SAPipeClientId, connect.release()); LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Send register node" - << ", node id = " << SelfId().NodeId() - << ", has statistics = " << HasStatistics); + "SyncNode(), pipe client id = " << SAPipeClientId); } void ReplySuccess(ui64 requestId, bool eraseRequest) { @@ -262,18 +350,26 @@ class TStatService : public TActorBootstrapped { } auto& request = itRequest->second; + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, + "ReplySuccess(), request id = " << requestId); + + auto itStatistics = Statistics.find(request.SchemeShardId); + if (itStatistics == Statistics.end()) { + return; + } + auto& statisticsMap = itStatistics->second.Map; + auto result = std::make_unique(); result->Success = true; for (auto& req : request.StatRequests) { - auto itStat = StatisticsMap.find(req.PathId); - TResponse rsp; rsp.Success = true; rsp.Req = req; TStatSimple stat; - if (itStat != StatisticsMap.end()) { + auto itStat = statisticsMap.find(req.PathId); + if (itStat != statisticsMap.end()) { stat.RowCount = itStat->second.RowCount; stat.BytesSize = itStat->second.BytesSize; } else { @@ -292,13 +388,16 @@ class TStatService : public TActorBootstrapped { } } - void ReplyFailed(ui64 requestId) { + void ReplyFailed(ui64 requestId, bool eraseRequest) { auto itRequest = InFlight.find(requestId); if (itRequest == InFlight.end()) { return; } auto& request = itRequest->second; + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, + "ReplyFailed(), request id = " << requestId); + auto result = std::make_unique(); result->Success = false; @@ -317,12 +416,21 @@ class TStatService : public TActorBootstrapped { Send(request.ReplyToActorId, result.release(), 0, request.EvCookie); - InFlight.erase(requestId); + if (eraseRequest) { + InFlight.erase(requestId); + } + } + + void ReplyAllFailed() { + for (const auto& [requestId, _] : InFlight) { + ReplyFailed(requestId, false); + } + InFlight.clear(); } void PassAway() { - if (SchemeShardPipeClient) { - NTabletPipe::CloseClient(SelfId(), SchemeShardPipeClient); + if (SAPipeClientId) { + NTabletPipe::CloseClient(SelfId(), SAPipeClientId); } TBase::PassAway(); } @@ -330,25 +438,38 @@ class TStatService : public TActorBootstrapped { private: bool EnableStatistics = false; + static constexpr size_t StatFanOut = 10; + struct TRequestState { NActors::TActorId ReplyToActorId; ui64 EvCookie = 0; + ui64 SchemeShardId = 0; std::vector StatRequests; }; - std::map InFlight; // request id -> state + std::unordered_map InFlight; // request id -> state ui64 NextRequestId = 1; - static const size_t STAT_FAN_OUT = 10; - struct TStatEntry { ui64 RowCount = 0; ui64 BytesSize = 0; }; - std::unordered_map StatisticsMap; - bool HasStatistics = false; + typedef std::unordered_map TStatisticsMap; + struct TStatisticsState { + TStatisticsMap Map; + ui64 Timestamp = 0; + }; + std::unordered_map Statistics; // ss id -> stats + + ui64 StatisticsAggregatorId = 0; + TActorId SAPipeClientId; - ui64 SchemeShardId = 0; - TActorId SchemeShardPipeClient; + static const ui64 ResolveSACookie = std::numeric_limits::max(); + enum EResolveSAStage { + RSA_NOT_RUN, + RSA_IN_FLIGHT, + RSA_FINISHED + }; + EResolveSAStage ResolveSAStage = RSA_NOT_RUN; }; THolder CreateStatService() { diff --git a/ydb/core/statistics/ut/ut_common.cpp b/ydb/core/statistics/ut/ut_common.cpp index 80b209722640..984a3466a7ef 100644 --- a/ydb/core/statistics/ut/ut_common.cpp +++ b/ydb/core/statistics/ut/ut_common.cpp @@ -15,8 +15,8 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDeclareSettings(const TString NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings(const TString &name, const TStoragePools &pools) { NKikimrSubDomains::TSubDomainSettings subdomain; subdomain.SetName(name); - subdomain.SetCoordinators(2); - subdomain.SetMediators(2); + subdomain.SetCoordinators(1); + subdomain.SetMediators(1); subdomain.SetPlanResolution(50); subdomain.SetTimeCastBucketsPerMediator(2); for (auto& pool: pools) { diff --git a/ydb/core/statistics/ut/ut_common.h b/ydb/core/statistics/ut/ut_common.h index dde9c4545219..56e49624d067 100644 --- a/ydb/core/statistics/ut/ut_common.h +++ b/ydb/core/statistics/ut/ut_common.h @@ -14,7 +14,7 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings( class TTestEnv { public: - TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 1, ui32 storagePools = 0); + TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 1, ui32 storagePools = 1); ~TTestEnv(); Tests::TServer& GetServer() const { diff --git a/ydb/core/statistics/ut/ut_statistics.cpp b/ydb/core/statistics/ut/ut_statistics.cpp index d6d429bb6230..08fa79316c98 100644 --- a/ydb/core/statistics/ut/ut_statistics.cpp +++ b/ydb/core/statistics/ut/ut_statistics.cpp @@ -19,65 +19,120 @@ using namespace NYdb::NScheme; namespace { -void CreateDatabase(TTestEnv& env, const TString& databaseName) { +void CreateDatabase(TTestEnv& env, const TString& databaseName, size_t nodeCount = 1) { auto subdomain = GetSubDomainDeclareSettings(databaseName); UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, env.GetClient().CreateExtSubdomain("/Root", subdomain)); - env.GetTenants().Run("/Root/" + databaseName, 1); + env.GetTenants().Run("/Root/" + databaseName, nodeCount); auto subdomainSettings = GetSubDomainDefaultSettings(databaseName, env.GetPools()); subdomainSettings.SetExternalSchemeShard(true); + subdomainSettings.SetExternalStatisticsAggregator(true); UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, env.GetClient().AlterExtSubdomain("/Root", subdomainSettings)); } -void CreateTable(TTestEnv& env, const TString& databaseName, const TString& tableName) { +void CreateServerlessDatabase(TTestEnv& env, const TString& databaseName, TPathId resourcesDomainKey) { + auto subdomain = GetSubDomainDeclareSettings(databaseName); + subdomain.MutableResourcesDomainKey()->SetSchemeShard(resourcesDomainKey.OwnerId); + subdomain.MutableResourcesDomainKey()->SetPathId(resourcesDomainKey.LocalPathId); + UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, + env.GetClient().CreateExtSubdomain("/Root", subdomain)); + + env.GetTenants().Run("/Root/" + databaseName, 0); + + auto subdomainSettings = GetSubDomainDefaultSettings(databaseName, env.GetPools()); + subdomainSettings.SetExternalSchemeShard(true); + UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, + env.GetClient().AlterExtSubdomain("/Root", subdomainSettings)); +} + +void CreateTable(TTestEnv& env, const TString& databaseName, const TString& tableName, size_t rowCount) { TTableClient client(env.GetDriver()); auto session = client.CreateSession().GetValueSync().GetSession(); auto result = session.ExecuteSchemeQuery(Sprintf(R"( CREATE TABLE `Root/%s/%s` ( Key Uint64, - Value String, + Value Uint64, PRIMARY KEY (Key) ); )", databaseName.c_str(), tableName.c_str())).GetValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - result = session.ExecuteDataQuery(Sprintf(R"( - REPLACE INTO `Root/%s/%s` (Key, Value) VALUES - (1u, "A"), - (2u, "B"), - (3u, "C"); - )", databaseName.c_str(), tableName.c_str()), TTxControl::BeginTx().CommitTx()).GetValueSync(); + TStringBuilder replace; + replace << Sprintf("REPLACE INTO `Root/%s/%s` (Key, Value) VALUES ", + databaseName.c_str(), tableName.c_str()); + for (ui32 i = 0; i < rowCount; ++i) { + if (i > 0) { + replace << ", "; + } + replace << Sprintf("(%uu, %uu)", i, i); + } + replace << ";"; + result = session.ExecuteDataQuery(replace, TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } -void CreateAll(TTestEnv& env) { - CreateDatabase(env, "Database"); - CreateTable(env, "Database", "Table"); -} +TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* domainKey = nullptr) { + auto sender = runtime.AllocateEdgeActor(); -std::unique_ptr Navigate(TTestActorRuntime& runtime, - const TActorId& sender, const TString& path, NSchemeCache::TSchemeCacheNavigate::EOp op) -{ using TNavigate = NSchemeCache::TSchemeCacheNavigate; using TEvRequest = TEvTxProxySchemeCache::TEvNavigateKeySet; using TEvResponse = TEvTxProxySchemeCache::TEvNavigateKeySetResult; - auto request = MakeHolder(); + auto request = std::make_unique(); auto& entry = request->ResultSet.emplace_back(); entry.Path = SplitPath(path); entry.RequestType = TNavigate::TEntry::ERequestType::ByPath; - entry.Operation = op; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpPath; entry.ShowPrivatePath = true; - runtime.Send(MakeSchemeCacheID(), sender, new TEvRequest(request.Release())); + runtime.Send(MakeSchemeCacheID(), sender, new TEvRequest(request.release())); auto ev = runtime.GrabEdgeEventRethrow(sender); UNIT_ASSERT(ev); UNIT_ASSERT(ev->Get()); - return std::unique_ptr(ev->Get()->Request.Release()); + std::unique_ptr response(ev->Get()->Request.Release()); + UNIT_ASSERT(response->ResultSet.size() == 1); + auto& resultEntry = response->ResultSet[0]; + if (domainKey) { + *domainKey = resultEntry.DomainInfo->DomainKey; + } + return resultEntry.TableId.PathId; +} + +void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId, size_t expectedRowCount) { + auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex)); + ui64 rowCount = 0; + while (rowCount == 0) { + NStat::TRequest req; + req.StatType = NStat::EStatType::SIMPLE; + req.PathId = pathId; + + auto evGet = std::make_unique(); + evGet->StatRequests.push_back(req); + + auto sender = runtime.AllocateEdgeActor(nodeIndex); + runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true); + auto evResult = runtime.GrabEdgeEventRethrow(sender); + + UNIT_ASSERT(evResult); + UNIT_ASSERT(evResult->Get()); + UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1); + + auto rsp = evResult->Get()->StatResponses[0]; + auto stat = std::get(rsp.Statistics); + + rowCount = stat.RowCount; + + if (rowCount != 0) { + UNIT_ASSERT(stat.RowCount == expectedRowCount); + break; + } + + Sleep(TDuration::Seconds(5)); + } } } // namespace @@ -86,45 +141,110 @@ Y_UNIT_TEST_SUITE(Statistics) { Y_UNIT_TEST(Simple) { TTestEnv env(1, 1); - CreateAll(env); + CreateDatabase(env, "Database"); + CreateTable(env, "Database", "Table", 5); + + auto& runtime = *env.GetServer().GetRuntime(); + auto pathId = ResolvePathId(runtime, "/Root/Database/Table"); + + ValidateRowCount(runtime, 1, pathId, 5); + } + + Y_UNIT_TEST(TwoNodes) { + TTestEnv env(1, 2); + CreateDatabase(env, "Database", 2); + CreateTable(env, "Database", "Table", 5); + + auto& runtime = *env.GetServer().GetRuntime(); + auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table"); + + ValidateRowCount(runtime, 1, pathId1, 5); + ValidateRowCount(runtime, 2, pathId1, 5); + } + + Y_UNIT_TEST(TwoTables) { + TTestEnv env(1, 1); + CreateDatabase(env, "Database"); + CreateTable(env, "Database", "Table1", 5); + CreateTable(env, "Database", "Table2", 6); + + auto& runtime = *env.GetServer().GetRuntime(); + auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1"); + auto pathId2 = ResolvePathId(runtime, "/Root/Database/Table2"); + + ValidateRowCount(runtime, 1, pathId1, 5); + ValidateRowCount(runtime, 1, pathId2, 6); + } + + Y_UNIT_TEST(TwoDatabases) { + TTestEnv env(1, 2); + CreateDatabase(env, "Database1"); + CreateDatabase(env, "Database2"); + CreateTable(env, "Database1", "Table1", 5); + CreateTable(env, "Database2", "Table2", 6); + + auto& runtime = *env.GetServer().GetRuntime(); + auto pathId1 = ResolvePathId(runtime, "/Root/Database1/Table1"); + auto pathId2 = ResolvePathId(runtime, "/Root/Database2/Table2"); + + ValidateRowCount(runtime, 1, pathId1, 5); + ValidateRowCount(runtime, 2, pathId2, 6); + } + + Y_UNIT_TEST(Serverless) { + TTestEnv env(1, 1); + CreateDatabase(env, "Shared"); + TPathId domainKey; auto& runtime = *env.GetServer().GetRuntime(); - auto navigate = Navigate(runtime, runtime.AllocateEdgeActor(), - "/Root/Database/Table", - NSchemeCache::TSchemeCacheNavigate::EOp::OpTable); - const auto& entry = navigate->ResultSet[0]; + ResolvePathId(runtime, "/Root/Shared", &domainKey); + CreateServerlessDatabase(env, "Serverless", domainKey); + CreateTable(env, "Serverless", "Table", 5); + + auto pathId = ResolvePathId(runtime, "/Root/Serverless/Table"); - auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(0)); + ValidateRowCount(runtime, 1, pathId, 5); + } - ui64 rowCount = 0; - while (rowCount == 0) { - NStat::TRequest req; - req.StatType = NStat::EStatType::SIMPLE; - req.PathId = entry.TableId.PathId; + Y_UNIT_TEST(TwoServerlessDbs) { + TTestEnv env(1, 1); + CreateDatabase(env, "Shared"); - auto evGet = std::make_unique(); - evGet->StatRequests.push_back(req); + TPathId domainKey; + auto& runtime = *env.GetServer().GetRuntime(); + ResolvePathId(runtime, "/Root/Shared", &domainKey); + CreateServerlessDatabase(env, "Serverless1", domainKey); + CreateServerlessDatabase(env, "Serverless2", domainKey); + CreateTable(env, "Serverless1", "Table1", 5); + CreateTable(env, "Serverless2", "Table2", 6); - auto sender = runtime.AllocateEdgeActor(); - runtime.Send(statServiceId, sender, evGet.release()); - auto evResult = runtime.GrabEdgeEventRethrow(sender); + auto pathId1 = ResolvePathId(runtime, "/Root/Serverless1/Table1"); + auto pathId2 = ResolvePathId(runtime, "/Root/Serverless2/Table2"); - UNIT_ASSERT(evResult); - UNIT_ASSERT(evResult->Get()); - UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1); + ValidateRowCount(runtime, 1, pathId1, 5); + ValidateRowCount(runtime, 1, pathId2, 6); + } - auto rsp = evResult->Get()->StatResponses[0]; - auto stat = std::get(rsp.Statistics); + Y_UNIT_TEST(TwoServerlessTwoSharedDbs) { + TTestEnv env(1, 2); + CreateDatabase(env, "Shared1"); + CreateDatabase(env, "Shared2"); - rowCount = stat.RowCount; + TPathId domainKey1, domainKey2; + auto& runtime = *env.GetServer().GetRuntime(); + ResolvePathId(runtime, "/Root/Shared1", &domainKey1); + ResolvePathId(runtime, "/Root/Shared2", &domainKey2); + CreateServerlessDatabase(env, "Serverless1", domainKey1); + CreateServerlessDatabase(env, "Serverless2", domainKey2); - if (rowCount != 0) { - UNIT_ASSERT(stat.RowCount == 3); - break; - } + CreateTable(env, "Serverless1", "Table1", 5); + CreateTable(env, "Serverless2", "Table2", 6); - Sleep(TDuration::Seconds(5)); - } + auto pathId1 = ResolvePathId(runtime, "/Root/Serverless1/Table1"); + auto pathId2 = ResolvePathId(runtime, "/Root/Serverless2/Table2"); + + ValidateRowCount(runtime, 1, pathId1, 5); + ValidateRowCount(runtime, 2, pathId2, 6); } } diff --git a/ydb/core/statistics/ut/ya.make b/ydb/core/statistics/ut/ya.make index 8e9306d76b24..c691e9e34749 100644 --- a/ydb/core/statistics/ut/ya.make +++ b/ydb/core/statistics/ut/ya.make @@ -1,5 +1,10 @@ UNITTEST_FOR(ydb/core/statistics) +OWNER( + monster + g:kikimr +) + FORK_SUBTESTS() IF (WITH_VALGRIND) diff --git a/ydb/core/statistics/ya.make b/ydb/core/statistics/ya.make index 9907ee28ef45..423ec14e214f 100644 --- a/ydb/core/statistics/ya.make +++ b/ydb/core/statistics/ya.make @@ -1,5 +1,10 @@ LIBRARY() +OWNER( + monster + g:kikimr +) + SRCS( events.h stat_service.h diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 259d5cf6d9b2..a8cbc2c0da25 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -728,7 +728,7 @@ namespace Tests { TMailboxType::Revolving, appData.SystemPoolId)); localConfig.TabletClassInfo[TTabletTypes::StatisticsAggregator] = TLocalConfig::TTabletClassInfo(new TTabletSetupInfo( - &NStat::CreateStatisticsAggregator, TMailboxType::Revolving, appData.UserPoolId, + &NStat::CreateStatisticsAggregatorForTests, TMailboxType::Revolving, appData.UserPoolId, TMailboxType::Revolving, appData.SystemPoolId)); } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 8e8a85102fce..b8726d933ec5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -129,8 +130,7 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, TActiva ctx.Send(TxAllocatorClient, MakeHolder(InitiateCachedTxIdsCount)); - GenerateStatisticsMap(); - ctx.Schedule(TDuration::Seconds(15), new TEvPrivate::TEvProcessStatistics()); + InitializeStatistics(ctx); Become(&TThis::StateWork); } @@ -4190,6 +4190,10 @@ void TSchemeShard::Die(const TActorContext &ctx) { ShardDeleter.Shutdown(ctx); ParentDomainLink.Shutdown(ctx); + if (SAPipeClientId) { + NTabletPipe::CloseClient(SelfId(), SAPipeClientId); + } + PipeClientCache->Detach(ctx); if (CompactionQueue) @@ -4509,9 +4513,8 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvPersQueue::TEvProposeTransactionAttachResult, Handle); - HFuncTraced(TEvPrivate::TEvProcessStatistics, Handle); - HFuncTraced(TEvPrivate::TEvStatFastBroadcastCheck, Handle); - HFuncTraced(NStat::TEvStatistics::TEvRegisterNode, Handle); + HFuncTraced(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + HFuncTraced(TEvPrivate::TEvSendBaseStatsToSA, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { @@ -5143,6 +5146,11 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TAc return; } + if (clientId == SAPipeClientId) { + ConnectToSA(); + return; + } + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Failed to connect" << ", to tablet: " << tabletId @@ -5193,34 +5201,17 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TAc return; } + if (clientId == SAPipeClientId) { + ConnectToSA(); + return; + } + BorrowedCompactionHandleDisconnect(tabletId, clientId); ConditionalEraseHandleDisconnect(tabletId, clientId, ctx); RestartPipeTx(tabletId, ctx); } -void TSchemeShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx) { - auto serverId = ev->Get()->ServerId; - auto itServer = StatNodePipes.find(serverId); - - if (itServer != StatNodePipes.end()) { - auto nodeId = itServer->second; - StatNodePipes.erase(itServer); - - auto itNode = StatNodes.find(nodeId); - if (itNode != StatNodes.end()) { - --itNode->second; - if (itNode->second == 0) { - StatNodes.erase(itNode); - } - } - - LOG_DEBUG_S(ctx, NKikimrServices::STATISTICS, - "ServerDisconnected" - << ", node id = " << nodeId - << ", server id = " << serverId - << ", at schemeshard: " << TabletID()); - } - +void TSchemeShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &, const TActorContext &ctx) { LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Server pipe is reset" << ", at schemeshard: " << TabletID()); @@ -6911,147 +6902,110 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvLogin::TPtr &ev, const TActorContex Execute(CreateTxLogin(ev), ctx); } -void TSchemeShard::GenerateStatisticsMap() { - int count = 0; - - auto broadcast = std::make_unique(); - auto* record = broadcast->MutableRecord(); - - if (EnableStatistics) { - for (const auto& [pathId, tableInfo] : Tables) { - const auto& aggregated = tableInfo->GetStats().Aggregated; - auto* entry = record->AddEntries(); - auto* entryPathId = entry->MutablePathId(); - entryPathId->SetOwnerId(pathId.OwnerId); - entryPathId->SetLocalId(pathId.LocalPathId); - entry->SetRowCount(aggregated.RowCount); - entry->SetBytesSize(aggregated.DataSize); - ++count; - } - // TODO: column tables +void TSchemeShard::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext&) { + using TNavigate = NSchemeCache::TSchemeCacheNavigate; + std::unique_ptr request(ev->Get()->Request.Release()); + if (request->ResultSet.size() != 1) { + return; } - - PreSerializedStatisticsMapData.clear(); - Y_PROTOBUF_SUPPRESS_NODISCARD record->SerializeToString(&PreSerializedStatisticsMapData); - - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Generate statistics map" - << ", table count = " << count - << ", at schemeshard: " << TabletID()); -} - -void TSchemeShard::BroadcastStatistics() { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Broadcast statistics" - << ", node count = " << StatNodes.size() - << ", at schemeshard: " << TabletID()); - - if (StatNodes.empty()) { + auto& entry = request->ResultSet.back(); + if (entry.Status != TNavigate::EStatus::Ok) { return; } - ui32 leadingNodeId = StatNodes.begin()->first; - - auto broadcast = std::make_unique(); - auto* record = broadcast->MutableRecord(); - record->MutableNodeIds()->Reserve(StatNodes.size()); - for (const auto& [nodeId, _] : StatNodes) { - if (nodeId == leadingNodeId) { - continue; - } - record->AddNodeIds(nodeId); + if (entry.DomainInfo->Params.HasStatisticsAggregator()) { + StatisticsAggregatorId = TTabletId(entry.DomainInfo->Params.GetStatisticsAggregator()); + ConnectToSA(); } - - broadcast->PreSerializedData = PreSerializedStatisticsMapData; - - Send(NStat::MakeStatServiceID(leadingNodeId), broadcast.release()); } -void TSchemeShard::BroadcastStatisticsFast() { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Broadcast statistics fast" - << ", node count = " << StatFastBroadcastNodes.size() - << ", at schemeshard: " << TabletID()); +void TSchemeShard::Handle(TEvPrivate::TEvSendBaseStatsToSA::TPtr&, const TActorContext& ctx) { + SendBaseStatsToSA(); + auto seconds = SendStatsIntervalMaxSeconds - SendStatsIntervalMinSeconds; + ctx.Schedule(TDuration::Seconds(SendStatsIntervalMinSeconds + RandomNumber(seconds)), + new TEvPrivate::TEvSendBaseStatsToSA()); +} - if (StatFastBroadcastNodes.empty()) { +void TSchemeShard::InitializeStatistics(const TActorContext& ctx) { + if (!EnableStatistics) { return; } + ResolveSA(); + ctx.Schedule(TDuration::Seconds(30), new TEvPrivate::TEvSendBaseStatsToSA()); +} - ui32 leadingNodeId = *StatFastBroadcastNodes.begin(); +void TSchemeShard::ResolveSA() { + auto subDomainInfo = SubDomains.at(RootPathId()); + if (IsServerlessDomain(subDomainInfo)) { + auto resourcesDomainId = subDomainInfo->GetResourcesDomainId(); - auto broadcast = std::make_unique(); - auto* record = broadcast->MutableRecord(); - record->MutableNodeIds()->Reserve(StatFastBroadcastNodes.size()); - for (const auto& nodeId : StatFastBroadcastNodes) { - if (nodeId == leadingNodeId) { - continue; - } - record->AddNodeIds(nodeId); + using TNavigate = NSchemeCache::TSchemeCacheNavigate; + auto navigate = std::make_unique(); + auto& entry = navigate->ResultSet.emplace_back(); + entry.TableId = TTableId(resourcesDomainId.OwnerId, resourcesDomainId.LocalPathId); + entry.Operation = TNavigate::EOp::OpPath; + entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId; + entry.RedirectRequired = false; + + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release())); + } else { + StatisticsAggregatorId = subDomainInfo->GetTenantStatisticsAggregatorID(); + ConnectToSA(); } +} - broadcast->PreSerializedData = PreSerializedStatisticsMapData; +void TSchemeShard::ConnectToSA() { + if (!EnableStatistics || !StatisticsAggregatorId) { + return; + } + auto policy = NTabletPipe::TClientRetryPolicy::WithRetries(); + NTabletPipe::TClientConfig pipeConfig{policy}; + SAPipeClientId = Register(NTabletPipe::CreateClient(SelfId(), (ui64)StatisticsAggregatorId, pipeConfig)); - Send(NStat::MakeStatServiceID(leadingNodeId), broadcast.release()); + auto connect = std::make_unique(); + connect->Record.SetSchemeShardId(TabletID()); - StatFastBroadcastNodes.clear(); -} + NTabletPipe::SendData(SelfId(), SAPipeClientId, connect.release()); -void TSchemeShard::SendStatisticsToNode(ui32 nodeId) { LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Send statistics to node" - << ", node id = " << nodeId + "ConnectToSA()" + << ", pipe client id: " << SAPipeClientId << ", at schemeshard: " << TabletID()); - - auto broadcast = std::make_unique(); - broadcast->PreSerializedData = PreSerializedStatisticsMapData; - - Send(NStat::MakeStatServiceID(nodeId), broadcast.release()); } -void TSchemeShard::Handle(TEvPrivate::TEvProcessStatistics::TPtr&, const TActorContext& ctx) { - GenerateStatisticsMap(); - BroadcastStatistics(); - ctx.Schedule(TDuration::Minutes(2), new TEvPrivate::TEvProcessStatistics()); -} +void TSchemeShard::SendBaseStatsToSA() { + if (!EnableStatistics || !SAPipeClientId) { + return; + } -void TSchemeShard::Handle(TEvPrivate::TEvStatFastBroadcastCheck::TPtr&, const TActorContext& ctx) { - StatFastCheckInFlight = false; - BroadcastStatisticsFast(); + int count = 0; - if (++StatFastBroadcastCounter < STAT_OPTIMIZE_N_FIRST_NODES) { - ctx.Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvStatFastBroadcastCheck()); - StatFastCheckInFlight = true; + NKikimrStat::TSchemeShardStats record; + for (const auto& [pathId, tableInfo] : Tables) { + const auto& aggregated = tableInfo->GetStats().Aggregated; + auto* entry = record.AddEntries(); + auto* entryPathId = entry->MutablePathId(); + entryPathId->SetOwnerId(pathId.OwnerId); + entryPathId->SetLocalId(pathId.LocalPathId); + entry->SetRowCount(aggregated.RowCount); + entry->SetBytesSize(aggregated.DataSize); + ++count; } -} + // TODO: add column tables -void TSchemeShard::Handle(NStat::TEvStatistics::TEvRegisterNode::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; - const ui32 nodeId = record.GetNodeId(); - auto serverId = ev->Recipient; - const bool hasStatistics = record.GetHasStatistics(); + TString stats; + stats.clear(); + Y_PROTOBUF_SUPPRESS_NODISCARD record.SerializeToString(&stats); - if (StatNodePipes.find(serverId) == StatNodePipes.end()) { - StatNodePipes[serverId] = nodeId; - ++StatNodes[nodeId]; - } + auto event = std::make_unique(); + event->Record.SetSchemeShardId(TabletID()); + event->Record.SetStats(stats); - if (!hasStatistics) { - if (StatFastBroadcastCounter > 0) { - --StatFastBroadcastCounter; - SendStatisticsToNode(nodeId); - } else { - StatFastBroadcastNodes.insert(nodeId); - } - if (!StatFastCheckInFlight) { - ctx.Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvStatFastBroadcastCheck()); - StatFastCheckInFlight = true; - } - } + NTabletPipe::SendData(SelfId(), SAPipeClientId, event.release()); - LOG_DEBUG_S(ctx, NKikimrServices::STATISTICS, - "Register node" - << ", server id = " << serverId - << ", node id = " << nodeId + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, + "SendBaseStatsToSA()" + << ", path count: " << count << ", at schemeshard: " << TabletID()); } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 0bf75f3117d2..bb7cc3b2373d 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -45,6 +45,7 @@ #include #include #include +#include #include #include #include @@ -1262,22 +1263,18 @@ class TSchemeShard // } // NCdcStreamScan // statistics - TString PreSerializedStatisticsMapData; - std::unordered_map StatNodes; - std::unordered_map StatNodePipes; - - static constexpr size_t STAT_OPTIMIZE_N_FIRST_NODES = 2; - size_t StatFastBroadcastCounter = STAT_OPTIMIZE_N_FIRST_NODES; - bool StatFastCheckInFlight = false; - std::unordered_set StatFastBroadcastNodes; - - void Handle(TEvPrivate::TEvProcessStatistics::TPtr& ev, const TActorContext& ctx); - void Handle(TEvPrivate::TEvStatFastBroadcastCheck::TPtr& ev, const TActorContext& ctx); - void Handle(NStat::TEvStatistics::TEvRegisterNode::TPtr& ev, const TActorContext& ctx); - void GenerateStatisticsMap(); - void BroadcastStatistics(); - void BroadcastStatisticsFast(); - void SendStatisticsToNode(ui32 nodeId); + TTabletId StatisticsAggregatorId; + TActorId SAPipeClientId; + static constexpr ui64 SendStatsIntervalMinSeconds = 180; + static constexpr ui64 SendStatsIntervalMaxSeconds = 240; + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr&, const TActorContext& ctx); + void Handle(TEvPrivate::TEvSendBaseStatsToSA::TPtr& ev, const TActorContext& ctx); + + void InitializeStatistics(const TActorContext& ctx); + void ResolveSA(); + void ConnectToSA(); + void SendBaseStatsToSA(); public: void ChangeStreamShardsCount(i64 delta) override; diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index 6d0cb3c4cab4..aed184ba4ba3 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -28,8 +28,7 @@ struct TEvPrivate { EvConsoleConfigsTimeout, EvRunCdcStreamScan, EvPersistTopicStats, - EvProcessStatistics, - EvStatFastBroadcastCheck, + EvSendBaseStatsToSA, EvEnd }; @@ -181,10 +180,7 @@ struct TEvPrivate { {} }; - struct TEvProcessStatistics: public TEventLocal { - }; - - struct TEvStatFastBroadcastCheck: public TEventLocal { + struct TEvSendBaseStatsToSA: public TEventLocal { }; }; // TEvPrivate