Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3103,6 +3103,98 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
DoPositiveQueriesPrefixedVectorIndexOrderByCosine(session);
}

// Adds a prefixed vector_kmeans_tree index (cosine distance, levels=3, clusters=2) to the
// table produced by DoCreateTableForPrefixedVectorIndex(db, false), checks that DescribeTable
// reports exactly that index with the requested settings, and finally runs the shared
// positive ORDER BY cosine queries against it.
// NOTE(review): the `false` argument presumably selects the non-nullable table variant,
// as the test name suggests - confirm against DoCreateTableForPrefixedVectorIndex.
Y_UNIT_TEST(PrefixedVectorIndexOrderByCosineDistanceNotNullableLevel3) {
    // Server configuration: the vector index feature flag must be on.
    NKikimrConfig::TFeatureFlags flags;
    flags.SetEnableVectorIndex(true);
    NKikimrKqp::TKqpSetting kqpSetting;
    auto kikimrSettings = TKikimrSettings()
        .SetFeatureFlags(flags)
        .SetKqpSettings({kqpSetting});

    TKikimrRunner kikimr(kikimrSettings);

    // Trace-level logs for the index build and schemeshard services.
    auto&& runtime = kikimr.GetTestServer().GetRuntime();
    runtime->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE);
    runtime->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);

    auto db = kikimr.GetTableClient();
    auto session = DoCreateTableForPrefixedVectorIndex(db, false);

    // Step 1: build the index.
    {
        const TString createIndexQuery(Q_(R"(
ALTER TABLE `/Root/TestTable`
ADD INDEX index
GLOBAL USING vector_kmeans_tree
ON (user, emb)
WITH (distance=cosine, vector_type="uint8", vector_dimension=2, levels=3, clusters=2);
)"));

        auto alterResult = session.ExecuteSchemeQuery(createIndexQuery).ExtractValueSync();
        UNIT_ASSERT_C(alterResult.IsSuccess(), alterResult.GetIssues().ToString());
    }

    // Step 2: the table description must expose the index exactly as requested.
    {
        auto describeResult = session.DescribeTable("/Root/TestTable").ExtractValueSync();
        UNIT_ASSERT_VALUES_EQUAL(describeResult.GetStatus(), NYdb::EStatus::SUCCESS);

        const auto& indexDescriptions = describeResult.GetTableDescription().GetIndexDescriptions();
        UNIT_ASSERT_EQUAL(indexDescriptions.size(), 1);

        const auto& index = indexDescriptions[0];
        UNIT_ASSERT_EQUAL(index.GetIndexName(), "index");
        std::vector<std::string> expectedKeyColumns{"user", "emb"};
        UNIT_ASSERT_EQUAL(index.GetIndexColumns(), expectedKeyColumns);

        const auto& kmeansTree = std::get<TKMeansTreeSettings>(index.GetIndexSettings());
        const auto& vectorSettings = kmeansTree.Settings;
        UNIT_ASSERT_EQUAL(vectorSettings.Metric, NYdb::NTable::TVectorIndexSettings::EMetric::CosineDistance);
        UNIT_ASSERT_EQUAL(vectorSettings.VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Uint8);
        UNIT_ASSERT_EQUAL(vectorSettings.VectorDimension, 2);
        UNIT_ASSERT_EQUAL(kmeansTree.Levels, 3);
        UNIT_ASSERT_EQUAL(kmeansTree.Clusters, 2);
    }

    // Step 3: queries through the freshly built index.
    DoPositiveQueriesPrefixedVectorIndexOrderByCosine(session);
}

// Same scenario as the three-level variant of this test, but the prefixed
// vector_kmeans_tree index is built with levels=4 (cosine distance, clusters=2): create the
// table via DoCreateTableForPrefixedVectorIndex(db, false), add the index, verify what
// DescribeTable reports, then run the shared positive ORDER BY cosine queries.
// NOTE(review): the `false` argument presumably selects the non-nullable table variant,
// as the test name suggests - confirm against DoCreateTableForPrefixedVectorIndex.
Y_UNIT_TEST(PrefixedVectorIndexOrderByCosineDistanceNotNullableLevel4) {
    // Server configuration: the vector index feature flag must be on.
    NKikimrConfig::TFeatureFlags flags;
    flags.SetEnableVectorIndex(true);
    NKikimrKqp::TKqpSetting kqpSetting;
    auto kikimrSettings = TKikimrSettings()
        .SetFeatureFlags(flags)
        .SetKqpSettings({kqpSetting});

    TKikimrRunner kikimr(kikimrSettings);

    // Trace-level logs for the index build and schemeshard services.
    auto&& runtime = kikimr.GetTestServer().GetRuntime();
    runtime->SetLogPriority(NKikimrServices::BUILD_INDEX, NActors::NLog::PRI_TRACE);
    runtime->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);

    auto db = kikimr.GetTableClient();
    auto session = DoCreateTableForPrefixedVectorIndex(db, false);

    // Step 1: build the index.
    {
        const TString createIndexQuery(Q_(R"(
ALTER TABLE `/Root/TestTable`
ADD INDEX index
GLOBAL USING vector_kmeans_tree
ON (user, emb)
WITH (distance=cosine, vector_type="uint8", vector_dimension=2, levels=4, clusters=2);
)"));

        auto alterResult = session.ExecuteSchemeQuery(createIndexQuery).ExtractValueSync();
        UNIT_ASSERT_C(alterResult.IsSuccess(), alterResult.GetIssues().ToString());
    }

    // Step 2: the table description must expose the index exactly as requested.
    {
        auto describeResult = session.DescribeTable("/Root/TestTable").ExtractValueSync();
        UNIT_ASSERT_VALUES_EQUAL(describeResult.GetStatus(), NYdb::EStatus::SUCCESS);

        const auto& indexDescriptions = describeResult.GetTableDescription().GetIndexDescriptions();
        UNIT_ASSERT_EQUAL(indexDescriptions.size(), 1);

        const auto& index = indexDescriptions[0];
        UNIT_ASSERT_EQUAL(index.GetIndexName(), "index");
        std::vector<std::string> expectedKeyColumns{"user", "emb"};
        UNIT_ASSERT_EQUAL(index.GetIndexColumns(), expectedKeyColumns);

        const auto& kmeansTree = std::get<TKMeansTreeSettings>(index.GetIndexSettings());
        const auto& vectorSettings = kmeansTree.Settings;
        UNIT_ASSERT_EQUAL(vectorSettings.Metric, NYdb::NTable::TVectorIndexSettings::EMetric::CosineDistance);
        UNIT_ASSERT_EQUAL(vectorSettings.VectorType, NYdb::NTable::TVectorIndexSettings::EVectorType::Uint8);
        UNIT_ASSERT_EQUAL(vectorSettings.VectorDimension, 2);
        UNIT_ASSERT_EQUAL(kmeansTree.Levels, 4);
        UNIT_ASSERT_EQUAL(kmeansTree.Clusters, 2);
    }

    // Step 3: queries through the freshly built index.
    DoPositiveQueriesPrefixedVectorIndexOrderByCosine(session);
}

Y_UNIT_TEST(PrefixedVectorIndexOrderByCosineSimilarityNotNullableLevel2) {
NKikimrConfig::TFeatureFlags featureFlags;
featureFlags.SetEnableVectorIndex(true);
Expand Down
108 changes: 71 additions & 37 deletions ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateBuildPropose(
static constexpr std::string_view LogPrefix = "Create build table boundaries for ";
LOG_D(buildInfo.Id << " table " << suffix
<< ", count: " << count << ", parts: " << parts << ", step: " << step
<< ", kmeans: " << buildInfo.KMeansTreeToDebugStr());
<< ", " << buildInfo.DebugString());
if (parts > 1) {
const auto from = buildInfo.KMeans.ChildBegin;
for (auto i = from + step, e = from + count; i < e; i += step) {
Expand Down Expand Up @@ -699,7 +699,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
ToTabletSend.emplace_back(shardId, ui64(BuildId), std::move(ev));
}

void SendBuildIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
void SendBuildSecondaryIndexRequest(TShardIdx shardIdx, TIndexBuildInfo& buildInfo) {
auto ev = MakeHolder<TEvDataShard::TEvBuildIndexCreateRequest>();
ev->Record.SetBuildIndexId(ui64(BuildId));

Expand Down Expand Up @@ -807,12 +807,20 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
}
}

bool FillTable(TIndexBuildInfo& buildInfo) {
bool FillSecondaryIndex(TIndexBuildInfo& buildInfo) {
LOG_D("FillSecondaryIndex Start");

if (buildInfo.DoneShards.empty() && buildInfo.ToUploadShards.empty() && buildInfo.InProgressShards.empty()) {
AddAllShards(buildInfo);
}
return SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendBuildIndexRequest(shardIdx, buildInfo); }) &&
auto done = SendToShards(buildInfo, [&](TShardIdx shardIdx) { SendBuildSecondaryIndexRequest(shardIdx, buildInfo); }) &&
buildInfo.DoneShards.size() == buildInfo.Shards.size();

if (done) {
LOG_D("FillSecondaryIndex Done");
}

return done;
}

bool FillPrefixKMeans(TIndexBuildInfo& buildInfo) {
Expand All @@ -823,6 +831,14 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
buildInfo.DoneShards.size() == buildInfo.Shards.size();
}

// Drives the shards of `buildInfo` with SendKMeansLocalRequest.
// When no shard is done, queued for upload or in progress, all shards are (re)added first.
// Returns true only if SendToShards() reports true and every shard in buildInfo.Shards
// is present in buildInfo.DoneShards; otherwise returns false.
bool FillLocalKMeans(TIndexBuildInfo& buildInfo) {
    const bool nothingScheduled = buildInfo.DoneShards.empty()
        && buildInfo.ToUploadShards.empty()
        && buildInfo.InProgressShards.empty();
    if (nothingScheduled) {
        AddAllShards(buildInfo);
    }

    const auto sendLocalKMeans = [&](TShardIdx shardIdx) {
        SendKMeansLocalRequest(shardIdx, buildInfo);
    };
    if (!SendToShards(buildInfo, sendLocalKMeans)) {
        return false;
    }

    // The shard counts are compared only after SendToShards() has succeeded.
    return buildInfo.DoneShards.size() == buildInfo.Shards.size();
}

bool InitSingleKMeans(TIndexBuildInfo& buildInfo) {
if (!buildInfo.DoneShards.empty() || !buildInfo.InProgressShards.empty() || !buildInfo.ToUploadShards.empty()) {
return false;
Expand Down Expand Up @@ -934,102 +950,112 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
);
}

bool FillVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
if (buildInfo.IsBuildPrefixedVectorIndex() && buildInfo.KMeans.Level == 1) {
LOG_D("FillIndex::Prefixed::Level1::Start " << buildInfo.KMeansTreeToDebugStr());
if (!FillTable(buildInfo)) {
bool FillPrefixedVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
LOG_D("FillPrefixedVectorIndex Start " << buildInfo.DebugString());

if (buildInfo.KMeans.Level == 1) {
if (!FillSecondaryIndex(buildInfo)) {
return false;
}
const ui64 doneShards = buildInfo.DoneShards.size();
LOG_D("FillPrefixedVectorIndex DoneLevel " << buildInfo.DebugString());

const ui64 doneShards = buildInfo.DoneShards.size();
ClearDoneShards(txc, buildInfo);
// it's approximate but upper bound, so it's ok
buildInfo.KMeans.TableSize = std::max<ui64>(1, buildInfo.Processed.GetUploadRows());
buildInfo.KMeans.PrefixIndexDone(doneShards);
LOG_D("FillPrefixedVectorIndex PrefixIndexDone " << buildInfo.DebugString());

PersistKMeansState(txc, buildInfo);
NIceDb::TNiceDb db{txc.DB};
Self->PersistBuildIndexUploadReset(db, buildInfo);
LOG_D("FillIndex::Prefixed::Level1::Done " << buildInfo.KMeansTreeToDebugStr());
ChangeState(BuildId, TIndexBuildInfo::EState::CreateBuild);
Progress(BuildId);
return false;
}

if (buildInfo.IsBuildPrefixedVectorIndex() && buildInfo.KMeans.Level == 2) {
LOG_D("FillIndex::Prefixed::Level2::Start " << buildInfo.KMeansTreeToDebugStr());
if (!FillPrefixKMeans(buildInfo)) {
} else {
bool filled = buildInfo.KMeans.Level == 2
? FillPrefixKMeans(buildInfo)
: FillLocalKMeans(buildInfo);
if (!filled) {
return false;
}
LOG_D("FillPrefixedVectorIndex DoneLevel " << buildInfo.DebugString());

ClearDoneShards(txc, buildInfo);
Y_ASSERT(buildInfo.KMeans.State == TIndexBuildInfo::TKMeans::MultiLocal);
const bool needsAnotherLevel = buildInfo.KMeans.NextLevel();
buildInfo.KMeans.State = TIndexBuildInfo::TKMeans::MultiLocal;
buildInfo.KMeans.Parent = buildInfo.KMeans.ParentEnd();
if (buildInfo.KMeans.Level == 2) {
buildInfo.KMeans.Parent = buildInfo.KMeans.ParentEnd();
}
LOG_D("FillPrefixedVectorIndex NextLevel " << buildInfo.DebugString());

PersistKMeansState(txc, buildInfo);
NIceDb::TNiceDb db{txc.DB};
Self->PersistBuildIndexUploadReset(db, buildInfo);
LOG_D("FillIndex::Prefixed::Level2::Done " << buildInfo.KMeansTreeToDebugStr());
if (!needsAnotherLevel) {
LOG_D("FillPrefixedVectorIndex Done " << buildInfo.DebugString());
return true;
}
ChangeState(BuildId, TIndexBuildInfo::EState::DropBuild);
Progress(BuildId);
return false;
}
}

bool FillVectorIndex(TTransactionContext& txc, TIndexBuildInfo& buildInfo) {
LOG_D("FillVectorIndex Start " << buildInfo.DebugString());

if (buildInfo.Sample.State == TIndexBuildInfo::TSample::EState::Upload) {
return false;
}
if (InitSingleKMeans(buildInfo)) {
LOG_D("FillIndex::SingleKMeans::Start " << buildInfo.KMeansTreeToDebugStr());
LOG_D("FillVectorIndex SingleKMeans " << buildInfo.DebugString());
}
if (!SendVectorIndex(buildInfo)) {
return false;
}

LOG_D("FillIndex::SendVectorIndex::Done " << buildInfo.KMeansTreeToDebugStr());
if (!buildInfo.Sample.Rows.empty()) {
if (buildInfo.Sample.State == TIndexBuildInfo::TSample::EState::Collect) {
LOG_D("FillIndex::SendSample::Start " << buildInfo.KMeansTreeToDebugStr());
LOG_D("FillVectorIndex SendUploadSampleKRequest " << buildInfo.DebugString());
SendUploadSampleKRequest(buildInfo);
return false;
}
LOG_D("FillIndex::SendSample::Done " << buildInfo.KMeansTreeToDebugStr());
}

LOG_D("FillIndex::ClearDoneShards " << buildInfo.KMeansTreeToDebugStr());
LOG_D("FillVectorIndex DoneLevel " << buildInfo.DebugString());
ClearDoneShards(txc, buildInfo);

if (!buildInfo.Sample.Rows.empty()) {
if (buildInfo.KMeans.NextState()) {
LOG_D("FillIndex::NextState::Start " << buildInfo.KMeansTreeToDebugStr());
LOG_D("FillVectorIndex NextState " << buildInfo.DebugString());
PersistKMeansState(txc, buildInfo);
Progress(BuildId);
return false;
}
buildInfo.Sample.Clear();
NIceDb::TNiceDb db{txc.DB};
Self->PersistBuildIndexSampleForget(db, buildInfo);
LOG_D("FillIndex::NextState::Done " << buildInfo.KMeansTreeToDebugStr());
LOG_D("FillVectorIndex DoneState " << buildInfo.DebugString());
}

if (buildInfo.KMeans.NextParent()) {
LOG_D("FillIndex::NextParent::Start " << buildInfo.KMeansTreeToDebugStr());
LOG_D("FillVectorIndex NextParent " << buildInfo.DebugString());
PersistKMeansState(txc, buildInfo);
Progress(BuildId);
return false;
}

if (InitMultiKMeans(buildInfo)) {
LOG_D("FillIndex::MultiKMeans::Start " << buildInfo.KMeansTreeToDebugStr());
LOG_D("FillVectorIndex MultiKMeans " << buildInfo.DebugString());
PersistKMeansState(txc, buildInfo);
Progress(BuildId);
return false;
}

if (buildInfo.KMeans.NextLevel()) {
LOG_D("FillIndex::NextLevel::Start " << buildInfo.KMeansTreeToDebugStr());
LOG_D("FillVectorIndex NextLevel " << buildInfo.DebugString());
PersistKMeansState(txc, buildInfo);
NIceDb::TNiceDb db{txc.DB};
Self->PersistBuildIndexUploadReset(db, buildInfo);
Expand All @@ -1039,7 +1065,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
Progress(BuildId);
return false;
}
LOG_D("FillIndex::Done " << buildInfo.KMeansTreeToDebugStr());
LOG_D("FillVectorIndex Done " << buildInfo.DebugString());
return true;
}

Expand All @@ -1057,15 +1083,20 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
Y_ABORT_UNLESS(buildInfo.SnapshotStep);
}
if (buildInfo.Shards.empty()) {
LOG_D("FillIndex::InitiateShards " << buildInfo.KMeansTreeToDebugStr());
NIceDb::TNiceDb db(txc.DB);
InitiateShards(db, buildInfo);
}
if (buildInfo.IsBuildVectorIndex()) {
return FillVectorIndex(txc, buildInfo);
} else {
Y_ASSERT(buildInfo.IsBuildSecondaryIndex() || buildInfo.IsBuildColumns());
return FillTable(buildInfo);
switch (buildInfo.BuildKind) {
case TIndexBuildInfo::EBuildKind::BuildSecondaryIndex:
case TIndexBuildInfo::EBuildKind::BuildColumns:
return FillSecondaryIndex(buildInfo);
case TIndexBuildInfo::EBuildKind::BuildVectorIndex:
return FillVectorIndex(txc, buildInfo);
case TIndexBuildInfo::EBuildKind::BuildPrefixedVectorIndex:
return FillPrefixedVectorIndex(txc, buildInfo);
default:
Y_ASSERT(false);
return true;
}
}

Expand Down Expand Up @@ -1302,6 +1333,8 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
}

void InitiateShards(NIceDb::TNiceDb& db, TIndexBuildInfo& buildInfo) {
LOG_D("InitiateShards " << buildInfo.DebugString());

Y_ASSERT(buildInfo.Shards.empty());
Y_ASSERT(buildInfo.ToUploadShards.empty());
Y_ASSERT(buildInfo.InProgressShards.empty());
Expand All @@ -1317,15 +1350,16 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
auto tableColumns = NTableIndex::ExtractInfo(table); // skip dropped columns
TSerializedTableRange shardRange = InfiniteRange(tableColumns.Keys.size());
static constexpr std::string_view LogPrefix = "";
LOG_D("infinite range " << buildInfo.KMeans.RangeToDebugStr(shardRange, buildInfo.IsBuildPrefixedVectorIndex() ? 2 : 1));

buildInfo.Cluster2Shards.clear();
for (const auto& x: table->GetPartitions()) {
Y_ABORT_UNLESS(Self->ShardInfos.contains(x.ShardIdx));
TSerializedCellVec bound{x.EndOfRange};
shardRange.To = bound;
LOG_D("shard " << x.ShardIdx << " range " << buildInfo.KMeans.RangeToDebugStr(shardRange, buildInfo.IsBuildPrefixedVectorIndex() ? 2 : 1));
buildInfo.AddParent(shardRange, x.ShardIdx);
if (buildInfo.BuildKind == TIndexBuildInfo::EBuildKind::BuildVectorIndex) {
LOG_D("shard " << x.ShardIdx << " range " << buildInfo.KMeans.RangeToDebugStr(shardRange));
buildInfo.AddParent(shardRange, x.ShardIdx);
}
auto [it, emplaced] = buildInfo.Shards.emplace(x.ShardIdx, TIndexBuildInfo::TShardStatus{std::move(shardRange), "", buildInfo.Shards.size()});
Y_ASSERT(emplaced);
shardRange.From = std::move(bound);
Expand Down
Loading
Loading