Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support storing all data in UniversalPageStorage #6717

Merged
merged 37 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
ec66818
store data in uni ps
lidezhu Feb 7, 2023
6311b79
remove useless code
lidezhu Feb 7, 2023
e63738f
refine callback and getMaxId
lidezhu Feb 8, 2023
f041f4f
do some page format check at restart
lidezhu Feb 8, 2023
4165d75
refine code
lidezhu Feb 8, 2023
2cdeef8
small fix
lidezhu Feb 8, 2023
adda46e
small fix
lidezhu Feb 8, 2023
be33011
fix gtest
lidezhu Feb 8, 2023
2e7effd
fix region persistor restart
lidezhu Feb 8, 2023
4b4fc0a
rename
lidezhu Feb 8, 2023
8d9918c
fix conflict
lidezhu Feb 8, 2023
5a19c7f
fix gtest
lidezhu Feb 8, 2023
7d73de8
try run test in uni_ps mode
lidezhu Feb 8, 2023
e7509b4
add some unit test
lidezhu Feb 9, 2023
f52441a
Merge branch 'master' into store-data-in-uni-ps
lidezhu Feb 9, 2023
95f3213
fix gtest
lidezhu Feb 9, 2023
d47a03b
address comment
lidezhu Feb 9, 2023
138690d
small fix
lidezhu Feb 9, 2023
28a209a
avoid calcualte max id for prefix at runtime
lidezhu Feb 10, 2023
21e6c75
address comment
lidezhu Feb 12, 2023
8c78ac4
Update dbms/src/Storages/DeltaMerge/StoragePool.cpp
lidezhu Feb 13, 2023
cf78497
address comment
lidezhu Feb 13, 2023
a920775
fix print universal page id
lidezhu Feb 14, 2023
3ae3445
backport directory fix && test
JaySon-Huang Feb 14, 2023
d534e0d
fix for master
JaySon-Huang Feb 14, 2023
8935a29
Clean some hacks
JaySon-Huang Feb 14, 2023
5857b5f
fix max page id
lidezhu Feb 14, 2023
18168b5
address comment
lidezhu Feb 14, 2023
490e405
Merge pull request #10 from JaySon-Huang/store-data-in-uni-ps
lidezhu Feb 14, 2023
7b937f7
Merge branch 'master' into store-data-in-uni-ps
lidezhu Feb 14, 2023
19145ac
format
lidezhu Feb 14, 2023
17ce8eb
Merge branch 'store-data-in-uni-ps' of github.com:lidezhu/tics into s…
lidezhu Feb 14, 2023
8069e19
Update dbms/src/Storages/Page/V3/PageDirectory.h
lidezhu Feb 14, 2023
3ebd791
Update dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp
lidezhu Feb 14, 2023
5e4cd29
Update dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp
lidezhu Feb 14, 2023
6a81b9a
fix unit test
lidezhu Feb 14, 2023
012df3d
fix unit test
lidezhu Feb 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 82 additions & 11 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#include <Storages/IStorage.h>
#include <Storages/MarkCache.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <Storages/Page/V3/Universal/UniversalPageStorageService.h>
#include <Storages/PathCapacityMetrics.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/BackgroundService.h>
Expand Down Expand Up @@ -164,6 +165,10 @@ struct ContextShared
IORateLimiter io_rate_limiter;
PageStorageRunMode storage_run_mode = PageStorageRunMode::ONLY_V3;
DM::GlobalStoragePoolPtr global_storage_pool;

/// The PS instance available on Write Node.
UniversalPageStorageServicePtr ps_write;

TiFlashSecurityConfigPtr security_config;

/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
Expand Down Expand Up @@ -1545,22 +1550,35 @@ static bool isPageStorageV2Existed(const PathPool & path_pool)

static bool isPageStorageV3Existed(const PathPool & path_pool)
{
const std::vector<String> path_prefixes = {
PathPool::log_path_prefix,
PathPool::data_path_prefix,
PathPool::meta_path_prefix,
PathPool::kvstore_path_prefix,
};
for (const auto & path : path_pool.listGlobalPagePaths())
{
Poco::File dir(path);
if (!dir.exists())
continue;

std::vector<std::string> files;
dir.list(files);
if (!files.empty())
for (const auto & path_prefix : path_prefixes)
{
return true;
Poco::File dir(path + "/" + path_prefix);
if (dir.exists())
return true;
}
}
return false;
}

static bool isWriteNodeUniPSExisted(const PathPool & path_pool)
{
for (const auto & path : path_pool.listGlobalPagePaths())
{
Poco::File dir(path + "/" + PathPool::write_uni_path_prefix);
if (dir.exists())
return true;
}
return false;
}

void Context::initializePageStorageMode(const PathPool & path_pool, UInt64 storage_page_format_version)
{
auto lock = getLock();
Expand All @@ -1577,21 +1595,36 @@ void Context::initializePageStorageMode(const PathPool & path_pool, UInt64 stora
case PageFormat::V1:
case PageFormat::V2:
{
if (isPageStorageV3Existed(path_pool))
if (isPageStorageV3Existed(path_pool) || isWriteNodeUniPSExisted(path_pool))
{
throw Exception("Invalid config `storage.format_version`, Current page V3 data exist. But using the PageFormat::V2."
throw Exception("Invalid config `storage.format_version`, newer format page data exist. But using the PageFormat::V2."
"If you are downgrading the format_version for this TiFlash node, you need to rebuild the data from scratch.",
ErrorCodes::LOGICAL_ERROR);
}
// not exist V3
// not exist newer format page data
shared->storage_run_mode = PageStorageRunMode::ONLY_V2;
return;
}
case PageFormat::V3:
{
if (isWriteNodeUniPSExisted(path_pool))
{
throw Exception("Invalid config `storage.format_version`, newer format page data exist. But using the PageFormat::V3."
"If you are downgrading the format_version for this TiFlash node, you need to rebuild the data from scratch.",
ErrorCodes::LOGICAL_ERROR);
}
shared->storage_run_mode = isPageStorageV2Existed(path_pool) ? PageStorageRunMode::MIX_MODE : PageStorageRunMode::ONLY_V3;
return;
}
case PageFormat::V4:
{
if (isPageStorageV2Existed(path_pool) || isPageStorageV3Existed(path_pool))
{
throw Exception("Uni PS can only be enabled on a fresh start", ErrorCodes::LOGICAL_ERROR);
}
shared->storage_run_mode = PageStorageRunMode::UNI_PS;
return;
}
default:
throw Exception(fmt::format("Can't detect the format version of Page [page_version={}]", storage_page_format_version),
ErrorCodes::LOGICAL_ERROR);
Expand All @@ -1617,6 +1650,7 @@ bool Context::initializeGlobalStoragePoolIfNeed(const PathPool & path_pool)
{
// GlobalStoragePool may be initialized many times in some test cases for restore.
LOG_WARNING(shared->log, "GlobalStoragePool has already been initialized.");
shared->global_storage_pool->shutdown();
}
CurrentMetrics::set(CurrentMetrics::GlobalStorageRunMode, static_cast<UInt8>(shared->storage_run_mode));
if (shared->storage_run_mode == PageStorageRunMode::MIX_MODE || shared->storage_run_mode == PageStorageRunMode::ONLY_V3)
Expand Down Expand Up @@ -1646,6 +1680,43 @@ DM::GlobalStoragePoolPtr Context::getGlobalStoragePool() const
return shared->global_storage_pool;
}

void Context::initializeWriteNodePageStorageIfNeed(const PathPool & path_pool)
{
auto lock = getLock();
if (shared->storage_run_mode == PageStorageRunMode::UNI_PS)
{
if (shared->ps_write)
{
// GlobalStoragePool may be initialized many times in some test cases for restore.
LOG_WARNING(shared->log, "GlobalUniversalPageStorage(WriteNode) has already been initialized.");
}
PageStorageConfig config;
shared->ps_write = UniversalPageStorageService::create( //
*this,
"write",
path_pool.getPSDiskDelegatorGlobalMulti(PathPool::write_uni_path_prefix),
config);
LOG_INFO(shared->log, "initialized GlobalUniversalPageStorage(WriteNode)");
}
else
{
shared->ps_write = nullptr;
}
}

UniversalPageStoragePtr Context::getWriteNodePageStorage() const
{
auto lock = getLock();
if (shared->ps_write)
{
return shared->ps_write->getUniversalPageStorage();
}
else
{
return nullptr;
}
}

UInt16 Context::getTCPPort() const
{
auto lock = getLock();
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ using Dependencies = std::vector<DatabaseAndTableName>;
using TableAndCreateAST = std::pair<StoragePtr, ASTPtr>;
using TableAndCreateASTs = std::map<String, TableAndCreateAST>;

class UniversalPageStorage;
using UniversalPageStoragePtr = std::shared_ptr<UniversalPageStorage>;

/** A set of known objects that can be used in the query.
* Consists of a shared part (always common to all sessions and queries)
* and copied part (which can be its own for each session or query).
Expand Down Expand Up @@ -426,6 +429,9 @@ class Context
bool initializeGlobalStoragePoolIfNeed(const PathPool & path_pool);
DM::GlobalStoragePoolPtr getGlobalStoragePool() const;

void initializeWriteNodePageStorageIfNeed(const PathPool & path_pool);
UniversalPageStoragePtr getWriteNodePageStorage() const;

/// Call after initialization before using system logs. Call for global context.
void initializeSystemLogs();

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool());
LOG_INFO(log, "Global PageStorage run mode is {}", static_cast<UInt8>(global_context->getPageStorageRunMode()));

global_context->initializeWriteNodePageStorageIfNeed(global_context->getPathPool());

/// Initialize RateLimiter.
global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool);

Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Server/tests/gtest_server_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,11 @@ dt_open_file_max_idle_seconds = 20
dt_page_gc_low_write_prob = 0.2
)"};
auto & global_ctx = TiFlashTestEnv::getGlobalContext();
if (global_ctx.getPageStorageRunMode() == PageStorageRunMode::UNI_PS)
{
// don't support reload uni ps config through region persister
return;
}
auto & global_path_pool = global_ctx.getPathPool();
RegionManager region_manager;
RegionPersister persister(global_ctx, region_manager);
Expand Down Expand Up @@ -445,6 +450,11 @@ dt_page_gc_low_write_prob = 0.2
)"};

auto & global_ctx = TiFlashTestEnv::getGlobalContext();
if (global_ctx.getPageStorageRunMode() == PageStorageRunMode::UNI_PS)
{
// don't support reload uni ps config through storage pool
return;
}
std::unique_ptr<StoragePathPool> path_pool = std::make_unique<StoragePathPool>(global_ctx.getPathPool().withTable("test", "t1", false));
std::unique_ptr<DM::StoragePool> storage_pool = std::make_unique<DM::StoragePool>(global_ctx, /*ns_id*/ 100, *path_pool, "test.t1");

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ SegmentPtr Segment::restoreSegment( //
return segment;
}

void Segment::serialize(WriteBatch & wb)
void Segment::serialize(WriteBatchWrapper & wb)
{
MemoryWriteBuffer buf(0, SEGMENT_BUFFER_SIZE);
writeIntBinary(STORAGE_FORMAT_CURRENT.segment, buf);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class Segment

static SegmentPtr restoreSegment(const LoggerPtr & parent_log, DMContext & context, PageIdU64 segment_id);

void serialize(WriteBatch & wb);
void serialize(WriteBatchWrapper & wb);

/// Attach a new ColumnFile into the Segment. The ColumnFile will be added to MemFileSet and flushed to disk later.
/// The block data of the passed in ColumnFile should be placed on disk before calling this function.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang
this->files = files_;
}

void StableValueSpace::saveMeta(WriteBatch & meta_wb)
void StableValueSpace::saveMeta(WriteBatchWrapper & meta_wb)
{
MemoryWriteBuffer buf(0, 8192);
writeIntBinary(STORAGE_FORMAT_CURRENT.stable, buf);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
void setFiles(const DMFiles & files_, const RowKeyRange & range, DMContext * dm_context = nullptr);

PageIdU64 getId() const { return id; }
void saveMeta(WriteBatch & meta_wb);
void saveMeta(WriteBatchWrapper & meta_wb);

size_t getRows() const;
size_t getBytes() const;
Expand Down
Loading