diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 113b59a5f5d2..df0a9834dfde 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -261,6 +261,16 @@ {"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} ] }, + { + "Name": "TKqlInsertOnConflictUpdateRows", + "Base": "TKqlUpsertRowsBase", + "Match": {"Type": "Callable", "Name": "KqlInsertOnConflictUpdateRows"}, + "Children": [ + {"Index": 3, "Name": "ReturningColumns", "Type": "TCoAtomList"}, + {"Index": 4, "Name": "GenerateColumnsIfInsert", "Type": "TCoAtomList"}, + {"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + ] + }, { "Name": "TKqlUpsertRowsIndex", "Base": "TKqlUpsertRowsBase", diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index c635664012f2..dc4305e6e6bb 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -553,7 +553,12 @@ TStatus AnnotateUpsertRows(const TExprNode::TPtr& node, TExprContext& ctx, const itemType = input->GetTypeAnn()->Cast()->GetItemType(); isStream = true; } else { - YQL_ENSURE(TKqlUpsertRows::Match(node.Get()) || TKqlUpsertRowsIndex::Match(node.Get())); + + YQL_ENSURE( + TKqlUpsertRows::Match(node.Get()) || + TKqlUpsertRowsIndex::Match(node.Get()) || + TKqlInsertOnConflictUpdateRows::Match(node.Get()) + ); if (!EnsureListType(*input, ctx)) { return TStatus::Error; diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index c3a26241209b..50a02406d19f 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -283,7 +283,21 @@ TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputC const TCoAtomList& autoincrement, const TKikimrTableDescription& table, TExprContext& ctx) { + auto generateColumnsIfInsertNode = GetSetting(write.Settings().Ref(), "generate_columns_if_insert"); + YQL_ENSURE(generateColumnsIfInsertNode); + TCoAtomList generateColumnsIfInsert = TCoNameValueTuple(generateColumnsIfInsertNode).Value().Cast(); + const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); + if (generateColumnsIfInsert.Ref().ChildrenSize() > 0) { + return Build(ctx, write.Pos()) + .Table(BuildTableMeta(table, write.Pos(), ctx)) + .Input(input.Ptr()) + .Columns(columns.Ptr()) + .ReturningColumns(write.ReturningColumns()) + .GenerateColumnsIfInsert(generateColumnsIfInsert) + .Done(); + } + auto effect = Build(ctx, write.Pos()) .Table(BuildTableMeta(table, write.Pos(), ctx)) .Input(input.Ptr()) diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h index 5ed944c27ead..d35c4718d091 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h @@ -12,6 +12,9 @@ NYql::NNodes::TExprBase KqpBuildReturning(NYql::NNodes::TExprBase node, NYql::TE NYql::NNodes::TExprBase KqpRewriteReturningUpsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); +NYql::NNodes::TExprBase KqpRewriteGenerateIfInsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, + const TKqpOptimizeContext& kqpCtx); + NYql::NNodes::TExprBase KqpBuildUpdateStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp new file mode 100644 index 000000000000..48e25ba53013 --- /dev/null +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp @@ -0,0 +1,235 @@ +#include "kqp_opt_phy_effects_rules.h" +#include "kqp_opt_phy_effects_impl.h" + +using namespace NYql; +using namespace NYql::NNodes; + +namespace NKikimr::NKqp::NOpt { + +TMaybeNode PrecomputeCurrentDefaultsForKeys(const TDqPhyPrecompute& lookupKeys, + const TCoAtomList& columnsWithDefault, + const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx) +{ + TVector lookupColumns; + + for(const auto& key: table.Metadata->KeyColumnNames) { + auto atom = Build(ctx, pos) + .Value(key) + .Done(); + + lookupColumns.emplace_back(std::move(atom)); + } + + for(const auto& atom: columnsWithDefault) { + lookupColumns.push_back(atom); + } + + auto lookupColumnsList = Build(ctx, pos) + .Add(lookupColumns) + .Done(); + + auto lookupStage = Build(ctx, pos) + .Inputs() + .Add(lookupKeys) + .Build() + .Program() + .Args({"keys_list"}) + .Body() + .Table(BuildTableMeta(table, pos, ctx)) + .LookupKeys() + .List("keys_list") + .Build() + .Columns(lookupColumnsList) + .Build() + .Build() + .Settings().Build() + .Done(); + + auto lookup = Build(ctx, pos) + .Output() + .Stage(lookupStage) + .Index().Build("0") + .Build() + .Done(); + + auto lookupPayloadSelector = MakeRowsPayloadSelector(lookupColumnsList, table, lookupKeys.Pos(), ctx); + auto condenseLookupResult = CondenseInputToDictByPk(lookup, table, lookupPayloadSelector, ctx); + if (!condenseLookupResult) { + return {}; + } + + auto computeDictStage = Build(ctx, pos) + .Inputs() + .Add(condenseLookupResult->StageInputs) + .Build() + .Program() + .Args(condenseLookupResult->StageArgs) + .Body(condenseLookupResult->Stream) + .Build() + .Settings().Build() + .Done(); + + return Build(ctx, pos) + .Connection() + .Output() + .Stage(computeDictStage) + .Index().Build("0") + .Build() + .Build() + .Done(); +} + +TCoAtomList BuildNonDefaultColumns( + const TKikimrTableDescription& table, + const TCoAtomList& allColumns, + const TCoAtomList& columnsWithDefault, + TPositionHandle pos, TExprContext& ctx) +{ + TVector columnsToUpdateSet; + std::unordered_set unchangedColumns; + + for(const auto& column: columnsWithDefault) { + unchangedColumns.emplace(TString(column)); + } + + for(const TString& key: table.Metadata->KeyColumnNames) { + unchangedColumns.emplace(key); + } + + for (const auto& column : allColumns) { + auto colName = TString(column); + auto it = unchangedColumns.find(colName); + if (it != unchangedColumns.end()) { + continue; + } + + auto atom = Build(ctx, pos) + .Value(colName) + .Done(); + + columnsToUpdateSet.emplace_back(std::move(atom)); + } + + return Build(ctx, pos) + .Add(columnsToUpdateSet) + .Done(); +} + +TExprBase KqpRewriteGenerateIfInsert(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + auto maybeInsertOnConlictUpdate = node.Maybe(); + if (!maybeInsertOnConlictUpdate) { + return node; + } + + auto insertOnConlictUpdate = maybeInsertOnConlictUpdate.Cast(); + YQL_ENSURE(insertOnConlictUpdate.GenerateColumnsIfInsert().Ref().ChildrenSize() > 0); + TCoAtomList columnsWithDefault = insertOnConlictUpdate.GenerateColumnsIfInsert(); + + auto input = insertOnConlictUpdate.Input(); + auto pos = insertOnConlictUpdate.Input().Pos(); + + const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, insertOnConlictUpdate.Table().Path()); + + auto payloadSelector = MakeRowsPayloadSelector(insertOnConlictUpdate.Columns(), tableDesc, pos, ctx); + auto condenseResult = CondenseInputToDictByPk(input, tableDesc, payloadSelector, ctx); + if (!condenseResult) { + return node; + } + + auto inputDictAndKeys = PrecomputeDictAndKeys(*condenseResult, pos, ctx); + auto lookupDict = PrecomputeCurrentDefaultsForKeys(inputDictAndKeys.KeysPrecompute, columnsWithDefault, tableDesc, pos, ctx); + if (!lookupDict) { + return node; + } + + auto nonDefaultColumns = BuildNonDefaultColumns(tableDesc, insertOnConlictUpdate.Columns(), columnsWithDefault, pos, ctx); + + auto inputKeysArg = TCoArgument(ctx.NewArgument(pos, "input_keys")); + auto inputDictArg = TCoArgument(ctx.NewArgument(pos, "input_dict")); + auto inputKeyArg = TCoArgument(ctx.NewArgument(pos, "input_key")); + auto lookupDictArg = TCoArgument(ctx.NewArgument(pos, "lookup_dict")); + auto presetHandlerPayload = TCoArgument(ctx.NewArgument(pos, "payload")); + + auto filterStage = Build(ctx, pos) + .Inputs() + .Add(inputDictAndKeys.KeysPrecompute) + .Add(inputDictAndKeys.DictPrecompute) + .Add(lookupDict.Cast()) + .Build() + .Program() + .Args({inputKeysArg, inputDictArg, lookupDictArg}) + .Body() + .List() + .Input(inputKeysArg) + .Lambda() + .Args(inputKeyArg) + .Body() + .Optional() + .Collection(lookupDictArg) + .Lookup(inputKeyArg) + .Build() + .PresentHandler() + .Args(presetHandlerPayload) + .Body() + .Add() + .Name().Build("") + .Value(presetHandlerPayload) + .Build() + .Add() + .Name().Build("") + .Value() + .Optional() + .Input() + .Collection(inputDictArg) + .Lookup(inputKeyArg) + .Build() + .Members(nonDefaultColumns) + .Build() + .Build() + .Build() + .Add() + .Name().Build("") + .Value(inputKeyArg) + .Build() + .Build() + .Build() + .MissingValue() + .Add() + .Name().Build("") + .Value() + .Optional() + .Collection(inputDictArg) + .Lookup(inputKeyArg) + .Build() + .Build() + .Build() + .Add() + .Name().Build("") + .Value(inputKeyArg) + .Build() + .Build() + .Build() + .Build() + .Build() + .Build() + .Build() + .Settings().Build() + .Done(); + + auto newInput = Build(ctx, pos) + .Output() + .Stage(filterStage) + .Index().Build("0") + .Build() + .Done(); + + return Build(ctx, insertOnConlictUpdate.Pos()) + .Input(newInput.Ptr()) + .Table(insertOnConlictUpdate.Table()) + .Columns(insertOnConlictUpdate.Columns()) + .Settings(insertOnConlictUpdate.Settings()) + .ReturningColumns(insertOnConlictUpdate.ReturningColumns()) + .Done(); +} + +} // namespace NKikimr::NKqp::NOpt \ No newline at end of file diff --git a/ydb/core/kqp/opt/physical/effects/ya.make b/ydb/core/kqp/opt/physical/effects/ya.make index 68e6e7482f01..f0c0f0e0d327 100644 --- a/ydb/core/kqp/opt/physical/effects/ya.make +++ b/ydb/core/kqp/opt/physical/effects/ya.make @@ -11,6 +11,7 @@ SRCS( kqp_opt_phy_update.cpp kqp_opt_phy_upsert_index.cpp kqp_opt_phy_returning.cpp + kqp_opt_phy_upsert_defaults.cpp ) PEERDIR( diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 8167c1759508..330bcd400f93 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -68,6 +68,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage)); AddHandler(0, &TKqlInsertRows::Match, HNDL(BuildInsertStages)); AddHandler(0, &TKqlUpdateRows::Match, HNDL(BuildUpdateStages)); + AddHandler(0, &TKqlInsertOnConflictUpdateRows::Match, HNDL(RewriteGenerateIfInsert)); AddHandler(0, &TKqlUpdateRowsIndex::Match, HNDL(BuildUpdateIndexStages)); AddHandler(0, &TKqlUpsertRowsIndex::Match, HNDL(BuildUpsertIndexStages)); AddHandler(0, &TKqlInsertRowsIndex::Match, HNDL(BuildInsertIndexStages)); @@ -144,6 +145,12 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { return output; } + TMaybeNode RewriteGenerateIfInsert(TExprBase node, TExprContext& ctx) { + TExprBase output = KqpRewriteGenerateIfInsert(node, ctx, KqpCtx); + DumpAppliedRule("RewriteGenerateIfInsert", node.Ptr(), output.Ptr(), ctx); + return output; + } + TMaybeNode BuildReadTableStage(TExprBase node, TExprContext& ctx) { TExprBase output = KqpBuildReadTableStage(node, ctx, KqpCtx); DumpAppliedRule("BuildReadTableStage", node.Ptr(), output.Ptr(), ctx); diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 14d28479dace..2735e8a12ffa 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1095,7 +1095,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformeradd_column(); - columnBuild->SetColumnName(TString(constraint.Name().Value())); + columnBuild->SetColumnName(TString(columnName)); FillLiteralProto(constraint.Value().Cast(), *columnBuild->mutable_default_from_literal()); } } diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index abacd671c757..67c4396f9492 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -415,6 +415,8 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer defaultConstraintColumnsSet.emplace(keyColumnName); } + THashSet generateColumnsIfInsertColumnsSet; + for(const auto& [name, info] : table->Metadata->Columns) { if (rowType->FindItem(name)) { continue; @@ -424,7 +426,15 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer continue; } + if (defaultConstraintColumnsSet.find(name) != defaultConstraintColumnsSet.end()) { + continue; + } + if (info.IsDefaultKindDefined()) { + if (op == TYdbOperation::Upsert) { + generateColumnsIfInsertColumnsSet.emplace(name); + } + defaultConstraintColumnsSet.emplace(name); } } @@ -485,6 +495,11 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer defaultConstraintColumns.push_back(ctx.NewAtom(node.Pos(), generatedColumn)); } + TExprNode::TListType generateColumnsIfInsert; + for(auto& generatedColumn: generateColumnsIfInsertColumnsSet) { + generateColumnsIfInsert.push_back(ctx.NewAtom(node.Pos(), generatedColumn)); + } + node.Ptr()->ChildRef(TKiWriteTable::idx_Settings) = Build(ctx, node.Pos()) .Add(node.Settings()) .Add() @@ -499,6 +514,12 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer .Add(defaultConstraintColumns) .Build() .Build() + .Add() + .Name().Build("generate_columns_if_insert") + .Value() + .Add(generateColumnsIfInsert) + .Build() + .Build() .Done() .Ptr(); diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp index 5a8e89b75436..249a8a205d49 100644 --- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp @@ -491,11 +491,7 @@ Y_UNIT_TEST_SUITE(KqpConstraints) { result.GetIssues().ToString()); } - { - TString query = R"( - UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (1, "Old"); - )"; - + auto fQuery = [&](TString query) -> TString { NYdb::NTable::TExecDataQuerySettings execSettings; execSettings.KeepInQueryCache(true); execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); @@ -508,12 +504,32 @@ Y_UNIT_TEST_SUITE(KqpConstraints) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } + if (result.GetResultSets().size() > 0) + return NYdb::FormatResultSetYson(result.GetResultSet(0)); + return ""; + }; + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (1, "Old"); + )"); + + auto fCompareTable = [&](TString expected) { + TString query = R"( + SELECT * FROM `/Root/AlterTableAddNotNullColumn` ORDER BY Key; + )"; + CompareYson(expected, fQuery(query)); + }; + + fCompareTable(R"( + [ + [[1u];["Old"]] + ] + )"); { auto query = R"( --!syntax_v1 - ALTER TABLE `/Root/AlterTableAddNotNullColumn` ADD COLUMN Exists Int32 NOT NULL DEFAULT 1; + ALTER TABLE `/Root/AlterTableAddNotNullColumn` ADD COLUMN Value2 Int32 NOT NULL DEFAULT 1; )"; auto result = session.ExecuteSchemeQuery(query).GetValueSync(); @@ -523,51 +539,40 @@ Y_UNIT_TEST_SUITE(KqpConstraints) { Sleep(TDuration::Seconds(3)); - { - TString query = R"( - INSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (2, "New"); - )"; - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.KeepInQueryCache(true); - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - - auto result = - session - .ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), - execSettings) - .ExtractValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, - result.GetIssues().ToString()); - } - - { - TString query = R"( - SELECT * FROM `/Root/AlterTableAddNotNullColumn` ORDER BY Key; - )"; - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.KeepInQueryCache(true); - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - - auto result = - session - .ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), - execSettings) - .ExtractValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, - result.GetIssues().ToString()); - - Cerr << NYdb::FormatResultSetYson(result.GetResultSet(0)) << Endl; - CompareYson(R"( - [ - [[1u];["Old"];[1]];[[2u];["New"];[1]] - ] - )", - NYdb::FormatResultSetYson(result.GetResultSet(0))); - } + fQuery(R"( + INSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (2, "New"); + )"); + + fCompareTable(R"( + [ + [[1u];["Old"];[1]];[[2u];["New"];[1]] + ] + )"); + + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value, Value2) VALUES (2, "New", 2); + )"); + + fCompareTable(R"( + [ + [[1u];["Old"];[1]];[[2u];["New"];[2]] + ] + )"); + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (2, "OldNew"); + )"); + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (3, "BrandNew"); + )"); + + fCompareTable(R"( + [ + [[1u];["Old"];[1]];[[2u];["OldNew"];[2]];[[3u];["BrandNew"];[1]] + ] + )"); }