Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PageStorage APIs support throw_on_not_exist. #4736

Merged
merged 7 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Storages/Page/Page.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ struct Page
std::set<FieldOffset> field_offsets;

public:
inline bool isValid() const { return page_id != INVALID_PAGE_ID; }

ByteBuffer getFieldData(size_t index) const
{
auto iter = field_offsets.find(FieldOffset(index));
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/PageDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ static constexpr NamespaceId TEST_NAMESPACE_ID = 1000;
using PageId = UInt64;
using PageIds = std::vector<PageId>;
using PageIdSet = std::unordered_set<PageId>;
static constexpr PageId INVALID_PAGE_ID = 0;

using PageIdV3Internal = UInt128;
using PageIdV3Internals = std::vector<PageIdV3Internal>;
Expand Down
45 changes: 30 additions & 15 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,47 +218,60 @@ class PageStorage : private boost::noncopyable
// Get some statistics of all living snapshots and the oldest living snapshot.
virtual SnapshotsStatistics getSnapshotsStat() const = 0;

virtual size_t getNumberOfPages() = 0;

void write(WriteBatch && write_batch, const WriteLimiterPtr & write_limiter = nullptr)
{
writeImpl(std::move(write_batch), write_limiter);
}

// If we can't get the entry.
// Then the null entry will be return
PageEntry getEntry(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot = {})
{
return getEntryImpl(ns_id, page_id, snapshot);
}

Page read(NamespaceId ns_id, PageId page_id, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {})
Page read(NamespaceId ns_id, PageId page_id, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}, bool throw_on_not_exist = true)
{
return readImpl(ns_id, page_id, read_limiter, snapshot);
return readImpl(ns_id, page_id, read_limiter, snapshot, throw_on_not_exist);
}

PageMap read(NamespaceId ns_id, const std::vector<PageId> & page_ids, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {})
PageMap read(NamespaceId ns_id, const PageIds & page_ids, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}, bool throw_on_not_exist = true)
{
return readImpl(ns_id, page_ids, read_limiter, snapshot);
return readImpl(ns_id, page_ids, read_limiter, snapshot, throw_on_not_exist);
}

void read(NamespaceId ns_id, const std::vector<PageId> & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {})
/**
* If throw_on_not_exist is false, Also we do have some of page_id not found.
* Then the return value will record the all of page_id which not found.
*/
PageIds read(NamespaceId ns_id, const PageIds & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}, bool throw_on_not_exist = true)
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
{
readImpl(ns_id, page_ids, handler, read_limiter, snapshot);
return readImpl(ns_id, page_ids, handler, read_limiter, snapshot, throw_on_not_exist);
}

using FieldIndices = std::vector<size_t>;
using PageReadFields = std::pair<PageId, FieldIndices>;

PageMap read(NamespaceId ns_id, const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {})
PageMap read(NamespaceId ns_id, const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}, bool throw_on_not_exist = true)
{
return readImpl(ns_id, page_fields, read_limiter, snapshot);
return readImpl(ns_id, page_fields, read_limiter, snapshot, throw_on_not_exist);
}

Page read(NamespaceId ns_id, const PageReadFields & page_field, const ReadLimiterPtr & read_limiter = nullptr, SnapshotPtr snapshot = {}, bool throw_on_not_exist = true)
{
return readImpl(ns_id, page_field, read_limiter, snapshot, throw_on_not_exist);
}

void traverse(const std::function<void(const DB::Page & page)> & acceptor, SnapshotPtr snapshot = {})
{
traverseImpl(acceptor, snapshot);
}

PageId getNormalPageId(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot = {})
PageId getNormalPageId(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot = {}, bool throw_on_not_exist = true)
{
return getNormalPageIdImpl(ns_id, page_id, snapshot);
return getNormalPageIdImpl(ns_id, page_id, snapshot, throw_on_not_exist);
}

// We may skip the GC to reduce useless reading by default.
Expand All @@ -278,17 +291,19 @@ class PageStorage : private boost::noncopyable

virtual PageEntry getEntryImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot) = 0;

virtual Page readImpl(NamespaceId ns_id, PageId page_id, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) = 0;
virtual Page readImpl(NamespaceId ns_id, PageId page_id, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0;

virtual PageMap readImpl(NamespaceId ns_id, const PageIds & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0;

virtual PageMap readImpl(NamespaceId ns_id, const std::vector<PageId> & page_ids, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) = 0;
virtual PageIds readImpl(NamespaceId ns_id, const PageIds & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0;

virtual void readImpl(NamespaceId ns_id, const std::vector<PageId> & page_ids, const PageHandler & handler, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) = 0;
virtual PageMap readImpl(NamespaceId ns_id, const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0;

virtual PageMap readImpl(NamespaceId ns_id, const std::vector<PageReadFields> & page_fields, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot) = 0;
virtual Page readImpl(NamespaceId ns_id, const PageReadFields & page_field, const ReadLimiterPtr & read_limiter, SnapshotPtr snapshot, bool throw_on_not_exist) = 0;

virtual void traverseImpl(const std::function<void(const DB::Page & page)> & acceptor, SnapshotPtr snapshot) = 0;

virtual PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot) = 0;
virtual PageId getNormalPageIdImpl(NamespaceId ns_id, PageId page_id, SnapshotPtr snapshot, bool throw_on_not_exist) = 0;

virtual bool gcImpl(bool not_skip, const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter) = 0;

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V1/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ Page PageStorage::read(PageId page_id, SnapshotPtr snapshot)
return file_reader->read(to_read)[page_id];
}

PageMap PageStorage::read(const std::vector<PageId> & page_ids, SnapshotPtr snapshot)
PageMap PageStorage::read(const PageIds & page_ids, SnapshotPtr snapshot)
{
if (!snapshot)
{
Expand Down Expand Up @@ -391,7 +391,7 @@ PageMap PageStorage::read(const std::vector<PageId> & page_ids, SnapshotPtr snap
return page_map;
}

void PageStorage::read(const std::vector<PageId> & page_ids, const PageHandler & handler, SnapshotPtr snapshot)
void PageStorage::read(const PageIds & page_ids, const PageHandler & handler, SnapshotPtr snapshot)
{
if (!snapshot)
{
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/Page/V1/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ class PageStorage

PageEntry getEntry(PageId page_id, SnapshotPtr snapshot);
Page read(PageId page_id, SnapshotPtr snapshot);
PageMap read(const std::vector<PageId> & page_ids, SnapshotPtr snapshot);
void read(const std::vector<PageId> & page_ids, const PageHandler & handler, SnapshotPtr snapshot);
PageMap read(const PageIds & page_ids, SnapshotPtr snapshot);
void read(const PageIds & page_ids, const PageHandler & handler, SnapshotPtr snapshot);
void traverse(const std::function<void(const Page & page)> & acceptor, SnapshotPtr snapshot);
bool gc();

Expand Down Expand Up @@ -197,8 +197,8 @@ class PageReader
{}

Page read(PageId page_id) const { return storage.read(page_id, snap); }
PageMap read(const std::vector<PageId> & page_ids) const { return storage.read(page_ids, snap); }
void read(const std::vector<PageId> & page_ids, PageHandler & handler) const { storage.read(page_ids, handler, snap); };
PageMap read(const PageIds & page_ids) const { return storage.read(page_ids, snap); }
void read(const PageIds & page_ids, PageHandler & handler) const { storage.read(page_ids, handler, snap); };

PageId getNormalPageId(PageId page_id) const { return storage.getNormalPageId(page_id, snap); }
UInt64 getPageChecksum(PageId page_id) const { return storage.getEntry(page_id, snap).checksum; }
Expand Down
70 changes: 69 additions & 1 deletion dbms/src/Storages/Page/V2/PageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,6 @@ PageMap PageFile::Reader::read(PageFile::Reader::FieldReadInfos & to_read, const
for (auto & [page_id, entry, fields] : to_read)
{
(void)page_id;
(void)entry;
// Sort fields to get better read on disk
std::sort(fields.begin(), fields.end());
for (const auto field_index : fields)
Expand Down Expand Up @@ -1070,6 +1069,75 @@ PageMap PageFile::Reader::read(PageFile::Reader::FieldReadInfos & to_read, const
return page_map;
}

Page PageFile::Reader::read(FieldReadInfo & to_read, const ReadLimiterPtr & read_limiter)
{
ProfileEvents::increment(ProfileEvents::PSMReadPages, 1);

size_t buf_size = 0;

std::sort(to_read.fields.begin(), to_read.fields.end());
for (const auto field_index : to_read.fields)
{
buf_size += to_read.entry.getFieldSize(field_index);
}

char * data_buf = static_cast<char *>(alloc(buf_size));
MemHolder mem_holder = createMemHolder(data_buf, [&, buf_size](char * p) { free(p, buf_size); });

Page page_rc;
std::set<Page::FieldOffset> fields_offset_in_page;

size_t read_size_this_entry = 0;
char * write_offset = data_buf;

for (const auto field_index : to_read.fields)
{
// TODO: Continuously fields can read by one system call.
const auto [beg_offset, end_offset] = to_read.entry.getFieldOffsets(field_index);
const auto size_to_read = end_offset - beg_offset;
PageUtil::readFile(data_file, to_read.entry.offset + beg_offset, write_offset, size_to_read, read_limiter);
fields_offset_in_page.emplace(field_index, read_size_this_entry);

if constexpr (PAGE_CHECKSUM_ON_READ)
{
auto expect_checksum = to_read.entry.field_offsets[field_index].second;
auto field_checksum = CityHash_v1_0_2::CityHash64(write_offset, size_to_read);
if (unlikely(to_read.entry.size != 0 && field_checksum != expect_checksum))
{
throw Exception(fmt::format("Page [{}] field [{}], entry offset [{}], entry size[{}], checksum not match, "
"broken file: {}, expected: 0x{:X}, but: 0x{:X}",
to_read.page_id,
field_index,
to_read.entry.offset,
to_read.entry.size,
data_file_path,
expect_checksum,
field_checksum),
ErrorCodes::CHECKSUM_DOESNT_MATCH);
}

read_size_this_entry += size_to_read;
write_offset += size_to_read;
}
}

Page page;
page.page_id = to_read.page_id;
page.data = ByteBuffer(data_buf, write_offset);
page.mem_holder = mem_holder;
page.field_offsets.swap(fields_offset_in_page);

if (unlikely(write_offset != data_buf + buf_size))
{
throw Exception(fmt::format("Pos not match, expect to read {} bytes, but only {}.", buf_size, write_offset - data_buf),
ErrorCodes::LOGICAL_ERROR);
}

last_read_time = Clock::now();

return page;
}

bool PageFile::Reader::isIdle(const Seconds & max_idle_time)
{
if (max_idle_time.count() == 0)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V2/PageFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class PageFile : public Allocator<false>
};
using FieldReadInfos = std::vector<FieldReadInfo>;
PageMap read(FieldReadInfos & to_read, const ReadLimiterPtr & read_limiter = nullptr);
Page read(FieldReadInfo & to_read, const ReadLimiterPtr & read_limiter = nullptr);

bool isIdle(const Seconds & max_idle_time);

Expand Down
Loading