[POC] New page storage #3197

Closed · wants to merge 8 commits
3 changes: 3 additions & 0 deletions CMakeLists.txt
@@ -364,6 +364,9 @@ else (ENABLE_FAILPOINTS)
message (STATUS "Failpoints are disabled")
endif (ENABLE_FAILPOINTS)

# Enable PageStorage V3 test.
option (ENABLE_V3_PAGESTORAGE "Enables V3 PageStorage" ON)

# Flags for test coverage
option (TEST_COVERAGE "Enables flags for test coverage" OFF)
option (TEST_COVERAGE_XML "Output XML report for test coverage" OFF)
4 changes: 4 additions & 0 deletions dbms/CMakeLists.txt
@@ -79,6 +79,10 @@ add_headers_and_sources(dbms src/Storages/Page/V2)
add_headers_and_sources(dbms src/Storages/Page/V2/mvcc)
add_headers_and_sources(dbms src/Storages/Page/V2/VersionSet)
add_headers_and_sources(dbms src/Storages/Page/V2/gc)
if (ENABLE_V3_PAGESTORAGE)
add_headers_and_sources(dbms src/Storages/Page/V3)
add_headers_and_sources(dbms src/Storages/Page/V3/spacemap)
endif()
add_headers_and_sources(dbms src/Storages/Page/)
add_headers_and_sources(dbms src/TiDB)
add_headers_and_sources(dbms src/Client)
3 changes: 3 additions & 0 deletions dbms/src/Storages/CMakeLists.txt
@@ -6,6 +6,9 @@ if (ENABLE_TESTS)
add_subdirectory (tests EXCLUDE_FROM_ALL)
add_subdirectory (Transaction/tests EXCLUDE_FROM_ALL)
add_subdirectory (Page/V2/tests EXCLUDE_FROM_ALL)
if (ENABLE_V3_PAGESTORAGE)
add_subdirectory (Page/V3/tests EXCLUDE_FROM_ALL)
endif ()
add_subdirectory (DeltaMerge/tests EXCLUDE_FROM_ALL)
endif ()

6 changes: 5 additions & 1 deletion dbms/src/Storages/Page/CMakeLists.txt
@@ -1 +1,5 @@
add_subdirectory(V2)

add_subdirectory(V2)
if (ENABLE_V3_PAGESTORAGE)
add_subdirectory(V3)
endif ()
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/PageStorage.h
@@ -10,6 +10,7 @@
#include <Storages/Page/V2/VersionSet/PageEntriesVersionSetWithDelta.h>
#include <Storages/Page/WriteBatch.h>
#include <Storages/Page/mvcc/VersionSetWithDelta.h>
#include <Storages/PathPool.h>

#include <condition_variable>
#include <functional>
10 changes: 10 additions & 0 deletions dbms/src/Storages/Page/PageUtil.h
@@ -136,6 +136,16 @@ inline T get(std::conditional_t<advance, char *&, const char *> pos)
return v;
}

/// Cast and advance sizeof(T) bytes.
template <typename T, bool advance = true>
T * cast(std::conditional_t<advance, char *&, const char *> pos)
{
T * t = reinterpret_cast<T *>(pos);
if constexpr (advance)
pos += sizeof(T);
return t;
}

} // namespace PageUtil

} // namespace DB
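The new cast<T> helper complements the get<T> above it: rather than copying a value out of the buffer, it maps a struct directly over the bytes and, when advance is true, moves the cursor past it. Below is a minimal standalone sketch of the intended usage; the Header struct and main() are illustrative only, not part of the PR.

#include <cstdint>
#include <iostream>
#include <type_traits>

// Standalone copy of the helper for illustration; in the tree it lives
// in namespace DB::PageUtil (dbms/src/Storages/Page/PageUtil.h).
template <typename T, bool advance = true>
T * cast(std::conditional_t<advance, char *&, const char *> pos)
{
    T * t = reinterpret_cast<T *>(pos);
    if constexpr (advance)
        pos += sizeof(T); // move the cursor past the mapped struct
    return t;
}

struct Header // hypothetical on-disk record, for illustration only
{
    uint32_t size;
    uint32_t seq;
};

int main()
{
    char buffer[64] = {};
    char * pos = buffer;
    auto * header = cast<Header>(pos); // header overlays buffer; pos advanced
    header->size = 42;
    header->seq = 1;
    std::cout << (pos - buffer) << std::endl; // prints 8, i.e. sizeof(Header)
}

BlobFile::write below relies on exactly this pattern to lay PFMetaHeader, PFMeta and PFMetaFieldOffset records back to back in a single meta buffer.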
278 changes: 278 additions & 0 deletions dbms/src/Storages/Page/V3/BlobFile.cpp
@@ -0,0 +1,278 @@
#include <Poco/File.h>
#include <Storages/Page/PageUtil.h>
#include <Storages/Page/V3/BlobFile.h>
#include <Storages/Page/V3/PageSpec.h>
#include <Storages/Page/V3/spacemap/BitMap.h>
#include <stdlib.h>

using namespace DB::PS::V2;
namespace DB::PS::V3
{
BlobFile::BlobFile(String path_, FileProviderPtr file_provider_)
: file_provider{file_provider_}
, path(path_)
, log(&Poco::Logger::get("NewPageStorage"))
, versioned_page_entries("NewPageStorage", version_set_config, log)
{
if (Poco::File page_file_dir(path); !page_file_dir.exists())
{
page_file_dir.createDirectories();
}

if (Poco::File page_file_data(path + PAGE_FILE_DATA); !page_file_data.exists())
{
page_file_data.createFile();
}

// TBD: add a V3 magic number
if (Poco::File page_file_meta(path + PAGE_FILE_META); !page_file_meta.exists())
{
page_file_meta.createFile();
}
else
{
meta_position = page_file_meta.getSize();
}

// TODO: try/catch here
space_map = SpaceMap::create(path, BMAP64_RBTREE, file_provider_);

data_writer = file_provider->newWritableFile(
path + PAGE_FILE_DATA,
EncryptionPath(path + PAGE_FILE_DATA, ""),
false,
false);

meta_writer = file_provider->newWritableFile(
path + PAGE_FILE_META,
EncryptionPath(path + PAGE_FILE_META, ""),
false,
false);

data_reader = file_provider->newRandomAccessFile(
path + PAGE_FILE_DATA,
EncryptionPath(path + PAGE_FILE_DATA, ""));

meta_reader = file_provider->newRandomAccessFile(
path + PAGE_FILE_META,
EncryptionPath(path + PAGE_FILE_META, ""));
}

void BlobFile::restore()
{
PageEntriesEdit edit;
edit = space_map->restore();
versioned_page_entries.apply(edit);
}

void BlobFile::write(WriteBatch && write_batch)
{
PageEntriesEdit edit;
size_t meta_write_bytes = sizeof(PFMetaHeader);
auto & writes = write_batch.getWrites();
size_t write_batch_size = writes.size();

if (write_batch_size == 0)
{
return;
}
write_batch.setSequence(++write_batch_seq);

// Read and get offset.
UInt64 data_sizes[write_batch_size];
UInt64 offsets_in_file[write_batch_size];
UInt64 total_data_sizes = 0;
size_t index = 0;
for (const auto & write : writes)
{
total_data_sizes += write.size;
data_sizes[index++] = write.size;
if (write.type == WriteBatch::WriteType::PUT || write.type == WriteBatch::WriteType::UPSERT)
{
meta_write_bytes += write.offsets.size() * sizeof(PFMetaFieldOffset);
}

meta_write_bytes += sizeof(PFMeta);
}

// Allocating a separate offset per page would turn this into multiple IOs,
// so give up per-page offsets and combine everything into one big IO.
/* space_map->getDataRange(data_sizes, write_batch_size, offsets_in_file, true); */

auto offset_begin = space_map->getDataRange(total_data_sizes, true);
if (offset_begin != UINT64_MAX)
{
space_map->splitDataInRange(data_sizes, write_batch_size, offsets_in_file, offset_begin, total_data_sizes);
}

char * meta_buffer = (char *)alloc(meta_write_bytes);
char * data_buffer = (char *)alloc(total_data_sizes);
SCOPE_EXIT({
free(meta_buffer, meta_write_bytes);
free(data_buffer, total_data_sizes);
});

char * meta_pos = meta_buffer;
auto header = PageUtil::cast<PFMetaHeader>(meta_pos);
header->bits.meta_byte_size = meta_write_bytes;
header->bits.meta_seq_id = write_batch.getSequence();
// TBD CRC
header->bits.page_checksum = 0;

PageEntry entry;
index = 0;
for (auto & write : writes)
{
auto meta = PageUtil::cast<PFMeta>(meta_pos);
meta->pu.type = (UInt8)write.type;
switch (write.type)
{
case WriteBatch::WriteType::PUT:
case WriteBatch::WriteType::UPSERT:
{
meta->pu.page_id = write.page_id;
meta->pu.page_offset = offsets_in_file[index];
meta->pu.page_size = write.size;
meta->pu.field_offset_len = write.offsets.size();

entry.offset = meta->pu.page_offset;
entry.size = write.size;
entry.field_offsets.swap(write.offsets);

for (size_t i = 0; i < entry.field_offsets.size(); ++i)
{
auto field_offset = PageUtil::cast<PFMetaFieldOffset>(meta_pos);
field_offset->bits.field_offset = (UInt64)entry.field_offsets[i].first;
}

if (write.type == WriteBatch::WriteType::PUT)
edit.put(write.page_id, entry);
else if (write.type == WriteBatch::WriteType::UPSERT)
edit.upsertPage(write.page_id, entry);
break;
}
case WriteBatch::WriteType::DEL:
{
meta->del.page_id = write.page_id;
edit.del(write.page_id);
break;
}
case WriteBatch::WriteType::REF:
{
meta->ref.page_id = write.page_id;
meta->ref.og_page_id = write.ori_page_id;
edit.ref(write.page_id, write.ori_page_id);
break;
}
}
index++;
}

// FIXME: should mark the page map only after the data has been written
if (offset_begin != UINT64_MAX)
{
UInt64 range_move = 0;
for (size_t i = 0; i < write_batch_size; i++)
{
if (data_sizes[i] != 0)
{
// auto & buffer_need_write = writes[i].read_buffer->buffer();

// A copy is still needed here, because the pages must be compacted into
// a single IO; this depends on the fact that we use an unaligned buffer.
// memcpy(data_buffer + range_move,buffer_need_write.begin(),buffer_need_write.size());
writes[i].read_buffer->readStrict(data_buffer + range_move, writes[i].size);
range_move += data_sizes[i];
}
}

// TBD: maybe we can use a fixed memory store. Then we would not need to
// allocate data memory for every batch, and the buffer could be bigger.
PageUtil::writeFile(data_writer, offset_begin, data_buffer, total_data_sizes, nullptr, false);
PageUtil::syncFile(data_writer);
}

PageUtil::writeFile(meta_writer, meta_position, meta_buffer, meta_write_bytes, nullptr, false);
PageUtil::syncFile(meta_writer);
meta_position += meta_write_bytes;

versioned_page_entries.apply(edit);
}

String BlobFile::getPath()
{
return path;
}


void BlobFile::read(PageIdAndEntries & to_read, const PageHandler & handler)
{
std::sort(to_read.begin(), to_read.end(), [](const PageIdAndEntry & a, const PageIdAndEntry & b) {
return a.second.offset < b.second.offset;
});

size_t buf_size = 0;
for (const auto & p : to_read)
buf_size = std::max(buf_size, p.second.size);

char * data_buf = (char *)alloc(buf_size);
MemHolder mem_holder = createMemHolder(data_buf, [&, buf_size](char * p) { free(p, buf_size); });

auto it = to_read.begin();
while (it != to_read.end())
{
auto && [page_id, entry] = *it;

PageUtil::readFile(data_reader, entry.offset, data_buf, entry.size, nullptr);

// TODO CRC
Page page;
page.page_id = page_id;
page.data = ByteBuffer(data_buf, data_buf + entry.size);
page.mem_holder = mem_holder;

++it;
handler(page_id, page);
}
}


void BlobFile::read(const std::vector<PageId> & page_ids, const PageHandler & handler, SnapshotPtr snapshot)
{
if (!snapshot)
{
snapshot = this->getSnapshot();
}

PageIdAndEntries to_read;
for (auto page_id : page_ids)
{
const auto page_entry = snapshot->version()->find(page_id);
if (!page_entry)
throw Exception("Page " + DB::toString(page_id) + " not found", ErrorCodes::LOGICAL_ERROR);
to_read.emplace_back(page_id, *page_entry);
}

read(to_read, handler);
}

void BlobFile::read(const PageId & page_id, const PageHandler & handler, SnapshotPtr snapshot)
{
if (!snapshot)
{
snapshot = this->getSnapshot();
}

const auto page_entry = snapshot->version()->find(page_id);
if (!page_entry)
throw Exception("Page " + DB::toString(page_id) + " not found", ErrorCodes::LOGICAL_ERROR);
PageIdAndEntries to_read = {{page_id, *page_entry}};
read(to_read, handler);
}

} // namespace DB::PS::V3
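For reference, BlobFile::write batches everything into two contiguous buffers, each flushed with a single writeFile/syncFile pair: the meta buffer holds a PFMetaHeader followed by one PFMeta record per write (plus PFMetaFieldOffset entries for PUT/UPSERT), while the data buffer holds the page bodies compacted back to back inside the single range returned by the space map. Here is a self-contained sketch of that layout; the record types are simplified stand-ins, since PageSpec.h is not part of this diff.

#include <cstdint>
#include <iostream>
#include <vector>

// Simplified stand-ins for PFMetaHeader / PFMeta; the real records in
// PageSpec.h use bit fields and carry a checksum.
struct MetaHeader { uint64_t meta_byte_size; uint64_t meta_seq_id; };
struct MetaRecord { uint64_t page_id; uint64_t page_offset; uint64_t page_size; };

int main()
{
    struct Write { uint64_t page_id; uint64_t size; };
    std::vector<Write> writes = {{100, 4096}, {101, 8192}};

    // First pass: size the meta buffer up front, as write() does.
    size_t meta_bytes = sizeof(MetaHeader) + writes.size() * sizeof(MetaRecord);
    std::vector<char> meta(meta_bytes);
    char * pos = meta.data();

    auto * header = reinterpret_cast<MetaHeader *>(pos);
    pos += sizeof(MetaHeader);
    header->meta_byte_size = meta_bytes;
    header->meta_seq_id = 1; // write_batch.getSequence() in the real code

    // Second pass: one record per write; page offsets are carved
    // consecutively out of one contiguous range, mirroring
    // getDataRange + splitDataInRange.
    uint64_t offset = 0; // pretend the space map returned offset 0
    for (const auto & w : writes)
    {
        auto * rec = reinterpret_cast<MetaRecord *>(pos);
        pos += sizeof(MetaRecord);
        rec->page_id = w.page_id;
        rec->page_offset = offset;
        rec->page_size = w.size;
        offset += w.size;
    }

    // meta now holds exactly what one writeFile call would flush; the page
    // bodies would fill a data buffer of `offset` bytes.
    std::cout << "meta: " << meta_bytes << " bytes, data: " << offset << " bytes\n";
}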
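On the read side, read(PageIdAndEntries &, handler) sorts the requests by file offset so the reads sweep the data file front to back, and reuses a single buffer sized to the largest page, invoking the handler once per page. Because the buffer is shared, each Page handed to the handler is only valid until the next read. A minimal sketch of that reuse pattern follows, with an in-memory blob standing in for PageUtil::readFile.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

// Entry mirrors just the PageEntry fields the read path needs.
struct Entry { uint64_t offset; size_t size; };
using IdAndEntry = std::pair<uint64_t, Entry>;

int main()
{
    // In-memory stand-in for the on-disk data file.
    std::vector<char> blob(16 * 1024, 'x');
    std::vector<IdAndEntry> to_read = {{7, {8192, 4096}}, {3, {0, 1024}}};

    // Sort by offset so the reads sweep the file front to back.
    std::sort(to_read.begin(), to_read.end(), [](const IdAndEntry & a, const IdAndEntry & b) {
        return a.second.offset < b.second.offset;
    });

    // One buffer sized to the largest page, reused for every read.
    size_t buf_size = 0;
    for (const auto & p : to_read)
        buf_size = std::max(buf_size, p.second.size);
    std::vector<char> buf(buf_size);

    for (const auto & [page_id, entry] : to_read)
    {
        // Stand-in for PageUtil::readFile(data_reader, entry.offset, ...).
        std::copy_n(blob.begin() + entry.offset, entry.size, buf.begin());
        // The handler must consume the page now; the buffer is overwritten
        // on the next iteration.
        std::cout << "page " << page_id << ": " << entry.size << " bytes\n";
    }
}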