diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 4ff458126b60..696b8732c1d1 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -258,8 +258,14 @@ std::unique_ptr TKqpPlanner::AssignTasksToNodes() { auto placingOptions = ResourceManager_->GetPlacingOptions(); + ui64 nonParallelLimit = placingOptions.MaxNonParallelTasksExecutionLimit; + if (MayRunTasksLocally) { + // not applied to column shards and external sources + nonParallelLimit = placingOptions.MaxNonParallelDataQueryTasksLimit; + } + bool singleNodeExecutionMakeSence = ( - ResourceEstimations.size() <= placingOptions.MaxNonParallelTasksExecutionLimit || + ResourceEstimations.size() <= nonParallelLimit || // all readers are located on the one node. TasksPerNode.size() == 1 ); diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 34c1ac61eb65..ea16a237c12d 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -145,6 +145,7 @@ class TKqpResourceManager : public IKqpResourceManager { TPlannerPlacingOptions GetPlacingOptions() override { return TPlannerPlacingOptions{ .MaxNonParallelTasksExecutionLimit = MaxNonParallelTasksExecutionLimit.load(), + .MaxNonParallelDataQueryTasksLimit = MaxNonParallelDataQueryTasksLimit.load(), .MaxNonParallelTopStageExecutionLimit = MaxNonParallelTopStageExecutionLimit.load(), .PreferLocalDatacenterExecution = PreferLocalDatacenterExecution.load(), }; @@ -425,6 +426,7 @@ class TKqpResourceManager : public IKqpResourceManager { MaxNonParallelTopStageExecutionLimit.store(config.GetMaxNonParallelTopStageExecutionLimit()); MaxNonParallelTasksExecutionLimit.store(config.GetMaxNonParallelTasksExecutionLimit()); PreferLocalDatacenterExecution.store(config.GetPreferLocalDatacenterExecution()); + MaxNonParallelDataQueryTasksLimit.store(config.GetMaxNonParallelDataQueryTasksLimit()); } ui32 GetNodeId() override { @@ -474,6 +476,7 @@ class TKqpResourceManager : public IKqpResourceManager { std::atomic MaxNonParallelTopStageExecutionLimit = 1; std::atomic MaxNonParallelTasksExecutionLimit = 8; std::atomic PreferLocalDatacenterExecution = true; + std::atomic MaxNonParallelDataQueryTasksLimit = 1000; // current state std::atomic LastResourceBrokerTaskId = 0; diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index 54af98a91913..c4cf4ba60f91 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -204,6 +204,7 @@ struct TKqpLocalNodeResources { struct TPlannerPlacingOptions { ui64 MaxNonParallelTasksExecutionLimit = 8; + ui64 MaxNonParallelDataQueryTasksLimit = 1000; ui64 MaxNonParallelTopStageExecutionLimit = 1; bool PreferLocalDatacenterExecution = true; }; diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index 6597ec295b26..8ea48a1cdb34 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -50,6 +50,7 @@ message TTableServiceConfig { optional uint64 MaxNonParallelTasksExecutionLimit = 25 [default = 8]; optional uint64 MaxNonParallelTopStageExecutionLimit = 26 [default = 1]; optional bool PreferLocalDatacenterExecution = 27 [ default = true ]; + optional uint64 MaxNonParallelDataQueryTasksLimit = 28 [default = 1000]; } message TSpillingServiceConfig { @@ -301,6 +302,6 @@ message TTableServiceConfig { optional uint64 QueryReplayCacheUploadTTLSec = 62 [default = 36000]; optional bool EnableQueryServiceSpilling = 63 [ default = true ]; - + optional bool EnableImplicitQueryParameterTypes = 66 [ default = false ]; };