Skip to content

Commit

Permalink
Ensure VerifyFileChecksums reads don't exceed readahead_size (#11328)
Browse files Browse the repository at this point in the history
Summary:
VerifyFileChecksums currently interprets the readahead_size as a payload of readahead_size for calculating the checksum, plus a prefetch of an additional readahead_size. Hence each read is readahead_size * 2. This change treats it as chunks of readahead_size for checksum calculation.

Pull Request resolved: #11328

Test Plan: Add a unit test

Reviewed By: pdillinger

Differential Revision: D44718781

Pulled By: anand1976

fbshipit-source-id: 79bae1ebaa27de2a13bc86f5910bf09356936e63
  • Loading branch information
anand1976 committed Apr 6, 2023
1 parent 501f359 commit af72046
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 10 deletions.
4 changes: 4 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Rocksdb Change Log
## Unreleased
### Bug Fixes
* In the DB::VerifyFileChecksums API, ensure that file system reads of SST files are equal to the readahead_size in ReadOptions, if specified. Previously, each read was 2x the readahead_size.

## 8.1.0 (03/18/2023)
### Behavior changes
* Compaction output file cutting logic now considers range tombstone start keys. For example, SST partitioner now may receive PartitionRequest for range tombstone start keys.
Expand Down
57 changes: 57 additions & 0 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4494,6 +4494,63 @@ TEST_F(DBBasicTest, VerifyFileChecksums) {
ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument());
}

// Verifies that DB::VerifyFileChecksums issues file system reads in chunks of
// ReadOptions::readahead_size rather than in larger units.
//
// The test writes a single SST file, then uses the
// "GenerateOneFileChecksum::Chunk:0" sync point (hit once per chunk read) to
// check that the cumulative number of random-read bytes observed by the env
// is always a multiple of the requested readahead size, except for the final
// chunk, where it must equal the file size exactly. Finally, it checks that
// the total number of random reads equals ceil(sst_size / readahead_size).
TEST_F(DBBasicTest, VerifyFileChecksumsReadahead) {
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.env = env_;
  options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
  DestroyAndReopen(options);

  Random rnd(301);
  // Used both as the size of each value written and as the readahead size.
  // Must be a power of two: the callback below masks with (alignment - 1).
  const int alignment = 256 * 1024;
  for (int i = 0; i < 16; ++i) {
    ASSERT_OK(Put("key" + std::to_string(i), rnd.RandomString(alignment)));
  }
  ASSERT_OK(Flush());

  // Locate the single SST file produced by the flush and record its size.
  std::vector<std::string> filenames;
  int sst_cnt = 0;
  std::string sst_name;
  uint64_t sst_size = 0;
  uint64_t number = 0;
  FileType type;  // Out-parameter; only read after ParseFileName succeeds.
  ASSERT_OK(env_->GetChildren(dbname_, &filenames));
  // Iterate by const reference to avoid copying every file name.
  for (const auto& name : filenames) {
    if (ParseFileName(name, &number, &type)) {
      if (type == kTableFile) {
        sst_cnt++;
        sst_name = name;
      }
    }
  }
  ASSERT_EQ(sst_cnt, 1);
  ASSERT_OK(env_->GetFileSize(dbname_ + '/' + sst_name, &sst_size));

  // Set by the callback once the cumulative bytes read reach the file size.
  bool last_read = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "GenerateOneFileChecksum::Chunk:0", [&](void* /*arg*/) {
        if (env_->random_read_bytes_counter_.load() == sst_size) {
          // The whole file has been read; this must happen exactly once.
          EXPECT_FALSE(last_read);
          last_read = true;
        } else {
          // Every chunk before the last must be exactly `alignment` bytes, so
          // the running total stays a multiple of `alignment`.
          ASSERT_EQ(env_->random_read_bytes_counter_.load() & (alignment - 1),
                    0);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  // Start counting from zero so that only reads issued by VerifyFileChecksums
  // are measured.
  env_->count_random_reads_ = true;
  env_->random_read_bytes_counter_ = 0;
  env_->random_read_counter_.Reset();

  ReadOptions ro;
  ro.readahead_size = alignment;
  ASSERT_OK(db_->VerifyFileChecksums(ro));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  // The callback captures locals of this test by reference; unregister it
  // before they go out of scope so it can never be invoked with dangling
  // references.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  ASSERT_TRUE(last_read);
  // One read per readahead-sized chunk, rounding up for the final partial
  // chunk.
  ASSERT_EQ(env_->random_read_counter_.Read(),
            (sst_size + alignment - 1) / (alignment));
}

// TODO: re-enable after we provide finer-grained control for WAL tracking to
// meet the needs of different use cases, durability levels and recovery modes.
TEST_F(DBBasicTest, DISABLED_ManualWalSync) {
Expand Down
24 changes: 14 additions & 10 deletions file/file_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ IOStatus GenerateOneFileChecksum(
FileChecksumGenFactory* checksum_factory,
const std::string& requested_checksum_func_name, std::string* file_checksum,
std::string* file_checksum_func_name,
size_t verify_checksums_readahead_size, bool allow_mmap_reads,
size_t verify_checksums_readahead_size, bool /*allow_mmap_reads*/,
std::shared_ptr<IOTracer>& io_tracer, RateLimiter* rate_limiter,
Env::IOPriority rate_limiter_priority) {
if (checksum_factory == nullptr) {
Expand Down Expand Up @@ -196,29 +196,33 @@ IOStatus GenerateOneFileChecksum(
size_t readahead_size = (verify_checksums_readahead_size != 0)
? verify_checksums_readahead_size
: default_max_read_ahead_size;

FilePrefetchBuffer prefetch_buffer(readahead_size /* readahead_size */,
readahead_size /* max_readahead_size */,
!allow_mmap_reads /* enable */);
std::unique_ptr<char[]> buf;
if (reader->use_direct_io()) {
size_t alignment = reader->file()->GetRequiredBufferAlignment();
readahead_size = (readahead_size + alignment - 1) & ~(alignment - 1);
}
buf.reset(new char[readahead_size]);

Slice slice;
uint64_t offset = 0;
IOOptions opts;
while (size > 0) {
size_t bytes_to_read =
static_cast<size_t>(std::min(uint64_t{readahead_size}, size));
if (!prefetch_buffer.TryReadFromCache(
opts, reader.get(), offset, bytes_to_read, &slice,
nullptr /* status */, rate_limiter_priority,
false /* for_compaction */)) {
return IOStatus::Corruption("file read failed");
io_s = reader->Read(opts, offset, bytes_to_read, &slice, buf.get(), nullptr,
rate_limiter_priority);
if (!io_s.ok()) {
return IOStatus::Corruption("file read failed with error: " +
io_s.ToString());
}
if (slice.size() == 0) {
return IOStatus::Corruption("file too small");
}
checksum_generator->Update(slice.data(), slice.size());
size -= slice.size();
offset += slice.size();

TEST_SYNC_POINT("GenerateOneFileChecksum::Chunk:0");
}
checksum_generator->Finalize();
*file_checksum = checksum_generator->GetChecksum();
Expand Down

0 comments on commit af72046

Please sign in to comment.