Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,14 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {

auto placingOptions = ResourceManager_->GetPlacingOptions();

ui64 nonParallelLimit = placingOptions.MaxNonParallelTasksExecutionLimit;
if (MayRunTasksLocally) {
// not applied to column shards and external sources
nonParallelLimit = placingOptions.MaxNonParallelDataQueryTasksLimit;
}

bool singleNodeExecutionMakeSence = (
ResourceEstimations.size() <= placingOptions.MaxNonParallelTasksExecutionLimit ||
ResourceEstimations.size() <= nonParallelLimit ||
// all readers are located on the one node.
TasksPerNode.size() == 1
);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/rm_service/kqp_rm_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class TKqpResourceManager : public IKqpResourceManager {
TPlannerPlacingOptions GetPlacingOptions() override {
return TPlannerPlacingOptions{
.MaxNonParallelTasksExecutionLimit = MaxNonParallelTasksExecutionLimit.load(),
.MaxNonParallelDataQueryTasksLimit = MaxNonParallelDataQueryTasksLimit.load(),
.MaxNonParallelTopStageExecutionLimit = MaxNonParallelTopStageExecutionLimit.load(),
.PreferLocalDatacenterExecution = PreferLocalDatacenterExecution.load(),
};
Expand Down Expand Up @@ -425,6 +426,7 @@ class TKqpResourceManager : public IKqpResourceManager {
MaxNonParallelTopStageExecutionLimit.store(config.GetMaxNonParallelTopStageExecutionLimit());
MaxNonParallelTasksExecutionLimit.store(config.GetMaxNonParallelTasksExecutionLimit());
PreferLocalDatacenterExecution.store(config.GetPreferLocalDatacenterExecution());
MaxNonParallelDataQueryTasksLimit.store(config.GetMaxNonParallelDataQueryTasksLimit());
}

ui32 GetNodeId() override {
Expand Down Expand Up @@ -474,6 +476,7 @@ class TKqpResourceManager : public IKqpResourceManager {
std::atomic<ui64> MaxNonParallelTopStageExecutionLimit = 1;
std::atomic<ui64> MaxNonParallelTasksExecutionLimit = 8;
std::atomic<bool> PreferLocalDatacenterExecution = true;
std::atomic<ui64> MaxNonParallelDataQueryTasksLimit = 1000;

// current state
std::atomic<ui64> LastResourceBrokerTaskId = 0;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/rm_service/kqp_rm_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ struct TKqpLocalNodeResources {

struct TPlannerPlacingOptions {
ui64 MaxNonParallelTasksExecutionLimit = 8;
ui64 MaxNonParallelDataQueryTasksLimit = 1000;
ui64 MaxNonParallelTopStageExecutionLimit = 1;
bool PreferLocalDatacenterExecution = true;
};
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ message TTableServiceConfig {
optional uint64 MaxNonParallelTasksExecutionLimit = 25 [default = 8];
optional uint64 MaxNonParallelTopStageExecutionLimit = 26 [default = 1];
optional bool PreferLocalDatacenterExecution = 27 [ default = true ];
optional uint64 MaxNonParallelDataQueryTasksLimit = 28 [default = 1000];
}

message TSpillingServiceConfig {
Expand Down Expand Up @@ -301,6 +302,6 @@ message TTableServiceConfig {
optional uint64 QueryReplayCacheUploadTTLSec = 62 [default = 36000];

optional bool EnableQueryServiceSpilling = 63 [ default = true ];

optional bool EnableImplicitQueryParameterTypes = 66 [ default = false ];
};