Skip to content

Commit

Permalink
Handoff checksum Implementation (#7523)
Browse files Browse the repository at this point in the history
Summary:
in PR facebook/rocksdb#7419 , we introduce the new Append and PositionedAppend APIs to WritableFile at File System, which enable RocksDB to pass the data verification information (e.g., checksum of the data) to the lower layer. In this PR, we use the new API in WritableFileWriter, such that the file created via WritableFileWrite can pass the checksum to the storage layer. To control which types file should apply the checksum handoff, we add checksum_handoff_file_types to DBOptions. User can use this option to control which file types (Currently supported file tyes: kLogFile, kTableFile, kDescriptorFile.) should use the new Append and PositionedAppend APIs to handoff the verification information.

Pull Request resolved: facebook/rocksdb#7523

Test Plan: add new unit test, pass make check/ make asan_check

Reviewed By: pdillinger

Differential Revision: D24313271

Pulled By: zhichao-cao

fbshipit-source-id: aafd69091ae85c3318e3e17cbb96fe7338da11d0
Signed-off-by: Changlong Chen <levisonchen@live.cn>
  • Loading branch information
zhichao-cao authored and Changlong Chen committed Jun 18, 2021
1 parent 19eb77a commit 2300a90
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
40 changes: 37 additions & 3 deletions file/writable_file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "port/port.h"
#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
#include "util/crc32c.h"
#include "util/random.h"
#include "util/rate_limiter.h"

Expand Down Expand Up @@ -395,6 +396,8 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
assert(!use_direct_io());
const char* src = data;
size_t left = size;
DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)];

while (left > 0) {
size_t allowed;
Expand All @@ -420,8 +423,16 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
#endif
{
auto prev_perf_level = GetPerfLevel();

IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
if (perform_data_verification_) {
Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->Append(Slice(src, allowed), IOOptions(), v_info,
nullptr);
} else {
s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
}
SetPerfLevel(prev_perf_level);
}
#ifndef ROCKSDB_LITE
Expand Down Expand Up @@ -451,6 +462,19 @@ void WritableFileWriter::UpdateFileChecksum(const Slice& data) {
}
}

// Currently, crc32c checksum is used to calculate the checksum value of the
// content in the input buffer for handoff. In the future, the checksum might be
// calculated from the existing crc32c checksums of the in WAl and Manifest
// records, or even SST file blocks.
// TODO: effectively use the existing checksum of the data being writing to
// generate the crc32c checksum instead of a raw calculation.
void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data,
size_t size,
char* buf) {
uint32_t v_crc32c = crc32c::Extend(0, data, size);
EncodeFixed32(buf, v_crc32c);
}

// This flushes the accumulated data in the buffer. We pad data with zeros if
// necessary to the whole page.
// However, during automatic flushes padding would not be necessary.
Expand Down Expand Up @@ -481,6 +505,8 @@ IOStatus WritableFileWriter::WriteDirect() {
const char* src = buf_.BufferStart();
uint64_t write_offset = next_write_offset_;
size_t left = buf_.CurrentSize();
DataVerificationInfo v_info;
char checksum_buf[sizeof(uint32_t)];

while (left > 0) {
// Check how much is allowed
Expand All @@ -501,8 +527,16 @@ IOStatus WritableFileWriter::WriteDirect() {
start_ts = FileOperationInfo::StartNow();
}
// direct writes must be positional
s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
IOOptions(), nullptr);
if (perform_data_verification_) {
Crc32cHandoffChecksumCalculation(src, size, checksum_buf);
v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
IOOptions(), v_info, nullptr);
} else {
s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
IOOptions(), nullptr);
}

if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now();
NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
Expand Down
9 changes: 7 additions & 2 deletions file/writable_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class WritableFileWriter {

bool ShouldNotifyListeners() const { return !listeners_.empty(); }
void UpdateFileChecksum(const Slice& data);
void Crc32cHandoffChecksumCalculation(const char* data, size_t size,
char* buf);

std::string file_name_;
FSWritableFilePtr writable_file_;
Expand All @@ -141,6 +143,7 @@ class WritableFileWriter {
std::vector<std::shared_ptr<EventListener>> listeners_;
std::unique_ptr<FileChecksumGenerator> checksum_generator_;
bool checksum_finalized_;
bool perform_data_verification_;

public:
WritableFileWriter(
Expand All @@ -150,7 +153,8 @@ class WritableFileWriter {
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
Statistics* stats = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
FileChecksumGenFactory* file_checksum_gen_factory = nullptr)
FileChecksumGenFactory* file_checksum_gen_factory = nullptr,
bool perform_data_verification = false)
: file_name_(_file_name),
writable_file_(std::move(file), io_tracer, _file_name),
clock_(clock),
Expand All @@ -167,7 +171,8 @@ class WritableFileWriter {
stats_(stats),
listeners_(),
checksum_generator_(nullptr),
checksum_finalized_(false) {
checksum_finalized_(false),
perform_data_verification_(perform_data_verification) {
TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
reinterpret_cast<void*>(max_buffer_size_));
buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
Expand Down

0 comments on commit 2300a90

Please sign in to comment.