Skip to content

Commit

Permalink
Implement delete
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <jayson.hjs@gmail.com>
  • Loading branch information
JaySon-Huang committed Dec 13, 2021
1 parent ccb060a commit e6bcc78
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 15 deletions.
54 changes: 44 additions & 10 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Common/Exception.h>
#include <Storages/Page/V3/PageDirectory.h>
#include <Storages/Page/V3/PageEntry.h>
#include <Storages/Page/WriteBatch.h>

#include <mutex>

Expand All @@ -21,7 +22,8 @@ std::optional<PageEntryV3> PageDirectory::VersionedPageEntries::getEntry(UInt64
if (auto iter = MapUtils::findLess(entries, VersionType(seq + 1));
iter != entries.end())
{
return iter->second;
if (!iter->second.is_delete)
return iter->second.entry;
}
return std::nullopt;
}
Expand Down Expand Up @@ -96,23 +98,55 @@ void PageDirectory::apply(PageEntriesEdit && edit)
const auto & records = edit.getRecords();
for (const auto & r : records)
{
auto [iter, created] = mvcc_table_directory.insert(std::make_pair(r.page_id, nullptr));
if (created)
switch (r.type)
{
iter->second = std::make_shared<VersionedPageEntries>();
case WriteBatch::WriteType::PUT:
case WriteBatch::WriteType::UPSERT:
case WriteBatch::WriteType::DEL:
{
auto [iter, created] = mvcc_table_directory.insert(std::make_pair(r.page_id, nullptr));
if (created)
{
iter->second = std::make_shared<VersionedPageEntries>();
}
updating_locks.emplace(r.page_id, iter->second->acquireLock());
updating_pages.emplace_back(iter->second);
break;
}
case WriteBatch::WriteType::REF:
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
break;
}
updating_locks.emplace(r.page_id, iter->second->acquireLock());
updating_pages.emplace_back(iter->second);
}

for (size_t idx = 0; idx < records.size(); ++idx)
{
// Append a new version for this page
updating_pages[idx]->createNewVersion(last_sequence + 1, records[idx].entry);
updating_locks.erase(records[idx].page_id);
const auto & r = records[idx];
switch (r.type)
{
case WriteBatch::WriteType::PUT:
case WriteBatch::WriteType::UPSERT:
{
// Append a new version for this page
updating_pages[idx]->createNewVersion(last_sequence + 1, records[idx].entry);
updating_locks.erase(records[idx].page_id);
break;
}
case WriteBatch::WriteType::DEL:
{
updating_pages[idx]->createDelete(last_sequence + 1);
updating_locks.erase(records[idx].page_id);
break;
}
case WriteBatch::WriteType::REF:
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
break;
}
}
}

// The edit committed, incr the sequence number
// The edit committed, incr the sequence number to publish changes for `createSnapshot`
sequence.fetch_add(1);
}

Expand Down
26 changes: 23 additions & 3 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,27 @@ class PageDirectory
}
};

struct EntryOrDelete
{
bool is_delete;
PageEntryV3 entry;

explicit EntryOrDelete(bool del)
: is_delete(del)
{
assert(del == true);
}
explicit EntryOrDelete(const PageEntryV3 & entry_)
: is_delete(false)
, entry(entry_)
{}
};

using PageLock = std::unique_ptr<std::lock_guard<std::mutex>>;
class VersionedPageEntries
{
public:
PageLock
acquireLock() const
PageLock acquireLock() const
{
return std::make_unique<std::lock_guard<std::mutex>>(m);
}
Expand All @@ -74,12 +89,17 @@ class PageDirectory
entries.emplace(VersionType(seq), entry);
}

void createDelete(UInt64 seq)
{
entries.emplace(VersionType(seq), EntryOrDelete(/*del*/ true));
}

std::optional<PageEntryV3> getEntry(UInt64 seq);

private:
mutable std::mutex m;
// Entries sorted by version
std::map<VersionType, PageEntryV3> entries;
std::map<VersionType, EntryOrDelete> entries;
};
using VersionedPageEntriesPtr = std::shared_ptr<VersionedPageEntries>;

Expand Down
50 changes: 48 additions & 2 deletions dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class PageDirectoryTest : public ::testing::Test
PageDirectory dir;
};

TEST_F(PageDirectoryTest, ApplyRead)
TEST_F(PageDirectoryTest, ApplyPutRead)
try
{
auto snap0 = dir.createSnapshot();
Expand All @@ -245,7 +245,7 @@ try
}

auto snap2 = dir.createSnapshot();
EXPECT_ENTRY_NOT_EXIST(dir, 2, snap1);
EXPECT_ENTRY_NOT_EXIST(dir, 2, snap1); // creating snap2 won't affect the result of snap1
EXPECT_ENTRY_EQ(entry2, dir, 2, snap2);
EXPECT_ENTRY_EQ(entry1, dir, 1, snap2);
{
Expand All @@ -256,5 +256,51 @@ try
}
CATCH

TEST_F(PageDirectoryTest, ApplyPutDelRead)
try
{
PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567};
PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567};
{
PageEntriesEdit edit;
edit.put(1, entry1);
edit.put(2, entry2);
dir.apply(std::move(edit));
}

auto snap1 = dir.createSnapshot();
EXPECT_ENTRY_EQ(entry1, dir, 1, snap1);
EXPECT_ENTRY_EQ(entry2, dir, 2, snap1);

PageEntryV3 entry3{.file_id = 3, .size = 1024, .offset = 0x123, .checksum = 0x4567};
PageEntryV3 entry4{.file_id = 4, .size = 1024, .offset = 0x123, .checksum = 0x4567};
{
PageEntriesEdit edit;
edit.del(2);
edit.put(3, entry3);
edit.put(4, entry4);
dir.apply(std::move(edit));
}

auto snap2 = dir.createSnapshot();
// sanity check for snap1
EXPECT_ENTRY_EQ(entry1, dir, 1, snap1);
EXPECT_ENTRY_EQ(entry2, dir, 2, snap1);
EXPECT_ENTRY_NOT_EXIST(dir, 3, snap1);
EXPECT_ENTRY_NOT_EXIST(dir, 4, snap1);
// check for snap2
EXPECT_ENTRY_NOT_EXIST(dir, 2, snap2); // deleted
EXPECT_ENTRY_EQ(entry1, dir, 1, snap2);
EXPECT_ENTRY_EQ(entry3, dir, 3, snap2);
EXPECT_ENTRY_EQ(entry4, dir, 4, snap2);
{
PageIds ids{1, 3, 4};
PageIDAndEntriesV3 expected_entries{{1, entry1}, {3, entry3}, {4, entry4}};
EXPECT_ENTRIES_EQ(expected_entries, dir, ids, snap2);
}
}
CATCH


} // namespace PS::V3::tests
} // namespace DB

0 comments on commit e6bcc78

Please sign in to comment.