Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 42 additions & 17 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ TExprBase MakeNonexistingRowsFilter(const TDqPhyPrecompute& inputRows, const TDq
TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecompute& inputRows,
const TDqPhyPrecompute& lookupDict, const THashSet<TStringBuf>& inputColumns,
const THashSet<TStringBuf>& indexColumns, const TKikimrTableDescription& table, TPositionHandle pos,
TExprContext& ctx)
TExprContext& ctx, bool opt)
{
// Check if we can update index table from just input data
bool allColumnFromInput = true; // - indicate all data from input
Expand Down Expand Up @@ -250,14 +250,18 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput
.Build()
.Done()
);
TExprNode::TPtr member = payload.Ptr();
if (opt) {
member = Build<TCoNth>(ctx, pos)
.Tuple(member)
.Index().Build(0)
.Done().Ptr();
}
presentKeyRow.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name(columnAtom)
.Value<TCoMember>()
.Struct<TCoNth>()
.Tuple(payload)
.Index().Build(0)
.Build()
.Struct(member)
.Name(columnAtom)
.Build()
.Done()
Expand All @@ -269,19 +273,29 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput
.Add(presentKeyRow)
.Done();

TExprNode::TPtr b;

if (opt) {
b = Build<TCoFlatOptionalIf>(ctx, pos)
.Predicate<TCoNth>()
.Tuple(payload)
.Index().Build(1)
.Build()
.Value<TCoJust>()
.Input(presentKeyRowStruct)
.Build()
.Done().Ptr();
} else {
b = Build<TCoJust>(ctx, pos)
.Input(presentKeyRowStruct)
.Done().Ptr();
}

TExprBase flatmapBody = Build<TCoIfPresent>(ctx, pos)
.Optional(lookup)
.PresentHandler<TCoLambda>()
.Args(payload)
.Body<TCoFlatOptionalIf>()
.Predicate<TCoNth>()
.Tuple(payload)
.Index().Build(1)
.Build()
.Value<TCoJust>()
.Input(presentKeyRowStruct)
.Build()
.Build()
.Body(b)
.Build()
.MissingValue<TCoJust>()
.Input<TCoAsStruct>()
Expand Down Expand Up @@ -531,12 +545,18 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
auto indexTableColumnsWithoutData = indexTableColumns;

bool indexDataColumnsUpdated = false;
bool optUpsert = true;
for (const auto& column : indexDesc->DataColumns) {
// TODO: Conder not fetching/updating data columns without input value.
YQL_ENSURE(indexTableColumns.emplace(column).second);

if (inputColumnsSet.contains(column)) {
indexDataColumnsUpdated = true;
// 'skip index update' optimization checks given value equal to saved one
// so the type must be equatable to perform it
auto t = table.GetColumnType(column);
YQL_ENSURE(t);
optUpsert &= t->IsEquatable();
}
}

Expand Down Expand Up @@ -695,7 +715,9 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,

if (indexKeyColumnsUpdated) {
// Have to delete old index value from index table in case when index key columns were updated
auto deleteIndexKeys = MakeRowsFromTupleDict(lookupDictRecomputed, pk, indexTableColumnsWithoutData, pos, ctx);
auto deleteIndexKeys = optUpsert
? MakeRowsFromTupleDict(lookupDictRecomputed, pk, indexTableColumnsWithoutData, pos, ctx)
: MakeRowsFromDict(lookupDict.Cast(), pk, indexTableColumnsWithoutData, pos, ctx);

auto indexDelete = Build<TKqlDeleteRows>(ctx, pos)
.Table(tableNode)
Expand All @@ -711,8 +733,11 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
needIndexTableUpdate = needIndexTableUpdate || indexKeyColumnsUpdated || indexDataColumnsUpdated;

if (needIndexTableUpdate) {
auto upsertIndexRows = MakeUpsertIndexRows(mode, inputRowsAndKeys.RowsPrecompute, lookupDictRecomputed,
inputColumnsSet, indexTableColumns, table, pos, ctx);
auto upsertIndexRows = optUpsert
? MakeUpsertIndexRows(mode, inputRowsAndKeys.RowsPrecompute, lookupDictRecomputed,
inputColumnsSet, indexTableColumns, table, pos, ctx, true)
: MakeUpsertIndexRows(mode, inputRowsAndKeys.RowsPrecompute, lookupDict.Cast(),
inputColumnsSet, indexTableColumns, table, pos, ctx, false);

auto indexUpsert = Build<TKqlUpsertRows>(ctx, pos)
.Table(tableNode)
Expand Down
68 changes: 68 additions & 0 deletions ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2396,6 +2396,74 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
}
}

void CheckUpsertNonEquatableType(bool notNull) {
auto kqpSetting = NKikimrKqp::TKqpSetting();
kqpSetting.SetName("_KqpYqlSyntaxVersion");
kqpSetting.SetValue("1");

auto settings = TKikimrSettings()
.SetKqpSettings({kqpSetting});
TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

TString createTableSql = R"(
CREATE TABLE `TableWithJson` (
id Int64,
name Utf8,
slug Json %s,
parent_id Int64,
PRIMARY KEY (id),
INDEX json_parent_id_index GLOBAL ON (parent_id, id) COVER (name, slug)
);
)";

createTableSql = Sprintf(createTableSql.data(), notNull ? "NOT NULL" : "");

{
auto result = session.ExecuteSchemeQuery(createTableSql).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
}

const TString query = R"(
UPSERT INTO `TableWithJson` (
id,
name,
slug,
parent_id
)
VALUES (
1,
'Q',
JSON(@@"dispenser"@@),
666
);
)";

auto result = session.ExecuteDataQuery(
query,
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())
.ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

{
const auto& yson = ReadTablePartToYson(session, "/Root/TableWithJson");
const TString expected = R"([[[1];["Q"];["\"dispenser\""];[666]]])";
UNIT_ASSERT_VALUES_EQUAL(yson, expected);
}

{
const auto& yson = ReadTablePartToYson(session, "/Root/TableWithJson/json_parent_id_index/indexImplTable");
const TString expected = R"([[[666];[1];["Q"];["\"dispenser\""]]])";
UNIT_ASSERT_VALUES_EQUAL(yson, expected);
}
}

Y_UNIT_TEST_TWIN(CheckUpsertNonEquatableType, NotNull) {
CheckUpsertNonEquatableType(NotNull);
}

Y_UNIT_TEST(UniqAndNoUniqSecondaryIndexWithCover) {
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
Expand Down