Skip to content

Commit

Permalink
PageStorage: Fine grained lock on mvcc map (#4137)
Browse files Browse the repository at this point in the history
ref #3594
  • Loading branch information
JaySon-Huang authored Mar 2, 2022
1 parent a89b3bb commit 6611d54
Show file tree
Hide file tree
Showing 11 changed files with 369 additions and 227 deletions.
84 changes: 41 additions & 43 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
written_blobs.emplace_back(file_id, file_offset, data_size);
LOG_FMT_INFO(
log,
"BlobStore gc write (partially) done [blobid={}] [file_offset={}] [size={}] [total_size={}]",
"BlobStore gc write (partially) done [blob_id={}] [file_offset={}] [size={}] [total_size={}]",
file_id,
file_offset,
data_size,
Expand All @@ -605,62 +605,60 @@ PageEntriesEdit BlobStore::gc(std::map<BlobFileId, PageIdAndVersionedEntries> &
}
};

// Although this is a three-layer loop,
// the amount of data can be seen as linear and fixed.
// If it were changed to a single-layer loop, the parameters would change,
// which would waste more memory while not making execution any faster.
// blob_file_0, [<page_id_0, ver0, entry0>,
// <page_id_0, ver1, entry1>,
// <page_id_1, ver1, entry1>, ... ]
// blob_file_1, [...]
// ...
for (const auto & [file_id, versioned_pageid_entry_list] : entries_need_gc)
{
for (const auto & [page_id, versioned_entry] : versioned_pageid_entry_list)
for (const auto & [page_id, versioned, entry] : versioned_pageid_entry_list)
{
for (const auto & [versioned, entry] : versioned_entry)
// When we can't load the remaining data,
// we will use the original buffer to find an area to load the remaining data.
if (offset_in_data + entry.size > config_file_limit)
{
// When we can't load the remaining data.
// we will use the original buffer to find an area to load the remaining data
if (offset_in_data + entry.size > config_file_limit)
assert(file_offset_beg == 0);
// Remove the span that is not actually used
if (offset_in_data != alloc_size)
{
assert(file_offset_beg == 0);
// Remove the span that is not actually used
if (offset_in_data != alloc_size)
{
removePosFromStats(blobfile_id, offset_in_data, alloc_size - offset_in_data);
}
remaining_page_size += alloc_size - offset_in_data;

// Write data into Blob.
write_blob(blobfile_id, data_buf, file_offset_beg, offset_in_data);

// Reset the position to reuse the buffer allocated
data_pos = data_buf;
offset_in_data = 0;

// Acquire a span from stats for remaining data
auto next_alloc_size = (remaining_page_size > config_file_limit ? config_file_limit : remaining_page_size);
remaining_page_size -= next_alloc_size;
std::tie(blobfile_id, file_offset_beg) = getPosFromStats(next_alloc_size);
removePosFromStats(blobfile_id, offset_in_data, alloc_size - offset_in_data);
}
remaining_page_size += alloc_size - offset_in_data;

PageEntryV3 new_entry;
// Write data into Blob.
write_blob(blobfile_id, data_buf, file_offset_beg, offset_in_data);

read(file_id, entry.offset, data_pos, entry.size, read_limiter);
// Reset the position to reuse the buffer allocated
data_pos = data_buf;
offset_in_data = 0;

// No need do crc again, crc won't be changed.
new_entry.checksum = entry.checksum;
// Acquire a span from stats for remaining data
auto next_alloc_size = (remaining_page_size > config_file_limit ? config_file_limit : remaining_page_size);
remaining_page_size -= next_alloc_size;
std::tie(blobfile_id, file_offset_beg) = getPosFromStats(next_alloc_size);
}

// Need copy the field_offsets
new_entry.field_offsets = entry.field_offsets;
PageEntryV3 new_entry;

// Entry size won't be changed.
new_entry.size = entry.size;
read(file_id, entry.offset, data_pos, entry.size, read_limiter);

new_entry.file_id = blobfile_id;
new_entry.offset = file_offset_beg + offset_in_data;
// No need do crc again, crc won't be changed.
new_entry.checksum = entry.checksum;

offset_in_data += new_entry.size;
data_pos += new_entry.size;
// Need copy the field_offsets
new_entry.field_offsets = entry.field_offsets;

edit.upsertPage(page_id, versioned, new_entry);
}
// Entry size won't be changed.
new_entry.size = entry.size;

new_entry.file_id = blobfile_id;
new_entry.offset = file_offset_beg + offset_in_data;

offset_in_data += new_entry.size;
data_pos += new_entry.size;

edit.upsertPage(page_id, versioned, new_entry);
}
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/BlobStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ extern const int LOGICAL_ERROR;
namespace PS::V3
{
class CollapsingPageDirectory;
using PageIdAndVersionedEntries = std::vector<std::pair<PageId, VersionedEntries>>;
using PageIdAndVersionedEntries = std::vector<std::tuple<PageId, PageVersionType, PageEntryV3>>;

class BlobStore : public Allocator<false>
{
Expand Down
143 changes: 86 additions & 57 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,34 @@ std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq) const
return getEntryNotSafe(seq);
}

std::pair<VersionedEntries, PageSize> VersionedPageEntries::getEntriesByBlobId(BlobFileId blob_id)
PageSize VersionedPageEntries::getEntriesByBlobIds(
const std::unordered_set<BlobFileId> & blob_ids,
PageId page_id,
std::map<BlobFileId, PageIdAndVersionedEntries> & blob_versioned_entries)
{
PageSize single_page_size = 0;
VersionedEntries versioned_entries;

// blob_file_0, [<page_id_0, ver0, entry0>,
// <page_id_0, ver1, entry1>,
// <page_id_1, ver1, entry1> ]
// blob_file_1, [...]
// ...
// the total size of the entries taken out
PageSize total_entries_size = 0;
auto page_lock = acquireLock();
for (const auto & [versioned_type, entry_del] : entries)
for (const auto & [versioned_type, entry_or_del] : entries)
{
if (entry_del.is_delete)
if (entry_or_del.is_delete)
{
continue;
}

const auto & entry = entry_del.entry;

if (entry.file_id == blob_id)
const auto & entry = entry_or_del.entry;
if (blob_ids.count(entry.file_id) > 0)
{
single_page_size += entry.size;
versioned_entries.emplace_back(std::make_pair(versioned_type, entry_del.entry));
blob_versioned_entries[entry.file_id].emplace_back(page_id, versioned_type, entry);
total_entries_size += entry.size;
}
}

return std::make_pair(std::move(versioned_entries), single_page_size);
return total_entries_size;
}

std::pair<PageEntriesV3, bool> VersionedPageEntries::deleteAndGC(UInt64 lowest_seq)
Expand Down Expand Up @@ -396,7 +401,10 @@ std::set<PageId> PageDirectory::getAllPageIds()

void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write_limiter)
{
std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline
// Note that we need to make sure `sequence` is increased in order, so it
// also needs to be protected by `write_lock` throughout `apply`
// TODO: It is totally serialized, make it a pipeline
std::unique_lock write_lock(table_rw_mutex);
UInt64 last_sequence = sequence.load();

// stage 1, persisted the changes to WAL with version [seq=last_seq + 1, epoch=0]
Expand Down Expand Up @@ -498,25 +506,26 @@ void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write

std::set<PageId> PageDirectory::gcApply(PageEntriesEdit && migrated_edit, bool need_scan_page_ids, const WriteLimiterPtr & write_limiter)
{
for (auto & record : migrated_edit.getMutRecords())
{
std::shared_lock read_lock(table_rw_mutex);
for (auto & record : migrated_edit.getMutRecords())
MVCCMapType::const_iterator iter;
{
auto iter = mvcc_table_directory.find(record.page_id);
std::shared_lock read_lock(table_rw_mutex);
iter = mvcc_table_directory.find(record.page_id);
if (unlikely(iter == mvcc_table_directory.end()))
{
throw Exception(fmt::format("Can't found [pageid={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("Can't found [page_id={}] while doing gcApply", record.page_id), ErrorCodes::LOGICAL_ERROR);
}
} // release the read lock on `table_rw_mutex`

// Increase the epoch for migrated record.
record.version.epoch += 1;
// Increase the epoch for migrated record.
record.version.epoch += 1;

// Append the gc version to version list
auto & versioned_entries = iter->second;
auto page_lock = versioned_entries->acquireLock();
versioned_entries->createNewVersion(record.version.sequence, record.version.epoch, record.entry);
}
} // Then we should release the read lock on `table_rw_mutex`
// Append the gc version to version list
const auto & versioned_entries = iter->second;
auto page_lock = versioned_entries->acquireLock();
versioned_entries->createNewVersion(record.version.sequence, record.version.epoch, record.entry);
}

// Apply migrate edit into WAL with the increased epoch version
wal->apply(migrated_edit, write_limiter);
Expand All @@ -530,37 +539,44 @@ std::set<PageId> PageDirectory::gcApply(PageEntriesEdit && migrated_edit, bool n
}

std::pair<std::map<BlobFileId, PageIdAndVersionedEntries>, PageSize>
PageDirectory::getEntriesByBlobIds(const std::vector<BlobFileId> & blob_need_gc)
PageDirectory::getEntriesByBlobIds(const std::vector<BlobFileId> & blob_ids) const
{
std::map<BlobFileId, PageIdAndVersionedEntries> blob_versioned_entries;
std::unordered_set<BlobFileId> blob_id_set;
for (const auto blob_id : blob_ids)
blob_id_set.insert(blob_id);
assert(blob_id_set.size() == blob_ids.size());

std::map<BlobFileId, PageIdAndVersionedEntries> blob_versioned_entries;
PageSize total_page_size = 0;

MVCCMapType::const_iterator iter;
{
std::shared_lock read_lock(table_rw_mutex);
iter = mvcc_table_directory.cbegin();
if (iter == mvcc_table_directory.end())
return {blob_versioned_entries, total_page_size};
}

for (const auto & blob_id : blob_need_gc)
{
PageIdAndVersionedEntries versioned_pageid_entries;

for (const auto & [page_id, version_entries] : mvcc_table_directory)
{
VersionedEntries versioned_entries;
PageSize page_size;
std::tie(versioned_entries, page_size) = version_entries->getEntriesByBlobId(blob_id);
if (page_size != 0)
{
versioned_pageid_entries.emplace_back(
std::make_pair(page_id, std::move(versioned_entries)));
total_page_size += page_size;
}
}

if (versioned_pageid_entries.empty())
{
throw Exception(fmt::format("Can't get any entries from [BlobFileId={}]", blob_id));
}
while (true)
{
// `iter` is an iterator that won't be invalidated by `apply`/`gcApply`,
// so we can scan the version list without holding the lock on `mvcc_table_directory`.
PageId page_id = iter->first;
const auto & version_entries = iter->second;
total_page_size += version_entries->getEntriesByBlobIds(blob_id_set, page_id, blob_versioned_entries);

blob_versioned_entries[blob_id] = std::move(versioned_pageid_entries);
{
std::shared_lock read_lock(table_rw_mutex);
iter++;
if (iter == mvcc_table_directory.end())
break;
}
}
for (const auto blob_id : blob_ids)
{
if (blob_versioned_entries.find(blob_id) == blob_versioned_entries.end())
{
throw Exception(fmt::format("Can't get any entries from [blob_id={}]", blob_id));
}
}
return std::make_pair(std::move(blob_versioned_entries), total_page_size);
Expand Down Expand Up @@ -590,16 +606,26 @@ std::vector<PageEntriesV3> PageDirectory::gc(const WriteLimiterPtr & write_limit
}

std::vector<PageEntriesV3> all_del_entries;
MVCCMapType::iterator iter;
{
std::shared_lock read_lock(table_rw_mutex);
iter = mvcc_table_directory.begin();
if (iter == mvcc_table_directory.end())
return all_del_entries;
}

while (true)
{
std::unique_lock write_lock(table_rw_mutex);
for (auto iter = mvcc_table_directory.begin(); iter != mvcc_table_directory.end(); /*empty*/)
// `iter` is an iterator that won't be invalidated by `apply`/`gcApply`,
// so we can do gc on the version list without holding the lock on `mvcc_table_directory`.
const auto & [del_entries, all_deleted] = iter->second->deleteAndGC(lowest_seq);
if (!del_entries.empty())
{
const auto & [del_entries, all_deleted] = iter->second->deleteAndGC(lowest_seq);
if (!del_entries.empty())
{
all_del_entries.emplace_back(std::move(del_entries));
}
all_del_entries.emplace_back(std::move(del_entries));
}

{
std::unique_lock write_lock(table_rw_mutex);
if (all_deleted)
{
iter = mvcc_table_directory.erase(iter);
Expand All @@ -608,6 +634,9 @@ std::vector<PageEntriesV3> PageDirectory::gc(const WriteLimiterPtr & write_limit
{
iter++;
}

if (iter == mvcc_table_directory.end())
break;
}
}

Expand Down
21 changes: 14 additions & 7 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,14 @@ class VersionedPageEntries
std::optional<PageEntryV3> getEntryNotSafe(UInt64 seq) const;

/**
* Take out the `VersionedEntries` which exist in the `BlobFileId`.
* Also return the total size of entries.
* If there are entries pointing to files in `blob_ids`, take out the <page_id, ver, entry>
* triples and store them into `blob_versioned_entries`.
* Return the total size of those entries in this version list.
*/
std::pair<VersionedEntries, PageSize> getEntriesByBlobId(BlobFileId blob_id);
PageSize getEntriesByBlobIds(
const std::unordered_set<BlobFileId> & blob_ids,
PageId page_id,
std::map<BlobFileId, PageIdAndVersionedEntries> & blob_versioned_entries);

/**
* GC will give a `lowest_seq`.
Expand Down Expand Up @@ -185,11 +189,11 @@ class CollapsingPageDirectory
CollapsingPageDirectory & operator=(const CollapsingPageDirectory &) = delete;
};

// `PageDiectory` store multi-versions entries for the same
// `PageDirectory` stores multi-version entries for the same
// page id. Users can acquire a snapshot from it and get a
// consistent result through the snapshot.
// All its functions are considered concurrency-safe.
// User should call `gc` periodly to remove outdated version
// Users should call `gc` periodically to remove outdated versions
// of entries in order to keep both the memory consumption and
// the restoring time at a reasonable level.
class PageDirectory
Expand Down Expand Up @@ -217,7 +221,7 @@ class PageDirectory
void apply(PageEntriesEdit && edit, const WriteLimiterPtr & write_limiter = nullptr);

std::pair<std::map<BlobFileId, PageIdAndVersionedEntries>, PageSize>
getEntriesByBlobIds(const std::vector<BlobFileId> & blob_need_gc);
getEntriesByBlobIds(const std::vector<BlobFileId> & blob_ids) const;

std::set<PageId> gcApply(PageEntriesEdit && migrated_edit, bool need_scan_page_ids, const WriteLimiterPtr & write_limiter = nullptr);

Expand Down Expand Up @@ -255,7 +259,10 @@ class PageDirectory
private:
std::atomic<UInt64> sequence;
mutable std::shared_mutex table_rw_mutex;
using MVCCMapType = std::unordered_map<PageId, VersionedPageEntriesPtr>;
// Only `std::map` is allowed for `MVCCMap`, because `std::map::insert` ensures that
// "No iterators or references are invalidated"
// https://en.cppreference.com/w/cpp/container/map/insert
using MVCCMapType = std::map<PageId, VersionedPageEntriesPtr>;
MVCCMapType mvcc_table_directory;

mutable std::mutex snapshots_mutex;
Expand Down
Loading

0 comments on commit 6611d54

Please sign in to comment.