Skip to content

Commit

Permalink
Page storage bug fix (#87)
Browse files Browse the repository at this point in the history
* add test cases for PageStorage

* split PageStorage gc stage into small helper functions

* add test cases for PageStorage gc concurrency

* add stress test and dump utils of PageStorage

* Fix bug:
1. ensure each PageFile has both its meta and data files, in case gc dropped the files but was killed before dropping the directory
2. turn PageStorage::Config::sync_on_write = true by default
3. avoid PageStorage::gc being run by multiple threads concurrently
4. print PageFile's path if checksum is not correct

* throw an exception when must_exist == false and errno != ENOENT
  • Loading branch information
JaySon-Huang committed Jul 16, 2019
1 parent 7a4d042 commit d80e9e2
Show file tree
Hide file tree
Showing 11 changed files with 740 additions and 152 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Storages/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ if (ENABLE_TESTS)
add_subdirectory (tests)
add_subdirectory (Transaction/tests)
add_subdirectory (DeltaMerge/tests)
add_subdirectory (Page/tests)
endif ()

18 changes: 10 additions & 8 deletions dbms/src/Storages/Page/Page.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@ using Pages = std::vector<Page>;
using PageMap = std::map<PageId, Page>;
using PageHandler = std::function<void(PageId page_id, const Page &)>;

// Indicate the page size && offset in PageFile. TODO: rename to `PageEntry`?
// NOTE: the diff rendering had fused the old (uninitialized) and new
// (zero-initialized) member lists; only the new, fully-initialized form is kept.
struct PageCache
{
    // if file_id == 0, means it is invalid
    PageFileId file_id = 0; // id of the PageFile this page lives in
    UInt32 level = 0;       // level of that PageFile
    UInt32 size = 0;        // byte size of the page data
    UInt64 offset = 0;      // byte offset of the page data inside the PageFile's data file
    UInt64 tag = 0;
    UInt64 checksum = 0;    // checksum of the page data, verified on read

    // An entry is valid iff it refers to a real PageFile (file_id != 0).
    bool isValid() const { return file_id != 0; }
    // The (file_id, level) pair identifying the owning PageFile.
    PageFileIdAndLevel fileIdLevel() const { return std::make_pair(file_id, level); }
};
// PageCache entries are copied/serialized in bulk; keep the type trivially copyable.
static_assert(std::is_trivially_copyable_v<PageCache>);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/PageDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace DB
{

// 1 MiB. A typed constant replaces the old object-like macro
// `#define MB 1048576ULL;` — the macro's trailing semicolon broke any
// expression using MB (e.g. `2 * MB` expanded to `2 * 1048576ULL;;`),
// and a constexpr variable is scoped and type-checked.
static constexpr UInt64 MB = 1048576ULL;

static constexpr UInt64 PAGE_SIZE_STEP = (1 << 10) * 16; // 16 KB
static constexpr UInt64 PAGE_BUFFER_SIZE = DBMS_DEFAULT_BUFFER_SIZE;
Expand Down
96 changes: 71 additions & 25 deletions dbms/src/Storages/Page/PageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ template <bool read, bool must_exist = true>
int openFile(const std::string & path)
{
ProfileEvents::increment(ProfileEvents::FileOpen);
int fd;

int flags;
if constexpr (read)
Expand All @@ -87,14 +86,16 @@ int openFile(const std::string & path)
flags = O_WRONLY | O_CREAT;
}

fd = ::open(path.c_str(), flags, 0666);
int fd = ::open(path.c_str(), flags, 0666);
if (-1 == fd)
{
ProfileEvents::increment(ProfileEvents::FileOpenFailed);
if constexpr (!must_exist)
{
if (errno == ENOENT)
{
return 0;
}
}
throwFromErrno("Cannot open file " + path, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
}
Expand Down Expand Up @@ -345,6 +346,7 @@ std::pair<ByteBuffer, ByteBuffer> genWriteData( //

/// Analyze meta file, and return <available meta size, available data size>.
std::pair<UInt64, UInt64> analyzeMetaFile( //
const String & path,
PageFileId file_id,
UInt32 level,
const char * meta_data,
Expand All @@ -356,44 +358,50 @@ std::pair<UInt64, UInt64> analyzeMetaFile( //

UInt64 page_data_file_size = 0;
char * pos = const_cast<char *>(meta_data);
while (pos < meta_data + meta_data_size)
while (pos < meta_data_end)
{
if (pos + sizeof(WBSize) > meta_data_end)
{
LOG_WARNING(log, "Incomplete write batch, ignored.");
break;
}
const char * wb_start_pos = pos;
auto wb_bytes = get<WBSize>(pos);
const auto wb_bytes = get<WBSize>(pos);
if (wb_start_pos + wb_bytes > meta_data_end)
{
LOG_WARNING(log, "Incomplete write batch, ignored.");
break;
}
auto wb_bytes_without_checksum = wb_bytes - sizeof(Checksum);

auto version = get<PageFileVersion>(pos);
auto wb_checksum = get<Checksum, false>(wb_start_pos + wb_bytes_without_checksum);

if (wb_checksum != CityHash_v1_0_2::CityHash64(wb_start_pos, wb_bytes_without_checksum))
throw Exception("Write batch checksum not match", ErrorCodes::CHECKSUM_DOESNT_MATCH);
// this field is always true now
const auto version = get<PageFileVersion>(pos);
if (version != PageFile::CURRENT_VERSION)
throw Exception("Version not match", ErrorCodes::LOGICAL_ERROR);

// check the checksum of WriteBatch
const auto wb_bytes_without_checksum = wb_bytes - sizeof(Checksum);
const auto wb_checksum = get<Checksum, false>(wb_start_pos + wb_bytes_without_checksum);
if (wb_checksum != CityHash_v1_0_2::CityHash64(wb_start_pos, wb_bytes_without_checksum))
{
throw Exception("Write batch checksum not match, path: " + path + ", offset: " + DB::toString(wb_start_pos - meta_data),
ErrorCodes::CHECKSUM_DOESNT_MATCH);
}

// recover WriteBatch
while (pos < wb_start_pos + wb_bytes_without_checksum)
{
auto is_put = get<UInt8>(pos);
if (is_put)
{
PageCache pc{};

auto page_id = get<PageId>(pos);
pc.tag = get<PageTag>(pos);
pc.offset = get<PageOffset>(pos);
pc.size = get<PageSize>(pos);
pc.checksum = get<Checksum>(pos);
pc.file_id = file_id;
pc.level = level;
auto page_id = get<PageId>(pos);
PageCache pc;
pc.file_id = file_id;
pc.level = level;
pc.tag = get<PageTag>(pos);
pc.offset = get<PageOffset>(pos);
pc.size = get<PageSize>(pos);
pc.checksum = get<Checksum>(pos);

page_caches[page_id] = pc;
page_data_file_size += pc.size;
Expand All @@ -404,6 +412,7 @@ std::pair<UInt64, UInt64> analyzeMetaFile( //
page_caches.erase(page_id); // Reserve the order of removal.
}
}
// move `pos` over the checksum of WriteBatch
pos += sizeof(Checksum);

if (pos != wb_start_pos + wb_bytes)
Expand Down Expand Up @@ -443,6 +452,7 @@ void PageFile::Writer::write(const WriteBatch & wb, PageCacheMap & page_cache_ma
{
ProfileEvents::increment(ProfileEvents::PSMWritePages, wb.putWriteCount());

// TODO: investigate if not copy data into heap, write big pages can be faster?
ByteBuffer meta_buf, data_buf;
std::tie(meta_buf, data_buf) = PageMetaFormat::genWriteData(wb, page_file, page_cache_map);

Expand Down Expand Up @@ -482,10 +492,12 @@ PageMap PageFile::Reader::read(PageIdAndCaches & to_read)
return a.second.offset < b.second.offset;
});

// allocate data_buf that can hold all pages
size_t buf_size = 0;
for (const auto & p : to_read)
{
buf_size += p.second.size;

}
// TODO optimization:
// 1. Succeeding pages can be read by one call.
// 2. Pages with small gaps between them can also read together.
Expand All @@ -504,7 +516,10 @@ PageMap PageFile::Reader::read(PageIdAndCaches & to_read)
{
auto checksum = CityHash_v1_0_2::CityHash64(pos, page_cache.size);
if (checksum != page_cache.checksum)
throw Exception("Page [" + DB::toString(page_id) + "] checksum not match, broken file: " + data_file_path, ErrorCodes::CHECKSUM_DOESNT_MATCH);
{
throw Exception("Page [" + DB::toString(page_id) + "] checksum not match, broken file: " + data_file_path,
ErrorCodes::CHECKSUM_DOESNT_MATCH);
}
}

Page page;
Expand Down Expand Up @@ -550,7 +565,10 @@ void PageFile::Reader::read(PageIdAndCaches & to_read, const PageHandler & handl
{
auto checksum = CityHash_v1_0_2::CityHash64(data_buf, page_cache.size);
if (checksum != page_cache.checksum)
throw Exception("Page checksum not match, broken file.", ErrorCodes::CHECKSUM_DOESNT_MATCH);
{
throw Exception("Page [" + DB::toString(page_id) + "] checksum not match, broken file: " + data_file_path,
ErrorCodes::CHECKSUM_DOESNT_MATCH);
}
}

Page page;
Expand Down Expand Up @@ -610,10 +628,22 @@ std::pair<PageFile, bool> PageFile::recover(const std::string & parent_path, con
LOG_INFO(log, "Temporary page file, ignored: " + page_file_name);
return {{}, false};
}

// ensure both meta && data exist
PageFileId file_id = std::stoull(ss[1]);
UInt32 level = std::stoi(ss[2]);
return {PageFile(file_id, level, parent_path, false, false, log), true};
PageFile pf(file_id, level, parent_path, false, false, log);
if (!Poco::File(pf.metaPath()).exists())
{
LOG_INFO(log, "Broken page without meta file, ignored: " + pf.metaPath());
return {{}, false};
}
if (!Poco::File(pf.dataPath()).exists())
{
LOG_INFO(log, "Broken page without data file, ignored: " + pf.dataPath());
return {{}, false};
}

return {pf, true};
}

PageFile PageFile::newPageFile(PageFileId file_id, UInt32 level, const std::string & parent_path, bool is_tmp, Logger * log)
Expand All @@ -628,7 +658,7 @@ PageFile PageFile::openPageFileForRead(PageFileId file_id, UInt32 level, const s

void PageFile::readAndSetPageMetas(PageCacheMap & page_caches)
{
auto path = metaPath();
const auto path = metaPath();
Poco::File file(path);
size_t file_size = file.getSize();

Expand All @@ -641,7 +671,9 @@ void PageFile::readAndSetPageMetas(PageCacheMap & page_caches)

readFile(file_fd, 0, data, file_size, path);

std::tie(this->meta_file_pos, this->data_file_pos) = PageMetaFormat::analyzeMetaFile(file_id, level, data, file_size, page_caches, log);
// analyze meta file and update page_caches
std::tie(this->meta_file_pos, this->data_file_pos)
= PageMetaFormat::analyzeMetaFile(folderPath(), file_id, level, data, file_size, page_caches, log);
}

void PageFile::setFormal()
Expand All @@ -658,7 +690,21 @@ void PageFile::destroy()
// TODO: delay remove.
Poco::File file(folderPath());
if (file.exists())
{
// remove meta first, then remove data
Poco::File meta_file(metaPath());
if (meta_file.exists())
{
meta_file.remove();
}
Poco::File data_file(dataPath());
if (data_file.exists())
{
data_file.remove();
}
// drop dir
file.remove(true);
}
}

UInt64 PageFile::getDataFileSize() const
Expand Down
Loading

0 comments on commit d80e9e2

Please sign in to comment.