From c2046123edad37610695ee80f87fa0f9085f3c97 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 25 Aug 2022 14:56:00 +0800 Subject: [PATCH 01/12] Try refine the grain of callbacks_mutex Signed-off-by: JaySon-Huang --- dbms/src/Storages/Page/V3/PageStorageImpl.cpp | 51 +++++++++++++------ dbms/src/Storages/Page/V3/PageStorageImpl.h | 3 +- .../Page/V3/tests/gtest_page_storage.cpp | 30 +++++++++++ 3 files changed, 67 insertions(+), 17 deletions(-) diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index dba1fef7566..77916f512e6 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -25,6 +26,8 @@ #include #include +#include + namespace DB { namespace ErrorCodes @@ -336,24 +339,40 @@ bool PageStorageImpl::gcImpl(bool /*not_skip*/, const WriteLimiterPtr & write_li // TODO: `clean_external_page` for all tables may slow down the whole gc process when there are lots of table. void PageStorageImpl::cleanExternalPage(Stopwatch & gc_watch, GCTimeStatistics & statistics) { - // TODO: `callbacks_mutex` is being held during the whole `cleanExternalPage`, meaning gc will block - // creating/dropping table, need to refine it later. - std::scoped_lock lock{callbacks_mutex}; - statistics.num_external_callbacks = callbacks_container.size(); - if (!callbacks_container.empty()) + ExternalPageCallbacksContainer::iterator callbacks_iter; + { + std::scoped_lock lock{callbacks_mutex}; + statistics.num_external_callbacks = callbacks_container.size(); + if (statistics.num_external_callbacks == 0) + { + statistics.clean_external_page_ms = gc_watch.elapsedMillisecondsFromLastTime(); + return; + } + callbacks_iter = callbacks_container.begin(); + } + + SYNC_FOR("before_PageStorageImpl::doGC_clean_external_page"); + + Stopwatch external_watch; + while (true) { - Stopwatch external_watch; - for (const auto & [ns_id, callbacks] : callbacks_container) + const auto & callbacks = callbacks_iter->second; + // Note that we must call `scanner` before `getAliveExternalIds` + // Or some committed external ids is not included and we may + // remove the external page by accident with `remover`. + auto pending_external_pages = callbacks.scanner(); + statistics.external_page_scan_ns += external_watch.elapsedFromLastTime(); + auto alive_external_ids = page_directory->getAliveExternalIds(callbacks.ns_id); + statistics.external_page_get_alive_ns += external_watch.elapsedFromLastTime(); + callbacks.remover(pending_external_pages, alive_external_ids); + statistics.external_page_remove_ns += external_watch.elapsedFromLastTime(); + + // move to next namespace callbacks { - // Note that we must call `scanner` before `getAliveExternalIds` - // Or some committed external ids is not included and we may - // remove the external page by accident with `remover`. - const auto pending_external_pages = callbacks.scanner(); - statistics.external_page_scan_ns += external_watch.elapsedFromLastTime(); - const auto alive_external_ids = page_directory->getAliveExternalIds(ns_id); - statistics.external_page_get_alive_ns += external_watch.elapsedFromLastTime(); - callbacks.remover(pending_external_pages, alive_external_ids); - statistics.external_page_remove_ns += external_watch.elapsedFromLastTime(); + std::scoped_lock lock{callbacks_mutex}; + callbacks_iter++; + if (callbacks_iter == callbacks_container.end()) + break; } } diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.h b/dbms/src/Storages/Page/V3/PageStorageImpl.h index 9bce2e5dde8..df7317be947 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.h +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.h @@ -172,7 +172,8 @@ class PageStorageImpl : public DB::PageStorage const static String manifests_file_name; std::mutex callbacks_mutex; - using ExternalPageCallbacksContainer = std::unordered_map; + // Only std::map not std::unordered_map. We need insert/erase do not invalid iterators. + using ExternalPageCallbacksContainer = std::map; ExternalPageCallbacksContainer callbacks_container; }; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index 0ccdd1d9b2c..661c2497d42 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -13,6 +13,7 @@ // limitations under the License. +#include #include #include #include @@ -35,6 +36,8 @@ #include #include +#include + namespace DB { namespace FailPoints @@ -1238,6 +1241,33 @@ try } CATCH +TEST_F(PageStorageTest, ConcurrencyRemoveExtCallbacks) +try +{ + ExternalPageCallbacks callbacks; + callbacks.scanner = []() -> ExternalPageCallbacks::PathAndIdsVec { + return {}; + }; + callbacks.remover = [](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + }; + callbacks.ns_id = TEST_NAMESPACE_ID; + page_storage->registerExternalPagesCallbacks(callbacks); + + // Start a segment merge and suspend it before applyMerge + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::doGC_clean_external_page"); + auto th_gc = std::async([&]() { + page_storage->gcImpl(/*not_skip*/ true, nullptr, nullptr); + }); + sp_gc.waitAndPause(); + + page_storage->unregisterExternalPagesCallbacks(TEST_NAMESPACE_ID); + + sp_gc.next(); + + th_gc.wait(); +} +CATCH + TEST_F(PageStorageTest, GcReuseSpaceThenRestore) try { From 99013193283967a0a7de451ce2ca3a6055a9055e Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 25 Aug 2022 15:59:14 +0800 Subject: [PATCH 02/12] Fine grained lock on clean_external_page Signed-off-by: JaySon-Huang --- dbms/src/Storages/Page/V3/PageStorageImpl.cpp | 76 +++++++++++++++---- dbms/src/Storages/Page/V3/PageStorageImpl.h | 6 ++ .../Page/V3/tests/gtest_page_storage.cpp | 61 +++++++++++++-- 3 files changed, 123 insertions(+), 20 deletions(-) diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index 77916f512e6..a8f26e0572b 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -339,9 +339,22 @@ bool PageStorageImpl::gcImpl(bool /*not_skip*/, const WriteLimiterPtr & write_li // TODO: `clean_external_page` for all tables may slow down the whole gc process when there are lots of table. void PageStorageImpl::cleanExternalPage(Stopwatch & gc_watch, GCTimeStatistics & statistics) { + // Fine grained lock on `callbacks_mutex` and `callbacks_remove_mutex`. + // So that adding/removing a storage will not be blocked for the whole + // processing time of `cleanExternalPage`. ExternalPageCallbacksContainer::iterator callbacks_iter; { std::scoped_lock lock{callbacks_mutex}; + { + std::scoped_lock remove_lock{callbacks_remove_mutex}; + // remove the callbacks by remove queue + for (auto ns_id : callbacks_remove_queue) + callbacks_container.erase(ns_id); + callbacks_remove_queue.clear(); + callbacks_remove_queue.rehash(0); + } + + // check and get the begin iter statistics.num_external_callbacks = callbacks_container.size(); if (statistics.num_external_callbacks == 0) { @@ -351,21 +364,33 @@ void PageStorageImpl::cleanExternalPage(Stopwatch & gc_watch, GCTimeStatistics & callbacks_iter = callbacks_container.begin(); } - SYNC_FOR("before_PageStorageImpl::doGC_clean_external_page"); + SYNC_FOR("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); Stopwatch external_watch; while (true) { - const auto & callbacks = callbacks_iter->second; - // Note that we must call `scanner` before `getAliveExternalIds` - // Or some committed external ids is not included and we may - // remove the external page by accident with `remover`. - auto pending_external_pages = callbacks.scanner(); - statistics.external_page_scan_ns += external_watch.elapsedFromLastTime(); - auto alive_external_ids = page_directory->getAliveExternalIds(callbacks.ns_id); - statistics.external_page_get_alive_ns += external_watch.elapsedFromLastTime(); - callbacks.remover(pending_external_pages, alive_external_ids); - statistics.external_page_remove_ns += external_watch.elapsedFromLastTime(); + const auto & ns_id = callbacks_iter->first; + do + { + std::scoped_lock remove_lock{callbacks_remove_mutex}; + // If the ns_id is pushed into `remove_queue`, then the `callbacks` + // are invalid and may cause failure, break do..while to continue + // on next namespace + if (callbacks_remove_queue.count(ns_id) > 0) + break; + + // `remove_lock` ensure that the `callbacks` won't be invalid + const auto & callbacks = callbacks_iter->second; + // Note that we must call `scanner` before `getAliveExternalIds` + // Or some committed external ids is not included and we may + // remove the external page by accident with `remover`. + auto pending_external_pages = callbacks.scanner(); + statistics.external_page_scan_ns += external_watch.elapsedFromLastTime(); + auto alive_external_ids = page_directory->getAliveExternalIds(callbacks.ns_id); + statistics.external_page_get_alive_ns += external_watch.elapsedFromLastTime(); + callbacks.remover(pending_external_pages, alive_external_ids); + statistics.external_page_remove_ns += external_watch.elapsedFromLastTime(); + } while (false); // move to next namespace callbacks { @@ -464,15 +489,38 @@ void PageStorageImpl::registerExternalPagesCallbacks(const ExternalPageCallbacks assert(callbacks.scanner != nullptr); assert(callbacks.remover != nullptr); assert(callbacks.ns_id != MAX_NAMESPACE_ID); - assert(callbacks_container.count(callbacks.ns_id) == 0); + { +#ifdef NDEBUG + // In product env, we assume that NamespaceId(TableID) will not be reuse, + // so that creating table don't require to be blocked by `callbacks_remove_mutex`. + RUNTIME_CHECK_MSG( + callbacks_container.count(callbacks.ns_id) == 0, + "Try to create callbacks for duplicated namespace id {}", + callbacks.ns_id); +#else + // In unit test, we may create, drop, recreate with same NamespaceId, + // do a cleanup and check. + std::scoped_lock remove_lock{callbacks_remove_mutex}; + // remove the callbacks by remove queue + for (auto ns_id : callbacks_remove_queue) + callbacks_container.erase(ns_id); + callbacks_remove_queue.clear(); + callbacks_remove_queue.rehash(0); + assert(callbacks_container.count(callbacks.ns_id) == 0); +#endif + } + // `emplace` won't invalid other iterator callbacks_container.emplace(callbacks.ns_id, callbacks); } void PageStorageImpl::unregisterExternalPagesCallbacks(NamespaceId ns_id) { { - std::scoped_lock lock{callbacks_mutex}; - callbacks_container.erase(ns_id); + std::scoped_lock remove_lock{callbacks_remove_mutex}; + // `callbacks_container.erase` will invalid the iterator of `ns_id` + // we only push the id into queue. + // The callbacks will be removed inside `cleanExternalPage` + callbacks_remove_queue.emplace(ns_id); } // clean all external ids ptrs page_directory->unregisterNamespace(ns_id); diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.h b/dbms/src/Storages/Page/V3/PageStorageImpl.h index df7317be947..4c6797ba1fb 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.h +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.h @@ -15,12 +15,16 @@ #pragma once #include +#include +#include #include #include #include #include #include +#include + namespace DB { namespace PS::V3 @@ -175,6 +179,8 @@ class PageStorageImpl : public DB::PageStorage // Only std::map not std::unordered_map. We need insert/erase do not invalid iterators. using ExternalPageCallbacksContainer = std::map; ExternalPageCallbacksContainer callbacks_container; + std::mutex callbacks_remove_mutex; + std::unordered_set callbacks_remove_queue; }; } // namespace PS::V3 diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index 661c2497d42..a6afd9a7d11 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -36,6 +36,7 @@ #include #include +#include #include namespace DB @@ -1241,29 +1242,77 @@ try } CATCH +TEST_F(PageStorageTest, ConcurrencyAddExtCallbacks) +try +{ + auto * ptr = new int(100); // mock the `StorageDeltaMerge` + SCOPE_EXIT({ delete ptr; }); + ExternalPageCallbacks callbacks; + callbacks.scanner = [&ptr]() -> ExternalPageCallbacks::PathAndIdsVec { + (*ptr) += 1; // mock access the storage inside callback + return {}; + }; + callbacks.remover = [&ptr](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + (*ptr) += 1; // mock access the storage inside callback + }; + callbacks.ns_id = TEST_NAMESPACE_ID; + page_storage->registerExternalPagesCallbacks(callbacks); + + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + auto th_gc = std::async([&]() { + page_storage->gcImpl(/*not_skip*/ true, nullptr, nullptr); + }); + sp_gc.waitAndPause(); + + // mock table created while gc is running + { + ExternalPageCallbacks new_callbacks; + new_callbacks.scanner = [&ptr]() -> ExternalPageCallbacks::PathAndIdsVec { + (*ptr) += 1; // mock access the storage inside callback + return {}; + }; + new_callbacks.remover = [&ptr](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + (*ptr) += 1; // mock access the storage inside callback + }; + new_callbacks.ns_id = TEST_NAMESPACE_ID + 1; + page_storage->registerExternalPagesCallbacks(new_callbacks); + } + + sp_gc.next(); // continue the gc + th_gc.wait(); +} +CATCH + TEST_F(PageStorageTest, ConcurrencyRemoveExtCallbacks) try { + auto * ptr = new int(100); // mock the `StorageDeltaMerge` + SCOPE_EXIT({ delete ptr; }); ExternalPageCallbacks callbacks; - callbacks.scanner = []() -> ExternalPageCallbacks::PathAndIdsVec { + callbacks.scanner = [&ptr]() -> ExternalPageCallbacks::PathAndIdsVec { + (*ptr) += 1; // mock access the storage inside callback return {}; }; - callbacks.remover = [](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + callbacks.remover = [&ptr](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + (*ptr) += 1; // mock access the storage inside callback }; callbacks.ns_id = TEST_NAMESPACE_ID; page_storage->registerExternalPagesCallbacks(callbacks); - // Start a segment merge and suspend it before applyMerge - auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::doGC_clean_external_page"); + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); auto th_gc = std::async([&]() { page_storage->gcImpl(/*not_skip*/ true, nullptr, nullptr); }); sp_gc.waitAndPause(); + // mock table dropped while gc is running page_storage->unregisterExternalPagesCallbacks(TEST_NAMESPACE_ID); + delete ptr; + ptr = nullptr; - sp_gc.next(); - + sp_gc.next(); // continue the gc th_gc.wait(); } CATCH From 8230b438fb6cca6c2a8f8f583706604cdbda09dc Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 29 Aug 2022 14:19:25 +0800 Subject: [PATCH 03/12] Make callbacks safe to be enter after table dropped Signed-off-by: JaySon-Huang --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 67 ++++++++++++------- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 2 +- 2 files changed, 45 insertions(+), 24 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 900239e623a..5684852bde4 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -41,6 +41,7 @@ #include #include +#include namespace ProfileEvents { @@ -196,7 +197,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, size_t rowkey_column_size_, const Settings & settings_) : global_context(db_context.getGlobalContext()) - , path_pool(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name)) + , path_pool(std::make_shared(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name))) , settings(settings_) , db_name(db_name_) , table_name(table_name_) @@ -216,7 +217,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, storage_pool = std::make_shared(global_context, ns_id, - path_pool, + *path_pool, db_name_ + "." + table_name_); // Restore existing dm files and set capacity for path_pool. @@ -296,25 +297,45 @@ DeltaMergeStore::~DeltaMergeStore() void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) { + // Callbacks for cleaning outdated DTFiles. Note that there is a chance + // that callbacks is called after the `DeltaMergeStore` dropped, we must + // make the callbacks safe. ExternalPageCallbacks callbacks; - // V2 callbacks for cleaning DTFiles - callbacks.scanner = [this]() { + callbacks.ns_id = storage_pool->getNamespaceId(); + callbacks.scanner = [path_pool_ref = std::weak_ptr(path_pool), file_provider = global_context.getFileProvider()]() { ExternalPageCallbacks::PathAndIdsVec path_and_ids_vec; - auto delegate = path_pool.getStableDiskDelegator(); + + // If the StoragePathPool is invalid, meaning we call `scanner` after dropping the table, + // simply return an empty list is OK. + auto path_pool = path_pool_ref.lock(); + if (!path_pool) + return path_and_ids_vec; + + // Return the DTFiles on disks. + auto delegate = path_pool->getStableDiskDelegator(); + // Only return the DTFiles can be GC. The page id of not able to be GC files, which is being ingested or in the middle of + // SegmentSplit/Merge/MergeDelta, is not yet applied + // to PageStorage is marked as not able to be GC, so we don't return them and run the `remover` DMFile::ListOptions options; options.only_list_can_gc = true; for (auto & root_path : delegate.listPaths()) { - auto & path_and_ids = path_and_ids_vec.emplace_back(); - path_and_ids.first = root_path; - auto file_ids_in_current_path = DMFile::listAllInPath(global_context.getFileProvider(), root_path, options); - for (auto id : file_ids_in_current_path) - path_and_ids.second.insert(id); + std::set ids_under_path; + auto file_ids_in_current_path = DMFile::listAllInPath(file_provider, root_path, options); + path_and_ids_vec.emplace_back(root_path, std::move(file_ids_in_current_path)); } return path_and_ids_vec; }; - callbacks.remover = [this](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set & valid_ids) { - auto delegate = path_pool.getStableDiskDelegator(); + callbacks.remover = [path_pool_ref = std::weak_ptr(path_pool), // + file_provider = global_context.getFileProvider(), + logger = log](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set & valid_ids) { + // If the StoragePathPool is invalid, meaning we call `remover` after dropping the table, + // simply skip is OK. + auto path_pool = path_pool_ref.lock(); + if (!path_pool) + return; + + auto delegate = path_pool->getStableDiskDelegator(); for (const auto & [path, ids] : path_and_ids_vec) { for (auto id : ids) @@ -323,18 +344,17 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) continue; // Note that page_id is useless here. - auto dmfile = DMFile::restore(global_context.getFileProvider(), id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none()); + auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none()); if (dmfile->canGC()) { delegate.removeDTFile(dmfile->fileId()); - dmfile->remove(global_context.getFileProvider()); + dmfile->remove(file_provider); } - LOG_FMT_INFO(log, "GC removed useless dmfile: {}", dmfile->path()); + LOG_FMT_INFO(logger, "GC removed useless dmfile: {}", dmfile->path()); } } }; - callbacks.ns_id = storage_pool->getNamespaceId(); // remember to unregister it when shutdown storage_pool->dataRegisterExternalPagesCallbacks(callbacks); storage_pool->enableGC(); @@ -359,15 +379,16 @@ void DeltaMergeStore::rename(String /*new_path*/, bool clean_rename, String new_ { if (clean_rename) { - path_pool.rename(new_database_name, new_table_name, clean_rename); + path_pool->rename(new_database_name, new_table_name, clean_rename); } else { + // Note: Only for mocking test, should not run into here in product env. LOG_FMT_WARNING(log, "Applying heavy renaming for table {}.{} to {}.{}", db_name, table_name, new_database_name, new_table_name); // Remove all background task first shutdown(); - path_pool.rename(new_database_name, new_table_name, clean_rename); // rename for multi-disk + path_pool->rename(new_database_name, new_table_name, clean_rename); // rename for multi-disk } // TODO: replacing these two variables is not atomic, but could be good enough? @@ -465,7 +486,7 @@ void DeltaMergeStore::drop() storage_pool->drop(); // Drop data in storage path pool - path_pool.drop(/*recursive=*/true, /*must_success=*/false); + path_pool->drop(/*recursive=*/true, /*must_success=*/false); LOG_FMT_INFO(log, "Drop DeltaMerge done [{}.{}]", db_name, table_name); } @@ -496,7 +517,7 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB: // Because db_context could be a temporary object and won't last long enough during the query process. // Like the context created by InterpreterSelectWithUnionQuery. auto * ctx = new DMContext(db_context.getGlobalContext(), - path_pool, + *path_pool, *storage_pool, latest_gc_safe_point.load(std::memory_order_acquire), settings.not_compress_columns, @@ -704,7 +725,7 @@ std::tuple DeltaMergeStore::preAllocateIngestFile() if (shutdown_called.load(std::memory_order_relaxed)) return {}; - auto delegator = path_pool.getStableDiskDelegator(); + auto delegator = path_pool->getStableDiskDelegator(); auto parent_path = delegator.choosePath(); auto new_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); return {parent_path, new_id}; @@ -715,7 +736,7 @@ void DeltaMergeStore::preIngestFile(const String & parent_path, const PageId fil if (shutdown_called.load(std::memory_order_relaxed)) return; - auto delegator = path_pool.getStableDiskDelegator(); + auto delegator = path_pool->getStableDiskDelegator(); delegator.addDTFile(file_id, file_size, parent_path); } @@ -2508,7 +2529,7 @@ void DeltaMergeStore::restoreStableFiles() options.only_list_can_gc = false; // We need all files to restore the bytes on disk options.clean_up = true; auto file_provider = global_context.getFileProvider(); - auto path_delegate = path_pool.getStableDiskDelegator(); + auto path_delegate = path_pool->getStableDiskDelegator(); for (const auto & root_path : path_delegate.listPaths()) { for (const auto & file_id : DMFile::listAllInPath(file_provider, root_path, options)) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 769786ce070..897e17410db 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -529,7 +529,7 @@ class DeltaMergeStore : private boost::noncopyable #endif Context & global_context; - StoragePathPool path_pool; + std::shared_ptr path_pool; Settings settings; StoragePoolPtr storage_pool; From d0a313f6cbb90aa50ea677dbb37270cd1da79e35 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 29 Aug 2022 14:52:34 +0800 Subject: [PATCH 04/12] Remove callbacks_remove_mutex Signed-off-by: JaySon-Huang --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 8 +- dbms/src/Storages/DeltaMerge/StoragePool.cpp | 2 +- .../tests/gtest_dm_delta_merge_store.cpp | 88 ++++++++++++++- .../gtest_dm_delta_merge_store_test_basic.h | 6 +- dbms/src/Storages/Page/PageStorage.h | 5 +- dbms/src/Storages/Page/V3/PageStorageImpl.cpp | 100 +++++++----------- dbms/src/Storages/Page/V3/PageStorageImpl.h | 8 +- .../Page/V3/tests/gtest_page_storage.cpp | 51 ++++++--- 8 files changed, 173 insertions(+), 95 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 5684852bde4..062063fcd26 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -302,12 +302,12 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) // make the callbacks safe. ExternalPageCallbacks callbacks; callbacks.ns_id = storage_pool->getNamespaceId(); - callbacks.scanner = [path_pool_ref = std::weak_ptr(path_pool), file_provider = global_context.getFileProvider()]() { + callbacks.scanner = [path_pool_weak_ref = std::weak_ptr(path_pool), file_provider = global_context.getFileProvider()]() { ExternalPageCallbacks::PathAndIdsVec path_and_ids_vec; // If the StoragePathPool is invalid, meaning we call `scanner` after dropping the table, // simply return an empty list is OK. - auto path_pool = path_pool_ref.lock(); + auto path_pool = path_pool_weak_ref.lock(); if (!path_pool) return path_and_ids_vec; @@ -326,12 +326,12 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) } return path_and_ids_vec; }; - callbacks.remover = [path_pool_ref = std::weak_ptr(path_pool), // + callbacks.remover = [path_pool_weak_ref = std::weak_ptr(path_pool), // file_provider = global_context.getFileProvider(), logger = log](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set & valid_ids) { // If the StoragePathPool is invalid, meaning we call `remover` after dropping the table, // simply skip is OK. - auto path_pool = path_pool_ref.lock(); + auto path_pool = path_pool_weak_ref.lock(); if (!path_pool) return; diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index c9ba3943a60..8a2ebf5ad9e 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -135,7 +135,7 @@ FileUsageStatistics GlobalStoragePool::getLogFileUsage() const bool GlobalStoragePool::gc() { - return gc(global_context.getSettingsRef(), true, DELTA_MERGE_GC_PERIOD); + return gc(global_context.getSettingsRef(), /*immediately=*/true, DELTA_MERGE_GC_PERIOD); } bool GlobalStoragePool::gc(const Settings & settings, bool immediately, const Seconds & try_gc_period) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 471d59f760c..cc143d03fae 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -18,12 +18,15 @@ #include #include #include +#include #include #include +#include #include #include #include #include +#include #include #include @@ -84,11 +87,94 @@ try // check column structure of store const auto & cols = store->getTableColumns(); // version & tag column added - ASSERT_EQ(cols.size(), 3UL); + ASSERT_EQ(cols.size(), 3); } } CATCH +TEST_F(DeltaMergeStoreTest, DroppedInMiddleDTFileGC) +try +{ + // create table + ASSERT_NE(store, nullptr); + + auto global_page_storage = TiFlashTestEnv::getGlobalContext().getGlobalStoragePool(); + + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + auto th_gc = std::async([&]() { + if (global_page_storage) + global_page_storage->gc(); + }); + sp_gc.waitAndPause(); + + { + // check column structure of store + const auto & cols = store->getTableColumns(); + // version & tag column added + ASSERT_EQ(cols.size(), 3); + } + + // drop table in the middle of page storage gc + store->shutdown(); + store = nullptr; + + sp_gc.next(); // continue the page storage gc + th_gc.wait(); +} +CATCH + +TEST_F(DeltaMergeStoreTest, CreateInMiddleDTFileGC) +try +{ + // create table + ASSERT_NE(store, nullptr); + + auto global_page_storage = TiFlashTestEnv::getGlobalContext().getGlobalStoragePool(); + + // Start a PageStorage gc and suspend it before clean external page + auto sp_gc = SyncPointCtl::enableInScope("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); + auto th_gc = std::async([&]() { + if (global_page_storage) + global_page_storage->gc(); + }); + sp_gc.waitAndPause(); + + DeltaMergeStorePtr new_store; + ColumnDefinesPtr new_cols; + { + new_cols = DMTestEnv::getDefaultColumns(); + ColumnDefine handle_column_define = (*new_cols)[0]; + new_store = std::make_shared(*db_context, + false, + "test", + "t_200", + 200, + *new_cols, + handle_column_define, + false, + 1, + DeltaMergeStore::Settings()); + } + + sp_gc.next(); // continue the page storage gc + th_gc.wait(); + + BlockInputStreamPtr in = new_store->read(*db_context, + db_context->getSettingsRef(), + *new_cols, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + "", + /* keep_order= */ false, + /* is_fast_mode= */ false, + /* expected_block_size= */ 1024)[0]; + ASSERT_INPUTSTREAM_NROWS(in, 0); +} +CATCH + TEST_F(DeltaMergeStoreTest, OpenWithExtraColumns) try { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h index 5f8c098a8b1..a34a5125e28 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h @@ -67,7 +67,7 @@ class DeltaMergeStoreTest : public DB::base::TiFlashStorageTestBasic DeltaMergeStorePtr s = std::make_shared(*db_context, false, "test", - "DeltaMergeStoreTest", + "t_100", 100, *cols, handle_column_define, @@ -135,7 +135,7 @@ class DeltaMergeStoreRWTest DeltaMergeStorePtr s = std::make_shared(*db_context, false, "test", - "DeltaMergeStoreRWTest", + "t_101", 101, *cols, handle_column_define, @@ -180,4 +180,4 @@ class DeltaMergeStoreRWTest }; } // namespace tests } // namespace DM -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index b77656785a7..7468c4fde44 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -68,13 +68,13 @@ enum class PageStorageRunMode : UInt8 struct ExternalPageCallbacks { - // `scanner` for scanning avaliable external page ids on disks. + // `scanner` for scanning available external page ids on disks. // `remover` will be called with living normal page ids after gc run a round, user should remove those // external pages(files) in `pending_external_pages` but not in `valid_normal_pages` using PathAndIdsVec = std::vector>>; using ExternalPagesScanner = std::function; using ExternalPagesRemover - = std::function & valid_normal_pages)>; + = std::function & valid_normal_pages)>; ExternalPagesScanner scanner = nullptr; ExternalPagesRemover remover = nullptr; NamespaceId ns_id = MAX_NAMESPACE_ID; @@ -336,6 +336,7 @@ class PageStorage : private boost::noncopyable } // Register and unregister external pages GC callbacks + // Note that user must ensure that it is safe to call `scanner` and `remover` even after unregister. virtual void registerExternalPagesCallbacks(const ExternalPageCallbacks & callbacks) = 0; virtual void unregisterExternalPagesCallbacks(NamespaceId /*ns_id*/){}; diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index a8f26e0572b..67611dc980b 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -339,65 +339,55 @@ bool PageStorageImpl::gcImpl(bool /*not_skip*/, const WriteLimiterPtr & write_li // TODO: `clean_external_page` for all tables may slow down the whole gc process when there are lots of table. void PageStorageImpl::cleanExternalPage(Stopwatch & gc_watch, GCTimeStatistics & statistics) { - // Fine grained lock on `callbacks_mutex` and `callbacks_remove_mutex`. + // Fine grained lock on `callbacks_mutex`. // So that adding/removing a storage will not be blocked for the whole // processing time of `cleanExternalPage`. - ExternalPageCallbacksContainer::iterator callbacks_iter; + std::shared_ptr ns_callbacks; { std::scoped_lock lock{callbacks_mutex}; - { - std::scoped_lock remove_lock{callbacks_remove_mutex}; - // remove the callbacks by remove queue - for (auto ns_id : callbacks_remove_queue) - callbacks_container.erase(ns_id); - callbacks_remove_queue.clear(); - callbacks_remove_queue.rehash(0); - } - // check and get the begin iter statistics.num_external_callbacks = callbacks_container.size(); - if (statistics.num_external_callbacks == 0) + auto iter = callbacks_container.begin(); + if (iter == callbacks_container.end()) // empty { statistics.clean_external_page_ms = gc_watch.elapsedMillisecondsFromLastTime(); return; } - callbacks_iter = callbacks_container.begin(); + + assert(iter != callbacks_container.end()); // early exit in the previous code + // keep the shared_ptr so that erasing ns_id from PageStorage won't invalid the `ns_callbacks` + ns_callbacks = iter->second; } + Stopwatch external_watch; + SYNC_FOR("before_PageStorageImpl::cleanExternalPage_execute_callbacks"); - Stopwatch external_watch; while (true) { - const auto & ns_id = callbacks_iter->first; - do - { - std::scoped_lock remove_lock{callbacks_remove_mutex}; - // If the ns_id is pushed into `remove_queue`, then the `callbacks` - // are invalid and may cause failure, break do..while to continue - // on next namespace - if (callbacks_remove_queue.count(ns_id) > 0) - break; - - // `remove_lock` ensure that the `callbacks` won't be invalid - const auto & callbacks = callbacks_iter->second; - // Note that we must call `scanner` before `getAliveExternalIds` - // Or some committed external ids is not included and we may - // remove the external page by accident with `remover`. - auto pending_external_pages = callbacks.scanner(); - statistics.external_page_scan_ns += external_watch.elapsedFromLastTime(); - auto alive_external_ids = page_directory->getAliveExternalIds(callbacks.ns_id); - statistics.external_page_get_alive_ns += external_watch.elapsedFromLastTime(); - callbacks.remover(pending_external_pages, alive_external_ids); - statistics.external_page_remove_ns += external_watch.elapsedFromLastTime(); - } while (false); + // 1. Note that we must call `scanner` before `getAliveExternalIds`. + // Or some committed external ids is not included in `alive_ids` + // but exist in `pending_external_pages`. They will be removed by + // accident with `remover` under this situation. + // 2. Assume calling the callbacks after erasing ns_is is safe. + + // the external pages on disks. + auto pending_external_pages = ns_callbacks->scanner(); + statistics.external_page_scan_ns += external_watch.elapsedFromLastTime(); + auto alive_external_ids = page_directory->getAliveExternalIds(ns_callbacks->ns_id); + statistics.external_page_get_alive_ns += external_watch.elapsedFromLastTime(); + // remove the external pages that is not alive now. + ns_callbacks->remover(pending_external_pages, alive_external_ids); + statistics.external_page_remove_ns += external_watch.elapsedFromLastTime(); // move to next namespace callbacks { std::scoped_lock lock{callbacks_mutex}; - callbacks_iter++; - if (callbacks_iter == callbacks_container.end()) + // next ns_id that is greater than `ns_id` + auto iter = callbacks_container.upper_bound(ns_callbacks->ns_id); + if (iter == callbacks_container.end()) break; + ns_callbacks = iter->second; } } @@ -489,38 +479,20 @@ void PageStorageImpl::registerExternalPagesCallbacks(const ExternalPageCallbacks assert(callbacks.scanner != nullptr); assert(callbacks.remover != nullptr); assert(callbacks.ns_id != MAX_NAMESPACE_ID); - { -#ifdef NDEBUG - // In product env, we assume that NamespaceId(TableID) will not be reuse, - // so that creating table don't require to be blocked by `callbacks_remove_mutex`. - RUNTIME_CHECK_MSG( - callbacks_container.count(callbacks.ns_id) == 0, - "Try to create callbacks for duplicated namespace id {}", - callbacks.ns_id); -#else - // In unit test, we may create, drop, recreate with same NamespaceId, - // do a cleanup and check. - std::scoped_lock remove_lock{callbacks_remove_mutex}; - // remove the callbacks by remove queue - for (auto ns_id : callbacks_remove_queue) - callbacks_container.erase(ns_id); - callbacks_remove_queue.clear(); - callbacks_remove_queue.rehash(0); - assert(callbacks_container.count(callbacks.ns_id) == 0); -#endif - } + // NamespaceId(TableID) should not be reuse + RUNTIME_CHECK_MSG( + callbacks_container.count(callbacks.ns_id) == 0, + "Try to create callbacks for duplicated namespace id {}", + callbacks.ns_id); // `emplace` won't invalid other iterator - callbacks_container.emplace(callbacks.ns_id, callbacks); + callbacks_container.emplace(callbacks.ns_id, std::make_shared(callbacks)); } void PageStorageImpl::unregisterExternalPagesCallbacks(NamespaceId ns_id) { { - std::scoped_lock remove_lock{callbacks_remove_mutex}; - // `callbacks_container.erase` will invalid the iterator of `ns_id` - // we only push the id into queue. - // The callbacks will be removed inside `cleanExternalPage` - callbacks_remove_queue.emplace(ns_id); + std::scoped_lock lock{callbacks_mutex}; + callbacks_container.erase(ns_id); } // clean all external ids ptrs page_directory->unregisterNamespace(ns_id); diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.h b/dbms/src/Storages/Page/V3/PageStorageImpl.h index 4c6797ba1fb..321c9742f66 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.h +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.h @@ -23,8 +23,6 @@ #include #include -#include - namespace DB { namespace PS::V3 @@ -176,11 +174,9 @@ class PageStorageImpl : public DB::PageStorage const static String manifests_file_name; std::mutex callbacks_mutex; - // Only std::map not std::unordered_map. We need insert/erase do not invalid iterators. - using ExternalPageCallbacksContainer = std::map; + // Only std::map not std::unordered_map. We need insert/erase do not invalid other iterators. + using ExternalPageCallbacksContainer = std::map>; ExternalPageCallbacksContainer callbacks_container; - std::mutex callbacks_remove_mutex; - std::unordered_set callbacks_remove_queue; }; } // namespace PS::V3 diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index a6afd9a7d11..0c86fbb52ec 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -1245,17 +1245,24 @@ CATCH TEST_F(PageStorageTest, ConcurrencyAddExtCallbacks) try { - auto * ptr = new int(100); // mock the `StorageDeltaMerge` - SCOPE_EXIT({ delete ptr; }); + auto ptr = std::make_shared(100); // mock the `StorageDeltaMerge` ExternalPageCallbacks callbacks; - callbacks.scanner = [&ptr]() -> ExternalPageCallbacks::PathAndIdsVec { + callbacks.ns_id = TEST_NAMESPACE_ID; + callbacks.scanner = [ptr_weak_ref = std::weak_ptr(ptr)]() -> ExternalPageCallbacks::PathAndIdsVec { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return {}; + (*ptr) += 1; // mock access the storage inside callback return {}; }; - callbacks.remover = [&ptr](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + callbacks.remover = [ptr_weak_ref = std::weak_ptr(ptr)](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return; + (*ptr) += 1; // mock access the storage inside callback }; - callbacks.ns_id = TEST_NAMESPACE_ID; page_storage->registerExternalPagesCallbacks(callbacks); // Start a PageStorage gc and suspend it before clean external page @@ -1268,36 +1275,53 @@ try // mock table created while gc is running { ExternalPageCallbacks new_callbacks; - new_callbacks.scanner = [&ptr]() -> ExternalPageCallbacks::PathAndIdsVec { + new_callbacks.ns_id = TEST_NAMESPACE_ID + 1; + new_callbacks.scanner = [ptr_weak_ref = std::weak_ptr(ptr)]() -> ExternalPageCallbacks::PathAndIdsVec { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return {}; + (*ptr) += 1; // mock access the storage inside callback return {}; }; - new_callbacks.remover = [&ptr](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + new_callbacks.remover = [ptr_weak_ref = std::weak_ptr(ptr)](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return; + (*ptr) += 1; // mock access the storage inside callback }; - new_callbacks.ns_id = TEST_NAMESPACE_ID + 1; page_storage->registerExternalPagesCallbacks(new_callbacks); } sp_gc.next(); // continue the gc th_gc.wait(); + + ASSERT_EQ(*ptr, 100 + 4); } CATCH TEST_F(PageStorageTest, ConcurrencyRemoveExtCallbacks) try { - auto * ptr = new int(100); // mock the `StorageDeltaMerge` - SCOPE_EXIT({ delete ptr; }); + auto ptr = std::make_shared(100); // mock the `StorageDeltaMerge` ExternalPageCallbacks callbacks; - callbacks.scanner = [&ptr]() -> ExternalPageCallbacks::PathAndIdsVec { + callbacks.ns_id = TEST_NAMESPACE_ID; + callbacks.scanner = [ptr_weak_ref = std::weak_ptr(ptr)]() -> ExternalPageCallbacks::PathAndIdsVec { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return {}; + (*ptr) += 1; // mock access the storage inside callback return {}; }; - callbacks.remover = [&ptr](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + callbacks.remover = [ptr_weak_ref = std::weak_ptr(ptr)](const ExternalPageCallbacks::PathAndIdsVec &, const std::set &) -> void { + auto ptr = ptr_weak_ref.lock(); + if (!ptr) + return; + (*ptr) += 1; // mock access the storage inside callback }; - callbacks.ns_id = TEST_NAMESPACE_ID; page_storage->registerExternalPagesCallbacks(callbacks); // Start a PageStorage gc and suspend it before clean external page @@ -1309,7 +1333,6 @@ try // mock table dropped while gc is running page_storage->unregisterExternalPagesCallbacks(TEST_NAMESPACE_ID); - delete ptr; ptr = nullptr; sp_gc.next(); // continue the gc From cbf5cb38066512a091d66c0d370abc9c6db31af3 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 31 Aug 2022 23:58:17 +0800 Subject: [PATCH 05/12] Add path check for DMFile::restore Signed-off-by: JaySon-Huang --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 24 ++++++++++-- dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 7 +++- dbms/src/Storages/DeltaMerge/StoragePool.cpp | 4 +- .../tests/gtest_dm_delta_merge_store.cpp | 39 ++++++++++++++++++- 4 files changed, 67 insertions(+), 7 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 062063fcd26..06d6bf8b086 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -335,6 +335,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) if (!path_pool) return; + SYNC_FOR("before_DeltaMergeStore::callbacks_remover_remove"); auto delegate = path_pool->getStableDiskDelegator(); for (const auto & [path, ids] : path_and_ids_vec) { @@ -345,13 +346,28 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) // Note that page_id is useless here. auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none()); - if (dmfile->canGC()) + if (unlikely(!dmfile)) { + try + { + delegate.removeDTFile(id); + } + catch (DB::Exception & e) + { + // just ignore + } + LOG_FMT_INFO(logger, + "GC try remove useless DM file, but file not found and may have been removed, dmfile={}", + DMFile::getPathByStatus(path, id, DMFile::Status::READABLE)); + } + else if (dmfile->canGC()) + { + // scanner should only return dtfiles that can GC, + // just another check here. delegate.removeDTFile(dmfile->fileId()); dmfile->remove(file_provider); + LOG_FMT_INFO(logger, "GC removed useless DM file, dmfile={}", dmfile->path()); } - - LOG_FMT_INFO(logger, "GC removed useless dmfile: {}", dmfile->path()); } } }; @@ -485,6 +501,8 @@ void DeltaMergeStore::drop() dropAllSegments(false); storage_pool->drop(); + SYNC_FOR("before"); + // Drop data in storage path pool path_pool->drop(/*recursive=*/true, /*must_success=*/false); LOG_FMT_INFO(log, "Drop DeltaMerge done [{}.{}]", db_name, table_name); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index fe984ad519f..e05d98a15bd 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -151,7 +151,12 @@ DMFilePtr DMFile::restore( const ReadMetaMode & read_meta_mode) { String path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE); - bool single_file_mode = Poco::File(path).isFile(); + // The path may be dropped by another thread in some cases + auto poco_file = Poco::File(path); + if (!poco_file.exists()) + return nullptr; + + bool single_file_mode = poco_file.isFile(); DMFilePtr dmfile(new DMFile( file_id, page_id, diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index 8a2ebf5ad9e..fbe6ab681d8 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -445,7 +445,7 @@ PageStorageRunMode StoragePool::restore() } else { - LOG_FMT_INFO(logger, "Current pool.meta translate already done before restored [ns_id={}] ", ns_id); + LOG_FMT_INFO(logger, "Current pool.meta transform already done before restored [ns_id={}] ", ns_id); } if (const auto & data_remain_pages = data_storage_v2->getNumberOfPages(); data_remain_pages != 0) @@ -461,7 +461,7 @@ PageStorageRunMode StoragePool::restore() } else { - LOG_FMT_INFO(logger, "Current pool.data translate already done before restored [ns_id={}]", ns_id); + LOG_FMT_INFO(logger, "Current pool.data transform already done before restored [ns_id={}]", ns_id); } // Check number of valid pages in v2 diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index cc143d03fae..e23640f618a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -33,6 +33,8 @@ #include #include +#include "Storages/DeltaMerge/RowKeyRange.h" + namespace DB { namespace FailPoints @@ -124,6 +126,38 @@ try } CATCH +TEST_F(DeltaMergeStoreTest, DroppedInMiddleDTFileRemoveCallback) +try +{ + // create table + ASSERT_NE(store, nullptr); + + auto global_page_storage = TiFlashTestEnv::getGlobalContext().getGlobalStoragePool(); + + // Start a PageStorage gc and suspend it before removing dtfiles + auto sp_gc = SyncPointCtl::enableInScope("before_DeltaMergeStore::callbacks_remover_remove"); + auto th_gc = std::async([&]() { + if (global_page_storage) + global_page_storage->gc(); + }); + sp_gc.waitAndPause(); + + { + // check column structure of store + const auto & cols = store->getTableColumns(); + // version & tag column added + ASSERT_EQ(cols.size(), 3); + } + + // drop table and files in the middle of page storage gc + store->drop(); + store = nullptr; + + sp_gc.next(); // continue removing dtfiles + th_gc.wait(); +} +CATCH + TEST_F(DeltaMergeStoreTest, CreateInMiddleDTFileGC) try { @@ -155,6 +189,9 @@ try false, 1, DeltaMergeStore::Settings()); + auto block = DMTestEnv::prepareSimpleWriteBlock(0, 100, false); + new_store->write(*db_context, db_context->getSettingsRef(), block); + new_store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); } sp_gc.next(); // continue the page storage gc @@ -171,7 +208,7 @@ try /* keep_order= */ false, /* is_fast_mode= */ false, /* expected_block_size= */ 1024)[0]; - ASSERT_INPUTSTREAM_NROWS(in, 0); + ASSERT_INPUTSTREAM_NROWS(in, 100); } CATCH From 4677d78942c1569c8d8f7f58485dcf99b3967e33 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 1 Sep 2022 01:04:17 +0800 Subject: [PATCH 06/12] RUNTIME_ASSERT for renaming table running into rename directories Remove useless rename on read test Signed-off-by: JaySon-Huang --- dbms/src/Databases/test/gtest_database.cpp | 36 +++++----- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 21 ++---- .../tests/gtest_dm_delta_merge_store.cpp | 2 + .../tests/gtest_dm_storage_delta_merge.cpp | 9 +-- dbms/src/Storages/StorageDeltaMerge.cpp | 67 +++++-------------- .../raft/schema/rename_on_read.test | 38 ----------- .../raft/txn_mock/partition_table.test | 13 +--- 7 files changed, 47 insertions(+), 139 deletions(-) delete mode 100644 tests/delta-merge-test/raft/schema/rename_on_read.test diff --git a/dbms/src/Databases/test/gtest_database.cpp b/dbms/src/Databases/test/gtest_database.cpp index 6b8bbc17348..b469a571570 100644 --- a/dbms/src/Databases/test/gtest_database.cpp +++ b/dbms/src/Databases/test/gtest_database.cpp @@ -273,18 +273,15 @@ try EXPECT_EQ(managed_storage->getDatabaseName(), db_name); } - const String to_tbl_name = "t_112"; + const String to_tbl_display_name = "tbl_test"; { // Rename table - typeid_cast(db.get())->renameTable(ctx, tbl_name, *db, to_tbl_name, db_name, to_tbl_name); + typeid_cast(db.get())->renameTable(ctx, tbl_name, *db, tbl_name, db_name, to_tbl_display_name); - auto old_storage = db->tryGetTable(ctx, tbl_name); - ASSERT_EQ(old_storage, nullptr); - - auto storage = db->tryGetTable(ctx, to_tbl_name); + auto storage = db->tryGetTable(ctx, tbl_name); ASSERT_NE(storage, nullptr); EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name); - EXPECT_EQ(storage->getTableName(), to_tbl_name); + EXPECT_EQ(storage->getTableName(), tbl_name); auto managed_storage = std::dynamic_pointer_cast(storage); EXPECT_EQ(managed_storage->getDatabaseName(), db_name); @@ -294,13 +291,13 @@ try // Drop table auto drop_query = std::make_shared(); drop_query->database = db_name; - drop_query->table = to_tbl_name; + drop_query->table = tbl_name; drop_query->if_exists = false; ASTPtr ast_drop_query = drop_query; InterpreterDropQuery drop_interpreter(ast_drop_query, ctx); drop_interpreter.execute(); - auto storage = db->tryGetTable(ctx, to_tbl_name); + auto storage = db->tryGetTable(ctx, tbl_name); ASSERT_EQ(storage, nullptr); } @@ -391,18 +388,18 @@ try EXPECT_EQ(managed_storage->getDatabaseName(), db_name); } - const String to_tbl_name = "t_112"; + const String to_tbl_display_name = "tbl_test"; { // Rename table - typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, to_tbl_name, db2_name, to_tbl_name); + typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, tbl_name, db2_name, to_tbl_display_name); auto old_storage = db->tryGetTable(ctx, tbl_name); ASSERT_EQ(old_storage, nullptr); - auto storage = db2->tryGetTable(ctx, to_tbl_name); + auto storage = db2->tryGetTable(ctx, tbl_name); ASSERT_NE(storage, nullptr); EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name); - EXPECT_EQ(storage->getTableName(), to_tbl_name); + EXPECT_EQ(storage->getTableName(), tbl_name); auto managed_storage = std::dynamic_pointer_cast(storage); EXPECT_EQ(managed_storage->getDatabaseName(), db2_name); @@ -412,13 +409,13 @@ try // Drop table auto drop_query = std::make_shared(); drop_query->database = db2_name; - drop_query->table = to_tbl_name; + drop_query->table = tbl_name; drop_query->if_exists = false; ASTPtr ast_drop_query = drop_query; InterpreterDropQuery drop_interpreter(ast_drop_query, ctx); drop_interpreter.execute(); - auto storage = db2->tryGetTable(ctx, to_tbl_name); + auto storage = db2->tryGetTable(ctx, tbl_name); ASSERT_EQ(storage, nullptr); } @@ -501,18 +498,17 @@ try EXPECT_FALSE(db->empty(ctx)); EXPECT_TRUE(db->isTableExist(ctx, tbl_name)); - const String to_tbl_name = "t_112"; // Rename table to another database, and mock crash by failed point FailPointHelper::enableFailPoint(FailPoints::exception_before_rename_table_old_meta_removed); ASSERT_THROW( - typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, to_tbl_name, db2_name, to_tbl_name), + typeid_cast(db.get())->renameTable(ctx, tbl_name, *db2, tbl_name, db2_name, tbl_name), DB::Exception); { // After fail point triggled we should have both meta file in disk Poco::File old_meta_file{db->getTableMetadataPath(tbl_name)}; ASSERT_TRUE(old_meta_file.exists()); - Poco::File new_meta_file(db2->getTableMetadataPath(to_tbl_name)); + Poco::File new_meta_file(db2->getTableMetadataPath(tbl_name)); ASSERT_TRUE(new_meta_file.exists()); // Old table should remain in db auto old_storage = db->tryGetTable(ctx, tbl_name); @@ -527,10 +523,10 @@ try ThreadPool thread_pool(2); db2->loadTables(ctx, &thread_pool, true); - Poco::File new_meta_file(db2->getTableMetadataPath(to_tbl_name)); + Poco::File new_meta_file(db2->getTableMetadataPath(tbl_name)); ASSERT_FALSE(new_meta_file.exists()); - auto storage = db2->tryGetTable(ctx, to_tbl_name); + auto storage = db2->tryGetTable(ctx, tbl_name); ASSERT_EQ(storage, nullptr); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 06d6bf8b086..eef646fe8f7 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -393,19 +393,12 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) void DeltaMergeStore::rename(String /*new_path*/, bool clean_rename, String new_database_name, String new_table_name) { - if (clean_rename) - { - path_pool->rename(new_database_name, new_table_name, clean_rename); - } - else - { - // Note: Only for mocking test, should not run into here in product env. - LOG_FMT_WARNING(log, "Applying heavy renaming for table {}.{} to {}.{}", db_name, table_name, new_database_name, new_table_name); - - // Remove all background task first - shutdown(); - path_pool->rename(new_database_name, new_table_name, clean_rename); // rename for multi-disk - } + RUNTIME_ASSERT(clean_rename, + log, + "should never rename the directories when renaming table, new_database_name={}, new_table_name={}", + new_database_name, + new_table_name); + path_pool->rename(new_database_name, new_table_name, clean_rename); // TODO: replacing these two variables is not atomic, but could be good enough? table_name.swap(new_table_name); @@ -501,8 +494,6 @@ void DeltaMergeStore::drop() dropAllSegments(false); storage_pool->drop(); - SYNC_FOR("before"); - // Drop data in storage path pool path_pool->drop(/*recursive=*/true, /*must_success=*/false); LOG_FMT_INFO(log, "Drop DeltaMerge done [{}.{}]", db_name, table_name); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index e23640f618a..f3d1daa739a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -27,6 +28,7 @@ #include #include #include +#include #include #include diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index ac03b509f18..4406cf06289 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -207,10 +207,12 @@ try // Rename database name before store object is created. const String new_db_name = "new_" + storage->getDatabaseName(); - storage->rename(path_name, new_db_name, table_name, table_name); + const String new_display_table_name = "new_" + storage->getTableName(); + storage->rename(path_name, new_db_name, table_name, new_display_table_name); ASSERT_FALSE(storage->storeInited()); ASSERT_EQ(storage->getTableName(), table_name); ASSERT_EQ(storage->getDatabaseName(), new_db_name); + ASSERT_EQ(storage->getTableInfo().name, new_display_table_name); // prepare block data Block sample; @@ -231,9 +233,8 @@ try } // TiFlash always use t_{table_id} as table name - String new_table_name = storage->getTableName(); - storage->rename(path_name, new_db_name, new_table_name, new_table_name); - ASSERT_EQ(storage->getTableName(), new_table_name); + storage->rename(path_name, new_db_name, table_name, table_name); + ASSERT_EQ(storage->getTableName(), table_name); ASSERT_EQ(storage->getDatabaseName(), new_db_name); storage->drop(); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 6eb6a16736b..31d9ded2551 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1198,59 +1198,26 @@ void StorageDeltaMerge::rename( // For DatabaseTiFlash, simply update store's database is OK. // `store->getTableName() == new_table_name` only keep for mock test. bool clean_rename = !data_path_contains_database_name && getTableName() == new_table_name; - if (likely(clean_rename)) + RUNTIME_ASSERT(clean_rename, + log, + "should never rename the directories when renaming table, new_database_name={}, new_table_name={}", + new_database_name, + new_table_name); + if (storeInited()) { - if (storeInited()) - { - _store->rename(new_path_to_db, clean_rename, new_database_name, new_table_name); - return; - } - std::lock_guard lock(store_mutex); - if (storeInited()) - { - _store->rename(new_path_to_db, clean_rename, new_database_name, new_table_name); - } - else - { - table_column_info->db_name = new_database_name; - table_column_info->table_name = new_table_name; - } + _store->rename(new_path_to_db, clean_rename, new_database_name, new_table_name); return; } - - /// Note that this routine is only left for CI tests. `clean_rename` should always be true in production env. - auto & store = getAndMaybeInitStore(); - - // For DatabaseOrdinary, we need to rename data path, then recreate a new store. - const String new_path = new_path_to_db + "/" + new_table_name; - - if (Poco::File{new_path}.exists()) - throw Exception( - fmt::format("Target path already exists: {}", new_path), - ErrorCodes::DIRECTORY_ALREADY_EXISTS); - - // flush store and then reset store to new path - store->flushCache(global_context, RowKeyRange::newAll(is_common_handle, rowkey_column_size)); - ColumnDefines table_column_defines = store->getTableColumns(); - ColumnDefine handle_column_define = store->getHandle(); - DeltaMergeStore::Settings settings = store->getSettings(); - - // remove background tasks - store->shutdown(); - // rename directories for multi disks - store->rename(new_path, clean_rename, new_database_name, new_table_name); - // generate a new store - store = std::make_shared( - global_context, - data_path_contains_database_name, - new_database_name, - new_table_name, - tidb_table_info.id, - std::move(table_column_defines), - std::move(handle_column_define), - is_common_handle, - rowkey_column_size, - settings); + std::lock_guard lock(store_mutex); + if (storeInited()) + { + _store->rename(new_path_to_db, clean_rename, new_database_name, new_table_name); + } + else + { + table_column_info->db_name = new_database_name; + table_column_info->table_name = new_table_name; + } } String StorageDeltaMerge::getTableName() const diff --git a/tests/delta-merge-test/raft/schema/rename_on_read.test b/tests/delta-merge-test/raft/schema/rename_on_read.test deleted file mode 100644 index 40eb66277a9..00000000000 --- a/tests/delta-merge-test/raft/schema/rename_on_read.test +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2022 PingCAP, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -=> DBGInvoke __enable_schema_sync_service('false') - -=> DBGInvoke __drop_tidb_table(default, test) -=> drop table if exists default.test - -=> DBGInvoke __drop_tidb_table(default, test1) -=> drop table if exists default.test1 - -=> DBGInvoke __set_flush_threshold(1000000, 1000000) - -=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String', '', 'dt') -=> DBGInvoke __refresh_schemas() -=> select * from default.test -=> DBGInvoke __rename_tidb_table(default, test, test1) -=> select * from default.test -=> select * from default.test " --schema_version "1000000 -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. -=> select * from default.test1 -=> select * from default.test1 " --schema_version "1000000 - -=> DBGInvoke __drop_tidb_table(default, test1) -=> drop table if exists default.test1 -=> DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/delta-merge-test/raft/txn_mock/partition_table.test b/tests/delta-merge-test/raft/txn_mock/partition_table.test index 2f8e67a61a8..43ace7094e7 100644 --- a/tests/delta-merge-test/raft/txn_mock/partition_table.test +++ b/tests/delta-merge-test/raft/txn_mock/partition_table.test @@ -21,8 +21,6 @@ => drop table if exists default.test_9997 => drop table if exists default.test_9998 => drop table if exists default.test_9999 -=> drop table if exists default.test1_9997 -=> drop table if exists default.test1_9999 => DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') => DBGInvoke __mock_tidb_partition(default, test, 9999) @@ -94,16 +92,9 @@ │ 0 │ └──────────────┘ -=> DBGInvoke __rename_tidb_table(default, test, test1) -=> DBGInvoke __refresh_schemas() -=> select count(*) from default.test1_9997 -┌─count()─┐ -│ 2 │ -└─────────┘ - => DBGInvoke __drop_tidb_table(default, test1) => DBGInvoke __refresh_schemas() -=> DBGInvoke is_tombstone(default, test1_9999) +=> DBGInvoke is_tombstone(default, test_9999) ┌─is_tombstone(default, test_9999)─┐ │ true │ └──────────────────────────────────┘ @@ -113,8 +104,6 @@ => drop table if exists default.test_9997 => drop table if exists default.test_9998 => drop table if exists default.test_9999 -=> drop table if exists default.test1_9997 -=> drop table if exists default.test1_9999 => DBGInvoke __enable_schema_sync_service('true') => DBGInvoke __clean_up_region() From 65dbd0d1246ce1c21896637d66baea4a43f0e819 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 1 Sep 2022 14:20:51 +0800 Subject: [PATCH 07/12] Fix mock test Signed-off-by: JaySon-Huang --- tests/delta-merge-test/ddl/alter.test | 16 ---------------- .../raft/txn_mock/partition_table.test | 4 +--- 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/tests/delta-merge-test/ddl/alter.test b/tests/delta-merge-test/ddl/alter.test index 4bf405ac9e1..c058e5cdc81 100644 --- a/tests/delta-merge-test/ddl/alter.test +++ b/tests/delta-merge-test/ddl/alter.test @@ -72,21 +72,5 @@ └───┴──────┴───────┴──────┘ -## rename table ->> drop table if exists dm_test_renamed ->> rename table dm_test to dm_test_renamed ->> select * from dm_test -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.dm_test doesn't exist.. - ->> select * from dm_test_renamed -┌─a─┬────b─┬─────c─┬────d─┐ -│ 1 │ 0 │ 0 │ \N │ -│ 2 │ 1024 │ 65535 │ 4096 │ -│ 3 │ 2048 │ 65536 │ \N │ -└───┴──────┴───────┴──────┘ - - ## Clean up >> drop table if exists dm_test ->> drop table if exists dm_test_renamed diff --git a/tests/delta-merge-test/raft/txn_mock/partition_table.test b/tests/delta-merge-test/raft/txn_mock/partition_table.test index 43ace7094e7..84b8044260f 100644 --- a/tests/delta-merge-test/raft/txn_mock/partition_table.test +++ b/tests/delta-merge-test/raft/txn_mock/partition_table.test @@ -17,7 +17,6 @@ => DBGInvoke __drop_tidb_table(default, test) => drop table if exists default.test -=> drop table if exists default.test1 => drop table if exists default.test_9997 => drop table if exists default.test_9998 => drop table if exists default.test_9999 @@ -92,7 +91,7 @@ │ 0 │ └──────────────┘ -=> DBGInvoke __drop_tidb_table(default, test1) +=> DBGInvoke __drop_tidb_table(default, test) => DBGInvoke __refresh_schemas() => DBGInvoke is_tombstone(default, test_9999) ┌─is_tombstone(default, test_9999)─┐ @@ -100,7 +99,6 @@ └──────────────────────────────────┘ => drop table if exists default.test -=> drop table if exists default.test1 => drop table if exists default.test_9997 => drop table if exists default.test_9998 => drop table if exists default.test_9999 From 57f6cf83769b7a3bb0368cff1124d11e3f4ef481 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 1 Sep 2022 14:36:36 +0800 Subject: [PATCH 08/12] Clean mock tests for rename table Signed-off-by: JaySon-Huang --- .../raft/schema/partition_table_restart.test | 40 ++++------------ .../raft/schema/rename_on_write.test | 46 ------------------- .../raft/schema/rename_tables.test | 45 ------------------ 3 files changed, 10 insertions(+), 121 deletions(-) delete mode 100644 tests/delta-merge-test/raft/schema/rename_on_write.test delete mode 100644 tests/delta-merge-test/raft/schema/rename_tables.test diff --git a/tests/delta-merge-test/raft/schema/partition_table_restart.test b/tests/delta-merge-test/raft/schema/partition_table_restart.test index 893bb617af4..c7a5e488111 100644 --- a/tests/delta-merge-test/raft/schema/partition_table_restart.test +++ b/tests/delta-merge-test/raft/schema/partition_table_restart.test @@ -15,16 +15,11 @@ => DBGInvoke __enable_schema_sync_service('false') => DBGInvoke __drop_tidb_table(default, test) -=> DBGInvoke __drop_tidb_table(default, test1) => DBGInvoke __refresh_schemas() => drop table if exists default.test => drop table if exists default.test_9999 => drop table if exists default.test_9998 => drop table if exists default.test_9997 -=> drop table if exists default.test1 -=> drop table if exists default.test1_9999 -=> drop table if exists default.test1_9998 -=> drop table if exists default.test1_9997 => DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64', '', 'dt') => DBGInvoke __mock_tidb_partition(default, test, 9999) @@ -38,38 +33,23 @@ => DBGInvoke __reset_schemas() => DBGInvoke __add_column_to_tidb_table(default, test, 'col_3 Nullable(Int8)') -=> DBGInvoke __rename_tidb_table(default, test, test1) => DBGInvoke __refresh_schemas() - -=> show tables -┌─name───────┐ -│ test1 │ -│ test1_9997 │ -│ test1_9998 │ -│ test1_9999 │ -└────────────┘ -=> select col_2 from default.test1_9997 -=> select * from default.test_9997 -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test_9997 doesn't exist.. -=> select * from default.test_9998 -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test_9998 doesn't exist.. +=> select col_2 from default.test_9997 => DBGInvoke __reset_schemas() -=> DBGInvoke __drop_tidb_partition(default, test1, 9997) +=> DBGInvoke __drop_tidb_partition(default, test, 9997) => DBGInvoke __refresh_schemas() -=> DBGInvoke is_tombstone(default, test1_9997) -┌─is_tombstone(default, test1_9997)─┐ +=> DBGInvoke is_tombstone(default, test_9997) +┌─is_tombstone(default, test_9997)─┐ │ true │ └───────────────────────────────────┘ -=> select * from default.test1_9999 +=> select * from default.test_9999 -=> DBGInvoke __drop_tidb_table(default, test1) +=> DBGInvoke __drop_tidb_table(default, test) => DBGInvoke __refresh_schemas() -=> drop table if exists default.test1 -=> drop table if exists default.test1_9999 -=> drop table if exists default.test1_9998 -=> drop table if exists default.test1_9997 +=> drop table if exists default.test +=> drop table if exists default.test_9999 +=> drop table if exists default.test_9998 +=> drop table if exists default.test_9997 => DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/delta-merge-test/raft/schema/rename_on_write.test b/tests/delta-merge-test/raft/schema/rename_on_write.test deleted file mode 100644 index 6ad2c809ce6..00000000000 --- a/tests/delta-merge-test/raft/schema/rename_on_write.test +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2022 PingCAP, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -#TODO: We can not mock this situation, ignore for now -#RETURN - -=> DBGInvoke __enable_schema_sync_service('false') - -=> DBGInvoke __drop_tidb_table(default, test) -=> drop table if exists default.test - -=> DBGInvoke __drop_tidb_table(default, test1) -=> drop table if exists default.test1 - -=> DBGInvoke __set_flush_threshold(1000000, 1000000) - -=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String', '', 'dt') -=> DBGInvoke __refresh_schemas() -=> DBGInvoke __put_region(4, 0, 100, default, test) -=> select col_1 from default.test -=> DBGInvoke __add_column_to_tidb_table(default, test, 'col_2 Nullable(Int8)') -=> DBGInvoke __rename_tidb_table(default, test, test1) -#For DeltaTree, each write will trigger schema sync. -=> DBGInvoke __raft_insert_row(default, test1, 4, 50, 'test1', 1) -=> select * from default.test -Received exception from server (version {#WORD}): -Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. -=> select * from default.test1 -┌─col_1─┬─_tidb_rowid─┬─col_2─┐ -│ test1 │ 50 │ 1 │ -└───────┴─────────────┴───────┘ - -=> DBGInvoke __drop_tidb_table(default, test1) -=> drop table if exists default.test1 -=> DBGInvoke __enable_schema_sync_service('true') diff --git a/tests/delta-merge-test/raft/schema/rename_tables.test b/tests/delta-merge-test/raft/schema/rename_tables.test deleted file mode 100644 index 7c65d46d3e3..00000000000 --- a/tests/delta-merge-test/raft/schema/rename_tables.test +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright 2022 PingCAP, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Preparation. -=> DBGInvoke __enable_schema_sync_service('false') - -=> DBGInvoke __drop_tidb_table(default, t1) -=> DBGInvoke __drop_tidb_table(default, t2) -=> drop table if exists default.t1 -=> drop table if exists default.t2 -=> DBGInvoke __refresh_schemas() - -=> DBGInvoke __set_flush_threshold(1000000, 1000000) - - -=> DBGInvoke __create_tidb_tables(default, t1, t2) -# rename table -=> DBGInvoke __rename_tidb_tables(default, t1, r1, default, t2, r2) -=> DBGInvoke __refresh_schemas() -=> select database,name,engine from system.tables where database='default' and name='r1' -┌─database─┬─name─┬─engine─────┐ -│ default │ r1 │ DeltaMerge │ -└──────────┴──────┴────────────┘ -=> select database,name,engine from system.tables where database='default' and name='r2' -┌─database─┬─name─┬─engine─────┐ -│ default │ r2 │ DeltaMerge │ -└──────────┴──────┴────────────┘ - -# clean -=> DBGInvoke __drop_tidb_table(default, r1) -=> DBGInvoke __drop_tidb_table(default, r2) -=> drop table if exists default.r1 -=> drop table if exists default.r2 -=> DBGInvoke __enable_schema_sync_service('true') \ No newline at end of file From 77a43a564f770d3b700b34850ab3e4ede8565129 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 1 Sep 2022 14:48:39 +0800 Subject: [PATCH 09/12] Clean PathPool::rename Signed-off-by: JaySon-Huang --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 2 +- dbms/src/Storages/PathPool.cpp | 52 +++---------------- dbms/src/Storages/PathPool.h | 2 +- 3 files changed, 9 insertions(+), 47 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index eef646fe8f7..0d27d25d9b0 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -398,7 +398,7 @@ void DeltaMergeStore::rename(String /*new_path*/, bool clean_rename, String new_ "should never rename the directories when renaming table, new_database_name={}, new_table_name={}", new_database_name, new_table_name); - path_pool->rename(new_database_name, new_table_name, clean_rename); + path_pool->rename(new_database_name, new_table_name); // TODO: replacing these two variables is not atomic, but could be good enough? table_name.swap(new_table_name); diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index 2e7edd7435b..edb1ede0747 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -228,53 +228,15 @@ void StoragePathPool::clearPSV2ObsoleteData() drop_instance_data("data"); } -void StoragePathPool::rename(const String & new_database, const String & new_table, bool clean_rename) +void StoragePathPool::rename(const String & new_database, const String & new_table) { - if (unlikely(new_database.empty() || new_table.empty())) - throw Exception(fmt::format("Can not rename for PathPool to {}.{}", new_database, new_table)); + RUNTIME_CHECK(!new_database.empty() && !new_table.empty(), new_database, new_table); + RUNTIME_CHECK(!path_need_database_name); - if (likely(clean_rename)) - { - // caller ensure that no path need to be renamed. - if (unlikely(path_need_database_name)) - throw Exception("Can not do clean rename with path_need_database_name is true!"); - - std::lock_guard lock{mutex}; - database = new_database; - table = new_table; - } - else - { - if (unlikely(file_provider->isEncryptionEnabled())) - throw Exception("Encryption is only supported when using clean_rename"); - - // Note: changing these path is not atomic, we may lost data if process is crash here. - std::lock_guard lock{mutex}; - // Get root path without database and table - for (auto & info : main_path_infos) - { - Poco::Path p(info.path); - p = p.parent().parent(); - if (path_need_database_name) - p = p.parent(); - auto new_path = getStorePath(p.toString() + "/data", new_database, new_table); - renamePath(info.path, new_path); - info.path = new_path; - } - for (auto & info : latest_path_infos) - { - Poco::Path p(info.path); - p = p.parent().parent(); - if (path_need_database_name) - p = p.parent(); - auto new_path = getStorePath(p.toString() + "/data", new_database, new_table); - renamePath(info.path, new_path); - info.path = new_path; - } - - database = new_database; - table = new_table; - } + // The directories for storing table data is not changed, just rename related names. + std::lock_guard lock{mutex}; + database = new_database; + table = new_table; } void StoragePathPool::drop(bool recursive, bool must_success) diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index 4f16511feff..59d8f463e4b 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -412,7 +412,7 @@ class StoragePathPool void clearPSV2ObsoleteData(); - void rename(const String & new_database, const String & new_table, bool clean_rename); + void rename(const String & new_database, const String & new_table); void drop(bool recursive, bool must_success = true); From 89233d310e13de31bbc601afdba45623fc075d2a Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 1 Sep 2022 20:08:55 +0800 Subject: [PATCH 10/12] Address comment Signed-off-by: JaySon-Huang --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 31 ++++++++++++++++--- dbms/src/Storages/PathPool.cpp | 2 ++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 0d27d25d9b0..27c49556949 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -43,6 +43,9 @@ #include #include +#include "Common/Exception.h" +#include "Poco/Exception.h" + namespace ProfileEvents { extern const Event DMWriteBlock; @@ -348,6 +351,8 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none()); if (unlikely(!dmfile)) { + // If the dtfile directory is not exist, it means `StoragePathPool::drop` have been + // called in another thread. Just try to clean if any id is left. try { delegate.removeDTFile(id); @@ -362,11 +367,27 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) } else if (dmfile->canGC()) { - // scanner should only return dtfiles that can GC, - // just another check here. - delegate.removeDTFile(dmfile->fileId()); - dmfile->remove(file_provider); - LOG_FMT_INFO(logger, "GC removed useless DM file, dmfile={}", dmfile->path()); + // StoragePathPool::drop may be called concurrently, ignore and continue next file if any exception thrown + String err_msg; + try + { + // scanner should only return dtfiles that can GC, + // just another check here. + delegate.removeDTFile(dmfile->fileId()); + dmfile->remove(file_provider); + } + catch (DB::Exception & e) + { + err_msg = e.message(); + } + catch (Poco::Exception & e) + { + err_msg = e.message(); + } + if (err_msg.empty()) + LOG_FMT_INFO(logger, "GC removed useless DM file, dmfile={}", dmfile->path()); + else + LOG_FMT_INFO(logger, "GC try remove useless DM file, but error happen, dmfile={}, err_msg={}", dmfile->path(), err_msg); } } } diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index edb1ede0747..465874564f1 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -259,6 +259,8 @@ void StoragePathPool::drop(bool recursive, bool must_success) total_bytes += file_size; } global_capacity->freeUsedSize(path_info.path, total_bytes); + // clear in case delegator->removeDTFile is called after `drop` + dt_file_path_map.clear(); } } catch (Poco::DirectoryNotEmptyException & e) From ec652f64cdcf85302bacdbb827aa1b80e0740cec Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 2 Sep 2022 17:34:09 +0800 Subject: [PATCH 11/12] Fix include header path Signed-off-by: JaySon-Huang --- .../Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h b/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h index 26ff9c109f4..51c118e9172 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h +++ b/dbms/src/Storages/Page/V3/PageDirectory/ExternalIdsByNamespace.h @@ -17,6 +17,10 @@ #include #include +#include +#include +#include +#include #include namespace DB::PS::V3 From ac9879329bb427ffc265f2566a7cdec7fa3dbee7 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 2 Sep 2022 18:38:32 +0800 Subject: [PATCH 12/12] Remove useless clean_rename param Signed-off-by: JaySon-Huang --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 12 +++------- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 2 +- dbms/src/Storages/StorageDeltaMerge.cpp | 22 ++++++++++--------- 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 27c49556949..ab47848e63b 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -21,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -43,9 +45,6 @@ #include #include -#include "Common/Exception.h" -#include "Poco/Exception.h" - namespace ProfileEvents { extern const Event DMWriteBlock; @@ -412,13 +411,8 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) blockable_background_pool_handle->wake(); } -void DeltaMergeStore::rename(String /*new_path*/, bool clean_rename, String new_database_name, String new_table_name) +void DeltaMergeStore::rename(String /*new_path*/, String new_database_name, String new_table_name) { - RUNTIME_ASSERT(clean_rename, - log, - "should never rename the directories when renaming table, new_database_name={}, new_table_name={}", - new_database_name, - new_table_name); path_pool->rename(new_database_name, new_table_name); // TODO: replacing these two variables is not atomic, but could be good enough? diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 897e17410db..3dc9fca4f08 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -314,7 +314,7 @@ class DeltaMergeStore : private boost::noncopyable return table_name; } - void rename(String new_path, bool clean_rename, String new_database_name, String new_table_name); + void rename(String new_path, String new_database_name, String new_table_name); void clearData(); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 31d9ded2551..8c9f115a472 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1195,23 +1195,25 @@ void StorageDeltaMerge::rename( const String & new_display_table_name) { tidb_table_info.name = new_display_table_name; // update name in table info - // For DatabaseTiFlash, simply update store's database is OK. - // `store->getTableName() == new_table_name` only keep for mock test. - bool clean_rename = !data_path_contains_database_name && getTableName() == new_table_name; - RUNTIME_ASSERT(clean_rename, - log, - "should never rename the directories when renaming table, new_database_name={}, new_table_name={}", - new_database_name, - new_table_name); + { + // For DatabaseTiFlash, simply update store's database is OK. + // `store->getTableName() == new_table_name` only keep for mock test. + bool clean_rename = !data_path_contains_database_name && getTableName() == new_table_name; + RUNTIME_ASSERT(clean_rename, + log, + "should never rename the directories when renaming table, new_database_name={}, new_table_name={}", + new_database_name, + new_table_name); + } if (storeInited()) { - _store->rename(new_path_to_db, clean_rename, new_database_name, new_table_name); + _store->rename(new_path_to_db, new_database_name, new_table_name); return; } std::lock_guard lock(store_mutex); if (storeInited()) { - _store->rename(new_path_to_db, clean_rename, new_database_name, new_table_name); + _store->rename(new_path_to_db, new_database_name, new_table_name); } else {