Skip to content

Commit

Permalink
PageStorage: Mvcc directory (without GC/restore) (#3637)
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang authored Dec 23, 2021
1 parent 8f53c68 commit 1e0a37a
Show file tree
Hide file tree
Showing 12 changed files with 973 additions and 72 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ extern const int DIVIDED_BY_ZERO = 10011;
extern const int INVALID_TIME = 10012;
extern const int DEADLOCK_AVOIDED = 10013;
extern const int PTHREAD_ERROR = 10014;
extern const int PS_ENTRY_NOT_EXISTS = 10015;
extern const int PS_ENTRY_NO_VALID_VERSION = 10016;
} // namespace ErrorCodes

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/IO/WriteBufferFromString.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class StringHolder
} // namespace detail

/// Creates the string by itself and allows to get it.
class WriteBufferFromOwnString : public detail::StringHolder
class WriteBufferFromOwnString
: public detail::StringHolder
, public WriteBufferFromString
{
public:
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include <Storages/Page/V3/LogFile/LogFormat.h>
#include <common/types.h>

Expand Down
30 changes: 30 additions & 0 deletions dbms/src/Storages/Page/V3/MapUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

namespace DB::MapUtils
{
// Find the greatest key that does not exceed `key` in the ordered container `c`.
// Returns an iterator to that element, or `c.cend()` when every element of `c`
// is greater than `key` (which includes the case of an empty container).
template <typename C>
typename C::const_iterator
findLessEQ(const C & c, const typename C::key_type & key)
{
    // `upper_bound` yields the first element strictly greater than `key`.
    auto pos = c.upper_bound(key);
    if (pos != c.cbegin())
    {
        // There is at least one element in front of `pos`; it is <= `key`.
        --pos;
        return pos;
    }
    // Even the first element is greater than `key`: nothing is <= `key`.
    return c.cend();
}

// Find the greatest key that is strictly smaller than `key` in the ordered container `c`.
// Returns an iterator to that element, or `c.cend()` when no element of `c`
// is smaller than `key` (which includes the case of an empty container).
template <typename C>
typename C::const_iterator
findLess(const C & c, const typename C::key_type & key)
{
    // `lower_bound` yields the first element that is >= `key`.
    auto pos = c.lower_bound(key);
    if (pos != c.cbegin())
    {
        // The element right before `pos` is the last one that is < `key`.
        --pos;
        return pos;
    }
    // The first element is already >= `key`: nothing is < `key`.
    return c.cend();
}

} // namespace DB::MapUtils
201 changes: 201 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
#include <Common/Exception.h>
#include <Common/LogWithPrefix.h>
#include <Storages/Page/V3/PageDirectory.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/PageEntry.h>
#include <Storages/Page/WriteBatch.h>
#include <common/logger_useful.h>

#include <memory>
#include <mutex>

namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int PS_ENTRY_NOT_EXISTS;
extern const int PS_ENTRY_NO_VALID_VERSION;
} // namespace ErrorCodes
namespace PS::V3
{
// Return the entry of this page as seen at sequence `seq`, or `std::nullopt` when there is
// no version at or before `seq`, or when the newest such version is a delete marker.
// The per-page mutex is held for the duration of the lookup.
std::optional<PageEntryV3> PageDirectory::VersionedPageEntries::getEntry(UInt64 seq) const
{
    auto page_lock = acquireLock();
    // entries are sorted by <ver, epoch>; the newest version visible at `seq` is the
    // last one that sorts before <seq+1, 0>.
    const auto visible = MapUtils::findLess(entries, PageVersionType(seq + 1));
    if (visible == entries.end())
        return std::nullopt;
    if (visible->second.is_delete)
        return std::nullopt;
    return visible->second.entry;
}

// Construct an empty directory: the sequence counter starts at 0 and the logger is
// obtained from getLogWithPrefix(nullptr, "PageDirectory").
PageDirectory::PageDirectory()
: sequence(0)
, log(getLogWithPrefix(nullptr, "PageDirectory"))
{
}

// Placeholder: restoring the directory is not implemented yet.
// Always throws Exception with ErrorCodes::NOT_IMPLEMENTED.
void PageDirectory::restore()
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

// Create a snapshot pinned at the current value of `sequence` and remember a weak
// reference to it in `snapshots`.
// NOTE(review): `snapshots` is appended to without taking any lock in this function;
// confirm that callers serialize snapshot creation (and `gc()`), or guard the list.
PageDirectorySnapshotPtr PageDirectory::createSnapshot() const
{
    const UInt64 current_sequence = sequence.load();
    auto new_snapshot = std::make_shared<PageDirectorySnapshot>(current_sequence);
    // Only a weak reference is kept, so the directory does not extend the snapshot's lifetime.
    std::weak_ptr<PageDirectorySnapshot> weak_ref(new_snapshot);
    snapshots.emplace_back(std::move(weak_ref));
    return new_snapshot;
}

// Look up the entry of `page_id` that is visible to the snapshot `snap`.
//
// Throws Exception with
//  - ErrorCodes::PS_ENTRY_NOT_EXISTS when `page_id` is not present in the directory;
//  - ErrorCodes::PS_ENTRY_NO_VALID_VERSION when the page is present but has no live
//    version at `snap->sequence` (no version yet, or the visible one is a delete).
PageIDAndEntryV3 PageDirectory::get(PageId page_id, const PageDirectorySnapshotPtr & snap) const
{
    // Copy the shared_ptr while the table lock is held instead of keeping the iterator:
    // a concurrent `apply()` may insert into `mvcc_table_directory` and rehash it, which
    // invalidates iterators of an unordered_map once the read lock has been released.
    MVCCMapType::mapped_type versioned_entries;
    {
        std::shared_lock read_lock(table_rw_mutex);
        const auto iter = mvcc_table_directory.find(page_id);
        if (iter == mvcc_table_directory.end())
            throw Exception(fmt::format("Entry [id={}] not exist!", page_id), ErrorCodes::PS_ENTRY_NOT_EXISTS);
        versioned_entries = iter->second;
    }

    // The per-page lookup takes the page's own lock, so the table lock is not needed here.
    if (auto entry = versioned_entries->getEntry(snap->sequence); entry)
    {
        return PageIDAndEntryV3(page_id, *entry);
    }

    throw Exception(fmt::format("Entry [id={}] [seq={}] not exist!", page_id, snap->sequence), ErrorCodes::PS_ENTRY_NO_VALID_VERSION);
}

// Look up the entries of all `page_ids` that are visible to the snapshot `snap`.
// The result keeps the order of `page_ids`.
//
// Throws Exception with
//  - ErrorCodes::PS_ENTRY_NOT_EXISTS when any id is not present in the directory;
//  - ErrorCodes::PS_ENTRY_NO_VALID_VERSION when any page has no live version at
//    `snap->sequence`.
PageIDAndEntriesV3 PageDirectory::get(const PageIds & page_ids, const PageDirectorySnapshotPtr & snap) const
{
    // Collect shared_ptr copies (not iterators) under the table lock: a concurrent
    // `apply()` may insert into `mvcc_table_directory` and rehash it, which invalidates
    // iterators of an unordered_map once the read lock has been released.
    std::vector<MVCCMapType::mapped_type> versioned_pages;
    versioned_pages.reserve(page_ids.size());
    {
        std::shared_lock read_lock(table_rw_mutex);
        for (size_t idx = 0; idx < page_ids.size(); ++idx)
        {
            const auto iter = mvcc_table_directory.find(page_ids[idx]);
            if (iter == mvcc_table_directory.end())
                throw Exception(fmt::format("Entry [id={}] at [idx={}] not exist!", page_ids[idx], idx), ErrorCodes::PS_ENTRY_NOT_EXISTS);
            versioned_pages.emplace_back(iter->second);
        }
    }

    // Resolve the visible version of each page; each lookup takes that page's own lock.
    PageIDAndEntriesV3 id_entries;
    for (size_t idx = 0; idx < page_ids.size(); ++idx)
    {
        if (auto entry = versioned_pages[idx]->getEntry(snap->sequence); entry)
        {
            id_entries.emplace_back(page_ids[idx], *entry);
        }
        else
            throw Exception(fmt::format("Entry [id={}] [seq={}] at [idx={}] not exist!", page_ids[idx], snap->sequence, idx), ErrorCodes::PS_ENTRY_NO_VALID_VERSION);
    }

    return id_entries;
}

// Apply all records of `edit` as one new version (sequence = current sequence + 1) and
// then publish it by incrementing `sequence`.
//
// The whole call runs under the exclusive table lock. REF records are resolved first by
// copying the entry of the referenced page as visible at the current sequence; a REF to
// a page that does not exist (or has no live version) throws Exception with
// ErrorCodes::LOGICAL_ERROR before anything is modified.
void PageDirectory::apply(PageEntriesEdit && edit)
{
    std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline
    const UInt64 last_sequence = sequence.load();
    const UInt64 new_sequence = last_sequence + 1;

    // stage 1, get the entry to be ref
    // NOTE(review): `snap` is not read below; it only keeps a snapshot at `last_sequence`
    // alive for the duration of this call. Confirm whether that is still required.
    auto snap = createSnapshot();
    for (auto & r : edit.getRecords())
    {
        // Set the version of inserted entries
        r.version = PageVersionType(new_sequence);

        if (r.type != WriteBatch::WriteType::REF)
        {
            continue;
        }
        auto iter = mvcc_table_directory.find(r.ori_page_id);
        if (iter == mvcc_table_directory.end())
        {
            throw Exception(fmt::format("Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, new_sequence), ErrorCodes::LOGICAL_ERROR);
        }
        if (auto entry = iter->second->getEntry(last_sequence); entry)
        {
            // copy the entry to be ref
            r.entry = *entry;
        }
        else
        {
            throw Exception(fmt::format("Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, new_sequence), ErrorCodes::LOGICAL_ERROR);
        }
    }

    // stage 2, persisted the changes to WAL
    // wal.apply(edit);

    // stage 3, create entry version list for pageId. nothing need to be rollback
    // Each page lock is taken exactly once, together with the number of records in this
    // edit that still have to be applied to that page. Locking again for a page id that
    // appears twice in the edit would re-lock a std::mutex this thread already owns,
    // which is undefined behaviour (in practice a self-deadlock).
    std::unordered_map<PageId, std::pair<PageLock, size_t>> updating_locks;
    std::vector<VersionedPageEntriesPtr> updating_pages;
    updating_pages.reserve(edit.size());
    for (const auto & r : edit.getRecords())
    {
        auto [iter, created] = mvcc_table_directory.insert(std::make_pair(r.page_id, nullptr));
        if (created)
        {
            iter->second = std::make_shared<VersionedPageEntries>();
        }

        if (auto lock_iter = updating_locks.find(r.page_id); lock_iter == updating_locks.end())
            updating_locks.emplace(r.page_id, std::make_pair(iter->second->acquireLock(), size_t{1}));
        else
            ++lock_iter->second.second;

        updating_pages.emplace_back(iter->second);
    }

    // stage 4, there are no rollback since we already persist `edit` to WAL, just ignore error if any
    const auto & records = edit.getRecords();
    for (size_t idx = 0; idx < records.size(); ++idx)
    {
        const auto & r = records[idx];
        switch (r.type)
        {
        case WriteBatch::WriteType::PUT:
            [[fallthrough]];
        case WriteBatch::WriteType::UPSERT:
            [[fallthrough]];
        case WriteBatch::WriteType::REF:
        {
            // Put/upsert/ref all should append a new version for this page
            // NOTE(review): several records for one page in the same edit share the same
            // sequence; verify how `createNewVersion` treats an already existing version key.
            updating_pages[idx]->createNewVersion(new_sequence, r.entry);
            break;
        }
        case WriteBatch::WriteType::DEL:
        {
            updating_pages[idx]->createDelete(new_sequence);
            break;
        }
        }

        // Release the page lock as soon as the last record touching this page is applied.
        if (auto lock_iter = updating_locks.find(r.page_id);
            lock_iter != updating_locks.end() && --lock_iter->second.second == 0)
        {
            updating_locks.erase(lock_iter);
        }
    }

    // The edit committed, incr the sequence number to publish changes for `createSnapshot`
    sequence.fetch_add(1);
}

bool PageDirectory::gc()
{
// Cleanup released snapshots
for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */)
{
if (iter->expired())
iter = snapshots.erase(iter);
else
++iter;
}
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

} // namespace PS::V3
} // namespace DB
71 changes: 58 additions & 13 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
#pragma once

#include <Common/LogWithPrefix.h>
#include <Storages/Page/Page.h>
#include <Storages/Page/Snapshot.h>
#include <Storages/Page/V3/MapUtils.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/PageEntry.h>
#include <Storages/Page/V3/WALStore.h>
#include <common/types.h>

#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

Expand All @@ -15,44 +19,85 @@ namespace DB::PS::V3
// A snapshot of the page directory: it records the sequence number given at construction.
// Lookups through `PageDirectory::get` use `sequence` to select the visible version.
class PageDirectorySnapshot : public DB::PageStorageSnapshot
{
public:
// Sequence number this snapshot reads at.
UInt64 sequence;
// `seq` is stored as-is in `sequence`.
explicit PageDirectorySnapshot(UInt64 seq)
: sequence(seq)
{}
};
using PageDirectorySnapshotPtr = std::shared_ptr<PageDirectorySnapshot>;

// In-memory multi-version directory: maps a page id to the list of its versions
// (entries or delete markers) and hands out snapshots pinned to a sequence number.
// NOTE(review): this text looks like a rendered diff in which removed and added lines
// are interleaved (duplicate declarations below); confirm against the real header.
class PageDirectory
{
public:
PageDirectory();

// Not implemented in the accompanying .cpp (throws NOT_IMPLEMENTED).
void restore();

// Create a snapshot pinned at the current sequence.
PageDirectorySnapshotPtr createSnapshot() const;

// NOTE(review): the next two declarations use `read_id`/`read_ids` and appear to be the
// pre-change signatures; the two after them match the definitions in the .cpp.
PageIDAndEntriesV3 get(const PageId & read_id, const PageDirectorySnapshotPtr & snap) const;
PageIDAndEntriesV3 get(const PageIds & read_ids, const PageDirectorySnapshotPtr & snap) const;
// Get the entry (or entries) visible to `snap`.
PageIDAndEntryV3 get(PageId page_id, const PageDirectorySnapshotPtr & snap) const;
PageIDAndEntriesV3 get(const PageIds & page_ids, const PageDirectorySnapshotPtr & snap) const;

// Apply an edit as one new version and publish it.
void apply(PageEntriesEdit && edit);

// Cleans up released snapshots; the rest is not implemented in the accompanying .cpp.
bool gc();

private:
// NOTE(review): `VersionType`, the dangling `struct VersionedPageEntry` line and the
// `VersionType ver;` member below look like leftovers of the removed side of the diff.
struct VersionType
{
UInt64 sequence = 0;
UInt64 epoch = 0;
};
struct VersionedPageEntry
// One version of a page: either a live entry or a delete marker (`is_delete`).
struct EntryOrDelete
{
VersionType ver;
bool is_delete;
PageEntryV3 entry;

// Build a delete marker; `entry` is default-initialized.
explicit EntryOrDelete(bool del)
: is_delete(del)
{
// This constructor is only meant to be called with `true`.
assert(del == true);
}
// Build a live version holding a copy of `entry_`.
explicit EntryOrDelete(const PageEntryV3 & entry_)
: is_delete(false)
, entry(entry_)
{}
};

// A held per-page lock. std::lock_guard cannot be moved, so it is heap-allocated and
// owned through a unique_ptr; destroying the unique_ptr releases the mutex.
using PageLock = std::unique_ptr<std::lock_guard<std::mutex>>;
// All versions of one page plus the mutex protecting them.
class VersionedPageEntries
{
std::list<VersionedPageEntry> entries;
public:
// Lock this page's mutex; the lock is held until the returned PageLock is destroyed.
PageLock acquireLock() const
{
return std::make_unique<std::lock_guard<std::mutex>>(m);
}

// Add a live version at `seq`. Does not take the lock itself.
// NOTE(review): with `entries` being a std::map, `emplace` keeps an already
// existing element for an equal key — verify behaviour for repeated `seq`.
void createNewVersion(UInt64 seq, const PageEntryV3 & entry)
{
entries.emplace(PageVersionType(seq), entry);
}

// Add a delete marker at `seq`. Does not take the lock itself.
void createDelete(UInt64 seq)
{
entries.emplace(PageVersionType(seq), EntryOrDelete(/*del*/ true));
}

// Return the entry visible at `seq`, or std::nullopt (defined in the .cpp).
std::optional<PageEntryV3> getEntry(UInt64 seq) const;

private:
// `mutable` so that const member functions (acquireLock) can lock it.
mutable std::mutex m;
// Entries sorted by version
std::map<PageVersionType, EntryOrDelete> entries;
};
using VersionedPageEntriesPtr = std::shared_ptr<VersionedPageEntries>;

// NOTE(review): `table_rw_mutex`, `mvcc_table_directory` and `snapshots` are each
// declared twice below; the first declaration of each looks like the removed diff line.
std::shared_mutex table_rw_mutex;
std::unordered_map<PageId, VersionedPageEntries> mvcc_table_directory;
private:
// Sequence of the latest published edit; incremented at the end of `apply`.
std::atomic<UInt64> sequence;
// Guards `mvcc_table_directory`; `mutable` so the const `get` overloads can take a shared lock.
mutable std::shared_mutex table_rw_mutex;
using MVCCMapType = std::unordered_map<PageId, VersionedPageEntriesPtr>;
MVCCMapType mvcc_table_directory;

std::list<std::weak_ptr<PageDirectorySnapshot>> snapshots;
// Weak references to the snapshots handed out; `mutable` because the const
// `createSnapshot` appends to it.
mutable std::list<std::weak_ptr<PageDirectorySnapshot>> snapshots;

WALStore wal;

LogWithPrefixPtr log;
};

} // namespace DB::PS::V3
Loading

0 comments on commit 1e0a37a

Please sign in to comment.