Fix GetMergeOperands in ReadOnlyDB and SecondaryDB
jaykorean committed Jan 27, 2025
1 parent 591f5b1 commit 7da69a4
Showing 3 changed files with 58 additions and 3 deletions.
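For reference, the API this commit fixes is DB::GetMergeOperands() on instances opened with DB::OpenForReadOnly() or DB::OpenAsSecondary(). A minimal caller-side sketch, mirroring the test below; the path, key, and operand cap are illustrative, and the merge operator must be whatever the writer used (omitted here):

```cpp
#include <cassert>
#include <iostream>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  const std::string kDBPath = "/tmp/merge_operands_example";  // illustrative
  rocksdb::Options options;
  // Must match the merge operator the data was written with; omitted here
  // because this sketch does not pick a concrete operator.
  // options.merge_operator = ...;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::OpenForReadOnly(options, kDBPath, &db);
  assert(s.ok());

  // The caller owns the output array; the DB pins one operand per slot.
  constexpr int kMaxOperands = 8;  // illustrative cap
  std::vector<rocksdb::PinnableSlice> operands(kMaxOperands);
  rocksdb::GetMergeOperandsOptions operand_options;
  operand_options.expected_max_number_of_operands = kMaxOperands;
  int number_of_operands = 0;

  s = db->GetMergeOperands(rocksdb::ReadOptions(), db->DefaultColumnFamily(),
                           "some_key", operands.data(), &operand_options,
                           &number_of_operands);
  if (s.ok()) {
    for (int i = 0; i < number_of_operands; ++i) {
      std::cout << operands[i].ToString() << "\n";
    }
  }
  delete db;
  return 0;
}
```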
db/db_impl/db_impl_readonly.cc: 17 additions & 1 deletion
@@ -32,7 +32,8 @@ Status DBImplReadOnly::GetImpl(const ReadOptions& read_options,
                                 const Slice& key,
                                 GetImplOptions& get_impl_options) {
   assert(get_impl_options.value != nullptr ||
-         get_impl_options.columns != nullptr);
+         get_impl_options.columns != nullptr ||
+         get_impl_options.merge_operands != nullptr);
   assert(get_impl_options.column_family);
 
   Status s;
@@ -86,7 +87,11 @@ Status DBImplReadOnly::GetImpl(const ReadOptions& read_options,
       return s;
     }
   }
+  // Prepare to store a list of merge operations if merge occurs.
   MergeContext merge_context;
+  // TODO - Large Result Optimization for Read Only DB
+  // (https://github.com/facebook/rocksdb/pull/10458)
+
   SequenceNumber max_covering_tombstone_seq = 0;
   LookupKey lkey(key, snapshot, read_options.timestamp);
   PERF_TIMER_STOP(get_snapshot_time);
@@ -100,6 +105,11 @@ Status DBImplReadOnly::GetImpl(const ReadOptions& read_options,
                        false /* immutable_memtable */, &read_cb)) {
     if (get_impl_options.value) {
       get_impl_options.value->PinSelf();
+    } else if (get_impl_options.merge_operands) {
+      for (const Slice& sl : merge_context.GetOperands()) {
+        get_impl_options.merge_operands->PinSelf(sl);
+        get_impl_options.merge_operands++;
+      }
     }
     RecordTick(stats_, MEMTABLE_HIT);
   } else {
@@ -121,6 +131,12 @@ Status DBImplReadOnly::GetImpl(const ReadOptions& read_options,
       size = get_impl_options.value->size();
     } else if (get_impl_options.columns) {
       size = get_impl_options.columns->serialized_size();
+    } else if (get_impl_options.merge_operands) {
+      *get_impl_options.number_of_operands =
+          static_cast<int>(merge_context.GetNumOperands());
+      for (const Slice& sl : merge_context.GetOperands()) {
+        size += sl.size();
+      }
     }
     RecordTick(stats_, BYTES_READ, size);
     RecordInHistogram(stats_, BYTES_PER_READ, size);
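The new memtable-hit branch above copies each collected operand out with PinnableSlice::PinSelf(const Slice&), which deep-copies the bytes into the slice's own buffer so the result outlives the lookup. A standalone illustration of that semantic (the buffer contents are made up):

```cpp
#include <cassert>
#include <string>

#include "rocksdb/slice.h"

int main() {
  rocksdb::PinnableSlice pinned;
  {
    std::string transient = "operand-bytes";    // stands in for a memtable entry
    pinned.PinSelf(rocksdb::Slice(transient));  // deep copy into pinned's buffer
  }  // `transient` is destroyed here
  assert(pinned.ToString() == "operand-bytes");  // the copy remains valid
  return 0;
}
```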
db/db_impl/db_impl_secondary.cc: 21 additions & 1 deletion
@@ -342,7 +342,8 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
                                 const Slice& key,
                                 GetImplOptions& get_impl_options) {
   assert(get_impl_options.value != nullptr ||
-         get_impl_options.columns != nullptr);
+         get_impl_options.columns != nullptr ||
+         get_impl_options.merge_operands != nullptr);
   assert(get_impl_options.column_family);
 
   Status s;
@@ -397,6 +398,9 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
     }
   }
   MergeContext merge_context;
+  // TODO - Large Result Optimization for Secondary DB
+  // (https://github.com/facebook/rocksdb/pull/10458)
+
   SequenceNumber max_covering_tombstone_seq = 0;
   LookupKey lkey(key, snapshot, read_options.timestamp);
   PERF_TIMER_STOP(get_snapshot_time);
@@ -412,6 +416,11 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
       done = true;
       if (get_impl_options.value) {
         get_impl_options.value->PinSelf();
+      } else if (get_impl_options.merge_operands) {
+        for (const Slice& sl : merge_context.GetOperands()) {
+          get_impl_options.merge_operands->PinSelf(sl);
+          get_impl_options.merge_operands++;
+        }
       }
       RecordTick(stats_, MEMTABLE_HIT);
     } else if ((s.ok() || s.IsMergeInProgress()) &&
@@ -424,6 +433,11 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
       done = true;
       if (get_impl_options.value) {
         get_impl_options.value->PinSelf();
+      } else if (get_impl_options.merge_operands) {
+        for (const Slice& sl : merge_context.GetOperands()) {
+          get_impl_options.merge_operands->PinSelf(sl);
+          get_impl_options.merge_operands++;
+        }
       }
       RecordTick(stats_, MEMTABLE_HIT);
     }
@@ -451,6 +465,12 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
       size = get_impl_options.value->size();
     } else if (get_impl_options.columns) {
       size = get_impl_options.columns->serialized_size();
+    } else {
+      *get_impl_options.number_of_operands =
+          static_cast<int>(merge_context.GetNumOperands());
+      for (const Slice& sl : merge_context.GetOperands()) {
+        size += sl.size();
+      }
     }
     RecordTick(stats_, BYTES_READ, size);
     RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
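The secondary path above is reached through DB::OpenAsSecondary(). A minimal sketch of standing up a secondary instance; both paths are illustrative, and max_open_files = -1 is per the OpenAsSecondary() documentation:

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.max_open_files = -1;  // secondary mode requires this
  rocksdb::DB* secondary = nullptr;
  rocksdb::Status s = rocksdb::DB::OpenAsSecondary(
      options, "/tmp/primary_db", "/tmp/secondary_state", &secondary);
  assert(s.ok());
  // Replay the primary's recent MANIFEST/WAL updates before reading.
  s = secondary->TryCatchUpWithPrimary();
  assert(s.ok());
  // GetMergeOperands() now follows the code path patched above.
  delete secondary;
  return 0;
}
```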
db/db_merge_operand_test.cc: 20 additions & 1 deletion
@@ -120,8 +120,11 @@ TEST_F(DBMergeOperandTest, FlushedMergeOperandReadAfterFreeBug) {
 
 TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
   Options options = CurrentOptions();
+
+  int limit = 2;
   // Use only the latest two merge operands.
-  options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
+  options.merge_operator =
+      std::make_shared<LimitedStringAppendMergeOp>(limit, ',');
   Reopen(options);
   int num_records = 4;
   int number_of_operands = 0;
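LimitedStringAppendMergeOp, defined earlier in this test file, caps operand collection by overriding MergeOperator::ShouldMerge(); returning true tells the read path it already has enough operands, which is why the read-only lookup in the next hunk yields only the latest `limit` operands. A rough standalone sketch of the idea, with a hypothetical class name (the real test class derives from StringAppendTESTOperator):

```cpp
#include <string>
#include <vector>

#include "rocksdb/merge_operator.h"

// Hypothetical operator: appends operands with ',' and asks the read path
// to stop once `limit_` operands have been gathered.
class LastNAppendOperator : public rocksdb::MergeOperator {
 public:
  explicit LastNAppendOperator(size_t limit) : limit_(limit) {}

  bool FullMergeV2(const MergeOperationInput& merge_in,
                   MergeOperationOutput* merge_out) const override {
    merge_out->new_value.clear();
    if (merge_in.existing_value != nullptr) {
      merge_out->new_value.assign(merge_in.existing_value->data(),
                                  merge_in.existing_value->size());
    }
    for (const rocksdb::Slice& operand : merge_in.operand_list) {
      if (!merge_out->new_value.empty()) {
        merge_out->new_value += ',';
      }
      merge_out->new_value.append(operand.data(), operand.size());
    }
    return true;
  }

  // Consulted while operands are being gathered; returning true stops the
  // scan early, so GetMergeOperands() sees at most `limit_` items.
  bool ShouldMerge(const std::vector<rocksdb::Slice>& operands) const override {
    return limit_ > 0 && operands.size() >= limit_;
  }

  const char* Name() const override { return "LastNAppendOperator"; }

 private:
  size_t limit_;
};
```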
@@ -299,9 +302,25 @@ TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
   ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
                                   "k5", values.data(), &merge_operands_info,
                                   &number_of_operands));
+  ASSERT_EQ(number_of_operands, 4);
   ASSERT_EQ(values[0], "remember");
   ASSERT_EQ(values[1], "i");
   ASSERT_EQ(values[2], "am");
+  ASSERT_EQ(values[3], "rocks");
+
+  // GetMergeOperands() in ReadOnly DB
+  ASSERT_OK(Merge("k6", "better"));
+  ASSERT_OK(Merge("k6", "call"));
+  ASSERT_OK(Merge("k6", "saul"));
+
+  ReadOnlyReopen(options);
+  std::vector<PinnableSlice> readonly_values(num_records);
+  ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
+                                  "k6", readonly_values.data(),
+                                  &merge_operands_info, &number_of_operands));
+  ASSERT_EQ(number_of_operands, limit);
+  ASSERT_EQ(readonly_values[0], "call");
+  ASSERT_EQ(readonly_values[1], "saul");
 }
 
 TEST_F(DBMergeOperandTest, BlobDBGetMergeOperandsBasic) {