diff --git a/ydb/library/actors/core/executor_pool.h b/ydb/library/actors/core/executor_pool.h
index 9e283c7a5b35..1260c76e4efd 100644
--- a/ydb/library/actors/core/executor_pool.h
+++ b/ydb/library/actors/core/executor_pool.h
@@ -15,13 +15,13 @@ namespace NActors {
     class ISchedulerCookie;

     struct TCpuConsumption {
-        double ConsumedUs = 0;
-        double BookedUs = 0;
+        double ElapsedUs = 0;
+        double CpuUs = 0;
         ui64 NotEnoughCpuExecutions = 0;

         void Add(const TCpuConsumption& other) {
-            ConsumedUs += other.ConsumedUs;
-            BookedUs += other.BookedUs;
+            ElapsedUs += other.ElapsedUs;
+            CpuUs += other.CpuUs;
             NotEnoughCpuExecutions += other.NotEnoughCpuExecutions;
         }
     };
diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp
index 284392c36961..c94e73661c08 100644
--- a/ydb/library/actors/core/executor_pool_basic.cpp
+++ b/ydb/library/actors/core/executor_pool_basic.cpp
@@ -408,10 +408,10 @@ namespace NActors {
             poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
             poolStats.DecreasingThreadsByExchange = stats.DecreasingThreadsByExchange;
             poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount;
-            poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu;
-            poolStats.MinConsumedCpuUs = stats.MinConsumedCpu;
-            poolStats.MaxBookedCpuUs = stats.MaxBookedCpu;
-            poolStats.MinBookedCpuUs = stats.MinBookedCpu;
+            poolStats.MaxElapsedCpuUs = stats.MaxElapsedCpu;
+            poolStats.MinElapsedCpuUs = stats.MinElapsedCpu;
+            poolStats.MaxCpuUs = stats.MaxCpu;
+            poolStats.MinCpuUs = stats.MinCpu;
         }

         statsCopy.resize(MaxFullThreadCount + 1);
@@ -430,7 +430,7 @@ namespace NActors {
     void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
         if (Harmonizer) {
             TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
-            poolState.UsedCpu = stats.AvgConsumedCpu;
+            poolState.UsedCpu = stats.AvgElapsedCpu;
             poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
         } else {
             poolState.PossibleMaxLimit = poolState.MaxLimit;
diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp
index 31592f06a6ed..954fff86d079 100644
--- a/ydb/library/actors/core/executor_pool_io.cpp
+++ b/ydb/library/actors/core/executor_pool_io.cpp
@@ -140,7 +140,7 @@ namespace NActors {
     void TIOExecutorPool::GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
         poolStats.CurrentThreadCount = PoolThreads;
-        poolStats.DefaultThreadCount = PoolThreads;
+        poolStats.DefaultThreadCount = 0;
         poolStats.MaxThreadCount = PoolThreads;
         poolStats.PotentialMaxThreadCount = PoolThreads;
         statsCopy.resize(PoolThreads + 1);
@@ -156,7 +156,7 @@ namespace NActors {
     void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
         if (Harmonizer) {
             TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
-            poolState.UsedCpu = stats.AvgConsumedCpu;
+            poolState.UsedCpu = stats.AvgElapsedCpu;
         }
         poolState.CurrentLimit = PoolThreads;
         poolState.MaxLimit = PoolThreads;
diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp
index 2424c18c78be..437989ecbc4b 100644
--- a/ydb/library/actors/core/executor_pool_shared.cpp
+++ b/ydb/library/actors/core/executor_pool_shared.cpp
@@ -143,16 +143,16 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
 }

 void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
-    statsCopy.resize(SharedThreadCount + 1);
+    statsCopy.resize(SharedThreadCount);
     for (i16 i = 0; i < SharedThreadCount; ++i) {
-        Threads[i].Thread->GetSharedStats(poolId, statsCopy[i + 1]);
+        Threads[i].Thread->GetSharedStats(poolId, statsCopy[i]);
     }
 }

 void TSharedExecutorPool::GetSharedStatsForHarmonizer(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
-    statsCopy.resize(SharedThreadCount + 1);
+    statsCopy.resize(SharedThreadCount);
     for (i16 i = 0; i < SharedThreadCount; ++i) {
-        Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i + 1]);
+        Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i]);
     }
 }
diff --git a/ydb/library/actors/core/harmonizer.cpp b/ydb/library/actors/core/harmonizer.cpp
index df6516421698..fd90fc6e8767 100644
--- a/ydb/library/actors/core/harmonizer.cpp
+++ b/ydb/library/actors/core/harmonizer.cpp
@@ -21,6 +21,15 @@
 namespace NActors {

+namespace {
+    constexpr bool IsDebug = false;
+    constexpr bool IsDebugByThread = false;
+
+#define DEBUG_PRINT(x) do { if constexpr (IsDebug) { (Cerr << x); } } while (0)
+#define DEBUG_PRINT_BY_THREAD(x) do { if constexpr (IsDebug && IsDebugByThread) { (Cerr << x); } } while (0)
+
+}
+
 LWTRACE_USING(ACTORLIB_PROVIDER);

 constexpr bool CheckBinaryPower(ui64 value) {
@@ -164,8 +173,8 @@ struct TValueHistory {
 };

 struct TThreadInfo {
-    TValueHistory<8> Consumed;
-    TValueHistory<8> Booked;
+    TValueHistory<8> Elapsed;
+    TValueHistory<8> Cpu;
 };

 struct TPoolInfo {
@@ -200,26 +209,26 @@ struct TPoolInfo {
     TAtomic DecreasingThreadsByExchange = 0;
     TAtomic PotentialMaxThreadCount = 0;

-    TValueHistory<16> Consumed;
-    TValueHistory<16> Booked;
+    TValueHistory<16> Elapsed;
+    TValueHistory<16> Cpu;

-    std::atomic<float> MaxConsumedCpu = 0;
-    std::atomic<float> MinConsumedCpu = 0;
-    std::atomic<float> AvgConsumedCpu = 0;
-    std::atomic<float> MaxBookedCpu = 0;
-    std::atomic<float> MinBookedCpu = 0;
+    std::atomic<float> MaxElapsed = 0;
+    std::atomic<float> MinElapsed = 0;
+    std::atomic<float> AvgElapsed = 0;
+    std::atomic<float> MaxCpu = 0;
+    std::atomic<float> MinCpu = 0;

     std::unique_ptr<TWaitingStats<ui64>> WaitingStats;
     std::unique_ptr<TWaitingStats<double>> MovingWaitingStats;

-    double GetBooked(i16 threadIdx);
-    double GetSharedBooked(i16 threadIdx);
-    double GetLastSecondBooked(i16 threadIdx);
-    double GetLastSecondSharedBooked(i16 threadIdx);
-    double GetConsumed(i16 threadIdx);
-    double GetSharedConsumed(i16 threadIdx);
-    double GetLastSecondConsumed(i16 threadIdx);
-    double GetLastSecondSharedConsumed(i16 threadIdx);
+    double GetElapsed(i16 threadIdx);
+    double GetSharedElapsed(i16 threadIdx);
+    double GetLastSecondElapsed(i16 threadIdx);
+    double GetLastSecondSharedElapsed(i16 threadIdx);
+    double GetCpu(i16 threadIdx);
+    double GetSharedCpu(i16 threadIdx);
+    double GetLastSecondCpu(i16 threadIdx);
+    double GetLastSecondSharedCpu(i16 threadIdx);
     TCpuConsumption PullStats(ui64 ts);
     i16 GetFullThreadCount();
     float GetThreadCount();
@@ -227,73 +236,76 @@ struct TPoolInfo {
     bool IsAvgPingGood();
 };

-double TPoolInfo::GetBooked(i16 threadIdx) {
+double TPoolInfo::GetCpu(i16 threadIdx) {
     if ((size_t)threadIdx < ThreadInfo.size()) {
-        return ThreadInfo[threadIdx].Booked.GetAvgPart();
+        return ThreadInfo[threadIdx].Cpu.GetAvgPart();
     }
     return 0.0;
 }

-double TPoolInfo::GetSharedBooked(i16 threadIdx) {
+double TPoolInfo::GetSharedCpu(i16 threadIdx) {
     if ((size_t)threadIdx < SharedInfo.size()) {
-        return SharedInfo[threadIdx].Booked.GetAvgPart();
+        return SharedInfo[threadIdx].Cpu.GetAvgPart();
     }
     return 0.0;
 }

-double TPoolInfo::GetLastSecondBooked(i16 threadIdx) {
+double TPoolInfo::GetLastSecondCpu(i16 threadIdx) {
     if ((size_t)threadIdx < ThreadInfo.size()) {
-        return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1);
+        return ThreadInfo[threadIdx].Cpu.GetAvgPartForLastSeconds(1);
     }
     return 0.0;
 }

-double TPoolInfo::GetLastSecondSharedBooked(i16 threadIdx) {
+double TPoolInfo::GetLastSecondSharedCpu(i16 threadIdx) {
     if ((size_t)threadIdx < SharedInfo.size()) {
-        return SharedInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1);
+        return SharedInfo[threadIdx].Cpu.GetAvgPartForLastSeconds(1);
     }
     return 0.0;
 }

-double TPoolInfo::GetConsumed(i16 threadIdx) {
+double TPoolInfo::GetElapsed(i16 threadIdx) {
     if ((size_t)threadIdx < ThreadInfo.size()) {
-        return ThreadInfo[threadIdx].Consumed.GetAvgPart();
+        return ThreadInfo[threadIdx].Elapsed.GetAvgPart();
     }
     return 0.0;
 }

-double TPoolInfo::GetSharedConsumed(i16 threadIdx) {
+double TPoolInfo::GetSharedElapsed(i16 threadIdx) {
     if ((size_t)threadIdx < SharedInfo.size()) {
-        return SharedInfo[threadIdx].Consumed.GetAvgPart();
+        return SharedInfo[threadIdx].Elapsed.GetAvgPart();
     }
     return 0.0;
 }

-double TPoolInfo::GetLastSecondConsumed(i16 threadIdx) {
+double TPoolInfo::GetLastSecondElapsed(i16 threadIdx) {
     if ((size_t)threadIdx < ThreadInfo.size()) {
-        return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1);
+        return ThreadInfo[threadIdx].Elapsed.GetAvgPartForLastSeconds(1);
     }
     return 0.0;
 }

-double TPoolInfo::GetLastSecondSharedConsumed(i16 threadIdx) {
+double TPoolInfo::GetLastSecondSharedElapsed(i16 threadIdx) {
     if ((size_t)threadIdx < SharedInfo.size()) {
-        return SharedInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1);
+        return SharedInfo[threadIdx].Elapsed.GetAvgPartForLastSeconds(1);
     }
     return 0.0;
 }

 #define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7]
 TCpuConsumption TPoolInfo::PullStats(ui64 ts) {
+    DEBUG_PRINT("TPoolInfo::PullStats " << ts << " PoolId: " << Pool->PoolId << " Name: " << Pool->GetName() << "\n");
     TCpuConsumption acc;
     for (i16 threadIdx = 0; threadIdx < MaxFullThreadCount; ++threadIdx) {
+        DEBUG_PRINT_BY_THREAD("  threadIdx: " << threadIdx << "\n");
         TThreadInfo &threadInfo = ThreadInfo[threadIdx];
         TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx);
+        DEBUG_PRINT_BY_THREAD("    CpuUs: " << cpuConsumption.CpuUs << " ElapsedUs: " << cpuConsumption.ElapsedUs << " NotEnoughCpuExecutions: " << cpuConsumption.NotEnoughCpuExecutions << "\n");
         acc.Add(cpuConsumption);
-        threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs);
-        LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History));
-        threadInfo.Booked.Register(ts, cpuConsumption.BookedUs);
-        LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History));
+        threadInfo.Elapsed.Register(ts, cpuConsumption.ElapsedUs);
+        LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Elapsed.History));
+        threadInfo.Cpu.Register(ts, cpuConsumption.CpuUs);
+        LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Cpu.History));
     }
     TVector<TExecutorThreadStats> sharedStats;
     if (Shared) {
@@ -308,19 +320,21 @@ TCpuConsumption TPoolInfo::PullStats(ui64 ts) {
             stat.NotEnoughCpuExecutions
         };
         acc.Add(sharedConsumption);
-        SharedInfo[sharedIdx].Consumed.Register(ts, sharedConsumption.ConsumedUs);
-        LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_consumed", UNROLL_HISTORY(SharedInfo[sharedIdx].Consumed.History));
-        SharedInfo[sharedIdx].Booked.Register(ts, sharedConsumption.BookedUs);
-        LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_booked", UNROLL_HISTORY(SharedInfo[sharedIdx].Booked.History));
-    }
-
-    Consumed.Register(ts, acc.ConsumedUs);
-    MaxConsumedCpu.store(Consumed.GetMax() / 1'000'000, std::memory_order_relaxed);
-    MinConsumedCpu.store(Consumed.GetMin() / 1'000'000, std::memory_order_relaxed);
-    AvgConsumedCpu.store(Consumed.GetAvgPart() / 1'000'000, std::memory_order_relaxed);
-    Booked.Register(ts, acc.BookedUs);
-    MaxBookedCpu.store(Booked.GetMax() / 1'000'000, std::memory_order_relaxed);
-    MinBookedCpu.store(Booked.GetMin() / 1'000'000, std::memory_order_relaxed);
+        SharedInfo[sharedIdx].Elapsed.Register(ts, sharedConsumption.ElapsedUs);
+        LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_consumed", UNROLL_HISTORY(SharedInfo[sharedIdx].Elapsed.History));
+        SharedInfo[sharedIdx].Cpu.Register(ts, sharedConsumption.CpuUs);
+        LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "shared_booked", UNROLL_HISTORY(SharedInfo[sharedIdx].Cpu.History));
+    }
+
+    DEBUG_PRINT("  ElapsedUs: " << acc.ElapsedUs << " CpuUs: " << acc.CpuUs << "\n");
+
+    Elapsed.Register(ts, acc.ElapsedUs);
+    MaxElapsed.store(Elapsed.GetMax() / 1'000'000, std::memory_order_relaxed);
+    MinElapsed.store(Elapsed.GetMin() / 1'000'000, std::memory_order_relaxed);
+    AvgElapsed.store(Elapsed.GetAvgPart() / 1'000'000, std::memory_order_relaxed);
+    Cpu.Register(ts, acc.CpuUs);
+    MaxCpu.store(Cpu.GetMax() / 1'000'000, std::memory_order_relaxed);
+    MinCpu.store(Cpu.GetMin() / 1'000'000, std::memory_order_relaxed);
     NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions;
     NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions;
     if (WaitingStats && BasicPool) {
@@ -365,13 +379,13 @@ class THarmonizer: public IHarmonizer {
    std::vector<std::unique_ptr<TPoolInfo>> Pools;
    std::vector<ui16> PriorityOrder;

-    TValueHistory<16> Consumed;
-    TValueHistory<16> Booked;
+    TValueHistory<16> Elapsed;
+    TValueHistory<16> Cpu;

-    TAtomic MaxConsumedCpu = 0;
-    TAtomic MinConsumedCpu = 0;
-    TAtomic MaxBookedCpu = 0;
-    TAtomic MinBookedCpu = 0;
+    TAtomic MaxElapsedCpu = 0;
+    TAtomic MinElapsedCpu = 0;
+    TAtomic MaxCpu = 0;
+    TAtomic MinCpu = 0;

    TSharedExecutorPool* Shared = nullptr;

@@ -406,32 +420,35 @@ double THarmonizer::Rescale(double value) const {
 }

 void THarmonizer::PullStats(ui64 ts) {
+    DEBUG_PRINT("THarmonizer::PullStats " << ts << "\n");
     TCpuConsumption acc;
     for (auto &pool : Pools) {
         TCpuConsumption consumption = pool->PullStats(ts);
         acc.Add(consumption);
     }
-    Consumed.Register(ts, acc.ConsumedUs);
-    RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt());
-    RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt());
-    Booked.Register(ts, acc.BookedUs);
-    RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt());
-    RelaxedStore(&MinBookedCpu, Booked.GetMinInt());
+    Elapsed.Register(ts, acc.ElapsedUs);
+    RelaxedStore(&MaxElapsedCpu, Elapsed.GetMaxInt());
+    RelaxedStore(&MinElapsedCpu, Elapsed.GetMinInt());
+    Cpu.Register(ts, acc.CpuUs);
+    RelaxedStore(&MaxCpu, Cpu.GetMaxInt());
+    RelaxedStore(&MinCpu, Cpu.GetMinInt());
 }

-Y_FORCE_INLINE bool IsStarved(double consumed, double booked) {
-    return Max(consumed, booked) > 0.1 && consumed < booked * 0.7;
+Y_FORCE_INLINE bool IsStarved(double elapsed, double cpu) {
+    return std::max(elapsed, cpu) > 0.5 && elapsed > cpu + std::min(0.5, elapsed * 0.3);
 }

-Y_FORCE_INLINE bool IsHoggish(double booked, double currentThreadCount) {
-    return booked < currentThreadCount - 0.5;
+Y_FORCE_INLINE bool IsHoggish(double cpu, double currentThreadCount) {
+    return cpu < currentThreadCount - 0.5;
 }

 void THarmonizer::HarmonizeImpl(ui64 ts) {
+
DEBUG_PRINT("THarmonizer::HarmonizeImpl " << ts << "\n"); bool isStarvedPresent = false; - double booked = 0.0; - double consumed = 0.0; - double lastSecondBooked = 0.0; + double cpu = 0.0; + double elapsed = 0.0; + double lastSecondCpu = 0.0; + double lastSecondElapsed = 0.0; i64 beingStopped = 0; double total = 0; TStackVec needyPools; @@ -515,49 +532,63 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { } for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + DEBUG_PRINT(" poolIdx: " << poolIdx << " PoolId: " << Pools[poolIdx]->Pool->PoolId << " Name: " << Pools[poolIdx]->Pool->GetName() << "\n"); TPoolInfo& pool = *Pools[poolIdx]; total += pool.DefaultThreadCount; i16 currentFullThreadCount = pool.GetFullThreadCount(); + DEBUG_PRINT(" currentFullThreadCount: " << currentFullThreadCount << "\n"); sumOfAdditionalThreads += Max(0, currentFullThreadCount - pool.DefaultFullThreadCount); + DEBUG_PRINT(" sumOfAdditionalThreads: " << sumOfAdditionalThreads << "\n"); float currentThreadCount = pool.GetThreadCount(); - - double poolBooked = 0.0; - double poolConsumed = 0.0; - double lastSecondPoolBooked = 0.0; - double lastSecondPoolConsumed = 0.0; + DEBUG_PRINT(" currentThreadCount: " << currentThreadCount << "\n"); + double poolCpu = 0.0; + double poolElapsed = 0.0; + double lastSecondPoolCpu = 0.0; + double lastSecondPoolElapsed = 0.0; beingStopped += pool.Pool->GetBlockingThreadCount(); - + DEBUG_PRINT(" beingStopped: " << beingStopped << "\n"); for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) { - double threadBooked = Rescale(pool.GetBooked(threadIdx)); - double threadLastSecondBooked = Rescale(pool.GetLastSecondBooked(threadIdx)); - double threadConsumed = Rescale(pool.GetConsumed(threadIdx)); - double threadLastSecondConsumed = Rescale(pool.GetLastSecondConsumed(threadIdx)); - poolBooked += threadBooked; - lastSecondPoolBooked += threadLastSecondBooked; - poolConsumed += threadConsumed; - lastSecondPoolConsumed += threadLastSecondConsumed; - LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), threadIdx, threadBooked, threadConsumed, threadLastSecondBooked, threadLastSecondConsumed); + DEBUG_PRINT_BY_THREAD(" threadIdx: " << threadIdx << "\n"); + double threadElapsed = Rescale(pool.GetElapsed(threadIdx)); + double threadLastSecondElapsed = Rescale(pool.GetLastSecondElapsed(threadIdx)); + double threadCpu = Rescale(pool.GetCpu(threadIdx)); + double threadLastSecondCpu = Rescale(pool.GetLastSecondCpu(threadIdx)); + DEBUG_PRINT_BY_THREAD(" threadCpu: " << threadCpu << "\n"); + DEBUG_PRINT_BY_THREAD(" threadLastSecondCpu: " << threadLastSecondCpu << "\n"); + DEBUG_PRINT_BY_THREAD(" threadElapsed: " << threadElapsed << "\n"); + DEBUG_PRINT_BY_THREAD(" threadLastSecondElapsed: " << threadLastSecondElapsed << "\n"); + poolElapsed += threadElapsed; + lastSecondPoolElapsed += threadLastSecondElapsed; + poolCpu += threadCpu; + lastSecondPoolCpu += threadLastSecondCpu; + LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), threadIdx, threadCpu, threadElapsed, threadLastSecondCpu, threadLastSecondElapsed); } for (ui32 sharedIdx = 0; sharedIdx < pool.SharedInfo.size(); ++sharedIdx) { - double sharedBooked = Rescale(pool.GetSharedBooked(sharedIdx)); - double sharedLastSecondBooked = Rescale(pool.GetLastSecondSharedBooked(sharedIdx)); - double sharedConsumed = Rescale(pool.GetSharedConsumed(sharedIdx)); - double sharedLastSecondConsumed = Rescale(pool.GetLastSecondSharedConsumed(sharedIdx)); - poolBooked += sharedBooked; - lastSecondPoolBooked += 
-            poolConsumed += sharedConsumed;
-            lastSecondPoolConsumed += sharedLastSecondConsumed;
-            LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), -1 - sharedIdx, sharedBooked, sharedConsumed, sharedLastSecondBooked, sharedLastSecondConsumed);
+            double sharedElapsed = Rescale(pool.GetSharedElapsed(sharedIdx));
+            double sharedLastSecondElapsed = Rescale(pool.GetLastSecondSharedElapsed(sharedIdx));
+            double sharedCpu = Rescale(pool.GetSharedCpu(sharedIdx));
+            double sharedLastSecondCpu = Rescale(pool.GetLastSecondSharedCpu(sharedIdx));
+            poolElapsed += sharedElapsed;
+            lastSecondPoolElapsed += sharedLastSecondElapsed;
+            poolCpu += sharedCpu;
+            lastSecondPoolCpu += sharedLastSecondCpu;
+            LWPROBE(HarmonizeCheckPoolByThread, poolIdx, pool.Pool->GetName(), -1 - sharedIdx, sharedCpu, sharedElapsed, sharedLastSecondCpu, sharedLastSecondElapsed);
         }

-        bool isStarved = IsStarved(poolConsumed, poolBooked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked);
+        DEBUG_PRINT("    poolElapsed: " << poolElapsed << "\n");
+        DEBUG_PRINT("    lastSecondPoolElapsed: " << lastSecondPoolElapsed << "\n");
+        DEBUG_PRINT("    poolCpu: " << poolCpu << "\n");
+        DEBUG_PRINT("    lastSecondPoolCpu: " << lastSecondPoolCpu << "\n");
+
+        bool isStarved = IsStarved(poolElapsed, poolCpu) || IsStarved(lastSecondPoolElapsed, lastSecondPoolCpu);
         if (isStarved) {
             isStarvedPresent = true;
         }
+        DEBUG_PRINT("    isStarved: " << isStarved << "\n");

-        bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (poolBooked >= currentThreadCount);
+        bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (poolCpu >= currentThreadCount);
         if (pool.AvgPingCounter) {
             if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) {
                 isNeedy = false;
@@ -565,7 +596,9 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
                 pool.LastUpdateTs = ts;
             }
         }
-        if (currentThreadCount - poolBooked > 0.5) {
+        DEBUG_PRINT("    isNeedy: " << isNeedy << "\n");
+
+        if (currentThreadCount - poolCpu > 0.5) {
             if (hasBorrowedSharedThread[poolIdx] || hasSharedThreadWhichWasNotBorrowed[poolIdx]) {
                 freeHalfThread.push_back(poolIdx);
             }
@@ -574,23 +607,32 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
         if (isNeedy) {
             needyPools.push_back(poolIdx);
         }
-        bool isHoggish = IsHoggish(poolBooked, currentThreadCount)
-                || IsHoggish(lastSecondPoolBooked, currentThreadCount);
+
+        bool isHoggish = IsHoggish(poolElapsed, currentThreadCount)
+                || IsHoggish(lastSecondPoolElapsed, currentThreadCount);
         if (isHoggish) {
-            hoggishPools.push_back({poolIdx, std::max(poolBooked - currentThreadCount, lastSecondPoolBooked - currentThreadCount)});
+            hoggishPools.push_back({poolIdx, std::max(currentThreadCount - poolElapsed, currentThreadCount - lastSecondPoolElapsed)});
         }
-        booked += poolBooked;
-        consumed += poolConsumed;
+        DEBUG_PRINT("    isHoggish: " << isHoggish << "\n");
+
+        cpu += poolCpu;
+        elapsed += poolElapsed;
+        lastSecondCpu += lastSecondPoolCpu;
+        lastSecondElapsed += lastSecondPoolElapsed;
         AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2));
-        LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, currentThreadCount, pool.MaxFullThreadCount, isStarved, isNeedy, isHoggish);
+        LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolCpu, poolElapsed, lastSecondPoolCpu, lastSecondPoolElapsed, currentThreadCount, pool.MaxFullThreadCount, isStarved, isNeedy, isHoggish);
     }
+    Y_UNUSED(lastSecondElapsed);
+    DEBUG_PRINT("  total: " << total << " cpu: " << cpu << " lastSecondCpu: " << lastSecondCpu << "\n");
<< " cpu: " << cpu << " lastSecondCpu: " << lastSecondCpu << "\n"); - double budget = total - Max(booked, lastSecondBooked); + double budget = total - Max(cpu, lastSecondCpu); + DEBUG_PRINT(" budget: " << budget << "\n"); i16 budgetInt = static_cast(Max(budget, 0.0)); + DEBUG_PRINT(" budgetInt: " << budgetInt << "\n"); if (budget < -0.1) { isStarvedPresent = true; } - double overbooked = consumed - booked; + double overbooked = elapsed - cpu; if (overbooked < 0) { isStarvedPresent = false; } @@ -617,6 +659,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { // last_starved_at_consumed_value = сумма по всем пулам consumed; // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, // использовать вместо total + DEBUG_PRINT(" isStarvedPresent\n"); if (beingStopped && beingStopped >= overbooked) { // do nothing } else { @@ -625,6 +668,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { i64 threadCount = pool.GetFullThreadCount(); if (hasSharedThread[poolIdx] && !hasSharedThreadWhichWasNotBorrowed[poolIdx]) { Shared->ReturnOwnHalfThread(poolIdx); + overbooked -= 0.5; } while (threadCount > pool.DefaultFullThreadCount) { pool.SetFullThreadCount(--threadCount); @@ -646,6 +690,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { for (size_t needyPoolIdx : needyPools) { TPoolInfo &pool = *Pools[needyPoolIdx]; i64 threadCount = pool.GetFullThreadCount(); + DEBUG_PRINT(" needyPoolIdx: " << needyPoolIdx << " poolId: " << pool.Pool->PoolId << " poolName: " << pool.Pool->GetName() << " threadCount: " << threadCount << " budget: " << budget << "\n"); if (budget >= 1.0) { if (threadCount + 1 <= pool.MaxFullThreadCount) { AtomicIncrement(pool.IncreasingThreadsByNeedyState); @@ -653,6 +698,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { sumOfAdditionalThreads++; pool.SetFullThreadCount(threadCount + 1); budget -= 1.0; + DEBUG_PRINT(" added 1 thread budget: " << budget << "\n"); LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); } } else if (Shared && budget >= 0.5 && !hasBorrowedSharedThread[needyPoolIdx] && freeHalfThread.size()) { @@ -660,6 +706,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { freeHalfThread.pop_back(); isNeedyByPool[needyPoolIdx] = false; budget -= 0.5; + DEBUG_PRINT(" added 0.5 thread budget: " << budget << "\n"); } if constexpr (NFeatures::IsLocalQueues()) { bool needToExpandLocalQueue = budget < 1.0 || threadCount >= pool.MaxFullThreadCount; @@ -675,26 +722,33 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { if (budget < 1.0) { size_t takingAwayThreads = 0; + DEBUG_PRINT(" takingAwayThreads: " << takingAwayThreads << " sumOfAdditionalThreads: " << sumOfAdditionalThreads << " budget: " << budget << "\n"); for (size_t needyPoolIdx : needyPools) { TPoolInfo &pool = *Pools[needyPoolIdx]; i64 threadCount = pool.GetFullThreadCount(); sumOfAdditionalThreads -= threadCount - pool.DefaultFullThreadCount; + DEBUG_PRINT(" needyPoolIdx: " << needyPoolIdx << " poolId: " << pool.Pool->PoolId << " poolName: " << pool.Pool->GetName() << " threadCount: " << threadCount << " budget: " << budget << "\n"); + DEBUG_PRINT(" sumOfAdditionalThreads: " << sumOfAdditionalThreads << " takingAwayThreads: " << takingAwayThreads << "\n"); if (sumOfAdditionalThreads < takingAwayThreads + 1) { + DEBUG_PRINT(" sumOfAdditionalThreads < takingAwayThreads + 1\n"); break; } if (!isNeedyByPool[needyPoolIdx]) { + DEBUG_PRINT(" !isNeedyByPool[needyPoolIdx]\n"); continue; } 
             AtomicIncrement(pool.IncreasingThreadsByExchange);
             isNeedyByPool[needyPoolIdx] = false;
             takingAwayThreads++;
             pool.SetFullThreadCount(threadCount + 1);
-
+            DEBUG_PRINT("    added 1 thread by exchanging budget: " << budget << " takingAwayThreads: " << takingAwayThreads << " sumOfAdditionalThreads: " << sumOfAdditionalThreads << "\n");
             LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
         }

         for (ui16 poolIdx : PriorityOrder) {
+            DEBUG_PRINT("    poolIdx: " << poolIdx << " takingAwayThreads: " << takingAwayThreads << " budget: " << budget << "\n");
             if (takingAwayThreads <= 0) {
+                DEBUG_PRINT("      takingAwayThreads <= 0\n");
                 break;
             }

@@ -702,14 +756,16 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
             TPoolInfo &pool = *Pools[poolIdx];
             size_t threadCount = pool.GetFullThreadCount();
             size_t additionalThreadsCount = Max(0L, threadCount - pool.DefaultFullThreadCount);
             size_t currentTakingAwayThreads = Min(additionalThreadsCount, takingAwayThreads);
+            DEBUG_PRINT("      poolIdx: " << poolIdx << " poolId: " << pool.Pool->PoolId << " poolName: " << pool.Pool->GetName() << " threadCount: " << threadCount << " additionalThreadsCount: " << additionalThreadsCount << " currentTakingAwayThreads: " << currentTakingAwayThreads << "\n");

             if (!currentTakingAwayThreads) {
+                DEBUG_PRINT("      !currentTakingAwayThreads\n");
                 continue;
             }
             takingAwayThreads -= currentTakingAwayThreads;
             pool.SetFullThreadCount(threadCount - currentTakingAwayThreads);
-
-            AtomicAdd(pool.DecreasingThreadsByExchange, takingAwayThreads);
+            DEBUG_PRINT("      takingAwayThreads by exchanging: " << currentTakingAwayThreads << " takingAwayThreads: " << takingAwayThreads << "\n");
+            AtomicAdd(pool.DecreasingThreadsByExchange, currentTakingAwayThreads);
             LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
         }
     }
@@ -717,9 +773,11 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
     for (auto &[hoggishPoolIdx, freeCpu] : hoggishPools) {
         TPoolInfo &pool = *Pools[hoggishPoolIdx];
         i64 threadCount = pool.GetFullThreadCount();
+        DEBUG_PRINT("  hoggishPoolIdx: " << hoggishPoolIdx << " poolId: " << pool.Pool->PoolId << " poolName: " << pool.Pool->GetName() << " threadCount: " << threadCount << " freeCpu: " << freeCpu << "\n");
         if (hasBorrowedSharedThread[hoggishPoolIdx]) {
             Shared->ReturnBorrowedHalfThread(hoggishPoolIdx);
             freeCpu -= 0.5;
+            DEBUG_PRINT("    return borrowed half thread freeCpu: " << freeCpu << "\n");
             continue;
         }
         if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) {
@@ -730,12 +788,14 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
             AtomicIncrement(pool.DecreasingThreadsByHoggishState);
             LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
             pool.SetFullThreadCount(threadCount - 1);
+            DEBUG_PRINT("    decrease by hoggish threadCount: " << threadCount - 1 << "\n");
         }
     }

     for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
         TPoolInfo& pool = *Pools[poolIdx];
         AtomicSet(pool.PotentialMaxThreadCount, std::min(pool.MaxThreadCount, pool.GetThreadCount() + budgetInt));
+        DEBUG_PRINT("  poolIdx: " << poolIdx << " PotentialMaxThreadCount: " << pool.PotentialMaxThreadCount << " MaxThreadCount: " << pool.MaxThreadCount << " GetThreadCount: " << pool.GetThreadCount() << " budgetInt: " << budgetInt << "\n");
     }
 }

@@ -751,13 +811,18 @@ void THarmonizer::CalculatePriorityOrder() {
 }

 void THarmonizer::Harmonize(ui64 ts) {
-    if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) {
+    bool isDisabled = IsDisabled.load(std::memory_order_acquire);
+    ui64 nextHarmonizeTs = NextHarmonizeTs.load(std::memory_order_acquire);
+    if (isDisabled || nextHarmonizeTs > ts || !Lock.TryAcquire()) {
+        DEBUG_PRINT("TryToHarmonizeFailed: " << ts << " " << nextHarmonizeTs << " " << isDisabled << " " << false << "\n");
         LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false);
         return;
     }
     // Check again under the lock
-    if (IsDisabled) {
-        LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, true);
+    isDisabled = IsDisabled.load(std::memory_order_acquire);
+    if (isDisabled) {
+        DEBUG_PRINT("TryToHarmonizeFailed: " << ts << " " << nextHarmonizeTs << " " << isDisabled << " " << true << "\n");
+        LWPROBE(TryToHarmonizeFailed, ts, nextHarmonizeTs, isDisabled, true);
         Lock.Release();
         return;
     }
@@ -769,6 +834,7 @@ void THarmonizer::Harmonize(ui64 ts) {
     TInternalActorTypeGuard activityGuard;

     if (PriorityOrder.empty()) {
+        DEBUG_PRINT("CalculatePriorityOrder\n");
         CalculatePriorityOrder();
     }

@@ -797,6 +863,7 @@ void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
     poolInfo.DefaultFullThreadCount = pool->GetDefaultFullThreadCount();
     poolInfo.MinFullThreadCount = pool->GetMinFullThreadCount();
     poolInfo.MaxFullThreadCount = pool->GetMaxFullThreadCount();
+    poolInfo.PotentialMaxThreadCount = poolInfo.MaxFullThreadCount;
     poolInfo.ThreadInfo.resize(poolInfo.MaxFullThreadCount);
     poolInfo.SharedInfo.resize(Shared ? Shared->GetSharedThreadCount() : 0);
     poolInfo.Priority = pool->GetPriority();
@@ -831,11 +898,11 @@ TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const {
         .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)),
         .DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)),
         .DecreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByExchange)),
-        .MaxConsumedCpu = pool.MaxConsumedCpu.load(std::memory_order_relaxed),
-        .MinConsumedCpu = pool.MinConsumedCpu.load(std::memory_order_relaxed),
-        .AvgConsumedCpu = pool.AvgConsumedCpu.load(std::memory_order_relaxed),
-        .MaxBookedCpu = pool.MaxBookedCpu.load(std::memory_order_relaxed),
-        .MinBookedCpu = pool.MinBookedCpu.load(std::memory_order_relaxed),
+        .MaxElapsedCpu = pool.MaxElapsed.load(std::memory_order_relaxed),
+        .MinElapsedCpu = pool.MinElapsed.load(std::memory_order_relaxed),
+        .AvgElapsedCpu = pool.AvgElapsed.load(std::memory_order_relaxed),
+        .MaxCpu = pool.MaxCpu.load(std::memory_order_relaxed),
+        .MinCpu = pool.MinCpu.load(std::memory_order_relaxed),
         .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)),
         .IsNeedy = static_cast<bool>(flags & 1),
         .IsStarved = static_cast<bool>(flags & 2),
@@ -845,10 +912,10 @@ TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const {

 THarmonizerStats THarmonizer::GetStats() const {
     return THarmonizerStats{
-        .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&MaxConsumedCpu)),
-        .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&MinConsumedCpu)),
-        .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&MaxBookedCpu)),
-        .MinBookedCpu = static_cast<i64>(RelaxedLoad(&MinBookedCpu)),
+        .MaxElapsedCpu = static_cast<i64>(RelaxedLoad(&MaxElapsedCpu)),
+        .MinElapsedCpu = static_cast<i64>(RelaxedLoad(&MinElapsedCpu)),
+        .MaxCpu = static_cast<i64>(RelaxedLoad(&MaxCpu)),
+        .MinCpu = static_cast<i64>(RelaxedLoad(&MinCpu)),
        .AvgAwakeningTimeUs = AvgAwakeningTimeUs,
        .AvgWakingUpTimeUs = AvgWakingUpTimeUs,
    };
 }

@@ -858,4 +925,32 @@ void THarmonizer::SetSharedPool(TSharedExecutorPool* pool) {
     Shared = pool;
 }

+TString TPoolHarmonizerStats::ToString() const {
+    return TStringBuilder()
+        << "IncreasingThreadsByNeedyState: " << IncreasingThreadsByNeedyState << "\n"
+        << "IncreasingThreadsByExchange: " << IncreasingThreadsByExchange << "\n"
+        << "DecreasingThreadsByStarvedState: " << DecreasingThreadsByStarvedState << "\n"
+        << "DecreasingThreadsByHoggishState: " << DecreasingThreadsByHoggishState << "\n"
+        << "DecreasingThreadsByExchange: " << DecreasingThreadsByExchange << "\n"
+        << "MaxElapsedCpu: " << MaxElapsedCpu << "\n"
+        << "MinElapsedCpu: " << MinElapsedCpu << "\n"
+        << "AvgElapsedCpu: " << AvgElapsedCpu << "\n"
+        << "MaxCpu: " << MaxCpu << "\n"
+        << "MinCpu: " << MinCpu << "\n"
+        << "PotentialMaxThreadCount: " << PotentialMaxThreadCount << "\n"
+        << "IsNeedy: " << IsNeedy << "\n"
+        << "IsStarved: " << IsStarved << "\n"
+        << "IsHoggish: " << IsHoggish << "\n";
+}
+
+TString THarmonizerStats::ToString() const {
+    return TStringBuilder()
+        << "MaxElapsedCpu: " << MaxElapsedCpu << "\n"
+        << "MinElapsedCpu: " << MinElapsedCpu << "\n"
+        << "MaxCpu: " << MaxCpu << "\n"
+        << "MinCpu: " << MinCpu << "\n"
+        << "AvgAwakeningTimeUs: " << AvgAwakeningTimeUs << "\n"
+        << "AvgWakingUpTimeUs: " << AvgWakingUpTimeUs << "\n";
 }
+
+} // namespace NActors
\ No newline at end of file
diff --git a/ydb/library/actors/core/harmonizer.h b/ydb/library/actors/core/harmonizer.h
index b4323ef8de13..37daf851d7e2 100644
--- a/ydb/library/actors/core/harmonizer.h
+++ b/ydb/library/actors/core/harmonizer.h
@@ -17,25 +17,29 @@ namespace NActors {
     ui64 DecreasingThreadsByStarvedState = 0;
     ui64 DecreasingThreadsByHoggishState = 0;
     ui64 DecreasingThreadsByExchange = 0;
-    float MaxConsumedCpu = 0.0;
-    float MinConsumedCpu = 0.0;
-    float AvgConsumedCpu = 0.0;
-    float MaxBookedCpu = 0.0;
-    float MinBookedCpu = 0.0;
+    float MaxElapsedCpu = 0.0;
+    float MinElapsedCpu = 0.0;
+    float AvgElapsedCpu = 0.0;
+    float MaxCpu = 0.0;
+    float MinCpu = 0.0;
     i16 PotentialMaxThreadCount = 0;
     bool IsNeedy = false;
     bool IsStarved = false;
     bool IsHoggish = false;
+
+    TString ToString() const;
 };

 struct THarmonizerStats {
-    i64 MaxConsumedCpu = 0.0;
-    i64 MinConsumedCpu = 0.0;
-    i64 MaxBookedCpu = 0.0;
-    i64 MinBookedCpu = 0.0;
+    i64 MaxElapsedCpu = 0.0;
+    i64 MinElapsedCpu = 0.0;
+    i64 MaxCpu = 0.0;
+    i64 MinCpu = 0.0;
     double AvgAwakeningTimeUs = 0;
     double AvgWakingUpTimeUs = 0;
+
+    TString ToString() const;
 };

 // Pool cpu harmonizer
diff --git a/ydb/library/actors/core/harmonizer_ut.cpp b/ydb/library/actors/core/harmonizer_ut.cpp
new file mode 100644
index 000000000000..4149c1584afb
--- /dev/null
+++ b/ydb/library/actors/core/harmonizer_ut.cpp
@@ -0,0 +1,360 @@
+#include "harmonizer.h"
+#include <ydb/library/actors/core/executor_pool.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NActors;
+
+
+/*
+  Test scenarios without half-threads:
+    - IncreaseThreadsByNeedyState/DecreaseThreadsByHoggishState
+    - DecreaseThreadsByStarvedState
+    - IncreaseThreadsByExchange/DecreaseThreadsByExchange
+*/
+
+#define CHECK_CHANGING_THREADS(stats, inc_needy, inc_exchange, dec_hoggish, dec_starved, dec_exchange) \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).IncreasingThreadsByNeedyState, inc_needy, (stats).ToString()); \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).IncreasingThreadsByExchange, inc_exchange, (stats).ToString()); \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByHoggishState, dec_hoggish, (stats).ToString()); \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByStarvedState, dec_starved, (stats).ToString()); \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).DecreasingThreadsByExchange, dec_exchange, (stats).ToString());
+// end CHECK_CHANGING_THREADS
+
+#define CHECK_STATE(stats, state) \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).IsNeedy, TString(state) == "needy", (stats).ToString()); \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).IsHoggish, TString(state) == "hoggish", (stats).ToString()); \
+    UNIT_ASSERT_VALUES_EQUAL_C((stats).IsStarved, TString(state) == "starved", (stats).ToString());
+// end CHECK_STATE
+
+
+Y_UNIT_TEST_SUITE(HarmonizerTests) {
+
+    struct TMockExecutorPoolParams {
+        i16 DefaultFullThreadCount = 4;
+        i16 MinFullThreadCount = 4;
+        i16 MaxFullThreadCount = 8;
+        float DefaultThreadCount = 4.0f;
+        float MinThreadCount = 4.0f;
+        float MaxThreadCount = 8.0f;
+        i16 Priority = 0;
+        TString Name = "MockPool";
+        ui32 PoolId = 0;
+    };
+
+    struct TCpuConsumptionModel {
+        TCpuConsumption value;
+        TCpuConsumptionModel() : value() {}
+        TCpuConsumptionModel(const TCpuConsumption& other) : value(other) {}
+        operator TCpuConsumption() const {
+            return value;
+        }
+        void Increase(const TCpuConsumption& other) {
+            value.ElapsedUs += other.ElapsedUs;
+            value.CpuUs += other.CpuUs;
+            value.NotEnoughCpuExecutions += other.NotEnoughCpuExecutions;
+        }
+    };
+
+    class TMockExecutorPool : public IExecutorPool {
+    public:
+        TMockExecutorPool(const TMockExecutorPoolParams& params = TMockExecutorPoolParams())
+            : IExecutorPool(params.PoolId)
+            , Params(params)
+            , ThreadCount(params.DefaultFullThreadCount)
+            , ThreadCpuConsumptions(params.MaxFullThreadCount, TCpuConsumption{0.0, 0.0})
+        {}
+
+        TMockExecutorPoolParams Params;
+        i16 ThreadCount = 0;
+        std::vector<TCpuConsumptionModel> ThreadCpuConsumptions;
+
+        i16 GetDefaultFullThreadCount() const override { return Params.DefaultFullThreadCount; }
+        i16 GetMinFullThreadCount() const override { return Params.MinFullThreadCount; }
+        i16 GetMaxFullThreadCount() const override { return Params.MaxFullThreadCount; }
+        void SetFullThreadCount(i16 count) override { ThreadCount = count; }
+        i16 GetFullThreadCount() const override { return ThreadCount; }
+        float GetDefaultThreadCount() const override { return Params.DefaultThreadCount; }
+        float GetMinThreadCount() const override { return Params.MinThreadCount; }
+        float GetMaxThreadCount() const override { return Params.MaxThreadCount; }
+        i16 GetPriority() const override { return Params.Priority; }
+        TString GetName() const override { return Params.Name; }
+
+        // Additional methods from IExecutorPool
+        void Prepare(TActorSystem* /*actorSystem*/, NSchedulerQueue::TReader** /*scheduleReaders*/, ui32* /*scheduleSz*/) override {}
+        void Start() override {}
+        void PrepareStop() override {}
+        void Shutdown() override {}
+        bool Cleanup() override { return true; }
+
+        ui32 GetReadyActivation(TWorkerContext& /*wctx*/, ui64 /*revolvingCounter*/) override { return 0; }
+        void ReclaimMailbox(TMailboxType::EType /*mailboxType*/, ui32 /*hint*/, TWorkerId /*workerId*/, ui64 /*revolvingCounter*/) override {}
+        TMailboxHeader* ResolveMailbox(ui32 /*hint*/) override { return nullptr; }
+
+        void Schedule(TInstant /*deadline*/, TAutoPtr<IEventHandle> /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {}
+        void Schedule(TMonotonic /*deadline*/, TAutoPtr<IEventHandle> /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {}
+        void Schedule(TDuration /*delta*/, TAutoPtr<IEventHandle> /*ev*/, ISchedulerCookie* /*cookie*/, TWorkerId /*workerId*/) override {}
+
+        bool Send(TAutoPtr<IEventHandle>& /*ev*/) override { return true; }
+        bool SpecificSend(TAutoPtr<IEventHandle>& /*ev*/) override { return true; }
+        void ScheduleActivation(ui32 /*activation*/) override {}
+        void SpecificScheduleActivation(ui32 /*activation*/) override {}
+        void ScheduleActivationEx(ui32 /*activation*/, ui64 /*revolvingCounter*/) override {}
+        TActorId Register(IActor* /*actor*/, TMailboxType::EType /*mailboxType*/, ui64 /*revolvingCounter*/, const TActorId& /*parentId*/) override { return TActorId(); }
+        TActorId Register(IActor* /*actor*/, TMailboxHeader* /*mailbox*/, ui32 /*hint*/, const TActorId& /*parentId*/) override { return TActorId(); }
+
+        TAffinity* Affinity() const override { return nullptr; }
+
+        ui32 GetThreads() const override { return static_cast<ui32>(ThreadCount); }
+        float GetThreadCount() const override { return static_cast<float>(ThreadCount); }
+
+        void IncreaseThreadCpuConsumption(TCpuConsumption consumption, i16 start = 0, i16 count = -1) {
+            if (count == -1) {
+                count = Params.MaxFullThreadCount - start;
+            }
+            for (i16 i = start; i < start + count; ++i) {
+                ThreadCpuConsumptions[i].Increase(consumption);
+            }
+        }
+
+        void SetThreadCpuConsumption(TCpuConsumption consumption, i16 start = 0, i16 count = -1) {
+            if (count == -1) {
+                count = Params.MaxFullThreadCount - start;
+            }
+            for (i16 i = start; i < start + count; ++i) {
+                ThreadCpuConsumptions[i] = consumption;
+            }
+        }
+
+        TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override {
+            UNIT_ASSERT_GE(threadIdx, 0);
+            UNIT_ASSERT_LE(static_cast<size_t>(threadIdx), ThreadCpuConsumptions.size());
+            return ThreadCpuConsumptions[threadIdx];
+        }
+    };
+
+    Y_UNIT_TEST(TestHarmonizerCreation) {
+        ui64 currentTs = 1000000;
+        std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
+        UNIT_ASSERT(harmonizer != nullptr);
+    }
+
+    Y_UNIT_TEST(TestAddPool) {
+        ui64 currentTs = 1000000;
+        std::unique_ptr<IHarmonizer> harmonizer(MakeHarmonizer(currentTs));
+        auto mockPool = std::make_unique<TMockExecutorPool>();
+        harmonizer->AddPool(mockPool.get());
+
+        auto stats = harmonizer->GetPoolStats(0);
+        UNIT_ASSERT_VALUES_EQUAL(stats.PotentialMaxThreadCount, 8);
+        UNIT_ASSERT_VALUES_EQUAL(stats.IncreasingThreadsByNeedyState, 0);
+        UNIT_ASSERT_VALUES_EQUAL(stats.DecreasingThreadsByStarvedState, 0);
+    }
+
+    Y_UNIT_TEST(TestHarmonize) {
+        ui64 currentTs = 1000000;
+        auto harmonizer = MakeHarmonizer(currentTs);
+        auto mockPool = new TMockExecutorPool();
+        harmonizer->AddPool(mockPool);
+
+        harmonizer->Harmonize(currentTs + 1000000); // 1 second later
+
+        auto stats = harmonizer->GetPoolStats(0);
+        Y_UNUSED(stats);
+        UNIT_ASSERT_VALUES_EQUAL(mockPool->ThreadCount, 4); // Should start with default
+
+        delete harmonizer;
+        delete mockPool;
+    }
+
+    Y_UNIT_TEST(TestToNeedyNextToHoggish) {
+        ui64 currentTs = 1000000;
+        auto harmonizer = MakeHarmonizer(currentTs);
+        TMockExecutorPoolParams params;
+        std::vector<std::unique_ptr<TMockExecutorPool>> mockPools;
+        mockPools.emplace_back(new TMockExecutorPool(params));
+        params.PoolId = 1;
+        mockPools.emplace_back(new TMockExecutorPool(params));
+        for (auto& pool : mockPools) {
+            harmonizer->AddPool(pool.get());
+            pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount);
+        }
+
+        TCpuConsumptionModel cpuConsumptionModel;
+
+        currentTs += Us2Ts(1'000'000);
+        harmonizer->Harmonize(currentTs);
+        mockPools[0]->SetThreadCpuConsumption({59'000'000.0, 59'000'000.0}, 0, params.DefaultFullThreadCount);
+
+        currentTs += Us2Ts(59'000'000);
+        harmonizer->Harmonize(currentTs);
+
+        auto stats = harmonizer->GetPoolStats(0);
+
+        CHECK_CHANGING_THREADS(stats, 1, 0, 0, 0, 0);
+        CHECK_STATE(stats, "needy");
UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 5); + UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); + + currentTs += Us2Ts(60'000'000); + mockPools[0]->SetThreadCpuConsumption({0.0, 0.0}, 0, params.DefaultFullThreadCount); + harmonizer->Harmonize(currentTs); + + stats = harmonizer->GetPoolStats(0); + + CHECK_CHANGING_THREADS(stats, 1, 0, 1, 0, 0); + CHECK_STATE(stats, "hoggish"); + UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 4); + UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); + } + + Y_UNIT_TEST(TestToNeedyNextToStarved) { + ui64 currentTs = 1000000; + auto harmonizer = MakeHarmonizer(currentTs); + TMockExecutorPoolParams params; + std::vector> mockPools; + mockPools.emplace_back(new TMockExecutorPool(params)); + params.PoolId = 1; + mockPools.emplace_back(new TMockExecutorPool(params)); + for (auto& pool : mockPools) { + harmonizer->AddPool(pool.get()); + pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount); + } + + TCpuConsumptionModel cpuConsumptionModel; + + currentTs += Us2Ts(1'000'000); + harmonizer->Harmonize(currentTs); + mockPools[0]->IncreaseThreadCpuConsumption({59'000'000.0, 59'000'000.0}, 0, params.DefaultFullThreadCount); + + currentTs += Us2Ts(59'000'000); + harmonizer->Harmonize(currentTs); + + auto stats = harmonizer->GetPoolStats(0); + + CHECK_CHANGING_THREADS(stats, 1, 0, 0, 0, 0); + CHECK_STATE(stats, "needy"); + UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 5); + UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); + + currentTs += Us2Ts(60'000'000); + mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 43'000'000.0}, 0, 5); + mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 4); + harmonizer->Harmonize(currentTs); + + stats = harmonizer->GetPoolStats(0); + + CHECK_CHANGING_THREADS(stats, 1, 0, 0, 1, 0); + CHECK_STATE(stats, "starved"); + UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 4); + UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 4); + } + + Y_UNIT_TEST(TestExchangeThreads) { + ui64 currentTs = 1000000; + auto harmonizer = MakeHarmonizer(currentTs); + TMockExecutorPoolParams params { + .DefaultFullThreadCount = 1, + .MinFullThreadCount = 1, + .MaxFullThreadCount = 2, + .DefaultThreadCount = 1.0f, + .MinThreadCount = 1.0f, + .MaxThreadCount = 2.0f, + }; + std::vector> mockPools; + mockPools.emplace_back(new TMockExecutorPool(params)); + params.PoolId = 1; + mockPools.emplace_back(new TMockExecutorPool(params)); + params.PoolId = 2; + mockPools.emplace_back(new TMockExecutorPool(params)); + for (auto& pool : mockPools) { + harmonizer->AddPool(pool.get()); + pool->SetThreadCpuConsumption(TCpuConsumption{0.0, 0.0}, 0, params.MaxFullThreadCount); + } + + currentTs += Us2Ts(1'000'000); + harmonizer->Harmonize(currentTs); + + currentTs += Us2Ts(60'000'000); + mockPools[0]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 1); + mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1); + mockPools[2]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1); + harmonizer->Harmonize(currentTs); + + auto stats0 = harmonizer->GetPoolStats(0); + auto stats1 = harmonizer->GetPoolStats(1); + auto stats2 = harmonizer->GetPoolStats(2); + + CHECK_CHANGING_THREADS(stats0, 0, 0, 0, 0, 0); + CHECK_CHANGING_THREADS(stats1, 1, 0, 0, 0, 0); + CHECK_CHANGING_THREADS(stats2, 0, 0, 0, 0, 0); + CHECK_STATE(stats0, "hoggish"); + CHECK_STATE(stats1, "needy"); + CHECK_STATE(stats2, "needy"); + UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 1); + 
UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 2); + UNIT_ASSERT_VALUES_EQUAL(mockPools[2]->ThreadCount, 1); + + currentTs += Us2Ts(60'000'000); + mockPools[0]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 1); + mockPools[1]->IncreaseThreadCpuConsumption({60'000'000.0, 60'000'000.0}, 0, 2); + mockPools[2]->IncreaseThreadCpuConsumption({0.0, 0.0}, 0, 1); + harmonizer->Harmonize(currentTs); + + stats0 = harmonizer->GetPoolStats(0); + stats1 = harmonizer->GetPoolStats(1); + stats2 = harmonizer->GetPoolStats(2); + + CHECK_CHANGING_THREADS(stats0, 0, 1, 0, 0, 0); + CHECK_CHANGING_THREADS(stats1, 1, 0, 0, 0, 1); + CHECK_CHANGING_THREADS(stats2, 0, 0, 0, 0, 0); + CHECK_STATE(stats0, "needy"); + CHECK_STATE(stats1, "needy"); + CHECK_STATE(stats2, "hoggish"); + UNIT_ASSERT_VALUES_EQUAL(mockPools[0]->ThreadCount, 2); + UNIT_ASSERT_VALUES_EQUAL(mockPools[1]->ThreadCount, 1); + UNIT_ASSERT_VALUES_EQUAL(mockPools[2]->ThreadCount, 1); + } + + Y_UNIT_TEST(TestEnableDisable) { + ui64 currentTs = 1000000; + auto harmonizer = MakeHarmonizer(currentTs); + auto mockPool = new TMockExecutorPool(); + harmonizer->AddPool(mockPool); + + harmonizer->Enable(false); // Disable harmonizer + harmonizer->Harmonize(currentTs + 1000000); + + auto stats = harmonizer->GetPoolStats(0); + Y_UNUSED(stats); + UNIT_ASSERT_VALUES_EQUAL(mockPool->ThreadCount, 4); // Should not change when disabled + + harmonizer->Enable(true); // Enable harmonizer + harmonizer->Harmonize(currentTs + 2000000); + + stats = harmonizer->GetPoolStats(0); + // Now it might change, but we can't predict exactly how without more complex mocking + + delete harmonizer; + delete mockPool; + } + + Y_UNIT_TEST(TestDeclareEmergency) { + ui64 currentTs = 1000000; + auto harmonizer = MakeHarmonizer(currentTs); + auto mockPool = new TMockExecutorPool(); + harmonizer->AddPool(mockPool); + + ui64 emergencyTs = currentTs + 500000; + harmonizer->DeclareEmergency(emergencyTs); + harmonizer->Harmonize(emergencyTs); + + // We can't easily test the internal state, but we can verify that Harmonize was called + // by checking if any stats have changed + auto stats = harmonizer->GetPoolStats(0); + Y_UNUSED(stats); + // Add appropriate assertions based on expected behavior during emergency + + delete harmonizer; + delete mockPool; + } +} diff --git a/ydb/library/actors/core/mon_stats.h b/ydb/library/actors/core/mon_stats.h index 4bceffa8f373..f058df6e77ad 100644 --- a/ydb/library/actors/core/mon_stats.h +++ b/ydb/library/actors/core/mon_stats.h @@ -63,10 +63,10 @@ namespace NActors { ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; ui64 DecreasingThreadsByExchange = 0; - i64 MaxConsumedCpuUs = 0; - i64 MinConsumedCpuUs = 0; - i64 MaxBookedCpuUs = 0; - i64 MinBookedCpuUs = 0; + i64 MaxElapsedCpuUs = 0; + i64 MinElapsedCpuUs = 0; + i64 MaxCpuUs = 0; + i64 MinCpuUs = 0; double SpinningTimeUs = 0; double SpinThresholdUs = 0; i16 WrongWakenedThreadCount = 0; diff --git a/ydb/library/actors/core/probes.h b/ydb/library/actors/core/probes.h index f9394f3273f6..bfe0c86e9510 100644 --- a/ydb/library/actors/core/probes.h +++ b/ydb/library/actors/core/probes.h @@ -174,11 +174,11 @@ NAMES("poolId", "pool", "threacCount", "minThreadCount", "maxThreadCount", "defaultThreadCount")) \ PROBE(HarmonizeCheckPool, GROUPS("Harmonizer"), \ TYPES(ui32, TString, double, double, double, double, ui32, ui32, bool, bool, bool), \ - NAMES("poolId", "pool", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed", "threadCount", 
"maxThreadCount", \ + NAMES("poolId", "pool", "cpu", "elapsed", "lastSecondCpu", "lastSecondElapsed", "threadCount", "maxThreadCount", \ "isStarved", "isNeedy", "isHoggish")) \ PROBE(HarmonizeCheckPoolByThread, GROUPS("Harmonizer"), \ TYPES(ui32, TString, i16, double, double, double, double), \ - NAMES("poolId", "pool", "threadIdx", "booked", "consumed", "lastSecondBooked", "lastSecondConsumed")) \ + NAMES("poolId", "pool", "threadIdx", "cpu", "elapsed", "lastSecondCpu", "lastSecondElapsed")) \ PROBE(WakingUpConsumption, GROUPS("Harmonizer"), \ TYPES(double, double, double, double, double), \ NAMES("avgWakingUpUs", "realAvgWakingUpUs", "avgAwakeningUs", "realAvgAwakeningUs", "total")) \ diff --git a/ydb/library/actors/core/ut/ya.make b/ydb/library/actors/core/ut/ya.make index e1a225e52871..88c75df35049 100644 --- a/ydb/library/actors/core/ut/ya.make +++ b/ydb/library/actors/core/ut/ya.make @@ -33,6 +33,7 @@ SRCS( event_pb_payload_ut.cpp event_pb_ut.cpp executor_pool_basic_ut.cpp + harmonizer_ut.cpp log_ut.cpp mon_ut.cpp scheduler_actor_ut.cpp diff --git a/ydb/library/actors/helpers/pool_stats_collector.h b/ydb/library/actors/helpers/pool_stats_collector.h index fb4c3d9286b8..32b6767fcc70 100644 --- a/ydb/library/actors/helpers/pool_stats_collector.h +++ b/ydb/library/actors/helpers/pool_stats_collector.h @@ -189,10 +189,10 @@ class TStatsCollectingActor : public TActorBootstrapped { NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState; NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByExchange; NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions; - NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu; - NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinElapsedCpu; + NMonitoring::TDynamicCounters::TCounterPtr MaxCpu; + NMonitoring::TDynamicCounters::TCounterPtr MinCpu; NMonitoring::TDynamicCounters::TCounterPtr SpinningTimeUs; NMonitoring::TDynamicCounters::TCounterPtr SpinThresholdUs; @@ -264,10 +264,10 @@ class TStatsCollectingActor : public TActorBootstrapped { DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true); DecreasingThreadsByExchange = PoolGroup->GetCounter("DecreasingThreadsByExchange", true); NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true); - MaxConsumedCpu = PoolGroup->GetCounter("MaxConsumedCpuByPool", false); - MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false); - MaxBookedCpu = PoolGroup->GetCounter("MaxBookedCpuByPool", false); - MinBookedCpu = PoolGroup->GetCounter("MinBookedCpuByPool", false); + MaxElapsedCpu = PoolGroup->GetCounter("MaxElapsedCpuByPool", false); + MinElapsedCpu = PoolGroup->GetCounter("MinElapsedCpuByPool", false); + MaxCpu = PoolGroup->GetCounter("MaxCpuByPool", false); + MinCpu = PoolGroup->GetCounter("MinCpuByPool", false); SpinningTimeUs = PoolGroup->GetCounter("SpinningTimeUs", true); SpinThresholdUs = PoolGroup->GetCounter("SpinThresholdUs", false); @@ -348,6 +348,11 @@ class TStatsCollectingActor : public TActorBootstrapped { EventProcessingCountHistogram->Reset(); EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram); + *MaxElapsedCpu = poolStats.MaxElapsedCpuUs; + *MinElapsedCpu = poolStats.MinElapsedCpuUs; + *MaxCpu = poolStats.MaxCpuUs; + *MinCpu 
+
             double toMicrosec = 1000000 / NHPTimer::GetClockRate();
             LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec);
             EventProcessingTimeHistogram->Reset();
@@ -383,10 +388,10 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
     struct TActorSystemCounters {
         TIntrusivePtr<NMonitoring::TDynamicCounters> Group;

-        NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu;
-        NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
-        NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
-        NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
+        NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedCpu;
+        NMonitoring::TDynamicCounters::TCounterPtr MinElapsedCpu;
+        NMonitoring::TDynamicCounters::TCounterPtr MaxCpu;
+        NMonitoring::TDynamicCounters::TCounterPtr MinCpu;
         NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeUs;
         NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeUs;
@@ -395,20 +400,20 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
         void Init(NMonitoring::TDynamicCounters* group) {
             Group = group;

-            MaxConsumedCpu = Group->GetCounter("MaxConsumedCpu", false);
-            MinConsumedCpu = Group->GetCounter("MinConsumedCpu", false);
-            MaxBookedCpu = Group->GetCounter("MaxBookedCpu", false);
-            MinBookedCpu = Group->GetCounter("MinBookedCpu", false);
+            MaxElapsedCpu = Group->GetCounter("MaxElapsedCpu", false);
+            MinElapsedCpu = Group->GetCounter("MinElapsedCpu", false);
+            MaxCpu = Group->GetCounter("MaxCpu", false);
+            MinCpu = Group->GetCounter("MinCpu", false);
             AvgAwakeningTimeUs = Group->GetCounter("AvgAwakeningTimeUs", false);
             AvgWakingUpTimeUs = Group->GetCounter("AvgWakingUpTimeUs", false);
         }

         void Set(const THarmonizerStats& harmonizerStats) {
 #ifdef ACTORSLIB_COLLECT_EXEC_STATS
-            *MaxConsumedCpu = harmonizerStats.MaxConsumedCpu;
-            *MinConsumedCpu = harmonizerStats.MinConsumedCpu;
-            *MaxBookedCpu = harmonizerStats.MaxBookedCpu;
-            *MinBookedCpu = harmonizerStats.MinBookedCpu;
+            *MaxElapsedCpu = harmonizerStats.MaxElapsedCpu;
+            *MinElapsedCpu = harmonizerStats.MinElapsedCpu;
+            *MaxCpu = harmonizerStats.MaxCpu;
+            *MinCpu = harmonizerStats.MinCpu;
             *AvgAwakeningTimeUs = harmonizerStats.AvgAwakeningTimeUs;
             *AvgWakingUpTimeUs = harmonizerStats.AvgWakingUpTimeUs;
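
A note on the renamed metrics, to make the intent of the patch concrete: ElapsedUs is the wall-clock time a pool's threads spent executing work (it keeps accumulating while a thread is preempted by the OS), while CpuUs is the CPU time actually burned. The sketch below mirrors the two reworked predicates from harmonizer.cpp; it is illustrative only — it assumes the inputs are per-pool values already rescaled to core units (as HarmonizeImpl does via Rescale), and the numbers in main() are invented for demonstration:

    #include <algorithm>
    #include <iostream>

    // A pool is starved when its threads were runnable noticeably longer than
    // they were actually on a CPU: elapsed exceeds cpu by more than the smaller
    // of 0.5 cores and 30% of elapsed.
    bool IsStarved(double elapsed, double cpu) {
        return std::max(elapsed, cpu) > 0.5 && elapsed > cpu + std::min(0.5, elapsed * 0.3);
    }

    // A pool is hoggish when at least half a core of its current thread budget
    // went unused.
    bool IsHoggish(double cpu, double currentThreadCount) {
        return cpu < currentThreadCount - 0.5;
    }

    int main() {
        // 3.9 cores of elapsed time but only 2.5 cores of CPU time: the pool's
        // threads are being crowded out by other pools, so it reads as starved.
        std::cout << IsStarved(3.9, 2.5) << "\n"; // prints 1
        // 4 threads granted but only 2.0 cores used: hoggish, a thread can be reclaimed.
        std::cout << IsHoggish(2.0, 4.0) << "\n"; // prints 1
    }

Note that the old rule compared the two quantities in the opposite direction (consumed < booked * 0.7), so the patch changes which pools are classified as starved, not merely the field names.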