Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions ydb/library/actors/core/executor_pool_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ namespace NActors {
TExecutorPoolBase::TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity, bool useRingQueue)
: TExecutorPoolBaseMailboxed(poolId)
, PoolThreads(threads)
, UseRingQueue(useRingQueue)
, UseRingQueueValue(useRingQueue)
, ThreadsAffinity(affinity)
{
if (useRingQueue) {
Expand Down Expand Up @@ -147,7 +147,7 @@ namespace NActors {
}

void TExecutorPoolBase::ScheduleActivation(TMailbox* mailbox) {
if (UseRingQueue) {
if (UseRingQueue()) {
ScheduleActivationEx(mailbox, 0);
} else {
ScheduleActivationEx(mailbox, AtomicIncrement(ActivationsRevolvingCounter));
Expand All @@ -169,9 +169,12 @@ namespace NActors {
if (NFeatures::IsCommon() && IsAllowedToCapture(this) || IsTailSend(this)) {
mailbox = TlsThreadContext->CaptureMailbox(mailbox);
}
if (mailbox && UseRingQueue) {
if (!mailbox) {
return;
}
if (UseRingQueueValue) {
ScheduleActivationEx(mailbox, 0);
} else if (mailbox) {
} else {
ScheduleActivationEx(mailbox, AtomicIncrement(ActivationsRevolvingCounter));
}
}
Expand Down Expand Up @@ -302,4 +305,8 @@ namespace NActors {
TMailboxTable* TExecutorPoolBaseMailboxed::GetMailboxTable() const {
return MailboxTable;
}

bool TExecutorPoolBase::UseRingQueue() const {
return UseRingQueueValue;
}
}
9 changes: 5 additions & 4 deletions ydb/library/actors/core/executor_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ namespace NActors {
using TUnorderedCacheActivationQueue = TUnorderedCache<ui32, 512, 4>;

const i16 PoolThreads;
const bool UseRingQueue;
TIntrusivePtr<TAffinity> ThreadsAffinity;
TAtomic Semaphore = 0;
std::variant<TUnorderedCacheActivationQueue, TRingActivationQueue> Activations;
const bool UseRingQueueValue;
alignas(64) TIntrusivePtr<TAffinity> ThreadsAffinity;
alignas(64) TAtomic Semaphore = 0;
alignas(64) std::variant<TUnorderedCacheActivationQueue, TRingActivationQueue> Activations;
TAtomic ActivationsRevolvingCounter = 0;
std::atomic_bool StopFlag = false;
public:
Expand All @@ -67,6 +67,7 @@ namespace NActors {
void SpecificScheduleActivation(TMailbox* mailbox) override;
TAffinity* Affinity() const override;
ui32 GetThreads() const override;
bool UseRingQueue() const;
};

void DoActorInit(TActorSystem*, IActor*, const TActorId&, const TActorId&);
Expand Down
94 changes: 84 additions & 10 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,63 @@ namespace NActors {
return nullptr;
}

TMailbox* TBasicExecutorPool::GetReadyActivationRingQueue(ui64 revolvingCounter) {
if (StopFlag.load(std::memory_order_acquire)) {
return nullptr;
}

TWorkerId workerId = TlsThreadContext->WorkerId();
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "");
NHPTimer::STime hpnow = GetCycleCountFast();
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION, false> activityGuard(hpnow);

Y_DEBUG_ABORT_UNLESS(workerId < MaxFullThreadCount);

Threads[workerId].UnsetWork();
if (Harmonizer) {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "try to harmonize");
LWPROBE(TryToHarmonize, PoolId, PoolName);
Harmonizer->Harmonize(hpnow);
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "harmonize done");
}

do {
{
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE, false> activityGuard;
if (const ui32 activation = std::visit([&revolvingCounter](auto &x) {return x.Pop(++revolvingCounter);}, Activations)) {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "activation found");
Threads[workerId].SetWork();
AtomicDecrement(Semaphore);
return MailboxTable->Get(activation);
}
}

TAtomic semaphoreRaw = AtomicGet(Semaphore);
TSemaphore semaphore = TSemaphore::GetSemaphore(semaphoreRaw);
if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0) {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "semaphore.OldSemaphore == 0 or workerId >= 0 && semaphore.CurrentSleepThreadCount < 0");
if (!TlsThreadContext->ExecutionContext.IsNeededToWaitNextActivation) {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "wctx.ExecutionContext.IsNeededToWaitNextActivation == false");
return nullptr;
}

bool needToWait = false;
bool needToBlock = false;
AskToGoToSleep(&needToWait, &needToBlock);
if (needToWait) {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "go to sleep");
if (Threads[workerId].Wait(SpinThresholdCycles, &StopFlag)) {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "sleep interrupted");
return nullptr;
}
}
}
SpinLockPause();
} while (!StopFlag.load(std::memory_order_acquire));

return nullptr;
}

TMailbox* TBasicExecutorPool::GetReadyActivationLocalQueue(ui64 revolvingCounter) {
TWorkerId workerId = TlsThreadContext->WorkerId();
Y_DEBUG_ABORT_UNLESS(workerId < static_cast<i32>(MaxFullThreadCount));
Expand All @@ -278,13 +335,19 @@ namespace NActors {
TlsThreadContext->LocalQueueContext.LocalQueueSize = LocalQueueSize.load(std::memory_order_relaxed);
}
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "local queue done; moving to common");
if (TlsThreadContext->UseRingQueue()) {
return GetReadyActivationRingQueue(revolvingCounter);
}
return GetReadyActivationCommon(revolvingCounter);
}

TMailbox* TBasicExecutorPool::GetReadyActivation(ui64 revolvingCounter) {
if (MaxLocalQueueSize) {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "local queue");
return GetReadyActivationLocalQueue(revolvingCounter);
} else if (TlsThreadContext->UseRingQueue()) {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "ring queue");
return GetReadyActivationRingQueue(revolvingCounter);
} else {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "");
return GetReadyActivationCommon(revolvingCounter);
Expand All @@ -305,37 +368,48 @@ namespace NActors {
}
}

void TBasicExecutorPool::ScheduleActivationExCommon(TMailbox* mailbox, ui64 revolvingCounter, TAtomic x) {
TSemaphore semaphore = TSemaphore::GetSemaphore(x);
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "semaphore.OldSemaphore == ", semaphore.OldSemaphore, " semaphore.CurrentSleepThreadCount == ", semaphore.CurrentSleepThreadCount);
std::visit([mailbox, revolvingCounter](auto &x) {
x.Push(mailbox->Hint, revolvingCounter);
void TBasicExecutorPool::ScheduleActivationExCommon(TMailbox* mailbox, ui64 revolvingCounter, std::optional<TAtomic> initSemaphore) {
std::visit([mailbox, revolvingCounter](auto &queue) {
queue.Push(mailbox->Hint, revolvingCounter);
}, Activations);
bool needToWakeUp = false;
bool needToChangeOldSemaphore = true;

if (SharedPool) {
TAtomic x;
TSemaphore semaphore;
if (!initSemaphore || SharedPool) {
x = AtomicIncrement(Semaphore);
needToChangeOldSemaphore = false;
semaphore = TSemaphore::GetSemaphore(x);
} else {
x = *initSemaphore;
semaphore = TSemaphore::GetSemaphore(x);
}
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "semaphore.OldSemaphore == ", semaphore.OldSemaphore, " semaphore.CurrentSleepThreadCount == ", semaphore.CurrentSleepThreadCount);
if (SharedPool) {
if (SharedPool->WakeUpLocalThreads(PoolId)) {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "shared pool wake up local threads");
return;
}
semaphore = TSemaphore::GetSemaphore(x);
}

i16 sleepThreads = 0;
Y_UNUSED(sleepThreads);
do {
needToWakeUp = semaphore.CurrentSleepThreadCount > 0;
i64 oldX = semaphore.ConvertToI64();
bool changed = false;
if (needToChangeOldSemaphore) {
semaphore.OldSemaphore++;
changed = true;
}
if (needToWakeUp) {
sleepThreads = semaphore.CurrentSleepThreadCount--;
changed = true;
}
if (changed) {
x = AtomicGetAndCas(&Semaphore, semaphore.ConvertToI64(), oldX);
}
x = AtomicGetAndCas(&Semaphore, semaphore.ConvertToI64(), oldX);
if (x == oldX) {
break;
}
Expand Down Expand Up @@ -383,14 +457,14 @@ namespace NActors {
return;
}
}
ScheduleActivationExCommon(mailbox, revolvingWriteCounter, AtomicGet(Semaphore));
ScheduleActivationExCommon(mailbox, revolvingWriteCounter, std::nullopt);
}

void TBasicExecutorPool::ScheduleActivationEx(TMailbox* mailbox, ui64 revolvingCounter) {
if (MaxLocalQueueSize) {
ScheduleActivationExLocalQueue(mailbox, revolvingCounter);
} else {
ScheduleActivationExCommon(mailbox, revolvingCounter, AtomicGet(Semaphore));
ScheduleActivationExCommon(mailbox, revolvingCounter, std::nullopt);
}
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/library/actors/core/executor_pool_basic.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,15 @@ namespace NActors {
TMailbox* GetReadyActivation(ui64 revolvingReadCounter) override;
TMailbox* GetReadyActivationCommon(ui64 revolvingReadCounter);
TMailbox* GetReadyActivationShared(ui64 revolvingReadCounter);
TMailbox* GetReadyActivationRingQueue(ui64 revolvingReadCounter);
TMailbox* GetReadyActivationLocalQueue(ui64 revolvingReadCounter);

void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;
void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) override;

void ScheduleActivationEx(TMailbox* mailbox, ui64 revolvingWriteCounter) override;
void ScheduleActivationExCommon(TMailbox* mailbox, ui64 revolvingWriteCounter, TAtomic semaphoreValue);
void ScheduleActivationExCommon(TMailbox* mailbox, ui64 revolvingWriteCounter, std::optional<TAtomic> semaphoreValue);
void ScheduleActivationExLocalQueue(TMailbox* mailbox, ui64 revolvingWriteCounter);

void SetLocalQueueSize(ui16 size);
Expand Down
6 changes: 3 additions & 3 deletions ydb/library/actors/core/executor_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ namespace NActors {
ui64 CurrentActorScheduledEventsCounter = 0;

// Thread-specific
mutable TThreadContext ThreadCtx;
mutable TExecutionStats ExecutionStats;
ui64 RevolvingReadCounter = 0;
alignas(64) mutable TThreadContext ThreadCtx;
alignas(64) mutable TExecutionStats ExecutionStats;
alignas(64) ui64 RevolvingReadCounter = 0;
ui64 RevolvingWriteCounter = 0;
const TString ThreadName;
volatile TThreadId ThreadId = UnknownThreadId;
Expand Down
17 changes: 17 additions & 0 deletions ydb/library/actors/core/thread_context.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
#include "thread_context.h"

#include "executor_pool.h"
#include "executor_pool_base.h"

namespace NActors {

bool UseRingQueue(IExecutorPool* pool) {
if (auto* basePool = dynamic_cast<TExecutorPoolBase*>(pool)) {
return basePool->UseRingQueue();
}
return false;
}

TWorkerContext::TWorkerContext(TWorkerId workerId, IExecutorPool* pool, IExecutorPool* sharedPool)
: WorkerId(workerId)
, Pool(pool)
, OwnerPool(pool)
, SharedPool(sharedPool)
, UseRingQueueValue(::NActors::UseRingQueue(pool))
{
AssignPool(pool);
}
Expand Down Expand Up @@ -37,6 +46,10 @@ namespace NActors {
return SharedPool != nullptr;
}

bool TWorkerContext::UseRingQueue() const {
return UseRingQueueValue;
}

void TWorkerContext::AssignPool(IExecutorPool* pool, ui64 softDeadlineTs) {
Pool = pool;
TimePerMailboxTs = pool ? pool->TimePerMailboxTs() : TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX.SecondsFloat() * NHPTimer::GetClockRate();
Expand Down Expand Up @@ -78,6 +91,10 @@ namespace NActors {
return WorkerContext.IsShared();
}

bool TThreadContext::UseRingQueue() const {
return WorkerContext.UseRingQueue();
}

ui64 TThreadContext::TimePerMailboxTs() const {
return WorkerContext.TimePerMailboxTs;
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/library/actors/core/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ namespace NActors {
ui64 TimePerMailboxTs = 0;
ui32 EventsPerMailbox = 0;
ui64 SoftDeadlineTs = ui64(-1);
bool UseRingQueueValue = false;

TWorkerContext(TWorkerId workerId, IExecutorPool* pool, IExecutorPool* sharedPool);

ui32 PoolId() const;
TString PoolName() const;
ui32 OwnerPoolId() const;
bool IsShared() const;

bool UseRingQueue() const;
void AssignPool(IExecutorPool* pool, ui64 softDeadlineTs = -1);
void FreeMailbox(TMailbox* mailbox);
};
Expand Down Expand Up @@ -118,7 +119,7 @@ namespace NActors {
ui32 EventsPerMailbox() const;
ui64 SoftDeadlineTs() const;
void FreeMailbox(TMailbox* mailbox);

bool UseRingQueue() const;
void AssignPool(IExecutorPool* pool, ui64 softDeadlineTs = Max<ui64>());

bool CheckSendingType(ESendingType type) const;
Expand Down
Loading