@@ -264,6 +264,63 @@ namespace NActors {
264264 return nullptr ;
265265 }
266266
267+ TMailbox* TBasicExecutorPool::GetReadyActivationRingQueue (ui64 revolvingCounter) {
268+ if (StopFlag.load (std::memory_order_acquire)) {
269+ return nullptr ;
270+ }
271+
272+ TWorkerId workerId = TlsThreadContext->WorkerId ();
273+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " " );
274+ NHPTimer::STime hpnow = GetCycleCountFast ();
275+ TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION, false > activityGuard (hpnow);
276+
277+ Y_DEBUG_ABORT_UNLESS (workerId < MaxFullThreadCount);
278+
279+ Threads[workerId].UnsetWork ();
280+ if (Harmonizer) {
281+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " try to harmonize" );
282+ LWPROBE (TryToHarmonize, PoolId, PoolName);
283+ Harmonizer->Harmonize (hpnow);
284+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " harmonize done" );
285+ }
286+
287+ do {
288+ {
289+ TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE, false > activityGuard;
290+ if (const ui32 activation = std::visit ([&revolvingCounter](auto &x) {return x.Pop (++revolvingCounter);}, Activations)) {
291+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " activation found" );
292+ Threads[workerId].SetWork ();
293+ AtomicDecrement (Semaphore);
294+ return MailboxTable->Get (activation);
295+ }
296+ }
297+
298+ TAtomic semaphoreRaw = AtomicGet (Semaphore);
299+ TSemaphore semaphore = TSemaphore::GetSemaphore (semaphoreRaw);
300+ if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0 ) {
301+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " semaphore.OldSemaphore == 0 or workerId >= 0 && semaphore.CurrentSleepThreadCount < 0" );
302+ if (!TlsThreadContext->ExecutionContext .IsNeededToWaitNextActivation ) {
303+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " wctx.ExecutionContext.IsNeededToWaitNextActivation == false" );
304+ return nullptr ;
305+ }
306+
307+ bool needToWait = false ;
308+ bool needToBlock = false ;
309+ AskToGoToSleep (&needToWait, &needToBlock);
310+ if (needToWait) {
311+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " go to sleep" );
312+ if (Threads[workerId].Wait (SpinThresholdCycles, &StopFlag)) {
313+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " sleep interrupted" );
314+ return nullptr ;
315+ }
316+ }
317+ }
318+ SpinLockPause ();
319+ } while (!StopFlag.load (std::memory_order_acquire));
320+
321+ return nullptr ;
322+ }
323+
267324 TMailbox* TBasicExecutorPool::GetReadyActivationLocalQueue (ui64 revolvingCounter) {
268325 TWorkerId workerId = TlsThreadContext->WorkerId ();
269326 Y_DEBUG_ABORT_UNLESS (workerId < static_cast <i32 >(MaxFullThreadCount));
@@ -278,13 +335,19 @@ namespace NActors {
278335 TlsThreadContext->LocalQueueContext .LocalQueueSize = LocalQueueSize.load (std::memory_order_relaxed);
279336 }
280337 EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " local queue done; moving to common" );
338+ if (TlsThreadContext->UseRingQueue ()) {
339+ return GetReadyActivationRingQueue (revolvingCounter);
340+ }
281341 return GetReadyActivationCommon (revolvingCounter);
282342 }
283343
284344 TMailbox* TBasicExecutorPool::GetReadyActivation (ui64 revolvingCounter) {
285345 if (MaxLocalQueueSize) {
286346 EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " local queue" );
287347 return GetReadyActivationLocalQueue (revolvingCounter);
348+ } else if (TlsThreadContext->UseRingQueue ()) {
349+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " ring queue" );
350+ return GetReadyActivationRingQueue (revolvingCounter);
288351 } else {
289352 EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " " );
290353 return GetReadyActivationCommon (revolvingCounter);
@@ -305,37 +368,48 @@ namespace NActors {
305368 }
306369 }
307370
308- void TBasicExecutorPool::ScheduleActivationExCommon (TMailbox* mailbox, ui64 revolvingCounter, TAtomic x) {
309- TSemaphore semaphore = TSemaphore::GetSemaphore (x);
310- EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " semaphore.OldSemaphore == " , semaphore.OldSemaphore , " semaphore.CurrentSleepThreadCount == " , semaphore.CurrentSleepThreadCount );
311- std::visit ([mailbox, revolvingCounter](auto &x) {
312- x.Push (mailbox->Hint , revolvingCounter);
371+ void TBasicExecutorPool::ScheduleActivationExCommon (TMailbox* mailbox, ui64 revolvingCounter, std::optional<TAtomic> initSemaphore) {
372+ std::visit ([mailbox, revolvingCounter](auto &queue) {
373+ queue.Push (mailbox->Hint , revolvingCounter);
313374 }, Activations);
314375 bool needToWakeUp = false ;
315376 bool needToChangeOldSemaphore = true ;
316377
317- if (SharedPool) {
378+ TAtomic x;
379+ TSemaphore semaphore;
380+ if (!initSemaphore || SharedPool) {
318381 x = AtomicIncrement (Semaphore);
319382 needToChangeOldSemaphore = false ;
383+ semaphore = TSemaphore::GetSemaphore (x);
384+ } else {
385+ x = *initSemaphore;
386+ semaphore = TSemaphore::GetSemaphore (x);
387+ }
388+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " semaphore.OldSemaphore == " , semaphore.OldSemaphore , " semaphore.CurrentSleepThreadCount == " , semaphore.CurrentSleepThreadCount );
389+ if (SharedPool) {
320390 if (SharedPool->WakeUpLocalThreads (PoolId)) {
321391 EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " shared pool wake up local threads" );
322392 return ;
323393 }
324- semaphore = TSemaphore::GetSemaphore (x);
325394 }
326395
327396 i16 sleepThreads = 0 ;
328397 Y_UNUSED (sleepThreads);
329398 do {
330399 needToWakeUp = semaphore.CurrentSleepThreadCount > 0 ;
331400 i64 oldX = semaphore.ConvertToI64 ();
401+ bool changed = false ;
332402 if (needToChangeOldSemaphore) {
333403 semaphore.OldSemaphore ++;
404+ changed = true ;
334405 }
335406 if (needToWakeUp) {
336407 sleepThreads = semaphore.CurrentSleepThreadCount --;
408+ changed = true ;
409+ }
410+ if (changed) {
411+ x = AtomicGetAndCas (&Semaphore, semaphore.ConvertToI64 (), oldX);
337412 }
338- x = AtomicGetAndCas (&Semaphore, semaphore.ConvertToI64 (), oldX);
339413 if (x == oldX) {
340414 break ;
341415 }
@@ -383,14 +457,14 @@ namespace NActors {
383457 return ;
384458 }
385459 }
386- ScheduleActivationExCommon (mailbox, revolvingWriteCounter, AtomicGet (Semaphore) );
460+ ScheduleActivationExCommon (mailbox, revolvingWriteCounter, std:: nullopt );
387461 }
388462
389463 void TBasicExecutorPool::ScheduleActivationEx (TMailbox* mailbox, ui64 revolvingCounter) {
390464 if (MaxLocalQueueSize) {
391465 ScheduleActivationExLocalQueue (mailbox, revolvingCounter);
392466 } else {
393- ScheduleActivationExCommon (mailbox, revolvingCounter, AtomicGet (Semaphore) );
467+ ScheduleActivationExCommon (mailbox, revolvingCounter, std:: nullopt );
394468 }
395469 }
396470
0 commit comments