From ac22d965867b075c26db463562d9229664a17da6 Mon Sep 17 00:00:00 2001 From: Daniil Demin Date: Fri, 14 Mar 2025 10:23:09 +0000 Subject: [PATCH] views: retry failed view creation while reasonable --- .../schemeshard_import__create.cpp | 51 +++--- .../tx/schemeshard/schemeshard_info_types.h | 2 +- .../tx/schemeshard/ut_restore/ut_restore.cpp | 160 +++++++++++++++--- 3 files changed, 164 insertions(+), 49 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index 8dd8523e8343..98764f8f9bef 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -35,24 +35,27 @@ bool IsWaiting(const TItem& item) { return item.State == EState::Waiting; } -THashSet CollectItemStates(const TVector& items) { - THashSet itemStates; +THashMap CountItemsByState(const TVector& items) { + THashMap counter; for (const auto& item : items) { - itemStates.emplace(item.State); + counter[item.State]++; } - return itemStates; + return counter; } -bool AllDone(const THashSet& itemStates) { - return AllOf(itemStates, [](EState state) { return state == EState::Done; }); +bool AllDone(const THashMap& stateCounts) { + return AllOf(stateCounts, [](const auto& stateCount) { return stateCount.first == EState::Done; }); } -bool AllWaiting(const THashSet& itemStates) { - return AllOf(itemStates, [](EState state) { return state == EState::Waiting; }); +bool AllWaiting(const THashMap& stateCounts) { + return AllOf(stateCounts, [](const auto& stateCount) { return stateCount.first == EState::Waiting; }); } -bool AllDoneOrWaiting(const THashSet& itemStates) { - return AllOf(itemStates, [](EState state) { return state == EState::Done || state == EState::Waiting; }); +bool AllDoneOrWaiting(const THashMap& stateCounts) { + return AllOf(stateCounts, [](const auto& stateCount) { + return stateCount.first == EState::Done + || stateCount.first == EState::Waiting; + }); } // the item is to be created by query, i.e. it is not a table @@ -431,20 +434,20 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase TVector retriedItems; for (ui32 itemIdx : xrange(importInfo->Items.size())) { auto& item = importInfo->Items[itemIdx]; - if (IsWaiting(item) && IsCreatedByQuery(item) && item.ViewCreationRetries == 0) { + if (IsWaiting(item) && IsCreatedByQuery(item)) { item.SchemeQueryExecutor = ctx.Register(CreateSchemeQueryExecutor( Self->SelfId(), importInfo->Id, itemIdx, item.CreationQuery, database )); Self->RunningImportSchemeQueryExecutors.emplace(item.SchemeQueryExecutor); item.State = EState::CreateSchemeObject; - item.ViewCreationRetries++; Self->PersistImportItemState(db, importInfo, itemIdx); retriedItems.emplace_back(itemIdx); } } if (!retriedItems.empty()) { + importInfo->WaitingViews = std::ssize(retriedItems); LOG_D("TImport::TTxProgress: retry view creation" << ": id# " << importInfo->Id << ", retried items# " << JoinSeq(", ", retriedItems) @@ -995,17 +998,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase auto& item = importInfo->Items[message.ItemIdx]; Self->RunningImportSchemeQueryExecutors.erase(std::exchange(item.SchemeQueryExecutor, {})); - if (message.Status == Ydb::StatusIds::SCHEME_ERROR && item.ViewCreationRetries == 0) { + if (message.Status == Ydb::StatusIds::SCHEME_ERROR) { // Scheme error happens when the view depends on a table (or a view) that is not yet imported. // Instead of tracking view dependencies, we simply retry the creation of the view later. item.State = EState::Waiting; Self->PersistImportItemState(db, importInfo, message.ItemIdx); - const auto itemStates = CollectItemStates(importInfo->Items); - if (AllWaiting(itemStates)) { + const auto stateCounts = CountItemsByState(importInfo->Items); + if (AllWaiting(stateCounts)) { // Cancel the import, or we will end up waiting indefinitely. return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed"); - } else if (AllDoneOrWaiting(itemStates)) { + } else if (AllDoneOrWaiting(stateCounts)) { + if (stateCounts.at(EState::Waiting) == importInfo->WaitingViews) { + // No progress has been made since the last view creation retry. + return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed"); + } RetryViewsCreation(importInfo, db, ctx); } return; @@ -1080,7 +1087,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase BuildIndex(importInfo, i, txId); itemIdx = i; break; - + case EState::CreateChangefeed: if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) { TString error; @@ -1159,7 +1166,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } else { txId = GetActiveCreateConsumerTxId(importInfo, itemIdx); } - + } } @@ -1352,7 +1359,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase } } break; - + case EState::CreateChangefeed: if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) { item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers; @@ -1369,11 +1376,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase return SendNotificationsIfFinished(importInfo); } - const auto itemStates = CollectItemStates(importInfo->Items); - if (AllDone(itemStates)) { + const auto stateCounts = CountItemsByState(importInfo->Items); + if (AllDone(stateCounts)) { importInfo->State = EState::Done; importInfo->EndTime = TAppData::TimeProvider->Now(); - } else if (AllDoneOrWaiting(itemStates)) { + } else if (AllDoneOrWaiting(stateCounts)) { RetryViewsCreation(importInfo, db, ctx); } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 6d52d76f207f..a31ffd27c95a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2869,7 +2869,6 @@ struct TImportInfo: public TSimpleRefCount { int NextIndexIdx = 0; int NextChangefeedIdx = 0; TString Issue; - int ViewCreationRetries = 0; TPathId StreamImplPathId; TItem() = default; @@ -2901,6 +2900,7 @@ struct TImportInfo: public TSimpleRefCount { EState State = EState::Invalid; TString Issue; TVector Items; + int WaitingViews = 0; TSet Subscribers; diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index b6c32293aec5..676d59c9d79b 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -1919,20 +1919,20 @@ value { Y_UNIT_TEST_WITH_COMPRESSION(ExportImportWithChecksums) { TPortManager portManager; const ui16 port = portManager.GetPort(); - + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); UNIT_ASSERT(s3Mock.Start()); - + TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true)); - + ui64 txId = 100; - + runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE); - + // Create table TestCreateTable(runtime, ++txId, "/MyRoot", R"( Name: "Original" @@ -1941,10 +1941,10 @@ value { KeyColumnNames: ["key"] )"); env.TestWaitNotification(runtime, txId); - + // Upload data UpdateRow(runtime, "Original", 1, "valueA", TTestTxConfig::FakeHiveTablets); - + // Export table const char* compression = Codec == ECompressionCodec::Zstd ? "zstd" : ""; TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( @@ -1960,12 +1960,12 @@ value { )", port, compression)); const ui64 exportId = txId; env.TestWaitNotification(runtime, exportId); - + // Check export TestGetExport(runtime, exportId, "/MyRoot"); - + UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 6); - + // Restore table TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"( ImportFromS3Settings { @@ -1979,10 +1979,10 @@ value { )", port)); const ui64 importId = txId; env.TestWaitNotification(runtime, importId); - + // Check import TestGetImport(runtime, importId, "/MyRoot"); - + // Check data in restored table { auto expectedJson = TStringBuilder() << "[[[[" @@ -2000,20 +2000,20 @@ value { void ExportImportWithCorruption(T corruption) { TPortManager portManager; const ui16 port = portManager.GetPort(); - + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); UNIT_ASSERT(s3Mock.Start()); - + TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true).EnablePermissionsExport(true)); - + ui64 txId = 100; - + runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE); - + // Create table TestCreateTable(runtime, ++txId, "/MyRoot", R"( Name: "Original" @@ -2022,10 +2022,10 @@ value { KeyColumnNames: ["key"] )"); env.TestWaitNotification(runtime, txId); - + // Upload data UpdateRow(runtime, "Original", 1, "valueA", TTestTxConfig::FakeHiveTablets); - + // Export table const char* compression = Codec == ECompressionCodec::Zstd ? "zstd" : ""; TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( @@ -2041,12 +2041,12 @@ value { )", port, compression)); const ui64 exportId = txId; env.TestWaitNotification(runtime, exportId); - + // Check export TestGetExport(runtime, exportId, "/MyRoot"); - + UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8); - + // Make corruption corruption(s3Mock.GetData()); @@ -2063,7 +2063,7 @@ value { )", port)); ui64 importId = txId; env.TestWaitNotification(runtime, importId); - + // Check corrupted import TestGetImport(runtime, importId, "/MyRoot", Ydb::StatusIds::CANCELLED); @@ -4949,8 +4949,8 @@ Y_UNIT_TEST_SUITE(TImportTests) { TBlockEvents queryResultBlocker(runtime, [&](const TEvPrivate::TEvImportSchemeQueryResult::TPtr& event) { - // When we receive the scheme query result message, we can be sure that the SchemeShard actor ID is set, - // because the table is the first item on the import list. + // The test expects the SchemeShard actor ID to be already initialized when we receive the first query result message. + // This expectation is valid because we import items in order of their appearance on the import items list. if (!schemeshardActorId || event->Recipient != schemeshardActorId) { return false; } @@ -4993,6 +4993,114 @@ Y_UNIT_TEST_SUITE(TImportTests) { }); } + Y_UNIT_TEST(MultipleViewCreationRetries) { + TTestBasicRuntime runtime; + auto options = TTestEnvOptions() + .RunFakeConfigDispatcher(true) + .SetupKqpProxy(true); + TTestEnv env(runtime, options); + runtime.GetAppData().FeatureFlags.SetEnableViews(true); + runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE); + ui64 txId = 100; + + constexpr int ViewLayers = 10; + THashMap bucketContent(ViewLayers); + for (int i = 0; i < ViewLayers; ++i) { + bucketContent.emplace(std::format("/view{}", i), GenerateTestData( + { + EPathTypeView, + std::format(R"( + -- backup root: "/MyRoot" + CREATE VIEW IF NOT EXISTS `view` WITH security_invoker = TRUE AS {}; + )", i == 0 + ? "SELECT 1" + : std::format("SELECT * FROM `view{}`", i - 1) + ) + } + )); + } + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock(ConvertTestData(bucketContent), TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TActorId schemeshardActorId; + TBlockEvents viewCreationBlocker(runtime, + [&](const TEvSchemeShard::TEvModifySchemeTransaction::TPtr& event) { + const auto& record = event->Get()->Record; + if (record.GetTransaction(0).GetOperationType() == ESchemeOpCreateView) { + schemeshardActorId = event->Recipient; + return true; + } + return false; + } + ); + + int missingDependencyFails = 0; + auto missingDependencyObserver = runtime.AddObserver( + [&](const TEvPrivate::TEvImportSchemeQueryResult::TPtr& event) { + if (!schemeshardActorId + || event->Recipient != schemeshardActorId + || event->Get()->Status != Ydb::StatusIds::SCHEME_ERROR) { + return; + } + const auto* error = std::get_if(&event->Get()->Result); + if (error && error->Contains("Cannot find table")) { + ++missingDependencyFails; + } + } + ); + + auto importSettings = TStringBuilder() << std::format(R"( + ImportFromS3Settings {{ + endpoint: "localhost:{}" + scheme: HTTP + )", port + ); + for (int i = 0; i < ViewLayers; ++i) { + importSettings << std::format(R"( + items {{ + source_prefix: "view{}" + destination_path: "/MyRoot/view{}" + }} + )", i, i + ); + } + importSettings << '}'; + + const ui64 importId = ++txId; + TestImport(runtime, importId, "/MyRoot", importSettings); + + int expectedFails = 0; + for (int iteration = 1; iteration <= ViewLayers; ++iteration) { + runtime.WaitFor("blocked view creation", [&]{ return !viewCreationBlocker.empty(); }); + + expectedFails += ViewLayers - iteration; + if (iteration > 1) { + runtime.WaitFor("query results", [&]{ return missingDependencyFails >= expectedFails; }); + } else { + // the first iteration might miss some query results due to the initially unset schemeshardActorId + missingDependencyFails = expectedFails; + } + + viewCreationBlocker.Unblock(1); + } + UNIT_ASSERT(viewCreationBlocker.empty()); + viewCreationBlocker.Stop(); + + env.TestWaitNotification(runtime, importId); + TestGetImport(runtime, importId, "/MyRoot"); + + for (int i = 0; i < ViewLayers; ++i) { + TestDescribeResult(DescribePath(runtime, std::format("/MyRoot/view{}", i)), { + NLs::Finished, + NLs::IsView + }); + } + } + struct TGeneratedChangefeed { std::pair Changefeed; std::function Checker; @@ -5483,7 +5591,7 @@ Y_UNIT_TEST_SUITE(TImportWithRebootsTests) { } primary_key: "key" )" , {{"a", 1}}, "", R"({"version": 1})"); - + TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port)); UNIT_ASSERT(s3Mock.Start());