Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@
"Match": {"Type": "Callable", "Name": "KqlUpsertRowsIndex"},
"Children": [
{"Index": 3, "Name": "ReturningColumns", "Type": "TCoAtomList"},
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
{"Index": 4, "Name": "GenerateColumnsIfInsert", "Type": "TCoAtomList"},
{"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
]
},
{
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/opt/kqp_opt_kql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,16 @@ TExprBase BuildUpsertTableWithIndex(const TKiWriteTable& write, const TCoAtomLis
const TKikimrTableDescription& table, TExprContext& ctx)
{
const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx);
auto generateColumnsIfInsertNode = GetSetting(write.Settings().Ref(), "generate_columns_if_insert");
YQL_ENSURE(generateColumnsIfInsertNode);
TCoAtomList generateColumnsIfInsert = TCoNameValueTuple(generateColumnsIfInsertNode).Value().Cast<TCoAtomList>();

auto effect = Build<TKqlUpsertRowsIndex>(ctx, write.Pos())
.Table(BuildTableMeta(table, write.Pos(), ctx))
.Input(input.Ptr())
.Columns(columns.Ptr())
.ReturningColumns(write.ReturningColumns())
.GenerateColumnsIfInsert(generateColumnsIfInsert)
.Done();

return effect;
Expand Down Expand Up @@ -348,6 +353,7 @@ TExprBase BuildReplaceTableWithIndex(const TKiWriteTable& write, const TCoAtomLi
.Input(input.Ptr())
.Columns(columns.Ptr())
.ReturningColumns(write.ReturningColumns())
.GenerateColumnsIfInsert<TCoAtomList>().Build()
.Done();

return effect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ NYql::NNodes::TMaybeNode<NYql::NNodes::TExprList> KqpPhyUpsertIndexEffectsImpl(T
const NYql::NNodes::TExprBase& inputRows,
const NYql::NNodes::TCoAtomList& inputColumns,
const NYql::NNodes::TCoAtomList& returningColumns,
const NYql::NNodes::TCoAtomList& columnsWithDefaults,

const NYql::TKikimrTableDescription& table, const NYql::NNodes::TMaybeNode<NYql::NNodes::TCoNameValueTupleList>& settings,
NYql::TPositionHandle pos, NYql::TExprContext& ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ TExprBase KqpBuildUpdateIndexStages(TExprBase node, TExprContext& ctx, const TKq
auto update = node.Cast<TKqlUpdateRowsIndex>();
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, update.Table().Path());

TCoAtomList empty = Build<TCoAtomList>(ctx, node.Pos()).Done();

auto effects = KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode::UpdateOn, update.Input(),
update.Columns(), update.ReturningColumns(), table, update.Settings(), update.Pos(), ctx);
update.Columns(), update.ReturningColumns(), empty, table, update.Settings(), update.Pos(), ctx);

if (!effects) {
return node;
Expand Down
126 changes: 42 additions & 84 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,59 +24,8 @@ TMaybeNode<TDqPhyPrecompute> PrecomputeCurrentDefaultsForKeys(const TDqPhyPrecom
lookupColumns.push_back(atom);
}

auto lookupColumnsList = Build<TCoAtomList>(ctx, pos)
.Add(lookupColumns)
.Done();

auto lookupStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(lookupKeys)
.Build()
.Program()
.Args({"keys_list"})
.Body<TKqpLookupTable>()
.Table(BuildTableMeta(table, pos, ctx))
.LookupKeys<TCoIterator>()
.List("keys_list")
.Build()
.Columns(lookupColumnsList)
.Build()
.Build()
.Settings().Build()
.Done();

auto lookup = Build<TDqCnUnionAll>(ctx, pos)
.Output()
.Stage(lookupStage)
.Index().Build("0")
.Build()
.Done();

auto lookupPayloadSelector = MakeRowsPayloadSelector(lookupColumnsList, table, lookupKeys.Pos(), ctx);
auto condenseLookupResult = CondenseInputToDictByPk(lookup, table, lookupPayloadSelector, ctx);
if (!condenseLookupResult) {
return {};
}

auto computeDictStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(condenseLookupResult->StageInputs)
.Build()
.Program()
.Args(condenseLookupResult->StageArgs)
.Body(condenseLookupResult->Stream)
.Build()
.Settings().Build()
.Done();

return Build<TDqPhyPrecompute>(ctx, pos)
.Connection<TDqCnValue>()
.Output()
.Stage(computeDictStage)
.Index().Build("0")
.Build()
.Build()
.Done();
return PrecomputeTableLookupDict(
lookupKeys, table, lookupColumns, pos, ctx, false);
}

TCoAtomList BuildNonDefaultColumns(
Expand Down Expand Up @@ -115,46 +64,27 @@ TCoAtomList BuildNonDefaultColumns(
.Done();
}

TExprBase KqpRewriteGenerateIfInsert(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
auto maybeInsertOnConlictUpdate = node.Maybe<TKqlInsertOnConflictUpdateRows>();
if (!maybeInsertOnConlictUpdate) {
return node;
}

auto insertOnConlictUpdate = maybeInsertOnConlictUpdate.Cast();
YQL_ENSURE(insertOnConlictUpdate.GenerateColumnsIfInsert().Ref().ChildrenSize() > 0);
TCoAtomList columnsWithDefault = insertOnConlictUpdate.GenerateColumnsIfInsert();

auto input = insertOnConlictUpdate.Input();
auto pos = insertOnConlictUpdate.Input().Pos();

const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, insertOnConlictUpdate.Table().Path());

auto payloadSelector = MakeRowsPayloadSelector(insertOnConlictUpdate.Columns(), tableDesc, pos, ctx);
auto condenseResult = CondenseInputToDictByPk(input, tableDesc, payloadSelector, ctx);
if (!condenseResult) {
return node;
}

auto inputDictAndKeys = PrecomputeDictAndKeys(*condenseResult, pos, ctx);
auto lookupDict = PrecomputeCurrentDefaultsForKeys(inputDictAndKeys.KeysPrecompute, columnsWithDefault, tableDesc, pos, ctx);
if (!lookupDict) {
return node;
}

auto nonDefaultColumns = BuildNonDefaultColumns(tableDesc, insertOnConlictUpdate.Columns(), columnsWithDefault, pos, ctx);
TDqStage BuildInsertOnConflictUpdateInputStage(
const TKikimrTableDescription& table,
const TCoAtomList& upsertColumns,
const TCoAtomList& columnsWithDefault,
const TDictAndKeysResult& inputDictAndKeys,
const TDqPhyPrecompute& lookupDict,
TPositionHandle pos, TExprContext& ctx)
{
auto nonDefaultColumns = BuildNonDefaultColumns(table, upsertColumns, columnsWithDefault, pos, ctx);

auto inputKeysArg = TCoArgument(ctx.NewArgument(pos, "input_keys"));
auto inputDictArg = TCoArgument(ctx.NewArgument(pos, "input_dict"));
auto inputKeyArg = TCoArgument(ctx.NewArgument(pos, "input_key"));
auto lookupDictArg = TCoArgument(ctx.NewArgument(pos, "lookup_dict"));
auto presetHandlerPayload = TCoArgument(ctx.NewArgument(pos, "payload"));

auto filterStage = Build<TDqStage>(ctx, pos)
return Build<TDqStage>(ctx, pos)
.Inputs()
.Add(inputDictAndKeys.KeysPrecompute)
.Add(inputDictAndKeys.DictPrecompute)
.Add(lookupDict.Cast())
.Add(lookupDict)
.Build()
.Program()
.Args({inputKeysArg, inputDictArg, lookupDictArg})
Expand Down Expand Up @@ -215,10 +145,38 @@ TExprBase KqpRewriteGenerateIfInsert(TExprBase node, TExprContext& ctx, const TK
.Build()
.Settings().Build()
.Done();
}

TExprBase KqpRewriteGenerateIfInsert(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
auto maybeInsertOnConlictUpdate = node.Maybe<TKqlInsertOnConflictUpdateRows>();
if (!maybeInsertOnConlictUpdate) {
return node;
}

auto insertOnConlictUpdate = maybeInsertOnConlictUpdate.Cast();
YQL_ENSURE(insertOnConlictUpdate.GenerateColumnsIfInsert().Ref().ChildrenSize() > 0);
TCoAtomList columnsWithDefault = insertOnConlictUpdate.GenerateColumnsIfInsert();

auto input = insertOnConlictUpdate.Input();
auto pos = insertOnConlictUpdate.Input().Pos();

const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, insertOnConlictUpdate.Table().Path());

auto payloadSelector = MakeRowsPayloadSelector(insertOnConlictUpdate.Columns(), tableDesc, pos, ctx);
auto condenseResult = CondenseInputToDictByPk(input, tableDesc, payloadSelector, ctx);
if (!condenseResult) {
return node;
}

auto inputDictAndKeys = PrecomputeDictAndKeys(*condenseResult, pos, ctx);
auto lookupDict = PrecomputeCurrentDefaultsForKeys(inputDictAndKeys.KeysPrecompute, columnsWithDefault, tableDesc, pos, ctx);
if (!lookupDict) {
return node;
}

auto newInput = Build<TDqCnUnionAll>(ctx, pos)
.Output()
.Stage(filterStage)
.Stage(BuildInsertOnConflictUpdateInputStage(tableDesc, insertOnConlictUpdate.Columns(), columnsWithDefault, inputDictAndKeys, lookupDict.Cast(), pos, ctx))
.Index().Build("0")
.Build()
.Done();
Expand Down
74 changes: 42 additions & 32 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput
.Done();
}

TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, const THashSet<TStringBuf> inputColumns,
TMaybe<TCondenseInputResult> RewriteInputForConstraint(const TExprBase& inputRows, const THashSet<TStringBuf> inputColumns,
const THashSet<TString>& checkDefaults,
const TKikimrTableDescription& table, const TSecondaryIndexes& indexes,
TPositionHandle pos, TExprContext& ctx)
{
Expand Down Expand Up @@ -379,28 +380,37 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c
}
}

if (missedKeyInput && hasUniqIndex) {
if (!hasUniqIndex) {
missedKeyInput.clear();
}

if (!missedKeyInput.empty() || !checkDefaults.empty()) {
TVector<TExprBase> columns;

TCoArgument inLambdaArg(ctx.NewArgument(pos, "in_lambda_arg"));
auto missedFromMain = TCoArgument(ctx.NewArgument(pos, "missed_from_main"));
auto lookupRowArgument = TCoArgument(ctx.NewArgument(pos, "lookup_row"));

TVector<TExprBase> resCol;
TVector<TExprBase> existingRow, nonExistingRow;
auto getterFromValue = [&ctx, &pos] (const TStringBuf& x, const TCoArgument& arg) -> TExprBase {
return Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(x)
.Value<TCoMember>()
.Struct(arg)
.Name().Build(x)
.Build()
.Done();
};

for (const auto& x : inputColumns) {
if (!missedKeyInput.contains(x)) {
resCol.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(x)
.Value<TCoMember>()
.Struct(inLambdaArg)
.Name().Build(x)
.Build()
.Done());
bool hasDefaultToCheck = checkDefaults.contains(x);
if (hasDefaultToCheck) {
existingRow.push_back(getterFromValue(x, lookupRowArgument));
} else {
existingRow.emplace_back(getterFromValue(x, inLambdaArg));
}
}

TVector<TExprBase> resNullCol = resCol;
nonExistingRow.push_back(getterFromValue(x, inLambdaArg));
}

for (const auto& x : missedKeyInput) {
auto atom = Build<TCoAtom>(ctx, pos)
Expand All @@ -411,16 +421,8 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c
auto columnType = table.GetColumnType(TString(x));
YQL_ENSURE(columnType);

resCol.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(x)
.Value<TCoMember>()
.Struct(missedFromMain)
.Name().Build(x)
.Build()
.Done());

resNullCol.emplace_back(
existingRow.emplace_back(getterFromValue(x, lookupRowArgument));
nonExistingRow.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(x)
.Value<TCoNothing>()
Expand All @@ -436,8 +438,11 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c
columns.emplace_back(atom);
}

auto inPrecompute = PrecomputeCondenseInputResult(*condenseResult, pos, ctx);
for(const auto& x: checkDefaults) {
columns.push_back(Build<TCoAtom>(ctx, pos).Value(x).Done());
}

auto inPrecompute = PrecomputeCondenseInputResult(*condenseResult, pos, ctx);
auto precomputeTableLookupDict = PrecomputeTableLookupDict(inPrecompute, table, columns, pos, ctx, true);

TVector<TExprBase> keyLookupTuples;
Expand Down Expand Up @@ -475,13 +480,13 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c
.Build()
.Build()
.PresentHandler<TCoLambda>()
.Args({missedFromMain})
.Args({lookupRowArgument})
.Body<TCoAsStruct>()
.Add(resCol)
.Add(existingRow)
.Build()
.Build()
.MissingValue<TCoAsStruct>()
.Add(resNullCol)
.Add(nonExistingRow)
.Build()
.Build()
.Build()
Expand Down Expand Up @@ -595,7 +600,7 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c
} // namespace

TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, const TExprBase& inputRows,
const TCoAtomList& inputColumns, const TCoAtomList& returningColumns, const TKikimrTableDescription& table,
const TCoAtomList& inputColumns, const TCoAtomList& returningColumns, const TCoAtomList& columnsWithDefaults, const TKikimrTableDescription& table,
const TMaybeNode<NYql::NNodes::TCoNameValueTupleList>& settings, TPositionHandle pos, TExprContext& ctx)
{
switch (mode) {
Expand All @@ -614,10 +619,15 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
inputColumnsSet.emplace(column.Value());
}

THashSet<TString> columnsWithDefaultsSet;
for(const auto& column: columnsWithDefaults) {
columnsWithDefaultsSet.emplace(column.Value());
}

auto filter = (mode == TKqpPhyUpsertIndexMode::UpdateOn) ? &inputColumnsSet : nullptr;
const auto indexes = BuildSecondaryIndexVector(table, pos, ctx, filter);

auto checkedInput = CheckUniqueConstraint(inputRows, inputColumnsSet, table, indexes, pos, ctx);
auto checkedInput = RewriteInputForConstraint(inputRows, inputColumnsSet, columnsWithDefaultsSet, table, indexes, pos, ctx);

if (!checkedInput) {
return {};
Expand Down Expand Up @@ -901,7 +911,7 @@ TExprBase KqpBuildUpsertIndexStages(TExprBase node, TExprContext& ctx, const TKq
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, upsert.Table().Path());

auto effects = KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode::Upsert, upsert.Input(), upsert.Columns(),
upsert.ReturningColumns(), table, upsert.Settings(), upsert.Pos(), ctx);
upsert.ReturningColumns(), upsert.GenerateColumnsIfInsert(), table, upsert.Settings(), upsert.Pos(), ctx);

if (!effects) {
return node;
Expand Down
Loading