Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c48fa2d
Initial plan
Copilot Oct 10, 2025
0c26c78
Add MakeInsertFulltextIndexRows function (WIP)
Copilot Oct 10, 2025
23d21cf
Add FulltextAnalyze callable and insert row tests
Copilot Oct 10, 2025
d6f1315
Update FulltextAnalyze to use NewCallable
Copilot Oct 10, 2025
2b6efda
Implement FulltextTokenize MKQL computation node
Copilot Oct 10, 2025
a2c9532
fixing build
kungasc Oct 10, 2025
e28592a
Implement TKqpProgramBuilder::FulltextAnalyze instead of using NewCal…
Copilot Oct 10, 2025
4b9eb71
add more FulltextAnalyze
kungasc Oct 10, 2025
5f72000
Add type annotation for FulltextAnalyze callable
Copilot Oct 10, 2025
006b522
Fix FlatMap lambda to return list instead of using raw callable
Copilot Oct 10, 2025
35e4079
Inline FlatMap construction in lambda body for proper type inference
Copilot Oct 10, 2025
69331a9
Wrap FulltextAnalyze result with ToStream for FlatMap compatibility
Copilot Oct 10, 2025
2251990
Wrap tokenRowsLambda result in Just for FlatMap compatibility
Copilot Oct 10, 2025
54146c6
Skip __ydb_token in indexColumns loop to avoid duplicate column
Copilot Oct 10, 2025
63f18cb
Add BuildFulltextIndexColumns to transform text column to __ydb_token
Copilot Oct 10, 2025
bcc2b38
Fix FulltextAnalyze to handle optional string types
Copilot Oct 10, 2025
599297a
fixes
kungasc Oct 10, 2025
7f6a701
clean up tests
kungasc Oct 13, 2025
1917025
add returning tests
kungasc Oct 14, 2025
7002fea
extract fulltext phy
kungasc Oct 14, 2025
22d3e75
revert 3034d4f47af3028995e4a2e228185c0e497f8fbe
kungasc Oct 14, 2025
89bd51a
revert f922841fc41f340637ecccada943a8ff71f5df2c
kungasc Oct 14, 2025
c0a36da
fix naming
kungasc Oct 15, 2025
a65db23
remove extra dq diff
kungasc Oct 15, 2025
aec3e9d
better switch-case
kungasc Oct 15, 2025
b5777d6
Simplify FulltextAnalyze callable creation using ctx.Builder
Copilot Oct 15, 2025
23dd514
Cache parsed fulltext analyzer settings in computation context
Copilot Oct 16, 2025
a88e1c4
clean up and add test
kungasc Oct 16, 2025
ee4a9a8
Fix settings allocation
kungasc Oct 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
return WrapDqHashCombine(callable, ctx);
}

if (name == "FulltextAnalyze"sv) {
return WrapFulltextAnalyze(callable, ctx);
}

return nullptr;
};
}
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,15 @@
{"Index": 3, "Name": "Message", "Type": "TExprBase"}
]
},
{
"Name": "TFulltextAnalyze",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "FulltextAnalyze"},
"Children": [
{"Index": 0, "Name": "Text", "Type": "TCoAtom"},
{"Index": 1, "Name": "Settings", "Type": "TCoAtom"}
]
},
{
"Name": "TKqpOpRead",
"Base": "TCallable",
Expand Down
52 changes: 52 additions & 0 deletions ydb/core/kqp/opt/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1760,6 +1760,54 @@ TStatus AnnotateKqpEnsure(const TExprNode::TPtr& node, TExprContext& ctx) {
return TStatus::Ok;
}

// Type annotation for the FulltextAnalyze(text, settings) callable.
//
// Validates that:
//   * exactly two arguments are given;
//   * `text` is computable data (or optional data) whose slot is String or Utf8;
//   * `settings` is computable data (or optional data) whose slot is String
//     (the serialized analyzer settings).
// On success the node is annotated as List<String> and TStatus::Ok is returned;
// otherwise an issue is reported (by the Ensure* helper or explicitly here) and
// TStatus::Error is returned.
TStatus AnnotateFulltextAnalyze(const TExprNode::TPtr& node, TExprContext& ctx) {
    if (!EnsureArgsCount(*node, 2, ctx)) {
        return TStatus::Error;
    }

    // Argument #0: the text to analyze.
    const auto& text = *node->Child(0);
    bool textIsOptional = false;
    const TDataExprType* textType = nullptr;
    if (!EnsureComputable(text, ctx) || !EnsureDataOrOptionalOfData(text, textIsOptional, textType, ctx)) {
        return TStatus::Error;
    }

    const auto textSlot = textType->GetSlot();
    const bool textIsStringLike = (textSlot == EDataSlot::String) || (textSlot == EDataSlot::Utf8);
    if (!textIsStringLike) {
        ctx.AddError(TIssue(ctx.GetPosition(text.Pos()), TStringBuilder()
            << "Expected String or Utf8 for text argument, but got: " << *text.GetTypeAnn()));
        return TStatus::Error;
    }

    // Argument #1: analyzer settings, passed as a serialized proto in a String.
    const auto& settings = *node->Child(1);
    bool settingsIsOptional = false;
    const TDataExprType* settingsType = nullptr;
    if (!EnsureComputable(settings, ctx) || !EnsureDataOrOptionalOfData(settings, settingsIsOptional, settingsType, ctx)) {
        return TStatus::Error;
    }

    if (settingsType->GetSlot() != EDataSlot::String) {
        ctx.AddError(TIssue(ctx.GetPosition(settings.Pos()), TStringBuilder()
            << "Expected String for settings argument, but got: " << *settings.GetTypeAnn()));
        return TStatus::Error;
    }

    // Result: List<String>, regardless of whether the inputs were optional.
    const auto* itemType = ctx.MakeType<TDataExprType>(EDataSlot::String);
    node->SetTypeAnn(ctx.MakeType<TListExprType>(itemType));
    return TStatus::Ok;
}

TStatus AnnotateSequencerConnection(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData, bool withSystemColumns)
{
Expand Down Expand Up @@ -2577,6 +2625,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
return AnnotateKqpEnsure(input, ctx);
}

if (TFulltextAnalyze::Match(input.Get())) {
return AnnotateFulltextAnalyze(input, ctx);
}

if (TKqpReadRangesSourceSettings::Match(input.Get())) {
return AnnotateKqpSourceSettings(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ NYql::NNodes::TExprBase BuildVectorIndexPostingRows(const NYql::TKikimrTableDesc
bool withData,
NYql::TPositionHandle pos, NYql::TExprContext& ctx);

TVector<TStringBuf> BuildVectorIndexPostingColumns(const NYql::TKikimrTableDescription& table,
const NYql::TIndexDescription* indexDesc);
TVector<TStringBuf> BuildVectorIndexPostingColumns(const NYql::TKikimrTableDescription& table, const NYql::TIndexDescription* indexDesc);

NYql::NNodes::TExprBase BuildVectorIndexPrefixRows(const NYql::TKikimrTableDescription& table, const NYql::TKikimrTableDescription& prefixTable,
bool withData, const NYql::TIndexDescription* indexDesc, const NYql::NNodes::TExprBase& inputRows,
Expand All @@ -118,4 +117,10 @@ std::pair<NYql::NNodes::TExprBase, NYql::NNodes::TExprBase> BuildVectorIndexPref
const NYql::TIndexDescription* indexDesc, const NYql::NNodes::TExprBase& inputRows,
TVector<TStringBuf>& indexTableColumns, NYql::TPositionHandle pos, NYql::TExprContext& ctx);

NYql::NNodes::TExprBase BuildFulltextIndexRows(const NYql::TKikimrTableDescription& table, const NYql::TIndexDescription* indexDesc,
const NYql::NNodes::TExprBase& inputRows, const THashSet<TStringBuf>& inputColumns, const TVector<TStringBuf>& indexTableColumns,
NYql::TPositionHandle pos, NYql::TExprContext& ctx);

TVector<TStringBuf> BuildFulltextIndexColumns(const NYql::TKikimrTableDescription& table, const NYql::TIndexDescription* indexDesc);

} // NKikimr::NKqp::NOpt
154 changes: 154 additions & 0 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_fulltext_index.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#include "kqp_opt_phy_effects_impl.h"
#include <yql/essentials/providers/common/provider/yql_provider.h>

namespace NKikimr::NKqp::NOpt {

using namespace NYql;
using namespace NYql::NDq;
using namespace NYql::NNodes;

// Builds the expression producing rows for a fulltext index table.
//
// For every row of `inputRows` the text column named in the index settings is
// passed to the FulltextAnalyze callable together with the serialized analyzer
// settings; each item it yields becomes one output struct consisting of the
// token column followed by every column in `indexTableColumns`. A column present
// in `inputColumns` is copied from the input row, any other column is emitted as
// a typed Nothing using the type reported by `table`.
//
// The computation is wrapped in its own DQ stage; the returned expression is a
// UnionAll connection over output "0" of that stage.
//
// Fails via YQL_ENSURE when `indexDesc` does not carry a fulltext description,
// when the settings do not describe exactly one column, or when the analyzers
// cannot be serialized.
TExprBase BuildFulltextIndexRows(const TKikimrTableDescription& table, const TIndexDescription* indexDesc,
    const NNodes::TExprBase& inputRows, const THashSet<TStringBuf>& inputColumns, const TVector<TStringBuf>& indexTableColumns,
    TPositionHandle pos, NYql::TExprContext& ctx)
{
    const auto* fulltextDesc = std::get_if<NKikimrKqp::TFulltextIndexDescription>(&indexDesc->SpecializedIndexDescription);
    YQL_ENSURE(fulltextDesc, "Expected fulltext index description");

    const auto& settings = fulltextDesc->GetSettings();
    YQL_ENSURE(settings.columns().size() == 1, "Expected single text column in fulltext index");

    const auto& indexedColumn = settings.columns().at(0);
    const TString textColumn = indexedColumn.column();
    const auto& analyzers = indexedColumn.analyzers();

    // The analyzers travel to the runtime as an opaque serialized string.
    TString settingsProto;
    YQL_ENSURE(analyzers.SerializeToString(&settingsProto));

    auto rowArg = TCoArgument(ctx.NewArgument(pos, "input_row"));
    auto tokenArg = TCoArgument(ctx.NewArgument(pos, "token"));

    // Members of the struct emitted per token; the token column always goes first.
    TVector<TExprBase> rowMembers;
    rowMembers.emplace_back(
        Build<TCoNameValueTuple>(ctx, pos)
            .Name().Build(NTableIndex::NFulltext::TokenColumn)
            .Value(tokenArg)
            .Done());

    for (const auto& column : indexTableColumns) {
        auto nameAtom = ctx.NewAtom(pos, column);

        if (!inputColumns.contains(column)) {
            // Not supplied by the input row: emit a typed empty optional instead.
            auto columnType = table.GetColumnType(TString(column));
            rowMembers.emplace_back(
                Build<TCoNameValueTuple>(ctx, pos)
                    .Name(nameAtom)
                    .Value<TCoNothing>()
                        .OptionalType(NCommon::BuildTypeExpr(pos, *columnType, ctx))
                        .Build()
                    .Done());
            continue;
        }

        // Supplied by the input row: forward the member as is.
        rowMembers.emplace_back(
            Build<TCoNameValueTuple>(ctx, pos)
                .Name(nameAtom)
                .Value<TCoMember>()
                    .Struct(rowArg)
                    .Name(nameAtom)
                    .Build()
                .Done());
    }

    // token -> Just(AsStruct(...)); the struct is wrapped in Just because FlatMap
    // requires its lambda to return a list/stream/optional.
    auto makeTokenRow = Build<TCoLambda>(ctx, pos)
        .Args({tokenArg})
        .Body<TCoJust>()
            .Input<TCoAsStruct>()
                .Add(rowMembers)
                .Build()
            .Build()
        .Done();

    // Value of the indexed text column taken from the current input row.
    auto textValue = Build<TCoMember>(ctx, pos)
        .Struct(rowArg)
        .Name().Build(textColumn)
        .Done();

    // FulltextAnalyze(text: String, settings: String) -> List<String>
    auto serializedSettings = Build<TCoString>(ctx, pos)
        .Literal().Build(settingsProto)
        .Done();

    auto tokens = ctx.Builder(pos)
        .Callable("FulltextAnalyze")
            .Add(0, textValue.Ptr())
            .Add(1, serializedSettings.Ptr())
        .Seal()
        .Build();

    // rows -> FlatMap(rows, row -> FlatMap(FulltextAnalyze(row.text, settings), makeTokenRow))
    auto stage = Build<TDqStage>(ctx, pos)
        .Inputs()
            .Add(inputRows)
            .Build()
        .Program()
            .Args({"rows"})
            .Body<TCoIterator>()
                .List<TCoFlatMap>()
                    .Input("rows")
                    .Lambda()
                        .Args({rowArg})
                        .Body<TCoFlatMap>()
                            .Input(tokens)
                            .Lambda(makeTokenRow)
                            .Build()
                        .Build()
                    .Build()
                .Build()
            .Build()
        .Settings().Build()
        .Done();

    return Build<TDqCnUnionAll>(ctx, pos)
        .Output()
            .Stage(stage)
            .Index().Build("0")
            .Build()
        .Done();
}

// Returns the column list of a fulltext index table: the token column first,
// then the primary key columns of `table`, then the data columns of `indexDesc`.
// A name that has already been emitted is not repeated. The returned string
// views refer to storage owned by `table.Metadata`, `indexDesc` and the token
// column constant, so they must not outlive those objects.
TVector<TStringBuf> BuildFulltextIndexColumns(const TKikimrTableDescription& table, const TIndexDescription* indexDesc) {
    TVector<TStringBuf> result;
    THashSet<TStringBuf> seen;

    // Appends `name` only the first time it is encountered, keeping insertion order.
    const auto appendUnique = [&result, &seen](TStringBuf name) {
        if (seen.insert(name).second) {
            result.emplace_back(name);
        }
    };

    // The token column takes the place of the indexed text column.
    appendUnique(NTableIndex::NFulltext::TokenColumn);

    for (const auto& keyColumn : table.Metadata->KeyColumnNames) {
        appendUnique(keyColumn);
    }

    // Covered (data) columns come last.
    for (const auto& dataColumn : indexDesc->DataColumns) {
        appendUnique(dataColumn);
    }

    return result;
}

}
59 changes: 40 additions & 19 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/core/base/table_index.h>
#include <ydb/core/base/fulltext.h>

#include "kqp_opt_phy_effects_rules.h"
#include "kqp_opt_phy_effects_impl.h"
Expand Down Expand Up @@ -195,31 +196,51 @@ TExprBase KqpBuildInsertIndexStages(TExprBase node, TExprContext& ctx, const TKq
}
}

auto upsertIndexRows = MakeInsertIndexRows(insertRowsPrecompute, table, inputColumnsSet, indexTableColumns,
insert.Pos(), ctx, true);

if (indexDesc->Type == TIndexDescription::EType::GlobalSyncVectorKMeansTree) {
if (indexDesc->KeyColumns.size() > 1) {
// First resolve prefix IDs using StreamLookup
const auto& prefixTable = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, TStringBuilder() << insert.Table().Path().Value()
<< "/" << indexDesc->Name << "/" << NKikimr::NTableIndex::NKMeans::PrefixTable);
if (prefixTable.Metadata->Columns.at(NTableIndex::NKMeans::IdColumn).DefaultKind == NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_SEQUENCE) {
auto res = BuildVectorIndexPrefixRowsWithNew(table, prefixTable, indexDesc, upsertIndexRows, indexTableColumns, insert.Pos(), ctx);
upsertIndexRows = std::move(res.first);
effects.emplace_back(std::move(res.second));
} else {
// Handle old prefixed vector index tables without the sequence
upsertIndexRows = BuildVectorIndexPrefixRows(table, prefixTable, true, indexDesc, upsertIndexRows, indexTableColumns, insert.Pos(), ctx);
std::optional<TExprBase> upsertIndexRows;
switch (indexDesc->Type) {
case TIndexDescription::EType::GlobalSync:
case TIndexDescription::EType::GlobalAsync:
case TIndexDescription::EType::GlobalSyncUnique: {
upsertIndexRows = MakeInsertIndexRows(insertRowsPrecompute, table, inputColumnsSet, indexTableColumns,
insert.Pos(), ctx, true);
break;
}
case TIndexDescription::EType::GlobalSyncVectorKMeansTree: {
upsertIndexRows = MakeInsertIndexRows(insertRowsPrecompute, table, inputColumnsSet, indexTableColumns,
insert.Pos(), ctx, true);
if (indexDesc->KeyColumns.size() > 1) {
// First resolve prefix IDs using StreamLookup
const auto& prefixTable = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, TStringBuilder() << insert.Table().Path().Value()
<< "/" << indexDesc->Name << "/" << NKikimr::NTableIndex::NKMeans::PrefixTable);
if (prefixTable.Metadata->Columns.at(NTableIndex::NKMeans::IdColumn).DefaultKind == NKikimrKqp::TKqpColumnMetadataProto::DEFAULT_KIND_SEQUENCE) {
auto res = BuildVectorIndexPrefixRowsWithNew(table, prefixTable, indexDesc, upsertIndexRows.value(), indexTableColumns, insert.Pos(), ctx);
upsertIndexRows = std::move(res.first);
effects.emplace_back(std::move(res.second));
} else {
// Handle old prefixed vector index tables without the sequence
upsertIndexRows = BuildVectorIndexPrefixRows(table, prefixTable, true, indexDesc, upsertIndexRows.value(), indexTableColumns, insert.Pos(), ctx);
}
}
upsertIndexRows = BuildVectorIndexPostingRows(table, insert.Table(), indexDesc->Name, indexTableColumns,
upsertIndexRows.value(), true, insert.Pos(), ctx);
indexTableColumns = BuildVectorIndexPostingColumns(table, indexDesc);
break;
}
case TIndexDescription::EType::GlobalFulltext: {
// For fulltext indexes, we need to tokenize the text and create index rows
upsertIndexRows = BuildFulltextIndexRows(table, indexDesc, insertRowsPrecompute, inputColumnsSet, indexTableColumns,
insert.Pos(), ctx);
// Update columns to reflect transformation: text column -> __ydb_token
indexTableColumns = BuildFulltextIndexColumns(table, indexDesc);
break;
}
upsertIndexRows = BuildVectorIndexPostingRows(table, insert.Table(), indexDesc->Name, indexTableColumns,
upsertIndexRows, true, insert.Pos(), ctx);
indexTableColumns = BuildVectorIndexPostingColumns(table, indexDesc);
}
Y_ENSURE(upsertIndexRows.has_value());
Y_ENSURE(indexTableColumns);

auto upsertIndex = Build<TKqlUpsertRows>(ctx, insert.Pos())
.Table(tableNode)
.Input(upsertIndexRows)
.Input(upsertIndexRows.value())
.Columns(BuildColumnsList(indexTableColumns, insert.Pos(), ctx))
.ReturningColumns<TCoAtomList>().Build()
.IsBatch(ctx.NewAtom(insert.Pos(), "false"))
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/opt/physical/effects/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ LIBRARY()
SRCS(
kqp_opt_phy_delete_index.cpp
kqp_opt_phy_effects.cpp
kqp_opt_phy_fulltext_index.cpp
kqp_opt_phy_indexes.cpp
kqp_opt_phy_insert_index.cpp
kqp_opt_phy_insert.cpp
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/base/fulltext.h>

#include <yql/essentials/providers/common/mkql/yql_type_mkql.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
Expand Down Expand Up @@ -515,6 +516,16 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext&
return ctx.PgmBuilder().DqHashCombine(flow, memLimit, keyExtractor, init, update, finish);
});

compiler->AddCallable("FulltextAnalyze",
[&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) {
YQL_ENSURE(node.ChildrenSize() == 2, "FulltextAnalyze should have 2 arguments: text and settings");

auto textArg = MkqlBuildExpr(*node.Child(0), buildCtx);
auto settingsArg = MkqlBuildExpr(*node.Child(1), buildCtx);

return ctx.PgmBuilder().FulltextAnalyze(textArg, settingsArg);
});

return compiler;
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/runtime/kqp_compute.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "kqp_compute.h"
#include "kqp_stream_lookup_join_helpers.h"
#include "kqp_fulltext_analyze.h"

#include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/runtime/kqp_compute.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class TKqpEnsureFail : public yexception {

IComputationNode* WrapKqpEnsure(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapKqpIndexLookupJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapFulltextAnalyze(TCallable& callable, const TComputationNodeFactoryContext& ctx);

} // namespace NMiniKQL
} // namespace NKikimr
Loading
Loading