Skip to content

Commit e0ad7b8

Browse files
authored
Explain for CTAS (#17128)
1 parent ade5e4f commit e0ad7b8

28 files changed

+807
-161
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1637,7 +1637,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16371637
shardTasks.emplace(shardId, task.Id);
16381638

16391639
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1640-
BuildSinks(stage, task);
1640+
BuildSinks(stage, stageInfo, task);
16411641

16421642
return task;
16431643
};

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -924,7 +924,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
924924
task.Meta.Type = TTaskMeta::TTaskType::Compute;
925925

926926
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
927-
BuildSinks(stage, task);
927+
BuildSinks(stage, stageInfo, task);
928928

929929
LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
930930
}
@@ -948,15 +948,20 @@ class TKqpExecuterBase : public TActor<TDerived> {
948948
output.SinkSettings = extSink.GetSettings();
949949
}
950950

951-
void BuildInternalSinks(const NKqpProto::TKqpSink& sink, TKqpTasksGraph::TTaskType& task) {
951+
void BuildInternalSinks(const NKqpProto::TKqpSink& sink, const TStageInfo& stageInfo, TKqpTasksGraph::TTaskType& task) {
952952
const auto& intSink = sink.GetInternalSink();
953953
auto& output = task.Outputs[sink.GetOutputIndex()];
954954
output.Type = TTaskOutputType::Sink;
955955
output.SinkType = intSink.GetType();
956956

957957
if (intSink.GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
958958
NKikimrKqp::TKqpTableSinkSettings settings;
959-
YQL_ENSURE(intSink.GetSettings().UnpackTo(&settings), "Failed to unpack settings");
959+
if (!stageInfo.Meta.ResolvedSinkSettings) {
960+
YQL_ENSURE(intSink.GetSettings().UnpackTo(&settings), "Failed to unpack settings");
961+
} else {
962+
settings = *stageInfo.Meta.ResolvedSinkSettings;
963+
}
964+
960965
auto& lockTxId = TasksGraph.GetMeta().LockTxId;
961966
if (lockTxId) {
962967
settings.SetLockTxId(*lockTxId);
@@ -984,14 +989,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
984989
}
985990
}
986991

987-
void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) {
992+
void BuildSinks(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, TKqpTasksGraph::TTaskType& task) {
988993
if (stage.SinksSize() > 0) {
989994
YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported");
990995
const auto& sink = stage.GetSinks(0);
991996
YQL_ENSURE(sink.GetOutputIndex() < task.Outputs.size());
992997

993998
if (sink.HasInternalSink()) {
994-
BuildInternalSinks(sink, task);
999+
BuildInternalSinks(sink, stageInfo, task);
9951000
} else if (sink.HasExternalSink()) {
9961001
BuildExternalSinks(sink, task);
9971002
} else {
@@ -1072,7 +1077,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
10721077

10731078
// finish building
10741079
for (auto taskId : tasksIds) {
1075-
BuildSinks(stage, TasksGraph.GetTask(taskId));
1080+
BuildSinks(stage, stageInfo, TasksGraph.GetTask(taskId));
10761081
}
10771082
}
10781083

@@ -1329,7 +1334,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
13291334

13301335
auto buildSinks = [&]() {
13311336
for (const ui64 taskId : createdTasksIds) {
1332-
BuildSinks(stage, TasksGraph.GetTask(taskId));
1337+
BuildSinks(stage, stageInfo, TasksGraph.GetTask(taskId));
13331338
}
13341339
};
13351340

@@ -1450,7 +1455,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
14501455
task.Meta.Type = TTaskMeta::TTaskType::Compute;
14511456
task.Meta.ExecuterId = SelfId();
14521457
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1453-
BuildSinks(stage, task);
1458+
BuildSinks(stage, stageInfo, task);
14541459
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
14551460
}
14561461
}
@@ -1729,7 +1734,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
17291734
auto& task = TasksGraph.GetTask(taskIdx);
17301735
task.Meta.SetEnableShardsSequentialScan(readSettings.IsSorted());
17311736
PrepareScanMetaForUsage(task.Meta, keyTypes);
1732-
BuildSinks(stage, task);
1737+
BuildSinks(stage, stageInfo, task);
17331738
}
17341739
}
17351740

@@ -1783,7 +1788,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
17831788
task.Meta.Type = TTaskMeta::TTaskType::Scan;
17841789
task.SetMetaId(t);
17851790
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1786-
BuildSinks(stage, task);
1791+
BuildSinks(stage, stageInfo, task);
17871792

17881793
for (const auto& readInfo: *task.Meta.Reads) {
17891794
Y_ENSURE(hashByShardId.contains(readInfo.ShardId));
@@ -1825,7 +1830,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
18251830
task.Meta.Type = TTaskMeta::TTaskType::Scan;
18261831
task.SetMetaId(metaGlueingId);
18271832
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
1828-
BuildSinks(stage, task);
1833+
BuildSinks(stage, stageInfo, task);
18291834
}
18301835
}
18311836
}

0 commit comments

Comments
 (0)