Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 38 additions & 22 deletions ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1608,7 +1608,14 @@ struct TTxShardReply: public TSchemeShard::TIndexBuilder::TTxReply {
}

if (buildInfo.State != TIndexBuildInfo::EState::Filling) {
LOG_I("TTxReply : " << TypeName<TEvResponse>() << " superfluous event, id# " << BuildId);
LOG_N("TTxReply : " << TypeName<TEvResponse>() << " superfluous state event, id# " << BuildId
<< ", TIndexBuildInfo: " << buildInfo);
return true;
}

if (!buildInfo.InProgressShards.contains(shardIdx)) {
LOG_N("TTxReply : " << TypeName<TEvResponse>() << " superfluous shard event, id# " << BuildId
<< ", TIndexBuildInfo: " << buildInfo);
return true;
}

Expand Down Expand Up @@ -1637,41 +1644,50 @@ struct TTxShardReply: public TSchemeShard::TIndexBuilder::TTxReply {
shardStatus.Status = record.GetStatus();

switch (shardStatus.Status) {
case NKikimrIndexBuilder::EBuildStatus::INVALID:
case NKikimrIndexBuilder::EBuildStatus::INVALID:
Y_ENSURE(false, "Unreachable");
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED: // TODO: do we need ACCEPTED?
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS:
case NKikimrIndexBuilder::EBuildStatus::ACCEPTED: // TODO: do we need ACCEPTED?
case NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS: {
HandleProgress(shardStatus, buildInfo);
Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus);
// no progress
// no pipe close
return true;
case NKikimrIndexBuilder::EBuildStatus::DONE:
if (buildInfo.InProgressShards.erase(shardIdx)) {
HandleDone(db, buildInfo);
buildInfo.DoneShards.emplace_back(shardIdx);
}
break;
case NKikimrIndexBuilder::EBuildStatus::ABORTED:
}
case NKikimrIndexBuilder::EBuildStatus::DONE: {
bool erased = buildInfo.InProgressShards.erase(shardIdx);
Y_ENSURE(erased);
buildInfo.DoneShards.emplace_back(shardIdx);
HandleDone(db, buildInfo);
Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus);
Self->IndexBuildPipes.Close(BuildId, shardId, ctx);
Progress(BuildId);
return true;
}
case NKikimrIndexBuilder::EBuildStatus::ABORTED: {
// datashard gracefully rebooted, reschedule shard
if (buildInfo.InProgressShards.erase(shardIdx)) {
buildInfo.ToUploadShards.emplace_front(shardIdx);
}
break;
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST:
bool erased = buildInfo.InProgressShards.erase(shardIdx);
Y_ENSURE(erased);
buildInfo.ToUploadShards.emplace_front(shardIdx);
Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus);
Self->IndexBuildPipes.Close(BuildId, shardId, ctx);
Progress(BuildId);
return true;
}
case NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR:
case NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST: {
Self->PersistBuildIndexAddIssue(db, buildInfo, TStringBuilder()
<< "One of the shards report " << shardStatus.Status << " " << shardStatus.DebugMessage
<< " at Filling stage, process has to be canceled"
<< ", shardId: " << shardId
<< ", shardIdx: " << shardIdx);
Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus);
Self->IndexBuildPipes.Close(BuildId, shardId, ctx);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
Progress(BuildId);
return true;
}
Self->PersistBuildIndexUploadProgress(db, BuildId, shardIdx, shardStatus);
Self->IndexBuildPipes.Close(BuildId, shardId, ctx);
Progress(BuildId);

return true;
}
}

virtual void HandleProgress(TIndexBuildInfo::TShardStatus& shardStatus, TIndexBuildInfo& buildInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) {
TTestEnv env(runtime);
ui64 txId = 100;

// runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE);

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Expand Down Expand Up @@ -700,24 +700,28 @@ Y_UNIT_TEST_SUITE (VectorIndexBuildTest) {

{
auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx);
Cout << "BuildIndex 1 " << buildIndexOperation.DebugString() << Endl;
UNIT_ASSERT_VALUES_EQUAL_C(
buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_REJECTED,
buildIndexOperation.DebugString()
);
UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "One of the shards report BUILD_ERROR");
UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Error: Datashard test fail");
UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Processed: { upload rows: 0, upload bytes: 0, read rows: 0, read bytes: 0 } }");
}

RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());

{
auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx);
Cout << "BuildIndex 2 " << buildIndexOperation.DebugString() << Endl;
UNIT_ASSERT_VALUES_EQUAL_C(
buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_REJECTED,
buildIndexOperation.DebugString()
);
UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "One of the shards report BUILD_ERROR");
UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Error: Datashard test fail");
UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Processed: { upload rows: 0, upload bytes: 0, read rows: 0, read bytes: 0 } }");
}
}
}
Loading