Skip to content

Commit

Permalink
New API to get all merge operands for a Key (#5604)
Browse files Browse the repository at this point in the history
Summary:
This is a new API added to db.h to allow for fetching all merge operands associated with a Key. The main motivation for this API is to support use cases where doing a full online merge is not necessary as it is performance sensitive. Example use-cases:
1. Update subset of columns and read subset of columns -
Imagine a SQL Table, a row is encoded as a K/V pair (as it is done in MyRocks). If there are many columns and users only updated one of them, we can use merge operator to reduce write amplification. While users only read one or two columns in the read query, this feature can avoid a full merging of the whole row, and save some CPU.
2. Updating very few attributes in a value which is a JSON-like document -
Updating one attribute can be done efficiently using merge operator, while reading back one attribute can be done more efficiently if we don't need to do a full merge.
----------------------------------------------------------------------------------------------------
API :
Status GetMergeOperands(
      const ReadOptions& options, ColumnFamilyHandle* column_family,
      const Slice& key, PinnableSlice* merge_operands,
      GetMergeOperandsOptions* get_merge_operands_options,
      int* number_of_operands)

Example usage :
int size = 100;
int number_of_operands = 0;
std::vector<PinnableSlice> values(size);
GetMergeOperandsOptions merge_operands_info;
db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), "k1", values.data(), merge_operands_info, &number_of_operands);

Description :
Returns all the merge operands corresponding to the key. If the number of merge operands in DB is greater than merge_operands_options.expected_max_number_of_operands no merge operands are returned and status is Incomplete. Merge operands returned are in the order of insertion.
merge_operands-> Points to an array of at-least merge_operands_options.expected_max_number_of_operands and the caller is responsible for allocating it. If the status returned is Incomplete then number_of_operands will contain the total number of merge operands found in DB for key.
Pull Request resolved: #5604

Test Plan:
Added unit test and perf test in db_bench that can be run using the command:
./db_bench -benchmarks=getmergeoperands --merge_operator=sortlist

Differential Revision: D16657366

Pulled By: vjnadimpalli

fbshipit-source-id: 0faadd752351745224ee12d4ae9ef3cb529951bf
  • Loading branch information
vjnadimpalli authored and facebook-github-bot committed Aug 6, 2019
1 parent 4f98b43 commit d150e01
Show file tree
Hide file tree
Showing 40 changed files with 914 additions and 163 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ set(SOURCES
utilities/merge_operators/bytesxor.cc
utilities/merge_operators/max.cc
utilities/merge_operators/put.cc
utilities/merge_operators/sortlist.cc
utilities/merge_operators/string_append/stringappend.cc
utilities/merge_operators/string_append/stringappend2.cc
utilities/merge_operators/uint64add.cc
Expand Down Expand Up @@ -887,6 +888,7 @@ if(WITH_TESTS)
db/db_log_iter_test.cc
db/db_memtable_test.cc
db/db_merge_operator_test.cc
db/db_merge_operand_test.cc
db/db_options_test.cc
db/db_properties_test.cc
db/db_range_del_test.cc
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ TESTS = \
db_iterator_test \
db_memtable_test \
db_merge_operator_test \
db_merge_operand_test \
db_options_test \
db_range_del_test \
db_secondary_test \
Expand Down Expand Up @@ -1254,6 +1255,9 @@ db_memtable_test: db/db_memtable_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHA
db_merge_operator_test: db/db_merge_operator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_merge_operand_test: db/db_merge_operand_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

Expand Down
8 changes: 8 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ cpp_library(
"utilities/merge_operators/bytesxor.cc",
"utilities/merge_operators/max.cc",
"utilities/merge_operators/put.cc",
"utilities/merge_operators/sortlist.cc",
"utilities/merge_operators/string_append/stringappend.cc",
"utilities/merge_operators/string_append/stringappend2.cc",
"utilities/merge_operators/uint64add.cc",
Expand Down Expand Up @@ -755,6 +756,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"db_merge_operand_test",
"db/db_merge_operand_test.cc",
"parallel",
[],
[],
],
[
"db_options_test",
"db/db_options_test.cc",
Expand Down
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ build:
test:

test_script:
- ps: build_tools\run_ci_db_test.ps1 -SuiteRun db_basic_test,db_test2,db_test,env_basic_test,env_test -Concurrency 8
- ps: build_tools\run_ci_db_test.ps1 -SuiteRun db_basic_test,db_test2,db_test,env_basic_test,env_test,db_merge_operand_test -Concurrency 8

on_failure:
- cmd: 7z a build-failed.zip %APPVEYOR_BUILD_FOLDER%\build\ && appveyor PushArtifact build-failed.zip
Expand Down
4 changes: 2 additions & 2 deletions db/compacted_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
const Slice& key, PinnableSlice* value) {
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, key, value, nullptr, nullptr,
nullptr, nullptr);
true, nullptr, nullptr);
LookupKey lkey(key, kMaxSequenceNumber);
files_.files[FindFile(key)].fd.table_reader->Get(options, lkey.internal_key(),
&get_context, nullptr);
Expand Down Expand Up @@ -70,7 +70,7 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
std::string& value = (*values)[idx];
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, keys[idx], &pinnable_val,
nullptr, nullptr, nullptr, nullptr);
nullptr, nullptr, true, nullptr, nullptr);
LookupKey lkey(keys[idx], kMaxSequenceNumber);
r->Get(options, lkey.internal_key(), &get_context, nullptr);
value.assign(pinnable_val.data(), pinnable_val.size());
Expand Down
8 changes: 5 additions & 3 deletions db/db_blob_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ class DBBlobIndexTest : public DBTestBase {
ReadOptions read_options;
read_options.snapshot = snapshot;
PinnableSlice value;
auto s = dbfull()->GetImpl(read_options, cfh(), key, &value,
nullptr /*value_found*/, nullptr /*callback*/,
is_blob_index);
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = cfh();
get_impl_options.value = &value;
get_impl_options.is_blob_index = is_blob_index;
auto s = dbfull()->GetImpl(read_options, key, get_impl_options);
if (s.IsNotFound()) {
return "NOT_FOUND";
}
Expand Down
112 changes: 81 additions & 31 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1441,27 +1441,30 @@ ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
Status DBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) {
return GetImpl(read_options, column_family, key, value);
GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.value = value;
return GetImpl(read_options, key, get_impl_options);
}

Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val, bool* value_found,
ReadCallback* callback, bool* is_blob_index) {
assert(pinnable_val != nullptr);
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.merge_operands != nullptr);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);

auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfh =
reinterpret_cast<ColumnFamilyHandleImpl*>(get_impl_options.column_family);
auto cfd = cfh->cfd();

if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when
// tracing is enabled.
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
tracer_->Get(column_family, key);
tracer_->Get(get_impl_options.column_family, key);
}
}

Expand All @@ -1473,9 +1476,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,

SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
if (callback) {
if (get_impl_options.callback) {
// Already calculated based on read_options.snapshot
snapshot = callback->max_visible_seq();
snapshot = get_impl_options.callback->max_visible_seq();
} else {
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
Expand All @@ -1489,12 +1492,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
if (callback) {
if (get_impl_options.callback) {
// The unprep_seqs are not published for write unprepared, so it could be
// that max_visible_seq is larger. Seek to the std::max of the two.
// However, we still want our callback to contain the actual snapshot so
// that it can do the correct visibility filtering.
callback->Refresh(snapshot);
get_impl_options.callback->Refresh(snapshot);

// Internally, WriteUnpreparedTxnReadCallback::Refresh would set
// max_visible_seq = max(max_visible_seq, snapshot)
Expand All @@ -1505,7 +1508,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
// be needed.
//
// assert(callback->max_visible_seq() >= snapshot);
snapshot = callback->max_visible_seq();
snapshot = get_impl_options.callback->max_visible_seq();
}
}
TEST_SYNC_POINT("DBImpl::GetImpl:3");
Expand All @@ -1526,19 +1529,39 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
if (!skip_memtable) {
if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&max_covering_tombstone_seq, read_options, callback,
is_blob_index)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&max_covering_tombstone_seq, read_options, callback,
is_blob_index)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
// Get value associated with key
if (get_impl_options.get_value) {
if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s,
&merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;
get_impl_options.value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s,
&merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback,
get_impl_options.is_blob_index)) {
done = true;
get_impl_options.value->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
}
} else {
// Get Merge Operands associated with key, Merge Operands should not be
// merged and raw values should be returned to the user.
if (sv->mem->Get(lkey, nullptr, &s, &merge_context,
&max_covering_tombstone_seq, read_options, nullptr,
nullptr, false)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->GetMergeOperands(lkey, &s, &merge_context,
&max_covering_tombstone_seq,
read_options)) {
done = true;
RecordTick(stats_, MEMTABLE_HIT);
}
}
if (!done && !s.ok() && !s.IsMergeInProgress()) {
ReturnAndCleanupSuperVersion(cfd, sv);
Expand All @@ -1547,9 +1570,14 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
}
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
&max_covering_tombstone_seq, value_found, nullptr, nullptr,
callback, is_blob_index);
sv->current->Get(
read_options, lkey, get_impl_options.value, &s, &merge_context,
&max_covering_tombstone_seq,
get_impl_options.get_value ? get_impl_options.value_found : nullptr,
nullptr, nullptr,
get_impl_options.get_value ? get_impl_options.callback : nullptr,
get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
get_impl_options.get_value);
RecordTick(stats_, MEMTABLE_MISS);
}

Expand All @@ -1561,7 +1589,25 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = 0;
if (s.ok()) {
size = pinnable_val->size();
if (get_impl_options.get_value) {
size = get_impl_options.value->size();
} else {
// Return all merge operands for get_impl_options.key
*get_impl_options.number_of_operands =
static_cast<int>(merge_context.GetNumOperands());
if (*get_impl_options.number_of_operands >
get_impl_options.get_merge_operands_options
->expected_max_number_of_operands) {
s = Status::Incomplete(
Status::SubCode::KMergeOperandsInsufficientCapacity);
} else {
for (const Slice& sl : merge_context.GetOperands()) {
size += sl.size();
get_impl_options.merge_operands->PinSelf(sl);
get_impl_options.merge_operands++;
}
}
}
RecordTick(stats_, BYTES_READ, size);
PERF_COUNTER_ADD(get_read_bytes, size);
}
Expand Down Expand Up @@ -2222,7 +2268,11 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options,
ReadOptions roptions = read_options;
roptions.read_tier = kBlockCacheTier; // read from block cache only
PinnableSlice pinnable_val;
auto s = GetImpl(roptions, column_family, key, &pinnable_val, value_found);
GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.value = &pinnable_val;
get_impl_options.value_found = value_found;
auto s = GetImpl(roptions, key, get_impl_options);
value->assign(pinnable_val.data(), pinnable_val.size());

// If block_cache is enabled and the index block of the table didn't
Expand Down
43 changes: 39 additions & 4 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,21 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;

using DB::GetMergeOperands;
Status GetMergeOperands(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* merge_operands,
GetMergeOperandsOptions* get_merge_operands_options,
int* number_of_operands) override {
GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.merge_operands = merge_operands;
get_impl_options.get_merge_operands_options = get_merge_operands_options;
get_impl_options.number_of_operands = number_of_operands;
get_impl_options.get_value = false;
return GetImpl(options, key, get_impl_options);
}

using DB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
Expand Down Expand Up @@ -395,12 +410,32 @@ class DBImpl : public DB {

// ---- End of implementations of the DB interface ----

struct GetImplOptions {
ColumnFamilyHandle* column_family = nullptr;
PinnableSlice* value = nullptr;
bool* value_found = nullptr;
ReadCallback* callback = nullptr;
bool* is_blob_index = nullptr;
// If true return value associated with key via value pointer else return
// all merge operands for key via merge_operands pointer
bool get_value = true;
// Pointer to an array of size
// get_merge_operands_options.expected_max_number_of_operands allocated by
// user
PinnableSlice* merge_operands = nullptr;
GetMergeOperandsOptions* get_merge_operands_options = nullptr;
int* number_of_operands = nullptr;
};

// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
bool* value_found = nullptr, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
// This function is also called by GetMergeOperands
// If get_impl_options.get_value = true get value associated with
// get_impl_options.key via get_impl_options.value
// If get_impl_options.get_value = false get merge operands associated with
// get_impl_options.key via get_impl_options.merge_operands
Status GetImpl(const ReadOptions& options, const Slice& key,
GetImplOptions get_impl_options);

ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd,
Expand Down
3 changes: 1 addition & 2 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// We may ignore the dbname when generating the file names.
for (auto& file : state.sst_delete_files) {
candidate_files.emplace_back(
MakeTableFileName(file.metadata->fd.GetNumber()),
file.path);
MakeTableFileName(file.metadata->fd.GetNumber()), file.path);
if (file.metadata->table_reader_handle) {
table_cache_->Release(file.metadata->table_reader_handle);
}
Expand Down
Loading

0 comments on commit d150e01

Please sign in to comment.