Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 36 additions & 19 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ struct TSerializerCtx {
const TIntrusivePtr<NYql::TKikimrTablesData> tablesData,
const TKikimrConfiguration::TPtr config, ui32 txCount,
TVector<TVector<NKikimrMiniKQL::TResult>> pureTxResults,
TTypeAnnotationContext& typeCtx,
TTypeAnnotationContext& typeCtx,
TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx)
: ExprCtx(exprCtx)
, Cluster(cluster)
Expand Down Expand Up @@ -348,7 +348,7 @@ class TxPlanSerializer {

writer.WriteKey("Inputs");
writer.BeginList();

for (const auto& input : op.Inputs) {

if (std::holds_alternative<ui32>(input)) {
Expand Down Expand Up @@ -461,6 +461,15 @@ class TxPlanSerializer {
}

void FillConnectionPlanNode(const TDqConnection& connection, TQueryPlanNode& planNode) {
TDqStageSettings settings = TDqStageSettings::Parse(connection.Output().Stage());
auto GetNarrowColumnName = [&](const TString& wideColumnName) {
ui32 idx;
if (!TryFromString(wideColumnName, idx)) {
return wideColumnName;
}
return TString(settings.OutputNarrowType->GetItems()[idx]->GetName());
};

planNode.Type = EPlanNodeType::Connection;

if (connection.Maybe<TDqCnUnionAll>()) {
Expand All @@ -473,15 +482,23 @@ class TxPlanSerializer {
planNode.TypeName = "HashShuffle";
auto& keyColumns = planNode.NodeInfo["KeyColumns"];
for (const auto& column : hashShuffle.Cast().KeyColumns()) {
keyColumns.AppendValue(TString(column.Value()));
if (settings.WideChannels) {
keyColumns.AppendValue(GetNarrowColumnName(TString(column.Value())));
} else {
keyColumns.AppendValue(TString(column.Value()));
}
}
} else if (auto merge = connection.Maybe<TDqCnMerge>()) {
planNode.TypeName = "Merge";
auto& sortColumns = planNode.NodeInfo["SortColumns"];
for (const auto& sortColumn : merge.Cast().SortColumns()) {
TStringBuilder sortColumnDesc;
sortColumnDesc << sortColumn.Column().Value() << " ("
<< sortColumn.SortDirection().Value() << ")";
if (settings.WideChannels) {
sortColumnDesc << GetNarrowColumnName(TString(sortColumn.Column().Value()));
} else {
sortColumnDesc << sortColumn.Column().Value();
}
sortColumnDesc << " (" << sortColumn.SortDirection().Value() << ")";

sortColumns.AppendValue(sortColumnDesc);
}
Expand Down Expand Up @@ -1376,7 +1393,7 @@ class TxPlanSerializer {

TOperator op;
op.Properties["Name"] = name;

return AddOperator(planNode, name, std::move(op));
}

Expand Down Expand Up @@ -1728,8 +1745,8 @@ class TxPlanSerializer {

template <typename TReadTableSettings>
void AddReadTableSettings(
TOperator& op,
const TReadTableSettings& readTableSettings,
TOperator& op,
const TReadTableSettings& readTableSettings,
TTableRead& readInfo
) {
auto settings = NYql::TKqpReadTableSettings::Parse(readTableSettings);
Expand All @@ -1751,9 +1768,9 @@ class TxPlanSerializer {
}

if (settings.SequentialInFlight) {
op.Properties["Scan"] = "Sequential";
op.Properties["Scan"] = "Sequential";
} else {
op.Properties["Scan"] = "Parallel";
op.Properties["Scan"] = "Parallel";
}
}

Expand Down Expand Up @@ -1953,8 +1970,8 @@ TVector<NJson::TJsonValue> RemoveRedundantNodes(NJson::TJsonValue& plan, const T
return {plan};
}

NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
int operatorIndex,
NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
int operatorIndex,
const THashMap<int, NJson::TJsonValue>& planIndex,
const THashMap<TString, NJson::TJsonValue>& precomputes,
int& nodeCounter) {
Expand Down Expand Up @@ -1993,14 +2010,14 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
NJson::TJsonValue newOps;
NJson::TJsonValue op;

op["Name"] = "TableLookup";
op["Name"] = "TableLookup";
op["Columns"] = plan.GetMapSafe().at("Columns");
op["LookupKeyColumns"] = plan.GetMapSafe().at("LookupKeyColumns");
op["Table"] = plan.GetMapSafe().at("Table");

if (plan.GetMapSafe().contains("E-Cost")) {
op["E-Cost"] = plan.GetMapSafe().at("E-Cost");
}
}
if (plan.GetMapSafe().contains("E-Rows")) {
op["E-Rows"] = plan.GetMapSafe().at("E-Rows");
}
Expand Down Expand Up @@ -2075,8 +2092,8 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
op.GetMapSafe().erase("Inputs");
}

if (op.GetMapSafe().contains("Input")
|| op.GetMapSafe().contains("ToFlow")
if (op.GetMapSafe().contains("Input")
|| op.GetMapSafe().contains("ToFlow")
|| op.GetMapSafe().contains("Member")
|| op.GetMapSafe().contains("AssumeSorted")
|| op.GetMapSafe().contains("Iterator")) {
Expand Down Expand Up @@ -2149,7 +2166,7 @@ double ComputeCpuTimes(NJson::TJsonValue& plan) {
}

void ComputeTotalRows(NJson::TJsonValue& plan) {

if (plan.GetMapSafe().contains("Plans")) {
for (auto& p : plan.GetMapSafe().at("Plans").GetArraySafe()) {
ComputeTotalRows(p);
Expand Down Expand Up @@ -2201,7 +2218,7 @@ NJson::TJsonValue SimplifyQueryPlan(NJson::TJsonValue& plan) {
"ToFlow",
"Member",
"AssumeSorted"
};
};

THashMap<int, NJson::TJsonValue> planIndex;
THashMap<TString, NJson::TJsonValue> precomputes;
Expand Down Expand Up @@ -2237,7 +2254,7 @@ TString AddSimplifiedPlan(const TString& planText, TIntrusivePtr<NOpt::TKqpOptim
optimizerStats["JoinsCount"] = optCtx->JoinsCount;
optimizerStats["EquiJoinsCount"] = optCtx->EquiJoinsCount;
simplifiedPlan["OptimizerStats"] = optimizerStats;
}
}
planJson["SimplifiedPlan"] = simplifiedPlan;

return planJson.GetStringRobust();
Expand Down