Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PageStorage: Fine grained lock on external callbacks #5699

Merged
merged 14 commits into from
Sep 5, 2022
Merged
36 changes: 16 additions & 20 deletions dbms/src/Databases/test/gtest_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,18 +273,15 @@ try
EXPECT_EQ(managed_storage->getDatabaseName(), db_name);
}

const String to_tbl_name = "t_112";
const String to_tbl_display_name = "tbl_test";
{
// Rename table
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db, to_tbl_name, db_name, to_tbl_name);
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db, tbl_name, db_name, to_tbl_display_name);

auto old_storage = db->tryGetTable(ctx, tbl_name);
ASSERT_EQ(old_storage, nullptr);

auto storage = db->tryGetTable(ctx, to_tbl_name);
auto storage = db->tryGetTable(ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), to_tbl_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
EXPECT_EQ(managed_storage->getDatabaseName(), db_name);
Expand All @@ -294,13 +291,13 @@ try
// Drop table
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->database = db_name;
drop_query->table = to_tbl_name;
drop_query->table = tbl_name;
drop_query->if_exists = false;
ASTPtr ast_drop_query = drop_query;
InterpreterDropQuery drop_interpreter(ast_drop_query, ctx);
drop_interpreter.execute();

auto storage = db->tryGetTable(ctx, to_tbl_name);
auto storage = db->tryGetTable(ctx, tbl_name);
ASSERT_EQ(storage, nullptr);
}

Expand Down Expand Up @@ -391,18 +388,18 @@ try
EXPECT_EQ(managed_storage->getDatabaseName(), db_name);
}

const String to_tbl_name = "t_112";
const String to_tbl_display_name = "tbl_test";
{
// Rename table
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db2, to_tbl_name, db2_name, to_tbl_name);
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db2, tbl_name, db2_name, to_tbl_display_name);

auto old_storage = db->tryGetTable(ctx, tbl_name);
ASSERT_EQ(old_storage, nullptr);

auto storage = db2->tryGetTable(ctx, to_tbl_name);
auto storage = db2->tryGetTable(ctx, tbl_name);
ASSERT_NE(storage, nullptr);
EXPECT_EQ(storage->getName(), MutableSupport::delta_tree_storage_name);
EXPECT_EQ(storage->getTableName(), to_tbl_name);
EXPECT_EQ(storage->getTableName(), tbl_name);

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
EXPECT_EQ(managed_storage->getDatabaseName(), db2_name);
Expand All @@ -412,13 +409,13 @@ try
// Drop table
auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->database = db2_name;
drop_query->table = to_tbl_name;
drop_query->table = tbl_name;
drop_query->if_exists = false;
ASTPtr ast_drop_query = drop_query;
InterpreterDropQuery drop_interpreter(ast_drop_query, ctx);
drop_interpreter.execute();

auto storage = db2->tryGetTable(ctx, to_tbl_name);
auto storage = db2->tryGetTable(ctx, tbl_name);
ASSERT_EQ(storage, nullptr);
}

Expand Down Expand Up @@ -501,18 +498,17 @@ try
EXPECT_FALSE(db->empty(ctx));
EXPECT_TRUE(db->isTableExist(ctx, tbl_name));

const String to_tbl_name = "t_112";
// Rename table to another database, and mock crash by failed point
FailPointHelper::enableFailPoint(FailPoints::exception_before_rename_table_old_meta_removed);
ASSERT_THROW(
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db2, to_tbl_name, db2_name, to_tbl_name),
typeid_cast<DatabaseTiFlash *>(db.get())->renameTable(ctx, tbl_name, *db2, tbl_name, db2_name, tbl_name),
DB::Exception);

{
// After the fail point is triggered, we should have both meta files on disk
Poco::File old_meta_file{db->getTableMetadataPath(tbl_name)};
ASSERT_TRUE(old_meta_file.exists());
Poco::File new_meta_file(db2->getTableMetadataPath(to_tbl_name));
Poco::File new_meta_file(db2->getTableMetadataPath(tbl_name));
ASSERT_TRUE(new_meta_file.exists());
// Old table should remain in db
auto old_storage = db->tryGetTable(ctx, tbl_name);
Expand All @@ -527,10 +523,10 @@ try
ThreadPool thread_pool(2);
db2->loadTables(ctx, &thread_pool, true);

Poco::File new_meta_file(db2->getTableMetadataPath(to_tbl_name));
Poco::File new_meta_file(db2->getTableMetadataPath(tbl_name));
ASSERT_FALSE(new_meta_file.exists());

auto storage = db2->tryGetTable(ctx, to_tbl_name);
auto storage = db2->tryGetTable(ctx, tbl_name);
ASSERT_EQ(storage, nullptr);
}

Expand Down
119 changes: 82 additions & 37 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/Logger.h>
Expand All @@ -21,6 +22,7 @@
#include <Core/SortDescription.h>
#include <Functions/FunctionsConversion.h>
#include <Interpreters/sortBlock.h>
#include <Poco/Exception.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/DMSegmentThreadInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
Expand All @@ -41,6 +43,7 @@

#include <atomic>
#include <ext/scope_guard.h>
#include <memory>

namespace ProfileEvents
{
Expand Down Expand Up @@ -196,7 +199,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
size_t rowkey_column_size_,
const Settings & settings_)
: global_context(db_context.getGlobalContext())
, path_pool(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name))
, path_pool(std::make_shared<StoragePathPool>(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name)))
, settings(settings_)
, db_name(db_name_)
, table_name(table_name_)
Expand All @@ -216,7 +219,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,

storage_pool = std::make_shared<StoragePool>(global_context,
ns_id,
path_pool,
*path_pool,
db_name_ + "." + table_name_);

// Restore existing dm files and set capacity for path_pool.
Expand Down Expand Up @@ -296,25 +299,46 @@ DeltaMergeStore::~DeltaMergeStore()

void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
{
// Callbacks for cleaning outdated DTFiles. Note that there is a chance
// that the callbacks are called after the `DeltaMergeStore` is dropped, so we must
// make the callbacks safe.
ExternalPageCallbacks callbacks;
// V2 callbacks for cleaning DTFiles
callbacks.scanner = [this]() {
callbacks.ns_id = storage_pool->getNamespaceId();
callbacks.scanner = [path_pool_weak_ref = std::weak_ptr<StoragePathPool>(path_pool), file_provider = global_context.getFileProvider()]() {
ExternalPageCallbacks::PathAndIdsVec path_and_ids_vec;
auto delegate = path_pool.getStableDiskDelegator();

// If the StoragePathPool is invalid, meaning we call `scanner` after dropping the table,
// simply returning an empty list is OK.
auto path_pool = path_pool_weak_ref.lock();
if (!path_pool)
return path_and_ids_vec;

// Return the DTFiles on disks.
auto delegate = path_pool->getStableDiskDelegator();
// Only return the DTFiles that can be GC'ed. A file that is being ingested or is in the middle of
// SegmentSplit/Merge/MergeDelta has a page id that is not yet applied to PageStorage;
// such a file is marked as not able to be GC'ed, so we neither return it nor run the `remover` on it.
DMFile::ListOptions options;
options.only_list_can_gc = true;
for (auto & root_path : delegate.listPaths())
{
auto & path_and_ids = path_and_ids_vec.emplace_back();
path_and_ids.first = root_path;
auto file_ids_in_current_path = DMFile::listAllInPath(global_context.getFileProvider(), root_path, options);
for (auto id : file_ids_in_current_path)
path_and_ids.second.insert(id);
std::set<PageId> ids_under_path;
auto file_ids_in_current_path = DMFile::listAllInPath(file_provider, root_path, options);
path_and_ids_vec.emplace_back(root_path, std::move(file_ids_in_current_path));
}
return path_and_ids_vec;
};
callbacks.remover = [this](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageId> & valid_ids) {
auto delegate = path_pool.getStableDiskDelegator();
callbacks.remover = [path_pool_weak_ref = std::weak_ptr<StoragePathPool>(path_pool), //
file_provider = global_context.getFileProvider(),
logger = log](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageId> & valid_ids) {
// If the StoragePathPool is invalid, meaning we call `remover` after dropping the table,
// simply skipping is OK.
auto path_pool = path_pool_weak_ref.lock();
if (!path_pool)
return;

SYNC_FOR("before_DeltaMergeStore::callbacks_remover_remove");
auto delegate = path_pool->getStableDiskDelegator();
for (const auto & [path, ids] : path_and_ids_vec)
{
for (auto id : ids)
Expand All @@ -323,18 +347,50 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
continue;

// Note that page_id is useless here.
auto dmfile = DMFile::restore(global_context.getFileProvider(), id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none());
if (dmfile->canGC())
auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none());
if (unlikely(!dmfile))
{
delegate.removeDTFile(dmfile->fileId());
dmfile->remove(global_context.getFileProvider());
// If the dtfile directory does not exist, it means `StoragePathPool::drop` has been
// called in another thread. Just try to clean up if any id is left.
try
{
delegate.removeDTFile(id);
}
catch (DB::Exception & e)
{
// just ignore
flowbehappy marked this conversation as resolved.
Show resolved Hide resolved
}
LOG_FMT_INFO(logger,
"GC try remove useless DM file, but file not found and may have been removed, dmfile={}",
DMFile::getPathByStatus(path, id, DMFile::Status::READABLE));
}
else if (dmfile->canGC())
{
// StoragePathPool::drop may be called concurrently; if any exception is thrown, ignore it and continue with the next file
String err_msg;
try
{
// scanner should only return dtfiles that can GC,
// just another check here.
delegate.removeDTFile(dmfile->fileId());
dmfile->remove(file_provider);
}
catch (DB::Exception & e)
{
err_msg = e.message();
}
catch (Poco::Exception & e)
{
err_msg = e.message();
}
if (err_msg.empty())
LOG_FMT_INFO(logger, "GC removed useless DM file, dmfile={}", dmfile->path());
else
LOG_FMT_INFO(logger, "GC try remove useless DM file, but error happen, dmfile={}, err_msg={}", dmfile->path(), err_msg);
}

LOG_FMT_INFO(log, "GC removed useless dmfile: {}", dmfile->path());
}
}
};
callbacks.ns_id = storage_pool->getNamespaceId();
// remember to unregister it when shutdown
storage_pool->dataRegisterExternalPagesCallbacks(callbacks);
storage_pool->enableGC();
Expand All @@ -355,20 +411,9 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
blockable_background_pool_handle->wake();
}

void DeltaMergeStore::rename(String /*new_path*/, bool clean_rename, String new_database_name, String new_table_name)
void DeltaMergeStore::rename(String /*new_path*/, String new_database_name, String new_table_name)
{
if (clean_rename)
{
path_pool.rename(new_database_name, new_table_name, clean_rename);
}
else
{
LOG_FMT_WARNING(log, "Applying heavy renaming for table {}.{} to {}.{}", db_name, table_name, new_database_name, new_table_name);

// Remove all background task first
shutdown();
path_pool.rename(new_database_name, new_table_name, clean_rename); // rename for multi-disk
}
path_pool->rename(new_database_name, new_table_name);

// TODO: replacing these two variables is not atomic, but could be good enough?
table_name.swap(new_table_name);
Expand Down Expand Up @@ -465,7 +510,7 @@ void DeltaMergeStore::drop()
storage_pool->drop();

// Drop data in storage path pool
path_pool.drop(/*recursive=*/true, /*must_success=*/false);
path_pool->drop(/*recursive=*/true, /*must_success=*/false);
LOG_FMT_INFO(log, "Drop DeltaMerge done [{}.{}]", db_name, table_name);
}

Expand Down Expand Up @@ -496,7 +541,7 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB:
// Because db_context could be a temporary object and won't last long enough during the query process.
// Like the context created by InterpreterSelectWithUnionQuery.
auto * ctx = new DMContext(db_context.getGlobalContext(),
path_pool,
*path_pool,
*storage_pool,
latest_gc_safe_point.load(std::memory_order_acquire),
settings.not_compress_columns,
Expand Down Expand Up @@ -704,7 +749,7 @@ std::tuple<String, PageId> DeltaMergeStore::preAllocateIngestFile()
if (shutdown_called.load(std::memory_order_relaxed))
return {};

auto delegator = path_pool.getStableDiskDelegator();
auto delegator = path_pool->getStableDiskDelegator();
auto parent_path = delegator.choosePath();
auto new_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__);
return {parent_path, new_id};
Expand All @@ -715,7 +760,7 @@ void DeltaMergeStore::preIngestFile(const String & parent_path, const PageId fil
if (shutdown_called.load(std::memory_order_relaxed))
return;

auto delegator = path_pool.getStableDiskDelegator();
auto delegator = path_pool->getStableDiskDelegator();
delegator.addDTFile(file_id, file_size, parent_path);
}

Expand Down Expand Up @@ -2529,7 +2574,7 @@ void DeltaMergeStore::restoreStableFiles()
options.only_list_can_gc = false; // We need all files to restore the bytes on disk
options.clean_up = true;
auto file_provider = global_context.getFileProvider();
auto path_delegate = path_pool.getStableDiskDelegator();
auto path_delegate = path_pool->getStableDiskDelegator();
for (const auto & root_path : path_delegate.listPaths())
{
for (const auto & file_id : DMFile::listAllInPath(file_provider, root_path, options))
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ class DeltaMergeStore : private boost::noncopyable
return table_name;
}

void rename(String new_path, bool clean_rename, String new_database_name, String new_table_name);
void rename(String new_path, String new_database_name, String new_table_name);

void clearData();

Expand Down Expand Up @@ -529,7 +529,7 @@ class DeltaMergeStore : private boost::noncopyable
#endif

Context & global_context;
StoragePathPool path_pool;
std::shared_ptr<StoragePathPool> path_pool;
Settings settings;
StoragePoolPtr storage_pool;

Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ DMFilePtr DMFile::restore(
const ReadMetaMode & read_meta_mode)
{
String path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE);
bool single_file_mode = Poco::File(path).isFile();
// The path may be dropped by another thread in some cases
auto poco_file = Poco::File(path);
if (!poco_file.exists())
return nullptr;

bool single_file_mode = poco_file.isFile();
DMFilePtr dmfile(new DMFile(
file_id,
page_id,
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ FileUsageStatistics GlobalStoragePool::getLogFileUsage() const

bool GlobalStoragePool::gc()
{
return gc(global_context.getSettingsRef(), true, DELTA_MERGE_GC_PERIOD);
return gc(global_context.getSettingsRef(), /*immediately=*/true, DELTA_MERGE_GC_PERIOD);
}

bool GlobalStoragePool::gc(const Settings & settings, bool immediately, const Seconds & try_gc_period)
Expand Down Expand Up @@ -445,7 +445,7 @@ PageStorageRunMode StoragePool::restore()
}
else
{
LOG_FMT_INFO(logger, "Current pool.meta translate already done before restored [ns_id={}] ", ns_id);
LOG_FMT_INFO(logger, "Current pool.meta transform already done before restored [ns_id={}] ", ns_id);
}

if (const auto & data_remain_pages = data_storage_v2->getNumberOfPages(); data_remain_pages != 0)
Expand All @@ -461,7 +461,7 @@ PageStorageRunMode StoragePool::restore()
}
else
{
LOG_FMT_INFO(logger, "Current pool.data translate already done before restored [ns_id={}]", ns_id);
LOG_FMT_INFO(logger, "Current pool.data transform already done before restored [ns_id={}]", ns_id);
}

// Check number of valid pages in v2
Expand Down
Loading