Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions ydb/core/tx/schemeshard/schemeshard_import__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,27 @@ bool IsWaiting(const TItem& item) {
return item.State == EState::Waiting;
}

THashSet<EState> CollectItemStates(const TVector<TItem>& items) {
THashSet<EState> itemStates;
THashMap<EState, int> CountItemsByState(const TVector<TItem>& items) {
THashMap<EState, int> counter;
for (const auto& item : items) {
itemStates.emplace(item.State);
counter[item.State]++;
}
return itemStates;
return counter;
}

bool AllDone(const THashSet<EState>& itemStates) {
return AllOf(itemStates, [](EState state) { return state == EState::Done; });
bool AllDone(const THashMap<EState, int>& stateCounts) {
return AllOf(stateCounts, [](const auto& stateCount) { return stateCount.first == EState::Done; });
}

bool AllWaiting(const THashSet<EState>& itemStates) {
return AllOf(itemStates, [](EState state) { return state == EState::Waiting; });
bool AllWaiting(const THashMap<EState, int>& stateCounts) {
return AllOf(stateCounts, [](const auto& stateCount) { return stateCount.first == EState::Waiting; });
}

bool AllDoneOrWaiting(const THashSet<EState>& itemStates) {
return AllOf(itemStates, [](EState state) { return state == EState::Done || state == EState::Waiting; });
bool AllDoneOrWaiting(const THashMap<EState, int>& stateCounts) {
return AllOf(stateCounts, [](const auto& stateCount) {
return stateCount.first == EState::Done
|| stateCount.first == EState::Waiting;
});
}

// the item is to be created by query, i.e. it is not a table
Expand Down Expand Up @@ -431,20 +434,20 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
TVector<ui32> retriedItems;
for (ui32 itemIdx : xrange(importInfo->Items.size())) {
auto& item = importInfo->Items[itemIdx];
if (IsWaiting(item) && IsCreatedByQuery(item) && item.ViewCreationRetries == 0) {
if (IsWaiting(item) && IsCreatedByQuery(item)) {
item.SchemeQueryExecutor = ctx.Register(CreateSchemeQueryExecutor(
Self->SelfId(), importInfo->Id, itemIdx, item.CreationQuery, database
));
Self->RunningImportSchemeQueryExecutors.emplace(item.SchemeQueryExecutor);

item.State = EState::CreateSchemeObject;
item.ViewCreationRetries++;
Self->PersistImportItemState(db, importInfo, itemIdx);

retriedItems.emplace_back(itemIdx);
}
}
if (!retriedItems.empty()) {
importInfo->WaitingViews = std::ssize(retriedItems);
LOG_D("TImport::TTxProgress: retry view creation"
<< ": id# " << importInfo->Id
<< ", retried items# " << JoinSeq(", ", retriedItems)
Expand Down Expand Up @@ -995,17 +998,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
auto& item = importInfo->Items[message.ItemIdx];
Self->RunningImportSchemeQueryExecutors.erase(std::exchange(item.SchemeQueryExecutor, {}));

if (message.Status == Ydb::StatusIds::SCHEME_ERROR && item.ViewCreationRetries == 0) {
if (message.Status == Ydb::StatusIds::SCHEME_ERROR) {
// Scheme error happens when the view depends on a table (or a view) that is not yet imported.
// Instead of tracking view dependencies, we simply retry the creation of the view later.
item.State = EState::Waiting;
Self->PersistImportItemState(db, importInfo, message.ItemIdx);

const auto itemStates = CollectItemStates(importInfo->Items);
if (AllWaiting(itemStates)) {
const auto stateCounts = CountItemsByState(importInfo->Items);
if (AllWaiting(stateCounts)) {
// Cancel the import, or we will end up waiting indefinitely.
return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed");
} else if (AllDoneOrWaiting(itemStates)) {
} else if (AllDoneOrWaiting(stateCounts)) {
if (stateCounts.at(EState::Waiting) == importInfo->WaitingViews) {
// No progress has been made since the last view creation retry.
return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed");
}
RetryViewsCreation(importInfo, db, ctx);
}
return;
Expand Down Expand Up @@ -1080,7 +1087,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
BuildIndex(importInfo, i, txId);
itemIdx = i;
break;

case EState::CreateChangefeed:
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
TString error;
Expand Down Expand Up @@ -1159,7 +1166,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
} else {
txId = GetActiveCreateConsumerTxId(importInfo, itemIdx);
}

}
}

Expand Down Expand Up @@ -1352,7 +1359,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
}
}
break;

case EState::CreateChangefeed:
if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
Expand All @@ -1369,11 +1376,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
return SendNotificationsIfFinished(importInfo);
}

const auto itemStates = CollectItemStates(importInfo->Items);
if (AllDone(itemStates)) {
const auto stateCounts = CountItemsByState(importInfo->Items);
if (AllDone(stateCounts)) {
importInfo->State = EState::Done;
importInfo->EndTime = TAppData::TimeProvider->Now();
} else if (AllDoneOrWaiting(itemStates)) {
} else if (AllDoneOrWaiting(stateCounts)) {
RetryViewsCreation(importInfo, db, ctx);
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2869,7 +2869,6 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
int NextIndexIdx = 0;
int NextChangefeedIdx = 0;
TString Issue;
int ViewCreationRetries = 0;
TPathId StreamImplPathId;

TItem() = default;
Expand Down Expand Up @@ -2901,6 +2900,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
EState State = EState::Invalid;
TString Issue;
TVector<TItem> Items;
int WaitingViews = 0;

TSet<TActorId> Subscribers;

Expand Down
Loading
Loading