Skip to content

Commit

Permalink
support storing all data in UniversalPageStorage (#6717)
Browse files Browse the repository at this point in the history
ref #6728
  • Loading branch information
lidezhu authored Feb 15, 2023
1 parent 34fc70f commit a5d6f86
Show file tree
Hide file tree
Showing 51 changed files with 1,741 additions and 398 deletions.
93 changes: 82 additions & 11 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <Storages/Page/V3/Universal/UniversalPageStorageService.h>
#include <Storages/PathCapacityMetrics.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/BackgroundService.h>
Expand Down Expand Up @@ -164,6 +165,10 @@ struct ContextShared
IORateLimiter io_rate_limiter;
PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3;
DM::GlobalStoragePoolPtr global_storage_pool;

/// The PS instance available on Write Node.
UniversalPageStorageServicePtr ps_write;

TiFlashSecurityConfigPtr security_config;

/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
Expand Down Expand Up @@ -1545,22 +1550,35 @@ static bool isPageStorageV2Existed(const PathPool & path_pool)

static bool isPageStorageV3Existed(const PathPool & path_pool)
{
const std::vector<String> path_prefixes = {
PathPool::log_path_prefix,
PathPool::data_path_prefix,
PathPool::meta_path_prefix,
PathPool::kvstore_path_prefix,
};
for (const auto & path : path_pool.listGlobalPagePaths())
{
Poco::File dir(path);
if (!dir.exists())
continue;

std::vector<std::string> files;
dir.list(files);
if (!files.empty())
for (const auto & path_prefix : path_prefixes)
{
return true;
Poco::File dir(path + "/" + path_prefix);
if (dir.exists())
return true;
}
}
return false;
}

static bool isWriteNodeUniPSExisted(const PathPool & path_pool)
{
for (const auto & path : path_pool.listGlobalPagePaths())
{
Poco::File dir(path + "/" + PathPool::write_uni_path_prefix);
if (dir.exists())
return true;
}
return false;
}

void Context::initializePageStorageMode(const PathPool & path_pool, UInt64 storage_page_format_version)
{
auto lock = getLock();
Expand All @@ -1577,21 +1595,36 @@ void Context::initializePageStorageMode(const PathPool & path_pool, UInt64 stora
case PageFormat::V1:
case PageFormat::V2:
{
if (isPageStorageV3Existed(path_pool))
if (isPageStorageV3Existed(path_pool) || isWriteNodeUniPSExisted(path_pool))
{
throw Exception("Invalid config `storage.format_version`, Current page V3 data exist. But using the PageFormat::V2."
throw Exception("Invalid config `storage.format_version`, newer format page data exist. But using the PageFormat::V2."
"If you are downgrading the format_version for this TiFlash node, you need to rebuild the data from scratch.",
ErrorCodes::LOGICAL_ERROR);
}
// not exist V3
// not exist newer format page data
shared->storage_run_mode = PageStorageRunMode::ONLY_V2;
return;
}
case PageFormat::V3:
{
if (isWriteNodeUniPSExisted(path_pool))
{
throw Exception("Invalid config `storage.format_version`, newer format page data exist. But using the PageFormat::V3."
"If you are downgrading the format_version for this TiFlash node, you need to rebuild the data from scratch.",
ErrorCodes::LOGICAL_ERROR);
}
shared->storage_run_mode = isPageStorageV2Existed(path_pool) ? PageStorageRunMode::MIX_MODE : PageStorageRunMode::ONLY_V3;
return;
}
case PageFormat::V4:
{
if (isPageStorageV2Existed(path_pool) || isPageStorageV3Existed(path_pool))
{
throw Exception("Uni PS can only be enabled on a fresh start", ErrorCodes::LOGICAL_ERROR);
}
shared->storage_run_mode = PageStorageRunMode::UNI_PS;
return;
}
default:
throw Exception(fmt::format("Can't detect the format version of Page [page_version={}]", storage_page_format_version),
ErrorCodes::LOGICAL_ERROR);
Expand All @@ -1617,6 +1650,7 @@ bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool)
{
// GlobalStoragePool may be initialized many times in some test cases for restore.
LOG_WARNING(shared->log, "GlobalStoragePool has already been initialized.");
shared->global_storage_pool->shutdown();
}
CurrentMetrics::set(CurrentMetrics::GlobalStorageRunMode, static_cast<UInt8>(shared->storage_run_mode));
if (shared->storage_run_mode == PageStorageRunMode::MIX_MODE || shared->storage_run_mode == PageStorageRunMode::ONLY_V3)
Expand Down Expand Up @@ -1646,6 +1680,43 @@ DM::GlobalStoragePoolPtr Context::getGlobalStoragePool() const
return shared->global_storage_pool;
}

void Context::initializeWriteNodePageStorageIfNeed(const PathPool & path_pool)
{
auto lock = getLock();
if (shared->storage_run_mode == PageStorageRunMode::UNI_PS)
{
if (shared->ps_write)
{
// GlobalStoragePool may be initialized many times in some test cases for restore.
LOG_WARNING(shared->log, "GlobalUniversalPageStorage(WriteNode) has already been initialized.");
}
PageStorageConfig config;
shared->ps_write = UniversalPageStorageService::create( //
*this,
"write",
path_pool.getPSDiskDelegatorGlobalMulti(PathPool::write_uni_path_prefix),
config);
LOG_INFO(shared->log, "initialized GlobalUniversalPageStorage(WriteNode)");
}
else
{
shared->ps_write = nullptr;
}
}

UniversalPageStoragePtr Context::getWriteNodePageStorage() const
{
auto lock = getLock();
if (shared->ps_write)
{
return shared->ps_write->getUniversalPageStorage();
}
else
{
return nullptr;
}
}

UInt16 Context::getTCPPort() const
{
auto lock = getLock();
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ using Dependencies = std::vector<DatabaseAndTableName>;
using TableAndCreateAST = std::pair<StoragePtr, ASTPtr>;
using TableAndCreateASTs = std::map<String, TableAndCreateAST>;

class UniversalPageStorage;
using UniversalPageStoragePtr = std::shared_ptr<UniversalPageStorage>;

/** A set of known objects that can be used in the query.
* Consists of a shared part (always common to all sessions and queries)
* and copied part (which can be its own for each session or query).
Expand Down Expand Up @@ -426,6 +429,9 @@ class Context
bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool);
DM::GlobalStoragePoolPtr getGlobalStoragePool() const;

void initializeWriteNodePageStorageIfNeed(const PathPool & path_pool);
UniversalPageStoragePtr getWriteNodePageStorage() const;

/// Call after initialization before using system logs. Call for global context.
void initializeSystemLogs();

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool());
LOG_INFO(log, "Global PageStorage run mode is {}", static_cast<UInt8>(global_context->getPageStorageRunMode()));

global_context->initializeWriteNodePageStorageIfNeed(global_context->getPathPool());

/// Initialize RateLimiter.
global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool);

Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Server/tests/gtest_server_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,11 @@ dt_open_file_max_idle_seconds = 20
dt_page_gc_low_write_prob = 0.2
)"};
auto & global_ctx = TiFlashTestEnv::getGlobalContext();
if (global_ctx.getPageStorageRunMode() == PageStorageRunMode::UNI_PS)
{
// don't support reload uni ps config through region persister
return;
}
auto & global_path_pool = global_ctx.getPathPool();
RegionManager region_manager;
RegionPersister persister(global_ctx, region_manager);
Expand Down Expand Up @@ -445,6 +450,11 @@ dt_page_gc_low_write_prob = 0.2
)"};

auto & global_ctx = TiFlashTestEnv::getGlobalContext();
if (global_ctx.getPageStorageRunMode() == PageStorageRunMode::UNI_PS)
{
// don't support reload uni ps config through storage pool
return;
}
std::unique_ptr<StoragePathPool> path_pool = std::make_unique<StoragePathPool>(global_ctx.getPathPool().withTable("test", "t1", false));
std::unique_ptr<DM::StoragePool> storage_pool = std::make_unique<DM::StoragePool>(global_ctx, /*ns_id*/ 100, *path_pool, "test.t1");

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ SegmentPtr Segment::restoreSegment( //
return segment;
}

void Segment::serialize(WriteBatch & wb)
void Segment::serialize(WriteBatchWrapper & wb)
{
MemoryWriteBuffer buf(0, SEGMENT_BUFFER_SIZE);
writeIntBinary(STORAGE_FORMAT_CURRENT.segment, buf);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class Segment

static SegmentPtr restoreSegment(const LoggerPtr & parent_log, DMContext & context, PageIdU64 segment_id);

void serialize(WriteBatch & wb);
void serialize(WriteBatchWrapper & wb);

/// Attach a new ColumnFile into the Segment. The ColumnFile will be added to MemFileSet and flushed to disk later.
/// The block data of the passed in ColumnFile should be placed on disk before calling this function.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang
this->files = files_;
}

void StableValueSpace::saveMeta(WriteBatch & meta_wb)
void StableValueSpace::saveMeta(WriteBatchWrapper & meta_wb)
{
MemoryWriteBuffer buf(0, 8192);
writeIntBinary(STORAGE_FORMAT_CURRENT.stable, buf);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
void setFiles(const DMFiles & files_, const RowKeyRange & range, DMContext * dm_context = nullptr);

PageIdU64 getId() const { return id; }
void saveMeta(WriteBatch & meta_wb);
void saveMeta(WriteBatchWrapper & meta_wb);

size_t getRows() const;
size_t getBytes() const;
Expand Down
Loading

0 comments on commit a5d6f86

Please sign in to comment.