From 2782335d99eb96133cebff65ab83bdc18fed5344 Mon Sep 17 00:00:00 2001 From: JaySon Date: Thu, 16 May 2024 21:29:43 +0800 Subject: [PATCH] This is an automated cherry-pick of #9048 Signed-off-by: ti-chi-bot --- dbms/src/Storages/DeltaMerge/StoragePool.cpp | 220 ++++++++++++++---- dbms/src/Storages/DeltaMerge/StoragePool.h | 19 +- .../V3/tests/gtest_page_storage_mix_mode.cpp | 84 +++++-- 3 files changed, 267 insertions(+), 56 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index 6f35449ae13..338d1dc26af 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -26,6 +26,8 @@ #include #include +#include + namespace CurrentMetrics { @@ -177,10 +179,18 @@ bool GlobalStoragePool::gc(const Settings & settings, bool immediately, const Se return done_anything; } +<<<<<<< HEAD StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPool & storage_path_pool_, const String & name) : logger(Logger::get(!name.empty() ? name : DB::toString(ns_id_))) , run_mode(global_ctx.getPageStorageRunMode()) , ns_id(ns_id_) +======= +StoragePool::StoragePool(Context & global_ctx, KeyspaceID keyspace_id_, NamespaceID table_id_, StoragePathPool & storage_path_pool_, const String & name) + : logger(Logger::get(!name.empty() ? name : DB::toString(table_id_))) + , run_mode(global_ctx.getPageStorageRunMode()) + , keyspace_id(keyspace_id_) + , table_id(table_id_) +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) , storage_path_pool(storage_path_pool_) , global_context(global_ctx) , storage_pool_metrics(CurrentMetrics::StoragePoolV3Only, 0) @@ -205,9 +215,15 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo extractConfig(global_context.getSettingsRef(), StorageType::Meta), global_context.getFileProvider(), global_context); +<<<<<<< HEAD log_storage_reader = std::make_shared(run_mode, ns_id, log_storage_v2, /*storage_v3_*/ nullptr, nullptr); data_storage_reader = std::make_shared(run_mode, ns_id, data_storage_v2, /*storage_v3_*/ nullptr, nullptr); meta_storage_reader = std::make_shared(run_mode, ns_id, meta_storage_v2, /*storage_v3_*/ nullptr, nullptr); +======= + log_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Log, table_id, log_storage_v2, /*storage_v3_*/ nullptr, /*uni_ps_*/ nullptr, nullptr); + data_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Data, table_id, data_storage_v2, /*storage_v3_*/ nullptr, /*uni_ps_*/ nullptr, nullptr); + meta_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Meta, table_id, meta_storage_v2, /*storage_v3_*/ nullptr, /*uni_ps_*/ nullptr, nullptr); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) log_storage_writer = std::make_shared(run_mode, log_storage_v2, /*storage_v3_*/ nullptr); data_storage_writer = std::make_shared(run_mode, data_storage_v2, /*storage_v3_*/ nullptr); @@ -221,9 +237,15 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo data_storage_v3 = global_storage_pool->data_storage; meta_storage_v3 = global_storage_pool->meta_storage; +<<<<<<< HEAD log_storage_reader = std::make_shared(run_mode, ns_id, /*storage_v2_*/ nullptr, log_storage_v3, nullptr); data_storage_reader = std::make_shared(run_mode, ns_id, /*storage_v2_*/ nullptr, data_storage_v3, nullptr); meta_storage_reader = std::make_shared(run_mode, ns_id, /*storage_v2_*/ nullptr, meta_storage_v3, nullptr); +======= + log_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Log, table_id, /*storage_v2_*/ nullptr, log_storage_v3, /*uni_ps_*/ nullptr, nullptr); + data_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Data, table_id, /*storage_v2_*/ nullptr, data_storage_v3, /*uni_ps_*/ nullptr, nullptr); + meta_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Meta, table_id, /*storage_v2_*/ nullptr, meta_storage_v3, /*uni_ps_*/ nullptr, nullptr); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) log_storage_writer = std::make_shared(run_mode, /*storage_v2_*/ nullptr, log_storage_v3); data_storage_writer = std::make_shared(run_mode, /*storage_v2_*/ nullptr, data_storage_v3); @@ -239,10 +261,10 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo if (storage_path_pool.isPSV2Deleted()) { - LOG_INFO(logger, "PageStorage V2 is already mark deleted. Current pagestorage change from {} to {} [ns_id={}]", // - static_cast(PageStorageRunMode::MIX_MODE), // - static_cast(PageStorageRunMode::ONLY_V3), // - ns_id); + LOG_INFO(logger, "PageStorage V2 is already mark deleted. Current pagestorage change from {} to {}, table_id={}", // + magic_enum::enum_name(PageStorageRunMode::MIX_MODE), + magic_enum::enum_name(PageStorageRunMode::ONLY_V3), + table_id); log_storage_v2 = nullptr; data_storage_v2 = nullptr; meta_storage_v2 = nullptr; @@ -277,6 +299,7 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo /* no_more_write_to_v2 */ true); } +<<<<<<< HEAD log_storage_reader = std::make_shared(run_mode, ns_id, log_storage_v2, log_storage_v3, nullptr); data_storage_reader = std::make_shared(run_mode, ns_id, data_storage_v2, data_storage_v3, nullptr); meta_storage_reader = std::make_shared(run_mode, ns_id, meta_storage_v2, meta_storage_v3, nullptr); @@ -284,6 +307,26 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo log_storage_writer = std::make_shared(run_mode, log_storage_v2, log_storage_v3); data_storage_writer = std::make_shared(run_mode, data_storage_v2, data_storage_v3); meta_storage_writer = std::make_shared(run_mode, meta_storage_v2, meta_storage_v3); +======= + log_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Log, table_id, log_storage_v2, log_storage_v3, /*uni_ps_*/ nullptr, nullptr); + data_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Data, table_id, data_storage_v2, data_storage_v3, /*uni_ps_*/ nullptr, nullptr); + meta_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Meta, table_id, meta_storage_v2, meta_storage_v3, /*uni_ps_*/ nullptr, nullptr); + + log_storage_writer = std::make_shared(run_mode, StorageType::Log, log_storage_v2, log_storage_v3, /*uni_ps_*/ nullptr); + data_storage_writer = std::make_shared(run_mode, StorageType::Data, data_storage_v2, data_storage_v3, /*uni_ps_*/ nullptr); + meta_storage_writer = std::make_shared(run_mode, StorageType::Meta, meta_storage_v2, meta_storage_v3, /*uni_ps_*/ nullptr); + break; + } + case PageStorageRunMode::UNI_PS: + { + log_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Log, table_id, /*storage_v2_*/ nullptr, /*storage_v3_*/ nullptr, uni_ps, nullptr); + data_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Data, table_id, /*storage_v2_*/ nullptr, /*storage_v3_*/ nullptr, uni_ps, nullptr); + meta_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Meta, table_id, /*storage_v2_*/ nullptr, /*storage_v3_*/ nullptr, uni_ps, nullptr); + + log_storage_writer = std::make_shared(run_mode, StorageType::Log, /*storage_v2_*/ nullptr, /*storage_v3_*/ nullptr, uni_ps); + data_storage_writer = std::make_shared(run_mode, StorageType::Data, /*storage_v2_*/ nullptr, /*storage_v3_*/ nullptr, uni_ps); + meta_storage_writer = std::make_shared(run_mode, StorageType::Meta, /*storage_v2_*/ nullptr, /*storage_v3_*/ nullptr, uni_ps); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) break; } default: @@ -297,8 +340,13 @@ void StoragePool::forceTransformMetaV2toV3() throw Exception(fmt::format("Transform meta must run under mix mode [run_mode={}]", static_cast(run_mode))); assert(meta_storage_v2 != nullptr); assert(meta_storage_v3 != nullptr); +<<<<<<< HEAD auto meta_transform_storage_writer = std::make_shared(run_mode, meta_storage_v2, meta_storage_v3); auto meta_transform_storage_reader = std::make_shared(run_mode, ns_id, meta_storage_v2, meta_storage_v3, nullptr); +======= + auto meta_transform_storage_writer = std::make_shared(run_mode, StorageType::Meta, meta_storage_v2, meta_storage_v3, /*uni_ps_*/ nullptr); + auto meta_transform_storage_reader = std::make_shared(run_mode, keyspace_id, StorageType::Meta, table_id, meta_storage_v2, meta_storage_v3, /*uni_ps_*/ nullptr, nullptr); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) Pages pages_transform = {}; auto meta_transform_acceptor = [&](const DB::Page & page) { @@ -307,8 +355,8 @@ void StoragePool::forceTransformMetaV2toV3() meta_transform_storage_reader->traverse(meta_transform_acceptor, /*only_v2*/ true, /*only_v3*/ false); - WriteBatch write_batch_transform{ns_id}; - WriteBatch write_batch_del_v2{ns_id}; + WriteBatch write_batch_transform{table_id}; + WriteBatch write_batch_del_v2{table_id}; for (const auto & page_transform : pages_transform) { @@ -348,7 +396,7 @@ toV2ConcreteSnapshot(const DB::PageStorage::SnapshotPtr & ptr) void StoragePool::forceTransformDataV2toV3() { if (unlikely(run_mode != PageStorageRunMode::MIX_MODE)) - throw Exception(fmt::format("Transform meta must run under mix mode [run_mode={}]", static_cast(run_mode))); + throw Exception(fmt::format("Transform data must run under mix mode [run_mode={}]", static_cast(run_mode))); assert(data_storage_v2 != nullptr); assert(data_storage_v3 != nullptr); auto data_transform_storage_writer = std::make_shared(run_mode, data_storage_v2, data_storage_v3); @@ -375,8 +423,8 @@ void StoragePool::forceTransformDataV2toV3() // The page ids that can be accessed by DeltaTree const auto all_page_ids = v2_snap->view.validPageIds(); - WriteBatch write_batch_transform{ns_id}; - WriteBatch write_batch_del_v2{ns_id}; + WriteBatch write_batch_transform{table_id}; + WriteBatch write_batch_del_v2{table_id}; std::set created_dt_file_id; for (const auto page_id : all_page_ids) @@ -419,10 +467,12 @@ PageStorageRunMode StoragePool::restore() { case PageStorageRunMode::ONLY_V2: { + // Restore the PSV2 instances from disk log_storage_v2->restore(); data_storage_v2->restore(); meta_storage_v2->restore(); + // ONLY_V2, make sure the page_ids is larger than that in PSV2 max_log_page_id = log_storage_v2->getMaxId(); max_data_page_id = data_storage_v2->getMaxId(); max_meta_page_id = meta_storage_v2->getMaxId(); @@ -432,6 +482,9 @@ PageStorageRunMode StoragePool::restore() } case PageStorageRunMode::ONLY_V3: { + // ONLY_V3 + // - StoragePool is simply a wrapper for the PS instances in GlobalStoragePool + // - Make sure the page_ids is larger than that in PSV2 max_log_page_id = log_storage_v3->getMaxId(); max_data_page_id = data_storage_v3->getMaxId(); max_meta_page_id = meta_storage_v3->getMaxId(); @@ -441,6 +494,7 @@ PageStorageRunMode StoragePool::restore() } case PageStorageRunMode::MIX_MODE: { + // Restore the PSV2 instances from disk log_storage_v2->restore(); data_storage_v2->restore(); meta_storage_v2->restore(); @@ -449,44 +503,52 @@ PageStorageRunMode StoragePool::restore() // However, the pages on meta V2 can not be deleted. As the pages in meta are small, we perform a forceTransformMetaV2toV3 to convert pages before all. if (const auto & meta_remain_pages = meta_storage_v2->getNumberOfPages(); meta_remain_pages != 0) { - LOG_INFO(logger, "Current pool.meta transform to V3 begin [ns_id={}] [pages_before_transform={}]", ns_id, meta_remain_pages); + LOG_INFO(logger, "Current pool.meta transform to V3 begin, table_id={} pages_before_transform={}", table_id, meta_remain_pages); forceTransformMetaV2toV3(); const auto & meta_remain_pages_after_transform = meta_storage_v2->getNumberOfPages(); - LOG_INFO(logger, "Current pool.meta transform to V3 finished [ns_id={}] [done={}] [pages_before_transform={}], [pages_after_transform={}]", // - ns_id, + LOG_INFO(logger, "Current pool.meta transform to V3 finished, table_id={} done={} pages_before_transform={} pages_after_transform={}", // + table_id, meta_remain_pages_after_transform == 0, meta_remain_pages, meta_remain_pages_after_transform); } else { - LOG_INFO(logger, "Current pool.meta transform already done before restored [ns_id={}] ", ns_id); + LOG_INFO(logger, "Current pool.meta transform already done before restored table_id={} ", table_id); } if (const auto & data_remain_pages = data_storage_v2->getNumberOfPages(); data_remain_pages != 0) { - LOG_INFO(logger, "Current pool.data transform to V3 begin [ns_id={}] [pages_before_transform={}]", ns_id, data_remain_pages); + LOG_INFO(logger, "Current pool.data transform to V3 begin, table_id={} pages_before_transform={}", table_id, data_remain_pages); forceTransformDataV2toV3(); const auto & data_remain_pages_after_transform = data_storage_v2->getNumberOfPages(); - LOG_INFO(logger, "Current pool.data transform to V3 finished [ns_id={}] [done={}] [pages_before_transform={}], [pages_after_transform={}]", // - ns_id, + LOG_INFO(logger, "Current pool.data transform to V3 finished, table_id={} done={} pages_before_transform={} pages_after_transform={}", // + table_id, data_remain_pages_after_transform == 0, data_remain_pages, data_remain_pages_after_transform); } else { - LOG_INFO(logger, "Current pool.data transform already done before restored [ns_id={}]", ns_id); + LOG_INFO(logger, "Current pool.data transform already done before restored, table_id={}", table_id); } + // Though all the pages may have been transformed into PageStoage V3 format, we still need + // to ensure the following allocated page_ids is larger than that in both v2 and v3. + // Because `PageStorageV3->getMaxId` is not accurate after the previous "meta" and "data" + // transformed from v2 to v3. + max_log_page_id = std::max(log_storage_v2->getMaxId(), log_storage_v3->getMaxId()); + max_data_page_id = std::max(data_storage_v2->getMaxId(), data_storage_v3->getMaxId()); + max_meta_page_id = std::max(meta_storage_v2->getMaxId(), meta_storage_v3->getMaxId()); + // Check number of valid pages in v2 // If V2 already have no any data in disk, Then change run_mode to ONLY_V3 if (log_storage_v2->getNumberOfPages() == 0 && data_storage_v2->getNumberOfPages() == 0 && meta_storage_v2->getNumberOfPages() == 0) { - LOG_INFO(logger, "Current pagestorage change from {} to {} [ns_id={}]", // - static_cast(PageStorageRunMode::MIX_MODE), - static_cast(PageStorageRunMode::ONLY_V3), - ns_id); + LOG_INFO(logger, "Current pagestorage change from {} to {}, table_id={}", // + magic_enum::enum_name(PageStorageRunMode::MIX_MODE), + magic_enum::enum_name(PageStorageRunMode::ONLY_V3), + table_id); if (storage_path_pool.createPSV2DeleteMarkFile()) { log_storage_v2->drop(); @@ -498,40 +560,51 @@ PageStorageRunMode StoragePool::restore() meta_storage_v2 = nullptr; // Must init by PageStorageRunMode::ONLY_V3 +<<<<<<< HEAD log_storage_reader = std::make_shared(PageStorageRunMode::ONLY_V3, ns_id, /*storage_v2_*/ nullptr, log_storage_v3, nullptr); data_storage_reader = std::make_shared(PageStorageRunMode::ONLY_V3, ns_id, /*storage_v2_*/ nullptr, data_storage_v3, nullptr); meta_storage_reader = std::make_shared(PageStorageRunMode::ONLY_V3, ns_id, /*storage_v2_*/ nullptr, meta_storage_v3, nullptr); +======= + log_storage_reader = std::make_shared(PageStorageRunMode::ONLY_V3, keyspace_id, StorageType::Log, table_id, /*storage_v2_*/ nullptr, log_storage_v3, /*uni_ps_*/ nullptr, nullptr); + data_storage_reader = std::make_shared(PageStorageRunMode::ONLY_V3, keyspace_id, StorageType::Data, table_id, /*storage_v2_*/ nullptr, data_storage_v3, /*uni_ps_*/ nullptr, nullptr); + meta_storage_reader = std::make_shared(PageStorageRunMode::ONLY_V3, keyspace_id, StorageType::Meta, table_id, /*storage_v2_*/ nullptr, meta_storage_v3, /*uni_ps_*/ nullptr, nullptr); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) log_storage_writer = std::make_shared(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, log_storage_v3); data_storage_writer = std::make_shared(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, data_storage_v3); meta_storage_writer = std::make_shared(PageStorageRunMode::ONLY_V3, /*storage_v2_*/ nullptr, meta_storage_v3); - max_log_page_id = log_storage_v3->getMaxId(); - max_data_page_id = data_storage_v3->getMaxId(); - max_meta_page_id = meta_storage_v3->getMaxId(); - run_mode = PageStorageRunMode::ONLY_V3; storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolV3Only}; } else // Still running Mix Mode { - max_log_page_id = std::max(log_storage_v2->getMaxId(), log_storage_v3->getMaxId()); - max_data_page_id = std::max(data_storage_v2->getMaxId(), data_storage_v3->getMaxId()); - max_meta_page_id = std::max(meta_storage_v2->getMaxId(), meta_storage_v3->getMaxId()); storage_pool_metrics = CurrentMetrics::Increment{CurrentMetrics::StoragePoolMixMode}; } break; } +<<<<<<< HEAD +======= + case PageStorageRunMode::UNI_PS: + { + // UNI_PS + // - StoragePool is simply a wrapper for the uni_ps + max_log_page_id = uni_ps->getMaxIdAfterRestart(); + max_data_page_id = uni_ps->getMaxIdAfterRestart(); + max_meta_page_id = uni_ps->getMaxIdAfterRestart(); + break; + } +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) default: throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast(run_mode)), ErrorCodes::LOGICAL_ERROR); } - LOG_TRACE(logger, "Finished StoragePool restore. [current_run_mode={}] [ns_id={}]" - " [max_log_page_id={}] [max_data_page_id={}] [max_meta_page_id={}]", - static_cast(run_mode), - ns_id, - max_log_page_id, - max_data_page_id, - max_meta_page_id); + LOG_INFO(logger, "Finished StoragePool restore. current_run_mode={} table_id={}" + " max_log_page_id={} max_data_page_id={} max_meta_page_id={}", + magic_enum::enum_name(run_mode), + table_id, + max_log_page_id, + max_data_page_id, + max_meta_page_id); return run_mode; } @@ -567,6 +640,19 @@ void StoragePool::startup(ExternalPageCallbacks && callbacks) gc_handle = global_context.getBackgroundPool().addTask([this] { return this->gc(global_context.getSettingsRef()); }); break; } +<<<<<<< HEAD +======= + case PageStorageRunMode::UNI_PS: + { + // For uni ps, the GC is handled by `UniversalPageStorageService`, register callbacks with prefix for this table + UniversalExternalPageCallbacks us_callbacks; + us_callbacks.remover = std::move(callbacks.remover); + us_callbacks.scanner = std::move(callbacks.scanner); + us_callbacks.prefix = UniversalPageIdFormat::toFullPrefix(keyspace_id, StorageType::Data, table_id); + uni_ps->registerUniversalExternalPagesCallbacks(us_callbacks); + break; + } +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) default: throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast(run_mode)), ErrorCodes::LOGICAL_ERROR); } @@ -588,12 +674,12 @@ void StoragePool::shutdown() meta_storage_v2->shutdown(); log_storage_v2->shutdown(); data_storage_v2->shutdown(); - data_storage_v2->unregisterExternalPagesCallbacks(ns_id); + data_storage_v2->unregisterExternalPagesCallbacks(table_id); break; } case PageStorageRunMode::ONLY_V3: { - data_storage_v3->unregisterExternalPagesCallbacks(ns_id); + data_storage_v3->unregisterExternalPagesCallbacks(table_id); break; } case PageStorageRunMode::MIX_MODE: @@ -603,9 +689,17 @@ void StoragePool::shutdown() data_storage_v2->shutdown(); // We have transformed all external pages from V2 to V3 in `restore`, so // only need to unregister callbacks for V3. - data_storage_v3->unregisterExternalPagesCallbacks(ns_id); + data_storage_v3->unregisterExternalPagesCallbacks(table_id); break; } +<<<<<<< HEAD +======= + case PageStorageRunMode::UNI_PS: + { + uni_ps->unregisterUniversalExternalPagesCallbacks(UniversalPageIdFormat::toFullPrefix(keyspace_id, StorageType::Data, table_id)); + break; + } +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) default: throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast(run_mode)), ErrorCodes::LOGICAL_ERROR); } @@ -688,7 +782,7 @@ PageId StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, cons } // else there is a DTFile with that id, continue to acquire a new ID. LOG_WARNING(logger, - "The DTFile is already exists, continute to acquire another ID. [call={}][path={}] [id={}]", + "The DTFile is already exists, continute to acquire another ID. call={} path={} file_id={}", who, existed_path, dtfile_id); @@ -697,11 +791,16 @@ PageId StoragePool::newDataPageIdForDTFile(StableDiskDelegator & delegator, cons } template +<<<<<<< HEAD inline static PageReader newReader(const PageStorageRunMode run_mode, const NamespaceId ns_id, T & storage_v2, T & storage_v3, ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id) +======= +inline static PageReaderPtr newReader(const PageStorageRunMode run_mode, KeyspaceID keyspace_id, StorageType tag, const NamespaceID table_id, T & storage_v2, T & storage_v3, UniversalPageStoragePtr uni_ps, ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id) +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) { switch (run_mode) { case PageStorageRunMode::ONLY_V2: +<<<<<<< HEAD return PageReader(run_mode, ns_id, storage_v2, nullptr, snapshot_read ? storage_v2->getSnapshot(tracing_id) : nullptr, read_limiter); case PageStorageRunMode::ONLY_V3: return PageReader(run_mode, ns_id, nullptr, storage_v3, snapshot_read ? storage_v3->getSnapshot(tracing_id) : nullptr, read_limiter); @@ -710,6 +809,25 @@ inline static PageReader newReader(const PageStorageRunMode run_mode, const Name storage_v3->getSnapshot(fmt::format("{}-v3", tracing_id))) : nullptr, read_limiter); +======= + return std::make_shared(run_mode, keyspace_id, tag, table_id, storage_v2, nullptr, /*uni_ps*/ nullptr, snapshot_read ? storage_v2->getSnapshot(tracing_id) : nullptr, read_limiter); + case PageStorageRunMode::ONLY_V3: + return std::make_shared(run_mode, keyspace_id, tag, table_id, nullptr, storage_v3, /*uni_ps*/ nullptr, snapshot_read ? storage_v3->getSnapshot(tracing_id) : nullptr, read_limiter); + case PageStorageRunMode::MIX_MODE: + return std::make_shared( + run_mode, + keyspace_id, + tag, + table_id, + storage_v2, + storage_v3, + /*uni_ps*/ nullptr, + snapshot_read ? std::make_shared(storage_v2->getSnapshot(fmt::format("{}-v2", tracing_id)), storage_v3->getSnapshot(fmt::format("{}-v3", tracing_id))) + : nullptr, + read_limiter); + case PageStorageRunMode::UNI_PS: + return std::make_shared(run_mode, keyspace_id, tag, table_id, nullptr, nullptr, uni_ps, snapshot_read ? uni_ps->getSnapshot(tracing_id) : nullptr, read_limiter); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) default: throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast(run_mode)), ErrorCodes::LOGICAL_ERROR); } @@ -717,32 +835,56 @@ inline static PageReader newReader(const PageStorageRunMode run_mode, const Name PageReader StoragePool::newLogReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id) { +<<<<<<< HEAD return newReader(run_mode, ns_id, log_storage_v2, log_storage_v3, read_limiter, snapshot_read, tracing_id); +======= + return newReader(run_mode, keyspace_id, StorageType::Log, table_id, log_storage_v2, log_storage_v3, uni_ps, read_limiter, snapshot_read, tracing_id); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) } PageReader StoragePool::newLogReader(ReadLimiterPtr read_limiter, PageStorage::SnapshotPtr & snapshot) { +<<<<<<< HEAD return PageReader(run_mode, ns_id, log_storage_v2, log_storage_v3, snapshot, read_limiter); +======= + return std::make_shared(run_mode, keyspace_id, StorageType::Log, table_id, log_storage_v2, log_storage_v3, uni_ps, snapshot, read_limiter); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) } PageReader StoragePool::newDataReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id) { +<<<<<<< HEAD return newReader(run_mode, ns_id, data_storage_v2, data_storage_v3, read_limiter, snapshot_read, tracing_id); +======= + return newReader(run_mode, keyspace_id, StorageType::Data, table_id, data_storage_v2, data_storage_v3, uni_ps, read_limiter, snapshot_read, tracing_id); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) } PageReader StoragePool::newDataReader(ReadLimiterPtr read_limiter, PageStorage::SnapshotPtr & snapshot) { +<<<<<<< HEAD return PageReader(run_mode, ns_id, data_storage_v2, data_storage_v3, snapshot, read_limiter); +======= + return std::make_shared(run_mode, keyspace_id, StorageType::Data, table_id, data_storage_v2, data_storage_v3, uni_ps, snapshot, read_limiter); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) } PageReader StoragePool::newMetaReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id) { +<<<<<<< HEAD return newReader(run_mode, ns_id, meta_storage_v2, meta_storage_v3, read_limiter, snapshot_read, tracing_id); +======= + return newReader(run_mode, keyspace_id, StorageType::Meta, table_id, meta_storage_v2, meta_storage_v3, uni_ps, read_limiter, snapshot_read, tracing_id); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) } PageReader StoragePool::newMetaReader(ReadLimiterPtr read_limiter, PageStorage::SnapshotPtr & snapshot) { +<<<<<<< HEAD return PageReader(run_mode, ns_id, meta_storage_v2, meta_storage_v3, snapshot, read_limiter); +======= + return std::make_shared(run_mode, keyspace_id, StorageType::Meta, table_id, meta_storage_v2, meta_storage_v3, uni_ps, snapshot, read_limiter); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) } } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index 8439835b637..bdebf908c1f 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -83,13 +83,23 @@ class StoragePool : private boost::noncopyable using Timepoint = Clock::time_point; using Seconds = std::chrono::seconds; +<<<<<<< HEAD StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPool & storage_path_pool_, const String & name = ""); +======= + StoragePool(Context & global_ctx, KeyspaceID keyspace_id_, NamespaceID table_id_, StoragePathPool & storage_path_pool_, const String & name = ""); +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) PageStorageRunMode restore(); ~StoragePool(); +<<<<<<< HEAD NamespaceId getNamespaceId() const { return ns_id; } +======= + KeyspaceID getKeyspaceID() const { return keyspace_id; } + + NamespaceID getNamespaceID() const { return table_id; } +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) PageStorageRunMode getPageStorageRunMode() const { @@ -157,10 +167,10 @@ class StoragePool : private boost::noncopyable // For function `newLogPageId`,`newMetaPageId`,`newDataPageIdForDTFile`: // For PageStorageRunMode::ONLY_V2, every table have its own three PageStorage (meta/data/log). // So these functions return the Page id starts from 1 and is continuously incremented. - // For PageStorageRunMode::ONLY_V3/MIX_MODE, PageStorage is global(distinguish by ns_id for different table). + // For PageStorageRunMode::ONLY_V3/MIX_MODE, PageStorage is global(distinguish by table_id for different table). // In order to avoid Page id from being reused (and cause troubles while restoring WAL from disk), // StoragePool will assign the max_log_page_id/max_meta_page_id/max_data_page_id by the global max id - // regardless of ns_id while being restored. This causes the ids in a table to not be continuously incremented. + // regardless of table_id while being restored. This causes the ids in a table to not be continuously incremented. PageId newDataPageIdForDTFile(StableDiskDelegator & delegator, const char * who); PageId newLogPageId() { return ++max_log_page_id; } @@ -181,8 +191,13 @@ class StoragePool : private boost::noncopyable PageStorageRunMode run_mode; +<<<<<<< HEAD // whether the three storage instance is owned by this StoragePool const NamespaceId ns_id; +======= + const KeyspaceID keyspace_id; + const NamespaceID table_id; +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) StoragePathPool & storage_path_pool; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp index ee02f56ecd3..7f88ee364ae 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp @@ -24,6 +24,11 @@ #include #include +namespace DB::FailPoints +{ +extern const char force_set_dtfile_exist_when_acquire_id[]; +} // namespace DB::FailPoints + namespace DB { using namespace tests; @@ -688,30 +693,74 @@ try } CATCH - -TEST_F(PageStorageMixedTest, ReuseV2ID) +TEST_F(PageStorageMixedTest, GetMaxIdAfterUpgraded) try { const size_t buf_sz = 1024; char c_buff[buf_sz] = {0}; - { - WriteBatch batch; - ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); - batch.putPage(1, 0, buff, buf_sz); - page_writer_v2->write(std::move(batch), nullptr); - } + const PageIdU64 max_data_page_id_allocated = (1 << 30) + 1; + const PageIdU64 max_meta_page_id_allocated = (1 << 28) + 1; + { + // Prepare a StoragePool with + // - 0 pages in "log" (must be 0) + // - some pages in "data" with `max_data_page_id_allocated` + // - some pages in "meta" with `max_meta_page_id_allocated` + { + WriteBatch batch; + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1, 0, buff, buf_sz); + batch.putRefPage(2, 1); + ReadBufferPtr buff2 = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1 << 30, 0, buff2, buf_sz); + batch.putRefPage(max_data_page_id_allocated, 1 << 30); + storage_pool_v2->dataWriter()->write(std::move(batch), nullptr); + } + { + WriteBatch batch; + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(max_meta_page_id_allocated, 0, buff, buf_sz); + storage_pool_v2->metaWriter()->write(std::move(batch), nullptr); + } + } + + // Mock that after upgraded, the run_mode is transformed to `ONLY_V3` + ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::ONLY_V3); + + // Disable some failpoint to avoid it affect the allocated id + DB::FailPointHelper::disableFailPoint(DB::FailPoints::force_set_dtfile_exist_when_acquire_id); + SCOPE_EXIT({ DB::FailPointHelper::enableFailPoint(DB::FailPoints::force_set_dtfile_exist_when_acquire_id); }); + + // Allocate new id for "data" should be larger than `max_data_page_id_allocated` + auto d = storage_path_pool_v2->getStableDiskDelegator(); + EXPECT_GT(storage_pool_mix->newDataPageIdForDTFile(d, "GetMaxIdAfterUpgraded"), max_data_page_id_allocated); + + // Allocate new id for "meta" should be larger than `max_meta_page_id_allocated` + EXPECT_GT(storage_pool_mix->newMetaPageId(), max_meta_page_id_allocated); +} +CATCH +TEST_F(PageStorageMixedTest, ReuseV2ID) +try +{ + const size_t buf_sz = 1024; + char c_buff[buf_sz] = {0}; { - WriteBatch batch; - batch.delPage(1); - page_writer_v2->write(std::move(batch), nullptr); + { + WriteBatch batch; + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1, 0, buff, buf_sz); + page_writer_v2->write(std::move(batch), nullptr); + } + { + WriteBatch batch; + batch.delPage(1); + page_writer_v2->write(std::move(batch), nullptr); + } } - { - ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::ONLY_V3); - ASSERT_EQ(storage_pool_mix->newLogPageId(), 1); - } + ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::ONLY_V3); + ASSERT_EQ(storage_pool_mix->newLogPageId(), 2); { WriteBatch batch; @@ -726,10 +775,15 @@ try page_writer_mix->write(std::move(batch), nullptr); } +<<<<<<< HEAD { ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::ONLY_V3); ASSERT_EQ(storage_pool_mix->newLogPageId(), 2); } +======= + ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::ONLY_V3); + // ASSERT_EQ(storage_pool_mix->newLogPageId(), 2); // max id for v3 will not be updated, ignore this check +>>>>>>> 1b6cc860f9 (Storage: Fix page_id being mis-reuse when upgrade from cluster < 6.5 (#9041) (release-7.1) (#9048)) } CATCH