Skip to content

Commit

Permalink
PageStorage: make stress test support both v2 and v3 (#3913)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiaqizho committed Jan 25, 2022
1 parent dfeed24 commit c368dcc
Show file tree
Hide file tree
Showing 20 changed files with 68 additions and 36 deletions.
10 changes: 10 additions & 0 deletions dbms/src/Storages/Page/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
add_subdirectory(V2)

# PageStorage Stress test
if (ENABLE_V3_PAGESTORAGE)
add_headers_and_sources(page_stress_testing stress)
add_headers_and_sources(page_stress_testing stress/workload)
add_executable(page_stress_testing EXCLUDE_FROM_ALL ${page_stress_testing_sources})
target_link_libraries(page_stress_testing dbms page_storage_v3)
target_include_directories(page_stress_testing PRIVATE stress)
target_compile_options(page_stress_testing PRIVATE -Wno-format -lc++) # turn off printf format check
endif()
8 changes: 0 additions & 8 deletions dbms/src/Storages/Page/V2/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@ target_compile_options(gtests_page_storage PRIVATE -Wno-unknown-pragmas)
target_compile_definitions(gtests_page_storage PRIVATE DBMS_PUBLIC_GTEST)
add_check(gtests_page_storage)

# non googletest
add_headers_and_sources(page_stress_testing stress)
add_headers_and_sources(page_stress_testing stress/workload)
add_executable(page_stress_testing EXCLUDE_FROM_ALL ${page_stress_testing_sources})
target_link_libraries(page_stress_testing page_storage_v2)
target_include_directories(page_stress_testing PRIVATE stress)
target_compile_options(page_stress_testing PRIVATE -Wno-format -lc++) # turn off printf format check

# FIXME: mock disk full by FailPoint, and move this test into a unittest case.
add_executable(test_page_storage_write_disk_full test_page_storage_write_disk_full.cpp)
target_link_libraries(test_page_storage_write_disk_full dbms)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ DB::ReadBufferPtr PSWriter::genRandomData(const DB::PageId pageId, DB::MemHolder

holder = DB::createMemHolder(buff, [&](char * p) { free(p); });

return std::make_shared<DB::ReadBufferFromMemory>(buff, buff_sz);
return std::make_shared<DB::ReadBufferFromMemory>(const_cast<char *>(buff), buff_sz);
}

void PSWriter::updatedRandomData()
{
size_t memory_size = approx_page_mb * DB::MB * 2;
if (unlikely(memory != nullptr))
if (memory == nullptr)
{
memory = static_cast<char *>(malloc(memory_size));
if (memory == nullptr)
Expand Down Expand Up @@ -131,7 +131,7 @@ void PSCommonWriter::updatedRandomData()
: batch_buffer_size);
size_t memory_size = single_buff_size * batch_buffer_nums;

if (likely(memory == nullptr))
if (memory == nullptr)
{
memory = static_cast<char *>(malloc(memory_size));
if (memory == nullptr)
Expand Down Expand Up @@ -170,7 +170,6 @@ bool PSCommonWriter::runImpl()
bytes_used += buffptr->buffer().size();
}

writing_page[index] = page_id;
ps->write(std::move(wb));
return (batch_buffer_limit == 0 || bytes_used < batch_buffer_limit);
}
Expand Down Expand Up @@ -302,7 +301,10 @@ DB::PageId PSWindowWriter::genRandomPageId()
auto random = std::round(distribution(gen));
// Move this "random" near the pageid_boundary, If "random" is still negative, then make it positive
random = std::abs(random + pageid_boundary);
return static_cast<DB::PageId>(random > pageid_boundary ? pageid_boundary++ : random);

auto page_id = static_cast<DB::PageId>(random > pageid_boundary ? pageid_boundary++ : random);
writing_page[index] = page_id;
return page_id;
}

void PSWindowReader::setWindowSize(size_t window_size_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,5 +254,5 @@ class PSSnapshotReader : public PSReader
protected:
size_t snapshots_hold_num;
size_t snapshot_get_interval_ms = 0;
std::list<PageStorage::SnapshotPtr> snapshots;
std::list<DB::PageStorage::SnapshotPtr> snapshots;
};
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ StressEnv StressEnv::parse(int argc, char ** argv)
("failpoints,F", value<std::vector<std::string>>(), "failpoint(s) to enable") //
("status_interval,S", value<UInt32>()->default_value(1), "Status statistics interval. 0 means no statistics") //
("situation_mask,M", value<UInt64>()->default_value(0), "Run special tests sequentially, example -M 2") //
("verify", value<bool>()->default_value(true), "Run special tests sequentially with verify."); //
("verify", value<bool>()->default_value(true), "Run special tests sequentially with verify.") //
("running_ps_version,V", value<UInt16>()->default_value(3), "Select a version of PageStorage. 2 or 3 can used");

po::variables_map options;
po::store(po::parse_command_line(argc, argv, desc), options);
Expand All @@ -68,6 +69,14 @@ StressEnv StressEnv::parse(int argc, char ** argv)
opt.status_interval = options["status_interval"].as<UInt32>();
opt.situation_mask = options["situation_mask"].as<UInt64>();
opt.verify = options["verify"].as<bool>();
opt.running_ps_version = options["running_ps_version"].as<UInt16>();

if (opt.running_ps_version != 2 && opt.running_ps_version != 3)
{
std::cerr << "Invalid running_ps_version, this arg should be 2 or 3." << std::endl;
std::cerr << desc << std::endl;
exit(0);
}

if (options.count("paths"))
opt.paths = options["paths"].as<std::vector<std::string>>();
Expand All @@ -93,6 +102,8 @@ void StressEnv::setup()
#ifdef FIU_ENABLE
fiu_init(0);
#endif


for (const auto & fp : failpoints)
{
DB::FailPointHelper::enableFailPoint(fp);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
#pragma once

#include <Storages/Page/PageDefines.h>
#include <Storages/Page/V2/PageStorage.h>
#include <Storages/Page/PageStorage.h>
#include <fmt/format.h>

#include <atomic>

using namespace DB::PS::V2;
namespace Poco
{
class Logger;
}

using PSPtr = std::shared_ptr<DB::PS::V2::PageStorage>;
using PSPtr = std::shared_ptr<DB::PageStorage>;

enum StressEnvStat
{
Expand Down Expand Up @@ -70,6 +69,7 @@ struct StressEnv
size_t status_interval = 1;
size_t situation_mask = 0;
bool verify = true;
size_t running_ps_version = 3;

std::vector<std::string> paths;
std::vector<std::string> failpoints;
Expand All @@ -81,7 +81,8 @@ struct StressEnv
"num_writers: {}, num_readers: {}, init_pages: {}, clean_before_run: {}"
", timeout_s: {}, read_delay_ms: {}, num_writer_slots: {}"
", avg_page_size_mb: {}, paths: [{}], failpoints: [{}]"
", status_interval: {}, situation_mask: {}, verify: {}."
", status_interval: {}, situation_mask: {}, verify: {}"
", running_pagestorage_version : {}."
"}}",
num_writers,
num_readers,
Expand All @@ -95,7 +96,8 @@ struct StressEnv
fmt::join(failpoints.begin(), failpoints.end(), ","),
status_interval,
situation_mask,
verify
verify,
running_ps_version
//
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include <Encryption/MockKeyManager.h>
#include <PSWorkload.h>
#include <Poco/Logger.h>
#include <Storages/Page/V2/PageStorage.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <TestUtils/MockDiskDelegator.h>

void StressWorkload::onDumpResult()
Expand Down Expand Up @@ -47,7 +49,7 @@ void StressWorkload::onDumpResult()
}
}

void StressWorkload::initPageStorage(PageStorage::Config & config, String path_prefix)
void StressWorkload::initPageStorage(DB::PageStorage::Config & config, String path_prefix)
{
DB::FileProviderPtr file_provider = std::make_shared<DB::FileProvider>(std::make_shared<DB::MockKeyManager>(false), false);

Expand All @@ -67,8 +69,22 @@ void StressWorkload::initPageStorage(PageStorage::Config & config, String path_p
delegator = std::make_shared<DB::tests::MockDiskDelegatorSingle>(options.paths[0] + "/" + path_prefix);
}

ps = std::make_shared<DB::PS::V2::PageStorage>("stress_test", delegator, config, file_provider);
ps->restore();
if (options.running_ps_version == 2)
{
ps = std::make_shared<DB::PS::V2::PageStorage>("stress_test", delegator, config, file_provider);
ps->restore();
}
else if (options.running_ps_version == 3)
{
ps = std::make_shared<DB::PS::V3::PageStorageImpl>("stress_test", delegator, config, file_provider);
// FIXME : we need restore V3
}
else
{
throw DB::Exception(fmt::format("Invalid PageStorage version {}",
options.running_ps_version));
}

{
size_t num_of_pages = 0;
ps->traverse([&num_of_pages](const DB::Page & page) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
#include <PSStressEnv.h>
#include <Poco/ThreadPool.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/V2/PageStorage.h>
#include <Storages/Page/PageStorage.h>
#include <fmt/format.h>

#define NORMAL_WORKLOAD 0
using namespace DB::PS::V2;
template <typename Child>
class StressWorkloadFunc
{
Expand Down Expand Up @@ -47,7 +46,7 @@ class StressWorkload
virtual void onDumpResult();

protected:
void initPageStorage(PageStorage::Config & config, String path_prefix = "");
void initPageStorage(DB::PageStorage::Config & config, String path_prefix = "");

void startBackgroundTimer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class HeavyMemoryCostInGC
{
stop_watch.start();

PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HeavyRead : public StressWorkload

void run() override
{
PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());
PSWriter::fillAllPages(ps);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HeavySkewWriteRead : public StressWorkload
void run() override
{
pool.addCapacity(1 + options.num_writers + options.num_readers);
PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class HeavyWrite : public StressWorkload

void run() override
{
PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class HighValidBigFileGCWorkload
{
stop_watch.start();

PageStorage::Config config;
DB::PageStorage::Config config;
config.file_max_size = 8ULL * DB::GB;
config.file_roll_size = 8ULL * DB::GB;
initPageStorage(config, name());
Expand All @@ -63,7 +63,7 @@ class HighValidBigFileGCWorkload

// Generate normal data in the same Pagefile
{
PageStorage::Config config;
DB::PageStorage::Config config;
config.file_max_size = DB::PAGE_FILE_MAX_SIZE;
config.file_roll_size = DB::PAGE_FILE_ROLL_SIZE;
initPageStorage(config, name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class HoldSnapshotsLongTime : public StressWorkload
void run() override
{
pool.addCapacity(1 + options.num_writers + options.num_readers);
PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class NormalWorkload
{
pool.addCapacity(1 + options.num_writers + options.num_readers);

PageStorage::Config config;
DB::PageStorage::Config config;
config.num_write_slots = options.num_writer_slots;
initPageStorage(config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class PageStorageInMemoryCapacity : public StressWorkload

std::tie(total_mem, begin_virtual_size, begin_resident_size) = getCurrentMemory();
pool.addCapacity(1 + options.num_writers + options.num_readers);
PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ThousandsOfOffset : public StressWorkload
void run() override
{
pool.addCapacity(1 + options.num_writers + options.num_readers);
PageStorage::Config config;
DB::PageStorage::Config config;
initPageStorage(config, name());

metrics_dumper = std::make_shared<PSMetricsDumper>(1);
Expand Down

0 comments on commit c368dcc

Please sign in to comment.