Skip to content

Commit

Permalink
Task AB# 1310793: [LevelDB] Allow per-platform log buffer size (#7)
Browse files Browse the repository at this point in the history
* Allow per-platform log buffer size

* fixing log tests

---------

Co-authored-by: Dan Samhold <75692991+dango-msft@users.noreply.github.com>
  • Loading branch information
yabmek-msft and dango-msft authored Dec 20, 2024
1 parent 12d8819 commit a995da2
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 46 deletions.
2 changes: 1 addition & 1 deletion db/corruption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ TEST_F(CorruptionTest, Recovery) {
Build(100);
Check(100, 100);
Corrupt(kLogFile, 19, 1); // WriteBatch tag for first record
Corrupt(kLogFile, log::kBlockSize + 1000, 1); // Somewhere in second block
Corrupt(kLogFile, port::kLogBlockSize + 1000, 1); // Somewhere in second block
Reopen();

// The 64 records in the first two log blocks are completely lost.
Expand Down
2 changes: 0 additions & 2 deletions db/log_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ enum RecordType {
};
static const int kMaxRecordType = kLastType;

static const int kBlockSize = 32768;

// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;

Expand Down
14 changes: 7 additions & 7 deletions db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
: file_(file),
reporter_(reporter),
checksum_(checksum),
backing_store_(new char[kBlockSize]),
backing_store_(new char[port::kLogBlockSize]),
buffer_(),
eof_(false),
last_record_offset_(0),
Expand All @@ -31,12 +31,12 @@ Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
Reader::~Reader() { delete[] backing_store_; }

bool Reader::SkipToInitialBlock() {
const size_t offset_in_block = initial_offset_ % kBlockSize;
const size_t offset_in_block = initial_offset_ % port::kLogBlockSize;
uint64_t block_start_location = initial_offset_ - offset_in_block;

// Don't search a block if we'd be in the trailer
if (offset_in_block > kBlockSize - 6) {
block_start_location += kBlockSize;
if (offset_in_block > port::kLogBlockSize - 6) {
block_start_location += port::kLogBlockSize;
}

end_of_buffer_offset_ = block_start_location;
Expand Down Expand Up @@ -192,14 +192,14 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
if (!eof_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
Status status = file_->Read(port::kLogBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
ReportDrop(port::kLogBlockSize, status);
eof_ = true;
return kEof;
} else if (buffer_.size() < kBlockSize) {
} else if (buffer_.size() < port::kLogBlockSize) {
eof_ = true;
}
continue;
Expand Down
2 changes: 1 addition & 1 deletion db/log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Reader {
bool const checksum_;
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
bool eof_; // Last Read() indicated EOF by returning < port::kLogBlockSize

// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;
Expand Down
60 changes: 30 additions & 30 deletions db/log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,20 +238,20 @@ class LogTest : public testing::Test {
size_t LogTest::initial_offset_record_sizes_[] = {
10000, // Two sizable records in first block
10000,
2 * log::kBlockSize - 1000, // Span three blocks
2 * port::kLogBlockSize - 1000, // Span three blocks
1,
13716, // Consume all but two bytes of block 3.
log::kBlockSize - kHeaderSize, // Consume the entirety of block 4.
port::kLogBlockSize - kHeaderSize, // Consume the entirety of block 4.
};

uint64_t LogTest::initial_offset_last_record_offsets_[] = {
0,
kHeaderSize + 10000,
2 * (kHeaderSize + 10000),
2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize +
2 * (kHeaderSize + 10000) + (2 * port::kLogBlockSize - 1000) + 3 * kHeaderSize,
2 * (kHeaderSize + 10000) + (2 * port::kLogBlockSize - 1000) + 3 * kHeaderSize +
kHeaderSize + 1,
3 * log::kBlockSize,
3 * port::kLogBlockSize,
};

// LogTest::initial_offset_last_record_offsets_ must be defined before this.
Expand Down Expand Up @@ -295,9 +295,9 @@ TEST_F(LogTest, Fragmentation) {

TEST_F(LogTest, MarginalTrailer) {
// Make a trailer that is exactly the same length as an empty record.
const int n = kBlockSize - 2 * kHeaderSize;
const int n = port::kLogBlockSize - 2 * kHeaderSize;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
ASSERT_EQ(port::kLogBlockSize - kHeaderSize, WrittenBytes());
Write("");
Write("bar");
ASSERT_EQ(BigString("foo", n), Read());
Expand All @@ -308,9 +308,9 @@ TEST_F(LogTest, MarginalTrailer) {

TEST_F(LogTest, MarginalTrailer2) {
// Make a trailer that is exactly the same length as an empty record.
const int n = kBlockSize - 2 * kHeaderSize;
const int n = port::kLogBlockSize - 2 * kHeaderSize;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
ASSERT_EQ(port::kLogBlockSize - kHeaderSize, WrittenBytes());
Write("bar");
ASSERT_EQ(BigString("foo", n), Read());
ASSERT_EQ("bar", Read());
Expand All @@ -320,9 +320,9 @@ TEST_F(LogTest, MarginalTrailer2) {
}

TEST_F(LogTest, ShortTrailer) {
const int n = kBlockSize - 2 * kHeaderSize + 4;
const int n = port::kLogBlockSize - 2 * kHeaderSize + 4;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
ASSERT_EQ(port::kLogBlockSize - kHeaderSize + 4, WrittenBytes());
Write("");
Write("bar");
ASSERT_EQ(BigString("foo", n), Read());
Expand All @@ -332,9 +332,9 @@ TEST_F(LogTest, ShortTrailer) {
}

TEST_F(LogTest, AlignedEof) {
const int n = kBlockSize - 2 * kHeaderSize + 4;
const int n = port::kLogBlockSize - 2 * kHeaderSize + 4;
Write(BigString("foo", n));
ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
ASSERT_EQ(port::kLogBlockSize - kHeaderSize + 4, WrittenBytes());
ASSERT_EQ(BigString("foo", n), Read());
ASSERT_EQ("EOF", Read());
}
Expand Down Expand Up @@ -367,7 +367,7 @@ TEST_F(LogTest, ReadError) {
Write("foo");
ForceError();
ASSERT_EQ("EOF", Read());
ASSERT_EQ(kBlockSize, DroppedBytes());
ASSERT_EQ(port::kLogBlockSize, DroppedBytes());
ASSERT_EQ("OK", MatchError("read error"));
}

Expand All @@ -391,13 +391,13 @@ TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) {
}

TEST_F(LogTest, BadLength) {
const int kPayloadSize = kBlockSize - kHeaderSize;
const int kPayloadSize = port::kLogBlockSize - kHeaderSize;
Write(BigString("bar", kPayloadSize));
Write("foo");
// Least significant size byte is stored in header[4].
IncrementByte(4, 1);
ASSERT_EQ("foo", Read());
ASSERT_EQ(kBlockSize, DroppedBytes());
ASSERT_EQ(port::kLogBlockSize, DroppedBytes());
ASSERT_EQ("OK", MatchError("bad record length"));
}

Expand Down Expand Up @@ -458,7 +458,7 @@ TEST_F(LogTest, UnexpectedFirstType) {
}

TEST_F(LogTest, MissingLastIsIgnored) {
Write(BigString("bar", kBlockSize));
Write(BigString("bar", port::kLogBlockSize));
// Remove the LAST block, including header.
ShrinkSize(14);
ASSERT_EQ("EOF", Read());
Expand All @@ -467,7 +467,7 @@ TEST_F(LogTest, MissingLastIsIgnored) {
}

TEST_F(LogTest, PartialLastIsIgnored) {
Write(BigString("bar", kBlockSize));
Write(BigString("bar", port::kLogBlockSize));
// Cause a bad record length in the LAST block.
ShrinkSize(1);
ASSERT_EQ("EOF", Read());
Expand All @@ -481,9 +481,9 @@ TEST_F(LogTest, SkipIntoMultiRecord) {
// If initial_offset points to a record after first(R1) but before first(R2)
// incomplete fragment errors are not actual errors, and must be suppressed
// until a new first or full record is encountered.
Write(BigString("foo", 3 * kBlockSize));
Write(BigString("foo", 3 * port::kLogBlockSize));
Write("correct");
StartReadingAt(kBlockSize);
StartReadingAt(port::kLogBlockSize);

ASSERT_EQ("correct", Read());
ASSERT_EQ("", ReportMessage());
Expand All @@ -498,20 +498,20 @@ TEST_F(LogTest, ErrorJoinsRecords) {
// first(R1),last(R2) to get joined and returned as a valid record.

// Write records that span two blocks
Write(BigString("foo", kBlockSize));
Write(BigString("bar", kBlockSize));
Write(BigString("foo", port::kLogBlockSize));
Write(BigString("bar", port::kLogBlockSize));
Write("correct");

// Wipe the middle block
for (int offset = kBlockSize; offset < 2 * kBlockSize; offset++) {
for (int offset = port::kLogBlockSize; offset < 2 * port::kLogBlockSize; offset++) {
SetByte(offset, 'x');
}

ASSERT_EQ("correct", Read());
ASSERT_EQ("EOF", Read());
const size_t dropped = DroppedBytes();
ASSERT_LE(dropped, 2 * kBlockSize + 100);
ASSERT_GE(dropped, 2 * kBlockSize);
ASSERT_LE(dropped, 2 * port::kLogBlockSize + 100);
ASSERT_GE(dropped, 2 * port::kLogBlockSize);
}

TEST_F(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }
Expand All @@ -529,25 +529,25 @@ TEST_F(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); }
TEST_F(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); }

TEST_F(LogTest, ReadFourthFirstBlockTrailer) {
CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
CheckInitialOffsetRecord(port::kLogBlockSize - 4, 3);
}

TEST_F(LogTest, ReadFourthMiddleBlock) {
CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
CheckInitialOffsetRecord(port::kLogBlockSize + 1, 3);
}

TEST_F(LogTest, ReadFourthLastBlock) {
CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
CheckInitialOffsetRecord(2 * port::kLogBlockSize + 1, 3);
}

TEST_F(LogTest, ReadFourthStart) {
CheckInitialOffsetRecord(
2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
2 * (kHeaderSize + 1000) + (2 * port::kLogBlockSize - 1000) + 3 * kHeaderSize,
3);
}

TEST_F(LogTest, ReadInitialOffsetIntoBlockPadding) {
CheckInitialOffsetRecord(3 * log::kBlockSize - 3, 5);
CheckInitialOffsetRecord(3 * port::kLogBlockSize - 3, 5);
}

TEST_F(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }
Expand Down
10 changes: 5 additions & 5 deletions db/log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Writer::Writer(WritableFile* dest) : dest_(dest), block_offset_(0) {
}

Writer::Writer(WritableFile* dest, uint64_t dest_length)
: dest_(dest), block_offset_(dest_length % kBlockSize) {
: dest_(dest), block_offset_(dest_length % port::kLogBlockSize) {
InitTypeCrc(type_crc_);
}

Expand All @@ -41,7 +41,7 @@ Status Writer::AddRecord(const Slice& slice) {
Status s;
bool begin = true;
do {
const int leftover = kBlockSize - block_offset_;
const int leftover = port::kLogBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize) {
// Switch to a new block
Expand All @@ -54,9 +54,9 @@ Status Writer::AddRecord(const Slice& slice) {
}

// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
assert(port::kLogBlockSize - block_offset_ - kHeaderSize >= 0);

const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t avail = port::kLogBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;

RecordType type;
Expand All @@ -82,7 +82,7 @@ Status Writer::AddRecord(const Slice& slice) {
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
size_t length) {
assert(length <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + length <= kBlockSize);
assert(block_offset_ + kHeaderSize + length <= port::kLogBlockSize);

// Format the header
char buf[kHeaderSize];
Expand Down
3 changes: 3 additions & 0 deletions port/port_example.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ namespace port {
// TODO(jorlow): Many of these belong more in the environment class rather than
// here. We should try moving them and see if it affects perf.

// Buffer size for log
static const int kLogBlockSize = 32768;

// ------------------ Threading -------------------

// A Mutex represents an exclusive lock.
Expand Down
2 changes: 2 additions & 0 deletions port/port_stdcxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ namespace port {

class CondVar;

static const int kLogBlockSize = 32768;

// Thinly wraps std::mutex.
class LOCKABLE Mutex {
public:
Expand Down

0 comments on commit a995da2

Please sign in to comment.