diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index afa8fcf1a6d..8f2b867c929 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -108,37 +108,12 @@ void PageDirectory::apply(PageEntriesEdit && edit) std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline UInt64 last_sequence = sequence.load(); - // stage 1, get the entry to be ref auto snap = createSnapshot(); - for (auto & r : edit.getRecords()) - { - // Set the version of inserted entries - r.version = PageVersionType(last_sequence + 1); - - if (r.type != WriteBatch::WriteType::REF) - { - continue; - } - auto iter = mvcc_table_directory.find(r.ori_page_id); - if (iter == mvcc_table_directory.end()) - { - throw Exception(fmt::format("Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1), ErrorCodes::LOGICAL_ERROR); - } - if (auto entry = iter->second->getEntry(last_sequence); entry) - { - // copy the entry to be ref - r.entry = *entry; - } - else - { - throw Exception(fmt::format("Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1), ErrorCodes::LOGICAL_ERROR); - } - } - // stage 2, persisted the changes to WAL + // stage 1, persisted the changes to WAL // wal.apply(edit); - // stage 3, create entry version list for pageId. nothing need to be rollback + // stage 2, create entry version list for pageId. 
nothing needs to be rolled back std::unordered_map> updating_locks; std::vector updating_pages; updating_pages.reserve(edit.size()); @@ -163,7 +138,7 @@ void PageDirectory::apply(PageEntriesEdit && edit) updating_pages.emplace_back(iter->second); } - // stage 4, there are no rollback since we already persist `edit` to WAL, just ignore error if any + // stage 3, there is no rollback since we have already persisted `edit` to WAL, just ignore errors if any const auto & records = edit.getRecords(); for (size_t idx = 0; idx < records.size(); ++idx) { @@ -173,11 +148,31 @@ void PageDirectory::apply(PageEntriesEdit && edit) case WriteBatch::WriteType::PUT: [[fallthrough]]; case WriteBatch::WriteType::UPSERT: - [[fallthrough]]; + updating_pages[idx]->createNewVersion(last_sequence + 1, r.entry); + break; case WriteBatch::WriteType::REF: { - // Put/upsert/ref all should append a new version for this page - updating_pages[idx]->createNewVersion(last_sequence + 1, r.entry); + // We can't handle `REF` before other writes, because `PUT` and `REF` + // may be in the same WriteBatch. + // Also we can't throw an exception if we can't find the origin page_id, + // because the WAL has already applied the change and there is no + // mechanism to roll back changes in the WAL.
+ auto iter = mvcc_table_directory.find(r.ori_page_id); + if (iter == mvcc_table_directory.end()) + { + LOG_FMT_WARNING(log, "Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1); + break; + } + + if (auto entry = iter->second->getEntry(last_sequence); entry) + { + // copy the entry to be ref + updating_pages[idx]->createNewVersion(last_sequence + 1, *entry); + } + else + { + LOG_FMT_WARNING(log, "Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1); + } break; } case WriteBatch::WriteType::DEL: diff --git a/dbms/src/Storages/Page/V3/PageEntriesEdit.h b/dbms/src/Storages/Page/V3/PageEntriesEdit.h index 0ef63a29e0b..e1d601a634d 100644 --- a/dbms/src/Storages/Page/V3/PageEntriesEdit.h +++ b/dbms/src/Storages/Page/V3/PageEntriesEdit.h @@ -96,7 +96,6 @@ class PageEntriesEdit PageId page_id; PageId ori_page_id; PageEntryV3 entry; - PageVersionType version; }; using EditRecords = std::vector; diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index d039adcdf23..94a8099bcdc 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -608,6 +608,7 @@ try { PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 3, .size = 1024, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -615,14 +616,17 @@ try dir.apply(std::move(edit)); } - { // Ref 3-> 999 + { // Ref 4-> 999 PageEntriesEdit edit; - edit.ref(3, 999); - ASSERT_THROW({ dir.apply(std::move(edit)); }, DB::Exception); + edit.put(3, entry3); + edit.ref(4, 999); + dir.apply(std::move(edit)); } auto snap1 = dir.createSnapshot(); + EXPECT_ENTRY_EQ(entry1, dir, 1, snap1); 
EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); - EXPECT_ENTRY_NOT_EXIST(dir, 3, snap1); + EXPECT_ENTRY_EQ(entry3, dir, 3, snap1); + EXPECT_ENTRY_NOT_EXIST(dir, 4, snap1); // TODO: restore, invalid ref page is filtered }