Skip to content

Commit

Permalink
PageStorage: Fix unexpected dmfile removed after shutdown (pingcap#6558
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and Lloyd-Pottiger committed Feb 22, 2023
1 parent 41c08db commit c3a2e38
Show file tree
Hide file tree
Showing 18 changed files with 291 additions and 116 deletions.
7 changes: 7 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ struct ContextShared
return;
shutdown_called = true;

if (global_storage_pool)
{
// shutdown the gc task of global storage pool before
// shutting down the tables.
global_storage_pool->shutdown();
}

/** At this point, some tables may have threads that block our mutex.
* To complete them correctly, we will copy the current list of tables,
* and ask them all to finish their work.
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,11 @@ void DeltaMergeStore::shutdown()
return;

LOG_TRACE(log, "Shutdown DeltaMerge start");
// shutdown before unregister to avoid conflict between this thread and background gc thread on the `ExternalPagesCallbacks`
// because PageStorage V2 doesn't have any lock protection on the `ExternalPagesCallbacks`.(The order doesn't matter for V3)
// Must shutdown storage path pool to make sure the DMFile remove callbacks
// won't remove dmfiles unexpectly.
path_pool->shutdown();
// shutdown storage pool and clean up the local DMFile remove callbacks
storage_pool->shutdown();
storage_pool->dataUnregisterExternalPagesCallbacks(storage_pool->getNamespaceId());

background_pool.removeTask(background_task_handle);
blockable_background_pool.removeTask(blockable_background_pool_handle);
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ using SegmentIdSet = std::unordered_set<UInt64>;
struct ExternalDTFileInfo;
struct GCOptions;

namespace tests
{
class DeltaMergeStoreTest;
}

inline static const PageId DELTA_MERGE_FIRST_SEGMENT_ID = 1;

struct SegmentStats
Expand Down Expand Up @@ -155,6 +160,7 @@ struct StoreStats
class DeltaMergeStore : private boost::noncopyable
{
public:
friend class ::DB::DM::tests::DeltaMergeStoreTest;
struct Settings
{
NotCompress not_compress_columns{};
Expand Down
89 changes: 69 additions & 20 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@

#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
#include <Encryption/FileProvider.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/GCOptions.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/TMTContext.h>

#include <magic_enum.hpp>
#include <memory>

namespace CurrentMetrics
{
Expand All @@ -36,20 +39,33 @@ extern const char pause_until_dt_background_delta_merge[];

namespace DM
{
void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)

// A callback class for scanning the DMFiles on local filesystem
class LocalDMFileGcScanner final
{
// Callbacks for cleaning outdated DTFiles. Note that there is a chance
// that callbacks is called after the `DeltaMergeStore` dropped, we must
// make the callbacks safe.
ExternalPageCallbacks callbacks;
callbacks.ns_id = storage_pool->getNamespaceId();
callbacks.scanner = [path_pool_weak_ref = std::weak_ptr<StoragePathPool>(path_pool), file_provider = global_context.getFileProvider()]() {
private:
// !!! Warning !!!
// Should only keep a weak ref of storage path pool since
// this callback instance may still valid inside the PageStorage
// even after the DeltaMerge storage is shutdown or released.
std::weak_ptr<StoragePathPool> path_pool_weak_ref;
FileProviderPtr file_provider;

public:
LocalDMFileGcScanner(std::weak_ptr<StoragePathPool> path_pool_, FileProviderPtr provider)
: path_pool_weak_ref(std::move(path_pool_))
, file_provider(std::move(provider))
{}

ExternalPageCallbacks::PathAndIdsVec operator()()
{
ExternalPageCallbacks::PathAndIdsVec path_and_ids_vec;

// If the StoragePathPool is invalid, meaning we call `scanner` after dropping the table,
// If the StoragePathPool is invalid or shutdown flag is set,
// meaning we call `scanner` after shutdowning or dropping the table,
// simply return an empty list is OK.
auto path_pool = path_pool_weak_ref.lock();
if (!path_pool)
if (!path_pool || path_pool->isShutdown())
return path_and_ids_vec;

// Return the DTFiles on disks.
Expand All @@ -66,14 +82,35 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
path_and_ids_vec.emplace_back(root_path, std::move(file_ids_in_current_path));
}
return path_and_ids_vec;
};
callbacks.remover = [path_pool_weak_ref = std::weak_ptr<StoragePathPool>(path_pool), //
file_provider = global_context.getFileProvider(),
logger = log](const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageId> & valid_ids) {
// If the StoragePathPool is invalid, meaning we call `remover` after dropping the table,
// simply skip is OK.
}
};

// A callback class for removing the DMFiles on local filesystem
class LocalDMFileGcRemover final
{
private:
// !!! Warning !!!
// Should only keep a weak ref of storage path pool since
// this callback instance may still valid inside the PageStorage
// even after the DeltaMerge storage is shutdown or released.
std::weak_ptr<StoragePathPool> path_pool_weak_ref;
FileProviderPtr file_provider;
LoggerPtr logger;

public:
LocalDMFileGcRemover(std::weak_ptr<StoragePathPool> path_pool_, FileProviderPtr provider, LoggerPtr log)
: path_pool_weak_ref(std::move(path_pool_))
, file_provider(std::move(provider))
, logger(std::move(log))
{}

void operator()(const ExternalPageCallbacks::PathAndIdsVec & path_and_ids_vec, const std::set<PageId> & valid_ids)
{
// If the StoragePathPool is invalid or shutdown flag is set,
// meaning we call `remover` after shutdowning or dropping the table,
// we must skip because the `valid_ids` is not reliable!
auto path_pool = path_pool_weak_ref.lock();
if (!path_pool)
if (!path_pool || path_pool->isShutdown())
return;

SYNC_FOR("before_DeltaMergeStore::callbacks_remover_remove");
Expand Down Expand Up @@ -102,6 +139,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
LOG_INFO(logger,
"GC try remove useless DM file, but file not found and may have been removed, dmfile={}",
DMFile::getPathByStatus(path, id, DMFile::Status::READABLE));
continue; // next file
}
else if (dmfile->canGC())
{
Expand All @@ -126,19 +164,30 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
LOG_INFO(logger, "GC removed useless DM file, dmfile={}", dmfile->path());
else
LOG_INFO(logger, "GC try remove useless DM file, but error happen, dmfile={} err_msg={}", dmfile->path(), err_msg);
continue; // next file
}
}
}
};
}
};

void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
{
// Callbacks for cleaning outdated DTFiles. Note that there is a chance
// that callbacks is called after the `DeltaMergeStore` shutdown or dropped,
// we must make the callbacks safe.
ExternalPageCallbacks callbacks;
callbacks.ns_id = storage_pool->getNamespaceId();
callbacks.scanner = LocalDMFileGcScanner(std::weak_ptr<StoragePathPool>(path_pool), global_context.getFileProvider());
callbacks.remover = LocalDMFileGcRemover(std::weak_ptr<StoragePathPool>(path_pool), global_context.getFileProvider(), log);
// remember to unregister it when shutdown
storage_pool->dataRegisterExternalPagesCallbacks(callbacks);
storage_pool->enableGC();
storage_pool->startup(std::move(callbacks));

background_task_handle = background_pool.addTask([this] { return handleBackgroundTask(false); });

blockable_background_pool_handle = blockable_background_pool.addTask([this] { return handleBackgroundTask(true); });

// Do place delta index.
// Generate place delta index tasks
for (auto & [end, segment] : segments)
{
(void)end;
Expand Down
77 changes: 42 additions & 35 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ GlobalStoragePool::GlobalStoragePool(const PathPool & path_pool, Context & globa

GlobalStoragePool::~GlobalStoragePool()
{
if (gc_handle)
{
global_context.getBackgroundPool().removeTask(gc_handle);
gc_handle = nullptr;
}
shutdown();
}

void GlobalStoragePool::restore()
Expand All @@ -132,6 +128,15 @@ void GlobalStoragePool::restore()
false);
}

void GlobalStoragePool::shutdown()
{
if (gc_handle)
{
global_context.getBackgroundPool().removeTask(gc_handle);
gc_handle = {};
}
}

FileUsageStatistics GlobalStoragePool::getLogFileUsage() const
{
return log_storage->getFileUsageStatistics();
Expand Down Expand Up @@ -535,50 +540,68 @@ StoragePool::~StoragePool()
shutdown();
}

void StoragePool::enableGC()
{
// The data in V3 will be GCed by `GlobalStoragePool::gc`, only register gc task under only v2/mix mode
if (run_mode == PageStorageRunMode::ONLY_V2 || run_mode == PageStorageRunMode::MIX_MODE)
{
gc_handle = global_context.getBackgroundPool().addTask([this] { return this->gc(global_context.getSettingsRef()); });
}
}

void StoragePool::dataRegisterExternalPagesCallbacks(const ExternalPageCallbacks & callbacks)
void StoragePool::startup(ExternalPageCallbacks && callbacks)
{
switch (run_mode)
{
case PageStorageRunMode::ONLY_V2:
{
// For V2, we need a per physical table gc handle to perform the gc of its PageStorage instances.
data_storage_v2->registerExternalPagesCallbacks(callbacks);
gc_handle = global_context.getBackgroundPool().addTask([this] { return this->gc(global_context.getSettingsRef()); });
break;
}
case PageStorageRunMode::ONLY_V3:
{
// For V3, the GC is handled by `GlobalStoragePool::gc`, just register callbacks is OK.
data_storage_v3->registerExternalPagesCallbacks(callbacks);
break;
}
case PageStorageRunMode::MIX_MODE:
{
// We have transformed all pages from V2 to V3 in `restore`, so
// only need to register callbacks for V3.
// For V3, the GC is handled by `GlobalStoragePool::gc`.
// Since we have transformed all external pages from V2 to V3 in `StoragePool::restore`,
// just register callbacks to V3 is OK
data_storage_v3->registerExternalPagesCallbacks(callbacks);
// we still need a gc_handle to reclaim the V2 disk space.
gc_handle = global_context.getBackgroundPool().addTask([this] { return this->gc(global_context.getSettingsRef()); });
break;
}
default:
throw Exception(fmt::format("Unknown PageStorageRunMode {}", static_cast<UInt8>(run_mode)), ErrorCodes::LOGICAL_ERROR);
}
}

void StoragePool::dataUnregisterExternalPagesCallbacks(NamespaceId ns_id)
void StoragePool::shutdown()
{
// Note: Should reset the gc_handle before unregistering the pages callbacks
if (gc_handle)
{
global_context.getBackgroundPool().removeTask(gc_handle);
gc_handle = nullptr;
}

switch (run_mode)
{
case PageStorageRunMode::ONLY_V2:
{
meta_storage_v2->shutdown();
log_storage_v2->shutdown();
data_storage_v2->shutdown();
data_storage_v2->unregisterExternalPagesCallbacks(ns_id);
break;
}
case PageStorageRunMode::ONLY_V3:
{
data_storage_v3->unregisterExternalPagesCallbacks(ns_id);
break;
}
case PageStorageRunMode::MIX_MODE:
{
// We have transformed all pages from V2 to V3 in `restore`, so
meta_storage_v2->shutdown();
log_storage_v2->shutdown();
data_storage_v2->shutdown();
// We have transformed all external pages from V2 to V3 in `restore`, so
// only need to unregister callbacks for V3.
data_storage_v3->unregisterExternalPagesCallbacks(ns_id);
break;
Expand All @@ -588,7 +611,6 @@ void StoragePool::dataUnregisterExternalPagesCallbacks(NamespaceId ns_id)
}
}


bool StoragePool::doV2Gc(const Settings & settings)
{
bool done_anything = false;
Expand Down Expand Up @@ -629,21 +651,6 @@ bool StoragePool::gc(const Settings & settings, const Seconds & try_gc_period)
return doV2Gc(settings);
}

void StoragePool::shutdown()
{
if (gc_handle)
{
global_context.getBackgroundPool().removeTask(gc_handle);
gc_handle = nullptr;
}
if (run_mode != PageStorageRunMode::ONLY_V3)
{
meta_storage_v2->shutdown();
log_storage_v2->shutdown();
data_storage_v2->shutdown();
}
}

void StoragePool::drop()
{
shutdown();
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include <Poco/Logger.h>
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/Page/FileUsage.h>
#include <Storages/Page/PageStorage.h>
Expand All @@ -36,7 +35,7 @@ namespace DM
class StoragePool;
using StoragePoolPtr = std::shared_ptr<StoragePool>;

static const std::chrono::seconds DELTA_MERGE_GC_PERIOD(60);
static constexpr std::chrono::seconds DELTA_MERGE_GC_PERIOD(60);

class GlobalStoragePool : private boost::noncopyable
{
Expand All @@ -51,6 +50,8 @@ class GlobalStoragePool : private boost::noncopyable

void restore();

void shutdown();

friend class StoragePool;
friend class ::DB::AsynchronousMetrics;

Expand Down Expand Up @@ -90,7 +91,7 @@ class StoragePool : private boost::noncopyable

NamespaceId getNamespaceId() const { return ns_id; }

PageStorageRunMode getPageStorageRunMode()
PageStorageRunMode getPageStorageRunMode() const
{
return run_mode;
}
Expand Down Expand Up @@ -141,16 +142,15 @@ class StoragePool : private boost::noncopyable
PageReader newMetaReader(ReadLimiterPtr read_limiter, bool snapshot_read, const String & tracing_id);
PageReader newMetaReader(ReadLimiterPtr read_limiter, PageStorage::SnapshotPtr & snapshot);

void enableGC();

void dataRegisterExternalPagesCallbacks(const ExternalPageCallbacks & callbacks);
// Register the clean up DMFiles callbacks to PageStorage.
// The callbacks will be unregister when `shutdown` is called.
void startup(ExternalPageCallbacks && callbacks);

void dataUnregisterExternalPagesCallbacks(NamespaceId ns_id);
// Shutdown the gc handle and DMFile callbacks
void shutdown();

bool gc(const Settings & settings, const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD);

void shutdown();

// Caller must cancel gc tasks before drop
void drop();

Expand Down
Loading

0 comments on commit c3a2e38

Please sign in to comment.