From a16f62acf4c89acd6fc1b1add5618fbb6d8ebdda Mon Sep 17 00:00:00 2001
From: Oleg Doronin
Date: Thu, 29 Aug 2024 09:11:15 +0000
Subject: [PATCH] single node scheduler has been added

---
 ydb/core/fq/libs/actors/nodes_manager.cpp     | 200 ++++++++++++------
 .../providers/dq/actors/executer_actor.cpp    |   2 +
 .../yql/providers/dq/api/protos/dqs.proto     |   1 +
 .../providers/dq/common/yql_dq_settings.cpp   |   1 +
 .../yql/providers/dq/common/yql_dq_settings.h |   2 +
 ydb/tests/fq/yds/test_3_selects.py            |   1 +
 6 files changed, 139 insertions(+), 68 deletions(-)

diff --git a/ydb/core/fq/libs/actors/nodes_manager.cpp b/ydb/core/fq/libs/actors/nodes_manager.cpp
index b422fc77c470..83622fde3a80 100644
--- a/ydb/core/fq/libs/actors/nodes_manager.cpp
+++ b/ydb/core/fq/libs/actors/nodes_manager.cpp
@@ -16,6 +16,10 @@
 #include
 #include
+#include
+
+#include
+
 #define LOG_E(stream) \
     LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_NODES_MANAGER, stream)
@@ -86,93 +90,148 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor> {
     void Handle(NDqs::TEvAllocateWorkersRequest::TPtr& ev) {
         ServiceCounters.Counters->GetCounter("EvAllocateWorkersRequest", true)->Inc();
-        const auto &rec = ev->Get()->Record;
-        const auto count = rec.GetCount();
-
-        auto req = MakeHolder<NDqs::TEvAllocateWorkersResponse>();
-        if (count == 0) {
-            auto& error = *req->Record.MutableError();
-            error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
-            error.SetMessage("Incorrect request - 0 nodes requested");
-        } else {
-            auto resourceId = rec.GetResourceId();
-            if (!resourceId) {
-                resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
-            }
-
-            bool placementFailure = false;
-            ui64 memoryLimit = AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic());
-            ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic());
-            TVector<TPeer> nodes;
-            for (ui32 i = 0; i < count; ++i) {
-                ui64 totalMemoryLimit = 0;
-                if (rec.TaskSize() > i) {
-                    totalMemoryLimit = rec.GetTask(i).GetInitialTaskMemoryLimit();
-                }
-                if (totalMemoryLimit == 0) {
-                    totalMemoryLimit = MkqlInitialMemoryLimit;
-                }
-                TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, DataCenter};
-                bool selfPlacement = true;
-                if (!Peers.empty()) {
-                    auto FirstPeer = NextPeer;
-                    while (true) {
-                        Y_ABORT_UNLESS(NextPeer < Peers.size());
-                        auto& nextNode = Peers[NextPeer];
-
-                        if (++NextPeer >= Peers.size()) {
-                            NextPeer = 0;
-                        }
-
-                        if ( (!UseDataCenter || DataCenter.empty() || nextNode.DataCenter.empty() || DataCenter == nextNode.DataCenter) // non empty DC must match
-                            && ( nextNode.MemoryLimit == 0 // memory is NOT limited
-                                || nextNode.MemoryLimit >= nextNode.MemoryAllocated + totalMemoryLimit) // or enough
-                        ) {
-                            // adjust allocated size to place next tasks correctly, will be reset after next health check
-                            nextNode.MemoryAllocated += totalMemoryLimit;
-                            if (nextNode.NodeId == SelfId().NodeId()) {
-                                // eventually synced self allocation info
-                                memoryAllocated += totalMemoryLimit;
-                            }
-                            node = nextNode;
-                            selfPlacement = false;
-                            break;
-                        }
-
-                        if (NextPeer == FirstPeer) { // we closed loop w/o success, fallback to self placement then
-                            break;
-                        }
-                    }
-                }
-                if (selfPlacement) {
-                    if (memoryLimit == 0 || memoryLimit >= memoryAllocated + totalMemoryLimit) {
-                        memoryAllocated += totalMemoryLimit;
-                    } else {
-                        placementFailure = true;
-                        auto& error = *req->Record.MutableError();
-                        error.SetStatusCode(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED);
-                        error.SetMessage("Not enough free memory in the cluster");
-                        break;
-                    }
-                }
-                nodes.push_back(node);
-            }
-
-            if (!placementFailure) {
-                req->Record.ClearError();
-                auto& group = *req->Record.MutableNodes();
-                group.SetResourceId(resourceId);
-                for (const auto& node : nodes) {
-                    auto* worker = group.AddWorker();
-                    *worker->MutableGuid() = node.InstanceId;
-                    worker->SetNodeId(node.NodeId);
-                }
-            }
-        }
-        LOG_D("TEvAllocateWorkersResponse " << req->Record.DebugString());
-        Send(ev->Sender, req.Release());
+        const auto &request = ev->Get()->Record;
+        const auto count = request.GetCount();
+        auto scheduler = request.GetScheduler();
+        auto response = MakeHolder<NDqs::TEvAllocateWorkersResponse>();
+        if (count == 0) {
+            auto& error = *response->Record.MutableError();
+            error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
+            error.SetMessage("Incorrect request - 0 nodes requested");
+        } else if (!scheduler) {
+            ScheduleUniformly(request, response);
+        } else {
+            try {
+                auto schedulerSettings = NSc::TValue::FromJsonThrow(scheduler);
+                auto schedulerType = schedulerSettings["type"].GetString();
+                if (schedulerType == "single_node") {
+                    ScheduleOnSingleNode(request, response);
+                } else {
+                    auto& error = *response->Record.MutableError();
+                    error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
+                    error.SetMessage(TStringBuilder{} << "Unknown scheduler type: " << schedulerType << ", settings: " << scheduler);
+                }
+            } catch (...) {
+                auto& error = *response->Record.MutableError();
+                error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
+                error.SetMessage(TStringBuilder{} << "Error choosing scheduler. Invalid settings: " << scheduler << ", error: " << CurrentExceptionMessage());
+            }
+        }
+        LOG_D("TEvAllocateWorkersResponse " << response->Record.DebugString());
+
+        Send(ev->Sender, response.Release());
+    }
+
+    void ScheduleUniformly(const NYql::NDqProto::TAllocateWorkersRequest& request, THolder<NDqs::TEvAllocateWorkersResponse>& response) {
+        const auto count = request.GetCount();
+        auto resourceId = request.GetResourceId();
+        if (!resourceId) {
+            resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
+        }
+
+        bool placementFailure = false;
+        ui64 memoryLimit = AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic());
+        ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic());
+        TVector<TPeer> nodes;
+        for (ui32 i = 0; i < count; ++i) {
+            ui64 totalMemoryLimit = 0;
+            if (request.TaskSize() > i) {
+                totalMemoryLimit = request.GetTask(i).GetInitialTaskMemoryLimit();
+            }
+            if (totalMemoryLimit == 0) {
+                totalMemoryLimit = MkqlInitialMemoryLimit;
+            }
+            TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, DataCenter};
+            bool selfPlacement = true;
+            if (!Peers.empty()) {
+                auto FirstPeer = NextPeer;
+                while (true) {
+                    Y_ABORT_UNLESS(NextPeer < Peers.size());
+                    auto& nextNode = Peers[NextPeer];
+
+                    if (++NextPeer >= Peers.size()) {
+                        NextPeer = 0;
+                    }
+
+                    if ( (!UseDataCenter || DataCenter.empty() || nextNode.DataCenter.empty() || DataCenter == nextNode.DataCenter) // non empty DC must match
+                        && ( nextNode.MemoryLimit == 0 // memory is NOT limited
+                            || nextNode.MemoryLimit >= nextNode.MemoryAllocated + totalMemoryLimit) // or enough
+                    ) {
+                        // adjust allocated size to place next tasks correctly, will be reset after next health check
+                        nextNode.MemoryAllocated += totalMemoryLimit;
+                        if (nextNode.NodeId == SelfId().NodeId()) {
+                            // eventually synced self allocation info
+                            memoryAllocated += totalMemoryLimit;
+                        }
+                        node = nextNode;
+                        selfPlacement = false;
+                        break;
+                    }
+
+                    if (NextPeer == FirstPeer) { // we closed loop w/o success, fallback to self placement then
+                        break;
+                    }
+                }
+            }
+            if (selfPlacement) {
+                if (memoryLimit == 0 || memoryLimit >= memoryAllocated + totalMemoryLimit) {
+                    memoryAllocated += totalMemoryLimit;
+                } else {
+                    placementFailure = true;
+                    auto& error = *response->Record.MutableError();
+                    error.SetStatusCode(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED);
+                    error.SetMessage("Not enough free memory in the cluster");
+                    break;
+                }
+            }
+            nodes.push_back(node);
+        }
+
+        if (!placementFailure) {
+            response->Record.ClearError();
+            auto& group = *response->Record.MutableNodes();
+            group.SetResourceId(resourceId);
+            for (const auto& node : nodes) {
+                auto* worker = group.AddWorker();
+                *worker->MutableGuid() = node.InstanceId;
+                worker->SetNodeId(node.NodeId);
+            }
+        }
+    }
+
+    void ScheduleOnSingleNode(const NYql::NDqProto::TAllocateWorkersRequest& request, THolder<NDqs::TEvAllocateWorkersResponse>& response) {
+        const auto count = request.GetCount();
+        auto resourceId = request.GetResourceId();
+        if (!resourceId) {
+            resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
+        }
+
+        if (Peers.size() != SingleNodeScheduler.NodeOrder.size()) {
+            SingleNodeScheduler.NodeOrder.clear();
+            for (ui32 i = 0; i < Peers.size(); i++) {
+                SingleNodeScheduler.NodeOrder.push_back(i);
+            }
+            std::shuffle(SingleNodeScheduler.NodeOrder.begin(), SingleNodeScheduler.NodeOrder.end(), std::default_random_engine(TInstant::Now().MicroSeconds()));
+        }
+
+        TVector<TPeer> nodes;
+        for (ui32 i = 0; i < count; ++i) {
+            Y_ABORT_UNLESS(NextPeer < Peers.size());
+            nodes.push_back(Peers[SingleNodeScheduler.NodeOrder[NextPeer]]);
+        }
+        if (++NextPeer >= Peers.size()) {
+            NextPeer = 0;
+        }
+
+        response->Record.ClearError();
+        auto& group = *response->Record.MutableNodes();
+        group.SetResourceId(resourceId);
+        for (const auto& node : nodes) {
+            auto* worker = group.AddWorker();
+            *worker->MutableGuid() = node.InstanceId;
+            worker->SetNodeId(node.NodeId);
+        }
     }
 
     void Handle(NDqs::TEvFreeWorkersNotify::TPtr&) {
@@ -338,6 +397,11 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor> {
+
+    struct TSingleNodeScheduler {
+        TVector<ui32> NodeOrder;
+    };
+    TSingleNodeScheduler SingleNodeScheduler;
 };
 
 TActorId MakeNodesManagerId() {
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
index c57bde386d26..b2b852792e9a 100644
--- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -191,6 +191,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
 
         const TString computeActorType = Settings->ComputeActorType.Get().GetOrElse("sync");
+        const TString scheduler = Settings->Scheduler.Get().GetOrElse({});
 
         auto resourceAllocator = RegisterChild(CreateResourceAllocator(
             GwmActorId, SelfId(), ControlId,
             workerCount,
@@ -204,6 +205,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
         allocateRequest->Record.SetCreateComputeActor(enableComputeActor);
         allocateRequest->Record.SetComputeActorType(computeActorType);
         allocateRequest->Record.SetStatsMode(StatsMode);
+        allocateRequest->Record.SetScheduler(scheduler);
         if (enableComputeActor) {
             ActorIdToProto(ControlId, allocateRequest->Record.MutableResultActorId());
         }
diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto
index e04d943aae8c..41eaeedf109d 100644
--- a/ydb/library/yql/providers/dq/api/protos/dqs.proto
+++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -40,6 +40,7 @@ message TAllocateWorkersRequest {
     uint64 FreeWorkerAfterMs = 14;
     NYql.NDqProto.EDqStatsMode StatsMode = 16;
     reserved 17;
+    string Scheduler = 18;
 }
 
 message TWorkerGroup {
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index 53809de7b552..282fa1dffdcd 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -116,6 +116,7 @@ TDqConfiguration::TDqConfiguration() {
         return res;
     });
     REGISTER_SETTING(*this, UseGraceJoinCoreForMap);
+    REGISTER_SETTING(*this, Scheduler);
 }
 
 } // namespace NYql
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index 42137b7c8fef..85cfe3a256b3 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -139,6 +139,7 @@ struct TDqSettings {
     NCommon::TConfSetting _MaxAttachmentsSize;
     NCommon::TConfSetting DisableCheckpoints;
     NCommon::TConfSetting UseGraceJoinCoreForMap;
+    NCommon::TConfSetting Scheduler;
 
     // This options will be passed to executor_actor and worker_actor
     template
@@ -193,6 +194,7 @@ struct TDqSettings {
         SAVE_SETTING(SpillingEngine);
         SAVE_SETTING(EnableSpillingInChannels);
         SAVE_SETTING(DisableCheckpoints);
+        SAVE_SETTING(Scheduler);
 
 #undef SAVE_SETTING
     }
diff --git a/ydb/tests/fq/yds/test_3_selects.py b/ydb/tests/fq/yds/test_3_selects.py
index a969becbab53..c733e8f51823 100644
--- a/ydb/tests/fq/yds/test_3_selects.py
+++ b/ydb/tests/fq/yds/test_3_selects.py
@@ -15,6 +15,7 @@ class TestSelects(object):
     @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": os.getenv("YDB_ENDPOINT")}], indirect=True)
    def test_3_selects(self, client):
         sql = R'''
+            pragma dq.Scheduler=@@{"type": "single_node"}@@;
             SELECT 1 AS SingleColumn;
             SELECT "A" AS TextColumn;
             SELECT 11 AS Column1, 22 AS Column2;
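
Usage note: the new Scheduler DQ setting carries a JSON document that is parsed in TNodesManagerActor. An empty or unset value keeps the existing uniform placement, "single_node" puts every worker of an allocation request on one peer (picked from a shuffled node order that rotates between requests), and an unknown type or malformed JSON fails the allocation with BAD_REQUEST. A minimal YQL snippet enabling it, mirroring the new test case above:

    pragma dq.Scheduler=@@{"type": "single_node"}@@;

    SELECT 1 AS SingleColumn;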