Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PageStorage: make stress test support both v2 and v3 #3913

Merged
merged 2 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions dbms/src/Storages/Page/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
add_subdirectory(V2)

# PageStorage Stress test
if (ENABLE_V3_PAGESTORAGE)
add_headers_and_sources(page_stress_testing stress)
add_headers_and_sources(page_stress_testing stress/workload)
add_executable(page_stress_testing EXCLUDE_FROM_ALL ${page_stress_testing_sources})
target_link_libraries(page_stress_testing dbms page_storage_v3)
target_include_directories(page_stress_testing PRIVATE stress)
target_compile_options(page_stress_testing PRIVATE -Wno-format -lc++) # turn off printf format check
endif()
8 changes: 0 additions & 8 deletions dbms/src/Storages/Page/V2/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@ target_compile_options(gtests_page_storage PRIVATE -Wno-unknown-pragmas)
target_compile_definitions(gtests_page_storage PRIVATE DBMS_PUBLIC_GTEST)
add_check(gtests_page_storage)

# non googletest
add_headers_and_sources(page_stress_testing stress)
add_headers_and_sources(page_stress_testing stress/workload)
add_executable(page_stress_testing EXCLUDE_FROM_ALL ${page_stress_testing_sources})
target_link_libraries(page_stress_testing page_storage_v2)
target_include_directories(page_stress_testing PRIVATE stress)
target_compile_options(page_stress_testing PRIVATE -Wno-format -lc++) # turn off printf format check

# FIXME: mock disk full by FailPoint, and move this test into a unittest case.
add_executable(test_page_storage_write_disk_full test_page_storage_write_disk_full.cpp)
target_link_libraries(test_page_storage_write_disk_full dbms)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ DB::ReadBufferPtr PSWriter::genRandomData(const DB::PageId pageId, DB::MemHolder

holder = DB::createMemHolder(buff, [&](char * p) { free(p); });

return std::make_shared<DB::ReadBufferFromMemory>(buff, buff_sz);
return std::make_shared<DB::ReadBufferFromMemory>(const_cast<char *>(buff), buff_sz);
}

void PSWriter::updatedRandomData()
{
size_t memory_size = approx_page_mb * DB::MB * 2;
if (unlikely(memory != nullptr))
if (memory == nullptr)
{
memory = static_cast<char *>(malloc(memory_size));
if (memory == nullptr)
Expand Down Expand Up @@ -131,7 +131,7 @@ void PSCommonWriter::updatedRandomData()
: batch_buffer_size);
size_t memory_size = single_buff_size * batch_buffer_nums;

if (likely(memory == nullptr))
if (memory == nullptr)
{
memory = static_cast<char *>(malloc(memory_size));
if (memory == nullptr)
Expand Down Expand Up @@ -170,7 +170,6 @@ bool PSCommonWriter::runImpl()
bytes_used += buffptr->buffer().size();
}

writing_page[index] = page_id;
ps->write(std::move(wb));
return (batch_buffer_limit == 0 || bytes_used < batch_buffer_limit);
}
Expand Down Expand Up @@ -302,7 +301,10 @@ DB::PageId PSWindowWriter::genRandomPageId()
auto random = std::round(distribution(gen));
// Move this "random" near the pageid_boundary, If "random" is still negative, then make it positive
random = std::abs(random + pageid_boundary);
return static_cast<DB::PageId>(random > pageid_boundary ? pageid_boundary++ : random);

auto page_id = static_cast<DB::PageId>(random > pageid_boundary ? pageid_boundary++ : random);
writing_page[index] = page_id;
return page_id;
}

void PSWindowReader::setWindowSize(size_t window_size_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,5 +254,5 @@ class PSSnapshotReader : public PSReader
protected:
size_t snapshots_hold_num;
size_t snapshot_get_interval_ms = 0;
std::list<PageStorage::SnapshotPtr> snapshots;
std::list<DB::PageStorage::SnapshotPtr> snapshots;
};
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ StressEnv StressEnv::parse(int argc, char ** argv)
("failpoints,F", value<std::vector<std::string>>(), "failpoint(s) to enable") //
("status_interval,S", value<UInt32>()->default_value(1), "Status statistics interval. 0 means no statistics") //
("situation_mask,M", value<UInt64>()->default_value(0), "Run special tests sequentially, example -M 2") //
("verify", value<bool>()->default_value(true), "Run special tests sequentially with verify."); //
("verify", value<bool>()->default_value(true), "Run special tests sequentially with verify.") //
("running_ps_version,V", value<UInt16>()->default_value(3), "Select a version of PageStorage. 2 or 3 can used");

po::variables_map options;
po::store(po::parse_command_line(argc, argv, desc), options);
Expand All @@ -68,6 +69,14 @@ StressEnv StressEnv::parse(int argc, char ** argv)
opt.status_interval = options["status_interval"].as<UInt32>();
opt.situation_mask = options["situation_mask"].as<UInt64>();
opt.verify = options["verify"].as<bool>();
opt.running_ps_version = options["running_ps_version"].as<UInt16>();

if (opt.running_ps_version != 2 && opt.running_ps_version != 3)
{
std::cerr << "Invalid running_ps_version, this arg should be 2 or 3." << std::endl;
std::cerr << desc << std::endl;
exit(0);
}

if (options.count("paths"))
opt.paths = options["paths"].as<std::vector<std::string>>();
Expand All @@ -93,6 +102,8 @@ void StressEnv::setup()
#ifdef FIU_ENABLE
fiu_init(0);
#endif


for (const auto & fp : failpoints)
{
DB::FailPointHelper::enableFailPoint(fp);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
#pragma once

#include <Storages/Page/PageDefines.h>
#include <Storages/Page/V2/PageStorage.h>
#include <Storages/Page/PageStorage.h>
#include <fmt/format.h>

#include <atomic>

using namespace DB::PS::V2;
namespace Poco
{
class Logger;
}

using PSPtr = std::shared_ptr<DB::PS::V2::PageStorage>;
using PSPtr = std::shared_ptr<DB::PageStorage>;

enum StressEnvStat
{
Expand Down Expand Up @@ -70,6 +69,7 @@ struct StressEnv
size_t status_interval = 1;
size_t situation_mask = 0;
bool verify = true;
size_t running_ps_version = 3;

std::vector<std::string> paths;
std::vector<std::string> failpoints;
Expand All @@ -81,7 +81,8 @@ struct StressEnv
"num_writers: {}, num_readers: {}, init_pages: {}, clean_before_run: {}"
", timeout_s: {}, read_delay_ms: {}, num_writer_slots: {}"
", avg_page_size_mb: {}, paths: [{}], failpoints: [{}]"
", status_interval: {}, situation_mask: {}, verify: {}."
", status_interval: {}, situation_mask: {}, verify: {}"
", running_pagestorage_version : {}."
"}}",
num_writers,
num_readers,
Expand All @@ -95,7 +96,8 @@ struct StressEnv
fmt::join(failpoints.begin(), failpoints.end(), ","),
status_interval,
situation_mask,
verify
verify,
running_ps_version
//
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include <Encryption/MockKeyManager.h>
#include <PSWorkload.h>
#include <Poco/Logger.h>
#include <Storages/Page/V2/PageStorage.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <TestUtils/MockDiskDelegator.h>

void StressWorkload::onDumpResult()
Expand Down Expand Up @@ -47,7 +49,7 @@ void StressWorkload::onDumpResult()
}
}

void StressWorkload::initPageStorage(PageStorage::Config & config, String path_prefix)
void StressWorkload::initPageStorage(DB::PageStorage::Config & config, String path_prefix)
{
DB::FileProviderPtr file_provider = std::make_shared<DB::FileProvider>(std::make_shared<DB::MockKeyManager>(false), false);

Expand All @@ -67,8 +69,22 @@ void StressWorkload::initPageStorage(PageStorage::Config & config, String path_p
delegator = std::make_shared<DB::tests::MockDiskDelegatorSingle>(options.paths[0] + "/" + path_prefix);
}

ps = std::make_shared<DB::PS::V2::PageStorage>("stress_test", delegator, config, file_provider);
ps->restore();
if (options.running_ps_version == 2)
{
ps = std::make_shared<DB::PS::V2::PageStorage>("stress_test", delegator, config, file_provider);
ps->restore();
}
else if (options.running_ps_version == 3)
{
ps = std::make_shared<DB::PS::V3::PageStorageImpl>("stress_test", delegator, config, file_provider);
// FIXME : we need restore V3
}
else
{
throw DB::Exception(fmt::format("Invalid PageStorage version {}",
options.running_ps_version));
}

{
size_t num_of_pages = 0;
ps->traverse([&num_of_pages](const DB::Page & page) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
#include <PSStressEnv.h>
#include <Poco/ThreadPool.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/V2/PageStorage.h>
#include <Storages/Page/PageStorage.h>
#include <fmt/format.h>

#define NORMAL_WORKLOAD 0
using namespace DB::PS::V2;
template <typename Child>
class StressWorkloadFunc
{
Expand Down Expand Up @@ -47,7 +46,7 @@ class StressWorkload
virtual void onDumpResult();

protected:
void initPageStorage(PageStorage::Config & config, String path_prefix = "");
void initPageStorage(DB::PageStorage::Config & config, String path_prefix = "");

void startBackgroundTimer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class HeavyMemoryCostInGC
{
stop_watch.start();

PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HeavyRead : public StressWorkload

void run() override
{
PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());
PSWriter::fillAllPages(ps);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HeavySkewWriteRead : public StressWorkload
void run() override
{
pool.addCapacity(1 + options.num_writers + options.num_readers);
PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class HeavyWrite : public StressWorkload

void run() override
{
PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class HighValidBigFileGCWorkload
{
stop_watch.start();

PageStorage::Config config;
DB::PageStorage::Config config;
config.file_max_size = 8ULL * DB::GB;
config.file_roll_size = 8ULL * DB::GB;
initPageStorage(config, name());
Expand All @@ -63,7 +63,7 @@ class HighValidBigFileGCWorkload

// Generate normal data in the same Pagefile
{
PageStorage::Config config;
DB::PageStorage::Config config;
config.file_max_size = DB::PAGE_FILE_MAX_SIZE;
config.file_roll_size = DB::PAGE_FILE_ROLL_SIZE;
initPageStorage(config, name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class HoldSnapshotsLongTime : public StressWorkload
void run() override
{
pool.addCapacity(1 + options.num_writers + options.num_readers);
PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class NormalWorkload
{
pool.addCapacity(1 + options.num_writers + options.num_readers);

PageStorage::Config config;
DB::PageStorage::Config config;
config.num_write_slots = options.num_writer_slots;
initPageStorage(config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class PageStorageInMemoryCapacity : public StressWorkload

std::tie(total_mem, begin_virtual_size, begin_resident_size) = getCurrentMemory();
pool.addCapacity(1 + options.num_writers + options.num_readers);
PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ThousandsOfOffset : public StressWorkload
void run() override
{
pool.addCapacity(1 + options.num_writers + options.num_readers);
PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
Expand Down