Skip to content

Commit

Permalink
Merge 25bf47f into f4d6825
Browse files Browse the repository at this point in the history
  • Loading branch information
MBkkt authored Nov 26, 2024
2 parents f4d6825 + 25bf47f commit 4ddb7dc
Show file tree
Hide file tree
Showing 15 changed files with 155 additions and 78 deletions.
4 changes: 3 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ struct TSchemeShard::TTxRunConditionalErase: public TSchemeShard::TRwTxBase {
}

auto index = GetIndex(childPath);
if (index->Type == NKikimrSchemeOp::EIndexTypeGlobalAsync) {
if (index->Type == NKikimrSchemeOp::EIndexTypeGlobalAsync
|| index->Type == NKikimrSchemeOp::EIndexTypeGlobalVectorKmeansTree) {
continue;
}

Expand Down Expand Up @@ -267,6 +268,7 @@ struct TSchemeShard::TTxRunConditionalErase: public TSchemeShard::TRwTxBase {
}

static TVector<std::pair<ui32, ui32>> MakeColumnIds(TTableInfo::TPtr mainTable, TTableIndexInfo::TPtr index, TTableInfo::TPtr indexImplTable) {
Y_ABORT_UNLESS(index->Type != NKikimrSchemeOp::EIndexType::EIndexTypeGlobalVectorKmeansTree);
TVector<std::pair<ui32, ui32>> result;
THashSet<TString> keys;

Expand Down
33 changes: 19 additions & 14 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -987,18 +987,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
return LoadBackupStatusesImpl(statuses, byShardBackupStatus, byMigratedShardBackupStatus, byTxShardStatus);
}

typedef std::tuple<TPathId, ui64, NKikimrSchemeOp::EIndexType, NKikimrSchemeOp::EIndexState> TTableIndexRec;
typedef std::tuple<TPathId, ui64, NKikimrSchemeOp::EIndexType, NKikimrSchemeOp::EIndexState, TString> TTableIndexRec;
typedef TDeque<TTableIndexRec> TTableIndexRows;

template <typename SchemaTable, typename TRowSet>
static TTableIndexRec MakeTableIndexRec(const TPathId& pathId, TRowSet& rowSet) {
return std::make_tuple(pathId,
rowSet.template GetValue<typename SchemaTable::AlterVersion>(),
rowSet.template GetValue<typename SchemaTable::IndexType>(),
rowSet.template GetValue<typename SchemaTable::State>()
);
}

bool LoadTableIndexes(NIceDb::TNiceDb& db, TTableIndexRows& tableIndexes) const {
{
auto rowSet = db.Table<Schema::TableIndex>().Range().Select();
Expand All @@ -1007,7 +998,13 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
while (!rowSet.EndOfSet()) {
const auto pathId = Self->MakeLocalId(TLocalPathId(rowSet.GetValue<Schema::TableIndex::PathId>()));
tableIndexes.push_back(MakeTableIndexRec<Schema::TableIndex>(pathId, rowSet));
tableIndexes.emplace_back(
pathId,
rowSet.GetValue<Schema::TableIndex::AlterVersion>(),
rowSet.GetValue<Schema::TableIndex::IndexType>(),
rowSet.GetValue<Schema::TableIndex::State>(),
rowSet.GetValue<Schema::TableIndex::Description>()
);

if (!rowSet.Next()) {
return false;
Expand All @@ -1024,7 +1021,13 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TOwnerId(rowSet.GetValue<Schema::MigratedTableIndex::OwnerPathId>()),
TLocalPathId(rowSet.GetValue<Schema::MigratedTableIndex::LocalPathId>())
);
tableIndexes.push_back(MakeTableIndexRec<Schema::MigratedTableIndex>(pathId, rowSet));
tableIndexes.emplace_back(
pathId,
rowSet.GetValue<Schema::MigratedTableIndex::AlterVersion>(),
rowSet.GetValue<Schema::MigratedTableIndex::IndexType>(),
rowSet.GetValue<Schema::MigratedTableIndex::State>(),
TString{}
);

if (!rowSet.Next()) {
return false;
Expand Down Expand Up @@ -2791,6 +2794,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
ui64 alterVersion = std::get<1>(rec);
TTableIndexInfo::EType indexType = std::get<2>(rec);
TTableIndexInfo::EState state = std::get<3>(rec);
auto description = std::get<4>(rec);

Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId);
TPathElement::TPtr path = Self->PathsById.at(pathId);
Expand All @@ -2799,7 +2803,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
<< ", path type: " << NKikimrSchemeOp::EPathType_Name(path->PathType));

Y_ABORT_UNLESS(!Self->Indexes.contains(pathId));
Self->Indexes[pathId] = new TTableIndexInfo(alterVersion, indexType, state);
Self->Indexes[pathId] = new TTableIndexInfo(alterVersion, indexType, state, description);
Self->IncrementPathDbRefCount(pathId);
}

Expand All @@ -2816,6 +2820,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
ui64 alterVersion = rowset.GetValue<Schema::TableIndexAlterData::AlterVersion>();
TTableIndexInfo::EType indexType = rowset.GetValue<Schema::TableIndexAlterData::IndexType>();
TTableIndexInfo::EState state = rowset.GetValue<Schema::TableIndexAlterData::State>();
auto description = rowset.GetValue<Schema::TableIndexAlterData::Description>();

Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId);
TPathElement::TPtr path = Self->PathsById.at(pathId);
Expand All @@ -2828,7 +2833,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
auto tableIndex = Self->Indexes.at(pathId);
Y_ABORT_UNLESS(tableIndex->AlterData == nullptr);
Y_ABORT_UNLESS(tableIndex->AlterVersion < alterVersion);
tableIndex->AlterData = new TTableIndexInfo(alterVersion, indexType, state);
tableIndex->AlterData = new TTableIndexInfo(alterVersion, indexType, state, description);

Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found"
<< ", index pathId: " << pathId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ NKikimrSchemeOp::TModifyScheme CreateIndexTask(NKikimr::NSchemeShard::TTableInde
operation->SetName(dst.LeafName());

operation->SetType(indexInfo->Type);
Y_ABORT_UNLESS(indexInfo->Type != NKikimrSchemeOp::EIndexType::EIndexTypeGlobalVectorKmeansTree);
for (const auto& keyName: indexInfo->IndexKeys) {
*operation->MutableKeyColumnNames()->Add() = keyName;
}
Expand Down Expand Up @@ -174,9 +175,14 @@ bool CreateConsistentCopyTables(
}

Y_ABORT_UNLESS(srcIndexPath.Base()->PathId == pathId);
TTableIndexInfo::TPtr indexInfo = context.SS->Indexes.at(pathId);
if (indexInfo->Type == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalVectorKmeansTree) {
result = {CreateReject(nextId, NKikimrScheme::EStatus::StatusInvalidParameter,
"Consistent copy table doesn't support table with vector index")};
return false;
}
Y_VERIFY_S(srcIndexPath.Base()->GetChildren().size() == 1, srcIndexPath.PathString() << " has children " << srcIndexPath.Base()->GetChildren().size() << " but 1 expected");

TTableIndexInfo::TPtr indexInfo = context.SS->Indexes.at(pathId);
result.push_back(CreateNewTableIndex(NextPartId(nextId, result), CreateIndexTask(indexInfo, dstIndexPath)));

TString srcImplTableName = srcIndexPath.Base()->GetChildren().begin()->first;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,10 @@ TVector<ISubOperation::TPtr> CreateCopyTable(TOperationId nextId, const TTxTrans
auto operation = schema.MutableCreateTableIndex();
operation->SetName(name);
operation->SetType(indexInfo->Type);
if (indexInfo->Type == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalVectorKmeansTree) {
return {CreateReject(nextId, NKikimrScheme::EStatus::StatusInvalidParameter,
"Copy table doesn't support table with vector index")};
}
for (const auto& keyName: indexInfo->IndexKeys) {
*operation->MutableKeyColumnNames()->Add() = keyName;
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ class TSchemeShard::TIndexBuilder::TTxCreate: public TSchemeShard::TIndexBuilder
NKikimrSchemeOp::TVectorIndexKmeansTreeDescription vectorIndexKmeansTreeDescription;
*vectorIndexKmeansTreeDescription.MutableSettings() = index.global_vector_kmeans_tree_index().vector_settings();
buildInfo.SpecializedIndexDescription = vectorIndexKmeansTreeDescription;
buildInfo.KMeans.K = std::max<ui32>(2, vectorIndexKmeansTreeDescription.GetSettings().clusters());
buildInfo.KMeans.Levels = std::max<ui32>(1, vectorIndexKmeansTreeDescription.GetSettings().levels());
break;
}
case Ydb::Table::TableIndex::TypeCase::TYPE_NOT_SET:
Expand Down
16 changes: 9 additions & 7 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1695,7 +1695,8 @@ void TSchemeShard::PersistTableIndex(NIceDb::TNiceDb& db, const TPathId& pathId)
db.Table<Schema::TableIndex>().Key(element->PathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::TableIndex::AlterVersion>(alterData->AlterVersion),
NIceDb::TUpdate<Schema::TableIndex::IndexType>(alterData->Type),
NIceDb::TUpdate<Schema::TableIndex::State>(alterData->State));
NIceDb::TUpdate<Schema::TableIndex::State>(alterData->State),
NIceDb::TUpdate<Schema::TableIndex::Description>(alterData->SerializeDescription()));

db.Table<Schema::TableIndexAlterData>().Key(element->PathId.LocalPathId).Delete();

Expand Down Expand Up @@ -1730,7 +1731,8 @@ void TSchemeShard::PersistTableIndexAlterData(NIceDb::TNiceDb& db, const TPathId
db.Table<Schema::TableIndexAlterData>().Key(elem->PathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::TableIndexAlterData::AlterVersion>(alterData->AlterVersion),
NIceDb::TUpdate<Schema::TableIndexAlterData::IndexType>(alterData->Type),
NIceDb::TUpdate<Schema::TableIndexAlterData::State>(alterData->State));
NIceDb::TUpdate<Schema::TableIndexAlterData::State>(alterData->State),
NIceDb::TUpdate<Schema::TableIndexAlterData::Description>(alterData->SerializeDescription()));

for (ui32 keyIdx = 0; keyIdx < alterData->IndexKeys.size(); ++keyIdx) {
db.Table<Schema::TableIndexKeysAlterData>().Key(elem->PathId.LocalPathId, keyIdx).Update(
Expand Down Expand Up @@ -7477,19 +7479,19 @@ void TSchemeShard::ResolveSA() {
StatisticsAggregatorId = subDomainInfo->GetTenantStatisticsAggregatorID();
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
"ResolveSA(), StatisticsAggregatorId=" << StatisticsAggregatorId
<< ", at schemeshard: " << TabletID());
<< ", at schemeshard: " << TabletID());
ConnectToSA();
}
}

void TSchemeShard::ConnectToSA() {
if (!EnableStatistics)
return;

if (!StatisticsAggregatorId) {
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
"ConnectToSA(), no StatisticsAggregatorId"
<< ", at schemeshard: " << TabletID());
<< ", at schemeshard: " << TabletID());
return;
}
auto policy = NTabletPipe::TClientRetryPolicy::WithRetries();
Expand Down Expand Up @@ -7606,8 +7608,8 @@ TDuration TSchemeShard::SendBaseStatsToSA() {
<< ", path count: " << count
<< ", at schemeshard: " << TabletID());

return TDuration::Seconds(SendStatsIntervalMinSeconds
+ RandomNumber<ui64>(SendStatsIntervalMaxSeconds - SendStatsIntervalMinSeconds));
return TDuration::Seconds(SendStatsIntervalMinSeconds
+ RandomNumber<ui64>(SendStatsIntervalMaxSeconds - SendStatsIntervalMinSeconds));
}

} // namespace NSchemeShard
Expand Down
25 changes: 21 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2371,11 +2371,16 @@ struct TTableIndexInfo : public TSimpleRefCount<TTableIndexInfo> {
using EType = NKikimrSchemeOp::EIndexType;
using EState = NKikimrSchemeOp::EIndexState;

TTableIndexInfo(ui64 version, EType type, EState state)
TTableIndexInfo(ui64 version, EType type, EState state, std::string_view description)
: AlterVersion(version)
, Type(type)
, State(state)
{}
{
if (type == NKikimrSchemeOp::EIndexType::EIndexTypeGlobalVectorKmeansTree) {
Y_ABORT_UNLESS(SpecializedIndexDescription.emplace<NKikimrSchemeOp::TVectorIndexKmeansTreeDescription>()
.ParseFromString(description));
}
}

TTableIndexInfo(const TTableIndexInfo&) = default;

Expand All @@ -2391,8 +2396,20 @@ struct TTableIndexInfo : public TSimpleRefCount<TTableIndexInfo> {
return result;
}

TString SerializeDescription() const {
return std::visit([]<typename T>(const T& v) {
if constexpr (std::is_same_v<std::monostate, T>) {
return TString{};
} else {
TString str{v.SerializeAsString()};
Y_ABORT_UNLESS(!str.empty());
return str;
}
}, SpecializedIndexDescription);
}

static TPtr NotExistedYet(EType type) {
return new TTableIndexInfo(0, type, EState::EIndexStateInvalid);
return new TTableIndexInfo(0, type, EState::EIndexStateInvalid, {});
}

static TPtr Create(const NKikimrSchemeOp::TIndexCreationConfig& config, TString& errMsg) {
Expand All @@ -2405,7 +2422,7 @@ struct TTableIndexInfo : public TSimpleRefCount<TTableIndexInfo> {

TPtr alterData = result->CreateNextVersion();
alterData->IndexKeys.assign(config.GetKeyColumnNames().begin(), config.GetKeyColumnNames().end());
Y_ABORT_UNLESS(alterData->IndexKeys.size());
Y_ABORT_UNLESS(!alterData->IndexKeys.empty());
alterData->IndexDataColumns.assign(config.GetDataColumnNames().begin(), config.GetDataColumnNames().end());

alterData->State = config.HasState() ? config.GetState() : EState::EIndexStateReady;
Expand Down
12 changes: 8 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -1022,9 +1022,11 @@ struct Schema : NIceDb::Schema {
struct AlterVersion : Column<3, NScheme::NTypeIds::Uint64> {};
struct IndexType : Column<4, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::EIndexType; static constexpr Type Default = NKikimrSchemeOp::EIndexTypeInvalid; };
struct State : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::EIndexState; static constexpr Type Default = NKikimrSchemeOp::EIndexStateInvalid; };
// One of the SpecializedIndexDescription protobufs serialized as a string.
struct Description : Column<6, NScheme::NTypeIds::String> {};

using TKey = TableKey<PathId>;
using TColumns = TableColumns<PathId, AlterVersion, IndexType, State>;
using TColumns = TableColumns<PathId, AlterVersion, IndexType, State, Description>;
};

struct MigratedTableIndex : Table<67> {
Expand All @@ -1043,9 +1045,11 @@ struct Schema : NIceDb::Schema {
struct AlterVersion : Column<3, NScheme::NTypeIds::Uint64> {};
struct IndexType : Column<4, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::EIndexType; static constexpr Type Default = NKikimrSchemeOp::EIndexTypeInvalid; };
struct State : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::EIndexState; static constexpr Type Default = NKikimrSchemeOp::EIndexStateInvalid; };
// One of the SpecializedIndexDescription protobufs serialized as a string.
struct Description : Column<6, NScheme::NTypeIds::String> {};

using TKey = TableKey<PathId>;
using TColumns = TableColumns<PathId, AlterVersion, IndexType, State>;
using TColumns = TableColumns<PathId, AlterVersion, IndexType, State, Description>;
};

struct TableIndexKeys : Table<40> {
Expand Down Expand Up @@ -1326,7 +1330,7 @@ struct Schema : NIceDb::Schema {
struct AlterMainTableTxDone : Column<33, NScheme::NTypeIds::Bool> {};

// Serialized as string NKikimrSchemeOp::TIndexCreationConfig protobuf.
struct CreationConfig : Column<34, NScheme::NTypeIds::String> { using Type = TString; };
struct CreationConfig : Column<34, NScheme::NTypeIds::String> {};

struct ReadRowsBilled : Column<35, NScheme::NTypeIds::Uint64> {};
struct ReadBytesBilled : Column<36, NScheme::NTypeIds::Uint64> {};
Expand Down Expand Up @@ -1440,7 +1444,7 @@ struct Schema : NIceDb::Schema {
struct LocalShardIdx : Column<3, NScheme::NTypeIds::Uint64> { using Type = TLocalShardIdx; };

struct Range : Column<4, NScheme::NTypeIds::String> { using Type = NKikimrTx::TKeyRange; };
struct LastKeyAck : Column<5, NScheme::NTypeIds::String> { using Type = TString; };
struct LastKeyAck : Column<5, NScheme::NTypeIds::String> {};

struct Status : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrIndexBuilder::EBuildStatus; };
struct Message : Column<7, NScheme::NTypeIds::Utf8> {};
Expand Down
14 changes: 8 additions & 6 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1703,14 +1703,16 @@ namespace NSchemeShardUT_Private {
case NKikimrSchemeOp::EIndexTypeGlobalVectorKmeansTree: {
auto& settings = *index.mutable_global_vector_kmeans_tree_index();

auto& vectorIndexSettings = *settings.mutable_vector_settings()->mutable_settings();
if (cfg.VectorIndexSettings) {
cfg.VectorIndexSettings->SerializeTo(vectorIndexSettings);
auto& kmeansTreeSettings = *settings.mutable_vector_settings();
if (cfg.KMeansTreeSettings) {
cfg.KMeansTreeSettings->SerializeTo(kmeansTreeSettings);
} else {
// some random valid settings
vectorIndexSettings.set_vector_type(Ydb::Table::VectorIndexSettings::VECTOR_TYPE_FLOAT);
vectorIndexSettings.set_vector_dimension(42);
vectorIndexSettings.set_metric(Ydb::Table::VectorIndexSettings::DISTANCE_COSINE);
kmeansTreeSettings.mutable_settings()->set_vector_type(Ydb::Table::VectorIndexSettings::VECTOR_TYPE_FLOAT);
kmeansTreeSettings.mutable_settings()->set_vector_dimension(42);
kmeansTreeSettings.mutable_settings()->set_metric(Ydb::Table::VectorIndexSettings::DISTANCE_COSINE);
kmeansTreeSettings.set_clusters(4);
kmeansTreeSettings.set_levels(5);
}

if (cfg.GlobalIndexSettings) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@

namespace NYdb::NTable {
struct TGlobalIndexSettings;
struct TVectorIndexSettings;
struct TKMeansTreeSettings;
}

namespace NSchemeShardUT_Private {
Expand Down Expand Up @@ -375,7 +375,7 @@ namespace NSchemeShardUT_Private {
TVector<TString> DataColumns;
TVector<NYdb::NTable::TGlobalIndexSettings> GlobalIndexSettings = {};
// implementation note: it was made a pointer, not optional, to enable forward declaration
std::unique_ptr<NYdb::NTable::TVectorIndexSettings> VectorIndexSettings = {};
std::unique_ptr<NYdb::NTable::TKMeansTreeSettings> KMeansTreeSettings = {};
};

std::unique_ptr<TEvIndexBuilder::TEvCreateRequest> CreateBuildColumnRequest(ui64 id, const TString& dbName, const TString& src, const TString& columnName, const Ydb::TypedValue& literal);
Expand Down
Loading

0 comments on commit 4ddb7dc

Please sign in to comment.