Skip to content

Commit fe4adb7

Browse files
committed
Return backward compatibility to YDB for spilling
1 parent a2af99e commit fe4adb7

File tree

6 files changed

+13
-7
lines changed

6 files changed

+13
-7
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2211,7 +2211,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
22112211
const bool localComputeTasks = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.size() == 0);
22122212

22132213
Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(),
2214-
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, Nothing(),
2214+
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(),
22152215
ExecuterSpan, std::move(ResourceSnapshot), ExecuterRetriesConfig, useDataQueryPool, localComputeTasks, Request.MkqlMemoryLimit, AsyncIoFactory, enableOptForTasks, GetUserRequestContext());
22162216

22172217
auto err = Planner->PlanExecution();

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4;
5959

6060
TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot,
6161
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
62-
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
62+
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
6363
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
6464
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
6565
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
@@ -72,6 +72,7 @@ TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& execu
7272
, UserToken(userToken)
7373
, Deadline(deadline)
7474
, StatsMode(statsMode)
75+
, WithSpilling(withSpilling)
7576
, RlPath(rlPath)
7677
, ResourcesSnapshot(std::move(resourcesSnapshot))
7778
, ExecuterSpan(executerSpan)
@@ -186,6 +187,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
186187
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
187188
} else {
188189
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN);
190+
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
189191
}
190192

191193
if (RlPath) {
@@ -332,6 +334,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op
332334
settings.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::DataQuery;
333335
settings.FailOnUndelivery = true;
334336
settings.StatsMode = GetDqStatsMode(StatsMode);
337+
settings.UseSpilling = false;
335338

336339
NYql::NDq::TComputeMemoryLimits limits;
337340
limits.ChannelBufferSize = 32_MB; // Depends on NYql::NDq::TDqOutputChannelSettings::ChunkSizeLimit (now 48 MB) with a ratio of 1.5
@@ -515,14 +518,14 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) {
515518
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer,
516519
const IKqpGateway::TKqpSnapshot& snapshot,
517520
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
518-
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
521+
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
519522
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
520523
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
521524
bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization,
522525
const TIntrusivePtr<TUserRequestContext>& userRequestContext)
523526
{
524527
return std::make_unique<TKqpPlanner>(tasksGraph, txId, executer, snapshot,
525-
database, userToken, deadline, statsMode, rlPath, executerSpan,
528+
database, userToken, deadline, statsMode, withSpilling, rlPath, executerSpan,
526529
std::move(resourcesSnapshot), executerRetriesConfig, useDataQueryPool,
527530
localComputeTasks, mkqlMemoryLimit, asyncIoFactory, doOptimization, userRequestContext);
528531
}

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class TKqpPlanner {
4343
public:
4444
TKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot,
4545
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
46-
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
46+
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
4747
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan,
4848
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
4949
bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization,
@@ -84,6 +84,7 @@ class TKqpPlanner {
8484
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
8585
const TInstant Deadline;
8686
const Ydb::Table::QueryStatsCollection::Mode StatsMode;
87+
const bool WithSpilling;
8788
const TMaybe<NKikimrKqp::TRlPath> RlPath;
8889
THashSet<ui32> TrackingNodes;
8990
const TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
@@ -113,7 +114,7 @@ class TKqpPlanner {
113114
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer,
114115
const IKqpGateway::TKqpSnapshot& snapshot,
115116
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
116-
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
117+
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
117118
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
118119
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
119120
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig,

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
332332
void ExecuteScanTx() {
333333

334334
Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(),
335-
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode,
335+
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, AppData()->EnableKqpSpilling,
336336
Request.RlPath, ExecuterSpan, std::move(ResourcesSnapshot), ExecuterRetriesConfig, /* useDataQueryPool */ false, /* localComputeTasks */ false,
337337
Request.MkqlMemoryLimit, nullptr, false, GetUserRequestContext());
338338

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
410410
runtimeSettingsBase.FailOnUndelivery = msgRtSettings.GetExecType() != NYql::NDqProto::TComputeRuntimeSettings::SCAN;
411411

412412
runtimeSettingsBase.StatsMode = msgRtSettings.GetStatsMode();
413+
runtimeSettingsBase.UseSpilling = msgRtSettings.GetUseSpilling();
413414

414415
if (msgRtSettings.HasRlPath()) {
415416
runtimeSettingsBase.RlPath = msgRtSettings.GetRlPath();

ydb/library/yql/dq/actors/compute/dq_compute_actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ struct TComputeRuntimeSettings {
242242
ui32 ExtraMemoryAllocationPool = 0;
243243

244244
bool FailOnUndelivery = true;
245+
bool UseSpilling = false;
245246

246247
std::function<void(bool success, const TIssues& issues)> TerminateHandler;
247248
TMaybe<NDqProto::TRlPath> RlPath;

0 commit comments

Comments
 (0)