Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the exception caused when put and ref are in the same WriteBatch #3829

Merged
merged 4 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 26 additions & 31 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,37 +108,12 @@ void PageDirectory::apply(PageEntriesEdit && edit)
std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline
UInt64 last_sequence = sequence.load();

// stage 1, get the entry to be ref
auto snap = createSnapshot();
for (auto & r : edit.getRecords())
{
// Set the version of inserted entries
r.version = PageVersionType(last_sequence + 1);

if (r.type != WriteBatch::WriteType::REF)
{
continue;
}
auto iter = mvcc_table_directory.find(r.ori_page_id);
if (iter == mvcc_table_directory.end())
{
throw Exception(fmt::format("Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1), ErrorCodes::LOGICAL_ERROR);
}
if (auto entry = iter->second->getEntry(last_sequence); entry)
{
// copy the entry to be ref
r.entry = *entry;
}
else
{
throw Exception(fmt::format("Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1), ErrorCodes::LOGICAL_ERROR);
}
}

// stage 2, persisted the changes to WAL
// stage 1, persisted the changes to WAL
// wal.apply(edit);

// stage 3, create entry version list for pageId. nothing need to be rollback
// stage 2, create entry version list for pageId. nothing need to be rollback
std::unordered_map<PageId, std::pair<PageLock, int>> updating_locks;
std::vector<VersionedPageEntriesPtr> updating_pages;
updating_pages.reserve(edit.size());
Expand All @@ -163,7 +138,7 @@ void PageDirectory::apply(PageEntriesEdit && edit)
updating_pages.emplace_back(iter->second);
}

// stage 4, there are no rollback since we already persist `edit` to WAL, just ignore error if any
// stage 3, there are no rollback since we already persist `edit` to WAL, just ignore error if any
const auto & records = edit.getRecords();
for (size_t idx = 0; idx < records.size(); ++idx)
{
Expand All @@ -173,11 +148,31 @@ void PageDirectory::apply(PageEntriesEdit && edit)
case WriteBatch::WriteType::PUT:
[[fallthrough]];
case WriteBatch::WriteType::UPSERT:
[[fallthrough]];
updating_pages[idx]->createNewVersion(last_sequence + 1, r.entry);
break;
case WriteBatch::WriteType::REF:
{
// Put/upsert/ref all should append a new version for this page
updating_pages[idx]->createNewVersion(last_sequence + 1, r.entry);
// We can't handle `REF` before other writes, because `PUT` and `REF`
// maybe in the same WriteBatch.
// Also we can't throw an exception if we can't find the origin page_id,
// because WAL have already applied the change and there is no
// mechanism to roll back changes in the WAL.
auto iter = mvcc_table_directory.find(r.ori_page_id);
if (iter == mvcc_table_directory.end())
{
LOG_FMT_WARNING(log, "Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1);
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
break;
}

if (auto entry = iter->second->getEntry(last_sequence); entry)
{
// copy the entry to be ref
updating_pages[idx]->createNewVersion(last_sequence + 1, *entry);
}
else
{
LOG_FMT_WARNING(log, "Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1);
}
break;
}
case WriteBatch::WriteType::DEL:
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/Page/V3/PageEntriesEdit.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ class PageEntriesEdit
PageId page_id;
PageId ori_page_id;
PageEntryV3 entry;
PageVersionType version;
};
using EditRecords = std::vector<EditRecord>;

Expand Down
12 changes: 8 additions & 4 deletions dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,21 +608,25 @@ try
{
PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567};
PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567};
PageEntryV3 entry3{.file_id = 3, .size = 1024, .offset = 0x123, .checksum = 0x4567};
{
PageEntriesEdit edit;
edit.put(1, entry1);
edit.put(2, entry2);
dir.apply(std::move(edit));
}

{ // Ref 3-> 999
{ // Ref 4-> 999
PageEntriesEdit edit;
edit.ref(3, 999);
ASSERT_THROW({ dir.apply(std::move(edit)); }, DB::Exception);
edit.put(3, entry3);
edit.ref(4, 999);
dir.apply(std::move(edit));
}
auto snap1 = dir.createSnapshot();
EXPECT_ENTRY_EQ(entry1, dir, 1, snap1);
EXPECT_ENTRY_EQ(entry2, dir, 2, snap1);
EXPECT_ENTRY_NOT_EXIST(dir, 3, snap1);
EXPECT_ENTRY_EQ(entry3, dir, 3, snap1);
EXPECT_ENTRY_NOT_EXIST(dir, 4, snap1);

// TODO: restore, invalid ref page is filtered
}
Expand Down