single node scheduler has been added #8445

Merged 1 commit on Sep 5, 2024
200 changes: 132 additions & 68 deletions ydb/core/fq/libs/actors/nodes_manager.cpp
@@ -16,6 +16,10 @@
#include <util/system/hostname.h>
#include <ydb/library/services/services.pb.h>

#include <library/cpp/scheme/scheme.h>

#include <random>


#define LOG_E(stream) \
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_NODES_MANAGER, stream)
@@ -86,93 +90,148 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
private:
    void Handle(NDqs::TEvAllocateWorkersRequest::TPtr& ev) {
        ServiceCounters.Counters->GetCounter("EvAllocateWorkersRequest", true)->Inc();
        const auto& request = ev->Get()->Record;
        const auto count = request.GetCount();
        auto scheduler = request.GetScheduler();

        auto response = MakeHolder<NDqs::TEvAllocateWorkersResponse>();
        if (count == 0) {
            auto& error = *response->Record.MutableError();
            error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
            error.SetMessage("Incorrect request - 0 nodes requested");
        } else if (!scheduler) {
            ScheduleUniformly(request, response);
        } else {
            try {
                auto schedulerSettings = NSc::TValue::FromJsonThrow(scheduler);
                auto schedulerType = schedulerSettings["type"].GetString();
                if (schedulerType == "single_node") {
                    ScheduleOnSingleNode(request, response);
                } else {
                    auto& error = *response->Record.MutableError();
                    error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
                    error.SetMessage(TStringBuilder{} << "Unknown scheduler type: " << schedulerType << ", settings: " << scheduler);
                }
            } catch (...) {
                auto& error = *response->Record.MutableError();
                error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
                error.SetMessage(TStringBuilder{} << "Error choosing scheduler. Invalid settings: " << scheduler << ", error: " << CurrentExceptionMessage());
            }
        }
        LOG_D("TEvAllocateWorkersResponse " << response->Record.DebugString());

        Send(ev->Sender, response.Release());
    }

    void ScheduleUniformly(const NYql::NDqProto::TAllocateWorkersRequest& request, THolder<NDqs::TEvAllocateWorkersResponse>& response) {
        const auto count = request.GetCount();
        auto resourceId = request.GetResourceId();
        if (!resourceId) {
            resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
        }

        bool placementFailure = false;
        ui64 memoryLimit = AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic());
        ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic());
        TVector<TPeer> nodes;
        for (ui32 i = 0; i < count; ++i) {
            ui64 totalMemoryLimit = 0;
            if (request.TaskSize() > i) {
                totalMemoryLimit = request.GetTask(i).GetInitialTaskMemoryLimit();
            }
            if (totalMemoryLimit == 0) {
                totalMemoryLimit = MkqlInitialMemoryLimit;
            }
            TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, DataCenter};
            bool selfPlacement = true;
            if (!Peers.empty()) {
                auto FirstPeer = NextPeer;
                while (true) {
                    Y_ABORT_UNLESS(NextPeer < Peers.size());
                    auto& nextNode = Peers[NextPeer];

                    if (++NextPeer >= Peers.size()) {
                        NextPeer = 0;
                    }

                    if ( (!UseDataCenter || DataCenter.empty() || nextNode.DataCenter.empty() || DataCenter == nextNode.DataCenter) // non empty DC must match
                        && ( nextNode.MemoryLimit == 0 // memory is NOT limited
                          || nextNode.MemoryLimit >= nextNode.MemoryAllocated + totalMemoryLimit) // or enough
                    ) {
                        // adjust allocated size to place next tasks correctly, will be reset after next health check
                        nextNode.MemoryAllocated += totalMemoryLimit;
                        if (nextNode.NodeId == SelfId().NodeId()) {
                            // eventually synced self allocation info
                            memoryAllocated += totalMemoryLimit;
                        }
                        node = nextNode;
                        selfPlacement = false;
                        break;
                    }

                    if (NextPeer == FirstPeer) { // we closed loop w/o success, fallback to self placement then
                        break;
                    }
                }
            }
            if (selfPlacement) {
                if (memoryLimit == 0 || memoryLimit >= memoryAllocated + totalMemoryLimit) {
                    memoryAllocated += totalMemoryLimit;
                } else {
                    placementFailure = true;
                    auto& error = *response->Record.MutableError();
                    error.SetStatusCode(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED);
                    error.SetMessage("Not enough free memory in the cluster");
                    break;
                }
            }
            nodes.push_back(node);
        }

        if (!placementFailure) {
            response->Record.ClearError();
            auto& group = *response->Record.MutableNodes();
            group.SetResourceId(resourceId);
            for (const auto& node : nodes) {
                auto* worker = group.AddWorker();
                *worker->MutableGuid() = node.InstanceId;
                worker->SetNodeId(node.NodeId);
            }
        }
    }

    void ScheduleOnSingleNode(const NYql::NDqProto::TAllocateWorkersRequest& request, THolder<NDqs::TEvAllocateWorkersResponse>& response) {
        const auto count = request.GetCount();
        auto resourceId = request.GetResourceId();
        if (!resourceId) {
            resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
        }

        if (Peers.size() != SingleNodeScheduler.NodeOrder.size()) {
            SingleNodeScheduler.NodeOrder.clear();
            for (ui32 i = 0; i < Peers.size(); i++) {
                SingleNodeScheduler.NodeOrder.push_back(i);
            }
            std::shuffle(SingleNodeScheduler.NodeOrder.begin(), SingleNodeScheduler.NodeOrder.end(), std::default_random_engine(TInstant::Now().MicroSeconds()));
        }

        TVector<TPeer> nodes;
        for (ui32 i = 0; i < count; ++i) {
            Y_ABORT_UNLESS(NextPeer < Peers.size());
            nodes.push_back(Peers[SingleNodeScheduler.NodeOrder[NextPeer]]);
        }
        if (++NextPeer >= Peers.size()) {
            NextPeer = 0;
        }

        response->Record.ClearError();
        auto& group = *response->Record.MutableNodes();
        group.SetResourceId(resourceId);
        for (const auto& node : nodes) {
            auto* worker = group.AddWorker();
            *worker->MutableGuid() = node.InstanceId;
            worker->SetNodeId(node.NodeId);
        }
    }

void Handle(NDqs::TEvFreeWorkersNotify::TPtr&) {
@@ -338,6 +397,11 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
TString Address;
::NMonitoring::TDynamicCounters::TCounterPtr AnonRssSize;
::NMonitoring::TDynamicCounters::TCounterPtr AnonRssLimit;

struct TSingleNodeScheduler {
TVector<int> NodeOrder;
};
TSingleNodeScheduler SingleNodeScheduler;
};

TActorId MakeNodesManagerId() {
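For readers skimming the diff: the new placement policy keeps a shuffled order of the known peers, puts every worker of one allocation request on the same node, and advances to the next node for the next request. Below is a minimal standalone sketch of that logic only, with hypothetical names and plain standard C++ instead of the actor's TVector/TInstant state; it is an illustration, not the PR's code.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <random>
#include <string>
#include <vector>

// Hypothetical stand-ins for the actor's Peers / NodeOrder / NextPeer members.
struct TPeerStub { uint32_t NodeId; std::string InstanceId; };

struct TSingleNodePlacerStub {
    std::vector<TPeerStub> Peers;     // known cluster peers (refreshed by health checks)
    std::vector<size_t> NodeOrder;    // shuffled indices into Peers
    size_t NextPeer = 0;              // advances once per allocation request

    // Returns the same peer for every worker of one request, then moves on,
    // mirroring ScheduleOnSingleNode() in the diff above.
    std::vector<TPeerStub> Allocate(uint32_t count) {
        if (NodeOrder.size() != Peers.size()) {          // peer set changed: rebuild and reshuffle the order
            NodeOrder.resize(Peers.size());
            for (size_t i = 0; i < Peers.size(); ++i) NodeOrder[i] = i;
            std::shuffle(NodeOrder.begin(), NodeOrder.end(),
                         std::default_random_engine(std::random_device{}()));
        }
        std::vector<TPeerStub> nodes(count, Peers[NodeOrder[NextPeer]]);
        if (++NextPeer >= Peers.size()) NextPeer = 0;    // round-robin over the shuffled order
        return nodes;
    }
};

int main() {
    TSingleNodePlacerStub placer{{{1, "a"}, {2, "b"}, {3, "c"}}, {}, 0};
    for (int q = 0; q < 3; ++q) {                        // three "queries", 4 workers each
        auto nodes = placer.Allocate(4);
        std::cout << "query " << q << " -> node " << nodes.front().NodeId << "\n";
    }
}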
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -191,6 +191,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {


const TString computeActorType = Settings->ComputeActorType.Get().GetOrElse("sync");
const TString scheduler = Settings->Scheduler.Get().GetOrElse({});

auto resourceAllocator = RegisterChild(CreateResourceAllocator(
GwmActorId, SelfId(), ControlId, workerCount,
@@ -204,6 +205,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
allocateRequest->Record.SetCreateComputeActor(enableComputeActor);
allocateRequest->Record.SetComputeActorType(computeActorType);
allocateRequest->Record.SetStatsMode(StatsMode);
allocateRequest->Record.SetScheduler(scheduler);
if (enableComputeActor) {
ActorIdToProto(ControlId, allocateRequest->Record.MutableResultActorId());
}
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -40,6 +40,7 @@ message TAllocateWorkersRequest {
uint64 FreeWorkerAfterMs = 14;
NYql.NDqProto.EDqStatsMode StatsMode = 16;
reserved 17;
string Scheduler = 18;
}

message TWorkerGroup {
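The new Scheduler field is a plain string carrying the scheduler settings as JSON; leaving it empty keeps the pre-existing uniform placement, as handled in nodes_manager.cpp above. A minimal usage sketch follows; the generated header path is assumed from the proto location and the surrounding build conventions, not confirmed by this PR.

#include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h>  // assumed path of the header generated from dqs.proto

int main() {
    NYql::NDqProto::TAllocateWorkersRequest request;
    request.SetCount(4);
    // JSON settings string; "type" selects the scheduler, as parsed in nodes_manager.cpp
    request.SetScheduler(R"({"type": "single_node"})");
    return request.GetScheduler().empty() ? 1 : 0;
}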
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -116,6 +116,7 @@ TDqConfiguration::TDqConfiguration() {
return res;
});
REGISTER_SETTING(*this, UseGraceJoinCoreForMap);
REGISTER_SETTING(*this, Scheduler);
}

} // namespace NYql
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -139,6 +139,7 @@ struct TDqSettings {
NCommon::TConfSetting<ui64, false> _MaxAttachmentsSize;
NCommon::TConfSetting<bool, false> DisableCheckpoints;
NCommon::TConfSetting<bool, false> UseGraceJoinCoreForMap;
NCommon::TConfSetting<TString, false> Scheduler;

// This options will be passed to executor_actor and worker_actor
template <typename TProtoConfig>
@@ -193,6 +194,7 @@ struct TDqSettings {
SAVE_SETTING(SpillingEngine);
SAVE_SETTING(EnableSpillingInChannels);
SAVE_SETTING(DisableCheckpoints);
SAVE_SETTING(Scheduler);
#undef SAVE_SETTING
}

1 change: 1 addition & 0 deletions ydb/tests/fq/yds/test_3_selects.py
@@ -15,6 +15,7 @@ class TestSelects(object):
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": os.getenv("YDB_ENDPOINT")}], indirect=True)
def test_3_selects(self, client):
sql = R'''
pragma dq.Scheduler=@@{"type": "single_node"}@@;
SELECT 1 AS SingleColumn;
SELECT "A" AS TextColumn;
SELECT 11 AS Column1, 22 AS Column2;
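Note on the test: the @@ ... @@ quoting appears to be YQL's raw string syntax, so the JSON object is passed verbatim into the dq.Scheduler setting registered in yql_dq_settings.cpp above, which the executer then copies into TAllocateWorkersRequest.Scheduler.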