diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
index 4f9932e580f7..17db4f9fe87e 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -134,8 +134,8 @@ using namespace NYql::NDqProto;
 IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
     TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
     const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
-    TIntrusivePtr<NActors::TProtoArenaHolder> arena) {
-    return new NScanPrivate::TKqpScanComputeActor(executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory),
+    TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions schedulingOptions) {
+    return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory),
         settings, memoryLimits, std::move(traceId), std::move(arena));
 }

diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index f298353c50a0..3cf258b3e64e 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
 #include
 #include
 #include
@@ -48,12 +49,12 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
     const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
     NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
-    const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
+    const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions);

 IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
     NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
     const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
     NWilson::TTraceId traceId,
-    TIntrusivePtr<NActors::TProtoArenaHolder> arena);
+    TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions);

 IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
     const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,

diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
index 7a77406889ee..83297b69f206 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
@@ -104,7 +104,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
         ApplyConfig(config);
     }

-    void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config)
+    void ApplyConfig(const NKikimrConfig::TTableServiceConfig::TResourceManager& config) override
     {
         MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
         MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
@@ -114,7 +114,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
         MinMemFreeSize.store(config.GetMinMemFreeSize());
     }

-    TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) {
+    TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) override {
         NYql::NDq::TComputeMemoryLimits memoryLimits;
         memoryLimits.ChannelBufferSize = 0;
         memoryLimits.MkqlLightProgramMemoryLimit = MkqlLightProgramMemoryLimit.load();
@@ -213,7 +213,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
             auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
             IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.LockTxId, args.LockNodeId,
                 args.Task, AsyncIoFactory, runtimeSettings, memoryLimits,
-                std::move(args.TraceId), std::move(args.Arena));
+                std::move(args.TraceId), std::move(args.Arena),
+                std::move(args.SchedulingOptions));
             TActorId result = TlsActivationContext->Register(computeActor);
             info.MutableActorIds().emplace_back(result);
             return result;
@@ -223,7 +224,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
                 GUCSettings = std::make_shared<TGUCSettings>(args.SerializedGUCSettings);
             }
             IActor* computeActor = ::NKikimr::NKqp::CreateKqpComputeActor(args.ExecuterId, args.TxId, args.Task, AsyncIoFactory,
-                runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings);
+                runtimeSettings, memoryLimits, std::move(args.TraceId), std::move(args.Arena), FederatedQuerySetup, GUCSettings,
+                std::move(args.SchedulingOptions));
             return args.ShareMailbox ? TlsActivationContext->AsActorContext().RegisterWithSameMailbox(computeActor) :
                 TlsActivationContext->AsActorContext().Register(computeActor);
         }

diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
index 8f87b0c67787..c2a3325853ce 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
@@ -6,6 +6,8 @@
 #include
 #include

+#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
+
 #include

 namespace NKikimr::NKqp {
@@ -124,6 +126,7 @@ struct IKqpNodeComputeActorFactory {
         const TMaybe& RlPath;
         TComputeStagesWithScan* ComputesByStages = nullptr;
         std::shared_ptr State = nullptr;
+        TComputeActorSchedulingOptions SchedulingOptions = {};
     };

     typedef std::variant TActorStartResult;
@@ -137,4 +140,4 @@ std::shared_ptr MakeKqpCaFactory(const NKikimrConfi
     NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
     const std::optional<TKqpFederatedQuerySetup> federatedQuerySetup);

-} // namespace NKikimr::NKqp::NComputeActor
\ No newline at end of file
+} // namespace NKikimr::NKqp::NComputeActor

diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index ac8f34e1979b..2dbe94b66b2e 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -14,8 +14,8 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro
     IDqAsyncIoFactory::TPtr asyncIoFactory,
     const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
     TIntrusivePtr<NActors::TProtoArenaHolder> arena,
-    const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
-    : TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
+    const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions schedulingOptions)
+    : TBase(std::move(schedulingOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
     , ComputeCtx(settings.StatsMode)
     , FederatedQuerySetup(federatedQuerySetup)
 {
@@ -121,9 +121,12 @@ void TKqpComputeActor::DoBootstrap() {
     ContinueExecute();
     Become(&TKqpComputeActor::StateFunc);
+
+    TBase::DoBootstrap();
 }

 STFUNC(TKqpComputeActor::StateFunc) {
+    CA_LOG_D("CA StateFunc " << ev->GetTypeRewrite());
     try {
         switch (ev->GetTypeRewrite()) {
             hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute);
@@ -278,10 +281,10 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
     const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
     TIntrusivePtr<NActors::TProtoArenaHolder> arena,
     const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
-    const TGUCSettings::TPtr& GUCSettings)
+    const TGUCSettings::TPtr& GUCSettings, TComputeActorSchedulingOptions cpuOptions)
 {
     return new TKqpComputeActor(executerId, txId, task, std::move(asyncIoFactory),
-        settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings);
+        settings, memoryLimits, std::move(traceId), std::move(arena), federatedQuerySetup, GUCSettings, std::move(cpuOptions));
 }

 } // namespace NKqp

diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h
index 590a9bcab774..613f5e2786c6 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.h
@@ -8,17 +8,16 @@
 #include
 #include
 #include
+#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
 #include
 #include

-#include
-
 namespace NKikimr {
 namespace NKqp {

-class TKqpComputeActor : public TDqSyncComputeActorBase<TKqpComputeActor> {
-    using TBase = TDqSyncComputeActorBase<TKqpComputeActor>;
+class TKqpComputeActor : public TSchedulableComputeActorBase<TKqpComputeActor> {
+    using TBase = TSchedulableComputeActorBase<TKqpComputeActor>;

 public:
     static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -29,7 +28,8 @@ class TKqpComputeActor : public TDqSyncComputeActorBase {
         IDqAsyncIoFactory::TPtr asyncIoFactory,
         const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
         TIntrusivePtr<NActors::TProtoArenaHolder> arena,
-        const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
+        const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
+        TComputeActorSchedulingOptions);

     void DoBootstrap();
@@ -68,7 +68,8 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NDqProto::T
     IDqAsyncIoFactory::TPtr asyncIoFactory,
     const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
     TIntrusivePtr<NActors::TProtoArenaHolder> arena,
-    const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
+    const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
+    TComputeActorSchedulingOptions);

 } // namespace NKqp
 } // namespace NKikimr

diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 88ddaebdabe2..8947e2740030 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -23,11 +23,11 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);

 } // anonymous namespace

-TKqpScanComputeActor::TKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
+TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
     NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
     const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
     TIntrusivePtr<NActors::TProtoArenaHolder> arena)
-    : TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
+    : TBase(std::move(cpuOptions), executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
         memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena))
     , ComputeCtx(settings.StatsMode)
     , LockTxId(lockTxId)
@@ -251,6 +251,8 @@ void TKqpScanComputeActor::DoBootstrap() {
     ScanData->TaskId = GetTask().GetId();
     ScanData->TableReader = CreateKqpTableReader(*ScanData);
     Become(&TKqpScanComputeActor::StateFunc);
+
+    TBase::DoBootstrap();
 }

 }

diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
index d1bba8c37989..7dbb9f4f8252 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
@@ -2,15 +2,15 @@
 #include "kqp_scan_events.h"

 #include
-#include
+#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
 #include
 #include

 namespace NKikimr::NKqp::NScanPrivate {

-class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor> {
+class TKqpScanComputeActor: public TSchedulableComputeActorBase<TKqpScanComputeActor> {
 private:
-    using TBase = NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor>;
+    using TBase = TSchedulableComputeActorBase<TKqpScanComputeActor>;

     NMiniKQL::TKqpScanComputeContext ComputeCtx;
     NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
@@ -65,7 +65,7 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase
-    TKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
+    TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
         NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
         const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
         TIntrusivePtr<NActors::TProtoArenaHolder> arena);

diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 70e5169c4546..4c39ee6d89d6 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -8,7 +8,6 @@
 #include
 #include
-
 #include
 #include
@@ -829,6 +828,13 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
         "PhyTx/ScanTxTotalTimeMs", NMonitoring::ExponentialHistogram(20, 2, 1));

     FullScansExecuted = KqpGroup->GetCounter("FullScans", true);
+
+    SchedulerThrottled = KqpGroup->GetCounter("NodeScheduler/ThrottledUs", true);
+    SchedulerCapacity = KqpGroup->GetCounter("NodeScheduler/Capacity");
+    ComputeActorExecutions = KqpGroup->GetHistogram("NodeScheduler/BatchUs", NMonitoring::ExponentialHistogram(20, 2, 1));
+    ComputeActorDelays = KqpGroup->GetHistogram("NodeScheduler/Delays", NMonitoring::ExponentialHistogram(20, 2, 1));
+    ThrottledActorsSpuriousActivations = KqpGroup->GetCounter("NodeScheduler/SpuriousActivations", true);
+    SchedulerDelays = KqpGroup->GetHistogram("NodeScheduler/Delay", NMonitoring::ExponentialHistogram(20, 2, 1));
 }

 ::NMonitoring::TDynamicCounterPtr TKqpCounters::GetKqpCounters() const {

diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index f302897f1b8d..7d6f38d05fe9 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -409,6 +409,14 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
     ::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages;
     ::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems;

+    // Scheduler signals
+    ::NMonitoring::TDynamicCounters::TCounterPtr SchedulerThrottled;
+    ::NMonitoring::TDynamicCounters::TCounterPtr SchedulerCapacity;
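+    // Note: the throttled-time counter above and the histograms below are all
+    // recorded in microseconds by TSchedulableComputeActorBase (see kqp_compute_scheduler.h).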
+    NMonitoring::THistogramPtr ComputeActorExecutions;
+    NMonitoring::THistogramPtr ComputeActorDelays;
+    ::NMonitoring::TDynamicCounters::TCounterPtr ThrottledActorsSpuriousActivations;
+    NMonitoring::THistogramPtr SchedulerDelays;
+
     // Sequences counters
     ::NMonitoring::TDynamicCounters::TCounterPtr SequencerActorsCount;
     ::NMonitoring::TDynamicCounters::TCounterPtr SequencerErrors;

diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 60b168150bd9..834f90beb192 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -53,6 +53,10 @@ void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskRe
     ret.HeavyProgram = opts.GetHasMapJoin();
 }

+bool LimitCPU(TIntrusivePtr<TUserRequestContext> ctx) {
+    return ctx->PoolId && ctx->PoolConfig.has_value() && ctx->PoolConfig->TotalCpuLimitPercentPerNode > 0;
+}
+
 }

 bool TKqpPlanner::UseMockEmptyPlanner = false;
@@ -101,6 +105,10 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
             LOG_E("Database not set, use " << Database);
         }
     }
+
+    if (LimitCPU(UserRequestContext)) {
+        AllowSinglePartitionOpt = false;
+    }
 }

 // ResourcesSnapshot, ResourceEstimations
@@ -223,6 +231,13 @@ std::unique_ptr TKqpPlanner::SerializeReque
         request.SetSerializedGUCSettings(SerializedGUCSettings);
     }

+    request.SetSchedulerGroup(UserRequestContext->PoolId);
+    request.SetDatabase(Database);
+    if (UserRequestContext->PoolConfig.has_value()) {
+        request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
+        request.SetMaxCpuShare(UserRequestContext->PoolConfig->TotalCpuLimitPercentPerNode / 100.0);
+    }
+
     return result;
 }

diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index a2c6e982596a..b4f8931674e7 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -14,6 +14,7 @@
 #include
 #include
 #include
+#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
 #include
 #include

@@ -80,6 +81,13 @@ class TKqpNodeService : public TActorBootstrapped {
         if (config.HasIteratorReadQuotaSettings()) {
             SetIteratorReadsQuotaSettings(config.GetIteratorReadQuotaSettings());
         }
+
+        SchedulerOptions = {
+            .AdvanceTimeInterval = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec()),
+            .ForgetOverflowTimeout = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec()),
+            .ActivePoolPollingTimeout = TDuration::Seconds(config.GetComputeSchedulerSettings().GetActivePoolPollingSec()),
+            .Counters = counters,
+        };
     }

     void Bootstrap() {
@@ -100,6 +108,10 @@ class TKqpNodeService : public TActorBootstrapped {
         Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup());

         Become(&TKqpNodeService::WorkState);
+
+        Scheduler = std::make_shared<TComputeScheduler>();
+        SchedulerOptions.Scheduler = Scheduler;
+        SchedulerActorId = RegisterWithSameMailbox(CreateSchedulerActor(SchedulerOptions));
     }

 private:
@@ -179,6 +191,8 @@ class TKqpNodeService : public TActorBootstrapped {
         const TString& serializedGUCSettings = ev->Get()->Record.HasSerializedGUCSettings() ?
            ev->Get()->Record.GetSerializedGUCSettings() : "";

+        auto schedulerNow = TlsActivationContext->Monotonic();
+
         // start compute actors
         TMaybe rlPath = Nothing();
         if (msgRtSettings.HasRlPath()) {
@@ -190,6 +204,32 @@ class TKqpNodeService : public TActorBootstrapped {
         const ui32 tasksCount = msg.GetTasks().size();
         for (auto& dqTask: *msg.MutableTasks()) {
+            TString group = msg.GetSchedulerGroup();
+
+            TComputeActorSchedulingOptions schedulingTaskOptions {
+                .Now = schedulerNow,
+                .SchedulerActorId = SchedulerActorId,
+                .Scheduler = Scheduler.get(),
+                .Group = group,
+                .Weight = 1,
+                .NoThrottle = false,
+                .Counters = Counters
+            };
+
+            if (SchedulerOptions.Scheduler->Disabled(group)) {
+                auto share = msg.GetMaxCpuShare();
+                if (share > 0) {
+                    Scheduler->UpdateMaxShare(group, share, schedulerNow);
+                    Send(SchedulerActorId, new TEvSchedulerNewPool(msg.GetDatabase(), group, share));
+                } else {
+                    schedulingTaskOptions.NoThrottle = true;
+                }
+            }
+
+            if (!schedulingTaskOptions.NoThrottle) {
+                schedulingTaskOptions.Handle = SchedulerOptions.Scheduler->Enroll(schedulingTaskOptions.Group, schedulingTaskOptions.Weight, schedulingTaskOptions.Now);
+            }
+
             auto result = CaFactory_->CreateKqpComputeActor({
                 .ExecuterId = request.Executer,
                 .TxId = txId,
@@ -210,7 +250,8 @@ class TKqpNodeService : public TActorBootstrapped {
                 .ShareMailbox = false,
                 .RlPath = rlPath,
                 .ComputesByStages = &computesByStage,
-                .State = State_
+                .State = State_,
+                .SchedulingOptions = std::move(schedulingTaskOptions),
             });

             if (const auto* rmResult = std::get_if(&result)) {
@@ -352,7 +393,6 @@ class TKqpNodeService : public TActorBootstrapped {
         auto responseEv = MakeHolder(event);
         Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
-
     }

     void SetIteratorReadsQuotaSettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadQuotaSettings& settings) {
@@ -445,6 +485,11 @@ class TKqpNodeService : public TActorBootstrapped {
     NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
     const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;

+    std::shared_ptr<TComputeScheduler> Scheduler;
+    TSchedulerActorOptions SchedulerOptions;
+    TActorId SchedulerActorId;
+
+    // state sharded by TxId
     std::shared_ptr State_;
 };

diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp
new file mode 100644
index 000000000000..d24bc60e870a
--- /dev/null
+++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp
@@ -0,0 +1,492 @@
+#include "kqp_compute_scheduler.h"
+
+#include
+
+#include
+#include
+
+#include
+#include
+
+namespace {
+    static constexpr ui64 FromDuration(TDuration d) {
+        return d.MicroSeconds();
+    }
+
+    static constexpr TDuration ToDuration(double t) {
+        return TDuration::MicroSeconds(t);
+    }
+
+    static constexpr double MinEntitiesWeight = 1e-8;
+
+    static constexpr TDuration AvgBatch = TDuration::MicroSeconds(100);
+}
+
+namespace NKikimr {
+namespace NKqp {
+
+template<typename T>
+class TMultiThreadView {
+public:
+    TMultiThreadView(std::atomic<ui64>* usage, T* slot)
+        : Usage(usage)
+        , Slot(slot)
+    {
+        Usage->fetch_add(1);
+    }
+
+    const T* get() {
+        return Slot;
+    }
+
+    ~TMultiThreadView() {
+        Usage->fetch_sub(1);
+    }
+
+private:
+    std::atomic<ui64>* Usage;
+    T* Slot;
+};
+
+template<typename T>
+class TMultithreadPublisher {
+public:
+    void Publish() {
+        auto oldVal = CurrentT.load();
+        auto newVal = 1 - oldVal;
+        CurrentT.store(newVal);
+        while (true) {
+            if (Usage[oldVal].load() == 0) {
+                Slots[oldVal] = Slots[newVal];
+                return;
+            }
+        }
+    }
+
+    T* Next() {
+        return &Slots[1 - CurrentT.load()];
+    }
+
+    TMultiThreadView<T> Current() {
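+        // Pin the slot that was current at load time: the usage guard is taken
+        // first, then CurrentT is re-checked. If Publish() flipped the slots in
+        // between, the guard is dropped (view destructor) and we retry, so a
+        // reader never keeps a slot that Publish() is waiting to reclaim.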
+        while (true) {
+            auto val = CurrentT.load();
+            TMultiThreadView<T> view(&Usage[val], &Slots[val]);
+            if (CurrentT.load() == val) {
+                return view;
+            }
+        }
+    }
+
+private:
+    std::atomic<ui32> CurrentT = 0;
+    std::atomic<ui64> Usage[2] = {0, 0};
+    T Slots[2];
+};
+
+TSchedulerEntityHandle::TSchedulerEntityHandle(TSchedulerEntity* ptr)
+    : Ptr(ptr)
+{
+}
+
+TSchedulerEntityHandle::TSchedulerEntityHandle() {}
+
+TSchedulerEntityHandle::TSchedulerEntityHandle(TSchedulerEntityHandle&& other) {
+    Ptr.swap(other.Ptr);
+}
+
+TSchedulerEntityHandle& TSchedulerEntityHandle::operator = (TSchedulerEntityHandle&& other) {
+    Ptr.swap(other.Ptr);
+    return *this;
+}
+
+TSchedulerEntityHandle::~TSchedulerEntityHandle() = default;
+
+class TSchedulerEntity {
+public:
+    TSchedulerEntity() {}
+    ~TSchedulerEntity() {}
+
+    struct TGroupMutableStats {
+        double Weight = 0;
+        TMonotonic LastNowRecalc;
+        bool Disabled = false;
+        double EntitiesWeight = 0;
+        double MaxDeviation = 0;
+        double MaxLimitDeviation = 0;
+
+        ssize_t TrackedBefore = 0;
+
+        double Limit(TMonotonic now) const {
+            return FromDuration(now - LastNowRecalc) * Weight + MaxLimitDeviation + TrackedBefore;
+        }
+    };
+
+    struct TGroupRecord {
+        std::atomic<i64> TrackedMicroSeconds = 0;
+        std::atomic<i64> DelayedSumBatches = 0;
+        std::atomic<i64> DelayedCount = 0;
+
+        TMultithreadPublisher<TGroupMutableStats> MutableStats;
+    };
+
+    TGroupRecord* Group;
+    double Weight;
+    double Vruntime = 0;
+    double Vstart;
+
+    double Vcurrent;
+
+    TDuration MaxDelay;
+
+    static constexpr double WakeupDelay = 1.1;
+    static constexpr double BatchCalcDecay = 0;
+    TDuration BatchTime = AvgBatch;
+
+    TDuration OverflowToleranceTimeout = TDuration::Seconds(1);
+
+    static constexpr TDuration ActivationPenalty = TDuration::MicroSeconds(10);
+
+    size_t Wakeups = 0;
+    bool IsThrottled = false;
+
+    void TrackTime(TDuration time, TMonotonic) {
+        auto group = Group->MutableStats.Current();
+        Group->TrackedMicroSeconds.fetch_add(time.MicroSeconds());
+    }
+
+    void UpdateBatchTime(TDuration time) {
+        Wakeups = 0;
+        auto newBatch = BatchTime * BatchCalcDecay + time * (1 - BatchCalcDecay);
+        if (IsThrottled) {
+            MarkResumed();
+            BatchTime = newBatch;
+            MarkThrottled();
+        } else {
+            BatchTime = newBatch;
+        }
+    }
+
+    TMaybe<TDuration> GroupDelay(TMonotonic now) {
+        auto group = Group->MutableStats.Current();
+        auto limit = group.get()->Limit(now);
+        auto tracked = Group->TrackedMicroSeconds.load();
+        //double Coeff = pow(WakeupDelay, Wakeups);
+        if (limit > tracked) {
+            return {};
+        } else {
+            return Min(MaxDelay, ToDuration(/*Coeff * */(tracked - limit
+                + Max(0, Group->DelayedSumBatches.load()) + BatchTime.MicroSeconds()
+                + ActivationPenalty.MicroSeconds() * (Group->DelayedCount.load() + 1)
+                + group.get()->MaxLimitDeviation) / group.get()->Weight));
+        }
+    }
+
+    void MarkThrottled() {
+        IsThrottled = true;
+        Group->DelayedSumBatches.fetch_add(BatchTime.MicroSeconds());
+        Group->DelayedCount.fetch_add(1);
+    }
+
+    void MarkResumed() {
+        IsThrottled = false;
+        Group->DelayedSumBatches.fetch_sub(BatchTime.MicroSeconds());
+        Group->DelayedCount.fetch_sub(1);
+    }
+};
+
+struct TComputeScheduler::TImpl {
+    TVector<::NMonitoring::TDynamicCounters::TCounterPtr> VtimeCounters;
+    TVector<::NMonitoring::TDynamicCounters::TCounterPtr> EntitiesWeightCounters;
+    TVector<::NMonitoring::TDynamicCounters::TCounterPtr> LimitCounters;
+    TVector<::NMonitoring::TDynamicCounters::TCounterPtr> WeightCounters;
+
+    TVector<::NMonitoring::TDynamicCounters::TCounterPtr> SchedulerClock;
+    TVector<::NMonitoring::TDynamicCounters::TCounterPtr> SchedulerLimitUs;
+    TVector<::NMonitoring::TDynamicCounters::TCounterPtr> SchedulerTrackedUs;
+
+    THashMap<TString, size_t> PoolId;
+    std::vector<std::unique_ptr<TSchedulerEntity::TGroupRecord>> Records;
+
+    double SumCores;
+
+    TIntrusivePtr<TKqpCounters> Counters;
+    TDuration SmoothPeriod = TDuration::MilliSeconds(100);
+    TDuration ForgetInterval = TDuration::Seconds(2);
+
+    TDuration MaxDelay = TDuration::Seconds(10);
+
+    void AssignWeights() { }
+
+    void CreateGroup(TString groupName, double maxShare, NMonotonic::TMonotonic now) {
+        PoolId[groupName] = Records.size();
+        auto group = std::make_unique<TSchedulerEntity::TGroupRecord>();
+        group->MutableStats.Next()->LastNowRecalc = now;
+        group->MutableStats.Next()->Weight = maxShare;
+        Records.push_back(std::move(group));
+    }
+};
+
+TComputeScheduler::TComputeScheduler() {
+    Impl = std::make_unique<TImpl>();
+}
+
+TComputeScheduler::~TComputeScheduler() = default;
+
+TSchedulerEntityHandle TComputeScheduler::Enroll(TString groupName, double weight, TMonotonic now) {
+    Y_ENSURE(Impl->PoolId.contains(groupName), "unknown scheduler group");
+    auto* groupEntry = Impl->Records[Impl->PoolId.at(groupName)].get();
+    groupEntry->MutableStats.Next()->EntitiesWeight += weight;
+    Impl->AssignWeights();
+    AdvanceTime(now);
+
+    auto result = std::make_unique<TSchedulerEntity>();
+    result->Group = groupEntry;
+    result->Weight = weight;
+    result->MaxDelay = Impl->MaxDelay;
+
+    return TSchedulerEntityHandle(result.release());
+}
+
+void TComputeScheduler::AdvanceTime(TMonotonic now) {
+    if (Impl->Counters) {
+        if (Impl->VtimeCounters.size() < Impl->Records.size()) {
+            Impl->VtimeCounters.resize(Impl->Records.size());
+            Impl->EntitiesWeightCounters.resize(Impl->Records.size());
+            Impl->LimitCounters.resize(Impl->Records.size());
+            Impl->WeightCounters.resize(Impl->Records.size());
+            Impl->SchedulerClock.resize(Impl->Records.size());
+            Impl->SchedulerLimitUs.resize(Impl->Records.size());
+            Impl->SchedulerTrackedUs.resize(Impl->Records.size());
+
+            for (auto& [k, i] : Impl->PoolId) {
+                auto group = Impl->Counters->GetKqpCounters()->GetSubgroup("NodeScheduler/Group", k);
+                Impl->VtimeCounters[i] = group->GetCounter("VTime", true);
+                Impl->EntitiesWeightCounters[i] = group->GetCounter("Entities", false);
+                Impl->LimitCounters[i] = group->GetCounter("Limit", true);
+                Impl->WeightCounters[i] = group->GetCounter("Weight", false);
+                Impl->SchedulerClock[i] = group->GetCounter("Clock", false);
+                Impl->SchedulerTrackedUs[i] = group->GetCounter("Tracked", true);
+                Impl->SchedulerLimitUs[i] = group->GetCounter("AbsoluteLimit", true);
+            }
+        }
+    }
+    for (size_t i = 0; i < Impl->Records.size(); ++i) {
+        auto& v = Impl->Records[i]->MutableStats;
+        {
+            auto group = v.Current();
+            if (group.get()->LastNowRecalc > now) {
+                continue;
+            }
+            double delta = 0;
+
+            auto tracked = Impl->Records[i]->TrackedMicroSeconds.load();
+            v.Next()->MaxLimitDeviation = Impl->SmoothPeriod.MicroSeconds() * v.Next()->Weight;
+            v.Next()->LastNowRecalc = now;
+            v.Next()->TrackedBefore =
+                Max(
+                    tracked - FromDuration(Impl->ForgetInterval) * group.get()->Weight,
+                    Min(group.get()->Limit(now) - group.get()->MaxLimitDeviation, tracked));
+
+            if (!group.get()->Disabled && group.get()->EntitiesWeight > MinEntitiesWeight) {
+                delta = FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight / group.get()->EntitiesWeight;
+                v.Next()->MaxDeviation = (FromDuration(Impl->SmoothPeriod) * v.Next()->Weight) / v.Next()->EntitiesWeight;
+            }
+
+            if (Impl->VtimeCounters.size() > i && Impl->VtimeCounters[i]) {
+                Impl->SchedulerLimitUs[i]->Set(group.get()->Limit(now));
+                Impl->SchedulerTrackedUs[i]->Set(Impl->Records[i]->TrackedMicroSeconds.load());
+                Impl->SchedulerClock[i]->Add(now.MicroSeconds() - group.get()->LastNowRecalc.MicroSeconds());
+                Impl->VtimeCounters[i]->Add(delta);
+                Impl->EntitiesWeightCounters[i]->Set(v.Next()->EntitiesWeight);
+                Impl->LimitCounters[i]->Add(FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight);
+                Impl->WeightCounters[i]->Set(group.get()->Weight);
+            }
+        }
+        v.Publish();
+    }
+}
+
+void TComputeScheduler::Deregister(TSchedulerEntity& self, TMonotonic now) {
+    auto* group = self.Group->MutableStats.Next();
+    group->EntitiesWeight -= self.Weight;
+
+    Impl->AssignWeights();
+    AdvanceTime(now);
+}
+
+void TSchedulerEntityHandle::TrackTime(TDuration time, TMonotonic now) {
+    Ptr->TrackTime(time, now);
+}
+
+void TSchedulerEntityHandle::ReportBatchTime(TDuration time) {
+    Ptr->UpdateBatchTime(time);
+}
+
+TMaybe<TDuration> TSchedulerEntityHandle::Delay(TMonotonic now) {
+    return Ptr->GroupDelay(now);
+}
+
+void TSchedulerEntityHandle::MarkResumed() {
+    Ptr->MarkResumed();
+}
+
+void TSchedulerEntityHandle::MarkThrottled() {
+    Ptr->MarkThrottled();
+}
+
+void TSchedulerEntityHandle::Clear() {
+    Ptr.reset();
+}
+
+void TComputeScheduler::ReportCounters(TIntrusivePtr<TKqpCounters> counters) {
+    Impl->Counters = counters;
+}
+
+void TComputeScheduler::SetMaxDeviation(TDuration period) {
+    Impl->SmoothPeriod = period;
+}
+
+void TComputeScheduler::SetForgetInterval(TDuration period) {
+    Impl->ForgetInterval = period;
+}
+
+bool TComputeScheduler::Disabled(TString group) {
+    auto ptr = Impl->PoolId.FindPtr(group);
+    return !ptr || Impl->Records[*ptr]->MutableStats.Current().get()->Disabled;
+}
+
+bool TComputeScheduler::Disable(TString group, TMonotonic now) {
+    auto ptr = Impl->PoolId.FindPtr(group);
+    if (Impl->Records[*ptr]->MutableStats.Current().get()->Weight > MinEntitiesWeight) {
+        return false;
+    }
+    Impl->Records[*ptr]->MutableStats.Next()->Disabled = true;
+    AdvanceTime(now);
+    return true;
+}
+
+void TComputeScheduler::UpdateMaxShare(TString group, double share, TMonotonic now) {
+    auto ptr = Impl->PoolId.FindPtr(group);
+    if (!ptr) {
+        Impl->CreateGroup(group, share, now);
+    } else {
+        auto& record = Impl->Records[*ptr];
+        record->MutableStats.Next()->Weight = share;
+    }
+    AdvanceTime(now);
+}
+
+::NMonitoring::TDynamicCounters::TCounterPtr TComputeScheduler::GetGroupUsageCounter(TString group) const {
+    return Impl->Counters
+        ->GetKqpCounters()
+        ->GetSubgroup("NodeScheduler/Group", group)
+        ->GetCounter("Usage", true);
+}
+
+struct TEvPingPool : public TEventLocal<TEvPingPool, TKqpComputeSchedulerEvents::EvPingPool> {
+    TString Database;
+    TString Pool;
+
+    TEvPingPool(TString database, TString pool)
+        : Database(database)
+        , Pool(pool)
+    {
+    }
+};
+
+class TSchedulerActor : public TActorBootstrapped<TSchedulerActor> {
+public:
+    TSchedulerActor(TSchedulerActorOptions options)
+        : Opts(options)
+    {
+        if (!Opts.Scheduler) {
+            Opts.Scheduler = std::make_shared<TComputeScheduler>();
+        }
+        Opts.Scheduler->SetForgetInterval(Opts.ForgetOverflowTimeout);
+        Opts.Scheduler->ReportCounters(Opts.Counters);
+    }
+
+    void Bootstrap() {
+        Schedule(Opts.AdvanceTimeInterval, new TEvents::TEvWakeup());
+
+        ui32 tableServiceConfigKind = (ui32) NKikimrConsole::TConfigItem::TableServiceConfigItem;
+        Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()),
+            new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({tableServiceConfigKind}),
+            IEventHandle::FlagTrackDelivery);
+
+        Become(&TSchedulerActor::State);
+    }
+
+    STATEFN(State) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle);
+            hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle);
+
+            hFunc(NWorkload::TEvUpdatePoolInfo, Handle);
+
+            hFunc(TEvSchedulerDeregister, Handle);
+            hFunc(TEvSchedulerNewPool, Handle);
+            hFunc(TEvPingPool, Handle);
+            hFunc(TEvents::TEvWakeup, Handle);
+            default: {
+                Y_ABORT("Unexpected event 0x%x for TKqpSchedulerService", ev->GetTypeRewrite());
+            }
+        }
+    }
+
+    void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) {
+        LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_NODE, "Subscribed for config changes");
+    }
+
+    void Handle(TEvSchedulerDeregister::TPtr& ev) {
+        if (ev->Get()->SchedulerEntity) {
+            Opts.Scheduler->Deregister(*ev->Get()->SchedulerEntity, TlsActivationContext->Monotonic());
+        }
+    }
+
+    void Handle(TEvSchedulerNewPool::TPtr& ev) {
+        Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(ev->Get()->Database, ev->Get()->Pool));
+        Opts.Scheduler->UpdateMaxShare(ev->Get()->Pool, ev->Get()->MaxShare, TlsActivationContext->Monotonic());
+    }
+
+    void Handle(TEvPingPool::TPtr& ev) {
+        Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(ev->Get()->Database, ev->Get()->Pool));
+    }
+
+    void Handle(NWorkload::TEvUpdatePoolInfo::TPtr& ev) {
+        if (ev->Get()->Config.has_value()) {
+            Opts.Scheduler->UpdateMaxShare(ev->Get()->PoolId, ev->Get()->Config->TotalCpuLimitPercentPerNode / 100.0, TlsActivationContext->Monotonic());
+        } else {
+            if (!Opts.Scheduler->Disable(ev->Get()->PoolId, TlsActivationContext->Monotonic())) {
+                Schedule(Opts.ActivePoolPollingTimeout.ToDeadLine(), new TEvPingPool(ev->Get()->Database, ev->Get()->PoolId));
+            }
+        }
+    }
+
+    void Handle(TEvents::TEvWakeup::TPtr&) {
+        Opts.Scheduler->AdvanceTime(TlsActivationContext->Monotonic());
+        Schedule(Opts.AdvanceTimeInterval, new TEvents::TEvWakeup());
+    }
+
+    void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
+        auto& event = ev->Get()->Record;
+        auto& config = event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings();
+
+        Opts.AdvanceTimeInterval = TDuration::MicroSeconds(config.GetAdvanceTimeIntervalUsec());
+        Opts.ActivePoolPollingTimeout = TDuration::Seconds(config.GetActivePoolPollingSec());
+        Opts.Scheduler->SetForgetInterval(TDuration::MicroSeconds(config.GetForgetOverflowTimeoutUsec()));
+    }
+
+private:
+    TSchedulerActorOptions Opts;
+};
+
+IActor* CreateSchedulerActor(TSchedulerActorOptions opts) {
+    return new TSchedulerActor(opts);
+}
+
+} // namespace NKqp
+} // namespace NKikimr

diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.h b/ydb/core/kqp/runtime/kqp_compute_scheduler.h
new file mode 100644
index 000000000000..fc58b4e8dc98
--- /dev/null
+++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.h
@@ -0,0 +1,334 @@
+#pragma once
+
+#include
+#include
+
+#include
+
+#include
+#include
+#include
+
+#include
+
+namespace NKikimr {
+namespace NKqp {
+
+class TSchedulerEntity;
+class TSchedulerEntityHandle {
+private:
+    std::unique_ptr<TSchedulerEntity> Ptr;
+
+public:
+    TSchedulerEntityHandle(TSchedulerEntity*);
+
+    TSchedulerEntityHandle();
+    TSchedulerEntityHandle(TSchedulerEntityHandle&&);
+
+    TSchedulerEntityHandle& operator = (TSchedulerEntityHandle&&);
+
+    bool Defined() const {
+        return Ptr.get() != nullptr;
+    }
+
+    operator bool () const {
+        return Defined();
+    }
+
+    TSchedulerEntity& operator*() {
+        return *Ptr;
+    }
+
+    void TrackTime(TDuration time, TMonotonic now);
+    void ReportBatchTime(TDuration time);
+
+    TMaybe<TDuration> Delay(TMonotonic now);
+
+    void MarkThrottled();
+    void MarkResumed();
+
+    double EstimateWeight(TMonotonic now, TDuration minTime);
+
+    void Clear();
+
+    ~TSchedulerEntityHandle();
+};
+
+class TComputeScheduler {
+public:
+    TComputeScheduler();
+    ~TComputeScheduler();
+
+    void ReportCounters(TIntrusivePtr<TKqpCounters>);
+
+    void UpdateMaxShare(TString, double, TMonotonic now);
+
+    void SetMaxDeviation(TDuration);
+    void SetForgetInterval(TDuration);
+    ::NMonitoring::TDynamicCounters::TCounterPtr GetGroupUsageCounter(TString group) const;
+
+    TSchedulerEntityHandle Enroll(TString group, double weight, TMonotonic now);
+
+    void AdvanceTime(TMonotonic now);
+
+    void Deregister(TSchedulerEntity& self, TMonotonic now);
+
+    bool Disabled(TString group);
+    bool Disable(TString group, TMonotonic now);
+
+private:
+    struct TImpl;
+    std::unique_ptr<TImpl> Impl;
+};
+
+struct TComputeActorSchedulingOptions {
+    TMonotonic Now;
+    NActors::TActorId SchedulerActorId;
+    TSchedulerEntityHandle Handle;
+    TComputeScheduler* Scheduler;
+    TString Group = "";
+    double Weight = 1;
+    bool NoThrottle = true;
+    TIntrusivePtr<TKqpCounters> Counters = nullptr;
+};
+
+struct TKqpComputeSchedulerEvents {
+    enum EKqpComputeSchedulerEvents {
+        EvDeregister = EventSpaceBegin(TKikimrEvents::ES_KQP) + 400,
+        EvNewPool,
+        EvPingPool,
+    };
+};
+
+struct TEvSchedulerDeregister : public TEventLocal<TEvSchedulerDeregister, TKqpComputeSchedulerEvents::EvDeregister> {
+    TSchedulerEntityHandle SchedulerEntity;
+
+    TEvSchedulerDeregister(TSchedulerEntityHandle entity)
+        : SchedulerEntity(std::move(entity))
+    {
+    }
+};
+
+struct TEvSchedulerNewPool : public TEventLocal<TEvSchedulerNewPool, TKqpComputeSchedulerEvents::EvNewPool> {
+    TString Database;
+    TString Pool;
+    double MaxShare;
+
+    TEvSchedulerNewPool(TString database, TString pool, double maxShare)
+        : Database(database)
+        , Pool(pool)
+        , MaxShare(maxShare)
+    {
+    }
+};
+
+template<typename TDerived>
+class TSchedulableComputeActorBase : public NYql::NDq::TDqSyncComputeActorBase<TDerived> {
+private:
+    using TBase = NYql::NDq::TDqSyncComputeActorBase<TDerived>;
+
+    static constexpr double SecToUsec = 1e6;
+
+public:
+    template<typename... TArgs>
+    TSchedulableComputeActorBase(TComputeActorSchedulingOptions options, TArgs&&... args)
+        : TBase(std::forward<TArgs>(args)...)
+        , SelfHandle(std::move(options.Handle))
+        , SchedulerActorId(options.SchedulerActorId)
+        , NoThrottle(options.NoThrottle)
+        , Counters(options.Counters)
+        , Group(options.Group)
+        , Weight(options.Weight)
+    {
+        if (!NoThrottle) {
+            Y_ABORT_UNLESS(Counters);
+            Y_ABORT_UNLESS(SelfHandle);
+            GroupUsage = options.Scheduler->GetGroupUsageCounter(options.Group);
+        } else {
+            Y_ABORT_UNLESS(!SelfHandle);
+        }
+    }
+
+    static constexpr ui64 ResumeWakeupTag = 201;
+
+    TMonotonic Now() {
+        return TMonotonic::Now();
+        //return TlsActivationContext->Monotonic();
+    }
+
+    void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev) {
+        auto tag = ev->Get()->Tag;
+        CA_LOG_D("wakeup with tag " << tag);
+        if (tag == ResumeWakeupTag) {
+            TBase::DoExecute();
+        } else {
+            TBase::HandleExecuteBase(ev);
+        }
+    }
+
+    STFUNC(BaseStateFuncBody) {
+        AccountActorSystemStats(TlsActivationContext->Monotonic());
+        // we assume that exception handling is done in parents/descendants
+        switch (ev->GetTypeRewrite()) {
+            hFunc(NActors::TEvents::TEvWakeup, TSchedulableComputeActorBase::HandleWakeup);
+            default:
+                TBase::BaseStateFuncBody(ev);
+        }
+    }
+
+    void DoBootstrap() {
+        if (!SelfHandle.Defined()) {
+            return;
+        }
+
+        OldActivationStats = TlsActivationContext->AsActorContext().Mailbox.GetElapsedCycles();
+        if (!OldActivationStats.has_value()) {
+            TlsActivationContext->AsActorContext().Mailbox.EnableStats();
+            OldActivationStats = TlsActivationContext->AsActorContext().Mailbox.GetElapsedCycles();
+        }
+
+        Y_ABORT_UNLESS(OldActivationStats.has_value());
+    }
+
+private:
+    void ReportThrottledTime(TMonotonic now) {
+        if (Counters && Throttled) {
+            Counters->SchedulerThrottled->Add((now - *Throttled).MicroSeconds());
+        }
+        if (Throttled) {
+            SelfHandle.MarkResumed();
+            Throttled.Clear();
+        }
+    }
+
+protected:
+    void DoExecuteImpl() override {
+        if (!SelfHandle.Defined()) {
+            if (NoThrottle) {
+                return TBase::DoExecuteImpl();
+            } else {
+                return;
+            }
+        }
+
+        TMonotonic now = Now();
+        AccountActorSystemStats(now);
+        TMaybe<TDuration> delay = CalcDelay(now);
+        bool executed = false;
+        if (NoThrottle || !delay) {
+            ReportThrottledTime(now);
+            executed = true;
+
+            ExecutionTimer.ConstructInPlace();
+            TBase::DoExecuteImpl();
+
+            TDuration passed = TDuration::MicroSeconds(ExecutionTimer->Passed() * SecToUsec);
+
+            if (Finished) {
+                return;
+            }
+            TrackedWork += passed;
+            SelfHandle.ReportBatchTime(passed);
+            SelfHandle.TrackTime(passed, now);
+            GroupUsage->Add(passed.MicroSeconds());
+            Counters->ComputeActorExecutions->Collect(passed.MicroSeconds());
+        }
+        if (delay) {
+            Counters->SchedulerDelays->Collect(delay->MicroSeconds());
+            CA_LOG_D("schedule wakeup after " << delay->MicroSeconds() << " usec");
+            this->Schedule(*delay, new NActors::TEvents::TEvWakeup(ResumeWakeupTag));
+        }
+
+        if (!executed) {
+            if (!Throttled) {
+                SelfHandle.MarkThrottled();
+                Throttled = now;
+            } else {
+                Counters->ThrottledActorsSpuriousActivations->Inc();
+            }
+        }
+        ExecutionTimer.Clear();
+    }
+
+    void AccountActorSystemStats(NMonotonic::TMonotonic now) {
+        if (!SelfHandle.Defined()) {
+            return;
+        }
+
+        auto newStats = TlsActivationContext->AsActorContext().Mailbox.GetElapsedCycles();
+        Y_ABORT_UNLESS(OldActivationStats.has_value());
+        Y_ABORT_UNLESS(newStats.has_value());
+        Y_ABORT_UNLESS(*newStats >= *OldActivationStats);
+        auto toAccount = TDuration::MicroSeconds(NHPTimer::GetSeconds(*newStats - *OldActivationStats) * 1e6);
+        {
+            auto minTime = Min(toAccount, TrackedWork);
+            TrackedWork -= minTime;
+            toAccount -= minTime;
+        }
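+        // TrackedWork has already been charged to the group via TrackTime() in
+        // DoExecuteImpl(); the remainder of the mailbox cycle delta is
+        // actor-system overhead that is still unaccounted, so charge it below.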
+        GroupUsage->Add(toAccount.MicroSeconds());
+        SelfHandle.TrackTime(toAccount, now);
+        OldActivationStats = newStats;
+    }
+
+    TMaybe<TDuration> CalcDelay(NMonotonic::TMonotonic now) {
+        auto result = SelfHandle.Delay(now);
+        Counters->ComputeActorDelays->Collect(result.GetOrElse(TDuration::Zero()).MicroSeconds());
+        if (NoThrottle || !result.Defined()) {
+            return {};
+        } else {
+            return result;
+        }
+    }
+
+    void PassAway() override {
+        Finished = true;
+        if (SelfHandle) {
+            auto now = Now();
+            if (Throttled) {
+                SelfHandle.MarkResumed();
+            }
+            if (ExecutionTimer) {
+                TDuration passed = TDuration::MicroSeconds(ExecutionTimer->Passed() * SecToUsec);
+                SelfHandle.TrackTime(passed, now);
+                GroupUsage->Add(passed.MicroSeconds());
+            }
+        }
+        if (SelfHandle) {
+            auto finishEv = MakeHolder<TEvSchedulerDeregister>(std::move(SelfHandle));
+            this->Send(SchedulerActorId, finishEv.Release());
+        }
+        TBase::PassAway();
+    }
+
+private:
+    TMaybe<THPTimer> ExecutionTimer;
+    TDuration TrackedWork = TDuration::Zero();
+    TMaybe<TMonotonic> Throttled;
+    TSchedulerEntityHandle SelfHandle;
+    NActors::TActorId SchedulerActorId;
+    bool NoThrottle;
+    bool Finished = false;
+
+    std::optional<ui64> OldActivationStats;
+
+    TIntrusivePtr<TKqpCounters> Counters;
+    ::NMonitoring::TDynamicCounters::TCounterPtr GroupUsage;
+
+    TString Group;
+    double Weight;
+};
+
+struct TSchedulerActorOptions {
+    std::shared_ptr<TComputeScheduler> Scheduler;
+    TDuration AdvanceTimeInterval;
+    TDuration ForgetOverflowTimeout;
+    TDuration ActivePoolPollingTimeout;
+    TIntrusivePtr<TKqpCounters> Counters;
+};
+
+IActor* CreateSchedulerActor(TSchedulerActorOptions);
+
+} // namespace NKqp
+} // namespace NKikimr

diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index b9bb9f748dd4..aa17440a948d 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -1,4 +1,5 @@
 #include "kqp_read_actor.h"
+#include "kqp_compute_scheduler.h"

 #include
 #include

diff --git a/ydb/core/kqp/runtime/ya.make b/ydb/core/kqp/runtime/ya.make
index 3801d1df0ea0..615ccbc9cbf4 100644
--- a/ydb/core/kqp/runtime/ya.make
+++ b/ydb/core/kqp/runtime/ya.make
@@ -5,6 +5,7 @@ SRCS(
     kqp_effects.cpp
     kqp_output_stream.cpp
     kqp_program_builder.cpp
+    kqp_compute_scheduler.cpp
    kqp_read_actor.cpp
     kqp_read_iterator_common.cpp
     kqp_read_table.cpp

diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index c9b107e2732b..fa9a42f981a8 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -584,6 +584,10 @@ message TEvStartKqpTasksRequest {
     optional bool StartAllOrFail = 6 [default = true];
     optional uint64 OutputChunkMaxSize = 7 [default = 0]; // 0 - use some default value
     optional string SerializedGUCSettings = 8;
+    optional string SchedulerGroup = 9;
+    optional double MemoryPoolPercent = 10 [default = 100];
+    optional string Database = 11;
+    optional double MaxCpuShare = 12;
     optional uint64 LockTxId = 13;
     optional uint32 LockNodeId = 14;
 }

diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto
index ce437818af6a..a0fc6876ea9a 100644
--- a/ydb/core/protos/table_service_config.proto
+++ b/ydb/core/protos/table_service_config.proto
@@ -309,4 +309,16 @@ message TTableServiceConfig {
     optional bool EnableImplicitQueryParameterTypes = 66 [ default = false ];

+    optional string EnableSpillingNodes = 67 [ default = "All" ];
+
+    message TComputeSchedulerSettings {
+        optional uint64 AdvanceTimeIntervalUsec = 1 [default = 50000];
+        optional uint64 ForgetOverflowTimeoutUsec = 2 [default = 2000000];
+        optional uint64 ActivePoolPollingSec = 3 [default = 10];
+    }
+
+    reserved 68;
+    optional TComputeSchedulerSettings ComputeSchedulerSettings = 70;
+
+    optional bool EnableRowsDuplicationCheck = 69 [ default = false ];
 };

diff --git a/ydb/core/resource_pools/resource_pool_settings.cpp b/ydb/core/resource_pools/resource_pool_settings.cpp
index 5303229073f5..77d0e3a8d493 100644
--- a/ydb/core/resource_pools/resource_pool_settings.cpp
+++ b/ydb/core/resource_pools/resource_pool_settings.cpp
@@ -48,7 +48,8 @@ std::unordered_map TPoolSettings::GetProperti
         {"concurrent_query_limit", &ConcurrentQueryLimit},
         {"queue_size", &QueueSize},
         {"query_memory_limit_percent_per_node", &QueryMemoryLimitPercentPerNode},
-        {"database_load_cpu_threshold", &DatabaseLoadCpuThreshold}
+        {"database_load_cpu_threshold", &DatabaseLoadCpuThreshold},
+        {"total_cpu_limit_percent_per_node", &TotalCpuLimitPercentPerNode}
     };
     if (!restricted) {
         properties.insert({"query_cancel_after_seconds", &QueryCancelAfter});

diff --git a/ydb/core/resource_pools/resource_pool_settings.h b/ydb/core/resource_pools/resource_pool_settings.h
index edc24422d5a4..73a4031dea10 100644
--- a/ydb/core/resource_pools/resource_pool_settings.h
+++ b/ydb/core/resource_pools/resource_pool_settings.h
@@ -35,6 +35,7 @@ struct TPoolSettings : public TSettingsBase {
     TDuration QueryCancelAfter = TDuration::Zero(); // 0 = disabled
     TPercent QueryMemoryLimitPercentPerNode = -1; // Percent from node memory capacity, -1 = disabled
     TPercent DatabaseLoadCpuThreshold = -1; // -1 = disabled
+    TPercent TotalCpuLimitPercentPerNode = -1; // -1 = disabled
 };

 } // namespace NKikimr::NResourcePool

diff --git a/ydb/library/actors/core/actor.cpp b/ydb/library/actors/core/actor.cpp
index 179989fa2309..355e80c0df02 100644
--- a/ydb/library/actors/core/actor.cpp
+++ b/ydb/library/actors/core/actor.cpp
@@ -161,6 +161,10 @@ namespace NActors {
         return NHPTimer::GetSeconds(GetCurrentEventTicks());
     }

+    void TActivationContext::EnableMailboxStats() {
+        TlsActivationContext->Mailbox.EnableStats();
+    }
+
     TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept {
         return TlsActivationContext->ExecutorThread.RegisterActor(actor, mailboxType, poolId, SelfActorId);
     }

diff --git a/ydb/library/actors/core/actor.h b/ydb/library/actors/core/actor.h
index 7385472d26e0..504335fa6a3d 100644
--- a/ydb/library/actors/core/actor.h
+++ b/ydb/library/actors/core/actor.h
@@ -3,6 +3,7 @@
 #include "actorsystem.h"
 #include "event.h"
 #include "executor_thread.h"
+#include "mailbox.h"
 #include "monotonic.h"
 #include "thread_context.h"
@@ -130,6 +131,8 @@ namespace NActors {
         static i64 GetCurrentEventTicks();
         static double GetCurrentEventTicksAsSeconds();
+
+        static void EnableMailboxStats();
     };

     struct TActorContext: public TActivationContext {

diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp
index e9287e2059d9..acc82a046489 100644
--- a/ydb/library/actors/core/executor_thread.cpp
+++ b/ydb/library/actors/core/executor_thread.cpp
@@ -269,6 +269,7 @@ namespace NActors {
                     Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
                     NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(eventStart, hpnow, activityType, CurrentActorScheduledEventsCounter);
+                    mailbox->AddElapsedCycles(elapsed);
                     if (elapsed > 1000000) {
                         LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
                     }
@@ -372,7 +373,7 @@ namespace NActors {
                     break; // empty queue, leave
                 }
             }
-            TlsThreadContext->ActivationStartTS.store(GetCycleCountFast(), std::memory_order_release);
+            TlsThreadContext->ActivationStartTS.store(hpnow, std::memory_order_release);
             TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);

             NProfiling::TMemoryTagScope::Reset(0);

diff --git a/ydb/library/actors/core/mailbox.cpp b/ydb/library/actors/core/mailbox.cpp
index c208a4813f81..6b7ba86431a1 100644
--- a/ydb/library/actors/core/mailbox.cpp
+++ b/ydb/library/actors/core/mailbox.cpp
@@ -373,9 +373,9 @@ namespace NActors {
         CleanupActors();
     }

-    bool TMailboxHeader::CleanupActors() {
+    bool TMailboxHeader::CleanupActors(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo) {
         bool done = true;
-        switch (ActorPack) {
+        switch (actorPack) {
             case TMailboxActorPack::Simple: {
                 if (ActorsInfo.Simple.ActorId != 0) {
                     delete ActorsInfo.Simple.Actor;
@@ -399,13 +399,31 @@ namespace NActors {
                 done = false;
                 break;
             }
+            case TMailboxActorPack::Complex:
+                Y_ABORT("Unexpected ActorPack type");
         }

-        ActorPack = TMailboxActorPack::Simple;
+        actorPack = TMailboxActorPack::Simple;
         ActorsInfo.Simple.ActorId = 0;
         ActorsInfo.Simple.Actor = nullptr;
         return done;
     }

+    bool TMailboxHeader::CleanupActors() {
+        if (ActorPack != TMailboxActorPack::Complex) {
+            TMailboxActorPack::EType pack = ActorPack;
+            bool done = CleanupActors(pack, ActorsInfo);
+            ActorPack = pack;
+            return done;
+        } else {
+            bool done = CleanupActors(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo);
+            delete ActorsInfo.Complex;
+            ActorPack = TMailboxActorPack::Simple;
+            ActorsInfo.Simple.ActorId = 0;
+            ActorsInfo.Simple.Actor = nullptr;
+            return done;
+        }
+    }
+
     std::pair TMailboxHeader::CountMailboxEvents(ui64 localActorId, ui32 maxTraverse) {
         switch (Type) {
             case TMailboxType::Simple:

diff --git a/ydb/library/actors/core/mailbox.h b/ydb/library/actors/core/mailbox.h
index e086b831c405..2ae1ff997e2d 100644
--- a/ydb/library/actors/core/mailbox.h
+++ b/ydb/library/actors/core/mailbox.h
@@ -5,6 +5,7 @@
 #include "executor_pool.h"
 #include "mailbox_queue_simple.h"
 #include "mailbox_queue_revolving.h"
+#include

 #include
 #include
 #include
@@ -27,6 +28,10 @@ namespace NActors {

     struct TMailboxHeader;

+    struct TMailboxStats {
+        ui64 ElapsedCycles = 0;
+    };
+
     template
     struct TMailboxUsageImpl {
         void Push(ui64 /*localId*/) {}
@@ -53,7 +58,8 @@ namespace NActors {
             enum EType {
                 Simple = 0,
                 Array = 1,
-                Map = 2
+                Map = 2,
+                Complex = 3,
             };
         };
@@ -79,7 +85,7 @@ namespace NActors {
         volatile ui32 ExecutionState;
         ui32 Reserved : 4; // never changes, always zero
        ui32 Type : 4; // never changes
-        ui32 ActorPack : 2;
+        TMailboxActorPack::EType ActorPack : 2;
         ui32 Knobs : 22;

         struct TActorPair {
@@ -91,6 +97,8 @@ namespace NActors {
             TActorPair Actors[ARRAY_CAPACITY];
         };

+        struct alignas(64) TComplexActorInfo;
+
         union TActorsInfo {
             TActorPair Simple;
             struct {
@@ -100,11 +108,19 @@ namespace NActors {
             struct {
                 TActorMap* ActorsMap;
             } Map;
+            TComplexActorInfo* Complex;
         } ActorsInfo;

+        struct alignas(64) TComplexActorInfo {
+            TActorsInfo ActorsInfo;
+            TMailboxActorPack::EType ActorPack;
+            TMailboxStats Stats;
+        };
+
         TMailboxHeader(TMailboxType::EType type);
         ~TMailboxHeader();

+        static bool CleanupActors(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo);
         bool CleanupActors();

         // this interface is used exclusively by executor thread, so implementation is there
@@ -119,12 +135,13 @@ namespace NActors {
         bool UnlockAsFree(bool wouldReschedule); // preceed with releasing lock, but mark as free one

         bool IsEmpty() const noexcept
         {
-            return (ActorPack == TMailboxActorPack::Simple && ActorsInfo.Simple.ActorId == 0);
+            return (ActorPack == TMailboxActorPack::Simple && ActorsInfo.Simple.ActorId == 0) ||
+                (ActorPack == TMailboxActorPack::Complex && ActorsInfo.Complex->ActorPack == TMailboxActorPack::Simple && ActorsInfo.Complex->ActorsInfo.Simple.ActorId == 0);
         }

         template <typename T>
-        void ForEach(T&& callback) noexcept {
-            switch (ActorPack) {
+        static void ForEach(TMailboxActorPack::EType actorPack, TActorsInfo &ActorsInfo, T&& callback) noexcept {
+            switch (actorPack) {
                 case TMailboxActorPack::Simple:
                     if (ActorsInfo.Simple.ActorId) {
                         callback(ActorsInfo.Simple.ActorId, ActorsInfo.Simple.Actor);
@@ -143,10 +160,22 @@ namespace NActors {
                         callback(row.ActorId, row.Actor);
                     }
                     break;
+
+                case TMailboxActorPack::Complex:
+                    Y_ABORT("Unexpected ActorPack type");
             }
         }

-        IActor* FindActor(ui64 localActorId) noexcept {
+        template <typename T>
+        void ForEach(T&& callback) noexcept {
+            if (ActorPack != TMailboxActorPack::Complex) {
+                ForEach(static_cast<TMailboxActorPack::EType>(ActorPack), ActorsInfo, std::move(callback));
+            } else {
+                ForEach(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, std::move(callback));
+            }
+        }
+
+        static IActor* FindActor(TMailboxActorPack::EType ActorPack, TActorsInfo &ActorsInfo, ui64 localActorId) noexcept {
             switch (ActorPack) {
                 case TMailboxActorPack::Simple: {
                     if (ActorsInfo.Simple.ActorId == localActorId)
@@ -167,14 +196,22 @@ namespace NActors {
                     }
                     break;
                 }
-                default:
-                    Y_ABORT();
+                case TMailboxActorPack::Complex:
+                    Y_ABORT("Unexpected ActorPack type");
             }
             return nullptr;
         }

-        void AttachActor(ui64 localActorId, IActor* actor) noexcept {
-            switch (ActorPack) {
+        IActor* FindActor(ui64 localActorId) noexcept {
+            if (ActorPack != TMailboxActorPack::Complex) {
+                return FindActor(static_cast<TMailboxActorPack::EType>(ActorPack), ActorsInfo, localActorId);
+            } else {
+                return FindActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId);
+            }
+        }
+
+        static void AttachActor(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo, ui64 localActorId, IActor* actor) noexcept {
+            switch (actorPack) {
                 case TMailboxActorPack::Simple: {
                     if (ActorsInfo.Simple.ActorId == 0) {
                         ActorsInfo.Simple.ActorId = localActorId;
@@ -185,7 +222,7 @@ namespace NActors {
                         ar->Actors[0] = ActorsInfo.Simple;
                         ar->Actors[1] = TActorPair{actor, localActorId};
                         ActorsInfo.Array.ActorsCount = 2;
-                        ActorPack = TMailboxActorPack::Array;
+                        actorPack = TMailboxActorPack::Array;
                         ActorsInfo.Array.ActorsArray = ar;
                     }
                     break;
@@ -201,7 +238,7 @@ namespace NActors {
                             mp->emplace(ActorsInfo.Array.ActorsArray->Actors[i].ActorId, ActorsInfo.Array.ActorsArray->Actors[i].Actor);
                         }
                         mp->emplace(localActorId, actor);
-                        ActorPack = TMailboxActorPack::Map;
+                        actorPack = TMailboxActorPack::Map;
                         ActorsInfo.Array.ActorsCount = 0;
                         delete ActorsInfo.Array.ActorsArray;
                         ActorsInfo.Map.ActorsMap = mp;
@@ -210,17 +247,27 @@ namespace NActors {
                    }
                     break;
                 }
-                default:
-                    Y_ABORT();
+                case TMailboxActorPack::Complex:
+                    Y_ABORT("Unexpected ActorPack type");
             }
         }

-        IActor* DetachActor(ui64 localActorId) noexcept {
-            Y_DEBUG_ABORT_UNLESS(FindActor(localActorId) != nullptr);
+        void AttachActor(ui64 localActorId, IActor* actor) noexcept {
+            if (ActorPack != TMailboxActorPack::Complex) {
+                TMailboxActorPack::EType pack = ActorPack;
+                AttachActor(pack, ActorsInfo, localActorId, actor);
+                ActorPack = pack;
+            } else {
+                AttachActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId, actor);
+            }
+        }
+
+        static IActor* DetachActor(TMailboxActorPack::EType &actorPack, TActorsInfo &ActorsInfo, ui64 localActorId) noexcept {
+            Y_DEBUG_ABORT_UNLESS(FindActor(actorPack, ActorsInfo, localActorId) != nullptr);
             IActor* actorToDestruct = nullptr;

-            switch (ActorPack) {
+            switch (actorPack) {
                 case TMailboxActorPack::Simple: {
                     Y_ABORT_UNLESS(ActorsInfo.Simple.ActorId == localActorId);
                     actorToDestruct = ActorsInfo.Simple.Actor;
@@ -243,7 +290,7 @@ namespace NActors {
                         ar->Actors[i++] = TActorPair{actor, actorId};
                     }
                     delete ActorsInfo.Map.ActorsMap;
-                    ActorPack = TMailboxActorPack::Array;
+                    actorPack = TMailboxActorPack::Array;
                     ActorsInfo.Array.ActorsArray = ar;
                     ActorsInfo.Array.ActorsCount = ARRAY_CAPACITY;
                 }
@@ -265,16 +312,50 @@ namespace NActors {
                     if (ActorsInfo.Array.ActorsCount == 1) {
                         const TActorPair Actor = ActorsInfo.Array.ActorsArray->Actors[0];
                         delete ActorsInfo.Array.ActorsArray;
-                        ActorPack = TMailboxActorPack::Simple;
+                        actorPack = TMailboxActorPack::Simple;
                         ActorsInfo.Simple = Actor;
                     }
                     break;
                 }
+                case TMailboxActorPack::Complex:
+                    Y_ABORT("Unexpected ActorPack type");
             }

             return actorToDestruct;
         }

+        IActor* DetachActor(ui64 localActorId) noexcept {
+            if (ActorPack != TMailboxActorPack::Complex) {
+                TMailboxActorPack::EType pack = ActorPack;
+                IActor* result = DetachActor(pack, ActorsInfo, localActorId);
+                ActorPack = pack;
+                return result;
+            } else {
+                return DetachActor(ActorsInfo.Complex->ActorPack, ActorsInfo.Complex->ActorsInfo, localActorId);
+            }
+        }
+
+        void EnableStats() {
+            TComplexActorInfo* complex = new TComplexActorInfo;
+            complex->ActorPack = ActorPack;
+            complex->ActorsInfo = std::move(ActorsInfo);
+            ActorPack = TMailboxActorPack::Complex;
+            ActorsInfo.Complex = complex;
+        }
+
+        void AddElapsedCycles(ui64 elapsed) {
+            if (ActorPack == TMailboxActorPack::Complex) {
+                ActorsInfo.Complex->Stats.ElapsedCycles += elapsed;
+            }
+        }
+
+        std::optional<ui64> GetElapsedCycles() {
+            if (ActorPack == TMailboxActorPack::Complex) {
+                return ActorsInfo.Complex->Stats.ElapsedCycles;
+            }
+            return std::nullopt;
+        }
+
         std::pair CountMailboxEvents(ui64 localActorId, ui32 maxTraverse);
     };
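
A minimal usage sketch of the scheduler API introduced above, for review convenience only. It is not part of the patch: the compute-actor plumbing (throttled wakeups, deregistration through TEvSchedulerDeregister) is elided, and the pool name, share, and batch duration are illustrative.

    // Create a pool capped at half a core, enroll one task into it, then either
    // run a batch and charge its duration, or back off for the suggested delay.
    using namespace NKikimr::NKqp;

    void RunOneBatch(TComputeScheduler& scheduler) {
        TMonotonic now = TMonotonic::Now();
        scheduler.UpdateMaxShare("analytics", 0.5, now);     // creates the group on first call
        TSchedulerEntityHandle handle = scheduler.Enroll("analytics", /* weight = */ 1, now);

        now = TMonotonic::Now();
        if (TMaybe<TDuration> delay = handle.Delay(now)) {
            // Over budget: a compute actor would MarkThrottled() and schedule a
            // TEvWakeup(ResumeWakeupTag) after *delay instead of executing.
        } else {
            TDuration batch = TDuration::MicroSeconds(100);  // measured execution time
            handle.ReportBatchTime(batch);                   // refines future delay estimates
            handle.TrackTime(batch, now);                    // consumes the group's budget
        }

        scheduler.AdvanceTime(TMonotonic::Now());            // normally driven by TSchedulerActor
        scheduler.Deregister(*handle, TMonotonic::Now());
    }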