Skip to content

Commit

Permalink
Introduce MergeContext to Lazily Initialize merge operand list
Browse files Browse the repository at this point in the history
Summary: In get operations, merge_operands is only used in few cases. Lazily initialize it can reduce average latency in some cases

Test Plan: make all check

Reviewers: haobo, kailiu, dhruba

Reviewed By: haobo

CC: igor, nkg-, leveldb

Differential Revision: https://reviews.facebook.net/D14415

Conflicts:
	db/db_impl.cc
	db/memtable.cc
  • Loading branch information
siying committed Dec 11, 2013
1 parent bc5dd19 commit a8029fd
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 53 deletions.
21 changes: 11 additions & 10 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtablelist.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/prefix_filter_iterator.h"
#include "db/table_cache.h"
Expand Down Expand Up @@ -2608,20 +2609,20 @@ Status DBImpl::GetImpl(const ReadOptions& options,


// Prepare to store a list of merge operations if merge occurs.
std::deque<std::string> merge_operands;
MergeContext merge_context;

// First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s, &merge_operands, options_)) {
if (mem->Get(lkey, value, &s, merge_context, options_)) {
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else if (imm.Get(lkey, value, &s, &merge_operands, options_)) {
} else if (imm.Get(lkey, value, &s, merge_context, options_)) {
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else {
current->Get(options, lkey, value, &s, &merge_operands, &stats,
current->Get(options, lkey, value, &s, &merge_context, &stats,
options_, value_found);
have_stat_update = true;
RecordTick(options_.statistics.get(), MEMTABLE_MISS);
Expand Down Expand Up @@ -2676,8 +2677,8 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
bool have_stat_update = false;
Version::GetStats stats;

// Prepare to store a list of merge operations if merge occurs.
std::deque<std::string> merge_operands;
// Contain a list of merge operations if merge occurs.
MergeContext merge_context;

// Note: this always resizes the values array
int numKeys = keys.size();
Expand All @@ -2692,17 +2693,17 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
for (int i=0; i<numKeys; ++i) {
merge_operands.clear();
merge_context.Clear();
Status& s = statList[i];
std::string* value = &(*values)[i];

LookupKey lkey(keys[i], snapshot);
if (mem->Get(lkey, value, &s, &merge_operands, options_)) {
if (mem->Get(lkey, value, &s, merge_context, options_)) {
// Done
} else if (imm.Get(lkey, value, &s, &merge_operands, options_)) {
} else if (imm.Get(lkey, value, &s, merge_context, options_)) {
// Done
} else {
current->Get(options, lkey, value, &s, &merge_operands, &stats, options_);
current->Get(options, lkey, value, &s, &merge_context, &stats, options_);
have_stat_update = true;
}

Expand Down
8 changes: 5 additions & 3 deletions db/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/merge_operator.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
Expand Down Expand Up @@ -57,12 +59,12 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
MemTable* mem = GetMemTable();
Version* current = versions_->current();
SequenceNumber snapshot = versions_->LastSequence();
std::deque<std::string> merge_operands;
MergeContext merge_context;
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s, &merge_operands, options_)) {
if (mem->Get(lkey, value, &s, merge_context, options_)) {
} else {
Version::GetStats stats;
current->Get(options, lkey, value, &s, &merge_operands, &stats, options_);
current->Get(options, lkey, value, &s, &merge_context, &stats, options_);
}
return s;
}
Expand Down
32 changes: 15 additions & 17 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <memory>

#include "db/dbformat.h"
#include "db/merge_context.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
Expand Down Expand Up @@ -162,15 +163,12 @@ void MemTable::Add(SequenceNumber s, ValueType type,
}

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
std::deque<std::string>* operands, const Options& options) {
MergeContext& merge_context, const Options& options) {
Slice memkey = key.memtable_key();
std::shared_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(key.user_key()));
iter->Seek(memkey.data());

// It is the caller's responsibility to allocate/delete operands list
assert(operands != nullptr);

bool merge_in_progress = s->IsMergeInProgress();
auto merge_operator = options.merge_operator.get();
auto logger = options.info_log;
Expand Down Expand Up @@ -202,8 +200,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
*s = Status::OK();
if (merge_in_progress) {
assert(merge_operator);
if (!merge_operator->FullMerge(key.user_key(), &v, *operands,
value, logger.get())) {
if (!merge_operator->FullMerge(key.user_key(), &v,
merge_context.GetOperands(), value,
logger.get())) {
RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge.");
}
Expand All @@ -219,8 +218,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
if (merge_in_progress) {
assert(merge_operator);
*s = Status::OK();
if (!merge_operator->FullMerge(key.user_key(), nullptr, *operands,
value, logger.get())) {
if (!merge_operator->FullMerge(key.user_key(), nullptr,
merge_context.GetOperands(), value,
logger.get())) {
RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge.");
}
Expand All @@ -232,16 +232,14 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
case kTypeMerge: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
merge_in_progress = true;
operands->push_front(v.ToString());
while(operands->size() >= 2) {
merge_context.PushOperand(v);
while(merge_context.GetNumOperands() >= 2) {
// Attempt to associative merge. (Returns true if successful)
if (merge_operator->PartialMerge(key.user_key(),
Slice((*operands)[0]),
Slice((*operands)[1]),
&merge_result,
logger.get())) {
operands->pop_front();
swap(operands->front(), merge_result);
if (merge_operator->PartialMerge(key.user_key(),
merge_context.GetOperand(0),
merge_context.GetOperand(1),
&merge_result, logger.get())) {
merge_context.PushPartialMergeResult(merge_result);
} else {
// Stack them because user can't associative merge
break;
Expand Down
3 changes: 2 additions & 1 deletion db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace rocksdb {

class Mutex;
class MemTableIterator;
class MergeContext;

class MemTable {
public:
Expand Down Expand Up @@ -94,7 +95,7 @@ class MemTable {
// store MergeInProgress in s, and return false.
// Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s,
std::deque<std::string>* operands, const Options& options);
MergeContext& merge_context, const Options& options);

// Update the value and return status ok,
// if key exists in current memtable
Expand Down
5 changes: 2 additions & 3 deletions db/memtablelist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,9 @@ size_t MemTableList::ApproximateMemoryUsage() {
// Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far.
bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s,
std::deque<std::string>* operands,
const Options& options) {
MergeContext& merge_context, const Options& options) {
for (auto &memtable : memlist_) {
if (memtable->Get(key, value, s, operands, options)) {
if (memtable->Get(key, value, s, merge_context, options)) {
return true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion db/memtablelist.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class MemTableList {
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
bool Get(const LookupKey& key, std::string* value, Status* s,
std::deque<std::string>* operands, const Options& options);
MergeContext& merge_context, const Options& options);

// Returns the list of underlying memtables.
void GetMemTables(std::vector<MemTable*>* list);
Expand Down
69 changes: 69 additions & 0 deletions db/merge_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#pragma once
#include "db/dbformat.h"
#include "rocksdb/slice.h"
#include <string>
#include <deque>

namespace rocksdb {

const std::deque<std::string> empty_operand_list;

// The merge context for merging a user key.
// When doing a Get(), DB will create such a class and pass it when
// issuing Get() operation to memtables and version_set. The operands
// will be fetched from the context when issuing partial of full merge.
class MergeContext {
public:
// Clear all the operands
void Clear() {
if (operand_list) {
operand_list->clear();
}
}
// Replace the first two operands of merge_result, which are expected be the
// merge results of them.
void PushPartialMergeResult(std::string& merge_result) {
assert (operand_list);
operand_list->pop_front();
swap(operand_list->front(), merge_result);
}
// Push a merge operand
void PushOperand(const Slice& operand_slice) {
Initialize();
operand_list->push_front(operand_slice.ToString());
}
// return total number of operands in the list
size_t GetNumOperands() const {
if (!operand_list) {
return 0;
}
return operand_list->size();
}
// Get the operand at the index.
Slice GetOperand(int index) const {
assert (operand_list);
return (*operand_list)[index];
}
// Return all the operands.
const std::deque<std::string>& GetOperands() const {
if (!operand_list) {
return empty_operand_list;
}
return *operand_list;
}
private:
void Initialize() {
if (!operand_list) {
operand_list.reset(new std::deque<std::string>());
}
}
std::unique_ptr<std::deque<std::string>> operand_list;
};

} // namespace rocksdb

34 changes: 19 additions & 15 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/table_cache.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
Expand Down Expand Up @@ -287,7 +288,8 @@ struct Saver {
bool* value_found; // Is value set correctly? Used by KeyMayExist
std::string* value;
const MergeOperator* merge_operator;
std::deque<std::string>* merge_operands; // the merge operations encountered
// the merge operations encountered;
MergeContext* merge_context;
Logger* logger;
bool didIO; // did we do any disk io?
Statistics* statistics;
Expand All @@ -309,10 +311,10 @@ static void MarkKeyMayExist(void* arg) {

static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
Saver* s = reinterpret_cast<Saver*>(arg);
std::deque<std::string>* const ops = s->merge_operands; // shorter alias
MergeContext* merge_contex = s->merge_context;
std::string merge_result; // temporary area for merge results later

assert(s != nullptr && ops != nullptr);
assert(s != nullptr && merge_contex != nullptr);

ParsedInternalKey parsed_key;
// TODO: didIO and Merge?
Expand All @@ -331,7 +333,8 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
} else if (kMerge == s->state) {
assert(s->merge_operator != nullptr);
s->state = kFound;
if (!s->merge_operator->FullMerge(s->user_key, &v, *ops,
if (!s->merge_operator->FullMerge(s->user_key, &v,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt;
Expand All @@ -346,8 +349,9 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
s->state = kDeleted;
} else if (kMerge == s->state) {
s->state = kFound;
if (!s->merge_operator->FullMerge(s->user_key, nullptr, *ops,
s->value, s->logger)) {
if (!s->merge_operator->FullMerge(s->user_key, nullptr,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt;
}
Expand All @@ -359,16 +363,15 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
case kTypeMerge:
assert(s->state == kNotFound || s->state == kMerge);
s->state = kMerge;
ops->push_front(v.ToString());
while (ops->size() >= 2) {
merge_contex->PushOperand(v);
while (merge_contex->GetNumOperands() >= 2) {
// Attempt to merge operands together via user associateive merge
if (s->merge_operator->PartialMerge(s->user_key,
Slice((*ops)[0]),
Slice((*ops)[1]),
merge_contex->GetOperand(0),
merge_contex->GetOperand(1),
&merge_result,
s->logger)) {
ops->pop_front();
swap(ops->front(), merge_result);
merge_contex->PushPartialMergeResult(merge_result);
} else {
// Associative merge returns false ==> stack the operands
break;
Expand Down Expand Up @@ -417,7 +420,7 @@ void Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
Status* status,
std::deque<std::string>* operands,
MergeContext* merge_context,
GetStats* stats,
const Options& db_options,
bool* value_found) {
Expand All @@ -436,7 +439,7 @@ void Version::Get(const ReadOptions& options,
saver.value_found = value_found;
saver.value = value;
saver.merge_operator = merge_operator;
saver.merge_operands = operands;
saver.merge_context = merge_context;
saver.logger = logger.get();
saver.didIO = false;
saver.statistics = db_options.statistics.get();
Expand Down Expand Up @@ -557,7 +560,8 @@ void Version::Get(const ReadOptions& options,
if (kMerge == saver.state) {
// merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands;
if (merge_operator->FullMerge(user_key, nullptr, *saver.merge_operands,
if (merge_operator->FullMerge(user_key, nullptr,
saver.merge_context->GetOperands(),
value, logger.get())) {
*status = Status::OK();
} else {
Expand Down
Loading

0 comments on commit a8029fd

Please sign in to comment.