Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Summary:

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
pdillinger committed Sep 20, 2024
1 parent 71e38db commit a8d741e
Show file tree
Hide file tree
Showing 21 changed files with 256 additions and 101 deletions.
10 changes: 6 additions & 4 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1201,10 +1201,12 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
read_opts.total_order_seek = true;
MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
merge_iter_builder.AddIterator(super_version->mem->NewIterator(
read_opts, /*seqno_to_time_mapping=*/nullptr, &arena));
super_version->imm->AddIterators(read_opts, /*seqno_to_time_mapping=*/nullptr,
&merge_iter_builder,
false /* add_range_tombstone_iter */);
read_opts, /*seqno_to_time_mapping=*/nullptr, &arena,
/*prefix_extractor=*/nullptr));
super_version->imm->AddIterators(
read_opts, /*seqno_to_time_mapping=*/nullptr,
super_version->mutable_cf_options.prefix_extractor.get(),
&merge_iter_builder, false /* add_range_tombstone_iter */);
ScopedArenaPtr<InternalIterator> memtable_iter(merge_iter_builder.Finish());

auto read_seq = super_version->current->version_set()->LastSequence();
Expand Down
166 changes: 125 additions & 41 deletions db/db_bloom_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2177,24 +2177,120 @@ TEST_F(DBBloomFilterTest, MemtableWholeKeyBloomFilterMultiGet) {

db_->ReleaseSnapshot(snapshot);
}
namespace {
std::pair<uint64_t, uint64_t> GetBloomStat(const Options& options, bool sst) {
if (sst) {
return {options.statistics->getAndResetTickerCount(
NON_LAST_LEVEL_SEEK_FILTER_MATCH),
options.statistics->getAndResetTickerCount(
NON_LAST_LEVEL_SEEK_FILTERED)};
} else {
auto hit = std::exchange(get_perf_context()->bloom_memtable_hit_count, 0);
auto miss = std::exchange(get_perf_context()->bloom_memtable_miss_count, 0);
return {hit, miss};
}
}

// Readability helper for assertions: packs an expected (hits, misses) pair in
// the same {first, second} layout that GetBloomStat() returns, so the two can
// be compared directly.
std::pair<uint64_t, uint64_t> HitAndMiss(uint64_t hits, uint64_t misses) {
  return std::make_pair(hits, misses);
}
} // namespace

TEST_F(DBBloomFilterTest, MemtablePrefixBloomOutOfDomain) {
constexpr size_t kPrefixSize = 8;
const std::string kKey = "key";
assert(kKey.size() < kPrefixSize);
TEST_F(DBBloomFilterTest, MemtablePrefixBloom) {
Options options = CurrentOptions();
options.prefix_extractor.reset(NewFixedPrefixTransform(kPrefixSize));
options.prefix_extractor.reset(NewFixedPrefixTransform(4));
options.memtable_prefix_bloom_size_ratio = 0.25;
Reopen(options);
ASSERT_OK(Put(kKey, "v"));
ASSERT_EQ("v", Get(kKey));
std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ReadOptions()));
iter->Seek(kKey);
ASSERT_FALSE(options.prefix_extractor->InDomain("key"));
ASSERT_OK(Put("key", "v"));
ASSERT_OK(Put("goat1", "g1"));
ASSERT_OK(Put("goat2", "g2"));

GetBloomStat(options, false); // Reset

// Out of domain
ASSERT_EQ("v", Get("key"));
ASSERT_EQ(HitAndMiss(0, 0), GetBloomStat(options, false));

// In domain
ASSERT_EQ("g1", Get("goat1"));
ASSERT_EQ(HitAndMiss(1, 0), GetBloomStat(options, false));
ASSERT_EQ("NOT_FOUND", Get("goat9"));
ASSERT_EQ(HitAndMiss(1, 0), GetBloomStat(options, false));
ASSERT_EQ("NOT_FOUND", Get("goan1"));
ASSERT_EQ(HitAndMiss(0, 1), GetBloomStat(options, false));

ReadOptions ropts;
if (options.prefix_seek_opt_in_only) {
ropts.prefix_same_as_start = true;
}
std::unique_ptr<Iterator> iter(db_->NewIterator(ropts));
// Out of domain
iter->Seek("ke");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(kKey, iter->key());
iter->SeekForPrev(kKey);
ASSERT_EQ("key", iter->key());
iter->SeekForPrev("kez");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(kKey, iter->key());
ASSERT_EQ("key", iter->key());
ASSERT_EQ(HitAndMiss(0, 0), GetBloomStat(options, false));

// In domain
iter->Seek("goan");
ASSERT_FALSE(iter->Valid());
ASSERT_EQ(HitAndMiss(0, 1), GetBloomStat(options, false));
iter->Seek("goat");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("goat1", iter->key());
ASSERT_EQ(HitAndMiss(1, 0), GetBloomStat(options, false));

// Changing prefix extractor should affect prefix query semantics
// and bypass the existing memtable Bloom filter
ASSERT_OK(db_->SetOptions({{"prefix_extractor", "fixed:5"}}));
iter.reset(db_->NewIterator(ropts));
// Now out of domain
iter->Seek("goan");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("goat1", iter->key());
ASSERT_EQ(HitAndMiss(0, 0), GetBloomStat(options, false));
// In domain
iter->Seek("goat2");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("goat2", iter->key());
ASSERT_EQ(HitAndMiss(0, 0), GetBloomStat(options, false));
// In domain
if (ropts.prefix_same_as_start) {
iter->Seek("goat0");
ASSERT_FALSE(iter->Valid());
ASSERT_EQ(HitAndMiss(0, 0), GetBloomStat(options, false));
} else {
// NOTE: legacy prefix Seek may return keys outside of prefix
}

// Start a fresh new memtable, using new prefix extractor
ASSERT_OK(SingleDelete("key"));
ASSERT_OK(SingleDelete("goat1"));
ASSERT_OK(SingleDelete("goat2"));
ASSERT_OK(Flush());

ASSERT_OK(Put("goat1", "g1"));
ASSERT_OK(Put("goat2", "g2"));

iter.reset(db_->NewIterator(ropts));

// Now out of domain
iter->Seek("goan");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("goat1", iter->key());
ASSERT_EQ(HitAndMiss(0, 0), GetBloomStat(options, false));
// In domain
iter->Seek("goat2");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("goat2", iter->key());
ASSERT_EQ(HitAndMiss(1, 0), GetBloomStat(options, false));
// In domain
iter->Seek("goat0");
ASSERT_FALSE(iter->Valid());
ASSERT_EQ(HitAndMiss(0, 1), GetBloomStat(options, false));
}

class DBBloomFilterTestVaryPrefixAndFormatVer
Expand Down Expand Up @@ -2507,7 +2603,11 @@ TEST_P(BloomStatsTestWithParam, BloomStatsTestWithIter) {
ASSERT_OK(Put(key1, value1, WriteOptions()));
ASSERT_OK(Put(key3, value3, WriteOptions()));

std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ReadOptions()));
ReadOptions ropts;
if (options_.prefix_seek_opt_in_only) {
ropts.prefix_same_as_start = true;
}
std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropts));

// check memtable bloom stats
iter->Seek(key1);
Expand All @@ -2532,7 +2632,7 @@ TEST_P(BloomStatsTestWithParam, BloomStatsTestWithIter) {

ASSERT_OK(Flush());

iter.reset(dbfull()->NewIterator(ReadOptions()));
iter.reset(dbfull()->NewIterator(ropts));

// Check SST bloom stats
iter->Seek(key1);
Expand Down Expand Up @@ -2659,9 +2759,14 @@ TEST_F(DBBloomFilterTest, PrefixScan) {
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
iter = db_->NewIterator(ReadOptions());
ReadOptions ropts;
if (options.prefix_seek_opt_in_only) {
ropts.prefix_same_as_start = true;
}
iter = db_->NewIterator(ropts);
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
if (!iter->key().starts_with(prefix)) {
ASSERT_FALSE(ropts.prefix_same_as_start);
break;
}
count++;
Expand Down Expand Up @@ -3397,23 +3502,6 @@ class FixedSuffix4Transform : public SliceTransform {

bool InDomain(const Slice& src) const override { return src.size() >= 4; }
};

std::pair<uint64_t, uint64_t> GetBloomStat(const Options& options, bool sst) {
if (sst) {
return {options.statistics->getAndResetTickerCount(
NON_LAST_LEVEL_SEEK_FILTER_MATCH),
options.statistics->getAndResetTickerCount(
NON_LAST_LEVEL_SEEK_FILTERED)};
} else {
auto hit = std::exchange(get_perf_context()->bloom_memtable_hit_count, 0);
auto miss = std::exchange(get_perf_context()->bloom_memtable_miss_count, 0);
return {hit, miss};
}
}

// Readability helper for assertions: packs an expected (hits, misses) pair in
// the same {first, second} layout that GetBloomStat() returns, so the two can
// be compared directly.
std::pair<uint64_t, uint64_t> HitAndMiss(uint64_t hits, uint64_t misses) {
  return std::make_pair(hits, misses);
}
} // anonymous namespace

// This uses a prefix_extractor + comparator combination that violates
Expand Down Expand Up @@ -3447,9 +3535,7 @@ TEST_F(DBBloomFilterTest, WeirdPrefixExtractorWithFilter1) {
ASSERT_OK(Flush());
}
ReadOptions read_options;
if (flushed) { // TODO: support auto_prefix_mode in memtable?
read_options.auto_prefix_mode = true;
}
read_options.auto_prefix_mode = true;
EXPECT_EQ(GetBloomStat(options, flushed), HitAndMiss(0, 0));
{
Slice ub("999aaaa");
Expand Down Expand Up @@ -3517,9 +3603,8 @@ TEST_F(DBBloomFilterTest, WeirdPrefixExtractorWithFilter2) {
ASSERT_OK(Flush());
}
ReadOptions read_options;
if (flushed) { // TODO: support auto_prefix_mode in memtable?
read_options.auto_prefix_mode = true;
} else {
read_options.auto_prefix_mode = true;
if (!flushed) {
// TODO: why needed?
get_perf_context()->bloom_memtable_hit_count = 0;
get_perf_context()->bloom_memtable_miss_count = 0;
Expand Down Expand Up @@ -3661,9 +3746,8 @@ TEST_F(DBBloomFilterTest, WeirdPrefixExtractorWithFilter3) {
ASSERT_OK(Flush());
}
ReadOptions read_options;
if (flushed) { // TODO: support auto_prefix_mode in memtable?
read_options.auto_prefix_mode = true;
} else {
read_options.auto_prefix_mode = true;
if (!flushed) {
// TODO: why needed?
get_perf_context()->bloom_memtable_hit_count = 0;
get_perf_context()->bloom_memtable_miss_count = 0;
Expand Down
13 changes: 10 additions & 3 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2066,15 +2066,18 @@ InternalIterator* DBImpl::NewInternalIterator(
bool allow_unprepared_value, ArenaWrappedDBIter* db_iter) {
InternalIterator* internal_iter;
assert(arena != nullptr);
auto prefix_extractor =
super_version->mutable_cf_options.prefix_extractor.get();
// Need to create internal iterator from the arena.
MergeIteratorBuilder merge_iter_builder(
&cfd->internal_comparator(), arena,
!read_options.total_order_seek &&
super_version->mutable_cf_options.prefix_extractor != nullptr,
!read_options.total_order_seek && // FIXME?
prefix_extractor != nullptr,
read_options.iterate_upper_bound);
// Collect iterator for mutable memtable
auto mem_iter = super_version->mem->NewIterator(
read_options, super_version->GetSeqnoToTimeMapping(), arena);
read_options, super_version->GetSeqnoToTimeMapping(), arena,
super_version->mutable_cf_options.prefix_extractor.get());
Status s;
if (!read_options.ignore_range_deletions) {
std::unique_ptr<TruncatedRangeDelIterator> mem_tombstone_iter;
Expand All @@ -2098,6 +2101,7 @@ InternalIterator* DBImpl::NewInternalIterator(
if (s.ok()) {
super_version->imm->AddIterators(
read_options, super_version->GetSeqnoToTimeMapping(),
super_version->mutable_cf_options.prefix_extractor.get(),
&merge_iter_builder, !read_options.ignore_range_deletions);
}
TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
Expand Down Expand Up @@ -3805,6 +3809,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
read_options.io_activity = Env::IOActivity::kDBIterator;
}

read_options.total_order_seek |=
immutable_db_options_.prefix_seek_opt_in_only;

if (read_options.managed) {
return NewErrorIterator(
Status::NotSupported("Managed iterator is not supported anymore."));
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1669,7 +1669,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
TableProperties table_properties;
{
ScopedArenaPtr<InternalIterator> iter(
mem->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena));
mem->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena,
/*prefix_extractor=*/nullptr));
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]"
" Level-0 table #%" PRIu64 ": started",
Expand Down
5 changes: 2 additions & 3 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
valid_(false),
current_entry_is_merged_(false),
is_key_seqnum_zero_(false),
prefix_same_as_start_(mutable_cf_options.prefix_extractor
? read_options.prefix_same_as_start
: false),
prefix_same_as_start_(
prefix_extractor_ ? read_options.prefix_same_as_start : false),
pin_thru_lifetime_(read_options.pin_data),
expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
read_options.total_order_seek ||
Expand Down
2 changes: 2 additions & 0 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5597,6 +5597,7 @@ TEST_F(DBTest2, PrefixBloomFilteredOut) {
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
bbto.whole_key_filtering = false;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.prefix_seek_opt_in_only = false; // Use legacy prefix seek
DestroyAndReopen(options);

// Construct two L1 files with keys:
Expand Down Expand Up @@ -5987,6 +5988,7 @@ TEST_F(DBTest2, ChangePrefixExtractor) {
// create a DB with block prefix index
BlockBasedTableOptions table_options;
Options options = CurrentOptions();
options.prefix_seek_opt_in_only = false; // Use legacy prefix seek

// Sometimes filter is checked based on upper bound. Assert counters
// for that case. Otherwise, only check data correctness.
Expand Down
2 changes: 2 additions & 0 deletions db/db_with_timestamp_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ TEST_P(DBBasicTestWithTimestampTableOptions, GetAndMultiGet) {

TEST_P(DBBasicTestWithTimestampTableOptions, SeekWithPrefixLessThanKey) {
Options options = CurrentOptions();
options.prefix_seek_opt_in_only = false; // Use legacy prefix seek
options.env = env_;
options.create_if_missing = true;
options.prefix_extractor.reset(NewFixedPrefixTransform(3));
Expand Down Expand Up @@ -1009,6 +1010,7 @@ TEST_F(DBBasicTestWithTimestamp, ChangeIterationDirection) {
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
options.prefix_seek_opt_in_only = false; // Use legacy prefix seek
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
DestroyAndReopen(options);
const std::vector<std::string> timestamps = {Timestamp(1, 1), Timestamp(0, 2),
Expand Down
8 changes: 4 additions & 4 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ Status FlushJob::MemPurge() {
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
for (MemTable* m : mems_) {
memtables.push_back(
m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena));
memtables.push_back(m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr,
&arena, /*prefix_extractor=*/nullptr));
auto* range_del_iter = m->NewRangeTombstoneIterator(
ro, kMaxSequenceNumber, true /* immutable_memtable */);
if (range_del_iter != nullptr) {
Expand Down Expand Up @@ -893,8 +893,8 @@ Status FlushJob::WriteLevel0Table() {
db_options_.info_log,
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
memtables.push_back(
m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena));
memtables.push_back(m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr,
&arena, /*prefix_extractor=*/nullptr));
auto* range_del_iter = m->NewRangeTombstoneIterator(
ro, kMaxSequenceNumber, true /* immutable_memtable */);
if (range_del_iter != nullptr) {
Expand Down
16 changes: 10 additions & 6 deletions db/forward_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -712,9 +712,11 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping =
sv_->GetSeqnoToTimeMapping();
mutable_iter_ =
sv_->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_);
sv_->imm->AddIterators(read_options_, seqno_to_time_mapping, &imm_iters_,
&arena_);
sv_->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_,
sv_->mutable_cf_options.prefix_extractor.get());
sv_->imm->AddIterators(read_options_, seqno_to_time_mapping,
sv_->mutable_cf_options.prefix_extractor.get(),
&imm_iters_, &arena_);
if (!read_options_.ignore_range_deletions) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
sv_->mem->NewRangeTombstoneIterator(
Expand Down Expand Up @@ -781,9 +783,11 @@ void ForwardIterator::RenewIterators() {
UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping =
svnew->GetSeqnoToTimeMapping();
mutable_iter_ =
svnew->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_);
svnew->imm->AddIterators(read_options_, seqno_to_time_mapping, &imm_iters_,
&arena_);
svnew->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_,
svnew->mutable_cf_options.prefix_extractor.get());
svnew->imm->AddIterators(read_options_, seqno_to_time_mapping,
svnew->mutable_cf_options.prefix_extractor.get(),
&imm_iters_, &arena_);
ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
kMaxSequenceNumber /* upper_bound */);
if (!read_options_.ignore_range_deletions) {
Expand Down
Loading

0 comments on commit a8d741e

Please sign in to comment.