Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/library/actors/core/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ namespace NActors {
return NHPTimer::GetSeconds(GetCurrentEventTicks());
}

void TActivationContext::EnableMailboxStats() {
TlsActivationContext->Mailbox.EnableStats();
}

TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept {
return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfActorId);
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/actors/core/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "actorsystem.h"
#include "event.h"
#include "executor_thread.h"
#include "mailbox.h"
#include "monotonic.h"
#include "thread_context.h"

Expand Down Expand Up @@ -130,6 +131,8 @@ namespace NActors {

static i64 GetCurrentEventTicks();
static double GetCurrentEventTicksAsSeconds();

static void EnableMailboxStats();
};

struct TActorContext: public TActivationContext {
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ namespace NActors {

Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(eventStart, hpnow, activityType, CurrentActorScheduledEventsCounter);
mailbox->AddElapsedCycles(elapsed);
if (elapsed > 1000000) {
LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
}
Expand Down Expand Up @@ -372,7 +373,7 @@ namespace NActors {
break; // empty queue, leave
}
}
TlsThreadContext->ActivationStartTS.store(GetCycleCountFast(), std::memory_order_release);
TlsThreadContext->ActivationStartTS.store(hpnow, std::memory_order_release);
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);

NProfiling::TMemoryTagScope::Reset(0);
Expand Down
24 changes: 21 additions & 3 deletions ydb/library/actors/core/mailbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,9 @@ namespace NActors {
CleanupActors();
}

bool TMailboxHeader::CleanupActors() {
bool TMailboxHeader::CleanupActors(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo) {
bool done = true;
switch (ActorPack) {
switch (actorPack) {
case TMailboxActorPack::Simple: {
if (ActorsInfo.Simple.ActorId != 0) {
delete ActorsInfo.Simple.Actor;
Expand All @@ -399,13 +399,31 @@ namespace NActors {
done = false;
break;
}
case TMailboxActorPack::Complex:
Y_ABORT("Unexpected ActorPack type");
}
ActorPack = TMailboxActorPack::Simple;
actorPack = TMailboxActorPack::Simple;
ActorsInfo.Simple.ActorId = 0;
ActorsInfo.Simple.Actor = nullptr;
return done;
}

bool TMailboxHeader::CleanupActors() {
if (ActorPack != TMailboxActorPack::Complex) {
TMailboxActorPack::EType pack = ActorPack;
bool done = CleanupActors(pack, ActorsInfo);
ActorPack = pack;
return done;
} else {
bool done = CleanupActors(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo);
delete ActorsInfo.Complex;
ActorPack = TMailboxActorPack::Simple;
ActorsInfo.Simple.ActorId = 0;
ActorsInfo.Simple.Actor = nullptr;
return done;
}
}

std::pair<ui32, ui32> TMailboxHeader::CountMailboxEvents(ui64 localActorId, ui32 maxTraverse) {
switch (Type) {
case TMailboxType::Simple:
Expand Down
119 changes: 100 additions & 19 deletions ydb/library/actors/core/mailbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "executor_pool.h"
#include "mailbox_queue_simple.h"
#include "mailbox_queue_revolving.h"
#include <functional>
#include <ydb/library/actors/util/unordered_cache.h>
#include <library/cpp/threading/queue/mpsc_htswap.h>
#include <library/cpp/threading/queue/mpsc_read_as_filled.h>
Expand All @@ -27,6 +28,10 @@ namespace NActors {

struct TMailboxHeader;

struct TMailboxStats {
ui64 ElapsedCycles = 0;
};

template<bool>
struct TMailboxUsageImpl {
void Push(ui64 /*localId*/) {}
Expand All @@ -53,7 +58,8 @@ namespace NActors {
enum EType {
Simple = 0,
Array = 1,
Map = 2
Map = 2,
Complex = 3,
};
};

Expand All @@ -79,7 +85,7 @@ namespace NActors {
volatile ui32 ExecutionState;
ui32 Reserved : 4; // never changes, always zero
ui32 Type : 4; // never changes
ui32 ActorPack : 2;
TMailboxActorPack::EType ActorPack : 2;
ui32 Knobs : 22;

struct TActorPair {
Expand All @@ -91,6 +97,8 @@ namespace NActors {
TActorPair Actors[ARRAY_CAPACITY];
};

struct alignas(64) TComplexActorInfo;

union TActorsInfo {
TActorPair Simple;
struct {
Expand All @@ -100,11 +108,19 @@ namespace NActors {
struct {
TActorMap* ActorsMap;
} Map;
TComplexActorInfo* Complex;
} ActorsInfo;

struct alignas(64) TComplexActorInfo{
TActorsInfo ActorsInfo;
TMailboxActorPack::EType ActorPack;
TMailboxStats Stats;
};

TMailboxHeader(TMailboxType::EType type);
~TMailboxHeader();

static bool CleanupActors(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo);
bool CleanupActors();

// this interface is used exclusively by executor thread, so implementation is there
Expand All @@ -119,12 +135,13 @@ namespace NActors {
bool UnlockAsFree(bool wouldReschedule); // preceed with releasing lock, but mark as free one

bool IsEmpty() const noexcept {
return (ActorPack == TMailboxActorPack::Simple && ActorsInfo.Simple.ActorId == 0);
return (ActorPack == TMailboxActorPack::Simple && ActorsInfo.Simple.ActorId == 0) ||
(ActorPack == TMailboxActorPack::Complex && ActorsInfo.Complex->ActorPack == TMailboxActorPack::Simple && ActorsInfo.Complex->ActorsInfo.Simple.ActorId == 0);
}

template<typename T>
void ForEach(T&& callback) noexcept {
switch (ActorPack) {
static void ForEach(TMailboxActorPack::EType actorPack, TActorsInfo &ActorsInfo, T&& callback) noexcept {
switch (actorPack) {
case TMailboxActorPack::Simple:
if (ActorsInfo.Simple.ActorId) {
callback(ActorsInfo.Simple.ActorId, ActorsInfo.Simple.Actor);
Expand All @@ -143,10 +160,22 @@ namespace NActors {
callback(row.ActorId, row.Actor);
}
break;

case TMailboxActorPack::Complex:
Y_ABORT("Unexpected ActorPack type");
}
}

IActor* FindActor(ui64 localActorId) noexcept {
template<typename T>
void ForEach(T&& callback) noexcept {
if (ActorPack != TMailboxActorPack::Complex) {
ForEach(static_cast<TMailboxActorPack::EType>(ActorPack), ActorsInfo, std::move(callback));
} else {
ForEach(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, std::move(callback));
}
}

static IActor* FindActor(TMailboxActorPack::EType ActorPack, TActorsInfo &ActorsInfo, ui64 localActorId) noexcept {
switch (ActorPack) {
case TMailboxActorPack::Simple: {
if (ActorsInfo.Simple.ActorId == localActorId)
Expand All @@ -167,14 +196,22 @@ namespace NActors {
}
break;
}
default:
Y_ABORT();
case TMailboxActorPack::Complex:
Y_ABORT("Unexpected ActorPack type");
}
return nullptr;
}

void AttachActor(ui64 localActorId, IActor* actor) noexcept {
switch (ActorPack) {
IActor* FindActor(ui64 localActorId) noexcept {
if (ActorPack != TMailboxActorPack::Complex) {
return FindActor(static_cast<TMailboxActorPack::EType>(ActorPack), ActorsInfo, localActorId);
} else {
return FindActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId);
}
}

static void AttachActor(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo, ui64 localActorId, IActor* actor) noexcept {
switch (actorPack) {
case TMailboxActorPack::Simple: {
if (ActorsInfo.Simple.ActorId == 0) {
ActorsInfo.Simple.ActorId = localActorId;
Expand All @@ -185,7 +222,7 @@ namespace NActors {
ar->Actors[0] = ActorsInfo.Simple;
ar->Actors[1] = TActorPair{actor, localActorId};
ActorsInfo.Array.ActorsCount = 2;
ActorPack = TMailboxActorPack::Array;
actorPack = TMailboxActorPack::Array;
ActorsInfo.Array.ActorsArray = ar;
}
break;
Expand All @@ -201,7 +238,7 @@ namespace NActors {
mp->emplace(ActorsInfo.Array.ActorsArray->Actors[i].ActorId, ActorsInfo.Array.ActorsArray->Actors[i].Actor);
}
mp->emplace(localActorId, actor);
ActorPack = TMailboxActorPack::Map;
actorPack = TMailboxActorPack::Map;
ActorsInfo.Array.ActorsCount = 0;
delete ActorsInfo.Array.ActorsArray;
ActorsInfo.Map.ActorsMap = mp;
Expand All @@ -210,17 +247,27 @@ namespace NActors {
}
break;
}
default:
Y_ABORT();
case TMailboxActorPack::Complex:
Y_ABORT("Unexpected ActorPack type");
}
}

IActor* DetachActor(ui64 localActorId) noexcept {
Y_DEBUG_ABORT_UNLESS(FindActor(localActorId) != nullptr);
void AttachActor(ui64 localActorId, IActor* actor) noexcept {
if (ActorPack != TMailboxActorPack::Complex) {
TMailboxActorPack::EType pack = ActorPack;
AttachActor(pack, ActorsInfo, localActorId, actor);
ActorPack = pack;
} else {
AttachActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId, actor);
}
}

static IActor* DetachActor(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo, ui64 localActorId) noexcept {
Y_DEBUG_ABORT_UNLESS(FindActor(actorPack, ActorsInfo, localActorId) != nullptr);

IActor* actorToDestruct = nullptr;

switch (ActorPack) {
switch (actorPack) {
case TMailboxActorPack::Simple: {
Y_ABORT_UNLESS(ActorsInfo.Simple.ActorId == localActorId);
actorToDestruct = ActorsInfo.Simple.Actor;
Expand All @@ -243,7 +290,7 @@ namespace NActors {
ar->Actors[i++] = TActorPair{actor, actorId};
}
delete ActorsInfo.Map.ActorsMap;
ActorPack = TMailboxActorPack::Array;
actorPack = TMailboxActorPack::Array;
ActorsInfo.Array.ActorsArray = ar;
ActorsInfo.Array.ActorsCount = ARRAY_CAPACITY;
}
Expand All @@ -265,16 +312,50 @@ namespace NActors {
if (ActorsInfo.Array.ActorsCount == 1) {
const TActorPair Actor = ActorsInfo.Array.ActorsArray->Actors[0];
delete ActorsInfo.Array.ActorsArray;
ActorPack = TMailboxActorPack::Simple;
actorPack = TMailboxActorPack::Simple;
ActorsInfo.Simple = Actor;
}
break;
}
case TMailboxActorPack::Complex:
Y_ABORT("Unexpected ActorPack type");
}

return actorToDestruct;
}

IActor* DetachActor(ui64 localActorId) noexcept {
if (ActorPack != TMailboxActorPack::Complex) {
TMailboxActorPack::EType pack = ActorPack;
IActor* result = DetachActor(pack, ActorsInfo, localActorId);
ActorPack = pack;
return result;
} else {
return DetachActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId);
}
}

void EnableStats() {
TComplexActorInfo* complex = new TComplexActorInfo;
complex->ActorPack = ActorPack;
complex->ActorsInfo = std::move(ActorsInfo);
ActorPack = TMailboxActorPack::Complex;
ActorsInfo.Complex = complex;
}

void AddElapsedCycles(ui64 elapsed) {
if (ActorPack == TMailboxActorPack::Complex) {
ActorsInfo.Complex->Stats.ElapsedCycles += elapsed;
}
}

std::optional<ui64> GetElapsedCycles() {
if (ActorPack == TMailboxActorPack::Complex) {
return ActorsInfo.Complex->Stats.ElapsedCycles;
}
return std::nullopt;
}

std::pair<ui32, ui32> CountMailboxEvents(ui64 localActorId, ui32 maxTraverse);
};

Expand Down