Skip to content

Commit

Permalink
Implememt pagestorage v3 (#3909)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiaqizho authored Jan 28, 2022
1 parent 30cc13c commit fe880ee
Show file tree
Hide file tree
Showing 10 changed files with 995 additions and 66 deletions.
6 changes: 3 additions & 3 deletions dbms/src/Common/tests/gtest_cpu_affinity_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
#include <Common/Config/TOMLConfiguration.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <gtest/gtest.h>
#include <unistd.h>

#include <boost/algorithm/string.hpp>

#define private public
#include <Common/CPUAffinityManager.h>
#undef private

#include <unistd.h>

namespace DB
{
namespace tests
Expand Down Expand Up @@ -127,4 +127,4 @@ TEST(CPUAffinityManager_test, CPUAffinityManager)
#endif

} // namespace tests
} // namespace DB
} // namespace DB
8 changes: 5 additions & 3 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,13 @@ void BlobStore::remove(const PageEntriesV3 & del_entries)
{
for (const auto & entry : del_entries)
{
// External page size is 0
if (entry.size == 0)
{
throw Exception(fmt::format("Invaild entry. entry size 0. [id={},offset={}]",
entry.file_id,
entry.offset));
continue;
// throw Exception(fmt::format("Invaild entry. entry size 0. [id={}] [offset={}]",
// entry.file_id,
// entry.offset));
}
removePosFromStats(entry.file_id, entry.offset, entry.size);
}
Expand Down
31 changes: 15 additions & 16 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,25 +216,24 @@ void CollapsingPageDirectory::dumpTo(std::unique_ptr<LogWriter> & log_writer)
*************************/

PageDirectory::PageDirectory()
: sequence(0)
, log(getLogWithPrefix(nullptr, "PageDirectory"))
: PageDirectory(0, nullptr)
{
}

PageDirectory PageDirectory::create(FileProviderPtr & provider, PSDiskDelegatorPtr & delegator, const WriteLimiterPtr & write_limiter)
PageDirectory::PageDirectory(UInt64 init_seq, WALStorePtr && wal_)
: sequence(init_seq)
, wal(std::move(wal_))
, log(getLogWithPrefix(nullptr, "PageDirectory"))
{
// TODO: Speedup restoring
CollapsingPageDirectory in_mem_directory;
auto callback = [&in_mem_directory](PageEntriesEdit && edit) {
in_mem_directory.apply(std::move(edit));
};
}

PageDirectory dir;
PageDirectory PageDirectory::create(const CollapsingPageDirectory & collapsing_directory, WALStorePtr && wal)
{
// Reset the `sequence` to the maximum of persisted.
// PageId max_page_id = in_mem_directory.max_applied_page_id; // TODO: return it to outer function
dir.sequence = in_mem_directory.max_applied_ver.sequence;
dir.wal = WALStore::create(callback, provider, delegator, write_limiter);
for (const auto & [page_id, versioned_entry] : in_mem_directory.table_directory)
PageDirectory dir(
/*init_seq=*/collapsing_directory.max_applied_ver.sequence,
std::move(wal));
for (const auto & [page_id, versioned_entry] : collapsing_directory.table_directory)
{
const auto & version = versioned_entry.first;
const auto & entry = versioned_entry.second;
Expand Down Expand Up @@ -374,7 +373,7 @@ PageId PageDirectory::getMaxId() const
PageId max_page_id = 0;
std::shared_lock read_lock(table_rw_mutex);

for (auto & [page_id, versioned] : mvcc_table_directory)
for (const auto & [page_id, versioned] : mvcc_table_directory)
{
(void)versioned;
max_page_id = std::max(max_page_id, page_id);
Expand Down Expand Up @@ -457,8 +456,8 @@ void PageDirectory::apply(PageEntriesEdit && edit)
// If we already hold the lock from `r.ori_page_id`, Then we should not request it again.
// This can happen when r.ori_page_id have other operating in current writebatch
if (auto entry = (updating_locks.find(r.ori_page_id) != updating_locks.end()
? iter->second->getEntryNotSafe(last_sequence)
: iter->second->getEntry(last_sequence));
? iter->second->getEntryNotSafe(last_sequence + 1)
: iter->second->getEntry(last_sequence + 1));
entry)
{
// copy the entry to be ref
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,9 @@ class PageDirectory
{
public:
PageDirectory();
PageDirectory(UInt64 init_seq, WALStorePtr && wal);

static PageDirectory create(FileProviderPtr & provider, PSDiskDelegatorPtr & delegator, const WriteLimiterPtr & write_limiter);
static PageDirectory create(const CollapsingPageDirectory & collapsing_directory, WALStorePtr && wal);

PageDirectorySnapshotPtr createSnapshot() const;

Expand Down
175 changes: 141 additions & 34 deletions dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#include <Encryption/FileProvider.h>
#include <Storages/Page/PageStorage.h>
#include <Storages/Page/V3/PageDirectory.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/PageStorageImpl.h>
#include <Storages/PathPool.h>

#include "Storages/Page/V3/PageDirectory.h"

namespace DB
{
namespace ErrorCodes
Expand All @@ -20,17 +19,28 @@ PageStorageImpl::PageStorageImpl(
const Config & config_,
const FileProviderPtr & file_provider_)
: DB::PageStorage(name, delegator_, config_, file_provider_)
, log(getLogWithPrefix(nullptr, "PageStorage"))
, blob_store(file_provider_, delegator->defaultPath(), blob_config)
{
// TBD: init blob_store ptr.
}

PageStorageImpl::~PageStorageImpl() = default;


void PageStorageImpl::restore()
{
page_directory = PageDirectory::create(file_provider, delegator, /*write_limiter*/ nullptr);
// TODO: Speedup restoring
CollapsingPageDirectory collapsing_directory;
auto callback = [&collapsing_directory](PageEntriesEdit && edit) {
collapsing_directory.apply(std::move(edit));
};
// Restore `collapsing_directory` from disk
auto wal = WALStore::create(callback, file_provider, delegator, /*write_limiter*/ nullptr);
// PageId max_page_id = collapsing_directory.max_applied_page_id; // TODO: return it to outer function

page_directory = PageDirectory::create(collapsing_directory, std::move(wal));

// TODO: restore BlobStore
}

void PageStorageImpl::drop()
Expand All @@ -40,7 +50,7 @@ void PageStorageImpl::drop()

PageId PageStorageImpl::getMaxId()
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
return page_directory.getMaxId();
}

PageId PageStorageImpl::getNormalPageId(PageId /*page_id*/, SnapshotPtr /*snapshot*/)
Expand All @@ -50,60 +60,140 @@ PageId PageStorageImpl::getNormalPageId(PageId /*page_id*/, SnapshotPtr /*snapsh

DB::PageStorage::SnapshotPtr PageStorageImpl::getSnapshot()
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
return page_directory.createSnapshot();
}

std::tuple<size_t, double, unsigned> PageStorageImpl::getSnapshotsStat() const
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
return page_directory.getSnapshotsStat();
}

void PageStorageImpl::write(DB::WriteBatch && /*write_batch*/, const WriteLimiterPtr & /*write_limiter*/)
void PageStorageImpl::write(DB::WriteBatch && write_batch, const WriteLimiterPtr & write_limiter)
{
// Persist Page data to BlobStore
// PageEntriesEdit edit(write_batch.getWrites().size());
if (unlikely(write_batch.empty()))
return;

throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
// Persist Page data to BlobStore
auto edit = blob_store.write(write_batch, write_limiter);
page_directory.apply(std::move(edit));
}

DB::PageEntry PageStorageImpl::getEntry(PageId /*page_id*/, SnapshotPtr /*snapshot*/)
DB::PageEntry PageStorageImpl::getEntry(PageId page_id, SnapshotPtr snapshot)
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
if (!snapshot)
{
snapshot = this->getSnapshot();
}

try
{
const auto & [id, entry] = page_directory.get(page_id, snapshot);
(void)id;
// TODO : after `PageEntry` in page.h been moved to v2.
// Then we don't copy from V3 to V2 format
PageEntry entry_ret;
entry_ret.file_id = entry.file_id;
entry_ret.offset = entry.offset;
entry_ret.size = entry.size;
entry_ret.field_offsets = entry.field_offsets;
entry_ret.checksum = entry.checksum;

return entry_ret;
}
catch (DB::Exception & e)
{
LOG_FMT_WARNING(log, "{}", e.message());
return {.file_id = INVALID_BLOBFILE_ID}; // return invalid PageEntry
}
}

DB::Page PageStorageImpl::read(PageId /*page_id*/, const ReadLimiterPtr & /*read_limiter*/, SnapshotPtr /*snapshot*/)
DB::Page PageStorageImpl::read(PageId page_id, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
{
// PageEntryV3 entry = page_directory.get(page_id, snapshot);
// DB::Page page = blob_store.read(entry, read_limiter);
// return page;
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
if (!snapshot)
{
snapshot = this->getSnapshot();
}

auto page_entry = page_directory.get(page_id, snapshot);
return blob_store.read(page_entry, read_limiter);
}

PageMap PageStorageImpl::read(const std::vector<PageId> & /*page_ids*/, const ReadLimiterPtr & /*read_limiter*/, SnapshotPtr /*snapshot*/)
PageMap PageStorageImpl::read(const std::vector<PageId> & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
{
// PageIDAndEntriesV2 entries = page_directory.get(page_ids, snapshot);
// DB::Page page = blob_store.read(entries, read_limiter);
// return page;
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
if (!snapshot)
{
snapshot = this->getSnapshot();
}

auto page_entries = page_directory.get(page_ids, snapshot);
return blob_store.read(page_entries, read_limiter);
}

void PageStorageImpl::read(const std::vector<PageId> & /*page_ids*/, const PageHandler & /*handler*/, const ReadLimiterPtr & /*read_limiter*/, SnapshotPtr /*snapshot*/)
void PageStorageImpl::read(const std::vector<PageId> & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
if (!snapshot)
{
snapshot = this->getSnapshot();
}

auto page_entries = page_directory.get(page_ids, snapshot);
blob_store.read(page_entries, handler, read_limiter);
}

PageMap PageStorageImpl::read(const std::vector<PageReadFields> & /*page_fields*/, const ReadLimiterPtr & /*read_limiter*/, SnapshotPtr /*snapshot*/)
PageMap PageStorageImpl::read(const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot)
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
if (!snapshot)
{
snapshot = this->getSnapshot();
}

BlobStore::FieldReadInfos read_infos;
for (const auto & [page_id, field_indices] : page_fields)
{
const auto & [id, entry] = page_directory.get(page_id, snapshot);
(void)id;
auto info = BlobStore::FieldReadInfo(page_id, entry, field_indices);
read_infos.emplace_back(info);
}

return blob_store.read(read_infos, read_limiter);
}

void PageStorageImpl::traverse(const std::function<void(const DB::Page & page)> & /*acceptor*/, SnapshotPtr /*snapshot*/)
void PageStorageImpl::traverse(const std::function<void(const DB::Page & page)> & acceptor, SnapshotPtr snapshot)
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
if (!snapshot)
{
snapshot = this->getSnapshot();
}

// TODO: This could hold the read lock of `page_directory` for a long time
const auto & page_ids = page_directory.getAllPageIds();
for (const auto & valid_page : page_ids)
{
const auto & page_entries = page_directory.get(valid_page, snapshot);
acceptor(blob_store.read(page_entries));
}
}

bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & /*write_limiter*/, const ReadLimiterPtr & /*read_limiter*/)
{
// If another thread is running gc, just return;
bool v = false;
if (!gc_is_running.compare_exchange_strong(v, true))
return false;

SCOPE_EXIT({
bool is_running = true;
gc_is_running.compare_exchange_strong(is_running, false);
});

/// Get all pending external pages and BlobFiles. Note that we should get external pages before BlobFiles.
PathAndIdsVec external_pages;
if (external_pages_scanner)
{
external_pages = external_pages_scanner();
}

// 1. Do the MVCC gc, clean up expired snapshot.
// And get the expired entries.
const auto & del_entries = page_directory.gc();
Expand All @@ -122,7 +212,11 @@ bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & /*write_limi

if (blob_need_gc.empty())
{
return true;
if (external_pages_remover)
{
external_pages_remover(external_pages, page_directory.getAllPageIds());
}
return false;
}

// 4. Filter out entries in MVCC by BlobId.
Expand All @@ -132,7 +226,11 @@ bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & /*write_limi

if (blob_gc_info.empty())
{
return true;
if (external_pages_remover)
{
external_pages_remover(external_pages, page_directory.getAllPageIds());
}
return false;
}

// 5. Do the BlobStore GC
Expand All @@ -151,13 +249,22 @@ bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & /*write_limi
// be reset to correct state during restore. If any exception thrown, then some BlobFiles
// will be remained as "read-only" files while entries in them are useless in actual.
// Those BlobFiles should be cleaned during next restore.
page_directory.gcApply(std::move(gc_edit), false);
const auto & page_ids = page_directory.gcApply(std::move(gc_edit), external_pages_scanner != nullptr);

if (external_pages_remover)
{
external_pages_remover(external_pages, page_ids);
}

return true;
}

void PageStorageImpl::registerExternalPagesCallbacks(ExternalPagesScanner /*scanner*/, ExternalPagesRemover /*remover*/)
void PageStorageImpl::registerExternalPagesCallbacks(ExternalPagesScanner scanner, ExternalPagesRemover remover)
{
throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
assert(scanner != nullptr);
assert(remover != nullptr);
external_pages_scanner = scanner;
external_pages_remover = remover;
}

} // namespace PS::V3
Expand Down
Loading

0 comments on commit fe880ee

Please sign in to comment.