Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor WAL log writer && IO limiter #4107

Merged
merged 9 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 50 additions & 23 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Poco/Logger.h>
#include <Storages/Page/PageUtil.h>
#include <Storages/Page/V3/LogFile/LogFormat.h>
#include <Storages/Page/V3/LogFile/LogWriter.h>
#include <common/logger_useful.h>
Expand All @@ -12,50 +13,63 @@
namespace DB::PS::V3
{
LogWriter::LogWriter(
std::unique_ptr<WriteBufferFromFileBase> && dest_,
String path_,
const FileProviderPtr & file_provider_,
Format::LogNumberType log_number_,
bool recycle_log_files_,
bool manual_flush_)
: dest(std::move(dest_))
: path(path_)
, file_provider(file_provider_)
, block_offset(0)
, log_number(log_number_)
, recycle_log_files(recycle_log_files_)
, manual_flush(manual_flush_)
, write_buffer(nullptr, 0)
{
// Must be `BLOCK_SIZE`, or we can not ensure the correctness of writing.
assert(dest->internalBuffer().size() == Format::BLOCK_SIZE);
log_file = file_provider->newWritableFile(
path,
EncryptionPath(path, ""),
false,
/*create_new_encryption_info_*/ false);

buffer = static_cast<char *>(alloc(buffer_size));
write_buffer = WriteBuffer(buffer, buffer_size);
}

// Re-point `write_buffer` at the start of the backing `buffer`, discarding any
// bytes appended since the last write-out. Invoked after the buffered contents
// have been persisted (see flush()) and after the backing buffer is (re)allocated,
// so subsequent appends start from a clean, empty WriteBuffer.
void LogWriter::resetBuffer()
{
write_buffer = WriteBuffer(buffer, buffer_size);
}

LogWriter::~LogWriter()
{
if (dest)
{
flush();
log_file->fsync();
log_file->close();

dest->close(); // close explicitly
}
free(buffer, buffer_size);
}

size_t LogWriter::writtenBytes() const
{
return dest->getMaterializedBytes();
return written_bytes;
}

void LogWriter::flush()
void LogWriter::flush(const WriteLimiterPtr & write_limiter)
{
dest->sync();
PageUtil::writeFile(log_file, written_bytes, write_buffer.buffer().begin(), write_buffer.offset(), write_limiter, false);
log_file->fsync();
written_bytes += write_buffer.offset();

// reset the write_buffer
resetBuffer();
}

void LogWriter::close()
{
if (dest)
{
dest->close();
dest.reset();
}
log_file->close();
}

void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size)
void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const WriteLimiterPtr & write_limiter)
{
// Header size varies depending on whether we are recycling or not.
const int header_size = recycle_log_files ? Format::RECYCLABLE_HEADER_SIZE : Format::HEADER_SIZE;
Expand All @@ -64,6 +78,17 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size)
// we still want to iterate once to emit a single zero-length record.
bool begin = true;
size_t payload_left = payload_size;

size_t head_sizes = ((payload_size / Format::BLOCK_SIZE) + 1) * Format::RECYCLABLE_HEADER_SIZE;
if (payload_size + head_sizes >= buffer_size)
{
size_t new_buff_size = payload_size + ((head_sizes / Format::BLOCK_SIZE) + 1) * Format::BLOCK_SIZE;

buffer = static_cast<char *>(realloc(buffer, buffer_size, new_buff_size));
buffer_size = new_buff_size;
resetBuffer();
}

do
{
const Int64 leftover = Format::BLOCK_SIZE - block_offset;
Expand All @@ -75,7 +100,7 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size)
{
// Fill the trailer with all zero
static constexpr char MAX_ZERO_HEADER[Format::RECYCLABLE_HEADER_SIZE]{'\x00'};
writeString(MAX_ZERO_HEADER, leftover, *dest);
writeString(MAX_ZERO_HEADER, leftover, write_buffer);
}
block_offset = 0;
}
Expand All @@ -101,7 +126,9 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size)
} while (payload.hasPendingData());

if (!manual_flush)
dest->sync();
{
flush(write_limiter);
}
}

void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload, size_t length)
Expand Down Expand Up @@ -150,9 +177,9 @@ void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload
// Write the checksum, header and the payload
digest.update(payload.position(), length);
Format::ChecksumType checksum = digest.checksum();
writeIntBinary(checksum, *dest);
writeString(header_buff.buffer().begin(), header_buff.count(), *dest);
writeString(payload.position(), length, *dest);
writeIntBinary(checksum, write_buffer);
writeString(header_buff.buffer().begin(), header_buff.count(), write_buffer);
writeString(payload.position(), length, write_buffer);

block_offset += header_size + length;
}
Expand Down
24 changes: 19 additions & 5 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <Encryption/FileProvider.h>
#include <Storages/Page/V3/LogFile/LogFormat.h>
#include <common/types.h>

Expand Down Expand Up @@ -55,11 +56,12 @@ namespace PS::V3
* Log number = 32bit log file number, so that we can distinguish between
* records written by the most recent log writer vs a previous one.
*/
class LogWriter final
class LogWriter final : public Allocator<false>
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved
{
public:
LogWriter(
std::unique_ptr<WriteBufferFromFileBase> && dest_,
String path_,
const FileProviderPtr & file_provider_,
Format::LogNumberType log_number_,
bool recycle_log_files_,
bool manual_flush_ = false);
Expand All @@ -69,9 +71,9 @@ class LogWriter final

~LogWriter();

void addRecord(ReadBuffer & payload, size_t payload_size);
void addRecord(ReadBuffer & payload, size_t payload_size, const WriteLimiterPtr & write_limiter = nullptr);

void flush();
void flush(const WriteLimiterPtr & write_limiter = nullptr);

void close();

Expand All @@ -85,14 +87,26 @@ class LogWriter final
private:
void emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload, size_t length);

void resetBuffer();

private:
std::unique_ptr<WriteBufferFromFileBase> dest;
String path;
FileProviderPtr file_provider;

WritableFilePtr log_file;

size_t block_offset; // Current offset in block
Format::LogNumberType log_number;
const bool recycle_log_files;
// If true, it does not flush after each write. Instead it relies on the upper
// layer to manually does the flush by calling ::flush()
const bool manual_flush;

size_t written_bytes = 0;

char * buffer;
size_t buffer_size = Format::BLOCK_SIZE;
WriteBuffer write_buffer;
};
} // namespace PS::V3
} // namespace DB
12 changes: 6 additions & 6 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,13 @@ std::set<PageId> PageDirectory::getAllPageIds()
return page_ids;
}

void PageDirectory::apply(PageEntriesEdit && edit)
void PageDirectory::apply(PageEntriesEdit && edit, const WriteLimiterPtr & write_limiter)
{
std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline
UInt64 last_sequence = sequence.load();

// stage 1, persisted the changes to WAL with version [seq=last_seq + 1, epoch=0]
wal->apply(edit, PageVersionType(last_sequence + 1, 0));
wal->apply(edit, PageVersionType(last_sequence + 1, 0), write_limiter);

// stage 2, create entry version list for pageId. nothing need to be rollback
std::unordered_map<PageId, std::pair<PageLock, int>> updating_locks;
Expand Down Expand Up @@ -496,7 +496,7 @@ void PageDirectory::apply(PageEntriesEdit && edit)
sequence.fetch_add(1);
}

std::set<PageId> PageDirectory::gcApply(PageEntriesEdit && migrated_edit, bool need_scan_page_ids)
std::set<PageId> PageDirectory::gcApply(PageEntriesEdit && migrated_edit, bool need_scan_page_ids, const WriteLimiterPtr & write_limiter)
{
{
std::shared_lock read_lock(table_rw_mutex);
Expand All @@ -519,7 +519,7 @@ std::set<PageId> PageDirectory::gcApply(PageEntriesEdit && migrated_edit, bool n
} // Then we should release the read lock on `table_rw_mutex`

// Apply migrate edit into WAL with the increased epoch version
wal->apply(migrated_edit);
wal->apply(migrated_edit, write_limiter);

if (!need_scan_page_ids)
{
Expand Down Expand Up @@ -567,12 +567,12 @@ PageDirectory::getEntriesByBlobIds(const std::vector<BlobFileId> & blob_need_gc)
}


std::vector<PageEntriesV3> PageDirectory::gc()
std::vector<PageEntriesV3> PageDirectory::gc(const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter)
{
[[maybe_unused]] bool done_anything = false;
UInt64 lowest_seq = sequence.load();

done_anything |= wal->compactLogs();
done_anything |= wal->compactLogs(write_limiter, read_limiter);

{
// Cleanup released snapshots
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,14 @@ class PageDirectory

std::set<PageId> getAllPageIds();

void apply(PageEntriesEdit && edit);
void apply(PageEntriesEdit && edit, const WriteLimiterPtr & write_limiter = nullptr);
jiaqizho marked this conversation as resolved.
Show resolved Hide resolved

std::pair<std::map<BlobFileId, PageIdAndVersionedEntries>, PageSize>
getEntriesByBlobIds(const std::vector<BlobFileId> & blob_need_gc);

std::set<PageId> gcApply(PageEntriesEdit && migrated_edit, bool need_scan_page_ids);
std::set<PageId> gcApply(PageEntriesEdit && migrated_edit, bool need_scan_page_ids, const WriteLimiterPtr & write_limiter = nullptr);

std::vector<PageEntriesV3> gc();
std::vector<PageEntriesV3> gc(const WriteLimiterPtr & write_limiter = nullptr, const ReadLimiterPtr & read_limiter = nullptr);

size_t numPages() const
{
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void PageStorageImpl::restore()
collapsing_directory.apply(std::move(edit));
};
// Restore `collapsing_directory` from disk
auto wal = WALStore::create(callback, file_provider, delegator, /*write_limiter*/ nullptr);
auto wal = WALStore::create(callback, file_provider, delegator);
// PageId max_page_id = collapsing_directory.max_applied_page_id; // TODO: return it to outer function

// TODO: Now `PageDirectory::create` and `BlobStore::restore` iterate all entries in `collapsing_directory`,
Expand Down Expand Up @@ -78,7 +78,7 @@ void PageStorageImpl::write(DB::WriteBatch && write_batch, const WriteLimiterPtr

// Persist Page data to BlobStore
auto edit = blob_store.write(write_batch, write_limiter);
page_directory.apply(std::move(edit));
page_directory.apply(std::move(edit), write_limiter);
}

DB::PageEntry PageStorageImpl::getEntry(PageId page_id, SnapshotPtr snapshot)
Expand Down Expand Up @@ -178,7 +178,7 @@ void PageStorageImpl::traverse(const std::function<void(const DB::Page & page)>
}
}

bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & /*write_limiter*/, const ReadLimiterPtr & /*read_limiter*/)
bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & write_limiter, const ReadLimiterPtr & read_limiter)
{
// If another thread is running gc, just return;
bool v = false;
Expand All @@ -192,7 +192,7 @@ bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & /*write_limi

// 1. Do the MVCC gc, clean up expired snapshot.
// And get the expired entries.
const auto & del_entries = page_directory.gc();
const auto & del_entries = page_directory.gc(write_limiter, read_limiter);

// 2. Remove the expired entries in BlobStore.
// It won't delete the data on the disk.
Expand Down Expand Up @@ -224,7 +224,7 @@ bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & /*write_limi
// 5. Do the BlobStore GC
// After BlobStore GC, these entries will be migrated to a new blob.
// Then we should notify MVCC apply the change.
PageEntriesEdit gc_edit = blob_store.gc(blob_gc_info, total_page_size);
PageEntriesEdit gc_edit = blob_store.gc(blob_gc_info, total_page_size, write_limiter, read_limiter);
if (gc_edit.empty())
{
throw Exception("Something wrong after BlobStore GC.", ErrorCodes::LOGICAL_ERROR);
Expand All @@ -237,7 +237,7 @@ bool PageStorageImpl::gc(bool /*not_skip*/, const WriteLimiterPtr & /*write_limi
// be reset to correct state during restore. If any exception thrown, then some BlobFiles
// will be remained as "read-only" files while entries in them are useless in actual.
// Those BlobFiles should be cleaned during next restore.
const auto & page_ids = page_directory.gcApply(std::move(gc_edit), external_pages_remover != nullptr);
const auto & page_ids = page_directory.gcApply(std::move(gc_edit), external_pages_remover != nullptr, write_limiter);

(void)page_ids;

Expand Down
13 changes: 7 additions & 6 deletions dbms/src/Storages/Page/V3/WAL/WALReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,25 @@ LogFilenameSet WALStoreReader::listAllFiles(
return log_files;
}

WALStoreReaderPtr WALStoreReader::create(FileProviderPtr & provider, LogFilenameSet files)
WALStoreReaderPtr WALStoreReader::create(FileProviderPtr & provider, LogFilenameSet files, const ReadLimiterPtr & read_limiter)
{
auto reader = std::make_shared<WALStoreReader>(provider, std::move(files));
auto reader = std::make_shared<WALStoreReader>(provider, std::move(files), read_limiter);
reader->openNextFile();
return reader;
}

WALStoreReaderPtr WALStoreReader::create(FileProviderPtr & provider, PSDiskDelegatorPtr & delegator)
WALStoreReaderPtr WALStoreReader::create(FileProviderPtr & provider, PSDiskDelegatorPtr & delegator, const ReadLimiterPtr & read_limiter)
{
Poco::Logger * logger = &Poco::Logger::get("WALStore");
LogFilenameSet log_files = listAllFiles(delegator, logger);
return create(provider, std::move(log_files));
return create(provider, std::move(log_files), read_limiter);
}

WALStoreReader::WALStoreReader(FileProviderPtr & provider_, LogFilenameSet && files_)
WALStoreReader::WALStoreReader(FileProviderPtr & provider_, LogFilenameSet && files_, const ReadLimiterPtr & read_limiter_)
: provider(provider_)
, files(std::move(files_))
, next_reading_file(files.begin())
, read_limiter(read_limiter_)
, logger(&Poco::Logger::get("LogReader"))
{}

Expand Down Expand Up @@ -135,7 +136,7 @@ bool WALStoreReader::openNextFile()
EncryptionPath{parent_path, filename},
/*estimated_size*/ Format::BLOCK_SIZE,
/*aio_threshold*/ 0,
/*read_limiter*/ nullptr,
/*read_limiter*/ read_limiter,
/*buffer_size*/ Format::BLOCK_SIZE // Must be `Format::BLOCK_SIZE`
);
reader = std::make_unique<LogReader>(
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/Page/V3/WAL/WALReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ class WALStoreReader
public:
static LogFilenameSet listAllFiles(PSDiskDelegatorPtr & delegator, Poco::Logger * logger);

static WALStoreReaderPtr create(FileProviderPtr & provider, LogFilenameSet files);
static WALStoreReaderPtr create(FileProviderPtr & provider, LogFilenameSet files, const ReadLimiterPtr & read_limiter = nullptr);

static WALStoreReaderPtr create(FileProviderPtr & provider, PSDiskDelegatorPtr & delegator);
static WALStoreReaderPtr create(FileProviderPtr & provider, PSDiskDelegatorPtr & delegator, const ReadLimiterPtr & read_limiter = nullptr);

bool remained() const;

Expand All @@ -56,7 +56,7 @@ class WALStoreReader
return reader->getLogNumber();
}

WALStoreReader(FileProviderPtr & provider_, LogFilenameSet && files_);
WALStoreReader(FileProviderPtr & provider_, LogFilenameSet && files_, const ReadLimiterPtr & read_limiter_ = nullptr);

WALStoreReader(const WALStoreReader &) = delete;
WALStoreReader & operator=(const WALStoreReader &) = delete;
Expand All @@ -69,6 +69,7 @@ class WALStoreReader

const LogFilenameSet files;
LogFilenameSet::const_iterator next_reading_file;
const ReadLimiterPtr read_limiter;
std::unique_ptr<LogReader> reader;
Poco::Logger * logger;
};
Expand Down
Loading