diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 6c0231ff1a11..ea23aa42bea9 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1522,7 +1522,7 @@ message TEvLocalKMeansRequest { // id of parent cluster optional uint32 Parent = 15; - // [Child ... Child + K] ids reserved for our clusters + // [Child ... Child + K) ids reserved for this kmeans clusters optional uint32 Child = 16; optional string LevelName = 17; @@ -1532,7 +1532,7 @@ message TEvLocalKMeansRequest { repeated string DataColumns = 20; } -message TEvLocalKMeansProgressResponse { +message TEvLocalKMeansResponse { optional uint64 Id = 1; optional uint64 TabletId = 2; @@ -1552,6 +1552,53 @@ message TEvLocalKMeansProgressResponse { // optional uint32 DoneRounds = 11; } +message TEvReshuffleKMeansRequest { + optional uint64 Id = 1; + + optional uint64 TabletId = 2; + optional NKikimrProto.TPathID PathId = 3; + + optional uint64 SnapshotTxId = 4; + optional uint64 SnapshotStep = 5; + + optional uint64 SeqNoGeneration = 6; + optional uint64 SeqNoRound = 7; + + optional Ydb.Table.VectorIndexSettings Settings = 8; + + optional TEvLocalKMeansRequest.EState Upload = 9; + + // id of parent cluster + optional uint32 Parent = 10; + // [Child ... 
Child + ClustersSize) ids of this kmeans clusters + optional uint32 Child = 11; + // centroids of clusters + repeated string Clusters = 12; + + optional string PostingName = 13; + + optional string EmbeddingColumn = 14; + repeated string DataColumns = 15; +} + +message TEvReshuffleKMeansResponse { + optional uint64 Id = 1; + + optional uint64 TabletId = 2; + optional NKikimrProto.TPathID PathId = 3; + + optional uint64 RequestSeqNoGeneration = 4; + optional uint64 RequestSeqNoRound = 5; + + optional NKikimrIndexBuilder.EBuildStatus Status = 6; + repeated Ydb.Issue.IssueMessage Issues = 7; + + // TODO(mbkkt) implement slow-path (reliable-path) + // optional uint64 RowsDelta = 8; + // optional uint64 BytesDelta = 9; + // optional last written primary key +} + message TEvCdcStreamScanRequest { message TLimits { optional uint32 BatchMaxBytes = 1 [default = 512000]; diff --git a/ydb/core/tx/datashard/buffer_data.h b/ydb/core/tx/datashard/buffer_data.h index f075ae5fc858..0b5cdddf777c 100644 --- a/ydb/core/tx/datashard/buffer_data.h +++ b/ydb/core/tx/datashard/buffer_data.h @@ -1,3 +1,5 @@ +#pragma once + #include "ydb/core/scheme/scheme_tablecell.h" #include "ydb/core/tx/datashard/upload_stats.h" #include "ydb/core/tx/tx_proxy/upload_rows.h" diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index a574bcd2d900..3d9bc4ee5e07 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -333,7 +333,10 @@ struct TEvDataShard { EvSampleKResponse, EvLocalKMeansRequest, - EvLocalKMeansProgressResponse, + EvLocalKMeansResponse, + + EvReshuffleKMeansRequest, + EvReshuffleKMeansResponse, EvEnd }; @@ -1457,16 +1460,28 @@ struct TEvDataShard { TEvDataShard::EvSampleKResponse> { }; + struct TEvReshuffleKMeansRequest + : public TEventPB { + }; + + struct TEvReshuffleKMeansResponse + : public TEventPB { + }; + struct TEvLocalKMeansRequest : public TEventPB { }; - struct TEvLocalKMeansProgressResponse - : public TEventPB { + 
struct TEvLocalKMeansResponse + : public TEventPB { }; struct TEvKqpScan diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 078e0b57a0ea..ae40268d991b 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -254,6 +254,7 @@ class TDataShard class TTxHandleSafeBuildIndexScan; class TTxHandleSafeSampleKScan; class TTxHandleSafeLocalKMeansScan; + class TTxHandleSafeReshuffleKMeansScan; class TTxHandleSafeStatisticsScan; class TTxMediatorStateRestored; @@ -1325,6 +1326,8 @@ class TDataShard void HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvSampleKRequest::TPtr& ev, const TActorContext& ctx); void HandleSafe(TEvDataShard::TEvSampleKRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvDataShard::TEvReshuffleKMeansRequest::TPtr& ev, const TActorContext& ctx); + void HandleSafe(TEvDataShard::TEvReshuffleKMeansRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvLocalKMeansRequest::TPtr& ev, const TActorContext& ctx); void HandleSafe(TEvDataShard::TEvLocalKMeansRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const TActorContext& ctx); @@ -3117,6 +3120,7 @@ class TDataShard HFunc(TEvDataShard::TEvDiscardVolatileSnapshotRequest, Handle); HFuncTraced(TEvDataShard::TEvBuildIndexCreateRequest, Handle); HFunc(TEvDataShard::TEvSampleKRequest, Handle); + HFunc(TEvDataShard::TEvReshuffleKMeansRequest, Handle); HFunc(TEvDataShard::TEvLocalKMeansRequest, Handle); HFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle); HFunc(TEvPrivate::TEvCdcStreamScanRegistered, Handle); diff --git a/ydb/core/tx/datashard/datashard_ut_local_kmeans.cpp b/ydb/core/tx/datashard/datashard_ut_local_kmeans.cpp index 43cbd61d03c0..eb275e24efbd 100644 --- a/ydb/core/tx/datashard/datashard_ut_local_kmeans.cpp +++ 
b/ydb/core/tx/datashard/datashard_ut_local_kmeans.cpp @@ -81,7 +81,7 @@ Y_UNIT_TEST_SUITE (TTxDataShardLocalKMeansScan) { runtime.SendToPipe(tid, sender, ev.release(), 0, GetPipeConfigWithRetries()); TAutoPtr handle; - auto reply = runtime.GrabEdgeEventRethrow(handle); + auto reply = runtime.GrabEdgeEventRethrow(handle); UNIT_ASSERT_VALUES_EQUAL(reply->Record.GetStatus(), NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST); } } @@ -148,7 +148,7 @@ Y_UNIT_TEST_SUITE (TTxDataShardLocalKMeansScan) { runtime.SendToPipe(tid, sender, ev2.release(), 0, GetPipeConfigWithRetries()); TAutoPtr handle; - auto reply = runtime.GrabEdgeEventRethrow(handle); + auto reply = runtime.GrabEdgeEventRethrow(handle); NYql::TIssues issues; NYql::IssuesFromMessage(reply->Record.GetIssues(), issues); diff --git a/ydb/core/tx/datashard/datashard_ut_reshuffle_kmeans.cpp b/ydb/core/tx/datashard/datashard_ut_reshuffle_kmeans.cpp new file mode 100644 index 000000000000..c817bbf3d0ea --- /dev/null +++ b/ydb/core/tx/datashard/datashard_ut_reshuffle_kmeans.cpp @@ -0,0 +1,648 @@ +#include +#include +#include +#include +#include +#include + +#include + +#include + +namespace NKikimr { +using namespace Tests; +using Ydb::Table::VectorIndexSettings; +using namespace NTableIndex::NTableVectorKmeansTreeIndex; + +static std::atomic sId = 1; +static constexpr const char* kMainTable = "/Root/table-main"; +static constexpr const char* kPostingTable = "/Root/table-posting"; + +Y_UNIT_TEST_SUITE (TTxDataShardReshuffleKMeansScan) { + static void DoBadRequest(Tests::TServer::TPtr server, TActorId sender, std::unique_ptr & ev, + size_t dims = 2, VectorIndexSettings::VectorType type = VectorIndexSettings::VECTOR_TYPE_FLOAT, VectorIndexSettings::Distance metric = VectorIndexSettings::DISTANCE_COSINE) { + auto id = sId.fetch_add(1, std::memory_order_relaxed); + auto& runtime = *server->GetRuntime(); + auto snapshot = CreateVolatileSnapshot(server, {kMainTable}); + auto datashards = GetTableShards(server, sender, 
kMainTable); + TTableId tableId = ResolveTableId(server, sender, kMainTable); + + TStringBuilder data; + TString err; + UNIT_ASSERT(datashards.size() == 1); + + for (auto tid : datashards) { + auto& rec = ev->Record; + rec.SetId(1); + + rec.SetSeqNoGeneration(id); + rec.SetSeqNoRound(1); + + if (!rec.HasTabletId()) { + rec.SetTabletId(tid); + } + if (!rec.HasPathId()) { + PathIdFromPathId(tableId.PathId, rec.MutablePathId()); + } + + rec.SetSnapshotTxId(snapshot.TxId); + rec.SetSnapshotStep(snapshot.Step); + + VectorIndexSettings settings; + settings.set_vector_dimension(dims); + settings.set_vector_type(type); + settings.set_distance(metric); + *rec.MutableSettings() = settings; + + rec.SetUpload(NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_POSTING); + + rec.SetParent(0); + rec.SetChild(1); + + if (rec.ClustersSize() == 0) { + rec.AddClusters("something"); + } else { + rec.ClearClusters(); + } + + if (rec.HasEmbeddingColumn()) { + rec.ClearEmbeddingColumn(); + } else { + rec.SetEmbeddingColumn("embedding"); + } + + rec.SetPostingName(kPostingTable); + + runtime.SendToPipe(tid, sender, ev.release(), 0, GetPipeConfigWithRetries()); + + TAutoPtr handle; + auto reply = runtime.GrabEdgeEventRethrow(handle); + UNIT_ASSERT_VALUES_EQUAL(reply->Record.GetStatus(), NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST); + } + } + + static TString DoReshuffleKMeans(Tests::TServer::TPtr server, TActorId sender, ui32 parent, + const std::vector& level, + NKikimrTxDataShard::TEvLocalKMeansRequest::EState upload, + VectorIndexSettings::VectorType type, auto metric) { + auto id = sId.fetch_add(1, std::memory_order_relaxed); + auto& runtime = *server->GetRuntime(); + auto snapshot = CreateVolatileSnapshot(server, {kMainTable}); + auto datashards = GetTableShards(server, sender, kMainTable); + TTableId tableId = ResolveTableId(server, sender, kMainTable); + + TString err; + + for (auto tid : datashards) { + auto ev1 = std::make_unique(); + auto ev2 = std::make_unique(); + 
auto fill = [&](std::unique_ptr& ev) { + auto& rec = ev->Record; + rec.SetId(1); + + rec.SetSeqNoGeneration(id); + rec.SetSeqNoRound(1); + + rec.SetTabletId(tid); + PathIdFromPathId(tableId.PathId, rec.MutablePathId()); + + rec.SetSnapshotTxId(snapshot.TxId); + rec.SetSnapshotStep(snapshot.Step); + + VectorIndexSettings settings; + settings.set_vector_dimension(2); + settings.set_vector_type(type); + if constexpr (std::is_same_v) { + settings.set_distance(metric); + } else { + settings.set_similarity(metric); + } + *rec.MutableSettings() = settings; + + rec.SetUpload(upload); + + *rec.MutableClusters() = {level.begin(), level.end()}; + + rec.SetParent(parent); + rec.SetChild(parent + 1); + + rec.SetEmbeddingColumn("embedding"); + rec.AddDataColumns("data"); + + rec.SetPostingName(kPostingTable); + }; + fill(ev1); + fill(ev2); + + runtime.SendToPipe(tid, sender, ev1.release(), 0, GetPipeConfigWithRetries()); + runtime.SendToPipe(tid, sender, ev2.release(), 0, GetPipeConfigWithRetries()); + + TAutoPtr handle; + auto reply = runtime.GrabEdgeEventRethrow(handle); + + NYql::TIssues issues; + NYql::IssuesFromMessage(reply->Record.GetIssues(), issues); + UNIT_ASSERT_EQUAL_C(reply->Record.GetStatus(), NKikimrIndexBuilder::EBuildStatus::DONE, issues.ToOneLineString()); + } + + auto posting = ReadShardedTable(server, kPostingTable); + return std::move(posting); + } + + static void DropTable(Tests::TServer::TPtr server, TActorId sender, const char* name) { + ui64 txId = AsyncDropTable(server, sender, "/Root", name); + WaitTxNotification(server, sender, txId); + } + + static void CreateMainTable(Tests::TServer::TPtr server, TActorId sender, TShardedTableOptions options) { + options.AllowSystemColumnNames(false); + options.Columns({ + {"key", "Uint32", true, true}, + {"embedding", "String", false, false}, + {"data", "String", false, false}, + }); + CreateShardedTable(server, sender, "/Root", "table-main", options); + } + + static void CreatePostingTable(Tests::TServer::TPtr 
server, TActorId sender, TShardedTableOptions options) { + options.AllowSystemColumnNames(true); + options.Columns({ + {PostingTable_ParentIdColumn, "Uint32", true, true}, + {"key", "Uint32", true, true}, + {"data", "String", false, false}, + }); + CreateShardedTable(server, sender, "/Root", "table-posting", options); + } + + static void CreateTmpTable(Tests::TServer::TPtr server, TActorId sender, TShardedTableOptions options, const char* name) { + options.AllowSystemColumnNames(true); + options.Columns({ + {PostingTable_ParentIdColumn, "Uint32", true, true}, + {"key", "Uint32", true, true}, + {"embedding", "String", false, false}, + {"data", "String", false, false}, + }); + CreateShardedTable(server, sender, "/Root", name, options); + } + + Y_UNIT_TEST (BadRequest) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root"); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-main", 1); + + { + auto ev = std::make_unique(); + auto& rec = ev->Record; + + rec.AddClusters("to make it empty"); + DoBadRequest(server, sender, ev); + } + { + auto ev = std::make_unique(); + auto& rec = ev->Record; + + rec.SetEmbeddingColumn("to make it empty"); + DoBadRequest(server, sender, ev); + } + { + auto ev = std::make_unique(); + auto& rec = ev->Record; + + rec.SetTabletId(0); + DoBadRequest(server, sender, ev); + } + { + auto ev = std::make_unique(); + auto& rec = ev->Record; + + PathIdFromPathId({0, 0}, rec.MutablePathId()); + DoBadRequest(server, sender, ev); + } + { + auto ev = std::make_unique(); + + DoBadRequest(server, sender, ev, 0); + } + { + auto ev = std::make_unique(); + + // TODO(mbkkt) bit vector not supported for now + DoBadRequest(server, sender, ev, 2, 
VectorIndexSettings::VECTOR_TYPE_BIT); + } + { + auto ev = std::make_unique(); + + DoBadRequest(server, sender, ev, 2, VectorIndexSettings::VECTOR_TYPE_UNSPECIFIED); + } + { + auto ev = std::make_unique(); + + DoBadRequest(server, sender, ev, 2, VectorIndexSettings::VECTOR_TYPE_FLOAT, VectorIndexSettings::DISTANCE_UNSPECIFIED); + } + // TODO(mbkkt) For now all build_index, sample_k, build_columns, local_kmeans doesn't really check this + // { + // auto ev = std::make_unique(); + // auto snapshotCopy = snapshot; + // snapshotCopy.Step++; + // DoBadRequest(server, sender, ev); + // } + // { + // auto ev = std::make_unique(); + // auto snapshotCopy = snapshot; + // snapshotCopy.TxId++; + // DoBadRequest(server, sender, ev); + // } + } + + Y_UNIT_TEST (MainToPosting) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root"); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TShardedTableOptions options; + options.EnableOutOfOrder(true); // TODO(mbkkt) what is it? 
+ options.Shards(1); + + CreateMainTable(server, sender, options); + // Upsert some initial values + ExecSQL(server, sender, R"( + UPSERT INTO `/Root/table-main` + (key, embedding, data) + VALUES )" + "(1, \"\x30\x30\3\", \"one\")," + "(2, \"\x31\x31\3\", \"two\")," + "(3, \"\x32\x32\3\", \"three\")," + "(4, \"\x65\x65\3\", \"four\")," + "(5, \"\x75\x75\3\", \"five\");"); + + auto create = [&] { + CreatePostingTable(server, sender, options); + }; + create(); + auto recreate = [&] { + DropTable(server, sender, "table-posting"); + create(); + }; + + for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) { + std::vector level = { + "mm\3", + "11\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 0, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_POSTING, VectorIndexSettings::VECTOR_TYPE_UINT8, distance); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 1, key = 4, data = four\n" + "__ydb_parent = 1, key = 5, data = five\n" + "__ydb_parent = 2, key = 1, data = one\n" + "__ydb_parent = 2, key = 2, data = two\n" + "__ydb_parent = 2, key = 3, data = three\n"); + recreate(); + } + for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) { + std::vector level = { + "11\3", + "mm\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 0, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_POSTING, VectorIndexSettings::VECTOR_TYPE_UINT8, distance); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 1, key = 1, data = one\n" + "__ydb_parent = 1, key = 2, data = two\n" + "__ydb_parent = 1, key = 3, data = three\n" + "__ydb_parent = 2, key = 4, data = four\n" + "__ydb_parent = 2, key = 5, data = five\n"); + recreate(); + } + for (auto similarity : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE}) { + std::vector level = { + "II\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 0, 
level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_POSTING, VectorIndexSettings::VECTOR_TYPE_UINT8, similarity); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 1, key = 1, data = one\n" + "__ydb_parent = 1, key = 2, data = two\n" + "__ydb_parent = 1, key = 3, data = three\n" + "__ydb_parent = 1, key = 4, data = four\n" + "__ydb_parent = 1, key = 5, data = five\n"); + recreate(); + } + { + std::vector level = { + "II\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 0, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_POSTING, VectorIndexSettings::VECTOR_TYPE_UINT8, VectorIndexSettings::DISTANCE_COSINE); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 1, key = 1, data = one\n" + "__ydb_parent = 1, key = 2, data = two\n" + "__ydb_parent = 1, key = 3, data = three\n" + "__ydb_parent = 1, key = 4, data = four\n" + "__ydb_parent = 1, key = 5, data = five\n"); + recreate(); + } + } + + Y_UNIT_TEST (MainToTmp) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root"); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TShardedTableOptions options; + options.EnableOutOfOrder(true); // TODO(mbkkt) what is it? 
+ options.Shards(1); + + CreateMainTable(server, sender, options); + // Upsert some initial values + ExecSQL(server, sender, R"( + UPSERT INTO `/Root/table-main` + (key, embedding, data) + VALUES )" + "(1, \"\x30\x30\3\", \"one\")," + "(2, \"\x31\x31\3\", \"two\")," + "(3, \"\x32\x32\3\", \"three\")," + "(4, \"\x65\x65\3\", \"four\")," + "(5, \"\x75\x75\3\", \"five\");"); + + auto create = [&] { + CreateTmpTable(server, sender, options, "table-posting"); + }; + create(); + auto recreate = [&] { + DropTable(server, sender, "table-posting"); + create(); + }; + + for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) { + std::vector level = { + "mm\3", + "11\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 0, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_TMP, VectorIndexSettings::VECTOR_TYPE_UINT8, distance); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 1, key = 4, embedding = \x65\x65\3, data = four\n" + "__ydb_parent = 1, key = 5, embedding = \x75\x75\3, data = five\n" + "__ydb_parent = 2, key = 1, embedding = \x30\x30\3, data = one\n" + "__ydb_parent = 2, key = 2, embedding = \x31\x31\3, data = two\n" + "__ydb_parent = 2, key = 3, embedding = \x32\x32\3, data = three\n"); + recreate(); + } + for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) { + std::vector level = { + "11\3", + "mm\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 0, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_TMP, VectorIndexSettings::VECTOR_TYPE_UINT8, distance); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 1, key = 1, embedding = \x30\x30\3, data = one\n" + "__ydb_parent = 1, key = 2, embedding = \x31\x31\3, data = two\n" + "__ydb_parent = 1, key = 3, embedding = \x32\x32\3, data = three\n" + "__ydb_parent = 2, key = 4, embedding = \x65\x65\3, data = four\n" + "__ydb_parent = 2, key = 5, embedding = \x75\x75\3, 
data = five\n"); + recreate(); + } + for (auto similarity : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE}) { + std::vector level = { + "II\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 0, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_TMP, VectorIndexSettings::VECTOR_TYPE_UINT8, similarity); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 1, key = 1, embedding = \x30\x30\3, data = one\n" + "__ydb_parent = 1, key = 2, embedding = \x31\x31\3, data = two\n" + "__ydb_parent = 1, key = 3, embedding = \x32\x32\3, data = three\n" + "__ydb_parent = 1, key = 4, embedding = \x65\x65\3, data = four\n" + "__ydb_parent = 1, key = 5, embedding = \x75\x75\3, data = five\n"); + recreate(); + } + { + std::vector level = { + "II\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 0, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_TMP, VectorIndexSettings::VECTOR_TYPE_UINT8, VectorIndexSettings::DISTANCE_COSINE); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 1, key = 1, embedding = \x30\x30\3, data = one\n" + "__ydb_parent = 1, key = 2, embedding = \x31\x31\3, data = two\n" + "__ydb_parent = 1, key = 3, embedding = \x32\x32\3, data = three\n" + "__ydb_parent = 1, key = 4, embedding = \x65\x65\3, data = four\n" + "__ydb_parent = 1, key = 5, embedding = \x75\x75\3, data = five\n"); + recreate(); + } + } + + Y_UNIT_TEST (TmpToPosting) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root"); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TShardedTableOptions options; + options.EnableOutOfOrder(true); // TODO(mbkkt) what is it? 
+ options.Shards(1); + + CreateTmpTable(server, sender, options, "table-main"); + // Upsert some initial values + ExecSQL(server, sender, R"( + UPSERT INTO `/Root/table-main` + (__ydb_parent, key, embedding, data) + VALUES )" + "(39, 1, \"\x30\x30\3\", \"one\")," + "(40, 1, \"\x30\x30\3\", \"one\")," + "(40, 2, \"\x31\x31\3\", \"two\")," + "(40, 3, \"\x32\x32\3\", \"three\")," + "(40, 4, \"\x65\x65\3\", \"four\")," + "(40, 5, \"\x75\x75\3\", \"five\")," + "(41, 5, \"\x75\x75\3\", \"five\");"); + + auto create = [&] { + CreatePostingTable(server, sender, options); + }; + create(); + auto recreate = [&] { + DropTable(server, sender, "table-posting"); + create(); + }; + + for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) { + std::vector level = { + "mm\3", + "11\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 40, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_TMP_TO_POSTING, VectorIndexSettings::VECTOR_TYPE_UINT8, distance); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 41, key = 4, data = four\n" + "__ydb_parent = 41, key = 5, data = five\n" + "__ydb_parent = 42, key = 1, data = one\n" + "__ydb_parent = 42, key = 2, data = two\n" + "__ydb_parent = 42, key = 3, data = three\n"); + recreate(); + } + for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) { + std::vector level = { + "11\3", + "mm\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 40, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_TMP_TO_POSTING, VectorIndexSettings::VECTOR_TYPE_UINT8, distance); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 41, key = 1, data = one\n" + "__ydb_parent = 41, key = 2, data = two\n" + "__ydb_parent = 41, key = 3, data = three\n" + "__ydb_parent = 42, key = 4, data = four\n" + "__ydb_parent = 42, key = 5, data = five\n"); + recreate(); + } + for (auto similarity : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, 
VectorIndexSettings::SIMILARITY_COSINE}) { + std::vector level = { + "II\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 40, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_TMP_TO_POSTING, VectorIndexSettings::VECTOR_TYPE_UINT8, similarity); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 41, key = 1, data = one\n" + "__ydb_parent = 41, key = 2, data = two\n" + "__ydb_parent = 41, key = 3, data = three\n" + "__ydb_parent = 41, key = 4, data = four\n" + "__ydb_parent = 41, key = 5, data = five\n"); + recreate(); + } + { + std::vector level = { + "II\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 40, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_TMP_TO_POSTING, VectorIndexSettings::VECTOR_TYPE_UINT8, VectorIndexSettings::DISTANCE_COSINE); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 41, key = 1, data = one\n" + "__ydb_parent = 41, key = 2, data = two\n" + "__ydb_parent = 41, key = 3, data = three\n" + "__ydb_parent = 41, key = 4, data = four\n" + "__ydb_parent = 41, key = 5, data = five\n"); + recreate(); + } + } + + Y_UNIT_TEST (TmpToTmp) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root"); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TShardedTableOptions options; + options.EnableOutOfOrder(true); // TODO(mbkkt) what is it? 
+ options.Shards(1); + + CreateTmpTable(server, sender, options, "table-main"); + // Upsert some initial values + ExecSQL(server, sender, R"( + UPSERT INTO `/Root/table-main` + (__ydb_parent, key, embedding, data) + VALUES )" + "(39, 1, \"\x30\x30\3\", \"one\")," + "(40, 1, \"\x30\x30\3\", \"one\")," + "(40, 2, \"\x31\x31\3\", \"two\")," + "(40, 3, \"\x32\x32\3\", \"three\")," + "(40, 4, \"\x65\x65\3\", \"four\")," + "(40, 5, \"\x75\x75\3\", \"five\")," + "(41, 5, \"\x75\x75\3\", \"five\");"); + + auto create = [&] { + CreateTmpTable(server, sender, options, "table-posting"); + }; + create(); + auto recreate = [&] { + DropTable(server, sender, "table-posting"); + create(); + }; + + for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) { + std::vector level = { + "mm\3", + "11\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 40, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_TMP_TO_TMP, VectorIndexSettings::VECTOR_TYPE_UINT8, distance); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 41, key = 4, embedding = \x65\x65\3, data = four\n" + "__ydb_parent = 41, key = 5, embedding = \x75\x75\3, data = five\n" + "__ydb_parent = 42, key = 1, embedding = \x30\x30\3, data = one\n" + "__ydb_parent = 42, key = 2, embedding = \x31\x31\3, data = two\n" + "__ydb_parent = 42, key = 3, embedding = \x32\x32\3, data = three\n"); + recreate(); + } + for (auto distance : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) { + std::vector level = { + "11\3", + "mm\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 40, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_TMP_TO_TMP, VectorIndexSettings::VECTOR_TYPE_UINT8, distance); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 41, key = 1, embedding = \x30\x30\3, data = one\n" + "__ydb_parent = 41, key = 2, embedding = \x31\x31\3, data = two\n" + "__ydb_parent = 41, key = 3, embedding = \x32\x32\3, data = 
three\n" + "__ydb_parent = 42, key = 4, embedding = \x65\x65\3, data = four\n" + "__ydb_parent = 42, key = 5, embedding = \x75\x75\3, data = five\n"); + recreate(); + } + for (auto similarity : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE}) { + std::vector level = { + "II\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 40, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_TMP_TO_TMP, VectorIndexSettings::VECTOR_TYPE_UINT8, similarity); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 41, key = 1, embedding = \x30\x30\3, data = one\n" + "__ydb_parent = 41, key = 2, embedding = \x31\x31\3, data = two\n" + "__ydb_parent = 41, key = 3, embedding = \x32\x32\3, data = three\n" + "__ydb_parent = 41, key = 4, embedding = \x65\x65\3, data = four\n" + "__ydb_parent = 41, key = 5, embedding = \x75\x75\3, data = five\n"); + recreate(); + } + { + std::vector level = { + "II\3", + }; + auto posting = DoReshuffleKMeans(server, sender, 40, level, NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_TMP_TO_TMP, VectorIndexSettings::VECTOR_TYPE_UINT8, VectorIndexSettings::DISTANCE_COSINE); + UNIT_ASSERT_VALUES_EQUAL(posting, + "__ydb_parent = 41, key = 1, embedding = \x30\x30\3, data = one\n" + "__ydb_parent = 41, key = 2, embedding = \x31\x31\3, data = two\n" + "__ydb_parent = 41, key = 3, embedding = \x32\x32\3, data = three\n" + "__ydb_parent = 41, key = 4, embedding = \x65\x65\3, data = four\n" + "__ydb_parent = 41, key = 5, embedding = \x75\x75\3, data = five\n"); + recreate(); + } + } +} + +} diff --git a/ydb/core/tx/datashard/kmeans_helper.cpp b/ydb/core/tx/datashard/kmeans_helper.cpp new file mode 100644 index 000000000000..88de12f29968 --- /dev/null +++ b/ydb/core/tx/datashard/kmeans_helper.cpp @@ -0,0 +1,124 @@ +#include "kmeans_helper.h" + +#include + +namespace NKikimr::NDataShard::NKMeans { + +TTableRange CreateRangeFrom(const TUserTable& table, ui32 parent, TCell& from, TCell& to) { + if (parent == 0) { + 
return table.GetTableRange(); + } + from = TCell::Make(parent - 1); + to = TCell::Make(parent); + TTableRange range{{&from, 1}, false, {&to, 1}, true}; + return Intersect(table.KeyColumnTypes, range, table.GetTableRange()); +} + +NTable::TLead CreateLeadFrom(const TTableRange& range) { + NTable::TLead lead; + if (range.From) { + lead.To(range.From, range.InclusiveFrom ? NTable::ESeek::Lower : NTable::ESeek::Upper); + } else { + lead.To({}, NTable::ESeek::Lower); + } + if (range.To) { + lead.Until(range.To, range.InclusiveTo); + } + return lead; +} + +void AddRowMain2Tmp(TBufferData& buffer, ui32 parent, TArrayRef key, const NTable::TRowState& row) { + std::array cells; + cells[0] = TCell::Make(parent); + auto pk = TSerializedCellVec::Serialize(cells); + TSerializedCellVec::UnsafeAppendCells(key, pk); + buffer.AddRow(TSerializedCellVec{key}, TSerializedCellVec{std::move(pk)}, TSerializedCellVec::Serialize(*row)); +} + +void AddRowMain2Posting(TBufferData& buffer, ui32 parent, TArrayRef key, const NTable::TRowState& row, ui32 dataPos) { + std::array cells; + cells[0] = TCell::Make(parent); + auto pk = TSerializedCellVec::Serialize(cells); + TSerializedCellVec::UnsafeAppendCells(key, pk); + buffer.AddRow(TSerializedCellVec{key}, TSerializedCellVec{std::move(pk)}, TSerializedCellVec::Serialize((*row).Slice(dataPos))); +} + +void AddRowTmp2Tmp(TBufferData& buffer, ui32 parent, TArrayRef key, const NTable::TRowState& row) { + std::array cells; + cells[0] = TCell::Make(parent); + auto pk = TSerializedCellVec::Serialize(cells); + TSerializedCellVec::UnsafeAppendCells(key.Slice(1), pk); + buffer.AddRow(TSerializedCellVec{key}, TSerializedCellVec{std::move(pk)}, TSerializedCellVec::Serialize(*row)); +} + +void AddRowTmp2Posting(TBufferData& buffer, ui32 parent, TArrayRef key, const NTable::TRowState& row, ui32 dataPos) { + std::array cells; + cells[0] = TCell::Make(parent); + auto pk = TSerializedCellVec::Serialize(cells); + 
TSerializedCellVec::UnsafeAppendCells(key.Slice(1), pk); + buffer.AddRow(TSerializedCellVec{key}, TSerializedCellVec{std::move(pk)}, TSerializedCellVec::Serialize((*row).Slice(dataPos))); +} + +TTags MakeUploadTags(const TUserTable& table, + const TProtoStringType& embedding, + const google::protobuf::RepeatedPtrField& data, + ui32& embeddingPos, + ui32& dataPos, + NTable::TTag& embeddingTag) { + auto tags = GetAllTags(table); + TTags uploadTags; + uploadTags.reserve(1 + data.size()); + embeddingTag = tags.at(embedding); + if (auto it = std::find(data.begin(), data.end(), embedding); it != data.end()) { + embeddingPos = it - data.begin(); + dataPos = 0; + } else { + uploadTags.push_back(embeddingTag); + } + for (const auto& column : data) { + uploadTags.push_back(tags.at(column)); + } + return uploadTags; +} + +std::shared_ptr MakeUploadTypes(const TUserTable& table, + NKikimrTxDataShard::TEvLocalKMeansRequest::EState uploadState, + const TProtoStringType& embedding, + const google::protobuf::RepeatedPtrField& data) { + auto types = GetAllTypes(table); + + auto uploadTypes = std::make_shared(); + uploadTypes->reserve(1 + 1 + std::min(table.KeyColumnTypes.size() + data.size(), types.size())); + + Ydb::Type type; + type.set_type_id(Ydb::Type::UINT32); + uploadTypes->emplace_back(NTableIndex::NTableVectorKmeansTreeIndex::PostingTable_ParentIdColumn, type); + + auto addType = [&](const auto& column) { + auto it = types.find(column); + Y_ABORT_UNLESS(it != types.end()); + ProtoYdbTypeFromTypeInfo(&type, it->second); + uploadTypes->emplace_back(it->first, type); + types.erase(it); + }; + for (const auto& column : table.KeyColumnIds) { + addType(table.Columns.at(column).Name); + } + switch (uploadState) { + case NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_TMP: + case NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_TMP_TO_TMP: + addType(embedding); + [[fallthrough]]; + case NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_POSTING: + case 
NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_TMP_TO_POSTING: { + for (const auto& column : data) { + addType(column); + } + } break; + default: + Y_UNREACHABLE(); + } + return uploadTypes; +} + +} diff --git a/ydb/core/tx/datashard/kmeans_helper.h b/ydb/core/tx/datashard/kmeans_helper.h new file mode 100644 index 000000000000..ea0dac181bad --- /dev/null +++ b/ydb/core/tx/datashard/kmeans_helper.h @@ -0,0 +1,269 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include + +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, stream) +#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, stream) +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, stream) + +namespace NKikimr::NDataShard::NKMeans { + +// TODO(mbkkt) We know all template parameters for these functions and classes, so we can move them to cpp + +/* float overload: obtains the three products LL, LR and RR from a single TriWayDotProduct call and converts each with static_cast. */ +template +Y_PURE_FUNCTION TTriWayDotProduct CosineImpl(const float* lhs, const float* rhs, size_t length) noexcept { + auto r = TriWayDotProduct(lhs, rhs, length); + return {static_cast(r.LL), static_cast(r.LR), static_cast(r.RR)}; +} + +/* i8 overload: computes lhs*lhs, lhs*rhs and rhs*rhs with three separate DotProduct calls and converts each with static_cast. */ +template +Y_PURE_FUNCTION TTriWayDotProduct CosineImpl(const i8* lhs, const i8* rhs, size_t length) noexcept { + const auto ll = DotProduct(lhs, lhs, length); + const auto lr = DotProduct(lhs, rhs, length); + const auto rr = DotProduct(rhs, rhs, length); + return {static_cast(ll), static_cast(lr), static_cast(rr)}; +} + +/* ui8 overload: same three DotProduct calls as the i8 overload. */ +template +Y_PURE_FUNCTION TTriWayDotProduct CosineImpl(const ui8* lhs, const ui8* rhs, size_t length) noexcept { + const auto ll = DotProduct(lhs, lhs, length); + const auto lr = DotProduct(lhs, rhs, length); + const auto rr = DotProduct(rhs, rhs, length); + return {static_cast(ll), static_cast(lr), static_cast(rr)}; +} +
+TTableRange CreateRangeFrom(const TUserTable& table, ui32 parent, TCell& from, TCell& to); + +NTable::TLead CreateLeadFrom(const TTableRange& range); + +/* Common base of the metrics below: stores the vector dimensionality and provides helpers that view raw bytes as a span of Dimensions coordinates. */ +// TODO(mbkkt) separate implementation for bit +template +struct TMetric { + using TCoord = T; + // TODO(mbkkt) maybe compute floating sum in double? Needs benchmark + using TSum = std::conditional_t, T, int64_t>; + + ui32 Dimensions = 0; + + /* True when `data` is exactly sizeof(TCoord) * Dimensions bytes plus one extra byte. NOTE(review): the extra byte is presumably a trailing format marker of the serialized vector - confirm. */ + bool IsExpectedSize(TArrayRef data) const noexcept { + return data.size() == 1 + sizeof(TCoord) * Dimensions; + } + + /* Read-only view of Dimensions coordinates starting at `coords`. */ + auto GetCoords(const char* coords) { + return std::span{reinterpret_cast(coords), Dimensions}; + } + + /* Mutable view of Dimensions coordinates starting at `data`. */ + auto GetData(char* data) { + return std::span{reinterpret_cast(data), Dimensions}; + } + + /* Turns accumulated sums into a mean: every coordinate of `d` becomes embedding[i] / count, each embedding[i] is zeroed afterwards and the counter `c` is reset to 0. NOTE(review): there is no check for c == 0 or for the size of `d`; callers presumably guarantee both - confirm. */ + void Fill(TString& d, TSum* embedding, ui64& c) { + const auto count = static_cast(std::exchange(c, 0)); + auto data = GetData(d.MutRef().data()); + for (auto& coord : data) { + coord = *embedding / count; + *embedding++ = 0; + } + } +}; + +/* Cosine metric. Distance returns the negated cosine similarity, so a smaller value means a closer cluster; when the product of the norms is zero the similarity is taken as 0. Init returns the numeric_limits max value. */ +template +struct TCosineSimilarity: TMetric { + using TCoord = typename TMetric::TCoord; + using TSum = typename TMetric::TSum; + // double used to avoid precision issues + using TRes = double; + + static TRes Init() { + return std::numeric_limits::max(); + } + + auto Distance(const char* cluster, const char* embedding) const noexcept { + const auto r = CosineImpl(reinterpret_cast(cluster), reinterpret_cast(embedding), this->Dimensions); + // sqrt(ll) * sqrt(rr) computed instead of sqrt(ll * rr) to avoid precision issues + const auto norm = std::sqrt(r.LL) * std::sqrt(r.RR); + const TRes similarity = norm != 0 ?
static_cast(r.LR) / static_cast(norm) : 0; + return -similarity; + } +}; + +/* L1 metric: Distance forwards to L1Distance over Dimensions coordinates; Init returns the numeric_limits max value. */ +template +struct TL1Distance: TMetric { + using TCoord = typename TMetric::TCoord; + using TSum = typename TMetric::TSum; + using TRes = std::conditional_t, T, ui64>; + + static TRes Init() { + return std::numeric_limits::max(); + } + + auto Distance(const char* cluster, const char* embedding) const noexcept { + const auto distance = L1Distance(reinterpret_cast(cluster), reinterpret_cast(embedding), this->Dimensions); + return distance; + } +}; + +/* L2 metric: Distance forwards to L2SqrDistance (judging by the name, the squared distance without the final square root - confirm); Init returns the numeric_limits max value. */ +template +struct TL2Distance: TMetric { + using TCoord = typename TMetric::TCoord; + using TSum = typename TMetric::TSum; + using TRes = std::conditional_t, T, ui64>; + + static TRes Init() { + return std::numeric_limits::max(); + } + + auto Distance(const char* cluster, const char* embedding) const noexcept { + const auto distance = L2SqrDistance(reinterpret_cast(cluster), reinterpret_cast(embedding), this->Dimensions); + return distance; + } +}; + +/* Inner product metric: Distance returns the negated DotProduct, so a larger inner product compares as closer; Init returns the numeric_limits max value. */ +template +struct TMaxInnerProductSimilarity: TMetric { + using TCoord = typename TMetric::TCoord; + using TSum = typename TMetric::TSum; + using TRes = std::conditional_t, T, i64>; + + static TRes Init() { + return std::numeric_limits::max(); + } + + auto Distance(const char* cluster, const char* embedding) const noexcept { + const TRes similarity = DotProduct(reinterpret_cast(cluster), reinterpret_cast(embedding), this->Dimensions); + return -similarity; + } +}; + +/* Adds nearest-cluster search on top of a metric. */ +template +struct TCalculation: TMetric { + /* Returns the index of the cluster with the smallest Distance to `embedding`. The comparison is strict, so on ties the earliest index wins. If no cluster compares below Init() (for example `clusters` is empty) the numeric_limits max value is returned as a "not found" marker. */ + ui32 FindClosest(std::span clusters, const char* embedding) const { + auto min = this->Init(); + ui32 closest = std::numeric_limits::max(); + for (size_t i = 0; const auto& cluster : clusters) { + auto distance = this->Distance(cluster.data(), embedding); + if (distance < min) { + min = distance; + closest = i; + } + ++i; + } + return closest; + } +}; + +/* Counters of processed rows and bytes. */ +struct TStats { + ui64 Rows = 0; + ui64 Bytes = 0; +}; + +/* Accounts one row in `stats` (one row plus the embedding size in bytes) and returns the index of the closest cluster for the embedding stored at `embeddingPos`, or the numeric_limits max value when the embedding does not have the expected size. */ +template +ui32 FeedEmbedding(const TCalculation& calculation, std::span clusters, const
NTable::TRowState& row, NTable::TPos embeddingPos, TStats& stats) { + Y_ASSERT(embeddingPos < row.Size()); + const auto embedding = row.Get(embeddingPos).AsRef(); + stats.Rows += 1; + stats.Bytes += embedding.size(); // TODO(mbkkt) add some constant overhead? + if (!calculation.IsExpectedSize(embedding)) { + return std::numeric_limits::max(); + } + return calculation.FindClosest(clusters, embedding.data()); +} + +void AddRowMain2Tmp(TBufferData& buffer, ui32 parent, TArrayRef key, const NTable::TRowState& row); + +void AddRowMain2Posting(TBufferData& buffer, ui32 parent, TArrayRef key, const NTable::TRowState& row, ui32 dataPos); + +void AddRowTmp2Tmp(TBufferData& buffer, ui32 parent, TArrayRef key, const NTable::TRowState& row); + +void AddRowTmp2Posting(TBufferData& buffer, ui32 parent, TArrayRef key, const NTable::TRowState& row, ui32 dataPos); + +TTags MakeUploadTags(const TUserTable& table, + const TProtoStringType& embedding, + const google::protobuf::RepeatedPtrField& data, + ui32& embeddingPos, + ui32& dataPos, + NTable::TTag& embeddingTag); + +std::shared_ptr MakeUploadTypes(const TUserTable& table, + NKikimrTxDataShard::TEvLocalKMeansRequest::EState uploadState, + const TProtoStringType& embedding, + const google::protobuf::RepeatedPtrField& data); + +void MakeScan(auto& record, const auto& createScan, const auto& badRequest) { + if (!record.HasEmbeddingColumn()) { + badRequest(TStringBuilder() << "Should be specified embedding column"); + return; + } + + const auto& settings = record.GetSettings(); + if (settings.vector_dimension() < 1) { + badRequest(TStringBuilder() << "Dimension of vector should be at least one"); + return; + } + + auto handleType = [&]