diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp index e9287e2059d9..f8feb8f4fbb1 100644 --- a/ydb/library/actors/core/executor_thread.cpp +++ b/ydb/library/actors/core/executor_thread.cpp @@ -487,7 +487,7 @@ namespace NActors { ExecutorPool->ScheduleActivation(capturedActivation); } if (!activation) { - return {IsSharedThread, wasWorking}; + break; } executeActivation(activation, false); } diff --git a/ydb/library/actors/core/harmonizer.cpp b/ydb/library/actors/core/harmonizer.cpp index 706235b68647..df6516421698 100644 --- a/ydb/library/actors/core/harmonizer.cpp +++ b/ydb/library/actors/core/harmonizer.cpp @@ -424,7 +424,7 @@ Y_FORCE_INLINE bool IsStarved(double consumed, double booked) { } Y_FORCE_INLINE bool IsHoggish(double booked, double currentThreadCount) { - return booked < currentThreadCount - 1; + return booked < currentThreadCount - 0.5; } void THarmonizer::HarmonizeImpl(ui64 ts) { @@ -435,7 +435,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { i64 beingStopped = 0; double total = 0; TStackVec needyPools; - TStackVec hoggishPools; + TStackVec, 8> hoggishPools; TStackVec isNeedyByPool; size_t sumOfAdditionalThreads = 0; @@ -577,7 +577,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { bool isHoggish = IsHoggish(poolBooked, currentThreadCount) || IsHoggish(lastSecondPoolBooked, currentThreadCount); if (isHoggish) { - hoggishPools.push_back(poolIdx); + hoggishPools.push_back({poolIdx, std::max(poolBooked - currentThreadCount, lastSecondPoolBooked - currentThreadCount)}); } booked += poolBooked; consumed += poolConsumed; @@ -714,18 +714,19 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { } } - for (size_t hoggishPoolIdx : hoggishPools) { + for (auto &[hoggishPoolIdx, freeCpu] : hoggishPools) { TPoolInfo &pool = *Pools[hoggishPoolIdx]; i64 threadCount = pool.GetFullThreadCount(); if (hasBorrowedSharedThread[hoggishPoolIdx]) { Shared->ReturnBorrowedHalfThread(hoggishPoolIdx); + freeCpu -= 0.5; continue; } if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) { pool.LocalQueueSize = std::min(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2); pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize); } - if (threadCount > pool.MinFullThreadCount) { + if (threadCount > pool.MinFullThreadCount && freeCpu >= 1) { AtomicIncrement(pool.DecreasingThreadsByHoggishState); LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); pool.SetFullThreadCount(threadCount - 1);