Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
shardTasks.emplace(shardId, task.Id);

FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);
BuildSinks(stage, stageInfo, task);

return task;
};
Expand Down
27 changes: 16 additions & 11 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
task.Meta.Type = TTaskMeta::TTaskType::Compute;

FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);
BuildSinks(stage, stageInfo, task);

LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
}
Expand All @@ -938,15 +938,20 @@ class TKqpExecuterBase : public TActor<TDerived> {
output.SinkSettings = extSink.GetSettings();
}

void BuildInternalSinks(const NKqpProto::TKqpSink& sink, TKqpTasksGraph::TTaskType& task) {
void BuildInternalSinks(const NKqpProto::TKqpSink& sink, const TStageInfo& stageInfo, TKqpTasksGraph::TTaskType& task) {
const auto& intSink = sink.GetInternalSink();
auto& output = task.Outputs[sink.GetOutputIndex()];
output.Type = TTaskOutputType::Sink;
output.SinkType = intSink.GetType();

if (intSink.GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>()) {
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(intSink.GetSettings().UnpackTo(&settings), "Failed to unpack settings");
if (!stageInfo.Meta.ResolvedSinkSettings) {
YQL_ENSURE(intSink.GetSettings().UnpackTo(&settings), "Failed to unpack settings");
} else {
settings = *stageInfo.Meta.ResolvedSinkSettings;
}

auto& lockTxId = TasksGraph.GetMeta().LockTxId;
if (lockTxId) {
settings.SetLockTxId(*lockTxId);
Expand Down Expand Up @@ -974,14 +979,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
}
}

void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) {
void BuildSinks(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, TKqpTasksGraph::TTaskType& task) {
if (stage.SinksSize() > 0) {
YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported");
const auto& sink = stage.GetSinks(0);
YQL_ENSURE(sink.GetOutputIndex() < task.Outputs.size());

if (sink.HasInternalSink()) {
BuildInternalSinks(sink, task);
BuildInternalSinks(sink, stageInfo, task);
} else if (sink.HasExternalSink()) {
BuildExternalSinks(sink, task);
} else {
Expand Down Expand Up @@ -1062,7 +1067,7 @@ class TKqpExecuterBase : public TActor<TDerived> {

// finish building
for (auto taskId : tasksIds) {
BuildSinks(stage, TasksGraph.GetTask(taskId));
BuildSinks(stage, stageInfo, TasksGraph.GetTask(taskId));
}
}

Expand Down Expand Up @@ -1313,7 +1318,7 @@ class TKqpExecuterBase : public TActor<TDerived> {

auto buildSinks = [&]() {
for (const ui64 taskId : createdTasksIds) {
BuildSinks(stage, TasksGraph.GetTask(taskId));
BuildSinks(stage, stageInfo, TasksGraph.GetTask(taskId));
}
};

Expand Down Expand Up @@ -1434,7 +1439,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
task.Meta.Type = TTaskMeta::TTaskType::Compute;
task.Meta.ExecuterId = SelfId();
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);
BuildSinks(stage, stageInfo, task);
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
}
}
Expand Down Expand Up @@ -1713,7 +1718,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
auto& task = TasksGraph.GetTask(taskIdx);
task.Meta.SetEnableShardsSequentialScan(readSettings.IsSorted());
PrepareScanMetaForUsage(task.Meta, keyTypes);
BuildSinks(stage, task);
BuildSinks(stage, stageInfo, task);
}
}

Expand Down Expand Up @@ -1767,7 +1772,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
task.Meta.Type = TTaskMeta::TTaskType::Scan;
task.SetMetaId(t);
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);
BuildSinks(stage, stageInfo, task);

for (const auto& readInfo: *task.Meta.Reads) {
Y_ENSURE(hashByShardId.contains(readInfo.ShardId));
Expand Down Expand Up @@ -1809,7 +1814,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
task.Meta.Type = TTaskMeta::TTaskType::Scan;
task.SetMetaId(metaGlueingId);
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
BuildSinks(stage, task);
BuildSinks(stage, stageInfo, task);
}
}
}
Expand Down
Loading
Loading