From adc972c6d52e6ac05947a51dbcf16980aaba89d6 Mon Sep 17 00:00:00 2001 From: Mikhail Surin Date: Fri, 2 Aug 2024 09:15:36 +0300 Subject: [PATCH 1/5] Throttle compute-intensive tasks (#4891) --- .../kqp/compute_actor/kqp_compute_actor.cpp | 4 +- .../kqp/compute_actor/kqp_compute_actor.h | 5 +- .../kqp_compute_actor_factory.cpp | 10 +- .../compute_actor/kqp_compute_actor_factory.h | 5 +- .../compute_actor/kqp_pure_compute_actor.cpp | 11 +- .../compute_actor/kqp_pure_compute_actor.h | 13 +- .../compute_actor/kqp_scan_compute_actor.cpp | 6 +- .../compute_actor/kqp_scan_compute_actor.h | 8 +- ydb/core/kqp/counters/kqp_counters.cpp | 8 +- ydb/core/kqp/counters/kqp_counters.h | 8 + ydb/core/kqp/executer_actor/kqp_planner.cpp | 2 + .../kqp/node_service/kqp_node_service.cpp | 96 +++- .../kqp/runtime/kqp_compute_scheduler.cpp | 442 ++++++++++++++++++ ydb/core/kqp/runtime/kqp_compute_scheduler.h | 320 +++++++++++++ ydb/core/kqp/runtime/kqp_read_actor.cpp | 1 + ydb/core/kqp/runtime/ya.make | 1 + ydb/core/protos/kqp.proto | 1 + ydb/core/protos/table_service_config.proto | 14 + 18 files changed, 922 insertions(+), 33 deletions(-) create mode 100644 ydb/core/kqp/runtime/kqp_compute_scheduler.cpp create mode 100644 ydb/core/kqp/runtime/kqp_compute_scheduler.h diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index 4f9932e580f7..17db4f9fe87e 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -134,8 +134,8 @@ using namespace NYql::NDqProto; IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId, TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, - TIntrusivePtr arena) { - return new NScanPrivate::TKqpScanComputeActor(executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory), + TIntrusivePtr arena, TComputeActorSchedulingOptions schedulingOptions) { + return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory), settings, memoryLimits, std::move(traceId), std::move(arena)); } diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index f298353c50a0..3cf258b3e64e 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -48,12 +49,12 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions); IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId, NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, - TIntrusivePtr arena); + TIntrusivePtr arena, TComputeActorSchedulingOptions); IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& 
snapshot, std::vector&& computeActors, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings, diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp index 7a77406889ee..83297b69f206 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp @@ -104,7 +104,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory { ApplyConfig(config); } - void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) + void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) override { MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit()); MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit()); @@ -114,7 +114,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory { MinMemFreeSize.store(config.GetMinMemFreeSize()); } - TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) { + TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) override { NYql::NDq::TComputeMemoryLimits memoryLimits; memoryLimits.ChannelBufferSize = 0; memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load(); @@ -213,7 +213,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory { auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead()); IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.LockTxId, args.LockNodeId, args.Task, AsyncIoFactory, runtimeSettings, memoryLimits, - std::move(args.TraceId), std::move(args.Arena)); + std::move(args.TraceId), std::move(args.Arena), + std::move(args.SchedulingOptions)); TActorId result = TlsActivationContext->Register(computeActor); info.MutableActorIds().emplace_back(result); return result; @@ -223,7 +224,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory { GUCSettings = std::make_shared(args.SerializedGUCSettings); } IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory, - runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings); + runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings, + std::move(args.SchedulingOptions)); return args.ShareMailbox ? 
TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) : TlsActivationContext->AsActorContext().Register(computeActor); } diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h index 8f87b0c67787..c2a3325853ce 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h @@ -6,6 +6,8 @@ #include #include +#include + #include namespace NKikimr::NKqp { @@ -124,6 +126,7 @@ struct IKqpNodeComputeActorFactory { const TMaybe& RlPath; TComputeStagesWithScan* ComputesByStages = nullptr; std::shared_ptr State = nullptr; + TComputeActorSchedulingOptions SchedulingOptions = {}; }; typedef std::variant TActorStartResult; @@ -137,4 +140,4 @@ std::shared_ptr MakeKqpCaFactory(const NKikimrConfi NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const std::optional federatedQuerySetup); -} // namespace NKikimr::NKqp::NComputeActor \ No newline at end of file +} // namespace NKikimr::NKqp::NComputeActor diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index ac8f34e1979b..2dbe94b66b2e 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -14,8 +14,8 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro IDqAsyncIoFactory::TPtr asyncIoFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) - : TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings) + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions schedulingOptions) + : TBase(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings) , ComputeCtx(settings.StatsMode) , FederatedQuerySetup(federatedQuerySetup) { @@ -121,9 +121,12 @@ void TKqpComputeActor::DoBootstrap() { ContinueExecute(); Become(&TKqpComputeActor::StateFunc); + + TBase::DoBoostrap(); } STFUNC(TKqpComputeActor::StateFunc) { + CA_LOG_D("CA StateFunc " << ev->GetTypeRewrite()); try { switch (ev->GetTypeRewrite()) { hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute); @@ -278,10 +281,10 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, const std::optional& federatedQuerySetup, - const TGUCSettings::TPtr& GUCSettings) + const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions cpuOptions) { return new TKqpComputeActor(executerId, txId, task, std::move(asyncIoFactory), - settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings); + settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings, std::move(cpuOptions)); } } // namespace NKqp diff --git 
a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h index 590a9bcab774..613f5e2786c6 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h @@ -8,17 +8,16 @@ #include #include #include +#include #include #include -#include - namespace NKikimr { namespace NKqp { -class TKqpComputeActor : public TDqSyncComputeActorBase { - using TBase = TDqSyncComputeActorBase; +class TKqpComputeActor : public TSchedulableComputeActorBase { + using TBase = TSchedulableComputeActorBase; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -29,7 +28,8 @@ class TKqpComputeActor : public TDqSyncComputeActorBase { IDqAsyncIoFactory::TPtr asyncIoFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, + TComputeActorSchedulingOptions); void DoBootstrap(); @@ -68,7 +68,8 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T IDqAsyncIoFactory::TPtr asyncIoFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, + TComputeActorSchedulingOptions); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 88ddaebdabe2..8947e2740030 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -23,11 +23,11 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50); } // anonymous namespace -TKqpScanComputeActor::TKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId, +TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId, NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr arena) - : TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, + : TBase(std::move(cpuOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena)) , ComputeCtx(settings.StatsMode) , LockTxId(lockTxId) @@ -251,6 +251,8 @@ void TKqpScanComputeActor::DoBootstrap() { ScanData->TaskId = GetTask().GetId(); ScanData->TableReader = CreateKqpTableReader(*ScanData); Become(&TKqpScanComputeActor::StateFunc); + + TBase::DoBoostrap(); } } diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h index d1bba8c37989..7dbb9f4f8252 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h @@ -2,15 +2,15 @@ #include "kqp_scan_events.h" #include -#include +#include #include #include namespace 
NKikimr::NKqp::NScanPrivate { -class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase { +class TKqpScanComputeActor: public TSchedulableComputeActorBase { private: - using TBase = NYql::NDq::TDqSyncComputeActorBase; + using TBase = TSchedulableComputeActorBase; NMiniKQL::TKqpScanComputeContext ComputeCtx; NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta; @@ -65,7 +65,7 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase arena); diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 70e5169c4546..4c39ee6d89d6 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -8,7 +8,6 @@ #include #include - #include #include @@ -829,6 +828,13 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co "PhyTx/ScanTxTotalTimeMs", NMonitoring::ExponentialHistogram(20, 2, 1)); FullScansExecuted = KqpGroup->GetCounter("FullScans", true); + + SchedulerThrottled = KqpGroup->GetCounter("NodeScheduler/ThrottledUs", true); + SchedulerCapacity = KqpGroup->GetCounter("NodeScheduler/Capacity"); + ComputeActorExecutions = KqpGroup->GetHistogram("NodeScheduler/BatchUs", NMonitoring::ExponentialHistogram(20, 2, 1)); + ComputeActorDelays = KqpGroup->GetHistogram("NodeScheduler/Delays", NMonitoring::ExponentialHistogram(20, 2, 1)); + ThrottledActorsSpuriousActivations = KqpGroup->GetCounter("NodeScheduler/SpuriousActivations", true); + SchedulerDelays = KqpGroup->GetHistogram("NodeScheduler/Delay", NMonitoring::ExponentialHistogram(20, 2, 1)); } ::NMonitoring::TDynamicCounterPtr TKqpCounters::GetKqpCounters() const { diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index f302897f1b8d..7d6f38d05fe9 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -409,6 +409,14 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages; ::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems; + // Scheduler signals + ::NMonitoring::TDynamicCounters::TCounterPtr SchedulerThrottled; + ::NMonitoring::TDynamicCounters::TCounterPtr SchedulerCapacity; + NMonitoring::THistogramPtr ComputeActorExecutions; + NMonitoring::THistogramPtr ComputeActorDelays; + ::NMonitoring::TDynamicCounters::TCounterPtr ThrottledActorsSpuriousActivations; + NMonitoring::THistogramPtr SchedulerDelays; + // Sequences counters ::NMonitoring::TDynamicCounters::TCounterPtr SequencerActorsCount; ::NMonitoring::TDynamicCounters::TCounterPtr SequencerErrors; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 60b168150bd9..5e5a16efe76c 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -223,6 +223,8 @@ std::unique_ptr TKqpPlanner::SerializeReque request.SetSerializedGUCSettings(SerializedGUCSettings); } + request.SetSchedulerGroup(UserRequestContext->PoolId); + return result; } diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index a2c6e982596a..8895c2c1f15c 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -80,6 +81,10 @@ class TKqpNodeService : public TActorBootstrapped { if (config.HasIteratorReadQuotaSettings()) { 
SetIteratorReadsQuotaSettings(config.GetIteratorReadQuotaSettings()); } + if (config.HasPoolsConfiguration()) { + SetPriorities(config.GetPoolsConfiguration()); + } + Scheduler.ReportCounters(counters); } void Bootstrap() { @@ -98,10 +103,16 @@ class TKqpNodeService : public TActorBootstrapped { TlsActivationContext->ExecutorThread.ActorSystem, SelfId()); } - Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup(WakeCleaunupTag)); + Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag)); Become(&TKqpNodeService::WorkState); } + enum { + WakeCleaunupTag, + WakeAdvanceTimeTag + }; + private: STATEFN(WorkState) { switch (ev->GetTypeRewrite()) { @@ -115,6 +126,8 @@ class TKqpNodeService : public TActorBootstrapped { hFunc(TEvents::TEvUndelivered, HandleWork); hFunc(TEvents::TEvPoison, HandleWork); hFunc(NMon::TEvHttpInfo, HandleWork); + // sheduling + hFunc(TEvSchedulerDeregister, HandleWork); default: { Y_ABORT("Unexpected event 0x%x for TKqpResourceManagerService", ev->GetTypeRewrite()); } @@ -123,6 +136,12 @@ class TKqpNodeService : public TActorBootstrapped { static constexpr double SecToUsec = 1e6; + void HandleWork(TEvSchedulerDeregister::TPtr& ev) { + if (ev->Get()->SchedulerEntity) { + Scheduler.Deregister(*ev->Get()->SchedulerEntity, TMonotonic::Now()); + } + } + void HandleWork(TEvKqpNode::TEvStartKqpTasksRequest::TPtr& ev) { NWilson::TSpan sendTasksSpan(TWilsonKqp::KqpNodeSendTasks, NWilson::TTraceId(ev->TraceId), "KqpNode.SendTasks", NWilson::EFlags::AUTO_END); @@ -179,6 +198,8 @@ class TKqpNodeService : public TActorBootstrapped { const TString& serializedGUCSettings = ev->Get()->Record.HasSerializedGUCSettings() ? ev->Get()->Record.GetSerializedGUCSettings() : ""; + auto schedulerNow = TMonotonic::Now(); + // start compute actors TMaybe rlPath = Nothing(); if (msgRtSettings.HasRlPath()) { @@ -190,6 +211,24 @@ class TKqpNodeService : public TActorBootstrapped { const ui32 tasksCount = msg.GetTasks().size(); for (auto& dqTask: *msg.MutableTasks()) { + TString group = msg.GetSchedulerGroup(); + + TComputeActorSchedulingOptions schedulingOptions { + .Now = schedulerNow, + .NodeService = SelfId(), + .Scheduler = &Scheduler, + .Group = group, + .Weight = 1, + .NoThrottle = false, + .Counters = Counters + }; + + if (Scheduler.Disabled(schedulingOptions.Group)) { + schedulingOptions.NoThrottle = true; + } else { + schedulingOptions.Handle = Scheduler.Enroll(schedulingOptions.Group, schedulingOptions.Weight, schedulingOptions.Now); + } + auto result = CaFactory_->CreateKqpComputeActor({ .ExecuterId = request.Executer, .TxId = txId, @@ -210,7 +249,8 @@ class TKqpNodeService : public TActorBootstrapped { .ShareMailbox = false, .RlPath = rlPath, .ComputesByStages = &computesByStage, - .State = State_ + .State = State_, + .SchedulingOptions = std::move(schedulingOptions) }); if (const auto* rmResult = std::get_if(&result)) { @@ -303,11 +343,17 @@ class TKqpNodeService : public TActorBootstrapped { } void HandleWork(TEvents::TEvWakeup::TPtr& ev) { - Schedule(TDuration::Seconds(1), ev->Release().Release()); - for (auto& bucket : State_->Buckets) { - auto expiredRequests = bucket.ClearExpiredRequests(); - for (auto& cxt : expiredRequests) { - TerminateTx(cxt.TxId, "reached execution deadline", NYql::NDqProto::StatusIds::TIMEOUT); + if (ev->Get()->Tag == WakeAdvanceTimeTag) { + Scheduler.AdvanceTime(TMonotonic::Now()); + Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag)); + } + 
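// This handler multiplexes two periodic jobs over a single TEvWakeup stream,
// dispatching on the event tag: a frequent tick (50ms here) that advances the
// scheduler clock, and the pre-existing 1-second cleanup tick. Each branch
// re-arms only its own timer. A minimal sketch of the pattern, with
// illustrative names (only Schedule/TEvWakeup/Tag usage is taken from the
// code around this hunk):
//
//     enum { CleanupTag, AdvanceTag };
//
//     void Bootstrap() {
//         Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup(CleanupTag));
//         Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(AdvanceTag));
//     }
//
//     void Handle(TEvents::TEvWakeup::TPtr& ev) {
//         switch (ev->Get()->Tag) {
//         case AdvanceTag:  // hot path: move the scheduler's notion of "now"
//             Scheduler.AdvanceTime(TMonotonic::Now());
//             Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(AdvanceTag));
//             break;
//         case CleanupTag:  // cold path: expire stale requests
//             Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup(CleanupTag));
//             break;
//         }
//     }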
if (ev->Get()->Tag == WakeCleaunupTag) { + Schedule(TDuration::Seconds(1), ev->Release().Release()); + for (auto& bucket : State_->Buckets) { + auto expiredRequests = bucket.ClearExpiredRequests(); + for (auto& cxt : expiredRequests) { + TerminateTx(cxt.TxId, "reached execution deadline", NYql::NDqProto::StatusIds::TIMEOUT); + } } } } @@ -350,6 +396,10 @@ class TKqpNodeService : public TActorBootstrapped { SetIteratorReadsQuotaSettings(event.GetConfig().GetTableServiceConfig().GetIteratorReadQuotaSettings()); } + if (event.GetConfig().GetTableServiceConfig().HasPoolsConfiguration()) { + SetPriorities(event.GetConfig().GetTableServiceConfig().GetPoolsConfiguration()); + } + auto responseEv = MakeHolder(event); Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie); @@ -359,6 +409,35 @@ class TKqpNodeService : public TActorBootstrapped { SetDefaultIteratorQuotaSettings(settings.GetMaxRows(), settings.GetMaxBytes()); } + void SetPriorities(const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf) { + std::function convert + = [&](const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf) + { + if (conf.HasName()) { + return TComputeScheduler::TDistributionRule{.Share = conf.GetMaxCpuShare(), .Name = conf.GetName()}; + } else if (conf.HasSubPoolsConfiguration()) { + auto res = TComputeScheduler::TDistributionRule{.Share = conf.GetMaxCpuShare()}; + for (auto& subConf : conf.GetSubPoolsConfiguration().GetSubPools()) { + res.SubRules.push_back(convert(subConf)); + } + return res; + } else { + Y_ENSURE(false, "unknown case"); + } + }; + SetPriorities(convert(conf)); + } + + void SetPriorities(TComputeScheduler::TDistributionRule rule) { + NActors::TExecutorPoolStats poolStats; + TVector threadsStats; + TlsActivationContext->ActorSystem()->GetPoolStats(SelfId().PoolID(), poolStats, threadsStats); + Y_ENSURE(poolStats.MaxThreadCount > 0); + Counters->SchedulerCapacity->Set(poolStats.MaxThreadCount); + + Scheduler.SetPriorities(rule, poolStats.MaxThreadCount, TMonotonic::Now()); + } + void SetIteratorReadsRetrySettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadsRetrySettings& settings) { auto ptr = MakeIntrusive(); ptr->StartRetryDelay = TDuration::MilliSeconds(settings.GetStartDelayMs()); @@ -445,6 +524,9 @@ class TKqpNodeService : public TActorBootstrapped { NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; const std::optional FederatedQuerySetup; + TComputeScheduler Scheduler; + + //state sharded by TxId std::shared_ptr State_; }; diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp new file mode 100644 index 000000000000..3946f405a647 --- /dev/null +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp @@ -0,0 +1,442 @@ +#include "kqp_compute_scheduler.h" + +namespace { + static constexpr ui64 FromDuration(TDuration d) { + return d.MicroSeconds(); + } + + static constexpr TDuration ToDuration(double t) { + return TDuration::MicroSeconds(t); + } + + static constexpr double MinEntitiesWeight = 1e-8; + + static constexpr TDuration AvgBatch = TDuration::MicroSeconds(100); +} + +namespace NKikimr { +namespace NKqp { + +template +class TMultiThreadView { +public: + TMultiThreadView(std::atomic* usage, T* slot) + : Usage(usage) + , Slot(slot) + { + Usage->fetch_add(1); + } + + const T* get() { + return Slot; + } + + ~TMultiThreadView() { + Usage->fetch_sub(1); + } + +private: + std::atomic* Usage; + T* Slot; +}; + +template +class TMultithreadPublisher { +public: + void 
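// Publish() below is the writer side of a two-slot epoch scheme: writers
// prepare changes in the shadow slot via Next(), flip CurrentT so readers
// switch over, then spin until no reader still holds a view of the retired
// slot before copying the freshly published state back into it (so later
// Next() edits start from the current state). The reader side, Current(),
// bumps the slot's usage counter and re-checks CurrentT to close the race
// with a concurrent flip. A hedged usage sketch, reusing types declared in
// this file:
//
//     TMultithreadPublisher<TSchedulerEntity::TGroupMutableStats> stats;
//     stats.Next()->Weight = 2.0;       // mutate the shadow copy
//     stats.Publish();                  // atomically expose it to readers
//     auto view = stats.Current();      // RAII view pins the live slot
//     double w = view.get()->Weight;    // safe concurrent read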
Publish() { + auto oldVal = CurrentT.load(); + auto newVal = 1 - oldVal; + CurrentT.store(newVal); + while (true) { + if (Usage[oldVal].load() == 0) { + Slots[oldVal] = Slots[newVal]; + return; + } + } + } + + T* Next() { + return &Slots[1 - CurrentT.load()]; + } + + TMultiThreadView Current() { + while (true) { + auto val = CurrentT.load(); + TMultiThreadView view(&Usage[val], &Slots[val]); + if (CurrentT.load() == val) { + return view; + } + } + } + +private: + std::atomic CurrentT = 0; + std::atomic Usage[2] = {0, 0}; + T Slots[2]; +}; + +TSchedulerEntityHandle::TSchedulerEntityHandle(TSchedulerEntity* ptr) + : Ptr(ptr) +{ +} + +TSchedulerEntityHandle::TSchedulerEntityHandle(){} + +TSchedulerEntityHandle::TSchedulerEntityHandle(TSchedulerEntityHandle&& other) { + Ptr.swap(other.Ptr); +} + +TSchedulerEntityHandle& TSchedulerEntityHandle::operator = (TSchedulerEntityHandle&& other) { + Ptr.swap(other.Ptr); + return *this; +} + +TSchedulerEntityHandle::~TSchedulerEntityHandle() = default; + +class TSchedulerEntity { +public: + TSchedulerEntity() {} + ~TSchedulerEntity() {} + + struct TGroupMutableStats { + double Weight = 0; + TMonotonic LastNowRecalc; + bool Disabled = false; + double EntitiesWeight = 0; + double MaxDeviation = 0; + double MaxLimitDeviation = 0; + + ssize_t TrackedBefore = 0; + + double Limit(TMonotonic now) const { + return FromDuration(now - LastNowRecalc) * Weight + MaxLimitDeviation + TrackedBefore; + } + }; + + struct TGroupRecord { + std::atomic TrackedMicroSeconds = 0; + std::atomic DelayedSumBatches = 0; + std::atomic DelayedCount = 0; + + TMultithreadPublisher MutableStats; + }; + + TGroupRecord* Group; + double Weight; + double Vruntime = 0; + double Vstart; + + double Vcurrent; + + TDuration MaxDelay; + + static constexpr double WakeupDelay = 1.1; + static constexpr double BatchCalcDecay = 0; + TDuration BatchTime = AvgBatch; + + static constexpr TDuration ActivationPenalty = TDuration::MicroSeconds(10); + + size_t Wakeups = 0; + bool isThrottled = false; + + void TrackTime(TDuration time, TMonotonic) { + auto group = Group->MutableStats.Current(); + Group->TrackedMicroSeconds.fetch_add(time.MicroSeconds()); + } + + void UpdateBatchTime(TDuration time) { + Wakeups = 0; + auto newBatch = BatchTime * BatchCalcDecay + time * (1 - BatchCalcDecay); + if (isThrottled) { + MarkResumed(); + BatchTime = newBatch; + MarkThrottled(); + } else { + BatchTime = newBatch; + } + } + + TMaybe GroupDelay(TMonotonic now) { + auto group = Group->MutableStats.Current(); + auto limit = group.get()->Limit(now); + auto tracked = Group->TrackedMicroSeconds.load(); + //double Coeff = pow(WakeupDelay, Wakeups); + if (limit > tracked) { + return {}; + } else { + return Min(MaxDelay, ToDuration(/*Coeff * */(tracked - limit + + Max(0, Group->DelayedSumBatches.load()) + BatchTime.MicroSeconds() + + ActivationPenalty.MicroSeconds() * (Group->DelayedCount.load() + 1) + + group.get()->MaxLimitDeviation) / group.get()->Weight)); + } + } + + void MarkThrottled() { + isThrottled = true; + Group->DelayedSumBatches.fetch_add(BatchTime.MicroSeconds()); + Group->DelayedCount.fetch_add(1); + } + + void MarkResumed() { + isThrottled = false; + Group->DelayedSumBatches.fetch_sub(BatchTime.MicroSeconds()); + Group->DelayedCount.fetch_sub(1); + } +}; + +struct TComputeScheduler::TImpl { + TVector<::NMonitoring::TDynamicCounters::TCounterPtr> VtimeCounters; + TVector<::NMonitoring::TDynamicCounters::TCounterPtr> EntitiesWeightCounters; + TVector<::NMonitoring::TDynamicCounters::TCounterPtr> LimitCounters; + 
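// The TSchedulerEntity accounting above amounts to a token bucket per group:
//
//     Limit(now) = (now - LastNowRecalc) * Weight          // refill rate
//                  + MaxLimitDeviation                     // burst allowance
//                  + TrackedBefore                         // carried balance
//
// and a group may keep running while TrackedMicroSeconds < Limit(now).
// GroupDelay() estimates when the bucket refills enough for one more batch:
// (tracked - limit + pending delayed batches + own BatchTime + activation
// penalties + MaxLimitDeviation) / Weight, capped at MaxDelay. Worked
// example with assumed numbers: Weight = 0.5 cores, overdraft
// tracked - limit = 1000us, one already-delayed batch of 100us
// (DelayedCount = 1), own BatchTime = 100us, MaxLimitDeviation = 0:
//
//     delay = (1000 + 100 + 100 + 10 * (1 + 1)) / 0.5 = 2440us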
TVector<::NMonitoring::TDynamicCounters::TCounterPtr> WeightCounters; + + TVector<::NMonitoring::TDynamicCounters::TCounterPtr> SchedulerClock; + TVector<::NMonitoring::TDynamicCounters::TCounterPtr> SchedulerLimitUs; + TVector<::NMonitoring::TDynamicCounters::TCounterPtr> SchedulerTrackedUs; + + THashMap PoolId; + std::vector> Records; + + struct TRule { + size_t Parent; + double Weight = 0; + + double Share; + TMaybe RecordId = {}; + double SubRulesSum = 0; + bool Empty = true; + }; + std::vector Rules; + + double SumCores; + + TIntrusivePtr Counters; + TDuration SmoothPeriod = TDuration::MilliSeconds(100); + + TDuration MaxDelay = TDuration::Seconds(10); + + void AssignWeights() { + ssize_t rootRule = static_cast(Rules.size()) - 1; + for (size_t i = 0; i < Rules.size(); ++i) { + Rules[i].SubRulesSum = 0; + Rules[i].Empty = true; + } + for (ssize_t i = 0; i < static_cast(Rules.size()); ++i) { + if (Rules[i].RecordId) { + Rules[i].Empty = Records[*Rules[i].RecordId]->MutableStats.Next()->EntitiesWeight < MinEntitiesWeight; + Rules[i].SubRulesSum = Rules[i].Share; + } + if (i != rootRule && !Rules[i].Empty) { + Rules[Rules[i].Parent].Empty = false; + Rules[Rules[i].Parent].SubRulesSum += Rules[i].SubRulesSum; + } + } + for (ssize_t i = static_cast(Rules.size()) - 1; i >= 0; --i) { + if (i == static_cast(Rules.size()) - 1) { + Rules[i].Weight = SumCores * Rules[i].Share; + } else if (!Rules[i].Empty) { + Rules[i].Weight = Rules[Rules[i].Parent].Weight * Rules[i].Share / Rules[Rules[i].Parent].SubRulesSum; + } else { + Rules[i].Weight = 0; + } + if (Rules[i].RecordId) { + Records[*Rules[i].RecordId]->MutableStats.Next()->Weight = Rules[i].Weight; + } + } + } +}; + +TComputeScheduler::TComputeScheduler() { + Impl = std::make_unique(); +} + +TComputeScheduler::~TComputeScheduler() = default; + +void TComputeScheduler::SetPriorities(TDistributionRule rule, double cores, TMonotonic now) { + THashSet seenNames; + std::function exploreNames = [&](TDistributionRule& rule) { + if (rule.SubRules.empty()) { + seenNames.insert(rule.Name); + } else { + for (auto& subRule : rule.SubRules) { + exploreNames(subRule); + } + } + }; + exploreNames(rule); + + for (auto& k : seenNames) { + auto ptr = Impl->PoolId.FindPtr(k); + if (!ptr) { + Impl->PoolId[k] = Impl->Records.size(); + auto group = std::make_unique(); + group->MutableStats.Next()->LastNowRecalc = now; + Impl->Records.push_back(std::move(group)); + } + } + for (auto& [k, v] : Impl->PoolId) { + if (!seenNames.contains(k)) { + auto& group = Impl->Records[Impl->PoolId[k]]->MutableStats; + group.Next()->Weight = 0; + group.Next()->Disabled = true; + group.Publish(); + } + } + Impl->SumCores = cores; + + TVector rules; + std::function makeRules = [&](TDistributionRule& rule) { + size_t result; + if (rule.SubRules.empty()) { + result = rules.size(); + rules.push_back(TImpl::TRule{.Share = rule.Share, .RecordId=Impl->PoolId[rule.Name]}); + } else { + TVector toAssign; + for (auto& subRule : rule.SubRules) { + toAssign.push_back(makeRules(subRule)); + } + size_t result = rules.size(); + rules.push_back(TImpl::TRule{.Share = rule.Share}); + for (auto i : toAssign) { + rules[i].Parent = result; + } + return result; + } + return result; + }; + makeRules(rule); + Impl->Rules.swap(rules); + + Impl->AssignWeights(); + for (auto& record : Impl->Records) { + record->MutableStats.Publish(); + } +} + + +TSchedulerEntityHandle TComputeScheduler::Enroll(TString groupName, double weight, TMonotonic now) { + Y_ENSURE(Impl->PoolId.contains(groupName), "unknown scheduler 
group"); + auto* groupEntry = Impl->Records[Impl->PoolId.at(groupName)].get(); + groupEntry->MutableStats.Next()->EntitiesWeight += weight; + Impl->AssignWeights(); + AdvanceTime(now); + + auto result = std::make_unique(); + result->Group = groupEntry; + result->Weight = weight; + result->MaxDelay = Impl->MaxDelay; + + return TSchedulerEntityHandle(result.release()); +} + +void TComputeScheduler::AdvanceTime(TMonotonic now) { + if (Impl->Counters) { + if (Impl->VtimeCounters.size() < Impl->Records.size()) { + Impl->VtimeCounters.resize(Impl->Records.size()); + Impl->EntitiesWeightCounters.resize(Impl->Records.size()); + Impl->LimitCounters.resize(Impl->Records.size()); + Impl->WeightCounters.resize(Impl->Records.size()); + Impl->SchedulerClock.resize(Impl->Records.size()); + Impl->SchedulerLimitUs.resize(Impl->Records.size()); + Impl->SchedulerTrackedUs.resize(Impl->Records.size()); + + for (auto& [k, i] : Impl->PoolId) { + auto group = Impl->Counters->GetKqpCounters()->GetSubgroup("NodeScheduler/Group", k); + Impl->VtimeCounters[i] = group->GetCounter("VTime", true); + Impl->EntitiesWeightCounters[i] = group->GetCounter("Entities", false); + Impl->LimitCounters[i] = group->GetCounter("Limit", true); + Impl->WeightCounters[i] = group->GetCounter("Weight", false); + Impl->SchedulerClock[i] = group->GetCounter("Clock", false); + Impl->SchedulerTrackedUs[i] = group->GetCounter("Tracked", true); + Impl->SchedulerLimitUs[i] = group->GetCounter("AbsoluteLimit", true); + } + } + } + for (size_t i = 0; i < Impl->Records.size(); ++i) { + auto& v = Impl->Records[i]->MutableStats; + { + auto group = v.Current(); + if (group.get()->LastNowRecalc > now) { + continue; + } + double delta = 0; + + v.Next()->TrackedBefore = Impl->Records[i]->TrackedMicroSeconds.load(); + v.Next()->MaxLimitDeviation = Impl->SmoothPeriod.MicroSeconds() * v.Next()->Weight; + v.Next()->LastNowRecalc = now; + v.Next()->TrackedBefore = Min(group.get()->Limit(now) - group.get()->MaxLimitDeviation, v.Next()->TrackedBefore); + + if (!group.get()->Disabled && group.get()->EntitiesWeight > MinEntitiesWeight) { + delta = FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight / group.get()->EntitiesWeight; + v.Next()->MaxDeviation = (FromDuration(Impl->SmoothPeriod) * v.Next()->Weight) / v.Next()->EntitiesWeight; + } + + if (Impl->VtimeCounters.size() > i && Impl->VtimeCounters[i]) { + Impl->SchedulerLimitUs[i]->Set(group.get()->Limit(now)); + Impl->SchedulerTrackedUs[i]->Set(Impl->Records[i]->TrackedMicroSeconds.load()); + Impl->SchedulerClock[i]->Add(now.MicroSeconds() - group.get()->LastNowRecalc.MicroSeconds()); + Impl->VtimeCounters[i]->Add(delta); + Impl->EntitiesWeightCounters[i]->Set(v.Next()->EntitiesWeight); + Impl->LimitCounters[i]->Add(FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight); + Impl->WeightCounters[i]->Set(group.get()->Weight); + } + } + v.Publish(); + } +} + +void TComputeScheduler::Deregister(TSchedulerEntity& self, TMonotonic now) { + auto* group = self.Group->MutableStats.Next(); + group->EntitiesWeight -= self.Weight; + + Impl->AssignWeights(); + AdvanceTime(now); +} + +void TSchedulerEntityHandle::TrackTime(TDuration time, TMonotonic now) { + Ptr->TrackTime(time, now); +} + +void TSchedulerEntityHandle::ReportBatchTime(TDuration time) { + Ptr->UpdateBatchTime(time); +} + +TMaybe TSchedulerEntityHandle::Delay(TMonotonic now) { + return Ptr->GroupDelay(now); +} + +void TSchedulerEntityHandle::MarkResumed() { + Ptr->MarkResumed(); +} + +void 
TSchedulerEntityHandle::MarkThrottled() { + Ptr->MarkThrottled(); +} + +void TSchedulerEntityHandle::Clear() { + Ptr.reset(); +} + +void TComputeScheduler::ReportCounters(TIntrusivePtr counters) { + Impl->Counters = counters; +} + +void TComputeScheduler::SetMaxDeviation(TDuration period) { + Impl->SmoothPeriod = period; +} + +bool TComputeScheduler::Disabled(TString group) { + auto ptr = Impl->PoolId.FindPtr(group); + return !ptr || Impl->Records[*ptr]->MutableStats.Current().get()->Disabled; +} + + +::NMonitoring::TDynamicCounters::TCounterPtr TComputeScheduler::GetGroupUsageCounter(TString group) const { + return Impl->Counters + ->GetKqpCounters() + ->GetSubgroup("NodeScheduler/Group", group) + ->GetCounter("Usage", true); +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.h b/ydb/core/kqp/runtime/kqp_compute_scheduler.h new file mode 100644 index 000000000000..e3c39a0291de --- /dev/null +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.h @@ -0,0 +1,320 @@ +#pragma once + +#include +#include + +#include + +#include +#include +#include + +#include + +namespace NKikimr { +namespace NKqp { + +class TSchedulerEntity; +class TSchedulerEntityHandle { +private: + std::unique_ptr Ptr; + +public: + TSchedulerEntityHandle(TSchedulerEntity*); + + TSchedulerEntityHandle(); + TSchedulerEntityHandle(TSchedulerEntityHandle&&); + + TSchedulerEntityHandle& operator = (TSchedulerEntityHandle&&); + + bool Defined() const { + return Ptr.get() != nullptr; + } + + operator bool () const { + return Defined(); + } + + TSchedulerEntity& operator*() { + return *Ptr; + } + + void TrackTime(TDuration time, TMonotonic now); + void ReportBatchTime(TDuration time); + + TMaybe Delay(TMonotonic now); + + void MarkThrottled(); + void MarkResumed(); + + double EstimateWeight(TMonotonic now, TDuration minTime); + + void Clear(); + + ~TSchedulerEntityHandle(); +}; + +class TComputeScheduler { +public: + struct TDistributionRule { + double Share; + TString Name; + TVector SubRules; + + bool empty() { + return SubRules.empty() && Name.empty(); + } + }; + +public: + TComputeScheduler(); + ~TComputeScheduler(); + + void ReportCounters(TIntrusivePtr); + + void SetPriorities(TDistributionRule rootRule, double cores, TMonotonic now); + void SetMaxDeviation(TDuration); + ::NMonitoring::TDynamicCounters::TCounterPtr GetGroupUsageCounter(TString group) const; + + TSchedulerEntityHandle Enroll(TString group, double weight, TMonotonic now); + + void AdvanceTime(TMonotonic now); + + void Deregister(TSchedulerEntity& self, TMonotonic now); + + bool Disabled(TString group); + +private: + struct TImpl; + std::unique_ptr Impl; +}; + +struct TComputeActorSchedulingOptions { + TMonotonic Now; + NActors::TActorId NodeService; + TSchedulerEntityHandle Handle; + TComputeScheduler* Scheduler; + TString Group = ""; + double Weight = 1; + bool NoThrottle = true; + TIntrusivePtr Counters = nullptr; +}; + +struct TKqpComputeSchedulerEvents { + enum EKqpComputeSchedulerEvents { + EvDeregister = EventSpaceBegin(TKikimrEvents::ES_KQP) + 400, + EvAccountTime, + }; +}; + +struct TEvSchedulerDeregister : public TEventLocal { + TSchedulerEntityHandle SchedulerEntity; + + TEvSchedulerDeregister(TSchedulerEntityHandle entity) + : SchedulerEntity(std::move(entity)) + { + } +}; + +template +class TSchedulableComputeActorBase : public NYql::NDq::TDqSyncComputeActorBase { +private: + using TBase = NYql::NDq::TDqSyncComputeActorBase; + + static constexpr double SecToUsec = 1e6; + +public: + template + 
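// TSchedulableComputeActorBase is a mixin over the regular sync compute
// actor: every DoExecuteImpl() batch is timed and charged to the group via
// SelfHandle, non-execute mailbox time is charged as well (through the
// mailbox elapsed-cycles stats in AccountActorSystemStats), and when the
// group is over its share the actor parks itself on a ResumeWakeupTag timer
// instead of running. A hedged sketch of one execute round, simplified from
// DoExecuteImpl() below:
//
//     auto delay = SelfHandle.Delay(now);          // over budget?
//     if (!delay) {
//         THPTimer timer;
//         TBase::DoExecuteImpl();                  // run one work batch
//         auto spent = TDuration::MicroSeconds(timer.Passed() * 1e6);
//         SelfHandle.ReportBatchTime(spent);       // refine batch estimate
//         SelfHandle.TrackTime(spent, now);        // charge the group
//     } else {
//         Schedule(*delay, new TEvents::TEvWakeup(ResumeWakeupTag));
//         SelfHandle.MarkThrottled();              // counted once until resumed
//     }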
TSchedulableComputeActorBase(TComputeActorSchedulingOptions options, TArgs&&... args) + : TBase(std::forward(args)...) + , SelfHandle(std::move(options.Handle)) + , NodeService(options.NodeService) + , NoThrottle(options.NoThrottle) + , Counters(options.Counters) + , Group(options.Group) + , Weight(options.Weight) + { + if (!NoThrottle) { + Y_ABORT_UNLESS(Counters); + Y_ABORT_UNLESS(SelfHandle); + GroupUsage = options.Scheduler->GetGroupUsageCounter(options.Group); + } else { + Y_ABORT_UNLESS(!SelfHandle); + } + } + + static constexpr ui64 ResumeWakeupTag = 201; + + TMonotonic Now() { + return TMonotonic::Now(); + //return TlsActivationContext->Monotonic(); + } + + void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev) { + auto tag = ev->Get()->Tag; + CA_LOG_D("wakeup with tag " << tag); + if (tag == ResumeWakeupTag) { + TBase::DoExecute(); + } else { + TBase::HandleExecuteBase(ev); + } + } + + STFUNC(BaseStateFuncBody) { + AccountActorSystemStats(TlsActivationContext->Monotonic()); + // we assume that exception handling is done in parents/descendents + switch (ev->GetTypeRewrite()) { + hFunc(NActors::TEvents::TEvWakeup, TSchedulableComputeActorBase::HandleWakeup); + default: + TBase::BaseStateFuncBody(ev); + } + } + + void DoBoostrap() { + if (!SelfHandle.Defined()) { + return; + } + + OldActivationStats = TlsActivationContext->AsActorContext().Mailbox.GetElapsedCycles(); + if (!OldActivationStats.has_value()) { + TlsActivationContext->AsActorContext().Mailbox.EnableStats(); + OldActivationStats = TlsActivationContext->AsActorContext().Mailbox.GetElapsedCycles(); + } + + Y_ABORT_UNLESS(OldActivationStats.has_value()); + } + +private: + void ReportThrottledTime(TMonotonic now) { + if (Counters && Throttled) { + Counters->SchedulerThrottled->Add((now - *Throttled).MicroSeconds()); + } + if (Throttled) { + SelfHandle.MarkResumed(); + Throttled.Clear(); + } + } + +protected: + void DoExecuteImpl() override { + if (!SelfHandle.Defined()) { + if (NoThrottle) { + return TBase::DoExecuteImpl(); + } else { + return; + } + } + + TMonotonic now = Now(); + AccountActorSystemStats(now); + TMaybe delay = CalcDelay(now); + bool executed = false; + if (NoThrottle || !delay) { + ReportThrottledTime(now); + executed = true; + + ExecutionTimer.ConstructInPlace(); + TBase::DoExecuteImpl(); + + TDuration passed = TDuration::MicroSeconds(ExecutionTimer->Passed() * SecToUsec); + + if (Finished) { + return; + } + TrackedWork += passed; + SelfHandle.ReportBatchTime(passed); + SelfHandle.TrackTime(passed, now); + GroupUsage->Add(passed.MicroSeconds()); + Counters->ComputeActorExecutions->Collect(passed.MicroSeconds()); + } + if (delay) { + Counters->SchedulerDelays->Collect(delay->MicroSeconds()); + CA_LOG_D("schedule wakeup after " << delay->MicroSeconds() << " msec "); + this->Schedule(*delay, new NActors::TEvents::TEvWakeup(ResumeWakeupTag)); + } + + if (!executed) { + if (!Throttled) { + SelfHandle.MarkThrottled(); + Throttled = now; + } else { + Counters->ThrottledActorsSpuriousActivations->Inc(); + } + } + ExecutionTimer.Clear(); + } + + void AccountActorSystemStats(NMonotonic::TMonotonic now) { + if (!SelfHandle.Defined()) { + return; + } + + auto newStats = TlsActivationContext->AsActorContext().Mailbox.GetElapsedCycles(); + Y_ABORT_UNLESS(OldActivationStats.has_value()); + Y_ABORT_UNLESS(newStats.has_value()); + Y_ABORT_UNLESS(*newStats >= *OldActivationStats); + auto toAccount = TDuration::MicroSeconds(NHPTimer::GetSeconds(*newStats - *OldActivationStats) * 1e6); + if (toAccount.MicroSeconds() > 
100000) { + CA_LOG_E("very huge account " << toAccount.MicroSeconds() << " newStats=" << newStats << " oldStats=" << OldActivationStats << " trackedWork=" << TrackedWork.MicroSeconds()); + } + { + auto minTime = Min(toAccount, TrackedWork); + TrackedWork -= minTime; + toAccount -= minTime; + } + + GroupUsage->Add(toAccount.MicroSeconds()); + SelfHandle.TrackTime(toAccount, now); + OldActivationStats = newStats; + } + + TMaybe CalcDelay(NMonotonic::TMonotonic now) { + auto result = SelfHandle.Delay(now); + Counters->ComputeActorDelays->Collect(result.GetOrElse(TDuration::Zero()).MicroSeconds()); + if (NoThrottle || !result.Defined()) { + return {}; + } else { + return result; + } + } + + void PassAway() override { + Finished = true; + if (SelfHandle) { + auto now = Now(); + if (Throttled) { + SelfHandle.MarkResumed(); + } + if (ExecutionTimer) { + TDuration passed = TDuration::MicroSeconds(ExecutionTimer->Passed() * SecToUsec); + SelfHandle.TrackTime(passed, now); + GroupUsage->Add(passed.MicroSeconds()); + } + } + if (SelfHandle) { + auto finishEv = MakeHolder(std::move(SelfHandle)); + this->Send(NodeService, finishEv.Release()); + } + TBase::PassAway(); + } + +private: + TMaybe ExecutionTimer; + TDuration TrackedWork = TDuration::Zero(); + TMaybe Throttled; + TSchedulerEntityHandle SelfHandle; + NActors::TActorId NodeService; + bool NoThrottle; + bool Finished = false; + + std::optional OldActivationStats; + + TIntrusivePtr Counters; + ::NMonitoring::TDynamicCounters::TCounterPtr GroupUsage; + + TString Group; + double Weight; +}; + +} // namespace NKqp +} // namespace NKikimR diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index b9bb9f748dd4..aa17440a948d 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -1,4 +1,5 @@ #include "kqp_read_actor.h" +#include "kqp_compute_scheduler.h" #include #include diff --git a/ydb/core/kqp/runtime/ya.make b/ydb/core/kqp/runtime/ya.make index 3801d1df0ea0..615ccbc9cbf4 100644 --- a/ydb/core/kqp/runtime/ya.make +++ b/ydb/core/kqp/runtime/ya.make @@ -5,6 +5,7 @@ SRCS( kqp_effects.cpp kqp_output_stream.cpp kqp_program_builder.cpp + kqp_compute_scheduler.cpp kqp_read_actor.cpp kqp_read_iterator_common.cpp kqp_read_table.cpp diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index c9b107e2732b..d9bed4a449e8 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -584,6 +584,7 @@ message TEvStartKqpTasksRequest { optional bool StartAllOrFail = 6 [default = true]; optional uint64 OutputChunkMaxSize = 7 [default = 0]; // 0 - use some default value optional string SerializedGUCSettings = 8; + optional string SchedulerGroup = 9; optional uint64 LockTxId = 13; optional uint32 LockNodeId = 14; } diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index ce437818af6a..ad0704df9f4c 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -309,4 +309,18 @@ message TTableServiceConfig { optional bool EnableImplicitQueryParameterTypes = 66 [ default = false ]; + optional string EnableSpillingNodes = 67 [ default = "All" ]; + + message TSubPoolsConfiguration { + repeated TComputePoolConfiguration SubPools = 1; + }; + + message TComputePoolConfiguration { + optional double MaxCpuShare = 1; + oneof ResourceConfiguration { + string Name = 2; + TSubPoolsConfiguration SubPoolsConfiguration = 3; + } + }; + optional TComputePoolConfiguration 
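// TComputePoolConfiguration forms a recursive tree: each node carries a
// MaxCpuShare and is either a named leaf pool or a nested
// TSubPoolsConfiguration. A hedged textual-proto example of a config this
// schema admits (the pool names are illustrative):
//
//     PoolsConfiguration {
//       MaxCpuShare: 1.0
//       SubPoolsConfiguration {
//         SubPools { MaxCpuShare: 0.7 Name: "olap" }
//         SubPools { MaxCpuShare: 0.3 Name: "oltp" }
//       }
//     }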
PoolsConfiguration = 68; }; From 154c365efc6456447ca3baf48a8e873c603f115d Mon Sep 17 00:00:00 2001 From: Mikhail Surin Date: Thu, 8 Aug 2024 13:34:53 +0300 Subject: [PATCH 2/5] Remove debug output (#7553) --- ydb/core/kqp/runtime/kqp_compute_scheduler.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.h b/ydb/core/kqp/runtime/kqp_compute_scheduler.h index e3c39a0291de..21f7380db171 100644 --- a/ydb/core/kqp/runtime/kqp_compute_scheduler.h +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.h @@ -254,9 +254,6 @@ class TSchedulableComputeActorBase : public NYql::NDq::TDqSyncComputeActorBase= *OldActivationStats); auto toAccount = TDuration::MicroSeconds(NHPTimer::GetSeconds(*newStats - *OldActivationStats) * 1e6); - if (toAccount.MicroSeconds() > 100000) { - CA_LOG_E("very huge account " << toAccount.MicroSeconds() << " newStats=" << newStats << " oldStats=" << OldActivationStats << " trackedWork=" << TrackedWork.MicroSeconds()); - } { auto minTime = Min(toAccount, TrackedWork); TrackedWork -= minTime; From d8189dce70c5bfa5ce10ac93c85f27002720d752 Mon Sep 17 00:00:00 2001 From: Mikhail Surin Date: Wed, 14 Aug 2024 19:28:11 +0300 Subject: [PATCH 3/5] Allow to forget consumption peaks (#7763) --- ydb/core/kqp/node_service/kqp_node_service.cpp | 11 ++++++++--- ydb/core/kqp/runtime/kqp_compute_scheduler.cpp | 14 ++++++++++++-- ydb/core/kqp/runtime/kqp_compute_scheduler.h | 1 + ydb/core/protos/table_service_config.proto | 13 +++++++++++-- 4 files changed, 32 insertions(+), 7 deletions(-) diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 8895c2c1f15c..01508d9af7c4 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -85,6 +85,8 @@ class TKqpNodeService : public TActorBootstrapped { SetPriorities(config.GetPoolsConfiguration()); } Scheduler.ReportCounters(counters); + AdvanceTimeInterval = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec()); + Scheduler.SetForgetInterval(TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec())); } void Bootstrap() { @@ -104,7 +106,7 @@ class TKqpNodeService : public TActorBootstrapped { } Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup(WakeCleaunupTag)); - Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag)); + Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag)); Become(&TKqpNodeService::WorkState); } @@ -345,7 +347,7 @@ class TKqpNodeService : public TActorBootstrapped { void HandleWork(TEvents::TEvWakeup::TPtr& ev) { if (ev->Get()->Tag == WakeAdvanceTimeTag) { Scheduler.AdvanceTime(TMonotonic::Now()); - Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag)); + Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag)); } if (ev->Get()->Tag == WakeCleaunupTag) { Schedule(TDuration::Seconds(1), ev->Release().Release()); @@ -400,9 +402,11 @@ class TKqpNodeService : public TActorBootstrapped { SetPriorities(event.GetConfig().GetTableServiceConfig().GetPoolsConfiguration()); } + AdvanceTimeInterval = TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec()); + Scheduler.SetForgetInterval(TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec())); + auto responseEv = MakeHolder(event); 
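// ForgetOverflowTimeoutUsec bounds how long a consumption peak can haunt a
// group: on each AdvanceTime() tick the carried balance is clamped (see the
// TrackedBefore update in kqp_compute_scheduler.cpp below) to
//
//     TrackedBefore = max(tracked - ForgetInterval * Weight,
//                         min(Limit(now) - MaxLimitDeviation, tracked))
//
// so a burst older than the forget interval stops starving the pool, while
// recent overdraft is still paid back at rate Weight.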
Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie); - } void SetIteratorReadsQuotaSettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadQuotaSettings& settings) { @@ -525,6 +529,7 @@ class TKqpNodeService : public TActorBootstrapped { const std::optional FederatedQuerySetup; TComputeScheduler Scheduler; + TDuration AdvanceTimeInterval; //state sharded by TxId std::shared_ptr State_; diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp index 3946f405a647..a5764cfabb6d 100644 --- a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp @@ -134,6 +134,8 @@ class TSchedulerEntity { static constexpr double BatchCalcDecay = 0; TDuration BatchTime = AvgBatch; + TDuration OverflowToleranceTimeout = TDuration::Seconds(1); + static constexpr TDuration ActivationPenalty = TDuration::MicroSeconds(10); size_t Wakeups = 0; @@ -212,6 +214,7 @@ struct TComputeScheduler::TImpl { TIntrusivePtr Counters; TDuration SmoothPeriod = TDuration::MilliSeconds(100); + TDuration ForgetInteval = TDuration::Seconds(2); TDuration MaxDelay = TDuration::Seconds(10); @@ -361,10 +364,13 @@ void TComputeScheduler::AdvanceTime(TMonotonic now) { } double delta = 0; - v.Next()->TrackedBefore = Impl->Records[i]->TrackedMicroSeconds.load(); + auto tracked = Impl->Records[i]->TrackedMicroSeconds.load(); v.Next()->MaxLimitDeviation = Impl->SmoothPeriod.MicroSeconds() * v.Next()->Weight; v.Next()->LastNowRecalc = now; - v.Next()->TrackedBefore = Min(group.get()->Limit(now) - group.get()->MaxLimitDeviation, v.Next()->TrackedBefore); + v.Next()->TrackedBefore = + Max( + tracked - FromDuration(Impl->ForgetInteval) * group.get()->Weight, + Min(group.get()->Limit(now) - group.get()->MaxLimitDeviation, tracked)); if (!group.get()->Disabled && group.get()->EntitiesWeight > MinEntitiesWeight) { delta = FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight / group.get()->EntitiesWeight; @@ -425,6 +431,10 @@ void TComputeScheduler::SetMaxDeviation(TDuration period) { Impl->SmoothPeriod = period; } +void TComputeScheduler::SetForgetInterval(TDuration period) { + Impl->ForgetInteval = period; +} + bool TComputeScheduler::Disabled(TString group) { auto ptr = Impl->PoolId.FindPtr(group); return !ptr || Impl->Records[*ptr]->MutableStats.Current().get()->Disabled; diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.h b/ydb/core/kqp/runtime/kqp_compute_scheduler.h index 21f7380db171..d2f706ab94a9 100644 --- a/ydb/core/kqp/runtime/kqp_compute_scheduler.h +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.h @@ -74,6 +74,7 @@ class TComputeScheduler { void SetPriorities(TDistributionRule rootRule, double cores, TMonotonic now); void SetMaxDeviation(TDuration); + void SetForgetInterval(TDuration); ::NMonitoring::TDynamicCounters::TCounterPtr GetGroupUsageCounter(TString group) const; TSchedulerEntityHandle Enroll(TString group, double weight, TMonotonic now); diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index ad0704df9f4c..fe8a37435cc0 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -313,7 +313,7 @@ message TTableServiceConfig { message TSubPoolsConfiguration { repeated TComputePoolConfiguration SubPools = 1; - }; + } message TComputePoolConfiguration { optional double MaxCpuShare = 1; @@ -321,6 +321,15 @@ message TTableServiceConfig { string Name = 2; TSubPoolsConfiguration 
SubPoolsConfiguration = 3; } - }; + } + + message TComputeSchedulerSettings { + optional uint64 AdvanceTimeIntervalUsec = 1 [default = 50000]; + optional uint64 ForgetOverflowTimeoutUsec = 2 [default = 2000000]; + } + optional TComputePoolConfiguration PoolsConfiguration = 68; + optional TComputeSchedulerSettings ComputeSchedulerSettings = 70; + + optional bool EnableRowsDuplicationCheck = 69 [ default = false ]; }; From 26571c3f5cdb926c49a56bf96899eeeebb880fbc Mon Sep 17 00:00:00 2001 From: Mikhail Surin Date: Wed, 28 Aug 2024 17:00:27 +0300 Subject: [PATCH 4/5] Use cpu limits from resource pool scheme objects (#8370) --- ydb/core/kqp/executer_actor/kqp_planner.cpp | 13 + .../kqp/node_service/kqp_node_service.cpp | 116 +++------ .../kqp/runtime/kqp_compute_scheduler.cpp | 244 ++++++++++-------- ydb/core/kqp/runtime/kqp_compute_scheduler.h | 50 ++-- ydb/core/protos/kqp.proto | 3 + ydb/core/protos/table_service_config.proto | 15 +- .../resource_pools/resource_pool_settings.cpp | 3 +- .../resource_pools/resource_pool_settings.h | 1 + 8 files changed, 233 insertions(+), 212 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 5e5a16efe76c..834f90beb192 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -53,6 +53,10 @@ void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskRe ret.HeavyProgram = opts.GetHasMapJoin(); } +bool LimitCPU(TIntrusivePtr ctx) { + return ctx->PoolId && ctx->PoolConfig.has_value() && ctx->PoolConfig->TotalCpuLimitPercentPerNode > 0; +} + } bool TKqpPlanner::UseMockEmptyPlanner = false; @@ -101,6 +105,10 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args) LOG_E("Database not set, use " << Database); } } + + if (LimitCPU(UserRequestContext)) { + AllowSinglePartitionOpt = false; + } } // ResourcesSnapshot, ResourceEstimations @@ -224,6 +232,11 @@ std::unique_ptr TKqpPlanner::SerializeReque } request.SetSchedulerGroup(UserRequestContext->PoolId); + request.SetDatabase(Database); + if (UserRequestContext->PoolConfig.has_value()) { + request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode); + request.SetMaxCpuShare(UserRequestContext->PoolConfig->TotalCpuLimitPercentPerNode / 100.0); + } return result; } diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 01508d9af7c4..b4f8931674e7 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -81,12 +81,13 @@ class TKqpNodeService : public TActorBootstrapped { if (config.HasIteratorReadQuotaSettings()) { SetIteratorReadsQuotaSettings(config.GetIteratorReadQuotaSettings()); } - if (config.HasPoolsConfiguration()) { - SetPriorities(config.GetPoolsConfiguration()); - } - Scheduler.ReportCounters(counters); - AdvanceTimeInterval = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec()); - Scheduler.SetForgetInterval(TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec())); + + SchedulerOptions = { + .AdvanceTimeInterval = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec()), + .ForgetOverflowTimeout = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec()), + .ActivePoolPollingTimeout = TDuration::Seconds(config.GetComputeSchedulerSettings().GetActivePoolPollingSec()), + .Counters = 
counters, + }; } void Bootstrap() { @@ -105,15 +106,13 @@ class TKqpNodeService : public TActorBootstrapped { TlsActivationContext->ExecutorThread.ActorSystem, SelfId()); } - Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup(WakeCleaunupTag)); - Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag)); + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); Become(&TKqpNodeService::WorkState); - } - enum { - WakeCleaunupTag, - WakeAdvanceTimeTag - }; + Scheduler = std::make_shared(); + SchedulerOptions.Scheduler = Scheduler; + SchedulerActorId = RegisterWithSameMailbox(CreateSchedulerActor(SchedulerOptions)); + } private: STATEFN(WorkState) { @@ -128,8 +127,6 @@ class TKqpNodeService : public TActorBootstrapped { hFunc(TEvents::TEvUndelivered, HandleWork); hFunc(TEvents::TEvPoison, HandleWork); hFunc(NMon::TEvHttpInfo, HandleWork); - // sheduling - hFunc(TEvSchedulerDeregister, HandleWork); default: { Y_ABORT("Unexpected event 0x%x for TKqpResourceManagerService", ev->GetTypeRewrite()); } @@ -138,12 +135,6 @@ class TKqpNodeService : public TActorBootstrapped { static constexpr double SecToUsec = 1e6; - void HandleWork(TEvSchedulerDeregister::TPtr& ev) { - if (ev->Get()->SchedulerEntity) { - Scheduler.Deregister(*ev->Get()->SchedulerEntity, TMonotonic::Now()); - } - } - void HandleWork(TEvKqpNode::TEvStartKqpTasksRequest::TPtr& ev) { NWilson::TSpan sendTasksSpan(TWilsonKqp::KqpNodeSendTasks, NWilson::TTraceId(ev->TraceId), "KqpNode.SendTasks", NWilson::EFlags::AUTO_END); @@ -200,7 +191,7 @@ class TKqpNodeService : public TActorBootstrapped { const TString& serializedGUCSettings = ev->Get()->Record.HasSerializedGUCSettings() ? ev->Get()->Record.GetSerializedGUCSettings() : ""; - auto schedulerNow = TMonotonic::Now(); + auto schedulerNow = TlsActivationContext->Monotonic(); // start compute actors TMaybe rlPath = Nothing(); @@ -215,20 +206,28 @@ class TKqpNodeService : public TActorBootstrapped { for (auto& dqTask: *msg.MutableTasks()) { TString group = msg.GetSchedulerGroup(); - TComputeActorSchedulingOptions schedulingOptions { + TComputeActorSchedulingOptions schedulingTaskOptions { .Now = schedulerNow, - .NodeService = SelfId(), - .Scheduler = &Scheduler, + .SchedulerActorId = SchedulerActorId, + .Scheduler = Scheduler.get(), .Group = group, .Weight = 1, .NoThrottle = false, .Counters = Counters }; - if (Scheduler.Disabled(schedulingOptions.Group)) { - schedulingOptions.NoThrottle = true; - } else { - schedulingOptions.Handle = Scheduler.Enroll(schedulingOptions.Group, schedulingOptions.Weight, schedulingOptions.Now); + if (SchedulerOptions.Scheduler->Disabled(group)) { + auto share = msg.GetMaxCpuShare(); + if (share > 0) { + Scheduler->UpdateMaxShare(group, share, schedulerNow); + Send(SchedulerActorId, new TEvSchedulerNewPool(msg.GetDatabase(), group, share)); + } else { + schedulingTaskOptions.NoThrottle = true; + } + } + + if (!schedulingTaskOptions.NoThrottle) { + schedulingTaskOptions.Handle = SchedulerOptions.Scheduler->Enroll(schedulingTaskOptions.Group, schedulingTaskOptions.Weight, schedulingTaskOptions.Now); } auto result = CaFactory_->CreateKqpComputeActor({ @@ -252,7 +251,7 @@ class TKqpNodeService : public TActorBootstrapped { .RlPath = rlPath, .ComputesByStages = &computesByStage, .State = State_, - .SchedulingOptions = std::move(schedulingOptions) + .SchedulingOptions = std::move(schedulingTaskOptions), }); if (const auto* rmResult = std::get_if(&result)) { @@ -345,17 +344,11 @@ class TKqpNodeService : public TActorBootstrapped { } void 
HandleWork(TEvents::TEvWakeup::TPtr& ev) { - if (ev->Get()->Tag == WakeAdvanceTimeTag) { - Scheduler.AdvanceTime(TMonotonic::Now()); - Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag)); - } - if (ev->Get()->Tag == WakeCleaunupTag) { - Schedule(TDuration::Seconds(1), ev->Release().Release()); - for (auto& bucket : State_->Buckets) { - auto expiredRequests = bucket.ClearExpiredRequests(); - for (auto& cxt : expiredRequests) { - TerminateTx(cxt.TxId, "reached execution deadline", NYql::NDqProto::StatusIds::TIMEOUT); - } + Schedule(TDuration::Seconds(1), ev->Release().Release()); + for (auto& bucket : State_->Buckets) { + auto expiredRequests = bucket.ClearExpiredRequests(); + for (auto& cxt : expiredRequests) { + TerminateTx(cxt.TxId, "reached execution deadline", NYql::NDqProto::StatusIds::TIMEOUT); } } } @@ -398,13 +391,6 @@ class TKqpNodeService : public TActorBootstrapped { SetIteratorReadsQuotaSettings(event.GetConfig().GetTableServiceConfig().GetIteratorReadQuotaSettings()); } - if (event.GetConfig().GetTableServiceConfig().HasPoolsConfiguration()) { - SetPriorities(event.GetConfig().GetTableServiceConfig().GetPoolsConfiguration()); - } - - AdvanceTimeInterval = TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec()); - Scheduler.SetForgetInterval(TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec())); - auto responseEv = MakeHolder(event); Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie); } @@ -413,35 +399,6 @@ class TKqpNodeService : public TActorBootstrapped { SetDefaultIteratorQuotaSettings(settings.GetMaxRows(), settings.GetMaxBytes()); } - void SetPriorities(const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf) { - std::function convert - = [&](const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf) - { - if (conf.HasName()) { - return TComputeScheduler::TDistributionRule{.Share = conf.GetMaxCpuShare(), .Name = conf.GetName()}; - } else if (conf.HasSubPoolsConfiguration()) { - auto res = TComputeScheduler::TDistributionRule{.Share = conf.GetMaxCpuShare()}; - for (auto& subConf : conf.GetSubPoolsConfiguration().GetSubPools()) { - res.SubRules.push_back(convert(subConf)); - } - return res; - } else { - Y_ENSURE(false, "unknown case"); - } - }; - SetPriorities(convert(conf)); - } - - void SetPriorities(TComputeScheduler::TDistributionRule rule) { - NActors::TExecutorPoolStats poolStats; - TVector threadsStats; - TlsActivationContext->ActorSystem()->GetPoolStats(SelfId().PoolID(), poolStats, threadsStats); - Y_ENSURE(poolStats.MaxThreadCount > 0); - Counters->SchedulerCapacity->Set(poolStats.MaxThreadCount); - - Scheduler.SetPriorities(rule, poolStats.MaxThreadCount, TMonotonic::Now()); - } - void SetIteratorReadsRetrySettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadsRetrySettings& settings) { auto ptr = MakeIntrusive(); ptr->StartRetryDelay = TDuration::MilliSeconds(settings.GetStartDelayMs()); @@ -528,8 +485,9 @@ class TKqpNodeService : public TActorBootstrapped { NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; const std::optional FederatedQuerySetup; - TComputeScheduler Scheduler; - TDuration AdvanceTimeInterval; + std::shared_ptr Scheduler; + TSchedulerActorOptions SchedulerOptions; + TActorId SchedulerActorId; //state sharded by TxId std::shared_ptr State_; diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp 
b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp index a5764cfabb6d..d24bc60e870a 100644 --- a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp @@ -1,5 +1,13 @@ #include "kqp_compute_scheduler.h" +#include + +#include +#include + +#include +#include + namespace { static constexpr ui64 FromDuration(TDuration d) { return d.MicroSeconds(); @@ -199,17 +207,6 @@ struct TComputeScheduler::TImpl { THashMap PoolId; std::vector> Records; - struct TRule { - size_t Parent; - double Weight = 0; - - double Share; - TMaybe RecordId = {}; - double SubRulesSum = 0; - bool Empty = true; - }; - std::vector Rules; - double SumCores; TIntrusivePtr Counters; @@ -218,35 +215,15 @@ struct TComputeScheduler::TImpl { TDuration MaxDelay = TDuration::Seconds(10); - void AssignWeights() { - ssize_t rootRule = static_cast(Rules.size()) - 1; - for (size_t i = 0; i < Rules.size(); ++i) { - Rules[i].SubRulesSum = 0; - Rules[i].Empty = true; - } - for (ssize_t i = 0; i < static_cast(Rules.size()); ++i) { - if (Rules[i].RecordId) { - Rules[i].Empty = Records[*Rules[i].RecordId]->MutableStats.Next()->EntitiesWeight < MinEntitiesWeight; - Rules[i].SubRulesSum = Rules[i].Share; - } - if (i != rootRule && !Rules[i].Empty) { - Rules[Rules[i].Parent].Empty = false; - Rules[Rules[i].Parent].SubRulesSum += Rules[i].SubRulesSum; - } - } - for (ssize_t i = static_cast(Rules.size()) - 1; i >= 0; --i) { - if (i == static_cast(Rules.size()) - 1) { - Rules[i].Weight = SumCores * Rules[i].Share; - } else if (!Rules[i].Empty) { - Rules[i].Weight = Rules[Rules[i].Parent].Weight * Rules[i].Share / Rules[Rules[i].Parent].SubRulesSum; - } else { - Rules[i].Weight = 0; - } - if (Rules[i].RecordId) { - Records[*Rules[i].RecordId]->MutableStats.Next()->Weight = Rules[i].Weight; - } - } - } + void AssignWeights() { } + + void CreateGroup(TString groupName, double maxShare, NMonotonic::TMonotonic now) { + PoolId[groupName] = Records.size(); + auto group = std::make_unique(); + group->MutableStats.Next()->LastNowRecalc = now; + group->MutableStats.Next()->Weight = maxShare; + Records.push_back(std::move(group)); + } }; TComputeScheduler::TComputeScheduler() { @@ -255,68 +232,6 @@ TComputeScheduler::TComputeScheduler() { TComputeScheduler::~TComputeScheduler() = default; -void TComputeScheduler::SetPriorities(TDistributionRule rule, double cores, TMonotonic now) { - THashSet seenNames; - std::function exploreNames = [&](TDistributionRule& rule) { - if (rule.SubRules.empty()) { - seenNames.insert(rule.Name); - } else { - for (auto& subRule : rule.SubRules) { - exploreNames(subRule); - } - } - }; - exploreNames(rule); - - for (auto& k : seenNames) { - auto ptr = Impl->PoolId.FindPtr(k); - if (!ptr) { - Impl->PoolId[k] = Impl->Records.size(); - auto group = std::make_unique(); - group->MutableStats.Next()->LastNowRecalc = now; - Impl->Records.push_back(std::move(group)); - } - } - for (auto& [k, v] : Impl->PoolId) { - if (!seenNames.contains(k)) { - auto& group = Impl->Records[Impl->PoolId[k]]->MutableStats; - group.Next()->Weight = 0; - group.Next()->Disabled = true; - group.Publish(); - } - } - Impl->SumCores = cores; - - TVector rules; - std::function makeRules = [&](TDistributionRule& rule) { - size_t result; - if (rule.SubRules.empty()) { - result = rules.size(); - rules.push_back(TImpl::TRule{.Share = rule.Share, .RecordId=Impl->PoolId[rule.Name]}); - } else { - TVector toAssign; - for (auto& subRule : rule.SubRules) { - toAssign.push_back(makeRules(subRule)); - } - size_t result = rules.size(); 
- rules.push_back(TImpl::TRule{.Share = rule.Share}); - for (auto i : toAssign) { - rules[i].Parent = result; - } - return result; - } - return result; - }; - makeRules(rule); - Impl->Rules.swap(rules); - - Impl->AssignWeights(); - for (auto& record : Impl->Records) { - record->MutableStats.Publish(); - } -} - - TSchedulerEntityHandle TComputeScheduler::Enroll(TString groupName, double weight, TMonotonic now) { Y_ENSURE(Impl->PoolId.contains(groupName), "unknown scheduler group"); auto* groupEntry = Impl->Records[Impl->PoolId.at(groupName)].get(); @@ -441,6 +356,28 @@ bool TComputeScheduler::Disabled(TString group) { } +bool TComputeScheduler::Disable(TString group, TMonotonic now) { + auto ptr = Impl->PoolId.FindPtr(group); + if (Impl->Records[*ptr]->MutableStats.Current().get()->Weight > MinEntitiesWeight) { + return false; + } + Impl->Records[*ptr]->MutableStats.Next()->Disabled = true; + AdvanceTime(now); + return true; +} + +void TComputeScheduler::UpdateMaxShare(TString group, double share, TMonotonic now) { + auto ptr = Impl->PoolId.FindPtr(group); + if (!ptr) { + Impl->CreateGroup(group, share, now); + } else { + auto& record = Impl->Records[*ptr]; + record->MutableStats.Next()->Weight = share; + } + AdvanceTime(now); +} + + ::NMonitoring::TDynamicCounters::TCounterPtr TComputeScheduler::GetGroupUsageCounter(TString group) const { return Impl->Counters ->GetKqpCounters() @@ -448,5 +385,108 @@ ::NMonitoring::TDynamicCounters::TCounterPtr TComputeScheduler::GetGroupUsageCou ->GetCounter("Usage", true); } + +struct TEvPingPool : public TEventLocal { + TString Database; + TString Pool; + + TEvPingPool(TString database, TString pool) + : Database(database) + , Pool(pool) + { + } +}; + +class TSchedulerActor : public TActorBootstrapped { +public: + TSchedulerActor(TSchedulerActorOptions options) + : Opts(options) + { + if (!Opts.Scheduler) { + Opts.Scheduler = std::make_shared(); + } + Opts.Scheduler->SetForgetInterval(Opts.ForgetOverflowTimeout); + Opts.Scheduler->ReportCounters(Opts.Counters); + } + + void Bootstrap() { + Schedule(Opts.AdvanceTimeInterval, new TEvents::TEvWakeup()); + + ui32 tableServiceConfigKind = (ui32) NKikimrConsole::TConfigItem::TableServiceConfigItem; + Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), + new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({tableServiceConfigKind}), + IEventHandle::FlagTrackDelivery); + + Become(&TSchedulerActor::State); + } + + STATEFN(State) { + switch (ev->GetTypeRewrite()) { + hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle); + hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle); + + hFunc(NWorkload::TEvUpdatePoolInfo, Handle); + + hFunc(TEvSchedulerDeregister, Handle); + hFunc(TEvSchedulerNewPool, Handle); + hFunc(TEvPingPool, Handle); + hFunc(TEvents::TEvWakeup, Handle); + default: { + Y_ABORT("Unexpected event 0x%x for TKqpSchedulerService", ev->GetTypeRewrite()); + } + } + } + + void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) { + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_NODE, "Subscribed for config changes"); + } + + void Handle(TEvSchedulerDeregister::TPtr& ev) { + if (ev->Get()->SchedulerEntity) { + Opts.Scheduler->Deregister(*ev->Get()->SchedulerEntity, TlsActivationContext->Monotonic()); + } + } + + void Handle(TEvSchedulerNewPool::TPtr& ev) { + Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(ev->Get()->Database, ev->Get()->Pool)); + 
Opts.Scheduler->UpdateMaxShare(ev->Get()->Pool, ev->Get()->MaxShare, TlsActivationContext->Monotonic()); + } + + void Handle(TEvPingPool::TPtr& ev) { + Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(ev->Get()->Database, ev->Get()->Pool)); + } + + void Handle(NWorkload::TEvUpdatePoolInfo::TPtr& ev) { + if (ev->Get()->Config.has_value()) { + Opts.Scheduler->UpdateMaxShare(ev->Get()->PoolId, ev->Get()->Config->TotalCpuLimitPercentPerNode / 100.0, TlsActivationContext->Monotonic()); + } else { + if (!Opts.Scheduler->Disable(ev->Get()->PoolId, TlsActivationContext->Monotonic())) { + Schedule(Opts.ActivePoolPollingTimeout.ToDeadLine(), new TEvPingPool(ev->Get()->Database, ev->Get()->PoolId)); + } + } + } + + void Handle(TEvents::TEvWakeup::TPtr&) { + Opts.Scheduler->AdvanceTime(TlsActivationContext->Monotonic()); + Schedule(Opts.AdvanceTimeInterval, new TEvents::TEvWakeup()); + } + + void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) { + auto &event = ev->Get()->Record; + auto& config = event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings(); + + Opts.AdvanceTimeInterval = TDuration::MicroSeconds(config.GetAdvanceTimeIntervalUsec()); + Opts.ActivePoolPollingTimeout = TDuration::Seconds(config.GetActivePoolPollingSec()); + Opts.Scheduler->SetForgetInterval(TDuration::MicroSeconds(config.GetForgetOverflowTimeoutUsec())); + } + +private: + TSchedulerActorOptions Opts; +}; + +IActor* CreateSchedulerActor(TSchedulerActorOptions opts) { + return new TSchedulerActor(opts); +} + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.h b/ydb/core/kqp/runtime/kqp_compute_scheduler.h index d2f706ab94a9..fc58b4e8dc98 100644 --- a/ydb/core/kqp/runtime/kqp_compute_scheduler.h +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.h @@ -55,24 +55,14 @@ class TSchedulerEntityHandle { }; class TComputeScheduler { -public: - struct TDistributionRule { - double Share; - TString Name; - TVector SubRules; - - bool empty() { - return SubRules.empty() && Name.empty(); - } - }; - public: TComputeScheduler(); ~TComputeScheduler(); void ReportCounters(TIntrusivePtr); - void SetPriorities(TDistributionRule rootRule, double cores, TMonotonic now); + void UpdateMaxShare(TString, double, TMonotonic now); + void SetMaxDeviation(TDuration); void SetForgetInterval(TDuration); ::NMonitoring::TDynamicCounters::TCounterPtr GetGroupUsageCounter(TString group) const; @@ -84,6 +74,7 @@ class TComputeScheduler { void Deregister(TSchedulerEntity& self, TMonotonic now); bool Disabled(TString group); + bool Disable(TString group, TMonotonic now); private: struct TImpl; @@ -92,7 +83,7 @@ class TComputeScheduler { struct TComputeActorSchedulingOptions { TMonotonic Now; - NActors::TActorId NodeService; + NActors::TActorId SchedulerActorId; TSchedulerEntityHandle Handle; TComputeScheduler* Scheduler; TString Group = ""; @@ -104,7 +95,8 @@ struct TComputeActorSchedulingOptions { struct TKqpComputeSchedulerEvents { enum EKqpComputeSchedulerEvents { EvDeregister = EventSpaceBegin(TKikimrEvents::ES_KQP) + 400, - EvAccountTime, + EvNewPool, + EvPingPool, }; }; @@ -117,6 +109,20 @@ struct TEvSchedulerDeregister : public TEventLocal { + TString Database; + TString Pool; + double MaxShare; + + TEvSchedulerNewPool(TString database, TString pool, double maxShare) + : Database(database) + , Pool(pool) + , MaxShare(maxShare) + { + } +}; + + template class TSchedulableComputeActorBase : public NYql::NDq::TDqSyncComputeActorBase { 
private: @@ -129,7 +135,7 @@ class TSchedulableComputeActorBase : public NYql::NDq::TDqSyncComputeActorBase(args)...) , SelfHandle(std::move(options.Handle)) - , NodeService(options.NodeService) + , SchedulerActorId(options.SchedulerActorId) , NoThrottle(options.NoThrottle) , Counters(options.Counters) , Group(options.Group) @@ -291,7 +297,7 @@ class TSchedulableComputeActorBase : public NYql::NDq::TDqSyncComputeActorBase(std::move(SelfHandle)); - this->Send(NodeService, finishEv.Release()); + this->Send(SchedulerActorId, finishEv.Release()); } TBase::PassAway(); } @@ -301,7 +307,7 @@ class TSchedulableComputeActorBase : public NYql::NDq::TDqSyncComputeActorBase Throttled; TSchedulerEntityHandle SelfHandle; - NActors::TActorId NodeService; + NActors::TActorId SchedulerActorId; bool NoThrottle; bool Finished = false; @@ -314,5 +320,15 @@ class TSchedulableComputeActorBase : public NYql::NDq::TDqSyncComputeActorBase Scheduler; + TDuration AdvanceTimeInterval; + TDuration ForgetOverflowTimeout; + TDuration ActivePoolPollingTimeout; + TIntrusivePtr Counters; +}; + +IActor* CreateSchedulerActor(TSchedulerActorOptions); + } // namespace NKqp } // namespace NKikimR diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index d9bed4a449e8..fa9a42f981a8 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -585,6 +585,9 @@ message TEvStartKqpTasksRequest { optional uint64 OutputChunkMaxSize = 7 [default = 0]; // 0 - use some default value optional string SerializedGUCSettings = 8; optional string SchedulerGroup = 9; + optional double MemoryPoolPercent = 10 [default = 100]; + optional string Database = 11; + optional double MaxCpuShare = 12; optional uint64 LockTxId = 13; optional uint32 LockNodeId = 14; } diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index fe8a37435cc0..a0fc6876ea9a 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -311,24 +311,13 @@ message TTableServiceConfig { optional string EnableSpillingNodes = 67 [ default = "All" ]; - message TSubPoolsConfiguration { - repeated TComputePoolConfiguration SubPools = 1; - } - - message TComputePoolConfiguration { - optional double MaxCpuShare = 1; - oneof ResourceConfiguration { - string Name = 2; - TSubPoolsConfiguration SubPoolsConfiguration = 3; - } - } - message TComputeSchedulerSettings { optional uint64 AdvanceTimeIntervalUsec = 1 [default = 50000]; optional uint64 ForgetOverflowTimeoutUsec = 2 [default = 2000000]; + optional uint64 ActivePoolPollingSec = 3 [default = 10]; } - optional TComputePoolConfiguration PoolsConfiguration = 68; + reserved 68; optional TComputeSchedulerSettings ComputeSchedulerSettings = 70; optional bool EnableRowsDuplicationCheck = 69 [ default = false ]; diff --git a/ydb/core/resource_pools/resource_pool_settings.cpp b/ydb/core/resource_pools/resource_pool_settings.cpp index 5303229073f5..77d0e3a8d493 100644 --- a/ydb/core/resource_pools/resource_pool_settings.cpp +++ b/ydb/core/resource_pools/resource_pool_settings.cpp @@ -48,7 +48,8 @@ std::unordered_map TPoolSettings::GetProperti {"concurrent_query_limit", &ConcurrentQueryLimit}, {"queue_size", &QueueSize}, {"query_memory_limit_percent_per_node", &QueryMemoryLimitPercentPerNode}, - {"database_load_cpu_threshold", &DatabaseLoadCpuThreshold} + {"database_load_cpu_threshold", &DatabaseLoadCpuThreshold}, + {"total_cpu_limit_percent_per_node", &TotalCpuLimitPercentPerNode} }; if (!restricted) { 
properties.insert({"query_cancel_after_seconds", &QueryCancelAfter}); diff --git a/ydb/core/resource_pools/resource_pool_settings.h b/ydb/core/resource_pools/resource_pool_settings.h index edc24422d5a4..73a4031dea10 100644 --- a/ydb/core/resource_pools/resource_pool_settings.h +++ b/ydb/core/resource_pools/resource_pool_settings.h @@ -35,6 +35,7 @@ struct TPoolSettings : public TSettingsBase { TDuration QueryCancelAfter = TDuration::Zero(); // 0 = disabled TPercent QueryMemoryLimitPercentPerNode = -1; // Percent from node memory capacity, -1 = disabled TPercent DatabaseLoadCpuThreshold = -1; // -1 = disabled + TPercent TotalCpuLimitPercentPerNode = -1; // -1 = disabled }; } // namespace NKikimr::NResourcePool From e16fd707d8e7ef1a4427f3e5a7feb587c85fe297 Mon Sep 17 00:00:00 2001 From: kruall Date: Sat, 27 Jul 2024 00:33:32 +0300 Subject: [PATCH 5/5] Add mailbox stat (#6680) --- ydb/library/actors/core/actor.cpp | 4 + ydb/library/actors/core/actor.h | 3 + ydb/library/actors/core/executor_thread.cpp | 3 +- ydb/library/actors/core/mailbox.cpp | 24 +++- ydb/library/actors/core/mailbox.h | 119 ++++++++++++++++---- 5 files changed, 130 insertions(+), 23 deletions(-) diff --git a/ydb/library/actors/core/actor.cpp b/ydb/library/actors/core/actor.cpp index 179989fa2309..355e80c0df02 100644 --- a/ydb/library/actors/core/actor.cpp +++ b/ydb/library/actors/core/actor.cpp @@ -161,6 +161,10 @@ namespace NActors { return NHPTimer::GetSeconds(GetCurrentEventTicks()); } + void TActivationContext::EnableMailboxStats() { + TlsActivationContext->Mailbox.EnableStats(); + } + TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept { return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfActorId); } diff --git a/ydb/library/actors/core/actor.h b/ydb/library/actors/core/actor.h index 7385472d26e0..504335fa6a3d 100644 --- a/ydb/library/actors/core/actor.h +++ b/ydb/library/actors/core/actor.h @@ -3,6 +3,7 @@ #include "actorsystem.h" #include "event.h" #include "executor_thread.h" +#include "mailbox.h" #include "monotonic.h" #include "thread_context.h" @@ -130,6 +131,8 @@ namespace NActors { static i64 GetCurrentEventTicks(); static double GetCurrentEventTicksAsSeconds(); + + static void EnableMailboxStats(); }; struct TActorContext: public TActivationContext { diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp index e9287e2059d9..acc82a046489 100644 --- a/ydb/library/actors/core/executor_thread.cpp +++ b/ydb/library/actors/core/executor_thread.cpp @@ -269,6 +269,7 @@ namespace NActors { Ctx.AddElapsedCycles(activityType, hpnow - hpprev); NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(eventStart, hpnow, activityType, CurrentActorScheduledEventsCounter); + mailbox->AddElapsedCycles(elapsed); if (elapsed > 1000000) { LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0); } @@ -372,7 +373,7 @@ namespace NActors { break; // empty queue, leave } } - TlsThreadContext->ActivationStartTS.store(GetCycleCountFast(), std::memory_order_release); + TlsThreadContext->ActivationStartTS.store(hpnow, std::memory_order_release); TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release); NProfiling::TMemoryTagScope::Reset(0); diff --git a/ydb/library/actors/core/mailbox.cpp b/ydb/library/actors/core/mailbox.cpp index c208a4813f81..6b7ba86431a1 100644 --- a/ydb/library/actors/core/mailbox.cpp 
+++ b/ydb/library/actors/core/mailbox.cpp @@ -373,9 +373,9 @@ namespace NActors { CleanupActors(); } - bool TMailboxHeader::CleanupActors() { + bool TMailboxHeader::CleanupActors(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo) { bool done = true; - switch (ActorPack) { + switch (actorPack) { case TMailboxActorPack::Simple: { if (ActorsInfo.Simple.ActorId != 0) { delete ActorsInfo.Simple.Actor; @@ -399,13 +399,31 @@ namespace NActors { done = false; break; } + case TMailboxActorPack::Complex: + Y_ABORT("Unexpected ActorPack type"); } - ActorPack = TMailboxActorPack::Simple; + actorPack = TMailboxActorPack::Simple; ActorsInfo.Simple.ActorId = 0; ActorsInfo.Simple.Actor = nullptr; return done; } + bool TMailboxHeader::CleanupActors() { + if (ActorPack != TMailboxActorPack::Complex) { + TMailboxActorPack::EType pack = ActorPack; + bool done = CleanupActors(pack, ActorsInfo); + ActorPack = pack; + return done; + } else { + bool done = CleanupActors(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo); + delete ActorsInfo.Complex; + ActorPack = TMailboxActorPack::Simple; + ActorsInfo.Simple.ActorId = 0; + ActorsInfo.Simple.Actor = nullptr; + return done; + } + } + std::pair TMailboxHeader::CountMailboxEvents(ui64 localActorId, ui32 maxTraverse) { switch (Type) { case TMailboxType::Simple: diff --git a/ydb/library/actors/core/mailbox.h b/ydb/library/actors/core/mailbox.h index e086b831c405..2ae1ff997e2d 100644 --- a/ydb/library/actors/core/mailbox.h +++ b/ydb/library/actors/core/mailbox.h @@ -5,6 +5,7 @@ #include "executor_pool.h" #include "mailbox_queue_simple.h" #include "mailbox_queue_revolving.h" +#include #include #include #include @@ -27,6 +28,10 @@ namespace NActors { struct TMailboxHeader; + struct TMailboxStats { + ui64 ElapsedCycles = 0; + }; + template struct TMailboxUsageImpl { void Push(ui64 /*localId*/) {} @@ -53,7 +58,8 @@ namespace NActors { enum EType { Simple = 0, Array = 1, - Map = 2 + Map = 2, + Complex = 3, }; }; @@ -79,7 +85,7 @@ namespace NActors { volatile ui32 ExecutionState; ui32 Reserved : 4; // never changes, always zero ui32 Type : 4; // never changes - ui32 ActorPack : 2; + TMailboxActorPack::EType ActorPack : 2; ui32 Knobs : 22; struct TActorPair { @@ -91,6 +97,8 @@ namespace NActors { TActorPair Actors[ARRAY_CAPACITY]; }; + struct alignas(64) TComplexActorInfo; + union TActorsInfo { TActorPair Simple; struct { @@ -100,11 +108,19 @@ namespace NActors { struct { TActorMap* ActorsMap; } Map; + TComplexActorInfo* Complex; } ActorsInfo; + struct alignas(64) TComplexActorInfo{ + TActorsInfo ActorsInfo; + TMailboxActorPack::EType ActorPack; + TMailboxStats Stats; + }; + TMailboxHeader(TMailboxType::EType type); ~TMailboxHeader(); + static bool CleanupActors(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo); bool CleanupActors(); // this interface is used exclusively by executor thread, so implementation is there @@ -119,12 +135,13 @@ namespace NActors { bool UnlockAsFree(bool wouldReschedule); // preceed with releasing lock, but mark as free one bool IsEmpty() const noexcept { - return (ActorPack == TMailboxActorPack::Simple && ActorsInfo.Simple.ActorId == 0); + return (ActorPack == TMailboxActorPack::Simple && ActorsInfo.Simple.ActorId == 0) || + (ActorPack == TMailboxActorPack::Complex && ActorsInfo.Complex->ActorPack == TMailboxActorPack::Simple && ActorsInfo.Complex->ActorsInfo.Simple.ActorId == 0); } template - void ForEach(T&& callback) noexcept { - switch (ActorPack) { + static void ForEach(TMailboxActorPack::EType actorPack, 
TActorsInfo &ActorsInfo, T&& callback) noexcept { + switch (actorPack) { case TMailboxActorPack::Simple: if (ActorsInfo.Simple.ActorId) { callback(ActorsInfo.Simple.ActorId, ActorsInfo.Simple.Actor); @@ -143,10 +160,22 @@ namespace NActors { callback(row.ActorId, row.Actor); } break; + + case TMailboxActorPack::Complex: + Y_ABORT("Unexpected ActorPack type"); } } - IActor* FindActor(ui64 localActorId) noexcept { + template + void ForEach(T&& callback) noexcept { + if (ActorPack != TMailboxActorPack::Complex) { + ForEach(static_cast(ActorPack), ActorsInfo, std::move(callback)); + } else { + ForEach(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, std::move(callback)); + } + } + + static IActor* FindActor(TMailboxActorPack::EType ActorPack, TActorsInfo &ActorsInfo, ui64 localActorId) noexcept { switch (ActorPack) { case TMailboxActorPack::Simple: { if (ActorsInfo.Simple.ActorId == localActorId) @@ -167,14 +196,22 @@ namespace NActors { } break; } - default: - Y_ABORT(); + case TMailboxActorPack::Complex: + Y_ABORT("Unexpected ActorPack type"); } return nullptr; } - void AttachActor(ui64 localActorId, IActor* actor) noexcept { - switch (ActorPack) { + IActor* FindActor(ui64 localActorId) noexcept { + if (ActorPack != TMailboxActorPack::Complex) { + return FindActor(static_cast(ActorPack), ActorsInfo, localActorId); + } else { + return FindActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId); + } + } + + static void AttachActor(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo, ui64 localActorId, IActor* actor) noexcept { + switch (actorPack) { case TMailboxActorPack::Simple: { if (ActorsInfo.Simple.ActorId == 0) { ActorsInfo.Simple.ActorId = localActorId; @@ -185,7 +222,7 @@ namespace NActors { ar->Actors[0] = ActorsInfo.Simple; ar->Actors[1] = TActorPair{actor, localActorId}; ActorsInfo.Array.ActorsCount = 2; - ActorPack = TMailboxActorPack::Array; + actorPack = TMailboxActorPack::Array; ActorsInfo.Array.ActorsArray = ar; } break; @@ -201,7 +238,7 @@ namespace NActors { mp->emplace(ActorsInfo.Array.ActorsArray->Actors[i].ActorId, ActorsInfo.Array.ActorsArray->Actors[i].Actor); } mp->emplace(localActorId, actor); - ActorPack = TMailboxActorPack::Map; + actorPack = TMailboxActorPack::Map; ActorsInfo.Array.ActorsCount = 0; delete ActorsInfo.Array.ActorsArray; ActorsInfo.Map.ActorsMap = mp; @@ -210,17 +247,27 @@ namespace NActors { } break; } - default: - Y_ABORT(); + case TMailboxActorPack::Complex: + Y_ABORT("Unexpected ActorPack type"); } } - IActor* DetachActor(ui64 localActorId) noexcept { - Y_DEBUG_ABORT_UNLESS(FindActor(localActorId) != nullptr); + void AttachActor(ui64 localActorId, IActor* actor) noexcept { + if (ActorPack != TMailboxActorPack::Complex) { + TMailboxActorPack::EType pack = ActorPack; + AttachActor(pack, ActorsInfo, localActorId, actor); + ActorPack = pack; + } else { + AttachActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId, actor); + } + } + + static IActor* DetachActor(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo, ui64 localActorId) noexcept { + Y_DEBUG_ABORT_UNLESS(FindActor(actorPack, ActorsInfo, localActorId) != nullptr); IActor* actorToDestruct = nullptr; - switch (ActorPack) { + switch (actorPack) { case TMailboxActorPack::Simple: { Y_ABORT_UNLESS(ActorsInfo.Simple.ActorId == localActorId); actorToDestruct = ActorsInfo.Simple.Actor; @@ -243,7 +290,7 @@ namespace NActors { ar->Actors[i++] = TActorPair{actor, actorId}; } delete ActorsInfo.Map.ActorsMap; - ActorPack 
= TMailboxActorPack::Array; + actorPack = TMailboxActorPack::Array; ActorsInfo.Array.ActorsArray = ar; ActorsInfo.Array.ActorsCount = ARRAY_CAPACITY; } @@ -265,16 +312,50 @@ namespace NActors { if (ActorsInfo.Array.ActorsCount == 1) { const TActorPair Actor = ActorsInfo.Array.ActorsArray->Actors[0]; delete ActorsInfo.Array.ActorsArray; - ActorPack = TMailboxActorPack::Simple; + actorPack = TMailboxActorPack::Simple; ActorsInfo.Simple = Actor; } break; } + case TMailboxActorPack::Complex: + Y_ABORT("Unexpected ActorPack type"); } return actorToDestruct; } + IActor* DetachActor(ui64 localActorId) noexcept { + if (ActorPack != TMailboxActorPack::Complex) { + TMailboxActorPack::EType pack = ActorPack; + IActor* result = DetachActor(pack, ActorsInfo, localActorId); + ActorPack = pack; + return result; + } else { + return DetachActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId); + } + } + + void EnableStats() { + TComplexActorInfo* complex = new TComplexActorInfo; + complex->ActorPack = ActorPack; + complex->ActorsInfo = std::move(ActorsInfo); + ActorPack = TMailboxActorPack::Complex; + ActorsInfo.Complex = complex; + } + + void AddElapsedCycles(ui64 elapsed) { + if (ActorPack == TMailboxActorPack::Complex) { + ActorsInfo.Complex->Stats.ElapsedCycles += elapsed; + } + } + + std::optional GetElapsedCycles() { + if (ActorPack == TMailboxActorPack::Complex) { + return ActorsInfo.Complex->Stats.ElapsedCycles; + } + return std::nullopt; + } + std::pair CountMailboxEvents(ui64 localActorId, ui32 maxTraverse); };
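
A note on the scheduling model the KQP patches above converge on: the planner forwards MaxCpuShare = TotalCpuLimitPercentPerNode / 100.0 with each TEvStartKqpTasksRequest, the node service enrolls throttled compute actors into their pool's group, and the new TSchedulerActor ticks the scheduler every AdvanceTimeIntervalUsec (50ms by default), with budget hoarding bounded by ForgetOverflowTimeoutUsec (2s by default). The sketch below is a deliberately simplified, self-contained model of that mechanism — a per-group CPU budget replenished on each tick and capped by a forget window — not the actual TComputeScheduler, whose accounting lives in kqp_compute_scheduler.cpp; every identifier in it is invented for illustration.

    #include <chrono>
    #include <iostream>
    #include <string>
    #include <unordered_map>

    // Toy model of per-pool CPU throttling (invented names, not TComputeScheduler):
    // each group owns a share of the node's cores; a periodic tick grants budget,
    // running tasks drain it, and unused budget is forgotten past a fixed window.
    class TToyScheduler {
    public:
        explicit TToyScheduler(double totalCores)
            : TotalCores(totalCores)
        {}

        // TOTAL_CPU_LIMIT_PERCENT_PER_NODE=25 arrives here as 25 / 100.0.
        void UpdateMaxShare(const std::string& group, double share) {
            Groups[group].Share = share;
        }

        // Driven by a wakeup every AdvanceTimeIntervalUsec (50ms by default).
        void AdvanceTime(std::chrono::microseconds delta) {
            for (auto& entry : Groups) {
                TGroup& g = entry.second;
                g.BudgetUs += g.Share * TotalCores * delta.count();
                // Cap hoarded budget, like ForgetOverflowTimeoutUsec (2s default).
                const double cap = g.Share * TotalCores * ForgetWindowUs;
                if (g.BudgetUs > cap) {
                    g.BudgetUs = cap;
                }
            }
        }

        // A task reports CPU it consumed; false means "out of budget, throttle".
        bool Account(const std::string& group, std::chrono::microseconds used) {
            auto& g = Groups[group];
            g.BudgetUs -= used.count();
            return g.BudgetUs > 0;
        }

    private:
        struct TGroup {
            double Share = 0;
            double BudgetUs = 0;
        };
        std::unordered_map<std::string, TGroup> Groups;
        double TotalCores;
        static constexpr double ForgetWindowUs = 2e6;
    };

    int main() {
        TToyScheduler scheduler(/*totalCores=*/8);
        scheduler.UpdateMaxShare("analytics", 0.25);
        scheduler.AdvanceTime(std::chrono::milliseconds(50)); // one tick: 8 * 0.25 * 50ms = 100ms of budget
        std::cout << scheduler.Account("analytics", std::chrono::milliseconds(60)) << "\n"; // 1: within budget
        std::cout << scheduler.Account("analytics", std::chrono::milliseconds(60)) << "\n"; // 0: throttled
    }

As in the patch, an idle pool cannot accumulate unlimited credit: the cap corresponding to ForgetOverflowTimeoutUsec bounds how far a burst can run above the pool's steady-state share after a quiet period. Moving time advancement and deregistration out of TKqpNodeService into a dedicated scheduler actor also means config updates, pool subscriptions, and entity cleanup now converge in one place.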
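
Pool teardown runs through the same actor: when NWorkload::TEvUpdatePoolInfo arrives without a config, TSchedulerActor calls TComputeScheduler::Disable(), whose new bool return value reports whether the group could actually be shut down; if not, the actor schedules a TEvPingPool after ActivePoolPollingSec (10s by default) and retries. A minimal sketch of that contract, assuming a group only becomes disablable once no tasks remain enrolled (illustrative names, not the real implementation):

    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <unordered_set>

    // Toy model of the teardown contract (invented names): a dropped pool can
    // only be disabled once no tasks remain enrolled; until then the caller
    // keeps re-polling, as TSchedulerActor does with TEvPingPool.
    class TToyRegistry {
    public:
        void Enroll(const std::string& group) { ++ActiveTasks[group]; }
        void Deregister(const std::string& group) { --ActiveTasks[group]; }

        bool Disable(const std::string& group) {
            if (ActiveTasks[group] > 0) {
                return false; // still busy: schedule another poll and retry later
            }
            Disabled.insert(group);
            return true;
        }

    private:
        std::unordered_map<std::string, int> ActiveTasks;
        std::unordered_set<std::string> Disabled;
    };

    int main() {
        TToyRegistry registry;
        registry.Enroll("analytics");
        std::cout << registry.Disable("analytics") << "\n"; // 0: tasks still running
        registry.Deregister("analytics");
        std::cout << registry.Disable("analytics") << "\n"; // 1: pool torn down
    }

This keeps accounting alive for tasks that outlive their resource pool object instead of silently dropping their throttling state.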
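
The mailbox patch uses a pattern worth spelling out: TMailboxHeader keeps its compact inline actor bookkeeping until someone calls EnableStats(), which moves that state into a heap-allocated TComplexActorInfo that also carries TMailboxStats, and the extra ActorPack value Complex redirects FindActor/AttachActor/DetachActor/ForEach through the heap block. The executor thread then calls mailbox->AddElapsedCycles(elapsed) after every event, so accounting stays a no-op unless a mailbox opted in via TActivationContext::EnableMailboxStats(). Below is a self-contained miniature of the same promote-on-demand trick; the names are invented and it is far simpler than the real TMailboxHeader:

    #include <cstdint>
    #include <iostream>
    #include <optional>

    // Toy version of the Complex-pack trick: a hot structure keeps its compact
    // inline representation until stats are requested; EnableStats() moves the
    // inline state into a heap block that additionally carries counters, and a
    // Complex tag redirects all accessors through that block.
    struct TStats {
        uint64_t ElapsedCycles = 0;
    };

    class TSlot {
        enum class EPack : uint8_t { Simple, Complex };

        struct TComplex {
            uint64_t Payload; // the state that used to live inline
            TStats Stats;
        };

        EPack Pack = EPack::Simple;
        union {
            uint64_t Payload;  // inline representation (cheap, no stats)
            TComplex* Complex; // extended representation (payload + stats)
        };

    public:
        TSlot() : Payload(0) {}
        ~TSlot() {
            if (Pack == EPack::Complex) {
                delete Complex;
            }
        }

        void EnableStats() {
            if (Pack == EPack::Complex) {
                return; // already promoted
            }
            auto* complex = new TComplex{Payload, TStats{}};
            Pack = EPack::Complex;
            Complex = complex;
        }

        void AddElapsedCycles(uint64_t cycles) {
            // Simple pack: accounting is a no-op, keeping the fast path free.
            if (Pack == EPack::Complex) {
                Complex->Stats.ElapsedCycles += cycles;
            }
        }

        std::optional<uint64_t> GetElapsedCycles() const {
            if (Pack == EPack::Complex) {
                return Complex->Stats.ElapsedCycles;
            }
            return std::nullopt;
        }
    };

    int main() {
        TSlot slot;
        slot.AddElapsedCycles(100); // ignored: stats not enabled yet
        slot.EnableStats();
        slot.AddElapsedCycles(250);
        std::cout << slot.GetElapsedCycles().value_or(0) << "\n"; // 250
    }

The design keeps TMailboxHeader's footprint and per-event cost unchanged for the common case, paying the extra allocation and indirection only on mailboxes whose owners actually asked for stats.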