Skip to content

Commit

Permalink
Fix UnmarkEOF for partial blocks
Browse files Browse the repository at this point in the history
Summary:
Blocks in the transaction log are a fixed size, but the last block in the transaction log file is usually a partial block. When a new record is added after the reader hit the end of the file, a new physical record will be appended to the last block. ReadPhysicalRecord can only read full blocks and assumes that the file position indicator is aligned to the start of a block. If the reader is forced to read further by simply clearing the EOF flag, ReadPhysicalRecord will read a full block starting from somewhere in the middle of a real block, causing it to lose alignment and to have a partial physical record at the end of the read buffer. This will result in length mismatches and checksum failures. When the log file is tailed for replication this will cause the log iterator to become invalid, necessitating the creation of a new iterator which will have to read the log file from scratch.

This diff fixes this issue by reading the remaining portion of the last block we read from. This is done when the reader is forced to read further (UnmarkEOF is called).

Test Plan:
- Added unit tests
- Stress test (with replication). Check dbdir/LOG file for corruptions.
- Test on test tier

Reviewers: emayanke, haobo, dhruba

Reviewed By: haobo

CC: vamsi, sheki, dhruba, kailiu, igor

Differential Revision: https://reviews.facebook.net/D15249
  • Loading branch information
swkrueger committed Jan 27, 2014
1 parent 832158e commit 3d33da7
Show file tree
Hide file tree
Showing 3 changed files with 232 additions and 29 deletions.
70 changes: 68 additions & 2 deletions db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Reader::Reader(unique_ptr<SequentialFile>&& file, Reporter* reporter,
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
read_error_(false),
eof_offset_(0),
last_record_offset_(0),
end_of_buffer_offset_(0),
initial_offset_(initial_offset) {
Expand Down Expand Up @@ -170,6 +172,69 @@ uint64_t Reader::LastRecordOffset() {
return last_record_offset_;
}

void Reader::UnmarkEOF() {
if (read_error_) {
return;
}

eof_ = false;

if (eof_offset_ == 0) {
return;
}

// If the EOF was in the middle of a block (a partial block was read) we have
// to read the rest of the block as ReadPhysicalRecord can only read full
// blocks and expects the file position indicator to be aligned to the start
// of a block.
//
// consumed_bytes + buffer_size() + remaining == kBlockSize

size_t consumed_bytes = eof_offset_ - buffer_.size();
size_t remaining = kBlockSize - eof_offset_;

// backing_store_ is used to concatenate what is left in buffer_ and
// the remainder of the block. If buffer_ already uses backing_store_,
// we just append the new data.
if (buffer_.data() != backing_store_ + consumed_bytes) {
// Buffer_ does not use backing_store_ for storage.
// Copy what is left in buffer_ to backing_store.
memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
}

Slice read_buffer;
Status status = file_->Read(remaining, &read_buffer,
backing_store_ + eof_offset_);

size_t added = read_buffer.size();
end_of_buffer_offset_ += added;

if (!status.ok()) {
if (added > 0) {
ReportDrop(added, status);
}

read_error_ = true;
return;
}

if (read_buffer.data() != backing_store_ + eof_offset_) {
// Read did not write to backing_store_
memmove(backing_store_ + eof_offset_, read_buffer.data(),
read_buffer.size());
}

buffer_ = Slice(backing_store_ + consumed_bytes,
eof_offset_ + added - consumed_bytes);

if (added < remaining) {
eof_ = true;
eof_offset_ += added;
} else {
eof_offset_ = 0;
}
}

void Reader::ReportCorruption(size_t bytes, const char* reason) {
ReportDrop(bytes, Status::Corruption(reason));
}
Expand All @@ -184,18 +249,19 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
while (true) {
if (buffer_.size() < (size_t)kHeaderSize) {
if (!eof_) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
read_error_ = true;
return kEof;
} else if (buffer_.size() < (size_t)kBlockSize) {
eof_ = true;
eof_offset_ = buffer_.size();
}
continue;
} else if (buffer_.size() == 0) {
Expand Down
12 changes: 9 additions & 3 deletions db/log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ class Reader {

// when we know more data has been written to the file. we can use this
// function to force the reader to look again in the file.
void UnmarkEOF() {
eof_ = false;
}
// Also aligns the file position indicator to the start of the next block
// by reading the rest of the data from the EOF position to the end of the
// block that was partially read.
void UnmarkEOF();

SequentialFile* file() { return file_.get(); }

Expand All @@ -82,6 +83,11 @@ class Reader {
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
bool read_error_; // Error occurred while reading from file

// Offset of the file position indicator within the last block when an
// EOF was detected.
size_t eof_offset_;

// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;
Expand Down
Loading

0 comments on commit 3d33da7

Please sign in to comment.