From 1e0a37a1fd9c08b4a5be4aa49c095540c4b6b86e Mon Sep 17 00:00:00 2001 From: JaySon Date: Thu, 23 Dec 2021 12:31:47 +0800 Subject: [PATCH] PageStorage: Mvcc directory (without GC/restore) (#3637) --- dbms/src/Common/ErrorCodes.cpp | 2 + dbms/src/IO/WriteBufferFromString.h | 3 +- dbms/src/Storages/Page/V3/LogFile/LogWriter.h | 2 + dbms/src/Storages/Page/V3/MapUtils.h | 30 + dbms/src/Storages/Page/V3/PageDirectory.cpp | 201 ++++++ dbms/src/Storages/Page/V3/PageDirectory.h | 71 ++- dbms/src/Storages/Page/V3/PageEntriesEdit.h | 31 + .../Page/V3/spacemap/SpaceMapSTDMap.h | 22 +- .../Storages/Page/V3/tests/gtest_free_map.cpp | 37 -- .../Page/V3/tests/gtest_map_utils.cpp | 67 ++ .../Page/V3/tests/gtest_page_directory.cpp | 576 ++++++++++++++++++ .../Storages/Page/V3/tests/gtest_wal_log.cpp | 3 +- 12 files changed, 973 insertions(+), 72 deletions(-) create mode 100644 dbms/src/Storages/Page/V3/MapUtils.h create mode 100644 dbms/src/Storages/Page/V3/PageDirectory.cpp create mode 100644 dbms/src/Storages/Page/V3/tests/gtest_map_utils.cpp create mode 100644 dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index d2cbf156b72..3b6e31d5f51 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -406,6 +406,8 @@ extern const int DIVIDED_BY_ZERO = 10011; extern const int INVALID_TIME = 10012; extern const int DEADLOCK_AVOIDED = 10013; extern const int PTHREAD_ERROR = 10014; +extern const int PS_ENTRY_NOT_EXISTS = 10015; +extern const int PS_ENTRY_NO_VALID_VERSION = 10016; } // namespace ErrorCodes } // namespace DB diff --git a/dbms/src/IO/WriteBufferFromString.h b/dbms/src/IO/WriteBufferFromString.h index 2cb0b10cbe0..c7fb812c080 100644 --- a/dbms/src/IO/WriteBufferFromString.h +++ b/dbms/src/IO/WriteBufferFromString.h @@ -25,7 +25,8 @@ class StringHolder } // namespace detail /// Creates the string by itself and allows to get it. -class WriteBufferFromOwnString : public detail::StringHolder +class WriteBufferFromOwnString + : public detail::StringHolder , public WriteBufferFromString { public: diff --git a/dbms/src/Storages/Page/V3/LogFile/LogWriter.h b/dbms/src/Storages/Page/V3/LogFile/LogWriter.h index 8163ab0f823..f361c52e34b 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogWriter.h +++ b/dbms/src/Storages/Page/V3/LogFile/LogWriter.h @@ -1,3 +1,5 @@ +#pragma once + #include #include diff --git a/dbms/src/Storages/Page/V3/MapUtils.h b/dbms/src/Storages/Page/V3/MapUtils.h new file mode 100644 index 00000000000..8bdb8eed1b6 --- /dev/null +++ b/dbms/src/Storages/Page/V3/MapUtils.h @@ -0,0 +1,30 @@ +#pragma once + +namespace DB::MapUtils +{ +// Return an iterator to the last element whose key is less than or equal to `key`. +// If no such element is found, the past-the-end iterator is returned. +template +typename C::const_iterator +findLessEQ(const C & c, const typename C::key_type & key) +{ + auto iter = c.upper_bound(key); // first element > `key` + // Nothing greater than key + if (iter == c.cbegin()) + return c.cend(); + // its prev must be less than or equal to `key` + return --iter; +} + +template +typename C::const_iterator +findLess(const C & c, const typename C::key_type & key) +{ + auto iter = c.lower_bound(key); // first element >= `key` + if (iter == c.cbegin()) + return c.cend(); // Nothing < `key` + // its prev must be less than `key` + return --iter; +} + +} // namespace DB::MapUtils diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp new file mode 100644 index 00000000000..e05b9c5ecd0 --- /dev/null +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -0,0 +1,201 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ +extern const int NOT_IMPLEMENTED; +extern const int PS_ENTRY_NOT_EXISTS; +extern const int PS_ENTRY_NO_VALID_VERSION; +} // namespace ErrorCodes +namespace PS::V3 +{ +std::optional PageDirectory::VersionedPageEntries::getEntry(UInt64 seq) const +{ + auto page_lock = acquireLock(); + // entries are sorted by , find the first one less than + if (auto iter = MapUtils::findLess(entries, PageVersionType(seq + 1)); + iter != entries.end()) + { + if (!iter->second.is_delete) + return iter->second.entry; + } + return std::nullopt; +} + +PageDirectory::PageDirectory() + : sequence(0) + , log(getLogWithPrefix(nullptr, "PageDirectory")) +{ +} + +void PageDirectory::restore() +{ + throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); +} + +PageDirectorySnapshotPtr PageDirectory::createSnapshot() const +{ + auto snap = std::make_shared(sequence.load()); + snapshots.emplace_back(std::weak_ptr(snap)); + return snap; +} + +PageIDAndEntryV3 PageDirectory::get(PageId page_id, const PageDirectorySnapshotPtr & snap) const +{ + MVCCMapType::const_iterator iter; + { + std::shared_lock read_lock(table_rw_mutex); + iter = mvcc_table_directory.find(page_id); + if (iter == mvcc_table_directory.end()) + throw Exception(fmt::format("Entry [id={}] not exist!", page_id), ErrorCodes::PS_ENTRY_NOT_EXISTS); + } + + if (auto entry = iter->second->getEntry(snap->sequence); entry) + { + return PageIDAndEntryV3(page_id, *entry); + } + + throw Exception(fmt::format("Entry [id={}] [seq={}] not exist!", page_id, snap->sequence), ErrorCodes::PS_ENTRY_NO_VALID_VERSION); +} + +PageIDAndEntriesV3 PageDirectory::get(const PageIds & page_ids, const PageDirectorySnapshotPtr & snap) const +{ + std::vector iters; + iters.reserve(page_ids.size()); + { + std::shared_lock read_lock(table_rw_mutex); + for (size_t idx = 0; idx < page_ids.size(); ++idx) + { + if (auto iter = mvcc_table_directory.find(page_ids[idx]); + iter != mvcc_table_directory.end()) + { + iters.emplace_back(iter); + } + else + { + throw Exception(fmt::format("Entry [id={}] at [idx={}] not exist!", page_ids[idx], idx), ErrorCodes::PS_ENTRY_NOT_EXISTS); + } + } + } + + PageIDAndEntriesV3 id_entries; + for (size_t idx = 0; idx < page_ids.size(); ++idx) + { + const auto & iter = iters[idx]; + if (auto entry = iter->second->getEntry(snap->sequence); entry) + { + id_entries.emplace_back(page_ids[idx], *entry); + } + else + throw Exception(fmt::format("Entry [id={}] [seq={}] at [idx={}] not exist!", page_ids[idx], snap->sequence, idx), ErrorCodes::PS_ENTRY_NO_VALID_VERSION); + } + + return id_entries; +} + +void PageDirectory::apply(PageEntriesEdit && edit) +{ + std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline + UInt64 last_sequence = sequence.load(); + + // stage 1, get the entry to be ref + auto snap = createSnapshot(); + for (auto & r : edit.getRecords()) + { + // Set the version of inserted entries + r.version = PageVersionType(last_sequence + 1); + + if (r.type != WriteBatch::WriteType::REF) + { + continue; + } + auto iter = mvcc_table_directory.find(r.ori_page_id); + if (iter == mvcc_table_directory.end()) + { + throw Exception(fmt::format("Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1), ErrorCodes::LOGICAL_ERROR); + } + if (auto entry = iter->second->getEntry(last_sequence); entry) + { + // copy the entry to be ref + r.entry = *entry; + } + else + { + throw Exception(fmt::format("Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1), ErrorCodes::LOGICAL_ERROR); + } + } + + // stage 2, persisted the changes to WAL + // wal.apply(edit); + + // stage 3, create entry version list for pageId. nothing need to be rollback + std::unordered_map updating_locks; + std::vector updating_pages; + updating_pages.reserve(edit.size()); + for (const auto & r : edit.getRecords()) + { + auto [iter, created] = mvcc_table_directory.insert(std::make_pair(r.page_id, nullptr)); + if (created) + { + iter->second = std::make_shared(); + } + updating_locks.emplace(r.page_id, iter->second->acquireLock()); + updating_pages.emplace_back(iter->second); + } + + // stage 4, there are no rollback since we already persist `edit` to WAL, just ignore error if any + const auto & records = edit.getRecords(); + for (size_t idx = 0; idx < records.size(); ++idx) + { + const auto & r = records[idx]; + switch (r.type) + { + case WriteBatch::WriteType::PUT: + [[fallthrough]]; + case WriteBatch::WriteType::UPSERT: + [[fallthrough]]; + case WriteBatch::WriteType::REF: + { + // Put/upsert/ref all should append a new version for this page + updating_pages[idx]->createNewVersion(last_sequence + 1, records[idx].entry); + updating_locks.erase(records[idx].page_id); + break; + } + case WriteBatch::WriteType::DEL: + { + updating_pages[idx]->createDelete(last_sequence + 1); + updating_locks.erase(records[idx].page_id); + break; + } + } + } + + // The edit committed, incr the sequence number to publish changes for `createSnapshot` + sequence.fetch_add(1); +} + +bool PageDirectory::gc() +{ + // Cleanup released snapshots + for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */) + { + if (iter->expired()) + iter = snapshots.erase(iter); + else + ++iter; + } + throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); +} + +} // namespace PS::V3 +} // namespace DB diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index ed5e4aab53a..4eb38c0d758 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -1,12 +1,16 @@ #pragma once +#include #include #include +#include #include #include #include +#include #include +#include #include #include @@ -15,44 +19,85 @@ namespace DB::PS::V3 class PageDirectorySnapshot : public DB::PageStorageSnapshot { public: + UInt64 sequence; + explicit PageDirectorySnapshot(UInt64 seq) + : sequence(seq) + {} }; using PageDirectorySnapshotPtr = std::shared_ptr; class PageDirectory { public: + PageDirectory(); + + void restore(); + PageDirectorySnapshotPtr createSnapshot() const; - PageIDAndEntriesV3 get(const PageId & read_id, const PageDirectorySnapshotPtr & snap) const; - PageIDAndEntriesV3 get(const PageIds & read_ids, const PageDirectorySnapshotPtr & snap) const; + PageIDAndEntryV3 get(PageId page_id, const PageDirectorySnapshotPtr & snap) const; + PageIDAndEntriesV3 get(const PageIds & page_ids, const PageDirectorySnapshotPtr & snap) const; void apply(PageEntriesEdit && edit); bool gc(); private: - struct VersionType - { - UInt64 sequence = 0; - UInt64 epoch = 0; - }; - struct VersionedPageEntry + struct EntryOrDelete { - VersionType ver; + bool is_delete; PageEntryV3 entry; + + explicit EntryOrDelete(bool del) + : is_delete(del) + { + assert(del == true); + } + explicit EntryOrDelete(const PageEntryV3 & entry_) + : is_delete(false) + , entry(entry_) + {} }; + + using PageLock = std::unique_ptr>; class VersionedPageEntries { - std::list entries; + public: + PageLock acquireLock() const + { + return std::make_unique>(m); + } + + void createNewVersion(UInt64 seq, const PageEntryV3 & entry) + { + entries.emplace(PageVersionType(seq), entry); + } + + void createDelete(UInt64 seq) + { + entries.emplace(PageVersionType(seq), EntryOrDelete(/*del*/ true)); + } + + std::optional getEntry(UInt64 seq) const; + + private: mutable std::mutex m; + // Entries sorted by version + std::map entries; }; + using VersionedPageEntriesPtr = std::shared_ptr; - std::shared_mutex table_rw_mutex; - std::unordered_map mvcc_table_directory; +private: + std::atomic sequence; + mutable std::shared_mutex table_rw_mutex; + using MVCCMapType = std::unordered_map; + MVCCMapType mvcc_table_directory; - std::list> snapshots; + mutable std::list> snapshots; WALStore wal; + + LogWithPrefixPtr log; }; } // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/PageEntriesEdit.h b/dbms/src/Storages/Page/V3/PageEntriesEdit.h index 53b17b812b8..0ef63a29e0b 100644 --- a/dbms/src/Storages/Page/V3/PageEntriesEdit.h +++ b/dbms/src/Storages/Page/V3/PageEntriesEdit.h @@ -7,6 +7,31 @@ namespace DB::PS::V3 { +// `PageDirectory::apply` with create a version={directory.sequence, epoch=0}. +// After data compaction and page entries need to be updated, will create +// some entries with a version={old_sequence, epoch=old_epoch+1}. +struct PageVersionType +{ + UInt64 sequence; // The write sequence + UInt64 epoch; // The GC epoch + + explicit PageVersionType(UInt64 seq) + : sequence(seq) + , epoch(0) + {} + + PageVersionType() + : PageVersionType(0) + {} + + bool operator<(const PageVersionType & rhs) const + { + if (sequence == rhs.sequence) + return epoch < rhs.epoch; + return sequence < rhs.sequence; + } +}; + /// Page entries change to apply to PageDirectory class PageEntriesEdit { @@ -71,9 +96,15 @@ class PageEntriesEdit PageId page_id; PageId ori_page_id; PageEntryV3 entry; + PageVersionType version; }; using EditRecords = std::vector; + void appendRecord(const EditRecord & rec) + { + records.emplace_back(rec); + } + EditRecords & getRecords() { return records; } const EditRecords & getRecords() const { return records; } diff --git a/dbms/src/Storages/Page/V3/spacemap/SpaceMapSTDMap.h b/dbms/src/Storages/Page/V3/spacemap/SpaceMapSTDMap.h index d74b61c1ec8..ac30198222e 100644 --- a/dbms/src/Storages/Page/V3/spacemap/SpaceMapSTDMap.h +++ b/dbms/src/Storages/Page/V3/spacemap/SpaceMapSTDMap.h @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include @@ -15,23 +16,6 @@ extern const int NOT_IMPLEMENTED; namespace PS::V3 { -namespace details -{ -// Return an iterator to the last element whose key is less than or equal to `key`. -// If no such element is found, the past-the-end iterator is returned. -template -typename C::const_iterator -findLessEQ(const C & c, const typename C::key_type & key) -{ - auto iter = c.upper_bound(key); // first element > `key` - // Nothing greater than key - if (iter == c.cbegin()) - return c.cend(); - // its prev must be less than or equal to `key` - return --iter; -} - -} // namespace details class STDMapSpaceMap : public SpaceMap , public ext::SharedPtrHelper @@ -73,7 +57,7 @@ class STDMapSpaceMap bool isMarkUnused(UInt64 offset, size_t length) override { - auto it = details::findLessEQ(free_map, offset); // first free block <= `offset` + auto it = MapUtils::findLessEQ(free_map, offset); // first free block <= `offset` if (it == free_map.end()) { // No free blocks <= `offset` @@ -85,7 +69,7 @@ class STDMapSpaceMap bool markUsedImpl(UInt64 offset, size_t length) override { - auto it = details::findLessEQ(free_map, offset); // first free block <= `offset` + auto it = MapUtils::findLessEQ(free_map, offset); // first free block <= `offset` if (it == free_map.end()) { return false; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_free_map.cpp b/dbms/src/Storages/Page/V3/tests/gtest_free_map.cpp index 8123722fda4..108c152256a 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_free_map.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_free_map.cpp @@ -11,43 +11,6 @@ namespace DB::PS::V3::tests { -::testing::AssertionResult MapIterCompare( - const char * lhs_expr, - const char * rhs_expr, - const std::map::const_iterator lhs, - const std::pair rhs) -{ - if (lhs->first == rhs.first && lhs->second == rhs.second) - return ::testing::AssertionSuccess(); - return ::testing::internal::EqFailure( - lhs_expr, - rhs_expr, - fmt::format("{{{},{}}}", lhs->first, lhs->second), - fmt::format("{{{}, {}}}", rhs.first, rhs.second), - false); -} - -#define ASSERT_ITER_EQ(iter, val) ASSERT_PRED_FORMAT2(MapIterCompare, iter, val) - -TEST(STDMapUtil, FindLessEqual) -{ - std::map m0{}; - ASSERT_EQ(details::findLessEQ(m0, 1), m0.end()); - - std::map m1{{1, 1}, {2, 2}, {3, 3}, {6, 6}}; - ASSERT_EQ(details::findLessEQ(m1, 0), m1.end()); - ASSERT_ITER_EQ(details::findLessEQ(m1, 1), std::make_pair(1, 1)); - ASSERT_ITER_EQ(details::findLessEQ(m1, 2), std::make_pair(2, 2)); - ASSERT_ITER_EQ(details::findLessEQ(m1, 3), std::make_pair(3, 3)); - for (int x = 4; x < 20; ++x) - { - if (x < 6) - ASSERT_ITER_EQ(details::findLessEQ(m1, x), std::make_pair(3, 3)); - else - ASSERT_ITER_EQ(details::findLessEQ(m1, x), std::make_pair(6, 6)); - } -} - struct Range { size_t start; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_map_utils.cpp b/dbms/src/Storages/Page/V3/tests/gtest_map_utils.cpp new file mode 100644 index 00000000000..20d1d0e65af --- /dev/null +++ b/dbms/src/Storages/Page/V3/tests/gtest_map_utils.cpp @@ -0,0 +1,67 @@ +#include +#include +#include + +#include +namespace DB::PS::V3::tests +{ +struct Empty +{ +}; + +::testing::AssertionResult MapIterCompare( + const char * lhs_expr, + const char * rhs_expr, + const std::map::const_iterator lhs, + const std::pair rhs) +{ + if (lhs->first == rhs.first) + return ::testing::AssertionSuccess(); + return ::testing::internal::EqFailure( + lhs_expr, + rhs_expr, + fmt::format("{{{}, Empty}}", lhs->first), + fmt::format("{{{}, Empty}}", rhs.first), + false); +} +#define ASSERT_ITER_EQ(iter, val) ASSERT_PRED_FORMAT2(MapIterCompare, iter, val) + +TEST(STDMapUtil, FindLessEqual) +{ + std::map m0{}; + ASSERT_EQ(MapUtils::findLessEQ(m0, 1), m0.end()); + + std::map m1{{1, {}}, {2, {}}, {3, {}}, {6, {}}}; + ASSERT_EQ(MapUtils::findLessEQ(m1, 0), m1.end()); + ASSERT_ITER_EQ(MapUtils::findLessEQ(m1, 1), std::make_pair(1, Empty{})); + ASSERT_ITER_EQ(MapUtils::findLessEQ(m1, 2), std::make_pair(2, Empty{})); + for (int x = 3; x < 20; ++x) + { + if (x < 6) + ASSERT_ITER_EQ(MapUtils::findLessEQ(m1, x), std::make_pair(3, Empty{})); + else + ASSERT_ITER_EQ(MapUtils::findLessEQ(m1, x), std::make_pair(6, Empty{})); + } +} + +TEST(STDMapUtil, FindLess) +{ + std::map m0{}; + ASSERT_EQ(MapUtils::findLess(m0, 1), m0.end()); + + std::map m1{{1, {}}, {2, {}}, {3, {}}, {6, {}}}; + ASSERT_EQ(MapUtils::findLess(m1, 0), m1.end()); + ASSERT_EQ(MapUtils::findLess(m1, 1), m1.end()); + ASSERT_ITER_EQ(MapUtils::findLess(m1, 2), std::make_pair(1, Empty{})); + ASSERT_ITER_EQ(MapUtils::findLess(m1, 3), std::make_pair(2, Empty{})); + ASSERT_ITER_EQ(MapUtils::findLess(m1, 4), std::make_pair(3, Empty{})); + for (int x = 5; x < 20; ++x) + { + if (x <= 6) + ASSERT_ITER_EQ(MapUtils::findLess(m1, x), std::make_pair(3, Empty{})); + else + ASSERT_ITER_EQ(MapUtils::findLess(m1, x), std::make_pair(6, Empty{})); + } +} + +} // namespace DB::PS::V3::tests diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp new file mode 100644 index 00000000000..e9f8f24db8f --- /dev/null +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -0,0 +1,576 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ +extern const int PS_ENTRY_NOT_EXISTS; +extern const int PS_ENTRY_NO_VALID_VERSION; +} // namespace ErrorCodes +namespace PS::V3::tests +{ +String toString(const PageEntryV3 & entry) +{ + return fmt::format("PageEntry{{file: {}, offset: 0x{:X}, size: {}, checksum: 0x{:X}}}", entry.file_id, entry.offset, entry.size, entry.checksum); +} + +inline bool isSameEntry(const PageEntryV3 & lhs, const PageEntryV3 & rhs) +{ + // Maybe need more fields check later + return (lhs.file_id == rhs.file_id && lhs.offset == rhs.offset && lhs.size == rhs.size); +} + +::testing::AssertionResult getEntryCompare( + const char * expected_entry_expr, + const char * dir_expr, + const char * page_id_expr, + const char * snap_expr, + const PageEntryV3 & expected_entry, + const PageDirectory & dir, + const PageId page_id, + const PageDirectorySnapshotPtr & snap) +{ + auto check_id_entry = [&](const PageIDAndEntryV3 & expected_id_entry, const PageIDAndEntryV3 & actual_id_entry) -> ::testing::AssertionResult { + const auto & [pid, entry] = actual_id_entry; + String err_msg; + if (pid != expected_id_entry.first) + { + err_msg = fmt::format("Try to get entry [id={}] but get [id={}]", page_id_expr, pid); + return ::testing::AssertionFailure(::testing::Message(err_msg.c_str())); + } + if (isSameEntry(entry, expected_entry)) + { + return ::testing::AssertionSuccess(); + } + // else not the expected entry we want + auto actual_expr = fmt::format("Get entry [id={}] from {} with snap{}", page_id_expr, dir_expr, snap_expr); + return testing::internal::EqFailure( + expected_entry_expr, + actual_expr.c_str(), + toString(expected_entry), + toString(entry), + false); + }; + String error; + try + { + auto id_entry = dir.get(page_id, snap); + return check_id_entry({page_id, expected_entry}, id_entry); + } + catch (DB::Exception & ex) + { + if (ex.code() == ErrorCodes::PS_ENTRY_NOT_EXISTS) + error = fmt::format("Try to get entry [id={}] but not exists. Err message: {}", page_id_expr, ex.message()); + else if (ex.code() == ErrorCodes::PS_ENTRY_NO_VALID_VERSION) + error = fmt::format("Try to get entry [id={}] with version {} from {} but failed. Err message: {}", page_id_expr, snap->sequence, snap_expr, ex.message()); + else + error = ex.displayText(); + return ::testing::AssertionFailure(::testing::Message(error.c_str())); + } + catch (...) + { + error = getCurrentExceptionMessage(true); + } + return ::testing::AssertionFailure(::testing::Message(error.c_str())); +} +#define ASSERT_ENTRY_EQ(expected_entry, dir, pid, snap) \ + ASSERT_PRED_FORMAT4(getEntryCompare, expected_entry, dir, pid, snap) +#define EXPECT_ENTRY_EQ(expected_entry, dir, pid, snap) \ + EXPECT_PRED_FORMAT4(getEntryCompare, expected_entry, dir, pid, snap) + +String toString(const PageIDAndEntriesV3 & entries) +{ + FmtBuffer buf; + buf.append("["); + buf.joinStr( + entries.begin(), + entries.end(), + [](const PageIDAndEntryV3 & id_entry, FmtBuffer & buf) { + buf.fmtAppend("<{},{}>", id_entry.first, toString(id_entry.second)); + }, + ", "); + buf.append("]"); + return buf.toString(); +} +::testing::AssertionResult getEntriesCompare( + const char * expected_entries_expr, + const char * dir_expr, + const char * page_ids_expr, + const char * snap_expr, + const PageIDAndEntriesV3 & expected_entries, + const PageDirectory & dir, + const PageIds page_ids, + const PageDirectorySnapshotPtr & snap) +{ + auto check_id_entries = [&](const PageIDAndEntriesV3 & expected_id_entries, const PageIDAndEntriesV3 & actual_id_entries) -> ::testing::AssertionResult { + if (expected_id_entries.size() == actual_id_entries.size()) + { + for (size_t idx = 0; idx == expected_id_entries.size(); ++idx) + { + const auto & expected_id_entry = expected_id_entries[idx]; + const auto & actual_id_entry = expected_id_entries[idx]; + if (actual_id_entry.first != expected_id_entry.first) + { + auto err_msg = fmt::format("Try to get entry [id={}] but get [id={}] at [index={}]", expected_id_entry.first, actual_id_entry.first, idx); + return ::testing::AssertionFailure(::testing::Message(err_msg.c_str())); + } + if (!isSameEntry(expected_id_entry.second, actual_id_entry.second)) + { + // not the expected entry we want + String err_msg; + auto expect_expr = fmt::format("Entry at {} [index={}]", idx); + auto actual_expr = fmt::format("Get entries {} from {} with snap {} [index={}", page_ids_expr, dir_expr, snap_expr, idx); + return testing::internal::EqFailure( + expect_expr.c_str(), + actual_expr.c_str(), + toString(expected_id_entry.second), + toString(actual_id_entry.second), + false); + } + } + return ::testing::AssertionSuccess(); + } + + // else not the expected entry we want + auto expected_expr = fmt::format("Entries from {} [size={}]", expected_entries_expr, expected_entries.size()); + auto actual_expr = fmt::format("Get entries {} from {} with snap {}, [size={}]", page_ids_expr, dir_expr, snap_expr, actual_id_entries.size()); + return testing::internal::EqFailure( + expected_expr.c_str(), + actual_expr.c_str(), + toString(expected_entries), + toString(actual_id_entries), + false); + }; + String error; + try + { + auto id_entries = dir.get(page_ids, snap); + return check_id_entries(expected_entries, id_entries); + } + catch (DB::Exception & ex) + { + if (ex.code() == ErrorCodes::PS_ENTRY_NOT_EXISTS) + error = fmt::format("Try to get entries with [ids={}] but not exists. Err message: {}", page_ids_expr, ex.message()); + else if (ex.code() == ErrorCodes::PS_ENTRY_NO_VALID_VERSION) + error = fmt::format("Try to get entries with [ids={}] with version {} from {} but failed. Err message: {}", page_ids_expr, snap->sequence, snap_expr, ex.message()); + else + error = ex.displayText(); + return ::testing::AssertionFailure(::testing::Message(error.c_str())); + } + catch (...) + { + error = getCurrentExceptionMessage(true); + } + return ::testing::AssertionFailure(::testing::Message(error.c_str())); +} +#define ASSERT_ENTRIES_EQ(expected_entries, dir, pid, snap) \ + ASSERT_PRED_FORMAT4(getEntriesCompare, expected_entries, dir, pid, snap) +#define EXPECT_ENTRIES_EQ(expected_entries, dir, pid, snap) \ + EXPECT_PRED_FORMAT4(getEntriesCompare, expected_entries, dir, pid, snap) + +::testing::AssertionResult getEntryNotExist( + const char * dir_expr, + const char * page_id_expr, + const char * snap_expr, + const PageDirectory & dir, + const PageId page_id, + const PageDirectorySnapshotPtr & snap) +{ + String error; + try + { + auto id_entry = dir.get(page_id, snap); + error = fmt::format( + "Expect entry [id={}] from {} with snap{} not exist, but got <{}, {}>", + page_id_expr, + dir_expr, + snap_expr, + id_entry.first, + toString(id_entry.second)); + } + catch (DB::Exception & ex) + { + if (ex.code() == ErrorCodes::PS_ENTRY_NOT_EXISTS || ex.code() == ErrorCodes::PS_ENTRY_NO_VALID_VERSION) + return ::testing::AssertionSuccess(); + else + error = ex.displayText(); + return ::testing::AssertionFailure(::testing::Message(error.c_str())); + } + catch (...) + { + error = getCurrentExceptionMessage(true); + } + return ::testing::AssertionFailure(::testing::Message(error.c_str())); +} +#define EXPECT_ENTRY_NOT_EXIST(dir, pid, snap) \ + EXPECT_PRED_FORMAT3(getEntryNotExist, dir, pid, snap) +::testing::AssertionResult getEntriesNotExist( + const char * dir_expr, + const char * page_ids_expr, + const char * snap_expr, + const PageDirectory & dir, + const PageIds page_ids, + const PageDirectorySnapshotPtr & snap) +{ + String error; + try + { + auto id_entry = dir.get(page_ids, snap); + error = fmt::format( + "Expect entry [id={}] from {} with snap{} not exist, but got {}", + page_ids_expr, + dir_expr, + snap_expr, + toString(id_entry)); + } + catch (DB::Exception & ex) + { + if (ex.code() == ErrorCodes::PS_ENTRY_NOT_EXISTS || ex.code() == ErrorCodes::PS_ENTRY_NO_VALID_VERSION) + return ::testing::AssertionSuccess(); + else + error = ex.displayText(); + return ::testing::AssertionFailure(::testing::Message(error.c_str())); + } + catch (...) + { + error = getCurrentExceptionMessage(true); + } + return ::testing::AssertionFailure(::testing::Message(error.c_str())); +} +#define EXPECT_ENTRIES_NOT_EXIST(dir, pids, snap) \ + EXPECT_PRED_FORMAT3(getEntriesNotExist, dir, pids, snap) + +class PageDirectoryTest : public ::testing::Test +{ +protected: + PageDirectory dir; +}; + +TEST_F(PageDirectoryTest, ApplyPutRead) +try +{ + auto snap0 = dir.createSnapshot(); + EXPECT_ENTRY_NOT_EXIST(dir, 1, snap0); + + PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(1, entry1); + dir.apply(std::move(edit)); + } + + auto snap1 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry1, dir, 1, snap1); + + PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(2, entry2); + dir.apply(std::move(edit)); + } + + auto snap2 = dir.createSnapshot(); + EXPECT_ENTRY_NOT_EXIST(dir, 2, snap1); // creating snap2 won't affect the result of snap1 + EXPECT_ENTRY_EQ(entry2, dir, 2, snap2); + EXPECT_ENTRY_EQ(entry1, dir, 1, snap2); + { + PageIds ids{1, 2}; + PageIDAndEntriesV3 expected_entries{{1, entry1}, {2, entry2}}; + EXPECT_ENTRIES_EQ(expected_entries, dir, ids, snap2); + } +} +CATCH + +TEST_F(PageDirectoryTest, ApplyPutDelRead) +try +{ + PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(1, entry1); + edit.put(2, entry2); + dir.apply(std::move(edit)); + } + + auto snap1 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry1, dir, 1, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + + PageEntryV3 entry3{.file_id = 3, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry4{.file_id = 4, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.del(2); + edit.put(3, entry3); + edit.put(4, entry4); + dir.apply(std::move(edit)); + } + + auto snap2 = dir.createSnapshot(); + // sanity check for snap1 + EXPECT_ENTRY_EQ(entry1, dir, 1, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_NOT_EXIST(dir, 3, snap1); + EXPECT_ENTRY_NOT_EXIST(dir, 4, snap1); + // check for snap2 + EXPECT_ENTRY_NOT_EXIST(dir, 2, snap2); // deleted + EXPECT_ENTRY_EQ(entry1, dir, 1, snap2); + EXPECT_ENTRY_EQ(entry3, dir, 3, snap2); + EXPECT_ENTRY_EQ(entry4, dir, 4, snap2); + { + PageIds ids{1, 3, 4}; + PageIDAndEntriesV3 expected_entries{{1, entry1}, {3, entry3}, {4, entry4}}; + EXPECT_ENTRIES_EQ(expected_entries, dir, ids, snap2); + } +} +CATCH + +TEST_F(PageDirectoryTest, ApplyUpdateOnRefEntries) +try +{ + PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(1, entry1); + edit.put(2, entry2); + dir.apply(std::move(edit)); + } + + { // Ref 3->2 + PageEntriesEdit edit; + edit.ref(3, 2); + dir.apply(std::move(edit)); + } + auto snap1 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); + + // Update 3, 2 won't get updated. Update 2, 3 won't get updated. + // Note that users should not rely on this behavior + PageEntryV3 entry_updated{.file_id = 999, .size = 16, .offset = 0x123, .checksum = 0x123}; + { + PageEntriesEdit edit; + edit.put(3, entry_updated); + dir.apply(std::move(edit)); + } + auto snap2 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap2); + EXPECT_ENTRY_EQ(entry_updated, dir, 3, snap2); + + PageEntryV3 entry_updated2{.file_id = 777, .size = 16, .offset = 0x123, .checksum = 0x123}; + { + PageEntriesEdit edit; + edit.put(2, entry_updated2); + dir.apply(std::move(edit)); + } + auto snap3 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap2); + EXPECT_ENTRY_EQ(entry_updated, dir, 3, snap2); + EXPECT_ENTRY_EQ(entry_updated2, dir, 2, snap3); + EXPECT_ENTRY_EQ(entry_updated, dir, 3, snap3); +} +CATCH + +TEST_F(PageDirectoryTest, ApplyDeleteOnRefEntries) +try +{ + PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(1, entry1); + edit.put(2, entry2); + dir.apply(std::move(edit)); + } + + { // Ref 3->2 + PageEntriesEdit edit; + edit.ref(3, 2); + dir.apply(std::move(edit)); + } + auto snap1 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); + + // Delete 3, 2 won't get deleted. + { + PageEntriesEdit edit; + edit.del(3); + dir.apply(std::move(edit)); + } + auto snap2 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap2); + EXPECT_ENTRY_NOT_EXIST(dir, 3, snap2); + + // Delete 2, 3 won't get deleted. + { + PageEntriesEdit edit; + edit.del(2); + dir.apply(std::move(edit)); + } + auto snap3 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap2); + EXPECT_ENTRY_NOT_EXIST(dir, 3, snap2); + EXPECT_ENTRY_NOT_EXIST(dir, 2, snap3); + EXPECT_ENTRY_NOT_EXIST(dir, 3, snap3); +} +CATCH + +/// Put ref page to ref page, ref path collapse to normal page +TEST_F(PageDirectoryTest, ApplyRefOnRefEntries) +try +{ + PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(1, entry1); + edit.put(2, entry2); + dir.apply(std::move(edit)); + } + + { // Ref 3->2 + PageEntriesEdit edit; + edit.ref(3, 2); + dir.apply(std::move(edit)); + } + auto snap1 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); + + // Ref 4 -> 3 + { + PageEntriesEdit edit; + edit.ref(4, 3); + dir.apply(std::move(edit)); + } + auto snap2 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); + EXPECT_ENTRY_NOT_EXIST(dir, 4, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap2); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap2); + EXPECT_ENTRY_EQ(entry2, dir, 4, snap2); +} +CATCH + +/// Put duplicated RefPages in different WriteBatch +TEST_F(PageDirectoryTest, ApplyDuplicatedRefEntries) +try +{ + PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(1, entry1); + edit.put(2, entry2); + dir.apply(std::move(edit)); + } + + { // Ref 3->2 + PageEntriesEdit edit; + edit.ref(3, 2); + dir.apply(std::move(edit)); + } + auto snap1 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); + + // Ref 3 -> 2 + { + PageEntriesEdit edit; + edit.ref(3, 2); + dir.apply(std::move(edit)); + } + auto snap2 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap2); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap2); +} +CATCH + +/// Put duplicated RefPages due to ref-path-collapse +TEST_F(PageDirectoryTest, ApplyCollapseDuplicatedRefEntries) +try +{ + PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(1, entry1); + edit.put(2, entry2); + dir.apply(std::move(edit)); + } + + { // Ref 3->2 + PageEntriesEdit edit; + edit.ref(3, 2); + dir.apply(std::move(edit)); + } + auto snap1 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); + + + { // Ref 4 -> 3, collapse to 4 -> 2 + PageEntriesEdit edit; + edit.ref(4, 3); + dir.apply(std::move(edit)); + } + auto snap2 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap1); + EXPECT_ENTRY_NOT_EXIST(dir, 4, snap1); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap2); + EXPECT_ENTRY_EQ(entry2, dir, 3, snap2); + EXPECT_ENTRY_EQ(entry2, dir, 4, snap2); +} +CATCH + +TEST_F(PageDirectoryTest, ApplyRefToNotExistEntry) +try +{ + PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + { + PageEntriesEdit edit; + edit.put(1, entry1); + edit.put(2, entry2); + dir.apply(std::move(edit)); + } + + { // Ref 3-> 999 + PageEntriesEdit edit; + edit.ref(3, 999); + ASSERT_THROW({ dir.apply(std::move(edit)); }, DB::Exception); + } + auto snap1 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); + EXPECT_ENTRY_NOT_EXIST(dir, 3, snap1); + + // TODO: restore, invalid ref page is filtered +} +CATCH +} // namespace PS::V3::tests +} // namespace DB diff --git a/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp b/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp index b561d630c46..4c34d8ac419 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp @@ -236,8 +236,7 @@ class LogFileRWTest : public ::testing::TestWithParam> std::unique_ptr getNewReader(const WALRecoveryMode wal_recovery_mode = WALRecoveryMode::TolerateCorruptedTailRecords, size_t log_num = 0) { std::unique_ptr file_reader = std::make_unique(reader_contents, /*fail_after_read_partial_*/ !allow_retry_read); - auto log_reader = std::make_unique(std::move(file_reader), &report, /* verify_checksum */ true, /* log_number */ log_num, wal_recovery_mode, log); - return log_reader; + return std::make_unique(std::move(file_reader), &report, /* verify_checksum */ true, /* log_number */ log_num, wal_recovery_mode, log); } void resetReader(const WALRecoveryMode wal_recovery_mode = WALRecoveryMode::TolerateCorruptedTailRecords)