diff --git a/dbms/src/Storages/CMakeLists.txt b/dbms/src/Storages/CMakeLists.txt
index 1ef292753b4..fba636bf649 100644
--- a/dbms/src/Storages/CMakeLists.txt
+++ b/dbms/src/Storages/CMakeLists.txt
@@ -4,4 +4,6 @@ if (ENABLE_TESTS)
     add_subdirectory (tests)
     add_subdirectory (Transaction/tests)
     add_subdirectory (DeltaMerge/tests)
+    add_subdirectory (Page/tests)
 endif ()
+
diff --git a/dbms/src/Storages/Page/Page.h b/dbms/src/Storages/Page/Page.h
index 4d631ac77aa..64c46988318 100644
--- a/dbms/src/Storages/Page/Page.h
+++ b/dbms/src/Storages/Page/Page.h
@@ -27,16 +27,18 @@ using Pages = std::vector<Page>;
 using PageMap = std::map<PageId, Page>;
 using PageHandler = std::function<void(PageId page_id, const Page & page)>;
 
+// Indicates the size && offset of a page in its PageFile. TODO: rename to `PageEntry`?
 struct PageCache
 {
-    PageFileId file_id = 0;
-    UInt32 level;
-    UInt32 size;
-    UInt64 offset;
-    UInt64 tag;
-    UInt64 checksum;
-
-    bool isValid() { return file_id; }
+    // file_id == 0 means the entry is invalid
+    PageFileId file_id  = 0;
+    UInt32     level    = 0;
+    UInt32     size     = 0;
+    UInt64     offset   = 0;
+    UInt64     tag      = 0;
+    UInt64     checksum = 0;
+
+    bool isValid() const { return file_id != 0; }
 
     PageFileIdAndLevel fileIdLevel() const { return std::make_pair(file_id, level); }
 };
 static_assert(std::is_trivially_copyable_v<PageCache>);
diff --git a/dbms/src/Storages/Page/PageDefines.h b/dbms/src/Storages/Page/PageDefines.h
index 184c94a01e1..f746415f765 100644
--- a/dbms/src/Storages/Page/PageDefines.h
+++ b/dbms/src/Storages/Page/PageDefines.h
@@ -8,7 +8,7 @@
 namespace DB
 {
 
-#define MB 1048576ULL;
+static constexpr UInt64 MB = 1048576ULL;
 
 static constexpr UInt64 PAGE_SIZE_STEP = (1 << 10) * 16; // 16 KB
 static constexpr UInt64 PAGE_BUFFER_SIZE = DBMS_DEFAULT_BUFFER_SIZE;
diff --git a/dbms/src/Storages/Page/PageFile.cpp b/dbms/src/Storages/Page/PageFile.cpp
index 6f9254a14ac..e5fa7614e27 100644
--- a/dbms/src/Storages/Page/PageFile.cpp
+++ b/dbms/src/Storages/Page/PageFile.cpp
@@ -75,7 +75,6 @@ template <bool read, bool must_exist = true>
 int openFile(const std::string & path)
 {
     ProfileEvents::increment(ProfileEvents::FileOpen);
 
-    int fd;
     int flags;
 
     if constexpr (read)
@@ -87,14 +86,16 @@ int openFile(const std::string & path)
         flags = O_WRONLY | O_CREAT;
     }
 
-    fd = ::open(path.c_str(), flags, 0666);
+    int fd = ::open(path.c_str(), flags, 0666);
 
     if (-1 == fd)
     {
         ProfileEvents::increment(ProfileEvents::FileOpenFailed);
         if constexpr (!must_exist)
        {
             if (errno == ENOENT)
+            {
                 return 0;
+            }
         }
         throwFromErrno("Cannot open file " + path, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
     }
@@ -345,6 +346,7 @@ std::pair<ByteBuffer, ByteBuffer> genWriteData( //
 
 /// Analyze meta file, and return <meta_file_offset, data_file_offset>.
 std::pair<UInt64, UInt64> analyzeMetaFile( //
+    const String & path,
     PageFileId     file_id,
     UInt32         level,
     const char *   meta_data,
@@ -356,7 +358,7 @@ std::pair<UInt64, UInt64> analyzeMetaFile( //
     UInt64 page_data_file_size = 0;
 
     char * pos = const_cast<char *>(meta_data);
-    while (pos < meta_data + meta_data_size)
+    while (pos < meta_data_end)
     {
         if (pos + sizeof(WBSize) > meta_data_end)
         {
@@ -364,36 +366,42 @@ std::pair<UInt64, UInt64> analyzeMetaFile( //
             break;
         }
         const char * wb_start_pos = pos;
-        auto wb_bytes = get<WBSize>(pos);
+        const auto wb_bytes = get<WBSize>(pos);
         if (wb_start_pos + wb_bytes > meta_data_end)
         {
             LOG_WARNING(log, "Incomplete write batch, ignored.");
             break;
         }
 
-        auto wb_bytes_without_checksum = wb_bytes - sizeof(Checksum);
-        auto version     = get<UInt32>(pos);
-        auto wb_checksum = get<Checksum>(wb_start_pos + wb_bytes_without_checksum);
-
-        if (wb_checksum != CityHash_v1_0_2::CityHash64(wb_start_pos, wb_bytes_without_checksum))
-            throw Exception("Write batch checksum not match", ErrorCodes::CHECKSUM_DOESNT_MATCH);
+        // for now this field always equals CURRENT_VERSION
+        const auto version = get<UInt32>(pos);
         if (version != PageFile::CURRENT_VERSION)
             throw Exception("Version not match", ErrorCodes::LOGICAL_ERROR);
 
+        // check the checksum of WriteBatch
+        const auto wb_bytes_without_checksum = wb_bytes - sizeof(Checksum);
+        const auto wb_checksum = get<Checksum>(wb_start_pos + wb_bytes_without_checksum);
+        if (wb_checksum != CityHash_v1_0_2::CityHash64(wb_start_pos, wb_bytes_without_checksum))
+        {
+            throw Exception("Write batch checksum not match, path: " + path + ", offset: " + DB::toString(wb_start_pos - meta_data),
+                            ErrorCodes::CHECKSUM_DOESNT_MATCH);
+        }
+
+        // recover WriteBatch
         while (pos < wb_start_pos + wb_bytes_without_checksum)
         {
             auto is_put = get<UInt8>(pos);
             if (is_put)
             {
-                PageCache pc{};
-                auto page_id = get<PageId>(pos);
-                pc.tag       = get<UInt64>(pos);
-                pc.offset    = get<UInt64>(pos);
-                pc.size      = get<UInt32>(pos);
-                pc.checksum  = get<Checksum>(pos);
-                pc.file_id   = file_id;
-                pc.level     = level;
+                auto      page_id = get<PageId>(pos);
+                PageCache pc;
+                pc.file_id  = file_id;
+                pc.level    = level;
+                pc.tag      = get<UInt64>(pos);
+                pc.offset   = get<UInt64>(pos);
+                pc.size     = get<UInt32>(pos);
+                pc.checksum = get<Checksum>(pos);
 
                 page_caches[page_id] = pc;
                 page_data_file_size += pc.size;
@@ -404,6 +412,7 @@ std::pair<UInt64, UInt64> analyzeMetaFile( //
                 page_caches.erase(page_id); // Preserve the order of removal.
             }
         }
+        // move `pos` past the checksum of WriteBatch
         pos += sizeof(Checksum);
 
         if (pos != wb_start_pos + wb_bytes)
@@ -443,6 +452,7 @@ void PageFile::Writer::write(const WriteBatch & wb, PageCacheMap & page_cache_map)
 {
     ProfileEvents::increment(ProfileEvents::PSMWritePages, wb.putWriteCount());
 
+    // TODO: investigate whether writing big pages is faster if we do not copy the data onto the heap first
     ByteBuffer meta_buf, data_buf;
     std::tie(meta_buf, data_buf) = PageMetaFormat::genWriteData(wb, page_file, page_cache_map);
@@ -482,10 +492,12 @@ PageMap PageFile::Reader::read(PageIdAndCaches & to_read)
         return a.second.offset < b.second.offset;
     });
 
+    // allocate a data_buf that can hold all pages
     size_t buf_size = 0;
     for (const auto & p : to_read)
+    {
         buf_size += p.second.size;
-
+    }
     // TODO optimization:
     // 1. Succeeding pages can be read by one call.
     // 2. Pages with small gaps between them can also be read together.
@@ -504,7 +516,10 @@ PageMap PageFile::Reader::read(PageIdAndCaches & to_read)
         {
             auto checksum = CityHash_v1_0_2::CityHash64(pos, page_cache.size);
             if (checksum != page_cache.checksum)
-                throw Exception("Page [" + DB::toString(page_id) + "] checksum not match, broken file: " + data_file_path, ErrorCodes::CHECKSUM_DOESNT_MATCH);
+            {
+                throw Exception("Page [" + DB::toString(page_id) + "] checksum not match, broken file: " + data_file_path,
+                                ErrorCodes::CHECKSUM_DOESNT_MATCH);
+            }
         }
 
         Page page;
@@ -550,7 +565,10 @@ void PageFile::Reader::read(PageIdAndCaches & to_read, const PageHandler & handler)
         {
             auto checksum = CityHash_v1_0_2::CityHash64(data_buf, page_cache.size);
             if (checksum != page_cache.checksum)
-                throw Exception("Page checksum not match, broken file.", ErrorCodes::CHECKSUM_DOESNT_MATCH);
+            {
+                throw Exception("Page [" + DB::toString(page_id) + "] checksum not match, broken file: " + data_file_path,
+                                ErrorCodes::CHECKSUM_DOESNT_MATCH);
+            }
         }
 
         Page page;
@@ -610,10 +628,22 @@ std::pair<PageFile, bool> PageFile::recover(const std::string & parent_path, const std::string & page_file_name, Logger * log)
         LOG_INFO(log, "Temporary page file, ignored: " + page_file_name);
         return {{}, false};
     }
-
+    // ensure both meta && data files exist
     PageFileId file_id = std::stoull(ss[1]);
     UInt32     level   = std::stoi(ss[2]);
-    return {PageFile(file_id, level, parent_path, false, false, log), true};
+    PageFile pf(file_id, level, parent_path, false, false, log);
+    if (!Poco::File(pf.metaPath()).exists())
+    {
+        LOG_INFO(log, "Broken page without meta file, ignored: " + pf.metaPath());
+        return {{}, false};
+    }
+    if (!Poco::File(pf.dataPath()).exists())
+    {
+        LOG_INFO(log, "Broken page without data file, ignored: " + pf.dataPath());
+        return {{}, false};
+    }
+
+    return {pf, true};
 }
 
 PageFile PageFile::newPageFile(PageFileId file_id, UInt32 level, const std::string & parent_path, bool is_tmp, Logger * log)
@@ -628,7 +658,7 @@ PageFile PageFile::openPageFileForRead(PageFileId file_id, UInt32 level, const std::string & parent_path, Logger * log)
 
 void PageFile::readAndSetPageMetas(PageCacheMap & page_caches)
 {
-    auto path = metaPath();
+    const auto path = metaPath();
 
     Poco::File file(path);
     size_t     file_size = file.getSize();
@@ -641,7 +671,9 @@ void PageFile::readAndSetPageMetas(PageCacheMap & page_caches)
 
     readFile(file_fd, 0, data, file_size, path);
 
-    std::tie(this->meta_file_pos, this->data_file_pos) = PageMetaFormat::analyzeMetaFile(file_id, level, data, file_size, page_caches, log);
+    // analyze the meta file and update page_caches
+    std::tie(this->meta_file_pos, this->data_file_pos)
+        = PageMetaFormat::analyzeMetaFile(folderPath(), file_id, level, data, file_size, page_caches, log);
 }
 
 void PageFile::setFormal()
@@ -658,7 +690,21 @@ void PageFile::destroy()
 {
     // TODO: delay remove.
     Poco::File file(folderPath());
     if (file.exists())
+    {
+        // remove meta first, then remove data
+        Poco::File meta_file(metaPath());
+        if (meta_file.exists())
+        {
+            meta_file.remove();
+        }
+        Poco::File data_file(dataPath());
+        if (data_file.exists())
+        {
+            data_file.remove();
+        }
+        // drop the dir
         file.remove(true);
+    }
 }
 
 UInt64 PageFile::getDataFileSize() const
diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp
index a1e2a2519da..b85ab3da914 100644
--- a/dbms/src/Storages/Page/PageStorage.cpp
+++ b/dbms/src/Storages/Page/PageStorage.cpp
@@ -11,8 +11,10 @@ namespace ErrorCodes
 extern const int LOGICAL_ERROR;
 } // namespace ErrorCodes
 
-std::set<PageFile, PageFile::Comparator> listAllPageFiles(std::string storage_path, bool remove_tmp_file, Logger * page_file_log)
+std::set<PageFile, PageFile::Comparator>
+PageStorage::listAllPageFiles(const std::string & storage_path, bool remove_tmp_file, Logger * page_file_log)
 {
+    // collect all pages from `storage_path` and recover them into `PageFile` objects
     Poco::File folder(storage_path);
     if (!folder.exists())
         folder.createDirectories();
@@ -45,7 +47,7 @@ PageStorage::PageStorage(const std::string & storage_path_, const Config & config_)
     : storage_path(storage_path_), config(config_), page_file_log(&Logger::get("PageFile")), log(&Logger::get("PageStorage"))
 {
     /// page_files are in ascending order by (file_id, level).
-    auto page_files = listAllPageFiles(storage_path, true, page_file_log);
+    auto page_files = PageStorage::listAllPageFiles(storage_path, /* remove_tmp_file= */ true, page_file_log);
     for (auto & page_file : page_files)
     {
         const_cast<PageFile &>(page_file).readAndSetPageMetas(page_cache_map);
@@ -233,10 +235,14 @@ void PageStorage::traversePageCache(std::function<void(PageId page_id, const PageCache & page_cache)> acceptor)
 
 bool PageStorage::gc()
 {
+    std::lock_guard gc_lock(gc_mutex);
+
+    // get all PageFiles
+    auto page_files = PageStorage::listAllPageFiles(storage_path, true, page_file_log);
     if (page_files.empty())
         return false;
 
+    LOG_DEBUG(log, "PageStorage GC start");
+
     PageFileIdAndLevel writing_file_id_level;
     {
         std::lock_guard lock(write_mutex);
@@ -248,7 +254,7 @@ bool PageStorage::gc()
     {
         /// Select the GC candidates and write them into a new file.
-        /// Since we don't update any shared informations, only a read lock is sufficient.
+        /// Since we don't update any shared information, only a read lock is sufficient.
 
         std::shared_lock lock(read_mutex);
 
@@ -262,138 +268,37 @@ bool PageStorage::gc()
             }
         }
 
+        // select gc candidate files into `merge_files`
         UInt64 candidate_total_size = 0;
         size_t migrate_page_count   = 0;
-        for (auto & page_file : page_files)
-        {
-            auto   file_size = page_file.getDataFileSize();
-            UInt64 valid_size;
-            float  valid_rate;
-            size_t valid_page_count;
-
-            auto it = file_valid_pages.find(page_file.fileIdLevel());
-            if (it == file_valid_pages.end())
-            {
-                valid_size       = 0;
-                valid_rate       = 0;
-                valid_page_count = 0;
-            }
-            else
-            {
-                valid_size       = it->second.first;
-                valid_rate       = (float)valid_size / file_size;
-                valid_page_count = it->second.second.size();
-            }
-
-            // Don't gc the writing page file.
-            bool is_candidate = page_file.fileIdLevel() != writing_file_id_level
-                && (valid_rate < config.merge_hint_low_used_rate || file_size < config.file_small_size);
-            if (!is_candidate)
-                continue;
-
-            merge_files.emplace(page_file.fileIdLevel());
-
-            migrate_page_count += valid_page_count;
-            candidate_total_size += valid_size;
-            if (candidate_total_size >= config.file_max_size)
-                break;
-        }
+        merge_files = gcSelectCandidateFiles(page_files, file_valid_pages, writing_file_id_level, candidate_total_size, migrate_page_count);
 
         bool should_merge = merge_files.size() >= config.merge_hint_low_used_file_num
            || (merge_files.size() >= 2 && candidate_total_size >= config.merge_hint_low_used_file_total_size);
        if (!should_merge)
+        {
+            LOG_DEBUG(log,
+                      "GC exit without merging. merge file size: " << merge_files.size() << ", candidate size: " << candidate_total_size);
             return false;
+        }
 
         LOG_DEBUG(log, "GC decides to merge " << merge_files.size() << " files, containing " << migrate_page_count << " pages");
 
-        if (migrate_page_count)
+        // if there are no valid pages to be migrated, skip the merge
+        if (migrate_page_count > 0)
         {
-            auto [largest_file_id, level] = *(merge_files.rbegin());
-            PageFile gc_file = PageFile::newPageFile(largest_file_id, level + 1, storage_path, true, page_file_log);
-
-            {
-                // No need to sync after each write. Syncing before closing is enough.
-                auto gc_file_writer = gc_file.createWriter(false);
-
-                for (const auto & file_id_level : merge_files)
-                {
-                    auto it = file_valid_pages.find(file_id_level);
-                    if (it == file_valid_pages.end())
-                    {
-                        // This file does not contain any valid page.
-                        continue;
-                    }
-                    const auto & page_ids = it->second.second;
-
-                    PageFile to_merge_file
-                        = PageFile::openPageFileForRead(file_id_level.first, file_id_level.second, storage_path, page_file_log);
-                    auto to_merge_file_reader = to_merge_file.createReader();
-
-                    PageIdAndCaches page_id_and_caches;
-                    {
-                        for (auto page_id : page_ids)
-                        {
-                            auto it2 = page_cache_map.find(page_id);
-                            // This page is removed already.
-                            if (it2 == page_cache_map.end())
-                                continue;
-                            const auto & page_cache = it2->second;
-                            // This page is covered by a newer file.
-                            if (page_cache.fileIdLevel() != file_id_level)
-                                continue;
-                            page_id_and_caches.emplace_back(page_id, page_cache);
-                        }
-                    }
-
-                    if (!page_id_and_caches.empty())
-                    {
-                        PageMap    pages = to_merge_file_reader->read(page_id_and_caches);
-                        WriteBatch wb;
-                        for (const auto & [page_id, page_cache] : page_id_and_caches)
-                        {
-                            auto & page = pages.find(page_id)->second;
-                            wb.putPage(page_id,
-                                       page_cache.tag,
-                                       std::make_shared<ReadBufferFromMemory>(page.data.begin(), page.data.size()),
-                                       page.data.size());
-                        }
-
-                        gc_file_writer->write(wb, gc_file_page_cache_map);
-                    }
-                }
-            }
-
-            if (gc_file_page_cache_map.empty())
-            {
-                gc_file.destroy();
-            }
-            else
-            {
-                gc_file.setFormal();
-            }
+            gc_file_page_cache_map = gcMigratePages(file_valid_pages, merge_files);
         }
     }
 
     {
-        /// Here we have to update the cache informations which readers need to synchronize, a write lock is needed.
-
+        /// Here we have to update the cache information which readers need to synchronize; a write lock is needed.
         std::unique_lock lock(read_mutex);
-
-        for (const auto & [page_id, page_cache] : gc_file_page_cache_map)
-        {
-            auto it = page_cache_map.find(page_id);
-            if (it == page_cache_map.end())
-                continue;
-            auto & old_page_cache = it->second;
-            // In case the page was updated during the GC process.
-            if (old_page_cache.fileIdLevel() < page_cache.fileIdLevel())
-                old_page_cache = page_cache;
-        }
+        gcUpdatePageMap(gc_file_page_cache_map);
 
         // TODO: potential bug: a read thread A may have just selected file F while F is being GCed. After GC, we remove F from the
         // reader cache. But after that, A could come in and re-add F to the reader cache. It is not a very big issue, because
         // it only causes a dangling open fd, which no one will use anymore.
-
         // Remove reader cache.
         for (const auto & [file_id, level] : merge_files)
         {
@@ -401,13 +306,153 @@ bool PageStorage::gc()
         }
     }
 
+    // destroy the files that have already been GCed
     for (const auto & [file_id, level] : merge_files)
     {
         auto page_file = PageFile::openPageFileForRead(file_id, level, storage_path, page_file_log);
         page_file.destroy();
     }
-
     return true;
 }
 
+PageStorage::GcCandidates PageStorage::gcSelectCandidateFiles( // keep readable indent
+    const std::set<PageFile, PageFile::Comparator> & page_files,
+    const GcLivesPages &       file_valid_pages,
+    const PageFileIdAndLevel & writing_file_id_level,
+    UInt64 &                   candidate_total_size,
+    size_t &                   migrate_page_count) const
+{
+    GcCandidates merge_files;
+    for (auto & page_file : page_files)
+    {
+        auto   file_size = page_file.getDataFileSize();
+        UInt64 valid_size;
+        float  valid_rate;
+        size_t valid_page_count;
+
+        auto it = file_valid_pages.find(page_file.fileIdLevel());
+        if (it == file_valid_pages.end())
+        {
+            valid_size       = 0;
+            valid_rate       = 0;
+            valid_page_count = 0;
+        }
+        else
+        {
+            valid_size       = it->second.first;
+            valid_rate       = (float)valid_size / file_size;
+            valid_page_count = it->second.second.size();
+        }
+
+        // Don't gc the writing page file.
+        bool is_candidate = (page_file.fileIdLevel() != writing_file_id_level)
+            && (valid_rate < config.merge_hint_low_used_rate || file_size < config.file_small_size);
+        if (!is_candidate)
+            continue;
+
+        merge_files.emplace(page_file.fileIdLevel());
+
+        migrate_page_count += valid_page_count;
+        candidate_total_size += valid_size;
+        if (candidate_total_size >= config.file_max_size)
+            break;
+    }
+    return merge_files;
+}
+
+PageCacheMap PageStorage::gcMigratePages(const GcLivesPages & file_valid_pages, const GcCandidates & merge_files) const
+{
+    PageCacheMap gc_file_page_cache_map;
+    // merge `merge_files` into a new PageFile with file_id = the max file_id of `merge_files` and level = its level + 1
+    auto [largest_file_id, level] = *(merge_files.rbegin());
+    PageFile gc_file = PageFile::newPageFile(largest_file_id, level + 1, storage_path, /* is_tmp= */ true, page_file_log);
+
+    size_t num_successful_migrate_pages = 0;
+    {
+        // No need to sync after each write. Syncing before closing is enough.
+        auto gc_file_writer = gc_file.createWriter(/* sync_on_write= */ false);
+
+        for (const auto & file_id_level : merge_files)
+        {
+            auto it = file_valid_pages.find(file_id_level);
+            if (it == file_valid_pages.end())
+            {
+                // This file does not contain any valid page.
+                continue;
+            }
+            const auto & page_ids = it->second.second;
+
+            PageFile to_merge_file = PageFile::openPageFileForRead(file_id_level.first, file_id_level.second, storage_path, page_file_log);
+            auto     to_merge_file_reader = to_merge_file.createReader();
+
+            PageIdAndCaches page_id_and_caches;
+            {
+                for (auto page_id : page_ids)
+                {
+                    auto it2 = page_cache_map.find(page_id);
+                    // This page has already been removed.
+                    if (it2 == page_cache_map.end())
+                        continue;
+                    const auto & page_cache = it2->second;
+                    // This page is covered by a newer file.
+                    if (page_cache.fileIdLevel() != file_id_level)
+                        continue;
+                    page_id_and_caches.emplace_back(page_id, page_cache);
+                    num_successful_migrate_pages += 1;
+                }
+            }
+
+            if (!page_id_and_caches.empty())
+            {
+                // copy valid pages from `to_merge_file` to `gc_file`
+                PageMap    pages = to_merge_file_reader->read(page_id_and_caches);
+                WriteBatch wb;
+                for (const auto & [page_id, page_cache] : page_id_and_caches)
+                {
+                    auto & page = pages.find(page_id)->second;
+                    wb.putPage(page_id,
+                               page_cache.tag,
+                               std::make_shared<ReadBufferFromMemory>(page.data.begin(), page.data.size()),
+                               page.data.size());
+                }
+
+                gc_file_writer->write(wb, gc_file_page_cache_map);
+            }
+        }
+    }
+
+    if (gc_file_page_cache_map.empty())
+    {
+        gc_file.destroy();
+    }
+    else
+    {
+        gc_file.setFormal();
+        auto id = gc_file.fileIdLevel();
+        LOG_DEBUG(log, "GC has migrated " << num_successful_migrate_pages << " pages to PageFile_" << id.first << "_" << id.second);
+    }
+    return gc_file_page_cache_map;
+}
+
+void PageStorage::gcUpdatePageMap(const PageCacheMap & gc_pages_map)
+{
+    for (const auto & [page_id, page_cache] : gc_pages_map)
+    {
+        auto it = page_cache_map.find(page_id);
+        // if the page has already been removed, just ignore the gc-ed entry
+        if (it == page_cache_map.end())
+        {
+            continue;
+        }
+        auto & old_page_cache = it->second;
+        // In case the page was updated during the GC process.
+        if (old_page_cache.fileIdLevel() < page_cache.fileIdLevel())
+        {
+            // no newer page has been written to `page_cache_map`; replace the entry with the gc-ed one
+            old_page_cache = page_cache;
+        }
+        // else a newer page was written by another thread; keep it and leave the gc-ed page for the next gc round
+    }
+}
+
 } // namespace DB
\ No newline at end of file
diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h
index 1aa00bd2eda..27141b9d54c 100644
--- a/dbms/src/Storages/Page/PageStorage.h
+++ b/dbms/src/Storages/Page/PageStorage.h
@@ -2,6 +2,7 @@
 #include
 #include
+#include <set>
 #include
 #include
@@ -25,7 +26,7 @@ class PageStorage
     {
         Config() {}
 
-        bool sync_on_write = false;
+        bool sync_on_write = true;
 
         size_t file_roll_size = PAGE_FILE_ROLL_SIZE;
         size_t file_max_size  = PAGE_FILE_MAX_SIZE;
@@ -54,9 +55,22 @@ class PageStorage
     void traversePageCache(std::function<void(PageId page_id, const PageCache & page_cache)> acceptor);
     bool gc();
 
+    static std::set<PageFile, PageFile::Comparator>
+    listAllPageFiles(const std::string & storage_path, bool remove_tmp_file, Logger * page_file_log);
+
 private:
     PageFile::Writer & getWriter();
     ReaderPtr          getReader(const PageFileIdAndLevel & file_id_level);
+    // gc helper functions
+    using GcCandidates = std::set<PageFileIdAndLevel>;
+    using GcLivesPages = std::map<PageFileIdAndLevel, std::pair<size_t, PageIds>>;
+    GcCandidates gcSelectCandidateFiles(const std::set<PageFile, PageFile::Comparator> & page_files,
+                                        const GcLivesPages &       file_valid_pages,
+                                        const PageFileIdAndLevel & writing_file_id_level,
+                                        UInt64 &                   candidate_total_size,
+                                        size_t &                   migrate_page_count) const;
+    PageCacheMap gcMigratePages(const GcLivesPages & file_valid_pages, const GcCandidates & merge_files) const;
+    void         gcUpdatePageMap(const PageCacheMap & gc_pages_map);
 
 private:
     std::string storage_path;
@@ -76,6 +90,7 @@ class PageStorage
 
     std::mutex        write_mutex;
     std::shared_mutex read_mutex;
+    std::mutex        gc_mutex; // A mutex used to protect gc only
 };
 
 } // namespace DB
diff --git a/dbms/src/Storages/Page/tests/CMakeLists.txt b/dbms/src/Storages/Page/tests/CMakeLists.txt
new file mode 100644
index 00000000000..6521554819c
--- /dev/null
+++ b/dbms/src/Storages/Page/tests/CMakeLists.txt
@@ -0,0 +1,12 @@
+
+add_executable(page_test_page_storage gtest_page_storage.cpp)
+target_link_libraries(page_test_page_storage gtest_main dbms)
+
+add_executable(page_test_page_file gtest_page_file.cpp)
+target_link_libraries(page_test_page_file gtest_main dbms)
+
+add_executable(page_stress_page_storage stress_page_storage.cpp)
+target_link_libraries(page_stress_page_storage dbms)
+
+add_executable(page_utils_get_valid_pages utils_get_valid_pages.cpp)
+target_link_libraries(page_utils_get_valid_pages dbms)
diff --git a/dbms/src/Storages/Page/tests/gtest_page_file.cpp b/dbms/src/Storages/Page/tests/gtest_page_file.cpp
new file mode 100644
index 00000000000..47e79b84cad
--- /dev/null
+++ b/dbms/src/Storages/Page/tests/gtest_page_file.cpp
@@ -0,0 +1,20 @@
+#include "gtest/gtest.h"
+#include <Storages/Page/PageFile.h>
+
+namespace DB
+{
+namespace tests
+{
+
+TEST(PageFile_test, Compare)
+{
+    PageFile pf0 = PageFile::openPageFileForRead(0, 0, ".", &Logger::get("PageFile"));
+    PageFile pf1 = PageFile::openPageFileForRead(0, 1, ".", &Logger::get("PageFile"));
+
+    PageFile::Comparator comp;
+    ASSERT_EQ(comp(pf0, pf1), true);
+    ASSERT_EQ(comp(pf1, pf0), false);
+}
+
+} // namespace tests
+} // namespace DB
diff --git a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp
new file mode 100644
index 00000000000..001b5ded632
--- /dev/null
+++ b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp
@@ -0,0 +1,193 @@
+#include "gtest/gtest.h"
+
+#define private public
+#include <Storages/Page/PageStorage.h>
+#undef private
+
+namespace DB
+{
+namespace tests
+{
+
+class PageStorage_test : public ::testing::Test
+{
+public:
+    PageStorage_test() : path("./t"), storage() {}
+
+protected:
+    void SetUp() override
+    {
+        // drop the dir if it exists
+        Poco::File file(path);
+        if (file.exists())
+        {
+            file.remove(true);
+        }
+        config.file_roll_size               = 512;
+        config.merge_hint_low_used_file_num = 1;
+        storage = std::make_shared<PageStorage>(path, config);
+    }
+
+protected:
+    String                       path;
+    PageStorage::Config          config;
+    std::shared_ptr<PageStorage> storage;
+};
+
+TEST_F(PageStorage_test, WriteRead)
+{
+    const size_t buf_sz = 1024;
+    char         c_buff[buf_sz];
+    for (size_t i = 0; i < buf_sz; ++i)
+    {
+        c_buff[i] = i % 0xff;
+    }
+
+    {
+        WriteBatch    batch;
+        ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
+        batch.putPage(0, 0, buff, buf_sz);
+        buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
+        batch.putPage(1, 0, buff, buf_sz);
+        storage->write(batch);
+    }
+
+    Page page0 = storage->read(0);
+    ASSERT_EQ(page0.data.size(), buf_sz);
+    ASSERT_EQ(page0.page_id, 0UL);
+    for (size_t i = 0; i < buf_sz; ++i)
+    {
+        EXPECT_EQ(*(page0.data.begin() + i), static_cast<char>(i % 0xff));
+    }
+    Page page1 = storage->read(1);
+    ASSERT_EQ(page1.data.size(), buf_sz);
+    ASSERT_EQ(page1.page_id, 1UL);
+    for (size_t i = 0; i < buf_sz; ++i)
+    {
+        EXPECT_EQ(*(page1.data.begin() + i), static_cast<char>(i % 0xff));
+    }
+}
+
+TEST_F(PageStorage_test, WriteReadGc)
+{
+    const size_t buf_sz = 256;
+    char         c_buff[buf_sz];
+
+    const size_t num_repeat = 10;
+    PageId       pid        = 1;
+    const char   page0_byte = 0x3f;
+    {
+        // put page0
+        WriteBatch batch;
+        memset(c_buff, page0_byte, buf_sz);
+        ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
+        batch.putPage(0, 0, buff, buf_sz);
+        storage->write(batch);
+    }
+    // repeatedly put page1
+    for (size_t n = 1; n <= num_repeat; ++n)
+    {
+        WriteBatch batch;
+        memset(c_buff, n, buf_sz);
+        ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
+        batch.putPage(pid, 0, buff, buf_sz);
+        storage->write(batch);
+    }
+
+    {
+        Page page0 = storage->read(0);
+        ASSERT_EQ(page0.data.size(), buf_sz);
+        ASSERT_EQ(page0.page_id, 0UL);
+        for (size_t i = 0; i < buf_sz; ++i)
+        {
+            EXPECT_EQ(*(page0.data.begin() + i), page0_byte);
+        }
+
+        Page page1 = storage->read(pid);
+        ASSERT_EQ(page1.data.size(), buf_sz);
+        ASSERT_EQ(page1.page_id, pid);
+        for (size_t i = 0; i < buf_sz; ++i)
+        {
+            EXPECT_EQ(*(page1.data.begin() + i), static_cast<char>(num_repeat % 0xff));
+        }
+    }
+
+    storage->gc();
+
+    {
+        Page page0 = storage->read(0);
+        ASSERT_EQ(page0.data.size(), buf_sz);
+        ASSERT_EQ(page0.page_id, 0UL);
+        for (size_t i = 0; i < buf_sz; ++i)
+        {
+            EXPECT_EQ(*(page0.data.begin() + i), page0_byte);
+        }
+
+        Page page1 = storage->read(pid);
+        ASSERT_EQ(page1.data.size(), buf_sz);
+        ASSERT_EQ(page1.page_id, pid);
+        for (size_t i = 0; i < buf_sz; ++i)
+        {
+            EXPECT_EQ(*(page1.data.begin() + i), static_cast<char>(num_repeat % 0xff));
+        }
+    }
+}
+
+TEST_F(PageStorage_test, GcConcurrencyDelPage)
+{
+    PageId pid = 0;
+    // gc moves Page0 -> PageFile{1,0}
+    PageCacheMap map;
+    map.emplace(pid, PageCache{.file_id = 1, .level = 0});
+    // a write thread deletes Page0 from page_cache_map before the gc thread gets the unique_lock of `read_mutex`
+    storage->page_cache_map.clear();
+    // gc continues
+    storage->gcUpdatePageMap(map);
+    // Page0 is not added back to page_cache_map
+    const PageCache entry = storage->getCache(pid);
+    ASSERT_FALSE(entry.isValid());
+}
+
+static void EXPECT_PagePos_LT(PageFileIdAndLevel p0, PageFileIdAndLevel p1)
+{
+    EXPECT_LT(p0, p1);
+}
+
+TEST_F(PageStorage_test, GcPageMove)
+{
+    EXPECT_PagePos_LT({4, 0}, {5, 1});
+    EXPECT_PagePos_LT({5, 0}, {5, 1});
+    EXPECT_PagePos_LT({5, 1}, {6, 1});
+    EXPECT_PagePos_LT({5, 2}, {6, 1});
+
+    const PageId pid = 0;
+    // old Page0 is in PageFile{5,0}
+    storage->page_cache_map.emplace(pid, PageCache{.file_id = 5, .level = 0});
+    // gc moves Page0 -> PageFile{5,1}
+    PageCacheMap map;
+    map.emplace(pid, PageCache{.file_id = 5, .level = 1});
+    storage->gcUpdatePageMap(map);
+    // page_cache_map gets updated
+    const PageCache entry = storage->getCache(pid);
+    ASSERT_TRUE(entry.isValid());
+    ASSERT_EQ(entry.file_id, 5);
+    ASSERT_EQ(entry.level, 1);
+}
+
+TEST_F(PageStorage_test, GcConcurrencySetPage)
+{
+    const PageId pid = 0;
+    // gc moves Page0 -> PageFile{5,1}
+    PageCacheMap map;
+    map.emplace(pid, PageCache{.file_id = 5, .level = 1});
+    // a write thread inserts a newer Page0 before the gc thread gets the unique_lock on `read_mutex`
+    storage->page_cache_map.emplace(pid, PageCache{.file_id = 6, .level = 0});
+    // gc continues
+    storage->gcUpdatePageMap(map);
+    // the newer entry wins
+    const PageCache entry = storage->getCache(pid);
+    ASSERT_TRUE(entry.isValid());
+    ASSERT_EQ(entry.file_id, 6);
+    ASSERT_EQ(entry.level, 0);
+}
+
+} // namespace tests
+} // namespace DB
diff --git a/dbms/src/Storages/Page/tests/stress_page_storage.cpp b/dbms/src/Storages/Page/tests/stress_page_storage.cpp
new file mode 100644
index 00000000000..ab6650c3e8a
--- /dev/null
+++ b/dbms/src/Storages/Page/tests/stress_page_storage.cpp
@@ -0,0 +1,162 @@
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+using PSPtr = std::shared_ptr<DB::PageStorage>;
+
+const DB::PageId MAX_PAGE_ID = 1000;
+
+std::atomic<bool> running_without_exception = true;
+
+class PSWriter : public Poco::Runnable
+{
+    PSPtr        ps;
+    std::mt19937 gen;
+
+public:
+    PSWriter(const PSPtr & ps_) : ps(ps_), gen() {}
+    void run() override
+    {
+        while (running_without_exception)
+        {
+            assert(ps != nullptr);
+            std::normal_distribution<> d{MAX_PAGE_ID / 2, 150};
+            const DB::PageId pageId = static_cast<DB::PageId>(std::round(d(gen))) % MAX_PAGE_ID;
+            //const DB::PageId pageId = random() % MAX_PAGE_ID;
+
+            DB::WriteBatch wb;
+            // fill the page with random bytes
+            const size_t buff_sz = 2048 * 1024 + random() % 3000;
+            char *       buff    = new char[buff_sz];
+            const char   buff_ch = random() % 0xFF;
+            memset(buff, buff_ch, buff_sz);
+            wb.putPage(pageId, 0, std::make_shared<DB::ReadBufferFromMemory>(buff, buff_sz), buff_sz);
+            ps->write(wb);
+            // free the buffer only after the batch has been written
+            delete[] buff;
+        }
+        LOG_INFO(&Logger::get("root"), "writer exit");
+    }
+};
+
+class PSReader : public Poco::Runnable
+{
+    PSPtr ps;
+
+public:
+    PSReader(const PSPtr & ps_) : ps(ps_) {}
+    void run() override
+    {
+        while (running_without_exception)
+        {
+            {
+                const uint32_t milliseconds_to_sleep = random() % 50;
+                usleep(milliseconds_to_sleep * 1000);
+            }
+            assert(ps != nullptr);
+            const DB::PageId pageId = random() % MAX_PAGE_ID;
+            try
+            {
+                ps->read({pageId});
+            }
+            catch (DB::Exception & e)
+            {
+                LOG_TRACE(&Logger::get("root"), e.displayText());
+            }
+        }
+        LOG_INFO(&Logger::get("root"), "reader exit");
+    }
+};
+
+class PSGc
+{
+    PSPtr ps;
+
+public:
+    PSGc(const PSPtr & ps_) : ps(ps_) {}
+    void onTime(Poco::Timer & /* t */)
+    {
+        assert(ps != nullptr);
+        try
+        {
+            //throw DB::Exception("fake exception");
+            ps->gc();
+        }
+        catch (DB::Exception &)
+        {
+            // if gc throws an exception, stop the test
+            running_without_exception = false;
+        }
+    }
+};
+
+int main(int argc, char ** argv)
+{
+    bool drop_before_run = false;
+    if (argc > 2)
+    {
+        DB::String drop_str = argv[2];
+        if (drop_str == "drop")
+        {
+            drop_before_run = true;
+        }
+    }
+
+    Poco::AutoPtr<Poco::ConsoleChannel>    channel = new Poco::ConsoleChannel(std::cerr);
+    Poco::AutoPtr<Poco::PatternFormatter>  formatter(new Poco::PatternFormatter);
+    formatter->setProperty("pattern", "%L%Y-%m-%d %H:%M:%S.%i <%p> %s: %t");
+    Poco::AutoPtr<Poco::FormattingChannel> formatting_channel(new Poco::FormattingChannel(formatter, channel));
+    Logger::root().setChannel(formatting_channel);
+    Logger::root().setLevel("trace");
+
+    const DB::String path = "./stress";
+    // drop the dir if it exists
+    Poco::File file(path);
+    if (file.exists() && drop_before_run)
+    {
+        file.remove(true);
+    }
+
+    // create PageStorage
+    DB::PageStorage::Config config;
+    PSPtr ps = std::make_shared<DB::PageStorage>(path, config);
+
+    // create a thread pool
+    const size_t     num_readers = 4;
+    Poco::ThreadPool pool(/* minCapacity= */ 2 + num_readers);
+
+    // start one writer thread
+    PSWriter writer(ps);
+    pool.start(writer, "writer");
+
+    // start one gc thread
+    PSGc        gc(ps);
+    Poco::Timer timer(0, 30 * 1000);
+    timer.setStartInterval(1000);
+    timer.setPeriodicInterval(30 * 1000);
+    timer.start(Poco::TimerCallback<PSGc>(gc, &PSGc::onTime));
+
+    // start multiple reader threads
+    std::vector<std::shared_ptr<PSReader>> readers(num_readers);
+    for (size_t i = 0; i < num_readers; ++i)
+    {
+        readers[i] = std::make_shared<PSReader>(ps);
+        pool.start(*readers[i]);
+    }
+
+    pool.joinAll();
+
+    return -1;
+}
diff --git a/dbms/src/Storages/Page/tests/utils_get_valid_pages.cpp b/dbms/src/Storages/Page/tests/utils_get_valid_pages.cpp
new file mode 100644
index 00000000000..275d0e06e29
--- /dev/null
+++ b/dbms/src/Storages/Page/tests/utils_get_valid_pages.cpp
@@ -0,0 +1,91 @@
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+void Usage(const char * prog)
+{
+    fprintf(stderr,
+            "Usage: %s <path> <mode>\n"
+            "\tmode == 1 -> dump all page entries\n"
+            "\tmode == 2 -> dump valid page entries\n",
+            prog);
+}
+
+int main(int argc, char ** argv)
+{
+    if (argc < 3)
+    {
+        Usage(argv[0]);
+        return 1;
+    }
+
+    Poco::AutoPtr<Poco::ConsoleChannel>    channel = new Poco::ConsoleChannel(std::cerr);
+    Poco::AutoPtr<Poco::PatternFormatter>  formatter(new Poco::PatternFormatter);
+    formatter->setProperty("pattern", "%L%Y-%m-%d %H:%M:%S.%i <%p> %s: %t");
+    Poco::AutoPtr<Poco::FormattingChannel> formatting_channel(new Poco::FormattingChannel(formatter, channel));
+    Logger::root().setChannel(formatting_channel);
+    Logger::root().setLevel("trace");
+
+    DB::String    path = argv[1];
+    const int32_t MODE_DUMP_ALL_ENTRIES   = 1;
+    const int32_t MODE_DUMP_VALID_ENTRIES = 2;
+    DB::String    mode_str = argv[2];
+    int32_t       mode     = strtol(mode_str.c_str(), nullptr, 10);
+    if (mode != MODE_DUMP_ALL_ENTRIES && mode != MODE_DUMP_VALID_ENTRIES)
+    {
+        Usage(argv[0]);
+        return 1;
+    }
+
+    auto page_files = DB::PageStorage::listAllPageFiles(path, true, &Logger::get("root"));
+
+    DB::PageCacheMap valid_page_entries;
+    for (auto & page_file : page_files)
+    {
+        DB::PageCacheMap page_entries;
+        const_cast<DB::PageFile &>(page_file).readAndSetPageMetas(page_entries);
+        printf("File: page_%llu_%u with %zu entries:\n", page_file.getFileId(), page_file.getLevel(), page_entries.size());
+        for (auto & [pid, entry] : page_entries)
+        {
+            if (mode == MODE_DUMP_ALL_ENTRIES)
+            {
+                printf("\tpid:%9llu\t\t"
+                       "%llu\t%u\t%u\t%9llu\t%llu\t%016llx\n",
+                       pid, //
+                       entry.file_id,
+                       entry.level,
+                       entry.size,
+                       entry.offset,
+                       entry.tag,
+                       entry.checksum);
+            }
+            valid_page_entries[pid] = entry;
+        }
+    }
+
+    if (mode == MODE_DUMP_VALID_ENTRIES)
+    {
+        printf("Valid page entries: %zu\n", valid_page_entries.size());
+        for (auto & [pid, entry] : valid_page_entries)
+        {
+            printf("\tpid:%9llu\t\t"
+                   "%llu\t%u\t%u\t%9llu\t%llu\t%016llx\n",
+                   pid, //
+                   entry.file_id,
+                   entry.level,
+                   entry.size,
+                   entry.offset,
+                   entry.tag,
+                   entry.checksum);
+        }
+    }
+
+    return 0;
+}
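
For orientation, the public API this patch introduces is exercised end-to-end by the gtests above: build a WriteBatch, write it, read pages back, then run a GC round. The following is a minimal standalone sketch of that flow, not part of the patch; the include paths mirror the ones assumed in gtest_page_storage.cpp, and the storage path and buffer sizes are arbitrary.

#include <memory>

#include <IO/ReadBufferFromMemory.h>    // assumed path for the buffer type used by the tests
#include <Storages/Page/PageStorage.h>  // assumed include path, same as in gtest_page_storage.cpp

int main()
{
    // open (or create) a PageStorage under ./demo
    DB::PageStorage::Config config;
    config.file_roll_size = 512; // roll to a new PageFile early, as the gtest fixture does
    DB::PageStorage storage("./demo", config);

    // write page 0 through a WriteBatch
    char buf[64] = {};
    DB::WriteBatch batch;
    batch.putPage(/* page_id= */ 0, /* tag= */ 0,
                  std::make_shared<DB::ReadBufferFromMemory>(buf, sizeof(buf)), sizeof(buf));
    storage.write(batch);

    // read it back, then run one GC round (a no-op unless enough garbage has accumulated)
    DB::Page page = storage.read(0);
    (void)page.data;
    storage.gc();
    return 0;
}

Note that gc() merges the selected candidate files into a PageFile whose id is the largest candidate id with level + 1, which is why gcUpdatePageMap can rely on the (file_id, level) pair ordering checked by the GcPageMove test.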