From 751776b2a2980caf6d1d8997de1f9b4de74d33d4 Mon Sep 17 00:00:00 2001 From: Valerii Mironov Date: Mon, 16 Sep 2024 15:40:03 +0000 Subject: [PATCH 1/5] Add reinit step for build index --- .../tx/schemeshard/schemeshard__operation.cpp | 2 +- .../schemeshard__operation_create_table.cpp | 10 +- ...emeshard__operation_drop_indexed_table.cpp | 9 +- .../schemeshard_build_index__progress.cpp | 250 ++++++++++++++++-- .../schemeshard_build_index_tx_base.cpp | 2 + .../tx/schemeshard/schemeshard_info_types.cpp | 2 +- .../tx/schemeshard/schemeshard_info_types.h | 30 ++- 7 files changed, 267 insertions(+), 38 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 4a8b548fb726..f51b938cc104 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -1335,7 +1335,7 @@ TVector TOperation::ConstructParts(const TTxTransaction& tx Y_ABORT("multipart operations are handled before, also they require transaction details"); case NKikimrSchemeOp::EOperationType::ESchemeOpInitiateBuildIndexImplTable: - Y_ABORT("multipart operations are handled before, also they require transaction details"); + return {CreateInitializeBuildIndexImplTable(NextPartId(), tx)}; case NKikimrSchemeOp::EOperationType::ESchemeOpFinalizeBuildIndexImplTable: Y_ABORT("multipart operations are handled before, also they require transaction details"); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp index 204c7574c46b..34e61af9be17 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp @@ -447,9 +447,13 @@ class TCreateTable: public TSubOperation { if (checks) { if (parentPath.Base()->IsTableIndex()) { - checks.IsInsideTableIndexPath() - .IsUnderCreating(NKikimrScheme::StatusNameConflict) - 
.IsUnderTheSameOperation(OperationId.GetTxId()); //allow only as part of creating base table + checks.IsInsideTableIndexPath(); + // Not tmp index impl tables can be created only as part of create index + if (!NTableIndex::IsTmpImplTable(name)) { + checks + .IsUnderCreating(NKikimrScheme::StatusNameConflict) + .IsUnderTheSameOperation(OperationId.GetTxId()); + } } else if (!Transaction.GetAllowAccessToPrivatePaths()) { checks.IsCommonSensePath() .IsLikeDirectory(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp index 9ae55b33371b..7551e80efc42 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp @@ -396,11 +396,10 @@ TVector CreateDropIndexedTable(TOperationId nextId, const T auto dropOperation = tx.GetDrop(); const TString parentPathStr = tx.GetWorkingDir(); - const TString name = dropOperation.GetName(); TPath table = dropOperation.HasId() ? 
TPath::Init(TPathId(context.SS->TabletID(), dropOperation.GetId()), context.SS) - : TPath::Resolve(parentPathStr, context.SS).Dive(name); + : TPath::Resolve(parentPathStr, context.SS).Dive(dropOperation.GetName()); { TPath::TChecker checks = table.Check(); @@ -425,8 +424,10 @@ TVector CreateDropIndexedTable(TOperationId nextId, const T checks .IsTable() .NotUnderDeleting() - .NotUnderOperation() - .IsCommonSensePath(); + .NotUnderOperation(); + if (!NTableIndex::IsTmpImplTable(table.LeafName())) { + checks.IsCommonSensePath(); + } } } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index c21c9e8bf55d..68c03dbe4e6f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -4,6 +4,7 @@ #include "schemeshard_build_index_tx_base.h" #include +#include #include #include #include @@ -33,6 +34,10 @@ constexpr const char* Name(TIndexBuildInfo::EState state) noexcept { return "Initiating"; case TIndexBuildInfo::EState::Filling: return "Filling"; + case TIndexBuildInfo::EState::DropTmp: + return "DropTmp"; + case TIndexBuildInfo::EState::CreateTmp: + return "CreateTmp"; case TIndexBuildInfo::EState::Applying: return "Applying"; case TIndexBuildInfo::EState::Unlocking: @@ -112,7 +117,7 @@ class TUploadSampleK: public TActorBootstrapped { return UploadStatus.ToString(); } - void Bootstrap(const NActors::TActorContext& ctx) { + void Bootstrap() { Rows = std::make_shared(); Rows->reserve(Init.size()); std::array PrimaryKeys; @@ -136,28 +141,28 @@ class TUploadSampleK: public TActorBootstrapped { Become(&TThis::StateWork); - Upload(ctx, false); + Upload(false); } private: STFUNC(StateWork) { switch (ev->GetTypeRewrite()) { - HFunc(TEvTxUserProxy::TEvUploadRowsResponse, Handle); - CFunc(TEvents::TSystem::Wakeup, HandleWakeup); + hFunc(TEvTxUserProxy::TEvUploadRowsResponse, Handle); + 
cFunc(TEvents::TSystem::Wakeup, HandleWakeup); default: LOG_E("StateWork unexpected event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString()); } } - void HandleWakeup(const NActors::TActorContext& ctx) { + void HandleWakeup() { LOG_D("Retry upload " << Debug()); if (Rows) { - Upload(ctx, true); + Upload(true); } } - void Handle(TEvTxUserProxy::TEvUploadRowsResponse::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvTxUserProxy::TEvUploadRowsResponse::TPtr& ev) { LOG_T("Handle TEvUploadRowsResponse " << Debug() << " Uploader: " << Uploader.ToString() @@ -179,7 +184,7 @@ class TUploadSampleK: public TActorBootstrapped { if (UploadStatus.IsRetriable() && RetryCount < Limits.MaxUploadRowsRetryCount) { LOG_N("Got retriable error, " << Debug() << " RetryCount: " << RetryCount); - ctx.Schedule(Limits.GetTimeoutBackouff(RetryCount), new TEvents::TEvWakeup()); + this->Schedule(Limits.GetTimeoutBackouff(RetryCount), new TEvents::TEvWakeup()); return; } TAutoPtr response = new TEvIndexBuilder::TEvUploadSampleKResponse; @@ -190,10 +195,11 @@ class TUploadSampleK: public TActorBootstrapped { UploadStatusToMessage(response->Record); - ctx.Send(ResponseActorId, response.Release()); + this->Send(ResponseActorId, response.Release()); + this->PassAway(); } - void Upload(const NActors::TActorContext& ctx, bool isRetry) { + void Upload(bool isRetry) { if (isRetry) { ++RetryCount; } else { @@ -208,7 +214,7 @@ class TUploadSampleK: public TActorBootstrapped { NTxProxy::EUploadRowsMode::WriteToTableShadow, // TODO(mbkkt) is it fastest? 
true /*writeToPrivateTable*/); - Uploader = ctx.Register(actor); + Uploader = this->Register(actor); } }; @@ -261,6 +267,134 @@ THolder InitiatePropose( return propose; } +THolder DropTmpPropose( + TSchemeShard* ss, const TIndexBuildInfo& buildInfo) +{ + Y_ASSERT(buildInfo.IsBuildVectorIndex()); + + auto propose = MakeHolder(ui64(buildInfo.ApplyTxId), ss->TabletID()); + propose->Record.SetFailOnExist(true); + + auto path = TPath::Init(buildInfo.TablePathId, ss).Dive(buildInfo.IndexName); + + NKikimrSchemeOp::TModifyScheme& modifyScheme = *propose->Record.AddTransaction(); + modifyScheme.SetInternal(true); + modifyScheme.SetWorkingDir(path.PathString()); + + using namespace NTableIndex::NTableVectorKmeansTreeIndex; + TString name = PostingTable; + const char* suffix = buildInfo.KMeans.Level % 2 == 0 ? TmpPostingTableSuffix0 : TmpPostingTableSuffix1; + name.append(suffix); + + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropTable); + modifyScheme.MutableDrop()->SetName(name); + + return propose; +} + +THolder CreateTmpPropose( + TSchemeShard* ss, const TIndexBuildInfo& buildInfo) +{ + Y_ASSERT(buildInfo.IsBuildVectorIndex()); + + auto propose = MakeHolder(ui64(buildInfo.ApplyTxId), ss->TabletID()); + propose->Record.SetFailOnExist(true); + + auto path = TPath::Init(buildInfo.TablePathId, ss); + auto tableId = path->PathId; + path.Dive(buildInfo.IndexName); + + NKikimrSchemeOp::TModifyScheme& modifyScheme = *propose->Record.AddTransaction(); + modifyScheme.SetInternal(true); + modifyScheme.SetWorkingDir(path.PathString()); + + using namespace NTableIndex::NTableVectorKmeansTreeIndex; + TString name0 = PostingTable; + TString name1 = PostingTable; + const char* suffix0 = buildInfo.KMeans.Level % 2 == 0 ? TmpPostingTableSuffix0 : TmpPostingTableSuffix1; + const char* suffix1 = buildInfo.KMeans.Level % 2 == 0 ? 
TmpPostingTableSuffix1 : TmpPostingTableSuffix0; + name0.append(suffix0); + name1.append(suffix1); + + // TODO(mbkkt) for levels greater than zero we need to disable split/merge completely + // For now it's not guaranteed, but very likely + // But lock is really inconvenient approach (needs to store TxId/etc) + // So maybe best way to do this is specify something in definition, that will prevent these operations like IsBackup + + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpInitiateBuildIndexImplTable); + auto& op = *modifyScheme.MutableCreateTable(); + + op = ss->Tables.at(path.Dive(name1)->PathId)->TableDescription; + op.ClearPath(); + op.SetName(name0); + + THashMap keyColumns; + for (auto& column : *op.MutableColumns()) { + column.ClearFamily(); + column.ClearFamilyName(); + auto [typeInfo, typeMod] = NScheme::TypeInfoModFromProtoColumnType(column.GetTypeId(), &column.GetTypeInfo()); + column.SetType(NScheme::TypeName(typeInfo, typeMod)); + keyColumns.emplace(column.GetId(), column.GetName()); + } + op.ClearKeyColumnNames(); + for (auto id : op.GetKeyColumnIds()) { + op.AddKeyColumnNames(keyColumns[id]); + } + + op.SetSystemColumnNamesAllowed(true); + + const auto& kmeans = buildInfo.KMeans; + Y_ASSERT(kmeans.K != 0); + Y_ASSERT((kmeans.K & (kmeans.K - 1)) == 0); + auto pow = [](ui32 k, ui32 l) { + ui32 r = 1; + while (l != 0) { + if (l % 2 != 0) { + r *= k; + } + k *= k; + l /= 2; + } + return r; + }; + const auto count = pow(kmeans.K, kmeans.Level + 1); + auto step = 1; + auto parts = count; + auto shards = ss->Tables.at(tableId)->GetShard2PartitionIdx().size(); + if (buildInfo.KMeans.Level + 1 == buildInfo.KMeans.Levels || shards <= 1) { + shards = 1; + parts = 1; + } + for (; shards < parts; parts /= 2) { + step *= 2; + } + for (; parts < shards / 2; parts *= 2) { + Y_ASSERT(step == 1); + } + + auto& partition = *op.MutablePartitionConfig()->MutablePartitioningPolicy(); + partition.SetSizeToSplit(0); // disable auto split/merge + 
partition.SetMinPartitionsCount(parts); + partition.SetMaxPartitionsCount(parts); + partition.ClearFastSplitSettings(); + partition.ClearSplitByLoadSettings(); + + op.ClearSplitBoundary(); + if (parts <= 1) { + return propose; + } + auto i = kmeans.Parent; + for (const auto end = i + count;;) { + i += step; + if (i >= end) { + Y_ASSERT(op.SplitBoundarySize() == std::min(count, parts) - 1); + return propose; + } + auto cell = TCell::Make(i); + op.AddSplitBoundary()->SetSerializedKeyPrefix(TSerializedCellVec::Serialize({&cell, 1})); + } +} + THolder AlterMainTablePropose( TSchemeShard* ss, const TIndexBuildInfo& buildInfo) { @@ -412,7 +546,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil PathIdFromPathId(buildInfo.TablePathId, ev->Record.MutablePathId()); - ev->Record.SetK(buildInfo.KMeansSettings.K); + ev->Record.SetK(buildInfo.KMeans.K); ev->Record.SetMaxProbability(buildInfo.Sample.MaxProbability); ev->Record.AddColumns(buildInfo.IndexColumns[0]); @@ -544,7 +678,12 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil } else if (!buildInfo.InitiateTxDone) { Send(Self->SelfId(), MakeHolder(ui64(buildInfo.InitiateTxId))); } else { - ChangeState(BuildId, TIndexBuildInfo::EState::Filling); + if (buildInfo.IsBuildVectorIndex()) { + Y_ASSERT(buildInfo.KMeans.NeedsAnotherLevel()); + ChangeState(BuildId, TIndexBuildInfo::EState::DropTmp); + } else { + ChangeState(BuildId, TIndexBuildInfo::EState::Filling); + } Progress(BuildId); } @@ -609,7 +748,7 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil } if (buildInfo.IsBuildVectorIndex()) { - buildInfo.Sample.MakeStrictTop(buildInfo.KMeansSettings.K); + buildInfo.Sample.MakeStrictTop(buildInfo.KMeans.K); if (buildInfo.Sample.MaxProbability == 0) { makeDone(buildInfo.Shards.size()); } @@ -634,12 +773,18 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil && buildInfo.DoneShardsSize == 
buildInfo.Shards.size()) { // all done Y_ABORT_UNLESS(0 == Self->IndexBuildPipes.CloseAll(BuildId, ctx)); - - if (buildInfo.IsBuildVectorIndex() && SendUploadSampleKRequest(buildInfo)) { - AskToScheduleBilling(buildInfo); - break; + if (buildInfo.IsBuildVectorIndex()) { + if (SendUploadSampleKRequest(buildInfo)) { + AskToScheduleBilling(buildInfo); + break; + } + if (buildInfo.KMeans.NeedsAnotherLevel()) { + AskToScheduleBilling(buildInfo); + ChangeState(BuildId, TIndexBuildInfo::EState::DropTmp); + Progress(BuildId); + break; + } } - ChangeState(BuildId, TIndexBuildInfo::EState::Applying); Progress(BuildId); @@ -651,6 +796,52 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil break; } + case TIndexBuildInfo::EState::DropTmp: + Y_ASSERT(buildInfo.IsBuildVectorIndex()); + if (buildInfo.ApplyTxId == InvalidTxId) { + Send(Self->TxAllocatorClient, MakeHolder(), 0, ui64(BuildId)); + } else if (buildInfo.ApplyTxStatus == NKikimrScheme::StatusSuccess) { + Send(Self->SelfId(), DropTmpPropose(Self, buildInfo), 0, ui64(BuildId)); + } else if (!buildInfo.ApplyTxDone) { + Send(Self->SelfId(), MakeHolder(ui64(buildInfo.ApplyTxId))); + } else { + buildInfo.ApplyTxId = {}; + buildInfo.ApplyTxStatus = NKikimrScheme::StatusSuccess; + buildInfo.ApplyTxDone = false; + + NIceDb::TNiceDb db(txc.DB); + Self->PersistBuildIndexApplyTxId(db, buildInfo); + Self->PersistBuildIndexApplyTxStatus(db, buildInfo); + Self->PersistBuildIndexApplyTxDone(db, buildInfo); + + ChangeState(BuildId, TIndexBuildInfo::EState::CreateTmp); + Progress(BuildId); + } + break; + case TIndexBuildInfo::EState::CreateTmp: + Y_ASSERT(buildInfo.IsBuildVectorIndex()); + if (buildInfo.ApplyTxId == InvalidTxId) { + Send(Self->TxAllocatorClient, MakeHolder(), 0, ui64(BuildId)); + } else if (buildInfo.ApplyTxStatus == NKikimrScheme::StatusSuccess) { + Send(Self->SelfId(), CreateTmpPropose(Self, buildInfo), 0, ui64(BuildId)); + } else if (!buildInfo.ApplyTxDone) { + Send(Self->SelfId(), 
MakeHolder(ui64(buildInfo.ApplyTxId))); + } else { + buildInfo.ApplyTxId = {}; + buildInfo.ApplyTxStatus = NKikimrScheme::StatusSuccess; + buildInfo.ApplyTxDone = false; + ++buildInfo.KMeans.Level; + + NIceDb::TNiceDb db(txc.DB); + Self->PersistBuildIndexApplyTxId(db, buildInfo); + Self->PersistBuildIndexApplyTxStatus(db, buildInfo); + Self->PersistBuildIndexApplyTxDone(db, buildInfo); + // TODO(mbkkt) persist buildInfo.KMeans.Level + + ChangeState(BuildId, TIndexBuildInfo::EState::Filling); + Progress(BuildId); + } + break; case TIndexBuildInfo::EState::Applying: if (buildInfo.ApplyTxId == InvalidTxId) { Send(Self->TxAllocatorClient, MakeHolder(), 0, ui64(BuildId)); @@ -875,6 +1066,8 @@ struct TSchemeShard::TIndexBuilder::TTxReplyRetry: public TSchemeShard::TIndexBu case TIndexBuildInfo::EState::Locking: case TIndexBuildInfo::EState::GatheringStatistics: case TIndexBuildInfo::EState::Initiating: + case TIndexBuildInfo::EState::DropTmp: + case TIndexBuildInfo::EState::CreateTmp: case TIndexBuildInfo::EState::Applying: case TIndexBuildInfo::EState::Unlocking: case TIndexBuildInfo::EState::Done: @@ -962,7 +1155,7 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex } buildInfo.Sample.Rows.emplace_back(probabilities[i], std::move(rows[i])); } - buildInfo.Sample.MakeWeakTop(buildInfo.KMeansSettings.K); + buildInfo.Sample.MakeWeakTop(buildInfo.KMeans.K); } if (record.HasRowsDelta() || record.HasBytesDelta()) { @@ -1025,6 +1218,8 @@ struct TSchemeShard::TIndexBuilder::TTxReplySampleK: public TSchemeShard::TIndex case TIndexBuildInfo::EState::Locking: case TIndexBuildInfo::EState::GatheringStatistics: case TIndexBuildInfo::EState::Initiating: + case TIndexBuildInfo::EState::DropTmp: + case TIndexBuildInfo::EState::CreateTmp: case TIndexBuildInfo::EState::Applying: case TIndexBuildInfo::EState::Unlocking: case TIndexBuildInfo::EState::Done: @@ -1085,11 +1280,8 @@ struct TSchemeShard::TIndexBuilder::TTxReplyUpload: public TSchemeShard::TIndexB 
NIceDb::TNiceDb db(txc.DB); auto status = record.GetUploadStatus(); if (status == Ydb::StatusIds::SUCCESS) { - // TODO(mbkkt) next rounds of build vector index - ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Applying); + ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Filling); Progress(buildId); - // make final bill - Bill(buildInfo); } else { NYql::TIssues issues; NYql::IssuesFromMessage(record.GetIssues(), issues); @@ -1105,6 +1297,8 @@ struct TSchemeShard::TIndexBuilder::TTxReplyUpload: public TSchemeShard::TIndexB case TIndexBuildInfo::EState::Locking: case TIndexBuildInfo::EState::GatheringStatistics: case TIndexBuildInfo::EState::Initiating: + case TIndexBuildInfo::EState::DropTmp: + case TIndexBuildInfo::EState::CreateTmp: case TIndexBuildInfo::EState::Applying: case TIndexBuildInfo::EState::Unlocking: case TIndexBuildInfo::EState::Done: @@ -1280,6 +1474,8 @@ struct TSchemeShard::TIndexBuilder::TTxReplyProgress: public TSchemeShard::TInde case TIndexBuildInfo::EState::Locking: case TIndexBuildInfo::EState::GatheringStatistics: case TIndexBuildInfo::EState::Initiating: + case TIndexBuildInfo::EState::DropTmp: + case TIndexBuildInfo::EState::CreateTmp: case TIndexBuildInfo::EState::Applying: case TIndexBuildInfo::EState::Unlocking: case TIndexBuildInfo::EState::Done: @@ -1357,6 +1553,8 @@ struct TSchemeShard::TIndexBuilder::TTxReplyCompleted: public TSchemeShard::TInd Self->PersistBuildIndexInitiateTxDone(db, buildInfo); break; } + case TIndexBuildInfo::EState::DropTmp: + case TIndexBuildInfo::EState::CreateTmp: case TIndexBuildInfo::EState::Applying: case TIndexBuildInfo::EState::Cancellation_Applying: case TIndexBuildInfo::EState::Rejection_Applying: @@ -1522,6 +1720,8 @@ struct TSchemeShard::TIndexBuilder::TTxReplyModify: public TSchemeShard::TIndexB ifErrorMoveTo(TIndexBuildInfo::EState::Rejection_Unlocking); break; } + case TIndexBuildInfo::EState::DropTmp: + case TIndexBuildInfo::EState::CreateTmp: case TIndexBuildInfo::EState::Applying: case 
TIndexBuildInfo::EState::Rejection_Applying: { @@ -1634,6 +1834,8 @@ struct TSchemeShard::TIndexBuilder::TTxReplyAllocate: public TSchemeShard::TInde Self->PersistBuildIndexInitiateTxId(db, buildInfo); } break; + case TIndexBuildInfo::EState::DropTmp: + case TIndexBuildInfo::EState::CreateTmp: case TIndexBuildInfo::EState::Applying: case TIndexBuildInfo::EState::Cancellation_Applying: case TIndexBuildInfo::EState::Rejection_Applying: diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp index 591074d53bd6..f6f9749584a2 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp @@ -209,6 +209,8 @@ void TSchemeShard::TIndexBuilder::TTxBase::Fill(NKikimrIndexBuilder::TIndexBuild index.SetProgress(0.0); break; case TIndexBuildInfo::EState::Filling: + case TIndexBuildInfo::EState::DropTmp: + case TIndexBuildInfo::EState::CreateTmp: index.SetState(Ydb::Table::IndexBuildState::STATE_TRANSFERING_DATA); index.SetProgress(indexInfo.CalcProgressPercent()); break; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 1903fe2a5e94..00bc38958dfe 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -305,7 +305,7 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData( } if (!IsValidColumnName(colName, allowSystemColumns)) { - errStr = Sprintf("Invalid name for column '%s'", colName.data()); + errStr = Sprintf("Invalid name for %s column '%s'", allowSystemColumns ? 
"any" : "user", colName.data()); return nullptr; } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 0eb7b65196b4..0ddcdcbd2caa 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2886,7 +2886,8 @@ struct TIndexBuildInfo: public TSimpleRefCount { GatheringStatistics = 20, Initiating = 30, Filling = 40, - // TODO(mbkkt) TruncateTmpTables = 45, + DropTmp = 45, + CreateTmp = 46, Applying = 50, Unlocking = 60, Done = 200, @@ -2962,10 +2963,23 @@ struct TIndexBuildInfo: public TSimpleRefCount { std::variant SpecializedIndexDescription; - struct TKMeansTreeSettings { - ui64 K = 128; + struct TKMeans { + // settings + ui32 K = 128; + ui32 Levels = 5; + + // pools + ui32 Ids = 0; + + // progress + ui32 Level = 0; + ui32 Parent = 0; + + bool NeedsAnotherLevel() const { + return Level < Levels; + } }; - TKMeansTreeSettings KMeansSettings; + TKMeans KMeans; EState State = EState::Invalid; TString Issue; @@ -3322,7 +3336,13 @@ struct TIndexBuildInfo: public TSimpleRefCount { } float CalcProgressPercent() const { - // TODO(mbkkt) different calculation for vector index + if (IsBuildVectorIndex()) { + // TODO(mbkkt) better calculation for vector index + if (KMeans.Level == 0) { + return 0.0; + } + return KMeans.Levels * 100. 
/ KMeans.Level; + } if (Shards) { float totalShards = Shards.size(); return 100.0 * DoneShardsSize / totalShards; From 3a8f3289c2f52e86ee93454fd84062498894fd8b Mon Sep 17 00:00:00 2001 From: Valerii Mironov Date: Mon, 16 Sep 2024 16:28:06 +0000 Subject: [PATCH 2/5] Adjust test (more time to init) --- .../tx/schemeshard/ut_index_build/ut_vector_index_build.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp index 32bc04644bb1..2091f3c69b3c 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp @@ -125,7 +125,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) { auto descr = TestGetBuildIndex(runtime, tenantSchemeShard, "/MyRoot/ServerLessDB", txId); UNIT_ASSERT_VALUES_EQUAL(descr.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE); - const TString meteringData = R"({"usage":{"start":0,"quantity":292,"finish":0,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-328-5075-328-5075","cloud_id":"CLOUD_ID_VAL","source_wt":0,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n"; + const TString meteringData = R"({"usage":{"start":1,"quantity":292,"finish":1,"unit":"request_unit","type":"delta"},"tags":{},"id":"106-72075186233409549-2-328-5075-328-5075","cloud_id":"CLOUD_ID_VAL","source_wt":1,"source_id":"sless-docapi-ydb-ss","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.requests.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})""\n"; UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData); From 2c1150f5136191d5214f29bb42d963b9d1202b2f Mon Sep 17 00:00:00 2001 From: Valerii Mironov Date: Mon, 16 Sep 2024 16:46:25 +0000 Subject: [PATCH 3/5] Small adjustment --- 
.../schemeshard/schemeshard__operation_drop_indexed_table.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp index 7551e80efc42..9bdcb7eeb449 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp @@ -425,7 +425,7 @@ TVector CreateDropIndexedTable(TOperationId nextId, const T .IsTable() .NotUnderDeleting() .NotUnderOperation(); - if (!NTableIndex::IsTmpImplTable(table.LeafName())) { + if (!table.Parent()->IsTableIndex() || !NTableIndex::IsTmpImplTable(table.LeafName())) { checks.IsCommonSensePath(); } } From 62f8e9288a65f16a56b5df4b663e340c423a92d3 Mon Sep 17 00:00:00 2001 From: Valerii Mironov Date: Tue, 17 Sep 2024 10:59:28 +0000 Subject: [PATCH 4/5] Fix test --- .../schemeshard_build_index__progress.cpp | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index 68c03dbe4e6f..0277b09cbfd3 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -372,12 +372,16 @@ THolder CreateTmpPropose( Y_ASSERT(step == 1); } - auto& partition = *op.MutablePartitionConfig()->MutablePartitioningPolicy(); - partition.SetSizeToSplit(0); // disable auto split/merge - partition.SetMinPartitionsCount(parts); - partition.SetMaxPartitionsCount(parts); - partition.ClearFastSplitSettings(); - partition.ClearSplitByLoadSettings(); + auto& config = *op.MutablePartitionConfig(); + config.Clear(); + config.SetShadowData(true); + + auto& policy = *config.MutablePartitioningPolicy(); + policy.SetSizeToSplit(0); // disable auto split/merge + policy.SetMinPartitionsCount(parts); + 
policy.SetMaxPartitionsCount(parts); + policy.ClearFastSplitSettings(); + policy.ClearSplitByLoadSettings(); op.ClearSplitBoundary(); if (parts <= 1) { From 70d3ccd75d66ce8afc8de66c8ee1a7d02ed3094e Mon Sep 17 00:00:00 2001 From: Valerii Mironov Date: Thu, 19 Sep 2024 09:32:37 +0000 Subject: [PATCH 5/5] WIP --- ydb/core/protos/flat_scheme_op.proto | 2 +- ydb/core/protos/flat_tx_scheme.proto | 2 +- ...hemeshard__operation_apply_build_index.cpp | 20 ++++++++++--- ...emeshard__operation_create_build_index.cpp | 1 + .../schemeshard_build_index__progress.cpp | 8 ----- .../tx/schemeshard/schemeshard_info_types.h | 29 +++++++++---------- .../tx/schemeshard/schemeshard_tx_infly.h | 8 +++++ ydb/core/tx/tx_proxy/schemereq.cpp | 4 ++- 8 files changed, 44 insertions(+), 30 deletions(-) diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index c5e25cdd5717..3d695289335d 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -33,7 +33,7 @@ message TMkDir { enum EDropWaitPolicy { EDropFailOnChanges = 0; - EDropAbortChanges = 1; //depricated + EDropAbortChanges = 1; //deprecated EDropWaitChanges = 2; } diff --git a/ydb/core/protos/flat_tx_scheme.proto b/ydb/core/protos/flat_tx_scheme.proto index 84026ae9f41d..575eab171f49 100644 --- a/ydb/core/protos/flat_tx_scheme.proto +++ b/ydb/core/protos/flat_tx_scheme.proto @@ -53,7 +53,7 @@ message TEvModifySchemeTransaction { optional uint64 TxId = 2; optional uint64 TabletId = 3; optional string Owner = 5; - optional bool FailOnExist = 6; // depricated, TModifyScheme.FailOnExist is recomended + optional bool FailOnExist = 6; // deprecated, TModifyScheme.FailOnExist is recommended optional string UserToken = 7 [(Ydb.sensitive) = true]; // serialized NACLib::TUserToken optional string PeerName = 8; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp index 
f78e4bc21e3f..5269cc30cdc0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_apply_build_index.cpp @@ -28,7 +28,7 @@ ISubOperation::TPtr FinalizeIndexImplTable(TOperationContext& context, const TPa return CreateFinalizeBuildIndexImplTable(partId, transaction); } -ISubOperation::TPtr DropIndexImplTable(TOperationContext& /*context*/, const TPath& index, const TOperationId& nextId, const TOperationId& partId, const TString& name, const TPathId& pathId) { +ISubOperation::TPtr DropIndexImplTable(const TPath& index, const TOperationId& nextId, const TOperationId& partId, const TString& name, const TPathId& pathId, bool& rejected) { TPath implTable = index.Child(name); Y_ABORT_UNLESS(implTable->PathId == pathId); Y_ABORT_UNLESS(implTable.LeafName() == name); @@ -41,8 +41,10 @@ ISubOperation::TPtr DropIndexImplTable(TOperationContext& /*context*/, const TPa .NotUnderDeleting() .NotUnderOperation(); if (!checks) { - return {CreateReject(nextId, checks.GetStatus(), checks.GetError())}; + rejected = true; + return CreateReject(nextId, checks.GetStatus(), checks.GetError()); } + rejected = false; auto transaction = TransactionTemplate(index.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropTable); auto operation = transaction.MutableDrop(); operation->SetName(name); @@ -93,7 +95,12 @@ TVector ApplyBuildIndex(TOperationId nextId, const TTxTrans const auto& indexImplTableName = indexChildItems.first; const auto partId = NextPartId(nextId, result); if (NTableIndex::IsTmpImplTable(indexImplTableName)) { - result.push_back(DropIndexImplTable(context, index, nextId, partId, indexImplTableName, indexChildItems.second)); + bool rejected = false; + auto op = DropIndexImplTable(index, nextId, partId, indexImplTableName, indexChildItems.second, rejected); + if (rejected) { + return {std::move(op)}; + } + result.push_back(std::move(op)); } else { result.push_back(FinalizeIndexImplTable(context, 
index, partId, indexImplTableName, indexChildItems.second)); } @@ -143,7 +150,12 @@ TVector CancelBuildIndex(TOperationId nextId, const TTxTran Y_ABORT_UNLESS(index.Base()->GetChildren().size() >= 1); for (auto& indexChildItems : index.Base()->GetChildren()) { const auto partId = NextPartId(nextId, result); - result.push_back(DropIndexImplTable(context, index, nextId, partId, indexChildItems.first, indexChildItems.second)); + bool rejected = false; + auto op = DropIndexImplTable(index, nextId, partId, indexChildItems.first, indexChildItems.second, rejected); + if (rejected) { + return {std::move(op)}; + } + result.push_back(std::move(op)); } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_build_index.cpp index b346d48d9bc7..19bacf412ed9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_build_index.cpp @@ -59,6 +59,7 @@ TVector CreateBuildIndex(TOperationId opId, const TTxTransa .NotResolved(); } + // TODO(mbkkt) less than necessary for vector index checks .IsValidLeafName() .PathsLimit(2) // index and impl-table diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index 0277b09cbfd3..8e64e4b2fb32 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -232,14 +232,6 @@ THolder LockPropose( modifyScheme.SetWorkingDir(path.Parent().PathString()); modifyScheme.MutableLockConfig()->SetName(path.LeafName()); - if (buildInfo.IsBuildIndex()) { - buildInfo.SerializeToProto(ss, modifyScheme.MutableInitiateIndexBuild()); - } else if (buildInfo.IsBuildColumns()) { - buildInfo.SerializeToProto(ss, modifyScheme.MutableInitiateColumnBuild()); - } else { - Y_ABORT("Unknown operation kind while building LockPropose"); - } - 
return propose; } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 0ddcdcbd2caa..811044388103 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2988,30 +2988,29 @@ struct TIndexBuildInfo: public TSimpleRefCount { bool CancelRequested = false; - TTxId AlterMainTableTxId = TTxId(); - NKikimrScheme::EStatus AlterMainTableTxStatus = NKikimrScheme::StatusSuccess; bool AlterMainTableTxDone = false; - - TTxId LockTxId = TTxId(); - NKikimrScheme::EStatus LockTxStatus = NKikimrScheme::StatusSuccess; bool LockTxDone = false; - - TTxId InitiateTxId = TTxId(); - NKikimrScheme::EStatus InitiateTxStatus = NKikimrScheme::StatusSuccess; bool InitiateTxDone = false; + bool ApplyTxDone = false; + bool UnlockTxDone = false; - TStepId SnapshotStep; - TTxId SnapshotTxId; + bool BillingEventIsScheduled = false; + TTxId AlterMainTableTxId = TTxId(); + TTxId LockTxId = TTxId(); + TTxId InitiateTxId = TTxId(); TTxId ApplyTxId = TTxId(); - NKikimrScheme::EStatus ApplyTxStatus = NKikimrScheme::StatusSuccess; - bool ApplyTxDone = false; - TTxId UnlockTxId = TTxId(); + + NKikimrScheme::EStatus AlterMainTableTxStatus = NKikimrScheme::StatusSuccess; + NKikimrScheme::EStatus LockTxStatus = NKikimrScheme::StatusSuccess; + NKikimrScheme::EStatus InitiateTxStatus = NKikimrScheme::StatusSuccess; + NKikimrScheme::EStatus ApplyTxStatus = NKikimrScheme::StatusSuccess; NKikimrScheme::EStatus UnlockTxStatus = NKikimrScheme::StatusSuccess; - bool UnlockTxDone = false; - bool BillingEventIsScheduled = false; + TStepId SnapshotStep; + TTxId SnapshotTxId; + TDuration ReBillPeriod = TDuration::Seconds(10); struct TShardStatus { diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index aab23b08df34..1d78a1493abc 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ 
b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -739,6 +739,14 @@ struct TTxState { case NKikimrSchemeOp::ESchemeOpCreateResourcePool: return TxCreateResourcePool; case NKikimrSchemeOp::ESchemeOpAlterResourcePool: return TxAlterResourcePool; case NKikimrSchemeOp::ESchemeOpDropResourcePool: return TxDropResourcePool; + case NKikimrSchemeOp::ESchemeOpAlterExtSubDomainCreateHive: return TxInvalid; + case NKikimrSchemeOp::ESchemeOpDropExternalTable: return TxInvalid; + case NKikimrSchemeOp::ESchemeOpDropExternalDataSource: return TxInvalid; + case NKikimrSchemeOp::ESchemeOpCreateColumnBuild: return TxInvalid; + case NKikimrSchemeOp::ESchemeOpCreateContinuousBackup: return TxInvalid; + case NKikimrSchemeOp::ESchemeOpAlterContinuousBackup: return TxInvalid; + case NKikimrSchemeOp::ESchemeOpDropContinuousBackup: return TxInvalid; + case NKikimrSchemeOp::ESchemeOpRestoreIncrementalBackup: return TxInvalid; default: return TxInvalid; } } diff --git a/ydb/core/tx/tx_proxy/schemereq.cpp b/ydb/core/tx/tx_proxy/schemereq.cpp index 99fa357a3fb2..851bbd97ac1f 100644 --- a/ydb/core/tx/tx_proxy/schemereq.cpp +++ b/ydb/core/tx/tx_proxy/schemereq.cpp @@ -213,8 +213,10 @@ struct TBaseSchemeReq: public TActorBootstrapped { return *modifyScheme.MutableUpgradeSubDomain()->MutableName(); case NKikimrSchemeOp::ESchemeOpCreateColumnBuild: + Y_ABORT("no implementation for ESchemeOpCreateColumnBuild"); + case NKikimrSchemeOp::ESchemeOpCreateIndexBuild: - Y_ABORT("no implementation for ESchemeOpCreateIndexBuild/ESchemeOpCreateColumnBuild"); + Y_ABORT("no implementation for ESchemeOpCreateIndexBuild"); case NKikimrSchemeOp::ESchemeOpInitiateBuildIndexMainTable: Y_ABORT("no implementation for ESchemeOpInitiateBuildIndexMainTable");