Skip to content

Commit

Permalink
Fix GetMergeOperands in ReadOnlyDB and SecondaryDB
Browse files Browse the repository at this point in the history
  • Loading branch information
jaykorean committed Jan 28, 2025
1 parent 591f5b1 commit 248bdc5
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 13 deletions.
19 changes: 15 additions & 4 deletions db/db_impl/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ Status DBImplReadOnly::GetImpl(const ReadOptions& read_options,
const Slice& key,
GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.columns != nullptr);
get_impl_options.columns != nullptr ||
get_impl_options.merge_operands != nullptr);
assert(get_impl_options.column_family);

Status s;
Expand Down Expand Up @@ -86,7 +87,11 @@ Status DBImplReadOnly::GetImpl(const ReadOptions& read_options,
return s;
}
}
// Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context;
// TODO - Large Result Optimization for Read Only DB
// (https://github.com/facebook/rocksdb/pull/10458)

SequenceNumber max_covering_tombstone_seq = 0;
LookupKey lkey(key, snapshot, read_options.timestamp);
PERF_TIMER_STOP(get_snapshot_time);
Expand All @@ -98,9 +103,6 @@ Status DBImplReadOnly::GetImpl(const ReadOptions& read_options,
get_impl_options.columns, ts, &s, &merge_context,
&max_covering_tombstone_seq, read_options,
false /* immutable_memtable */, &read_cb)) {
if (get_impl_options.value) {
get_impl_options.value->PinSelf();
}
RecordTick(stats_, MEMTABLE_HIT);
} else {
PERF_TIMER_GUARD(get_from_output_files_time);
Expand All @@ -118,9 +120,18 @@ Status DBImplReadOnly::GetImpl(const ReadOptions& read_options,
RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = 0;
if (get_impl_options.value) {
get_impl_options.value->PinSelf();
size = get_impl_options.value->size();
} else if (get_impl_options.columns) {
size = get_impl_options.columns->serialized_size();
} else if (get_impl_options.merge_operands) {
*get_impl_options.number_of_operands =
static_cast<int>(merge_context.GetNumOperands());
for (const Slice& sl : merge_context.GetOperands()) {
size += sl.size();
get_impl_options.merge_operands->PinSelf(sl);
get_impl_options.merge_operands++;
}
}
RecordTick(stats_, BYTES_READ, size);
RecordInHistogram(stats_, BYTES_PER_READ, size);
Expand Down
21 changes: 14 additions & 7 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
const Slice& key,
GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.columns != nullptr);
get_impl_options.columns != nullptr ||
get_impl_options.merge_operands != nullptr);
assert(get_impl_options.column_family);

Status s;
Expand Down Expand Up @@ -397,6 +398,9 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
}
}
MergeContext merge_context;
// TODO - Large Result Optimization for Secondary DB
// (https://github.com/facebook/rocksdb/pull/10458)

SequenceNumber max_covering_tombstone_seq = 0;
LookupKey lkey(key, snapshot, read_options.timestamp);
PERF_TIMER_STOP(get_snapshot_time);
Expand All @@ -410,9 +414,6 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
&max_covering_tombstone_seq, read_options,
false /* immutable_memtable */, &read_cb)) {
done = true;
if (get_impl_options.value) {
get_impl_options.value->PinSelf();
}
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
super_version->imm->Get(
Expand All @@ -422,9 +423,6 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
get_impl_options.columns, ts, &s, &merge_context,
&max_covering_tombstone_seq, read_options, &read_cb)) {
done = true;
if (get_impl_options.value) {
get_impl_options.value->PinSelf();
}
RecordTick(stats_, MEMTABLE_HIT);
}
if (!done && !s.ok() && !s.IsMergeInProgress()) {
Expand All @@ -448,9 +446,18 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = 0;
if (get_impl_options.value) {
get_impl_options.value->PinSelf();
size = get_impl_options.value->size();
} else if (get_impl_options.columns) {
size = get_impl_options.columns->serialized_size();
} else if (get_impl_options.merge_operands) {
*get_impl_options.number_of_operands =
static_cast<int>(merge_context.GetNumOperands());
for (const Slice& sl : merge_context.GetOperands()) {
size += sl.size();
get_impl_options.merge_operands->PinSelf(sl);
get_impl_options.merge_operands++;
}
}
RecordTick(stats_, BYTES_READ, size);
RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
Expand Down
21 changes: 20 additions & 1 deletion db/db_merge_operand_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,11 @@ TEST_F(DBMergeOperandTest, FlushedMergeOperandReadAfterFreeBug) {

TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
Options options = CurrentOptions();

int limit = 2;
// Use only the latest two merge operands.
options.merge_operator = std::make_shared<LimitedStringAppendMergeOp>(2, ',');
options.merge_operator =
std::make_shared<LimitedStringAppendMergeOp>(limit, ',');
Reopen(options);
int num_records = 4;
int number_of_operands = 0;
Expand Down Expand Up @@ -299,9 +302,25 @@ TEST_F(DBMergeOperandTest, GetMergeOperandsBasic) {
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
"k5", values.data(), &merge_operands_info,
&number_of_operands));
ASSERT_EQ(number_of_operands, 4);
ASSERT_EQ(values[0], "remember");
ASSERT_EQ(values[1], "i");
ASSERT_EQ(values[2], "am");
ASSERT_EQ(values[3], "rocks");

// GetMergeOperands() in ReadOnly DB
ASSERT_OK(Merge("k6", "better"));
ASSERT_OK(Merge("k6", "call"));
ASSERT_OK(Merge("k6", "saul"));

ASSERT_OK(ReadOnlyReopen(options));
std::vector<PinnableSlice> readonly_values(num_records);
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
"k6", readonly_values.data(),
&merge_operands_info, &number_of_operands));
ASSERT_EQ(number_of_operands, limit);
ASSERT_EQ(readonly_values[0], "call");
ASSERT_EQ(readonly_values[1], "saul");
}

TEST_F(DBMergeOperandTest, BlobDBGetMergeOperandsBasic) {
Expand Down
40 changes: 39 additions & 1 deletion db/db_secondary_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
#include "test_util/sync_point.h"
#include "test_util/testutil.h"
#include "utilities/fault_injection_env.h"
#include "utilities/merge_operators/string_append/stringappend2.h"

namespace ROCKSDB_NAMESPACE {

class DBSecondaryTestBase : public DBBasicTestWithTimestampBase {
public:
explicit DBSecondaryTestBase(const std::string& dbname)
Expand Down Expand Up @@ -331,6 +331,44 @@ TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) {
// cfh, input1, &result));
}

// Verify GetMergeOperands() on a secondary DB instance: after catching up
// with the primary, all merge operands written on the primary for a key
// must be readable, in write order.
TEST_F(DBSecondaryTest, GetMergeOperands) {
  Options options;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  options.env = env_;
  Reopen(options);

  // Write four operands for "k1" on the primary, with no base value.
  for (const char* operand : {"v1", "v2", "v3", "v4"}) {
    ASSERT_OK(Merge("k1", operand));
  }

  options.max_open_files = -1;
  OpenSecondary(options);

  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());

  const int num_records = 4;
  int number_of_operands = 0;
  std::vector<PinnableSlice> values(num_records);
  GetMergeOperandsOptions merge_operands_info;
  merge_operands_info.expected_max_number_of_operands = num_records;

  ColumnFamilyHandle* const cfh = db_secondary_->DefaultColumnFamily();

  // The status is expected to be MergeInProgress — presumably because "k1"
  // has only merge operands and no base value to resolve against (TODO
  // confirm against primary-DB behavior); the operands are still returned.
  const Status s = db_secondary_->GetMergeOperands(
      ReadOptions(), cfh, "k1", values.data(), &merge_operands_info,
      &number_of_operands);
  ASSERT_NOK(s);
  ASSERT_TRUE(s.IsMergeInProgress());

  ASSERT_EQ(number_of_operands, 4);
  ASSERT_EQ(values[0].ToString(), "v1");
  ASSERT_EQ(values[1].ToString(), "v2");
  ASSERT_EQ(values[2].ToString(), "v3");
  ASSERT_EQ(values[3].ToString(), "v4");
}

TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) {
Options options;
options.env = env_;
Expand Down

0 comments on commit 248bdc5

Please sign in to comment.