Skip to content

Commit d8ee822

Browse files
committed
Improve ring queue perfomance
1 parent 716d24a commit d8ee822

File tree

2 files changed

+64
-0
lines changed

2 files changed

+64
-0
lines changed

ydb/library/actors/core/executor_pool_basic.cpp

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,63 @@ namespace NActors {
264264
return nullptr;
265265
}
266266

267+
TMailbox* TBasicExecutorPool::GetReadyActivationRingQueue(ui64 revolvingCounter) {
268+
if (StopFlag.load(std::memory_order_acquire)) {
269+
return nullptr;
270+
}
271+
272+
TWorkerId workerId = TlsThreadContext->WorkerId();
273+
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "");
274+
NHPTimer::STime hpnow = GetCycleCountFast();
275+
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION, false> activityGuard(hpnow);
276+
277+
Y_DEBUG_ABORT_UNLESS(workerId < MaxFullThreadCount);
278+
279+
Threads[workerId].UnsetWork();
280+
if (Harmonizer) {
281+
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "try to harmonize");
282+
LWPROBE(TryToHarmonize, PoolId, PoolName);
283+
Harmonizer->Harmonize(hpnow);
284+
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "harmonize done");
285+
}
286+
287+
do {
288+
{
289+
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE, false> activityGuard;
290+
if (const ui32 activation = std::visit([&revolvingCounter](auto &x) {return x.Pop(++revolvingCounter);}, Activations)) {
291+
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "activation found");
292+
Threads[workerId].SetWork();
293+
AtomicDecrement(Semaphore);
294+
return MailboxTable->Get(activation);
295+
}
296+
}
297+
298+
TAtomic semaphoreRaw = AtomicGet(Semaphore);
299+
TSemaphore semaphore = TSemaphore::GetSemaphore(semaphoreRaw);
300+
if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0) {
301+
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "semaphore.OldSemaphore == 0 or workerId >= 0 && semaphore.CurrentSleepThreadCount < 0");
302+
if (!TlsThreadContext->ExecutionContext.IsNeededToWaitNextActivation) {
303+
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "wctx.ExecutionContext.IsNeededToWaitNextActivation == false");
304+
return nullptr;
305+
}
306+
307+
bool needToWait = false;
308+
bool needToBlock = false;
309+
AskToGoToSleep(&needToWait, &needToBlock);
310+
if (needToWait) {
311+
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "go to sleep");
312+
if (Threads[workerId].Wait(SpinThresholdCycles, &StopFlag)) {
313+
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "sleep interrupted");
314+
return nullptr;
315+
}
316+
}
317+
}
318+
SpinLockPause();
319+
} while (!StopFlag.load(std::memory_order_acquire));
320+
321+
return nullptr;
322+
}
323+
267324
TMailbox* TBasicExecutorPool::GetReadyActivationLocalQueue(ui64 revolvingCounter) {
268325
TWorkerId workerId = TlsThreadContext->WorkerId();
269326
Y_DEBUG_ABORT_UNLESS(workerId < static_cast<i32>(MaxFullThreadCount));
@@ -278,13 +335,19 @@ namespace NActors {
278335
TlsThreadContext->LocalQueueContext.LocalQueueSize = LocalQueueSize.load(std::memory_order_relaxed);
279336
}
280337
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "local queue done; moving to common");
338+
if (TlsThreadContext->UseRingQueue()) {
339+
return GetReadyActivationRingQueue(revolvingCounter);
340+
}
281341
return GetReadyActivationCommon(revolvingCounter);
282342
}
283343

284344
TMailbox* TBasicExecutorPool::GetReadyActivation(ui64 revolvingCounter) {
285345
if (MaxLocalQueueSize) {
286346
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "local queue");
287347
return GetReadyActivationLocalQueue(revolvingCounter);
348+
} else if (TlsThreadContext->UseRingQueue()) {
349+
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "ring queue");
350+
return GetReadyActivationRingQueue(revolvingCounter);
288351
} else {
289352
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "");
290353
return GetReadyActivationCommon(revolvingCounter);

ydb/library/actors/core/executor_pool_basic.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ namespace NActors {
234234
TMailbox* GetReadyActivation(ui64 revolvingReadCounter) override;
235235
TMailbox* GetReadyActivationCommon(ui64 revolvingReadCounter);
236236
TMailbox* GetReadyActivationShared(ui64 revolvingReadCounter);
237+
TMailbox* GetReadyActivationRingQueue(ui64 revolvingReadCounter);
237238
TMailbox* GetReadyActivationLocalQueue(ui64 revolvingReadCounter);
238239

239240
void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;

0 commit comments

Comments
 (0)