diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp index 562092d7f3c5..64a18a0d87b0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp @@ -1608,7 +1608,14 @@ struct TTxShardReply: public TSchemeShard::TIndexBuilder::TTxReply { } if (buildInfo.State != TIndexBuildInfo::EState::Filling) { - LOG_I("TTxReply : " << TypeName() << " superfluous event, id# " << BuildId); + LOG_N("TTxReply : " << TypeName() << " superfluous state event, id# " << BuildId + << ", TIndexBuildInfo: " << buildInfo); + return true; + } + + if (!buildInfo.InProgressShards.contains(shardIdx)) { + LOG_N("TTxReply : " << TypeName() << " superfluous shard event, id# " << BuildId + << ", TIndexBuildInfo: " << buildInfo); return true; } @@ -1637,41 +1644,50 @@ struct TTxShardReply: public TSchemeShard::TIndexBuilder::TTxReply { shardStatus.Status = record.GetStatus(); switch (shardStatus.Status) { - case NKikimrIndexBuilder::EBuildStatus::INVALID: + case NKikimrIndexBuilder::EBuildStatus::INVALID: Y_ENSURE(false, "Unreachable"); - case NKikimrIndexBuilder::EBuildStatus::ACCEPTED: // TODO: do we need ACCEPTED? - case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS: + case NKikimrIndexBuilder::EBuildStatus::ACCEPTED: // TODO: do we need ACCEPTED? + case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS: { HandleProgress(shardStatus, buildInfo); Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus); + // no progress + // no pipe close return true; - case NKikimrIndexBuilder::EBuildStatus::DONE: - if (buildInfo.InProgressShards.erase(shardIdx)) { - HandleDone(db, buildInfo); - buildInfo.DoneShards.emplace_back(shardIdx); - } - break; - case NKikimrIndexBuilder::EBuildStatus::ABORTED: + } + case NKikimrIndexBuilder::EBuildStatus::DONE: { + bool erased = buildInfo.InProgressShards.erase(shardIdx); + Y_ENSURE(erased); + buildInfo.DoneShards.emplace_back(shardIdx); + HandleDone(db, buildInfo); + Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus); + Self->IndexBuildPipes.Close(BuildId, shardId, ctx); + Progress(BuildId); + return true; + } + case NKikimrIndexBuilder::EBuildStatus::ABORTED: { // datashard gracefully rebooted, reschedule shard - if (buildInfo.InProgressShards.erase(shardIdx)) { - buildInfo.ToUploadShards.emplace_front(shardIdx); - } - break; - case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR: - case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST: + bool erased = buildInfo.InProgressShards.erase(shardIdx); + Y_ENSURE(erased); + buildInfo.ToUploadShards.emplace_front(shardIdx); + Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus); + Self->IndexBuildPipes.Close(BuildId, shardId, ctx); + Progress(BuildId); + return true; + } + case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR: + case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST: { Self->PersistBuildIndexAddIssue(db, buildInfo, TStringBuilder() << "One of the shards report " << shardStatus.Status << " " << shardStatus.DebugMessage << " at Filling stage, process has to be canceled" << ", shardId: " << shardId << ", shardIdx: " << shardIdx); + Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus); + Self->IndexBuildPipes.Close(BuildId, shardId, ctx); ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying); Progress(BuildId); return true; } - Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus); - Self->IndexBuildPipes.Close(BuildId, shardId, ctx); - Progress(BuildId); - - return true; + } } virtual void HandleProgress(TIndexBuildInfo::TShardStatus& shardStatus, TIndexBuildInfo& buildInfo) { diff --git a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp index 314c2099370c..91ff9fc649aa 100644 --- a/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp +++ b/ydb/core/tx/schemeshard/ut_index_build/ut_vector_index_build.cpp @@ -651,7 +651,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) { TTestEnv env(runtime); ui64 txId = 100; - // runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE); TestCreateTable(runtime, ++txId, "/MyRoot", R"( @@ -700,24 +700,28 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) { { auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx); + Cout << "BuildIndex 1 " << buildIndexOperation.DebugString() << Endl; UNIT_ASSERT_VALUES_EQUAL_C( buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_REJECTED, buildIndexOperation.DebugString() ); UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "One of the shards report BUILD_ERROR"); UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Error: Datashard test fail"); + UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Processed: { upload rows: 0, upload bytes: 0, read rows: 0, read bytes: 0 } }"); } RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor()); { auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx); + Cout << "BuildIndex 2 " << buildIndexOperation.DebugString() << Endl; UNIT_ASSERT_VALUES_EQUAL_C( buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_REJECTED, buildIndexOperation.DebugString() ); UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "One of the shards report BUILD_ERROR"); UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Error: Datashard test fail"); + UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Processed: { upload rows: 0, upload bytes: 0, read rows: 0, read bytes: 0 } }"); } } }