diff --git a/ydb/core/base/pool_stats_collector.cpp b/ydb/core/base/pool_stats_collector.cpp
index 0ddce4c08d37..7a06d5a3dd9f 100644
--- a/ydb/core/base/pool_stats_collector.cpp
+++ b/ydb/core/base/pool_stats_collector.cpp
@@ -46,9 +46,9 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
 
     void OnWakeup(const TActorContext &ctx) override {
         MiniKQLPoolStats.Update();
 
-        TVector<std::tuple<TString, double, ui32>> pools;
+        TVector<std::tuple<TString, double, ui32, ui32>> pools;
         for (const auto& pool : PoolCounters) {
-            pools.emplace_back(pool.Name, pool.Usage, pool.Threads);
+            pools.emplace_back(pool.Name, pool.Usage, pool.Threads, pool.LimitThreads);
         }
 
         ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools));
diff --git a/ydb/core/mind/hive/tx__update_tablet_metrics.cpp b/ydb/core/mind/hive/tx__update_tablet_metrics.cpp
index 7e6150ec9a83..ed7d689456cc 100644
--- a/ydb/core/mind/hive/tx__update_tablet_metrics.cpp
+++ b/ydb/core/mind/hive/tx__update_tablet_metrics.cpp
@@ -54,6 +54,7 @@ class TTxUpdateTabletMetrics : public TTransactionBase<THive> {
         }
         TNodeInfo* node = Self->FindNode(nodeId);
         if (node != nullptr) {
+            node->UpdateResourceMaximum(record.GetResourceMaximum());
             node->UpdateResourceTotalUsage(record);
             node->Statistics.SetLastAliveTimestamp(now.MilliSeconds());
             node->ActualizeNodeStatistics(now);
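For context on the local.cpp hunks that follow: the registrar stops deriving CPU capacity from the pool's current thread count and instead uses the limit the whiteboard now reports. A minimal sketch of the unit convention, under assumed names (`TCpuAccounting` and `FromPoolStats` are illustrative, not in the patch):

```cpp
#include <cstdint>

// Illustrative only: restates the arithmetic TLocalNodeRegistrar uses below.
struct TCpuAccounting {
    uint64_t CpuLimitUs;      // CPU capacity, microseconds per second
    uint64_t UserPoolUsageUs; // consumed CPU, microseconds per second

    static TCpuAccounting FromPoolStats(uint32_t limitThreads, double usageFraction) {
        // One thread running continuously provides 1'000'000 us of CPU per second,
        // so the capacity is limit() * 1'000'000 and usage scales that capacity.
        const uint64_t cpuLimitUs = limitThreads * 1'000'000ULL;
        return {cpuLimitUs, static_cast<uint64_t>(usageFraction * cpuLimitUs)};
    }
};
```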
diff --git a/ydb/core/mind/local.cpp b/ydb/core/mind/local.cpp
index ea60b85275a8..ebd564891b0a 100644
--- a/ydb/core/mind/local.cpp
+++ b/ydb/core/mind/local.cpp
@@ -109,6 +109,7 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
     ui64 UserPoolUsage = 0; // (usage uS x threads) / sec
     ui64 MemUsage = 0;
     ui64 MemLimit = 0;
+    ui64 CpuLimit = 0; // PotentialMaxThreadCount of UserPool, in uS per second
     double NodeUsage = 0;
 
     bool SentDrainNode = false;
@@ -272,28 +273,28 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
         HandlePipeDestroyed(ctx);
     }
 
-    void SendStatusOk(const TActorContext &ctx) {
-        LOG_DEBUG_S(ctx, NKikimrServices::LOCAL, "TLocalNodeRegistrar SendStatusOk");
-        TAutoPtr<TEvLocal::TEvStatus> eventStatus = new TEvLocal::TEvStatus(TEvLocal::TEvStatus::StatusOk);
-        auto& record = eventStatus->Record;
-        record.SetStartTime(StartTime.GetValue());
-        record.MutableResourceMaximum()->CopyFrom(ResourceLimit);
-        if (!record.GetResourceMaximum().HasCPU()) {
-            TExecutorPoolStats poolStats;
-            TVector<TExecutorThreadStats> statsCopy;
-            TVector<TExecutorThreadStats> sharedStatsCopy;
-            ctx.ExecutorThread.ActorSystem->GetPoolStats(AppData()->UserPoolId, poolStats, statsCopy, sharedStatsCopy);
-            if (!statsCopy.empty()) {
-                record.MutableResourceMaximum()->SetCPU(poolStats.CurrentThreadCount * 1000000);
+    void FillResourceMaximum(NKikimrTabletBase::TMetrics* record) {
+        record->CopyFrom(ResourceLimit);
+        if (!record->HasCPU()) {
+            if (CpuLimit != 0) {
+                record->SetCPU(CpuLimit);
             }
         }
-        if (!record.GetResourceMaximum().HasMemory()) {
+        if (!record->HasMemory()) {
             if (MemLimit != 0) {
-                record.MutableResourceMaximum()->SetMemory(MemLimit);
+                record->SetMemory(MemLimit);
             } else {
-                record.MutableResourceMaximum()->SetMemory(NSystemInfo::TotalMemorySize());
+                record->SetMemory(NSystemInfo::TotalMemorySize());
             }
         }
+    }
+
+    void SendStatusOk(const TActorContext &ctx) {
+        LOG_DEBUG_S(ctx, NKikimrServices::LOCAL, "TLocalNodeRegistrar SendStatusOk");
+        TAutoPtr<TEvLocal::TEvStatus> eventStatus = new TEvLocal::TEvStatus(TEvLocal::TEvStatus::StatusOk);
+        auto& record = eventStatus->Record;
+        record.SetStartTime(StartTime.GetValue());
+        FillResourceMaximum(record.MutableResourceMaximum());
         NTabletPipe::SendData(ctx, HivePipeClient, eventStatus.Release());
     }
 
@@ -587,6 +588,7 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
                 record.MutableTotalResourceUsage()->SetMemory(MemUsage);
             }
             record.SetTotalNodeUsage(NodeUsage);
+            FillResourceMaximum(record.MutableResourceMaximum());
             NTabletPipe::SendData(ctx, HivePipeClient, event.Release());
             SendTabletMetricsTime = ctx.Now();
         } else {
@@ -649,7 +651,8 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
         const NKikimrWhiteboard::TSystemStateInfo& info = record.GetSystemStateInfo(0);
         if (static_cast<ui32>(info.PoolStatsSize()) > AppData()->UserPoolId) {
             const auto& poolStats(info.GetPoolStats(AppData()->UserPoolId));
-            UserPoolUsage = poolStats.usage() * poolStats.threads() * 1000000; // uS
+            CpuLimit = poolStats.limit() * 1'000'000; // microseconds
+            UserPoolUsage = poolStats.usage() * CpuLimit; // microseconds
         }
 
         // Note: we use allocated memory because MemoryUsed(AnonRSS) has lag
diff --git a/ydb/core/node_whiteboard/node_whiteboard.h b/ydb/core/node_whiteboard/node_whiteboard.h
index 2b697e1a81ca..ea1ae0735e39 100644
--- a/ydb/core/node_whiteboard/node_whiteboard.h
+++ b/ydb/core/node_whiteboard/node_whiteboard.h
@@ -361,12 +361,13 @@ struct TEvWhiteboard{
             }
         }
 
-        TEvSystemStateUpdate(const TVector<std::tuple<TString, double, ui32>>& poolStats) {
+        TEvSystemStateUpdate(const TVector<std::tuple<TString, double, ui32, ui32>>& poolStats) {
             for (const auto& row : poolStats) {
                 auto& pb = *Record.AddPoolStats();
                 pb.SetName(std::get<0>(row));
                 pb.SetUsage(std::get<1>(row));
                 pb.SetThreads(std::get<2>(row));
+                pb.SetLimit(std::get<3>(row));
             }
         }
diff --git a/ydb/core/protos/hive.proto b/ydb/core/protos/hive.proto
index bb9ef0d6e087..64e4e7c216f2 100644
--- a/ydb/core/protos/hive.proto
+++ b/ydb/core/protos/hive.proto
@@ -246,6 +246,7 @@ message TEvTabletMetrics {
     repeated TTabletMetrics TabletMetrics = 1;
    optional NKikimrTabletBase.TMetrics TotalResourceUsage = 2;
    optional double TotalNodeUsage = 3;
+   optional NKikimrTabletBase.TMetrics ResourceMaximum = 4;
 }
 
 message TEvReassignTablet {
diff --git a/ydb/core/protos/node_whiteboard.proto b/ydb/core/protos/node_whiteboard.proto
index 0492c3aa912a..090fd2909c75 100644
--- a/ydb/core/protos/node_whiteboard.proto
+++ b/ydb/core/protos/node_whiteboard.proto
@@ -277,6 +277,7 @@ message TSystemStateInfo {
         optional string Name = 1;
         optional double Usage = 2 [(InsignificantChangePercent) = 30];
         optional uint32 Threads = 3;
+        optional uint32 Limit = 4;
     }
 
     message TEndpoint {
diff --git a/ydb/library/actors/core/actorsystem.cpp b/ydb/library/actors/core/actorsystem.cpp
index 7bac61b45e9e..e9db7b88c6eb 100644
--- a/ydb/library/actors/core/actorsystem.cpp
+++ b/ydb/library/actors/core/actorsystem.cpp
@@ -338,4 +338,13 @@ namespace NActors {
         CpuManager->Cleanup();
         Scheduler.Destroy();
     }
+
+    void TActorSystem::GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const {
+        CpuManager->GetExecutorPoolState(poolId, state);
+    }
+
+    void TActorSystem::GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const {
+        CpuManager->GetExecutorPoolStates(states);
+    }
+
 }
diff --git a/ydb/library/actors/core/actorsystem.h b/ydb/library/actors/core/actorsystem.h
index 10bdad11e1d3..cadc9047d339 100644
--- a/ydb/library/actors/core/actorsystem.h
+++ b/ydb/library/actors/core/actorsystem.h
@@ -306,5 +306,8 @@ namespace NActors {
             return CpuManager->GetBasicExecutorPools();
         }
 
+        void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
+        void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;
+
     };
 }
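A possible call site for the new `TActorSystem` accessors (a hypothetical helper, not part of this change), useful for eyeballing what each pool reports:

```cpp
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/actors/core/mon_stats.h>
#include <util/stream/output.h>

// Sketch: dump the current vs. potential thread limits of one pool.
void PrintPoolState(const NActors::TActorSystem* actorSystem, i16 poolId) {
    NActors::TExecutorPoolState state;
    actorSystem->GetExecutorPoolState(poolId, state);
    Cout << "used cpu: " << state.UsedCpu
         << " current: " << state.CurrentLimit
         << " possible max: " << state.PossibleMaxLimit
         << " range: [" << state.MinLimit << ", " << state.MaxLimit << "]" << Endl;
}
```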
diff --git a/ydb/library/actors/core/cpu_manager.cpp b/ydb/library/actors/core/cpu_manager.cpp
index 8faad3151213..1c286d9bd79b 100644
--- a/ydb/library/actors/core/cpu_manager.cpp
+++ b/ydb/library/actors/core/cpu_manager.cpp
@@ -1,5 +1,6 @@
 #include "cpu_manager.h"
 #include "executor_pool_jail.h"
+#include "mon_stats.h"
 #include "probes.h"
 
 #include "executor_pool_basic.h"
@@ -172,4 +173,17 @@ namespace NActors {
         }
     }
 
+    void TCpuManager::GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const {
+        if (static_cast<ui32>(poolId) < ExecutorPoolCount) {
+            Executors[poolId]->GetExecutorPoolState(state);
+        }
+    }
+
+    void TCpuManager::GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const {
+        states.resize(ExecutorPoolCount);
+        for (i16 poolId = 0; poolId < static_cast<i16>(ExecutorPoolCount); ++poolId) {
+            GetExecutorPoolState(poolId, states[poolId]);
+        }
+    }
+
 }
diff --git a/ydb/library/actors/core/cpu_manager.h b/ydb/library/actors/core/cpu_manager.h
index 322b16c83bcc..1fee9467987d 100644
--- a/ydb/library/actors/core/cpu_manager.h
+++ b/ydb/library/actors/core/cpu_manager.h
@@ -5,6 +5,7 @@
 #include "harmonizer.h"
 #include "executor_pool.h"
 #include "executor_pool_shared.h"
+#include "mon_stats.h"
 
 #include <memory>
 
 namespace NActors {
@@ -47,6 +48,8 @@ namespace NActors {
         }
 
         void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy, TVector<TExecutorThreadStats>& sharedStatsCopy) const;
+        void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
+        void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;
 
         THarmonizerStats GetHarmonizerStats() const {
             if (Harmonizer) {
diff --git a/ydb/library/actors/core/executor_pool.h b/ydb/library/actors/core/executor_pool.h
index 9061412d5132..9e283c7a5b35 100644
--- a/ydb/library/actors/core/executor_pool.h
+++ b/ydb/library/actors/core/executor_pool.h
@@ -9,6 +9,7 @@ namespace NActors {
     struct TMailboxHeader;
     struct TWorkerContext;
     struct TExecutorPoolStats;
+    struct TExecutorPoolState;
     struct TExecutorThreadStats;
     class TExecutorPoolJail;
     class ISchedulerCookie;
@@ -108,6 +109,10 @@ namespace NActors {
             Y_UNUSED(statsCopy);
         }
 
+        virtual void GetExecutorPoolState(TExecutorPoolState &poolState) const {
+            Y_UNUSED(poolState);
+        }
+
         virtual TString GetName() const {
             return TString();
         }
diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp
index 25a6c2562fc4..4a940a9ca37f 100644
--- a/ydb/library/actors/core/executor_pool_basic.cpp
+++ b/ydb/library/actors/core/executor_pool_basic.cpp
@@ -9,6 +9,7 @@
 #include "mailbox.h"
 #include "thread_context.h"
 
 #include <...>
+#include <...>
 #include <...>
 #include <...>
@@ -425,6 +426,20 @@ namespace NActors {
         }
     }
 
+    void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
+        // Fill the static limits first so the fallback below can rely on MaxLimit.
+        poolState.CurrentLimit = GetThreadCount();
+        poolState.MaxLimit = GetMaxThreadCount();
+        poolState.MinLimit = GetDefaultThreadCount();
+        if (Harmonizer) {
+            TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
+            poolState.UsedCpu = stats.AvgConsumedCpu;
+            poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
+        } else {
+            poolState.PossibleMaxLimit = poolState.MaxLimit;
+        }
+    }
+
     void TBasicExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
         TAffinityGuard affinityGuard(Affinity());
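`PossibleMaxLimit` above is fed by the harmonizer's `PotentialMaxThreadCount`, which is refreshed on every harmonizer cycle (see the `AtomicSet(pool.PotentialMaxThreadCount, ...)` line in harmonizer.cpp further down). An illustrative restatement of that formula, with assumed parameter names:

```cpp
#include <util/system/types.h>
#include <algorithm>

// How far a pool could grow right now: capped by its configured maximum,
// and by the node's remaining CPU budget on top of its current threads.
inline i16 PotentialMaxThreads(i16 maxThreadCount, i16 currentThreads, i16 budget) {
    return std::min<i16>(maxThreadCount, currentThreads + budget);
}
```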
diff --git a/ydb/library/actors/core/executor_pool_basic.h b/ydb/library/actors/core/executor_pool_basic.h
index 8bede0e7c6b0..50418ff9bb92 100644
--- a/ydb/library/actors/core/executor_pool_basic.h
+++ b/ydb/library/actors/core/executor_pool_basic.h
@@ -251,6 +251,7 @@ namespace NActors {
         void Shutdown() override;
 
         void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
+        void GetExecutorPoolState(TExecutorPoolState &poolState) const override;
         TString GetName() const override {
             return PoolName;
         }
diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp
index 463f7e9431c1..d7b01339671d 100644
--- a/ydb/library/actors/core/executor_pool_io.cpp
+++ b/ydb/library/actors/core/executor_pool_io.cpp
@@ -12,14 +12,16 @@ namespace NActors {
         , PoolName(poolName)
     {}
 
-    TIOExecutorPool::TIOExecutorPool(const TIOExecutorPoolConfig& cfg)
+    TIOExecutorPool::TIOExecutorPool(const TIOExecutorPoolConfig& cfg, IHarmonizer *harmonizer)
         : TIOExecutorPool(
             cfg.PoolId,
             cfg.Threads,
             cfg.PoolName,
             new TAffinity(cfg.Affinity)
         )
-    {}
+    {
+        Harmonizer = harmonizer;
+    }
 
     TIOExecutorPool::~TIOExecutorPool() {
         Threads.Destroy();
@@ -148,6 +150,17 @@ namespace NActors {
         }
     }
 
+    void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
+        if (Harmonizer) {
+            TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
+            poolState.UsedCpu = stats.AvgConsumedCpu;
+        }
+        poolState.CurrentLimit = PoolThreads;
+        poolState.MaxLimit = PoolThreads;
+        poolState.MinLimit = PoolThreads;
+        poolState.PossibleMaxLimit = PoolThreads;
+    }
+
     TString TIOExecutorPool::GetName() const {
         return PoolName;
     }
diff --git a/ydb/library/actors/core/executor_pool_io.h b/ydb/library/actors/core/executor_pool_io.h
index 6c890a066764..c4fd95b4890b 100644
--- a/ydb/library/actors/core/executor_pool_io.h
+++ b/ydb/library/actors/core/executor_pool_io.h
@@ -3,6 +3,7 @@
 #include "actorsystem.h"
 #include "executor_thread.h"
 #include "executor_thread_ctx.h"
+#include "harmonizer.h"
 #include "scheduler_queue.h"
 #include "executor_pool_base.h"
 
 #include <...>
@@ -20,12 +21,13 @@ namespace NActors {
         THolder<NSchedulerQueue::TQueueType> ScheduleQueue;
         TTicketLock ScheduleLock;
 
+        IHarmonizer *Harmonizer = nullptr;
         const TString PoolName;
         const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex();
 
     public:
         TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName = "", TAffinity* affinity = nullptr);
-        explicit TIOExecutorPool(const TIOExecutorPoolConfig& cfg);
+        explicit TIOExecutorPool(const TIOExecutorPoolConfig& cfg, IHarmonizer *harmonizer = nullptr);
         ~TIOExecutorPool();
 
         ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override;
@@ -42,6 +44,7 @@ namespace NActors {
         void Shutdown() override;
 
         void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
+        void GetExecutorPoolState(TExecutorPoolState &poolState) const override;
         TString GetName() const override;
     };
 }
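A possible wiring sketch for the extended IO-pool constructor (hypothetical setup code, not part of the patch); passing `nullptr` preserves the old behavior, where `UsedCpu` simply stays at zero:

```cpp
#include <ydb/library/actors/core/executor_pool_io.h>
#include <memory>

// The CPU manager can now hand its harmonizer to IO pools as well, so their
// consumed CPU shows up in TExecutorPoolState like the basic pools' does.
// IO pools keep a fixed thread count, so all four limit fields collapse
// to PoolThreads; only UsedCpu varies.
std::unique_ptr<NActors::IExecutorPool> MakeIoPool(
        const NActors::TIOExecutorPoolConfig& cfg,
        NActors::IHarmonizer* harmonizer) {
    return std::make_unique<NActors::TIOExecutorPool>(cfg, harmonizer);
}
```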
diff --git a/ydb/library/actors/core/harmonizer.cpp b/ydb/library/actors/core/harmonizer.cpp
index fabbb5280e3d..706235b68647 100644
--- a/ydb/library/actors/core/harmonizer.cpp
+++ b/ydb/library/actors/core/harmonizer.cpp
@@ -10,6 +10,7 @@
 #include "executor_pool_basic_feature_flags.h"
 #include "executor_pool_shared.h"
 
+#include <atomic>
 #include <...>
 #include <...>
 #include <...>
@@ -202,10 +203,11 @@ struct TPoolInfo {
     TValueHistory<16> Consumed;
     TValueHistory<16> Booked;
 
-    TAtomic MaxConsumedCpu = 0;
-    TAtomic MinConsumedCpu = 0;
-    TAtomic MaxBookedCpu = 0;
-    TAtomic MinBookedCpu = 0;
+    std::atomic<float> MaxConsumedCpu = 0;
+    std::atomic<float> MinConsumedCpu = 0;
+    std::atomic<float> AvgConsumedCpu = 0;
+    std::atomic<float> MaxBookedCpu = 0;
+    std::atomic<float> MinBookedCpu = 0;
 
     std::unique_ptr<TWaitingStats<ui64>> WaitingStats;
     std::unique_ptr<TWaitingStats<double>> MovingWaitingStats;
@@ -313,11 +315,12 @@ TCpuConsumption TPoolInfo::PullStats(ui64 ts) {
     }
     Consumed.Register(ts, acc.ConsumedUs);
-    RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt());
-    RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt());
+    MaxConsumedCpu.store(Consumed.GetMax() / 1'000'000, std::memory_order_relaxed);
+    MinConsumedCpu.store(Consumed.GetMin() / 1'000'000, std::memory_order_relaxed);
+    AvgConsumedCpu.store(Consumed.GetAvgPart() / 1'000'000, std::memory_order_relaxed);
     Booked.Register(ts, acc.BookedUs);
-    RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt());
-    RelaxedStore(&MinBookedCpu, Booked.GetMinInt());
+    MaxBookedCpu.store(Booked.GetMax() / 1'000'000, std::memory_order_relaxed);
+    MinBookedCpu.store(Booked.GetMin() / 1'000'000, std::memory_order_relaxed);
     NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions;
     NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions;
     if (WaitingStats && BasicPool) {
@@ -359,7 +362,7 @@ class THarmonizer: public IHarmonizer {
    std::atomic<bool> IsDisabled = false;
    TSpinLock Lock;
    std::atomic<ui64> NextHarmonizeTs = 0;
-   std::vector<TPoolInfo> Pools;
+   std::vector<std::unique_ptr<TPoolInfo>> Pools;
    std::vector<ui16> PriorityOrder;
 
    TValueHistory<16> Consumed;
@@ -404,8 +407,8 @@ double THarmonizer::Rescale(double value) const {
 void THarmonizer::PullStats(ui64 ts) {
     TCpuConsumption acc;
-    for (TPoolInfo &pool : Pools) {
-        TCpuConsumption consumption = pool.PullStats(ts);
+    for (auto &pool : Pools) {
+        TCpuConsumption consumption = pool->PullStats(ts);
         acc.Add(consumption);
     }
     Consumed.Register(ts, acc.ConsumedUs);
@@ -442,7 +445,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
     ui64 TotalAwakeningTime = 0;
     ui64 TotalAwakenings = 0;
     for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
-        TPoolInfo& pool = Pools[poolIdx];
+        TPoolInfo& pool = *Pools[poolIdx];
         if (pool.WaitingStats) {
             TotalWakingUpTime += pool.WaitingStats->WakingUpTotalTime;
             TotalWakingUps += pool.WaitingStats->WakingUpCount;
@@ -472,7 +475,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
     LWPROBE(WakingUpConsumption, Ts2Us(avgWakingUpTime), Ts2Us(avgWakingUpTime), Ts2Us(avgAwakeningTime), Ts2Us(realAvgAwakeningTime), Ts2Us(avgWakingUpConsumption));
 
     for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
-        TPoolInfo& pool = Pools[poolIdx];
+        TPoolInfo& pool = *Pools[poolIdx];
         if (!pool.BasicPool) {
             continue;
         }
@@ -512,7 +515,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
     }
 
     for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
-        TPoolInfo& pool = Pools[poolIdx];
+        TPoolInfo& pool = *Pools[poolIdx];
         total += pool.DefaultThreadCount;
 
         i16 currentFullThreadCount = pool.GetFullThreadCount();
@@ -594,19 +597,19 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
 
     if (needyPools.size()) {
         Sort(needyPools.begin(), needyPools.end(), [&] (i16 lhs, i16 rhs) {
-            if (Pools[lhs].Priority != Pools[rhs].Priority) {
-                return Pools[lhs].Priority > Pools[rhs].Priority;
+            if (Pools[lhs]->Priority != Pools[rhs]->Priority) {
+                return Pools[lhs]->Priority > Pools[rhs]->Priority;
             }
-            return Pools[lhs].Pool->PoolId < Pools[rhs].Pool->PoolId;
+            return Pools[lhs]->Pool->PoolId < Pools[rhs]->Pool->PoolId;
         });
     }
 
     if (freeHalfThread.size()) {
         Sort(freeHalfThread.begin(), freeHalfThread.end(), [&] (i16 lhs, i16 rhs) {
-            if (Pools[lhs].Priority != Pools[rhs].Priority) {
-                return Pools[lhs].Priority > Pools[rhs].Priority;
+            if (Pools[lhs]->Priority != Pools[rhs]->Priority) {
+                return Pools[lhs]->Priority > Pools[rhs]->Priority;
             }
-            return Pools[lhs].Pool->PoolId < Pools[rhs].Pool->PoolId;
+            return Pools[lhs]->Pool->PoolId < Pools[rhs]->Pool->PoolId;
         });
     }
@@ -618,7 +621,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
         // do nothing
     } else {
         for (ui16 poolIdx : PriorityOrder) {
-            TPoolInfo &pool = Pools[poolIdx];
+            TPoolInfo &pool = *Pools[poolIdx];
             i64 threadCount = pool.GetFullThreadCount();
             if (hasSharedThread[poolIdx] && !hasSharedThreadWhichWasNotBorrowed[poolIdx]) {
                 Shared->ReturnOwnHalfThread(poolIdx);
@@ -641,7 +644,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
         }
     } else {
         for (size_t needyPoolIdx : needyPools) {
-            TPoolInfo &pool = Pools[needyPoolIdx];
+            TPoolInfo &pool = *Pools[needyPoolIdx];
             i64 threadCount = pool.GetFullThreadCount();
             if (budget >= 1.0) {
                 if (threadCount + 1 <= pool.MaxFullThreadCount) {
@@ -673,7 +676,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
     if (budget < 1.0) {
         size_t takingAwayThreads = 0;
         for (size_t needyPoolIdx : needyPools) {
-            TPoolInfo &pool = Pools[needyPoolIdx];
+            TPoolInfo &pool = *Pools[needyPoolIdx];
             i64 threadCount = pool.GetFullThreadCount();
             sumOfAdditionalThreads -= threadCount - pool.DefaultFullThreadCount;
             if (sumOfAdditionalThreads < takingAwayThreads + 1) {
@@ -695,7 +698,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
                 break;
             }
 
-            TPoolInfo &pool = Pools[poolIdx];
+            TPoolInfo &pool = *Pools[poolIdx];
             size_t threadCount = pool.GetFullThreadCount();
             size_t additionalThreadsCount = Max(0L, threadCount - pool.DefaultFullThreadCount);
             size_t currentTakingAwayThreads = Min(additionalThreadsCount, takingAwayThreads);
@@ -712,7 +715,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
     }
 
     for (size_t hoggishPoolIdx : hoggishPools) {
-        TPoolInfo &pool = Pools[hoggishPoolIdx];
+        TPoolInfo &pool = *Pools[hoggishPoolIdx];
         i64 threadCount = pool.GetFullThreadCount();
         if (hasBorrowedSharedThread[hoggishPoolIdx]) {
             Shared->ReturnBorrowedHalfThread(hoggishPoolIdx);
@@ -730,7 +733,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
     }
 
     for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
-        TPoolInfo& pool = Pools[poolIdx];
+        TPoolInfo& pool = *Pools[poolIdx];
         AtomicSet(pool.PotentialMaxThreadCount, std::min(pool.MaxThreadCount, pool.GetThreadCount() + budgetInt));
     }
 }
@@ -739,10 +742,10 @@ void THarmonizer::CalculatePriorityOrder() {
     PriorityOrder.resize(Pools.size());
     Iota(PriorityOrder.begin(), PriorityOrder.end(), 0);
     Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) {
-        if (Pools[lhs].Priority != Pools[rhs].Priority) {
-            return Pools[lhs].Priority < Pools[rhs].Priority;
+        if (Pools[lhs]->Priority != Pools[rhs]->Priority) {
+            return Pools[lhs]->Priority < Pools[rhs]->Priority;
         }
-        return Pools[lhs].Pool->PoolId > Pools[rhs].Pool->PoolId;
+        return Pools[lhs]->Pool->PoolId > Pools[rhs]->Pool->PoolId;
     });
 }
@@ -781,7 +784,8 @@ void THarmonizer::DeclareEmergency(ui64 ts) {
 
 void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
     TGuard<TSpinLock> guard(Lock);
-    TPoolInfo poolInfo;
+    Pools.emplace_back(new TPoolInfo);
+    TPoolInfo &poolInfo = *Pools.back();
     poolInfo.Pool = pool;
     poolInfo.Shared = Shared;
     poolInfo.BasicPool = dynamic_cast<TBasicExecutorPool*>(pool);
@@ -805,7 +809,6 @@ void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
         poolInfo.WaitingStats.reset(new TWaitingStats<ui64>());
         poolInfo.MovingWaitingStats.reset(new TWaitingStats<double>());
     }
-    Pools.push_back(std::move(poolInfo));
     PriorityOrder.clear();
 }
@@ -819,7 +822,7 @@ IHarmonizer* MakeHarmonizer(ui64 ts) {
 }
 
 TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const {
-    const TPoolInfo &pool = Pools[poolId];
+    const TPoolInfo &pool = *Pools[poolId];
     ui64 flags = RelaxedLoad(&pool.LastFlags);
     return TPoolHarmonizerStats{
         .IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)),
@@ -827,10 +830,11 @@ TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const {
         .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)),
         .DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)),
         .DecreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByExchange)),
-        .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxConsumedCpu)),
-        .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MinConsumedCpu)),
-        .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxBookedCpu)),
-        .MinBookedCpu = static_cast<i64>(RelaxedLoad(&pool.MinBookedCpu)),
+        .MaxConsumedCpu = pool.MaxConsumedCpu.load(std::memory_order_relaxed),
+        .MinConsumedCpu = pool.MinConsumedCpu.load(std::memory_order_relaxed),
+        .AvgConsumedCpu = pool.AvgConsumedCpu.load(std::memory_order_relaxed),
+        .MaxBookedCpu = pool.MaxBookedCpu.load(std::memory_order_relaxed),
+        .MinBookedCpu = pool.MinBookedCpu.load(std::memory_order_relaxed),
         .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)),
         .IsNeedy = static_cast<bool>(flags & 1),
         .IsStarved = static_cast<bool>(flags & 2),
diff --git a/ydb/library/actors/core/harmonizer.h b/ydb/library/actors/core/harmonizer.h
index 8eee2a3e4756..b4323ef8de13 100644
--- a/ydb/library/actors/core/harmonizer.h
+++ b/ydb/library/actors/core/harmonizer.h
@@ -17,10 +17,11 @@ namespace NActors {
     ui64 DecreasingThreadsByStarvedState = 0;
     ui64 DecreasingThreadsByHoggishState = 0;
     ui64 DecreasingThreadsByExchange = 0;
-    i64 MaxConsumedCpu = 0.0;
-    i64 MinConsumedCpu = 0.0;
-    i64 MaxBookedCpu = 0.0;
-    i64 MinBookedCpu = 0.0;
+    float MaxConsumedCpu = 0.0;
+    float MinConsumedCpu = 0.0;
+    float AvgConsumedCpu = 0.0;
+    float MaxBookedCpu = 0.0;
+    float MinBookedCpu = 0.0;
     i16 PotentialMaxThreadCount = 0;
     bool IsNeedy = false;
     bool IsStarved = false;
diff --git a/ydb/library/actors/core/mon_stats.h b/ydb/library/actors/core/mon_stats.h
index 9bb0df1b065e..4bceffa8f373 100644
--- a/ydb/library/actors/core/mon_stats.h
+++ b/ydb/library/actors/core/mon_stats.h
@@ -40,6 +40,22 @@ namespace NActors {
         ui64 Buckets[65];
     };
 
+    struct TExecutorPoolState {
+        float UsedCpu = 0;
+        float CurrentLimit = 0;
+        float PossibleMaxLimit = 0;
+        float MaxLimit = 0;
+        float MinLimit = 0;
+
+        void Aggregate(const TExecutorPoolState& other) {
+            UsedCpu += other.UsedCpu;
+            CurrentLimit += other.CurrentLimit;
+            PossibleMaxLimit += other.PossibleMaxLimit;
+            MaxLimit += other.MaxLimit;
+            MinLimit += other.MinLimit;
+        }
+    };
+
     struct TExecutorPoolStats {
         ui64 MaxUtilizationTime = 0;
         ui64 IncreasingThreadsByNeedyState = 0;
diff --git a/ydb/library/actors/helpers/pool_stats_collector.h b/ydb/library/actors/helpers/pool_stats_collector.h
index a2f9c8dfc5c4..fb4c3d9286b8 100644
--- a/ydb/library/actors/helpers/pool_stats_collector.h
+++ b/ydb/library/actors/helpers/pool_stats_collector.h
@@ -214,6 +214,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
         THPTimer UsageTimer;
         TString Name;
         double Threads;
+        double LimitThreads;
 
         void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) {
             LastElapsedSeconds = 0;
@@ -221,6 +222,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
             UsageTimer.Reset();
             Name = poolName;
             Threads = threads;
+            LimitThreads = threads;
 
             PoolGroup = group->GetSubgroup("execpool", poolName);
@@ -374,6 +376,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
             Y_UNUSED(stats);
 #endif
             Threads = poolStats.CurrentThreadCount;
+            LimitThreads = poolStats.PotentialMaxThreadCount;
         }
     };
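Putting it together, a minimal sketch (assumed call site, not part of the patch) of folding per-pool states into a node-wide summary with the new `TExecutorPoolState::Aggregate`:

```cpp
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/actors/core/mon_stats.h>
#include <vector>

// Sums UsedCpu and the limit fields across all executor pools of one node.
NActors::TExecutorPoolState NodeCpuState(const NActors::TActorSystem* actorSystem) {
    std::vector<NActors::TExecutorPoolState> states;
    actorSystem->GetExecutorPoolStates(states); // one entry per executor pool
    NActors::TExecutorPoolState total;
    for (const auto& state : states) {
        total.Aggregate(state); // field-wise accumulation
    }
    return total;
}
```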