Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull out elapsed metrics even if actor still work #4245

Merged
merged 2 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 5 additions & 16 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,27 +162,23 @@ namespace NActors {
TWorkerId workerId = wctx.WorkerId;
Y_DEBUG_ABORT_UNLESS(workerId < PoolThreads);

TlsThreadContext->Timers.Reset();

if (Harmonizer) {
LWPROBE(TryToHarmonize, PoolId, PoolName);
Harmonizer->Harmonize(TlsThreadContext->Timers.HPStart);
}

if (workerId >= 0) {
Threads[workerId].UnsetWork();
} else {
Y_ABORT_UNLESS(wctx.SharedThread);
wctx.SharedThread->UnsetWork();
}

if (Harmonizer) {
LWPROBE(TryToHarmonize, PoolId, PoolName);
Harmonizer->Harmonize(TlsThreadContext->StartOfElapsingTime.load(std::memory_order_relaxed));
}

TAtomic x = AtomicGet(Semaphore);
TSemaphore semaphore = TSemaphore::GetSemaphore(x);
while (!StopFlag.load(std::memory_order_acquire)) {
if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0) {
if (workerId < 0 || !wctx.IsNeededToWaitNextActivation) {
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart);
return 0;
}

Expand All @@ -203,13 +199,6 @@ namespace NActors {
wctx.SharedThread->SetWork();
}
AtomicDecrement(Semaphore);
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.Elapsed);
if (TlsThreadContext->Timers.Parked > 0) {
wctx.AddParkedCycles(TlsThreadContext->Timers.Parked);
}

return activation;
}
semaphore.CurrentSleepThreadCount++;
Expand Down
26 changes: 11 additions & 15 deletions ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,27 @@ namespace NActors {
i16 workerId = wctx.WorkerId;
Y_DEBUG_ABORT_UNLESS(workerId < PoolThreads);

NHPTimer::STime elapsed = 0;
NHPTimer::STime parked = 0;
NHPTimer::STime hpstart = GetCycleCountFast();
NHPTimer::STime hpnow;

const TAtomic x = AtomicDecrement(Semaphore);
if (x < 0) {
TExecutorThreadCtx& threadCtx = Threads[workerId];
ThreadQueue.Push(workerId + 1, revolvingCounter);
hpnow = GetCycleCountFast();
elapsed += hpnow - hpstart;

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);

if (threadCtx.WaitingPad.Park())
return 0;
hpstart = GetCycleCountFast();
parked += hpstart - hpnow;

hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
wctx.AddParkedCycles(hpnow - hpprev);
}

while (!StopFlag.load(std::memory_order_acquire)) {
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
hpnow = GetCycleCountFast();
elapsed += hpnow - hpstart;
wctx.AddElapsedCycles(ActorSystemIndex, elapsed);
if (parked > 0) {
wctx.AddParkedCycles(parked);
}
return activation;
}
SpinLockPause();
Expand Down
1 change: 0 additions & 1 deletion ydb/library/actors/core/executor_pool_shared.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config
, PoolCount(poolCount)
, SharedThreadCount(poolsWithThreads.size())
, Threads(new TSharedExecutorThreadCtx[SharedThreadCount])
, Timers(new TTimers[SharedThreadCount])
, TimePerMailbox(config.TimePerMailbox)
, EventsPerMailbox(config.EventsPerMailbox)
, SoftProcessingDurationTs(config.SoftProcessingDurationTs)
Expand Down
1 change: 0 additions & 1 deletion ydb/library/actors/core/executor_pool_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ namespace NActors {
i16 PoolCount;
i16 SharedThreadCount;
std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;
std::unique_ptr<TTimers[]> Timers;

std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;
Expand Down
52 changes: 42 additions & 10 deletions ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ namespace NActors {
, ThreadName(threadName)
, TimePerMailbox(timePerMailbox)
, EventsPerMailbox(eventsPerMailbox)
, ActorSystemIndex(TActorTypeOperator::GetActorSystemIndex())
{
Ctx.Switch(
ExecutorPool,
Expand Down Expand Up @@ -75,6 +76,7 @@ namespace NActors {
, EventsPerMailbox(eventsPerMailbox)
, SoftProcessingDurationTs(softProcessingDurationTs)
, SharedStats(poolCount)
, ActorSystemIndex(TActorTypeOperator::GetActorSystemIndex())
{
Ctx.Switch(
ExecutorPool,
Expand Down Expand Up @@ -189,7 +191,6 @@ namespace NActors {
Ctx.HPStart = GetCycleCountFast();
Ctx.ExecutedEvents = 0;
}
NHPTimer::STime hpprev = Ctx.HPStart;

IActor* actor = nullptr;
const std::type_info* actorType = nullptr;
Expand All @@ -198,10 +199,14 @@ namespace NActors {
bool firstEvent = true;
bool preempted = false;
bool wasWorking = false;
NHPTimer::STime hpnow = Ctx.HPStart;
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
hpprev = Ctx.HPStart;

for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
mailbox->ProcessEvents(mailbox);
NHPTimer::STime hpnow;
recipient = evExt->GetRecipientRewrite();
TActorContext ctx(*mailbox, *this, hpprev, recipient);
TlsActivationContext = &ctx; // ensure dtor (if any) is called within actor system
Expand Down Expand Up @@ -239,9 +244,14 @@ namespace NActors {
if (activityType != prevActivityType) {
prevActivityType = activityType;
NProfiling::TMemoryTagScope::Reset(activityType);
TlsThreadContext->ElapsingActorActivity.store(activityType, std::memory_order_release);
}

actor->Receive(ev);

hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);

mailbox->ProcessEvents(mailbox);
actor->OnDequeueEvent();

Expand All @@ -256,7 +266,6 @@ namespace NActors {
if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox
reclaimAsFree = true;

hpnow = GetCycleCountFast();
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);
if (elapsed > 1000000) {
LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
Expand All @@ -277,10 +286,10 @@ namespace NActors {
Ctx.IncrementNonDeliveredEvents();
}
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
}

hpprev = hpnow;

if (TlsThreadContext->CapturedType == ESendingType::Tail) {
AtomicStore(&mailbox->ScheduleMoment, hpnow);
Ctx.IncrementMailboxPushedOutByTailSending();
Expand Down Expand Up @@ -360,6 +369,7 @@ namespace NActors {
break; // empty queue, leave
}
}
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);

NProfiling::TMemoryTagScope::Reset(0);
TlsActivationContext = nullptr;
Expand Down Expand Up @@ -495,8 +505,11 @@ namespace NActors {
ThreadDisableBalloc();
#endif

TThreadContext threadCtx;
TlsThreadContext = &threadCtx;
TlsThreadCtx.WorkerCtx = &Ctx;
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
TlsThreadContext = &TlsThreadCtx;
if (ThreadName) {
::SetCurrentThreadName(ThreadName);
}
Expand Down Expand Up @@ -529,8 +542,11 @@ namespace NActors {
ThreadDisableBalloc();
#endif

TThreadContext threadCtx;
TlsThreadContext = &threadCtx;
TlsThreadCtx.WorkerCtx = &Ctx;
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
TlsThreadContext = &TlsThreadCtx;
if (ThreadName) {
::SetCurrentThreadName(ThreadName);
}
Expand All @@ -551,7 +567,7 @@ namespace NActors {
}

if (!wasWorking && !StopFlag.load(std::memory_order_relaxed)) {
TlsThreadContext->Timers.Reset();
ThreadCtx->UnsetWork();
ThreadCtx->Wait(0, &StopFlag);
}

Expand Down Expand Up @@ -760,10 +776,26 @@ namespace NActors {
}

void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
NHPTimer::STime hpnow = GetCycleCountFast();
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
if (activityType == Max<ui64>()) {
Ctx.AddParkedCycles(hpnow - hpprev);
} else {
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
}
Ctx.GetCurrentStats(statsCopy);
}

void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
NHPTimer::STime hpnow = GetCycleCountFast();
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
if (activityType == Max<ui64>()) {
Ctx.AddParkedCycles(hpnow - hpprev);
} else {
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
}
statsCopy = TExecutorThreadStats();
statsCopy.Aggregate(SharedStats[poolId]);
}
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/actors/core/executor_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "event.h"
#include "callstack.h"
#include "probes.h"
#include "thread_context.h"
#include "worker_context.h"
#include "log_settings.h"

Expand Down Expand Up @@ -92,7 +93,8 @@ namespace NActors {
ui64 CurrentActorScheduledEventsCounter = 0;

// Thread-specific
TWorkerContext Ctx;
mutable TThreadContext TlsThreadCtx;
mutable TWorkerContext Ctx;
ui64 RevolvingReadCounter = 0;
ui64 RevolvingWriteCounter = 0;
const TString ThreadName;
Expand All @@ -104,6 +106,7 @@ namespace NActors {
ui64 SoftProcessingDurationTs;

std::vector<TExecutorThreadStats> SharedStats;
const ui32 ActorSystemIndex;
};

class TExecutorThread: public TGenericExecutorThread {
Expand Down
14 changes: 9 additions & 5 deletions ydb/library/actors/core/executor_thread_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "defs.h"
#include "thread_context.h"
#include "worker_context.h"

#include <ydb/library/actors/util/datetime.h>
#include <ydb/library/actors/util/threadparkpad.h>
Expand Down Expand Up @@ -95,16 +96,19 @@ namespace NActors {
return false;
}

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
do {
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
if (WaitingPad.Park()) // interrupted
return true;
TlsThreadContext->Timers.HPStart = GetCycleCountFast();
TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow;
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
state = GetState<TWaitState>();
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));

TlsThreadContext->ElapsingActorActivity.store(TlsThreadContext->ActorSystemIndex, std::memory_order_release);
static_cast<TDerived*>(this)->AfterWakeUp(state);
return false;
}
Expand Down
23 changes: 6 additions & 17 deletions ydb/library/actors/core/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,11 @@
namespace NActors {

class IExecutorPool;
struct TWorkerContext;

template <typename T>
struct TWaitingStats;

struct TTimers {
NHPTimer::STime Elapsed = 0;
NHPTimer::STime Parked = 0;
NHPTimer::STime Blocked = 0;
NHPTimer::STime HPStart = GetCycleCountFast();
NHPTimer::STime HPNow;

void Reset() {
Elapsed = 0;
Parked = 0;
Blocked = 0;
HPStart = GetCycleCountFast();
HPNow = HPStart;
}
};

struct TThreadContext {
IExecutorPool *Pool = nullptr;
ui32 CapturedActivation = 0;
Expand All @@ -42,8 +27,12 @@ namespace NActors {
ui16 LocalQueueSize = 0;
TWaitingStats<ui64> *WaitingStats = nullptr;
bool IsCurrentRecipientAService = false;
TTimers Timers;
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;

std::atomic<ui64> StartOfElapsingTime = 0;
std::atomic<ui64> ElapsingActorActivity = 0;
TWorkerContext *WorkerCtx = nullptr;
ui32 ActorSystemIndex = 0;
};

extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp
Expand Down
Loading