diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
index 3c1ea590f5f2..611c07a37f2a 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -74,6 +74,10 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
             return WrapDqHashCombine(callable, ctx);
         }
 
+        if (name == "FulltextAnalyze"sv) {
+            return WrapFulltextAnalyze(callable, ctx);
+        }
+
         return nullptr;
     };
 }
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
index 0f576600dfd3..8c65edd0fb92 100644
--- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
+++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
@@ -760,6 +760,15 @@
                 {"Index": 3, "Name": "Message", "Type": "TExprBase"}
             ]
         },
+        {
+            "Name": "TFulltextAnalyze",
+            "Base": "TCallable",
+            "Match": {"Type": "Callable", "Name": "FulltextAnalyze"},
+            "Children": [
+                {"Index": 0, "Name": "Text", "Type": "TExprBase"},
+                {"Index": 1, "Name": "Settings", "Type": "TExprBase"}
+            ]
+        },
         {
             "Name": "TKqpOpRead",
             "Base": "TCallable",
diff --git a/ydb/core/kqp/opt/kqp_type_ann.cpp b/ydb/core/kqp/opt/kqp_type_ann.cpp
index 0697bcceba65..f3f19b243b61 100644
--- a/ydb/core/kqp/opt/kqp_type_ann.cpp
+++ b/ydb/core/kqp/opt/kqp_type_ann.cpp
@@ -1760,6 +1760,59 @@ TStatus AnnotateKqpEnsure(const TExprNode::TPtr& node, TExprContext& ctx) {
     return TStatus::Ok;
 }
 
+// Type annotation for FulltextAnalyze(text, settings).
+// `text` must be String or Utf8 (optionally wrapped in Optional); `settings` must be a
+// non-optional String holding the serialized analyzers. The result type is List<String>.
+TStatus AnnotateFulltextAnalyze(const TExprNode::TPtr& node, TExprContext& ctx) {
+    if (!EnsureArgsCount(*node, 2, ctx)) {
+        return TStatus::Error;
+    }
+
+    // First argument: text (should be String or Utf8)
+    const auto* textArg = node->Child(0);
+    if (!EnsureComputable(*textArg, ctx)) {
+        return TStatus::Error;
+    }
+
+    const TDataExprType* textDataType = nullptr;
+    bool isTextOptional = false;
+    if (!EnsureDataOrOptionalOfData(*textArg, isTextOptional, textDataType, ctx)) {
+        return TStatus::Error;
+    }
+
+    if (textDataType->GetSlot() != EDataSlot::String && textDataType->GetSlot() != EDataSlot::Utf8) {
+        ctx.AddError(TIssue(ctx.GetPosition(textArg->Pos()), TStringBuilder()
+            << "Expected String or Utf8 for text argument, but got: " << *textArg->GetTypeAnn()));
+        return TStatus::Error;
+    }
+
+    // Second argument: settings (should be String - serialized proto)
+    const auto* settingsArg = node->Child(1);
+    if (!EnsureComputable(*settingsArg, ctx)) {
+        return TStatus::Error;
+    }
+
+    const TDataExprType* settingsDataType = nullptr;
+    bool isSettingsOptional = false;
+    if (!EnsureDataOrOptionalOfData(*settingsArg, isSettingsOptional, settingsDataType, ctx)) {
+        return TStatus::Error;
+    }
+
+    // Settings are mandatory: the program builder requires a plain data type and the runtime reads them without a null check
+    if (isSettingsOptional || settingsDataType->GetSlot() != EDataSlot::String) {
+        ctx.AddError(TIssue(ctx.GetPosition(settingsArg->Pos()), TStringBuilder()
+            << "Expected String for settings argument, but got: " << *settingsArg->GetTypeAnn()));
+        return TStatus::Error;
+    }
+
+    // Return type: List<String>
+    auto stringType = ctx.MakeType<TDataExprType>(EDataSlot::String);
+    auto listType = ctx.MakeType<TListExprType>(stringType);
+    node->SetTypeAnn(listType);
+
+    return TStatus::Ok;
+}
+
 TStatus AnnotateSequencerConnection(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
     const TKikimrTablesData& tablesData, bool withSystemColumns)
 {
@@ -2577,6 +2630,10 @@ TAutoPtr CreateKqpTypeAnnotationTransformer(const TString& cl
                 return AnnotateKqpEnsure(input, ctx);
             }
 
+            if (TFulltextAnalyze::Match(input.Get())) {
+                return AnnotateFulltextAnalyze(input, ctx);
+            }
+
             if (TKqpReadRangesSourceSettings::Match(input.Get())) {
                 return AnnotateKqpSourceSettings(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
             }
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h
index 95916f6cc9ad..e743eaedea06 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h
@@ -106,8 +106,7 @@ NYql::NNodes::TExprBase BuildVectorIndexPostingRows(const NYql::TKikimrTableDesc
 bool withData, 
NYql::TPositionHandle pos, NYql::TExprContext& ctx); -TVector BuildVectorIndexPostingColumns(const NYql::TKikimrTableDescription& table, - const NYql::TIndexDescription* indexDesc); +TVector BuildVectorIndexPostingColumns(const NYql::TKikimrTableDescription& table, const NYql::TIndexDescription* indexDesc); NYql::NNodes::TExprBase BuildVectorIndexPrefixRows(const NYql::TKikimrTableDescription& table, const NYql::TKikimrTableDescription& prefixTable, bool withData, const NYql::TIndexDescription* indexDesc, const NYql::NNodes::TExprBase& inputRows, @@ -118,4 +117,10 @@ std::pair BuildVectorIndexPref const NYql::TIndexDescription* indexDesc, const NYql::NNodes::TExprBase& inputRows, TVector& indexTableColumns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); +NYql::NNodes::TExprBase BuildFulltextIndexRows(const NYql::TKikimrTableDescription& table, const NYql::TIndexDescription* indexDesc, + const NYql::NNodes::TExprBase& inputRows, const THashSet& inputColumns, const TVector& indexTableColumns, + NYql::TPositionHandle pos, NYql::TExprContext& ctx); + +TVector BuildFulltextIndexColumns(const NYql::TKikimrTableDescription& table, const NYql::TIndexDescription* indexDesc); + } // NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_fulltext_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_fulltext_index.cpp new file mode 100644 index 000000000000..8f599a996c10 --- /dev/null +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_fulltext_index.cpp @@ -0,0 +1,154 @@ +#include "kqp_opt_phy_effects_impl.h" +#include + +namespace NKikimr::NKqp::NOpt { + +using namespace NYql; +using namespace NYql::NDq; +using namespace NYql::NNodes; + +TExprBase BuildFulltextIndexRows(const TKikimrTableDescription& table, const TIndexDescription* indexDesc, + const NNodes::TExprBase& inputRows, const THashSet& inputColumns, const TVector& indexTableColumns, + TPositionHandle pos, NYql::TExprContext& ctx) +{ + // Extract fulltext index settings + const 
auto* fulltextDesc = std::get_if(&indexDesc->SpecializedIndexDescription); + YQL_ENSURE(fulltextDesc, "Expected fulltext index description"); + + const auto& settings = fulltextDesc->GetSettings(); + YQL_ENSURE(settings.columns().size() == 1, "Expected single text column in fulltext index"); + + const TString textColumn = settings.columns().at(0).column(); + const auto& analyzers = settings.columns().at(0).analyzers(); + + // Serialize analyzer settings for runtime usage + TString settingsProto; + YQL_ENSURE(analyzers.SerializeToString(&settingsProto)); + + auto inputRowArg = TCoArgument(ctx.NewArgument(pos, "input_row")); + auto tokenArg = TCoArgument(ctx.NewArgument(pos, "token")); + + // Build output row structure for each token + TVector tokenRowTuples; + + // Add token column (first column in fulltext index) + auto tokenTuple = Build(ctx, pos) + .Name().Build(NTableIndex::NFulltext::TokenColumn) + .Value(tokenArg) + .Done(); + tokenRowTuples.emplace_back(tokenTuple); + + // Add all other columns (primary key + data columns) + for (const auto& column : indexTableColumns) { + auto columnAtom = ctx.NewAtom(pos, column); + + if (inputColumns.contains(column)) { + auto tuple = Build(ctx, pos) + .Name(columnAtom) + .Value() + .Struct(inputRowArg) + .Name(columnAtom) + .Build() + .Done(); + + tokenRowTuples.emplace_back(tuple); + } else { + auto columnType = table.GetColumnType(TString(column)); + + auto tuple = Build(ctx, pos) + .Name(columnAtom) + .Value() + .OptionalType(NCommon::BuildTypeExpr(pos, *columnType, ctx)) + .Build() + .Done(); + + tokenRowTuples.emplace_back(tuple); + } + } + + // Create lambda that builds output row for each token + // FlatMap expects lambda to return list/stream/optional, so wrap struct in Just + auto tokenRowsLambda = Build(ctx, pos) + .Args({tokenArg}) + .Body() + .Input() + .Add(tokenRowTuples) + .Build() + .Build() + .Done(); + + // Get text member from input row + auto textMember = Build(ctx, pos) + .Struct(inputRowArg) + 
.Name().Build(textColumn) + .Done(); + + // Create callable for fulltext tokenization + // Format: FulltextAnalyze(text: String, settings: String) -> List + auto settingsLiteral = Build(ctx, pos) + .Literal().Build(settingsProto) + .Done(); + + auto analyzeCallable = ctx.Builder(pos) + .Callable("FulltextAnalyze") + .Add(0, textMember.Ptr()) + .Add(1, settingsLiteral.Ptr()) + .Seal() + .Build(); + + auto analyzeStage = Build(ctx, pos) + .Inputs() + .Add(inputRows) + .Build() + .Program() + .Args({"rows"}) + .Body() + .List() + .Input("rows") + .Lambda() + .Args({inputRowArg}) + .Body() + .Input(analyzeCallable) + .Lambda(tokenRowsLambda) + .Build() + .Build() + .Build() + .Build() + .Build() + .Settings().Build() + .Done(); + + return Build(ctx, pos) + .Output() + .Stage(analyzeStage) + .Index().Build("0") + .Build() + .Done(); +} + +TVector BuildFulltextIndexColumns(const TKikimrTableDescription& table, const TIndexDescription* indexDesc) { + TVector indexTableColumns; + THashSet indexTableColumnSet; + + // Add token column first (replaces the text column) + indexTableColumns.emplace_back(NTableIndex::NFulltext::TokenColumn); + indexTableColumnSet.insert(NTableIndex::NFulltext::TokenColumn); + + // Add primary key columns + for (const auto& column : table.Metadata->KeyColumnNames) { + if (indexTableColumnSet.insert(column).second) { + indexTableColumns.emplace_back(column); + } + } + + // Add data columns (covered columns) + for (const auto& column : indexDesc->DataColumns) { + if (indexTableColumnSet.insert(column).second) { + indexTableColumns.emplace_back(column); + } + } + + return indexTableColumns; +} + +} diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp index 401757c79935..210ac9c0bf2d 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp @@ -1,4 +1,5 @@ #include +#include 
#include "kqp_opt_phy_effects_rules.h" #include "kqp_opt_phy_effects_impl.h" @@ -195,31 +196,51 @@ TExprBase KqpBuildInsertIndexStages(TExprBase node, TExprContext& ctx, const TKq } } - auto upsertIndexRows = MakeInsertIndexRows(insertRowsPrecompute, table, inputColumnsSet, indexTableColumns, - insert.Pos(), ctx, true); - - if (indexDesc->Type == TIndexDescription::EType::GlobalSyncVectorKMeansTree) { - if (indexDesc->KeyColumns.size() > 1) { - // First resolve prefix IDs using StreamLookup - const auto& prefixTable = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, TStringBuilder() << insert.Table().Path().Value() - << "/" << indexDesc->Name << "/" << NKikimr::NTableIndex::NKMeans::PrefixTable); - if (prefixTable.Metadata->Columns.at(NTableIndex::NKMeans::IdColumn).DefaultKind == NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_SEQUENCE) { - auto res = BuildVectorIndexPrefixRowsWithNew(table, prefixTable, indexDesc, upsertIndexRows, indexTableColumns, insert.Pos(), ctx); - upsertIndexRows = std::move(res.first); - effects.emplace_back(std::move(res.second)); - } else { - // Handle old prefixed vector index tables without the sequence - upsertIndexRows = BuildVectorIndexPrefixRows(table, prefixTable, true, indexDesc, upsertIndexRows, indexTableColumns, insert.Pos(), ctx); + std::optional upsertIndexRows; + switch (indexDesc->Type) { + case TIndexDescription::EType::GlobalSync: + case TIndexDescription::EType::GlobalAsync: + case TIndexDescription::EType::GlobalSyncUnique: { + upsertIndexRows = MakeInsertIndexRows(insertRowsPrecompute, table, inputColumnsSet, indexTableColumns, + insert.Pos(), ctx, true); + break; + } + case TIndexDescription::EType::GlobalSyncVectorKMeansTree: { + upsertIndexRows = MakeInsertIndexRows(insertRowsPrecompute, table, inputColumnsSet, indexTableColumns, + insert.Pos(), ctx, true); + if (indexDesc->KeyColumns.size() > 1) { + // First resolve prefix IDs using StreamLookup + const auto& prefixTable = 
kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, TStringBuilder() << insert.Table().Path().Value() + << "/" << indexDesc->Name << "/" << NKikimr::NTableIndex::NKMeans::PrefixTable); + if (prefixTable.Metadata->Columns.at(NTableIndex::NKMeans::IdColumn).DefaultKind == NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_SEQUENCE) { + auto res = BuildVectorIndexPrefixRowsWithNew(table, prefixTable, indexDesc, upsertIndexRows.value(), indexTableColumns, insert.Pos(), ctx); + upsertIndexRows = std::move(res.first); + effects.emplace_back(std::move(res.second)); + } else { + // Handle old prefixed vector index tables without the sequence + upsertIndexRows = BuildVectorIndexPrefixRows(table, prefixTable, true, indexDesc, upsertIndexRows.value(), indexTableColumns, insert.Pos(), ctx); + } } + upsertIndexRows = BuildVectorIndexPostingRows(table, insert.Table(), indexDesc->Name, indexTableColumns, + upsertIndexRows.value(), true, insert.Pos(), ctx); + indexTableColumns = BuildVectorIndexPostingColumns(table, indexDesc); + break; + } + case TIndexDescription::EType::GlobalFulltext: { + // For fulltext indexes, we need to tokenize the text and create index rows + upsertIndexRows = BuildFulltextIndexRows(table, indexDesc, insertRowsPrecompute, inputColumnsSet, indexTableColumns, + insert.Pos(), ctx); + // Update columns to reflect transformation: text column -> __ydb_token + indexTableColumns = BuildFulltextIndexColumns(table, indexDesc); + break; } - upsertIndexRows = BuildVectorIndexPostingRows(table, insert.Table(), indexDesc->Name, indexTableColumns, - upsertIndexRows, true, insert.Pos(), ctx); - indexTableColumns = BuildVectorIndexPostingColumns(table, indexDesc); } + Y_ENSURE(upsertIndexRows.has_value()); + Y_ENSURE(indexTableColumns); auto upsertIndex = Build(ctx, insert.Pos()) .Table(tableNode) - .Input(upsertIndexRows) + .Input(upsertIndexRows.value()) .Columns(BuildColumnsList(indexTableColumns, insert.Pos(), ctx)) .ReturningColumns().Build() 
.IsBatch(ctx.NewAtom(insert.Pos(), "false")) diff --git a/ydb/core/kqp/opt/physical/effects/ya.make b/ydb/core/kqp/opt/physical/effects/ya.make index 8ba379493b43..7fa1ebfb443a 100644 --- a/ydb/core/kqp/opt/physical/effects/ya.make +++ b/ydb/core/kqp/opt/physical/effects/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( kqp_opt_phy_delete_index.cpp kqp_opt_phy_effects.cpp + kqp_opt_phy_fulltext_index.cpp kqp_opt_phy_indexes.cpp kqp_opt_phy_insert_index.cpp kqp_opt_phy_insert.cpp diff --git a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp index 6abb3fc478e3..4e21798455a6 100644 --- a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -515,6 +516,16 @@ TIntrusivePtr CreateKqlCompiler(const TKqlCompileContext& return ctx.PgmBuilder().DqHashCombine(flow, memLimit, keyExtractor, init, update, finish); }); + compiler->AddCallable("FulltextAnalyze", + [&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) { + YQL_ENSURE(node.ChildrenSize() == 2, "FulltextAnalyze should have 2 arguments: text and settings"); + + auto textArg = MkqlBuildExpr(*node.Child(0), buildCtx); + auto settingsArg = MkqlBuildExpr(*node.Child(1), buildCtx); + + return ctx.PgmBuilder().FulltextAnalyze(textArg, settingsArg); + }); + return compiler; } diff --git a/ydb/core/kqp/runtime/kqp_compute.cpp b/ydb/core/kqp/runtime/kqp_compute.cpp index d2ffcf73f2a7..bee0cb0e4913 100644 --- a/ydb/core/kqp/runtime/kqp_compute.cpp +++ b/ydb/core/kqp/runtime/kqp_compute.cpp @@ -1,5 +1,6 @@ #include "kqp_compute.h" #include "kqp_stream_lookup_join_helpers.h" +#include "kqp_fulltext_analyze.h" #include #include diff --git a/ydb/core/kqp/runtime/kqp_compute.h b/ydb/core/kqp/runtime/kqp_compute.h index 9065325a2766..a13e62feac7f 100644 --- a/ydb/core/kqp/runtime/kqp_compute.h +++ b/ydb/core/kqp/runtime/kqp_compute.h @@ -49,6 +49,7 @@ class TKqpEnsureFail 
: public yexception {
 
 IComputationNode* WrapKqpEnsure(TCallable& callable, const TComputationNodeFactoryContext& ctx);
 IComputationNode* WrapKqpIndexLookupJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+IComputationNode* WrapFulltextAnalyze(TCallable& callable, const TComputationNodeFactoryContext& ctx);
 
 } // namespace NMiniKQL
 } // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_fulltext_analyze.cpp b/ydb/core/kqp/runtime/kqp_fulltext_analyze.cpp
new file mode 100644
index 000000000000..aaaf8cea8aae
--- /dev/null
+++ b/ydb/core/kqp/runtime/kqp_fulltext_analyze.cpp
@@ -0,0 +1,113 @@
+#include "kqp_fulltext_analyze.h"
+
+#include 
+
+#include 
+#include 
+#include 
+#include 
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+using namespace NKikimr::NFulltext;
+
+namespace {
+
+// Computation node behind the FulltextAnalyze callable. It tokenizes the text argument
+// with NKikimr::NFulltext::Analyze, using the analyzers passed as a serialized protobuf
+// in the settings argument, and returns the tokens as a list of strings.
+class TFulltextAnalyzeWrapper : public TMutableComputationNode<TFulltextAnalyzeWrapper> {
+    typedef TMutableComputationNode<TFulltextAnalyzeWrapper> TBaseComputation;
+
+    // Boxed holder for the parsed analyzers, kept in ctx.MutableValues so that the
+    // settings are parsed on first use and reused by later DoCalculate calls.
+    struct TSettings : public TComputationValue<TSettings> {
+        using TComputationValue::TComputationValue;
+
+        Ydb::Table::FulltextIndexSettings::Analyzers Analyzers;
+    };
+
+public:
+    TFulltextAnalyzeWrapper(TComputationMutables& mutables, IComputationNode* textArg, IComputationNode* settingsArg)
+        : TBaseComputation(mutables)
+        , TextArg(textArg)
+        , SettingsArg(settingsArg)
+        , CachedSettingsIndex(mutables.CurValueIndex++)
+    {
+    }
+
+    // Returns the tokens of the current text value as a list of strings.
+    // A text without a value (NULL) yields an empty list.
+    NUdf::TUnboxedValue DoCalculate(TComputationContext& ctx) const {
+        const auto text = TextArg->GetValue(ctx);
+        if (!text) {
+            return ctx.HolderFactory.GetEmptyContainerLazy();
+        }
+
+        const auto& settings = GetSettings(ctx);
+
+        // Tokenize text using NKikimr::NFulltext::Analyze
+        const TVector<TString> tokens = Analyze(TString(text.AsStringRef()), settings.Analyzers);
+
+        // Convert tokens to TUnboxedValue list
+        NUdf::TUnboxedValue* items = nullptr;
+        auto result = ctx.HolderFactory.CreateDirectArrayHolder(tokens.size(), items);
+        for (size_t i = 0; i < tokens.size(); ++i) {
+            items[i] = MakeString(tokens[i]);
+        }
+
+        return result;
+    }
+
+    // Returns the parsed analyzers. They are parsed from the settings argument on first
+    // use and cached in the mutable value slot reserved in the constructor. Missing or
+    // unparsable settings fail the computation instead of silently producing no tokens,
+    // which would let a write succeed without any index rows for the text.
+    TSettings& GetSettings(TComputationContext& ctx) const {
+        auto& cachedSettings = ctx.MutableValues[CachedSettingsIndex];
+        if (cachedSettings.IsInvalid()) {
+            const auto settingsProto = SettingsArg->GetValue(ctx);
+            MKQL_ENSURE(settingsProto, "FulltextAnalyze settings must not be null");
+
+            // Owning value: released automatically if parsing fails. The slot is filled
+            // only after a successful parse, so it never holds half-initialized settings.
+            NUdf::TUnboxedValue parsedSettings = ctx.HolderFactory.Create<TSettings>();
+            const bool isParsed = GetSettings(parsedSettings).Analyzers.ParseFromString(settingsProto.AsStringRef());
+            MKQL_ENSURE(isParsed, "Failed to parse FulltextAnalyze settings");
+            cachedSettings = parsedSettings;
+        }
+
+        return GetSettings(cachedSettings);
+    }
+
+    // Unwraps the TSettings stored inside a boxed value created by the overload above.
+    TSettings& GetSettings(auto& cachedSettings) const {
+        return *static_cast<TSettings*>(cachedSettings.AsBoxed().Get());
+    }
+
+private:
+    void RegisterDependencies() const final {
+        DependsOn(TextArg);
+        DependsOn(SettingsArg);
+    }
+
+    IComputationNode* const TextArg;
+    IComputationNode* const SettingsArg;
+    const ui32 CachedSettingsIndex; // Slot in ctx.MutableValues that holds the cached TSettings
+};
+
+} // namespace
+
+// Creates the FulltextAnalyze computation node; the callable must have exactly two inputs: text and settings.
+IComputationNode* WrapFulltextAnalyze(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+    MKQL_ENSURE(callable.GetInputsCount() == 2, "FulltextAnalyze requires exactly 2 arguments");
+
+    auto textArg = LocateNode(ctx.NodeLocator, callable, 0);
+    auto settingsArg = LocateNode(ctx.NodeLocator, callable, 1);
+
+    return new TFulltextAnalyzeWrapper(ctx.Mutables, textArg, settingsArg);
+}
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_fulltext_analyze.h b/ydb/core/kqp/runtime/kqp_fulltext_analyze.h
new file mode 100644
index 000000000000..470a71d3c4f4
--- /dev/null
+++ b/ydb/core/kqp/runtime/kqp_fulltext_analyze.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include 
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+IComputationNode* WrapFulltextAnalyze(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+
+} // namespace NMiniKQL
+} // 
namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp
index 109833dd0670..8af483845144 100644
--- a/ydb/core/kqp/runtime/kqp_program_builder.cpp
+++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp
@@ -354,5 +354,40 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c
     return TRuntimeNode(callableBuilder.Build(), false);
 }
 
+// Builds the FulltextAnalyze callable, which takes `text` and `settings` (a serialized
+// protobuf with the analyzers) and has the result type List<String>.
+// `text` must be String or Utf8, optionally wrapped in Optional - the same types the type
+// annotation of FulltextAnalyze accepts; `settings` must be a non-optional String.
+TRuntimeNode TKqpProgramBuilder::FulltextAnalyze(TRuntimeNode text, TRuntimeNode settings)
+{
+    // Validate text argument - should be String/Utf8 or an optional of them
+    const auto* textType = text.GetStaticType();
+    if (textType->IsOptional()) {
+        textType = static_cast<const TOptionalType*>(textType)->GetItemType();
+        MKQL_ENSURE(textType->IsData(), "Expected data type inside optional for text.");
+    } else {
+        MKQL_ENSURE(textType->IsData(), "Expected data or optional data type for text.");
+    }
+
+    const auto textSchemeType = static_cast<const TDataType*>(textType)->GetSchemeType();
+    MKQL_ENSURE(textSchemeType == NUdf::TDataType<char*>::Id || textSchemeType == NUdf::TDataType<NUdf::TUtf8>::Id,
+        "Expected string or utf8 for text.");
+
+    // Validate settings argument - should be a string (serialized proto)
+    const auto* settingsType = settings.GetStaticType();
+    MKQL_ENSURE(settingsType->IsData(), "Expected data type for settings.");
+    const auto& settingsTypeData = static_cast<const TDataType&>(*settingsType);
+    MKQL_ENSURE(settingsTypeData.GetSchemeType() == NUdf::TDataType<char*>::Id, "Expected string for settings.");
+
+    // Return type: List<String>
+    auto stringType = TDataType::Create(NUdf::TDataType<char*>::Id, Env);
+    auto listType = TListType::Create(stringType, Env);
+
+    TCallableBuilder callableBuilder(Env, __func__, listType);
+    callableBuilder.Add(text);
+    callableBuilder.Add(settings);
+    return TRuntimeNode(callableBuilder.Build(), false);
+}
+
 } // namespace NMiniKQL
 } // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_program_builder.h 
b/ydb/core/kqp/runtime/kqp_program_builder.h index f5160aa0b940..f31419d96ed6 100644 --- a/ydb/core/kqp/runtime/kqp_program_builder.h +++ b/ydb/core/kqp/runtime/kqp_program_builder.h @@ -67,6 +67,8 @@ class TKqpProgramBuilder: public TDqProgramBuilder { TRuntimeNode KqpEnsure(TRuntimeNode value, TRuntimeNode predicate, TRuntimeNode issueCode, TRuntimeNode message); TRuntimeNode KqpIndexLookupJoin(const TRuntimeNode& input, const TString& joinType, const TString& leftLabel, const TString& rightLabel); + + TRuntimeNode FulltextAnalyze(TRuntimeNode text, TRuntimeNode settings); }; } // namespace NMiniKQL diff --git a/ydb/core/kqp/runtime/ya.make b/ydb/core/kqp/runtime/ya.make index f0d9d0ee62fb..adfe4d98c042 100644 --- a/ydb/core/kqp/runtime/ya.make +++ b/ydb/core/kqp/runtime/ya.make @@ -4,6 +4,7 @@ SRCS( kqp_arrow_memory_pool.cpp kqp_compute.cpp kqp_effects.cpp + kqp_fulltext_analyze.cpp kqp_output_stream.cpp kqp_program_builder.cpp kqp_read_actor.cpp diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_fulltext_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_fulltext_ut.cpp index a6958b7352d2..5550f5239583 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_fulltext_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_fulltext_ut.cpp @@ -28,22 +28,23 @@ void CreateTexts(NQuery::TQueryClient& db) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } -void UpsertTexts(NQuery::TQueryClient& db) { +void UpsertSomeTexts(NQuery::TQueryClient& db) { TString query = R"sql( UPSERT INTO `/Root/Texts` (Key, Text, Data) VALUES - (100, "Cats chase small animals.", "cats data"), - (200, "Dogs chase small cats.", "dogs data"), - (300, "Cats love cats.", "cats cats data"), - (400, "Foxes love dogs.", "fox data") + (100, "Cats love cats.", "cats data"), + (200, "Dogs love foxes.", "cats data") )sql"; auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), 
EStatus::SUCCESS, result.GetIssues().ToString()); } -void UpsertRow(NQuery::TQueryClient& db) { +void UpsertTexts(NQuery::TQueryClient& db) { TString query = R"sql( UPSERT INTO `/Root/Texts` (Key, Text, Data) VALUES - (250, "Dogs are big animals.", "new dogs data") + (100, "Cats chase small animals.", "cats data"), + (200, "Dogs chase small cats.", "dogs data"), + (300, "Cats love cats.", "cats cats data"), + (400, "Foxes love dogs.", "fox data") )sql"; auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); @@ -87,7 +88,6 @@ Y_UNIT_TEST(AddIndex) { CreateTexts(db); UpsertTexts(db); AddIndex(db); - auto index = ReadIndex(db); CompareYson(R"([ [[100u];"animals"]; @@ -113,7 +113,6 @@ Y_UNIT_TEST(AddIndexCovered) { CreateTexts(db); UpsertTexts(db); AddIndexCovered(db); - auto index = ReadIndex(db); CompareYson(R"([ [["cats data"];[100u];"animals"]; @@ -132,44 +131,221 @@ Y_UNIT_TEST(AddIndexCovered) { ])", NYdb::FormatResultSetYson(index)); } -Y_UNIT_TEST(UpsertRow) { +Y_UNIT_TEST(InsertRow) { auto kikimr = Kikimr(); auto db = kikimr.GetQueryClient(); CreateTexts(db); - UpsertTexts(db); + UpsertSomeTexts(db); AddIndex(db); - return; // TODO: upserts are not implemented - UpsertRow(db); + auto index = ReadIndex(db); + CompareYson(R"([ + [[100u];"cats"]; + [[200u];"dogs"]; + [[200u];"foxes"]; + [[100u];"love"]; + [[200u];"love"] + ])", NYdb::FormatResultSetYson(index)); + + { // InsertRow + TString query = R"sql( + INSERT INTO `/Root/Texts` (Key, Text, Data) VALUES + (150, "Foxes love cats.", "foxes data") + )sql"; + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + index = ReadIndex(db); + CompareYson(R"([ + [[100u];"cats"]; + [[150u];"cats"]; + [[200u];"dogs"]; + [[150u];"foxes"]; + 
[[200u];"foxes"]; + [[100u];"love"]; + [[150u];"love"]; + [[200u];"love"] + ])", NYdb::FormatResultSetYson(index)); +} +Y_UNIT_TEST(InsertRowMultipleTimes) { + auto kikimr = Kikimr(); + auto db = kikimr.GetQueryClient(); + + CreateTexts(db); + UpsertSomeTexts(db); + AddIndex(db); auto index = ReadIndex(db); CompareYson(R"([ - + [[100u];"cats"]; + [[200u];"dogs"]; + [[200u];"foxes"]; + [[100u];"love"]; + [[200u];"love"] + ])", NYdb::FormatResultSetYson(index)); + + { // InsertRow + TString query = R"sql( + INSERT INTO `/Root/Texts` (Key, Text, Data) VALUES + (150, "Foxes love cats.", "foxes data"), + (151, "Wolfs love foxes.", "cows data") + )sql"; + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { // InsertRow + TString query = R"sql( + INSERT INTO `/Root/Texts` (Key, Text, Data) VALUES + (152, "Rabbit love foxes.", "rabbit data") + )sql"; + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + index = ReadIndex(db); + CompareYson(R"([ + [[100u];"cats"]; + [[150u];"cats"]; + [[200u];"dogs"]; + [[150u];"foxes"]; + [[151u];"foxes"]; + [[152u];"foxes"]; + [[200u];"foxes"]; + [[100u];"love"]; + [[150u];"love"]; + [[151u];"love"]; + [[152u];"love"]; + [[200u];"love"]; + [[152u];"rabbit"]; + [[151u];"wolfs"] ])", NYdb::FormatResultSetYson(index)); } -Y_UNIT_TEST(UpsertRowCovered) { +Y_UNIT_TEST(InsertRowReturning) { auto kikimr = Kikimr(); auto db = kikimr.GetQueryClient(); CreateTexts(db); - UpsertTexts(db); + UpsertSomeTexts(db); + AddIndex(db); + auto index = ReadIndex(db); + CompareYson(R"([ + [[100u];"cats"]; + [[200u];"dogs"]; + [[200u];"foxes"]; + [[100u];"love"]; + [[200u];"love"] + ])", NYdb::FormatResultSetYson(index)); + + { // InsertRow + TString query = R"sql( + INSERT 
INTO `/Root/Texts` (Key, Text, Data) VALUES + (150, "Foxes love cats.", "foxes data") + RETURNING * + )sql"; + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [["foxes data"];[150u];["Foxes love cats."]] + ])", NYdb::FormatResultSetYson(result.GetResultSet(0))); + } + index = ReadIndex(db); + CompareYson(R"([ + [[100u];"cats"]; + [[150u];"cats"]; + [[200u];"dogs"]; + [[150u];"foxes"]; + [[200u];"foxes"]; + [[100u];"love"]; + [[150u];"love"]; + [[200u];"love"] + ])", NYdb::FormatResultSetYson(index)); +} + +Y_UNIT_TEST(InsertRowCovered) { + auto kikimr = Kikimr(); + auto db = kikimr.GetQueryClient(); + + CreateTexts(db); + UpsertSomeTexts(db); AddIndexCovered(db); - return; // TODO: upserts are not implemented - UpsertRow(db); + auto index = ReadIndex(db); + CompareYson(R"([ + [["cats data"];[100u];"cats"]; + [["cats data"];[200u];"dogs"]; + [["cats data"];[200u];"foxes"]; + [["cats data"];[100u];"love"]; + [["cats data"];[200u];"love"] + ])", NYdb::FormatResultSetYson(index)); + { // InsertRow + TString query = R"sql( + INSERT INTO `/Root/Texts` (Key, Text, Data) VALUES + (150, "Foxes love cats.", "foxes data") + )sql"; + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + index = ReadIndex(db); + CompareYson(R"([ + [["cats data"];[100u];"cats"]; + [["foxes data"];[150u];"cats"]; + [["cats data"];[200u];"dogs"]; + [["foxes data"];[150u];"foxes"]; + [["cats data"];[200u];"foxes"]; + [["cats data"];[100u];"love"]; + [["foxes data"];[150u];"love"]; + [["cats data"];[200u];"love"] + ])", NYdb::FormatResultSetYson(index)); +} + +Y_UNIT_TEST(InsertRowCoveredReturning) { + auto kikimr = Kikimr(); + auto db = kikimr.GetQueryClient(); + + CreateTexts(db); + 
UpsertSomeTexts(db); + AddIndexCovered(db); auto index = ReadIndex(db); CompareYson(R"([ - + [["cats data"];[100u];"cats"]; + [["cats data"];[200u];"dogs"]; + [["cats data"];[200u];"foxes"]; + [["cats data"];[100u];"love"]; + [["cats data"];[200u];"love"] + ])", NYdb::FormatResultSetYson(index)); + + { // InsertRow + TString query = R"sql( + INSERT INTO `/Root/Texts` (Key, Text, Data) VALUES + (150, "Foxes love cats.", "foxes data") + RETURNING * + )sql"; + auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [["foxes data"];[150u];["Foxes love cats."]] + ])", NYdb::FormatResultSetYson(result.GetResultSet(0))); + } + index = ReadIndex(db); + CompareYson(R"([ + [["cats data"];[100u];"cats"]; + [["foxes data"];[150u];"cats"]; + [["cats data"];[200u];"dogs"]; + [["foxes data"];[150u];"foxes"]; + [["cats data"];[200u];"foxes"]; + [["cats data"];[100u];"love"]; + [["foxes data"];[150u];"love"]; + [["cats data"];[200u];"love"] ])", NYdb::FormatResultSetYson(index)); } -Y_UNIT_TEST(InsertRow) { - // TODO: inserts are not implemented +Y_UNIT_TEST(UpsertRow) { + // TODO: upserts are not implemented } -Y_UNIT_TEST(InsertRowCovered) { - // TODO: inserts are not implemented +Y_UNIT_TEST(UpsertRowCovered) { + // TODO: upserts are not implemented } Y_UNIT_TEST(DeleteRow) { @@ -218,7 +394,6 @@ Y_UNIT_TEST(CreateTable) { } return; // TODO: upserts are not implemented UpsertTexts(db); - auto index = ReadIndex(db); CompareYson(R"([ [[100u];"animals"]; @@ -259,7 +434,6 @@ Y_UNIT_TEST(CreateTableCovered) { } return; // TODO: upserts are not implemented UpsertTexts(db); - auto index = ReadIndex(db); CompareYson(R"([ [["cats data"];[100u];"animals"]; @@ -297,7 +471,6 @@ Y_UNIT_TEST(NoBulkUpsert) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); 
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Only async-indexed tables are supported by BulkUpsert"); } - auto index = ReadIndex(db); CompareYson(R"([])", NYdb::FormatResultSetYson(index)); } @@ -309,7 +482,7 @@ Y_UNIT_TEST(NoIndexImplTableUpdates) { CreateTexts(db); AddIndex(db); - { // UpsertRow + { // BulkUpsert TString query = R"sql( UPSERT INTO `/Root/Texts/fulltext_idx/indexImplTable` (__ydb_token, Key) VALUES ("dogs", 901) @@ -318,7 +491,6 @@ Y_UNIT_TEST(NoIndexImplTableUpdates) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Writing to index implementation tables is not allowed"); } - auto index = ReadIndex(db); CompareYson(R"([])", NYdb::FormatResultSetYson(index)); }