Fix wrong value passed in compaction filter in BlobDB (#10391)
Summary:
The new BlobDB has a bug in its compaction filter handling: `blob_value_` is not reset before the next iterated key. As a result, `blob_value_` can remain non-empty, and the previous value read from a blob is passed into the filter function for the next key, even if that key's value is not stored in a blob. Fixed by resetting `blob_value_` for every key, regardless of key type.
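
To make the failure mode concrete, below is a minimal, self-contained C++ sketch (all names are illustrative, not RocksDB's internals): a buffer that is reused across keys and only refilled on the blob path leaks the previous blob's value into the filter call for the next key.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Minimal model of the bug; all names here are illustrative, not RocksDB's.
struct Entry {
  std::string key;
  std::string inline_value;  // non-empty when the value is stored inline
  std::string blob_payload;  // non-empty when the value lives in a blob file
};

int main() {
  const std::vector<Entry> input = {
      {"a", "", "blob-payload"},  // value read from a blob
      {"b", "inline-value", ""},  // value stored inline
  };

  std::string blob_value;  // reused across iterations, like blob_value_
  std::vector<std::string> seen_by_filter;

  for (const Entry& e : input) {
    // BUG: blob_value is only refilled on the blob path, so after key "a"
    // it still holds "blob-payload" when key "b" is processed.
    // The fix is to clear it for every key: blob_value.clear();
    if (!e.blob_payload.empty()) {
      blob_value = e.blob_payload;
    }
    seen_by_filter.push_back(!blob_value.empty() ? blob_value
                                                 : e.inline_value);
  }

  assert(seen_by_filter[0] == "blob-payload");  // correct for key "a"
  assert(seen_by_filter[1] == "blob-payload");  // wrong for key "b":
                                                // should be "inline-value"
  return 0;
}
```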

Test Case:
Add a `FilterByValueLength` test case to `DBBlobCompactionTest`.
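
Assuming a standard RocksDB checkout where the Makefile generates a per-test binary, the new test should be runnable with something like `make db_blob_compaction_test && ./db_blob_compaction_test --gtest_filter=DBBlobCompactionTest.FilterByValueLength`.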

Pull Request resolved: #10391

Reviewed By: riversand963

Differential Revision: D38629900

Pulled By: ltamasi

fbshipit-source-id: 47d23ff2e5ec697958a210db9e6ceeb8b2fc49fa
sherriiiliu authored and facebook-github-bot committed Aug 11, 2022
1 parent f42fec2 commit 4753e5a
Showing 2 changed files with 70 additions and 1 deletion.
69 changes: 69 additions & 0 deletions db/blob/db_blob_compaction_test.cc
@@ -54,6 +54,26 @@ class FilterByKeyLength : public CompactionFilter {
size_t length_threshold_;
};

class FilterByValueLength : public CompactionFilter {
 public:
  explicit FilterByValueLength(size_t len) : length_threshold_(len) {}
  const char* Name() const override {
    return "rocksdb.compaction.filter.by.value.length";
  }
  CompactionFilter::Decision FilterV2(
      int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
      const Slice& existing_value, std::string* /*new_value*/,
      std::string* /*skip_until*/) const override {
    if (existing_value.size() < length_threshold_) {
      return CompactionFilter::Decision::kRemove;
    }
    return CompactionFilter::Decision::kKeep;
  }

 private:
  size_t length_threshold_;
};

class BadBlobCompactionFilter : public CompactionFilter {
public:
explicit BadBlobCompactionFilter(std::string prefix,
@@ -243,6 +263,55 @@ TEST_F(DBBlobCompactionTest, FilterByKeyLength) {
Close();
}

TEST_F(DBBlobCompactionTest, FilterByValueLength) {
  Options options = GetDefaultOptions();
  options.enable_blob_files = true;
  options.min_blob_size = 5;
  options.create_if_missing = true;
  constexpr size_t kValueLength = 5;
  std::unique_ptr<CompactionFilter> compaction_filter_guard(
      new FilterByValueLength(kValueLength));
  options.compaction_filter = compaction_filter_guard.get();

  const std::vector<std::string> short_value_keys = {"a", "e", "j"};
  constexpr char short_value[] = "val";
  const std::vector<std::string> long_value_keys = {"b", "f", "k"};
  constexpr char long_value[] = "valuevalue";

  DestroyAndReopen(options);
  for (size_t i = 0; i < short_value_keys.size(); ++i) {
    ASSERT_OK(Put(short_value_keys[i], short_value));
  }
  for (size_t i = 0; i < long_value_keys.size(); ++i) {
    ASSERT_OK(Put(long_value_keys[i], long_value));
  }
  ASSERT_OK(Flush());
  CompactRangeOptions cro;
  ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
  std::string value;
  for (size_t i = 0; i < short_value_keys.size(); ++i) {
    ASSERT_TRUE(
        db_->Get(ReadOptions(), short_value_keys[i], &value).IsNotFound());
    value.clear();
  }
  for (size_t i = 0; i < long_value_keys.size(); ++i) {
    ASSERT_OK(db_->Get(ReadOptions(), long_value_keys[i], &value));
    ASSERT_EQ(long_value, value);
  }

#ifndef ROCKSDB_LITE
  const auto& compaction_stats = GetCompactionStats();
  ASSERT_GE(compaction_stats.size(), 2);

  // The filter decides between kKeep and kRemove based on the value;
  // this involves reading blobs but not writing them.
  ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
  ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
#endif  // ROCKSDB_LITE

  Close();
}

#ifndef ROCKSDB_LITE
TEST_F(DBBlobCompactionTest, BlobCompactWithStartingLevel) {
Options options = GetDefaultOptions();
2 changes: 1 addition & 1 deletion db/compaction/compaction_iterator.cc
@@ -222,7 +222,6 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
     {
       StopWatchNano timer(clock_, report_detailed_time_);
       if (kTypeBlobIndex == ikey_.type) {
-        blob_value_.Reset();
         filter = compaction_filter_->FilterBlobByKey(
             level_, filter_key, &compaction_filter_value_,
             compaction_filter_skip_until_.rep());
@@ -367,6 +366,7 @@ void CompactionIterator::NextFromInput() {
          !IsShuttingDown()) {
     key_ = input_.key();
     value_ = input_.value();
+    blob_value_.Reset();
     iter_stats_.num_input_records++;

     Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
