single node scheduler has been added (ydb-platform#8445)
dorooleg authored and Oleg Doronin committed Sep 6, 2024
1 parent 0fb80c7 commit 3526f32
Showing 6 changed files with 139 additions and 68 deletions.
200 changes: 132 additions & 68 deletions ydb/core/fq/libs/actors/nodes_manager.cpp
@@ -16,6 +16,10 @@
#include <util/system/hostname.h>
#include <ydb/library/services/services.pb.h>

#include <library/cpp/scheme/scheme.h>

#include <random>


#define LOG_E(stream) \
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_NODES_MANAGER, stream)
@@ -86,93 +90,148 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
private:
    void Handle(NDqs::TEvAllocateWorkersRequest::TPtr& ev) {
        ServiceCounters.Counters->GetCounter("EvAllocateWorkersRequest", true)->Inc();
-        const auto &rec = ev->Get()->Record;
-        const auto count = rec.GetCount();
-
-        auto req = MakeHolder<NDqs::TEvAllocateWorkersResponse>();
+        const auto &request = ev->Get()->Record;
+        const auto count = request.GetCount();
+        auto scheduler = request.GetScheduler();
+
+        auto response = MakeHolder<NDqs::TEvAllocateWorkersResponse>();
        if (count == 0) {
-            auto& error = *req->Record.MutableError();
+            auto& error = *response->Record.MutableError();
            error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
            error.SetMessage("Incorrect request - 0 nodes requested");
+        } else if (!scheduler) {
+            ScheduleUniformly(request, response);
        } else {
-            auto resourceId = rec.GetResourceId();
-            if (!resourceId) {
-                resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
-            }
-
-            bool placementFailure = false;
-            ui64 memoryLimit = AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic());
-            ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic());
-            TVector<TPeer> nodes;
-            for (ui32 i = 0; i < count; ++i) {
-                ui64 totalMemoryLimit = 0;
-                if (rec.TaskSize() > i) {
-                    totalMemoryLimit = rec.GetTask(i).GetInitialTaskMemoryLimit();
-                }
-                if (totalMemoryLimit == 0) {
-                    totalMemoryLimit = MkqlInitialMemoryLimit;
-                }
-                TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, DataCenter};
-                bool selfPlacement = true;
-                if (!Peers.empty()) {
-                    auto FirstPeer = NextPeer;
-                    while (true) {
-                        Y_ABORT_UNLESS(NextPeer < Peers.size());
-                        auto& nextNode = Peers[NextPeer];
-
-                        if (++NextPeer >= Peers.size()) {
-                            NextPeer = 0;
-                        }
-
-                        if ( (!UseDataCenter || DataCenter.empty() || nextNode.DataCenter.empty() || DataCenter == nextNode.DataCenter) // non empty DC must match
-                            && ( nextNode.MemoryLimit == 0 // memory is NOT limited
-                                || nextNode.MemoryLimit >= nextNode.MemoryAllocated + totalMemoryLimit) // or enough
-                        ) {
-                            // adjust allocated size to place next tasks correctly, will be reset after next health check
-                            nextNode.MemoryAllocated += totalMemoryLimit;
-                            if (nextNode.NodeId == SelfId().NodeId()) {
-                                // eventually synced self allocation info
-                                memoryAllocated += totalMemoryLimit;
-                            }
-                            node = nextNode;
-                            selfPlacement = false;
-                            break;
-                        }
-
-                        if (NextPeer == FirstPeer) { // we closed loop w/o success, fallback to self placement then
-                            break;
-                        }
-                    }
-                }
-                if (selfPlacement) {
-                    if (memoryLimit == 0 || memoryLimit >= memoryAllocated + totalMemoryLimit) {
-                        memoryAllocated += totalMemoryLimit;
-                    } else {
-                        placementFailure = true;
-                        auto& error = *req->Record.MutableError();
-                        error.SetStatusCode(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED);
-                        error.SetMessage("Not enough free memory in the cluster");
-                        break;
-                    }
-                }
-                nodes.push_back(node);
-            }
-
-            if (!placementFailure) {
-                req->Record.ClearError();
-                auto& group = *req->Record.MutableNodes();
-                group.SetResourceId(resourceId);
-                for (const auto& node : nodes) {
-                    auto* worker = group.AddWorker();
-                    *worker->MutableGuid() = node.InstanceId;
-                    worker->SetNodeId(node.NodeId);
-                }
-            }
+            try {
+                auto schedulerSettings = NSc::TValue::FromJsonThrow(scheduler);
+                auto schedulerType = schedulerSettings["type"].GetString();
+                if (schedulerType == "single_node") {
+                    ScheduleOnSingleNode(request, response);
+                } else {
+                    auto& error = *response->Record.MutableError();
+                    error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
+                    error.SetMessage(TStringBuilder{} << "Unknown scheduler type: " << schedulerType << ", settings: " << scheduler);
+                }
+            } catch (...) {
+                auto& error = *response->Record.MutableError();
+                error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
+                error.SetMessage(TStringBuilder{} << "Error choosing scheduler. Invalid settings: " << scheduler << ", error: " << CurrentExceptionMessage());
+            }
        }
-        LOG_D("TEvAllocateWorkersResponse " << req->Record.DebugString());
+        LOG_D("TEvAllocateWorkersResponse " << response->Record.DebugString());

-        Send(ev->Sender, req.Release());
+        Send(ev->Sender, response.Release());
    }
+
+    void ScheduleUniformly(const NYql::NDqProto::TAllocateWorkersRequest& request, THolder<NDqs::TEvAllocateWorkersResponse>& response) {
+        const auto count = request.GetCount();
+        auto resourceId = request.GetResourceId();
+        if (!resourceId) {
+            resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
+        }
+
+        bool placementFailure = false;
+        ui64 memoryLimit = AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic());
+        ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic());
+        TVector<TPeer> nodes;
+        for (ui32 i = 0; i < count; ++i) {
+            ui64 totalMemoryLimit = 0;
+            if (request.TaskSize() > i) {
+                totalMemoryLimit = request.GetTask(i).GetInitialTaskMemoryLimit();
+            }
+            if (totalMemoryLimit == 0) {
+                totalMemoryLimit = MkqlInitialMemoryLimit;
+            }
+            TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, DataCenter};
+            bool selfPlacement = true;
+            if (!Peers.empty()) {
+                auto FirstPeer = NextPeer;
+                while (true) {
+                    Y_ABORT_UNLESS(NextPeer < Peers.size());
+                    auto& nextNode = Peers[NextPeer];
+
+                    if (++NextPeer >= Peers.size()) {
+                        NextPeer = 0;
+                    }
+
+                    if ( (!UseDataCenter || DataCenter.empty() || nextNode.DataCenter.empty() || DataCenter == nextNode.DataCenter) // non empty DC must match
+                        && ( nextNode.MemoryLimit == 0 // memory is NOT limited
+                            || nextNode.MemoryLimit >= nextNode.MemoryAllocated + totalMemoryLimit) // or enough
+                    ) {
+                        // adjust allocated size to place next tasks correctly, will be reset after next health check
+                        nextNode.MemoryAllocated += totalMemoryLimit;
+                        if (nextNode.NodeId == SelfId().NodeId()) {
+                            // eventually synced self allocation info
+                            memoryAllocated += totalMemoryLimit;
+                        }
+                        node = nextNode;
+                        selfPlacement = false;
+                        break;
+                    }
+
+                    if (NextPeer == FirstPeer) { // we closed loop w/o success, fallback to self placement then
+                        break;
+                    }
+                }
+            }
+            if (selfPlacement) {
+                if (memoryLimit == 0 || memoryLimit >= memoryAllocated + totalMemoryLimit) {
+                    memoryAllocated += totalMemoryLimit;
+                } else {
+                    placementFailure = true;
+                    auto& error = *response->Record.MutableError();
+                    error.SetStatusCode(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED);
+                    error.SetMessage("Not enough free memory in the cluster");
+                    break;
+                }
+            }
+            nodes.push_back(node);
+        }
+
+        if (!placementFailure) {
+            response->Record.ClearError();
+            auto& group = *response->Record.MutableNodes();
+            group.SetResourceId(resourceId);
+            for (const auto& node : nodes) {
+                auto* worker = group.AddWorker();
+                *worker->MutableGuid() = node.InstanceId;
+                worker->SetNodeId(node.NodeId);
+            }
+        }
+    }
+
+    void ScheduleOnSingleNode(const NYql::NDqProto::TAllocateWorkersRequest& request, THolder<NDqs::TEvAllocateWorkersResponse>& response) {
+        const auto count = request.GetCount();
+        auto resourceId = request.GetResourceId();
+        if (!resourceId) {
+            resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
+        }
+
+        if (Peers.size() != SingleNodeScheduler.NodeOrder.size()) {
+            SingleNodeScheduler.NodeOrder.clear();
+            for (ui32 i = 0; i < Peers.size(); i++) {
+                SingleNodeScheduler.NodeOrder.push_back(i);
+            }
+            std::shuffle(SingleNodeScheduler.NodeOrder.begin(), SingleNodeScheduler.NodeOrder.end(), std::default_random_engine(TInstant::Now().MicroSeconds()));
+        }
+
+        TVector<TPeer> nodes;
+        for (ui32 i = 0; i < count; ++i) {
+            Y_ABORT_UNLESS(NextPeer < Peers.size());
+            nodes.push_back(Peers[SingleNodeScheduler.NodeOrder[NextPeer]]);
+        }
+        if (++NextPeer >= Peers.size()) {
+            NextPeer = 0;
+        }
+
+        response->Record.ClearError();
+        auto& group = *response->Record.MutableNodes();
+        group.SetResourceId(resourceId);
+        for (const auto& node : nodes) {
+            auto* worker = group.AddWorker();
+            *worker->MutableGuid() = node.InstanceId;
+            worker->SetNodeId(node.NodeId);
+        }
+    }

    void Handle(NDqs::TEvFreeWorkersNotify::TPtr&) {
@@ -338,6 +397,11 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
    TString Address;
    ::NMonitoring::TDynamicCounters::TCounterPtr AnonRssSize;
    ::NMonitoring::TDynamicCounters::TCounterPtr AnonRssLimit;

    struct TSingleNodeScheduler {
        TVector<int> NodeOrder;
    };
    TSingleNodeScheduler SingleNodeScheduler;
};

TActorId MakeNodesManagerId() {
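The selection policy added in ScheduleOnSingleNode above boils down to: shuffle the peer list once (rebuilding the order whenever the peer set changes), hand every worker of a request to the peer under the cursor, and advance the cursor once per request so consecutive queries rotate through nodes in the shuffled order. A minimal standalone sketch of that idea, in plain C++ with a hypothetical Peer type and simplified state (illustrative only, not the actor code from this commit):

// Simplified, standalone model of the policy implemented by ScheduleOnSingleNode().
// The real code lives in TNodesManagerActor and works on TPeer/NextPeer/SingleNodeScheduler members.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <random>
#include <string>
#include <vector>

struct Peer {
    uint32_t NodeId;
    std::string InstanceId;
};

class SingleNodeScheduler {
public:
    explicit SingleNodeScheduler(std::vector<Peer> peers)
        : Peers(std::move(peers))
    {
        // Fix a random node order once; the actor rebuilds it when the peer list changes.
        NodeOrder.resize(Peers.size());
        for (uint32_t i = 0; i < Peers.size(); ++i) {
            NodeOrder[i] = i;
        }
        std::shuffle(NodeOrder.begin(), NodeOrder.end(), std::default_random_engine(std::random_device{}()));
    }

    // All `count` workers of one allocation request are placed on the same peer;
    // the cursor advances once per request, so successive requests move to the next
    // node in the shuffled order.
    std::vector<Peer> Allocate(uint32_t count) {
        std::vector<Peer> nodes;
        for (uint32_t i = 0; i < count; ++i) {
            nodes.push_back(Peers[NodeOrder[NextPeer]]);
        }
        if (++NextPeer >= Peers.size()) {
            NextPeer = 0;
        }
        return nodes;
    }

private:
    std::vector<Peer> Peers;
    std::vector<uint32_t> NodeOrder;
    size_t NextPeer = 0;
};

int main() {
    SingleNodeScheduler scheduler({{1, "node-1"}, {2, "node-2"}, {3, "node-3"}});
    for (int query = 0; query < 3; ++query) {
        auto nodes = scheduler.Allocate(4); // 4 tasks, all placed on one node
        std::cout << "query " << query << " -> " << nodes.front().InstanceId << "\n";
    }
    return 0;
}

Because the cursor moves per request rather than per worker, all tasks of one query land on the same node; that is the behavior the new dq.Scheduler pragma in the test below opts into.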
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -191,6 +191,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {


const TString computeActorType = Settings->ComputeActorType.Get().GetOrElse("sync");
const TString scheduler = Settings->Scheduler.Get().GetOrElse({});

auto resourceAllocator = RegisterChild(CreateResourceAllocator(
GwmActorId, SelfId(), ControlId, workerCount,
@@ -204,6 +205,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
allocateRequest->Record.SetCreateComputeActor(enableComputeActor);
allocateRequest->Record.SetComputeActorType(computeActorType);
allocateRequest->Record.SetStatsMode(StatsMode);
allocateRequest->Record.SetScheduler(scheduler);
if (enableComputeActor) {
ActorIdToProto(ControlId, allocateRequest->Record.MutableResultActorId());
}
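Read together with the nodes manager change above, the two added lines mean the scheduler JSON travels unchanged from the dq.Scheduler setting into TAllocateWorkersRequest; an empty value (the GetOrElse({}) default) leaves the field unset and keeps the old uniform placement. A hedged sketch of the resulting request, assuming the generated header path matches the .proto file below:

#include <util/system/types.h>
#include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h> // assumed path of the generated header

// Illustrative helper, not part of this commit: build an allocation request that asks
// the nodes manager to place every task of the query on a single node.
NYql::NDqProto::TAllocateWorkersRequest MakeSingleNodeRequest(ui32 taskCount) {
    NYql::NDqProto::TAllocateWorkersRequest request;
    request.SetCount(taskCount);
    // Leaving Scheduler empty (the GetOrElse({}) default above) keeps the old uniform
    // placement; any other value must be a JSON document with a "type" field.
    request.SetScheduler(R"({"type": "single_node"})");
    return request;
}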
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -40,6 +40,7 @@ message TAllocateWorkersRequest {
uint64 FreeWorkerAfterMs = 14;
NYql.NDqProto.EDqStatsMode StatsMode = 16;
reserved 17;
string Scheduler = 18;
}

message TWorkerGroup {
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -98,6 +98,7 @@ TDqConfiguration::TDqConfiguration() {

REGISTER_SETTING(*this, _MaxAttachmentsSize);
REGISTER_SETTING(*this, DisableCheckpoints);
REGISTER_SETTING(*this, Scheduler);
}

} // namespace NYql
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -133,6 +133,7 @@ struct TDqSettings {

NCommon::TConfSetting<ui64, false> _MaxAttachmentsSize;
NCommon::TConfSetting<bool, false> DisableCheckpoints;
NCommon::TConfSetting<TString, false> Scheduler;

// This options will be passed to executor_actor and worker_actor
template <typename TProtoConfig>
@@ -186,6 +187,7 @@ struct TDqSettings {
SAVE_SETTING(TaskRunnerStats);
SAVE_SETTING(SpillingEngine);
SAVE_SETTING(DisableCheckpoints);
SAVE_SETTING(Scheduler);
#undef SAVE_SETTING
}

1 change: 1 addition & 0 deletions ydb/tests/fq/yds/test_3_selects.py
@@ -15,6 +15,7 @@ class TestSelects(object):
    @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": os.getenv("YDB_ENDPOINT")}], indirect=True)
    def test_3_selects(self, client):
        sql = R'''
            pragma dq.Scheduler=@@{"type": "single_node"}@@;
            SELECT 1 AS SingleColumn;
            SELECT "A" AS TextColumn;
            SELECT 11 AS Column1, 22 AS Column2;
