Skip to content

Commit

Permalink
This CL fixes a bug encountered when reading records from leveldb fil…
Browse files Browse the repository at this point in the history
…es that have been split, as in a [] input task split.

Detailed description:

Suppose an input split is generated between two leveldb record blocks and the preceding block ends with null padding.

A reader that previously read at least 1 record within the first block (before encountering the padding) upon trying to read the next record, will successfully and correctly read the next logical record from the subsequent block, but will return a last record offset pointing to the padding in the first block.

When this happened in a [], it resulted in duplicate records being handled at what appeared to be different offsets that were separated by only a few bytes.

This behavior is only observed when at least 1 record was read from the first block before encountering the padding. If the initial offset for a reader was within the padding, the correct record offset would be reported, namely the offset within the second block.

The tests failed to catch this scenario/bug, because each read test only read a single record with an initial offset. This CL adds an explicit test case for this scenario, and modifies the test structure to read all remaining records in the test case after an initial offset is specified.  Thus an initial offset that jumps to record #3, with 5 total records in the test file, will result in reading 2 records, and validating the offset of each of them in order to pass successfully.
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115338487
  • Loading branch information
mikewiacek authored and cmumford committed Mar 31, 2016
1 parent 3211343 commit e84b5bd
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 12 deletions.
8 changes: 7 additions & 1 deletion db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,14 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {

Slice fragment;
while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
const unsigned int record_type = ReadPhysicalRecord(&fragment);

// ReadPhysicalRecord may have only had an empty trailer remaining in its
// internal buffer. Calculate the offset of the next physical record now
// that it has returned, properly accounting for its header size.
uint64_t physical_record_offset =
end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

if (resyncing_) {
if (record_type == kMiddleType) {
continue;
Expand Down
44 changes: 33 additions & 11 deletions db/log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class LogTest {
// Record metadata for testing initial offset functionality
static size_t initial_offset_record_sizes_[];
static uint64_t initial_offset_last_record_offsets_[];
static int num_initial_offset_records_;

public:
LogTest() : reading_(false),
Expand Down Expand Up @@ -192,7 +193,7 @@ class LogTest {
}

void WriteInitialOffsetLog() {
for (int i = 0; i < 4; i++) {
for (int i = 0; i < num_initial_offset_records_; i++) {
std::string record(initial_offset_record_sizes_[i],
static_cast<char>('a' + i));
Write(record);
Expand Down Expand Up @@ -223,14 +224,20 @@ class LogTest {
source_.contents_ = Slice(dest_.contents_);
Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
initial_offset);
Slice record;
std::string scratch;
ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
record.size());
ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
offset_reader->LastRecordOffset());
ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);

// Read all records from expected_record_offset through the last one.
ASSERT_LT(expected_record_offset, num_initial_offset_records_);
for (; expected_record_offset < num_initial_offset_records_;
++expected_record_offset) {
Slice record;
std::string scratch;
ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
record.size());
ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
offset_reader->LastRecordOffset());
ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
}
delete offset_reader;
}
};
Expand All @@ -239,15 +246,26 @@ size_t LogTest::initial_offset_record_sizes_[] =
{10000, // Two sizable records in first block
10000,
2 * log::kBlockSize - 1000, // Span three blocks
1};
1,
13716, // Consume all but two bytes of block 3.
log::kBlockSize - kHeaderSize, // Consume the entirety of block 4.
};

uint64_t LogTest::initial_offset_last_record_offsets_[] =
{0,
kHeaderSize + 10000,
2 * (kHeaderSize + 10000),
2 * (kHeaderSize + 10000) +
(2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
(2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
2 * (kHeaderSize + 10000) +
(2 * log::kBlockSize - 1000) + 3 * kHeaderSize
+ kHeaderSize + 1,
3 * log::kBlockSize,
};

// LogTest::initial_offset_last_record_offsets_ must be defined before this.
int LogTest::num_initial_offset_records_ =
sizeof(LogTest::initial_offset_last_record_offsets_)/sizeof(uint64_t);

TEST(LogTest, Empty) {
ASSERT_EQ("EOF", Read());
Expand Down Expand Up @@ -553,6 +571,10 @@ TEST(LogTest, ReadFourthStart) {
3);
}

TEST(LogTest, ReadInitialOffsetIntoBlockPadding) {
CheckInitialOffsetRecord(3 * log::kBlockSize - 3, 5);
}

TEST(LogTest, ReadEnd) {
CheckOffsetPastEndReturnsNoRecords(0);
}
Expand Down

0 comments on commit e84b5bd

Please sign in to comment.