Skip to content

Commit

Permalink
MemTableOptions
Browse files Browse the repository at this point in the history
Summary: removed reference to options in WriteBatch and DBImpl::Get()

Test Plan: make all check

Reviewers: yhchiang, igor, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D23049
  • Loading branch information
Lei Jin committed Sep 9, 2014
1 parent 55114e7 commit 5231146
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 88 deletions.
3 changes: 2 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ void ColumnFamilyData::CreateNewMemtable() {
if (mem_ != nullptr) {
delete mem_->Unref();
}
mem_ = new MemTable(internal_comparator_, options_);
mem_ = new MemTable(internal_comparator_, ioptions_,
MemTableOptions(options_));
mem_->Ref();
}

Expand Down
15 changes: 7 additions & 8 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3434,10 +3434,10 @@ Status DBImpl::GetImpl(const ReadOptions& options,
LookupKey lkey(key, snapshot);
PERF_TIMER_STOP(get_snapshot_time);

if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->options())) {
if (sv->mem->Get(lkey, value, &s, merge_context)) {
// Done
RecordTick(stats_, MEMTABLE_HIT);
} else if (sv->imm->Get(lkey, value, &s, merge_context, *cfd->options())) {
} else if (sv->imm->Get(lkey, value, &s, merge_context)) {
// Done
RecordTick(stats_, MEMTABLE_HIT);
} else {
Expand Down Expand Up @@ -3522,12 +3522,9 @@ std::vector<Status> DBImpl::MultiGet(
assert(mgd_iter != multiget_cf_data.end());
auto mgd = mgd_iter->second;
auto super_version = mgd->super_version;
auto cfd = mgd->cfd;
if (super_version->mem->Get(lkey, value, &s, merge_context,
*cfd->options())) {
if (super_version->mem->Get(lkey, value, &s, merge_context)) {
// Done
} else if (super_version->imm->Get(lkey, value, &s, merge_context,
*cfd->options())) {
} else if (super_version->imm->Get(lkey, value, &s, merge_context)) {
// Done
} else {
super_version->current->Get(options, lkey, value, &s, &merge_context);
Expand Down Expand Up @@ -4294,7 +4291,9 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
}

if (s.ok()) {
new_mem = new MemTable(cfd->internal_comparator(), *cfd->options());
new_mem = new MemTable(cfd->internal_comparator(),
*cfd->ioptions(),
MemTableOptions(*cfd->options()));
new_superversion = new SuperVersion();
}
}
Expand Down
11 changes: 5 additions & 6 deletions db/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@

namespace rocksdb {

DBImplReadOnly::DBImplReadOnly(const DBOptions& options,
DBImplReadOnly::DBImplReadOnly(const DBOptions& db_options,
const std::string& dbname)
: DBImpl(options, dbname) {
: DBImpl(db_options, dbname) {
Log(db_options_.info_log, "Opening the db in read only mode");
}

DBImplReadOnly::~DBImplReadOnly() {
}

// Implementations of the DB interface
Status DBImplReadOnly::Get(const ReadOptions& options,
Status DBImplReadOnly::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) {
Status s;
Expand All @@ -61,10 +61,9 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
SuperVersion* super_version = cfd->GetSuperVersion();
MergeContext merge_context;
LookupKey lkey(key, snapshot);
if (super_version->mem->Get(lkey, value, &s, merge_context,
*cfd->options())) {
if (super_version->mem->Get(lkey, value, &s, merge_context)) {
} else {
super_version->current->Get(options, lkey, value, &s, &merge_context);
super_version->current->Get(read_options, lkey, value, &s, &merge_context);
}
return s;
}
Expand Down
87 changes: 52 additions & 35 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,51 @@

namespace rocksdb {

MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
MemTableOptions::MemTableOptions(const Options& options)
: write_buffer_size(options.write_buffer_size),
arena_block_size(options.arena_block_size),
memtable_prefix_bloom_bits(options.memtable_prefix_bloom_bits),
memtable_prefix_bloom_probes(options.memtable_prefix_bloom_probes),
memtable_prefix_bloom_huge_page_tlb_size(
options.memtable_prefix_bloom_huge_page_tlb_size),
inplace_update_support(options.inplace_update_support),
inplace_update_num_locks(options.inplace_update_num_locks),
inplace_callback(options.inplace_callback),
max_successive_merges(options.max_successive_merges),
filter_deletes(options.filter_deletes) {}

MemTable::MemTable(const InternalKeyComparator& cmp,
const ImmutableCFOptions& ioptions,
const MemTableOptions& moptions)
: comparator_(cmp),
ioptions_(ioptions),
moptions_(moptions),
refs_(0),
kArenaBlockSize(OptimizeBlockSize(options.arena_block_size)),
kWriteBufferSize(options.write_buffer_size),
arena_(options.arena_block_size),
table_(options.memtable_factory->CreateMemTableRep(
comparator_, &arena_, options.prefix_extractor.get(),
options.info_log.get())),
kArenaBlockSize(OptimizeBlockSize(moptions.arena_block_size)),
arena_(moptions.arena_block_size),
table_(ioptions.memtable_factory->CreateMemTableRep(
comparator_, &arena_, ioptions.prefix_extractor,
ioptions.info_log)),
num_entries_(0),
flush_in_progress_(false),
flush_completed_(false),
file_number_(0),
first_seqno_(0),
mem_next_logfile_number_(0),
locks_(options.inplace_update_support ? options.inplace_update_num_locks
: 0),
prefix_extractor_(options.prefix_extractor.get()),
locks_(moptions.inplace_update_support ? moptions.inplace_update_num_locks
: 0),
prefix_extractor_(ioptions.prefix_extractor),
should_flush_(ShouldFlushNow()) {
// if should_flush_ == true without an entry inserted, something must have
// gone wrong already.
assert(!should_flush_);
if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
if (prefix_extractor_ && moptions.memtable_prefix_bloom_bits > 0) {
prefix_bloom_.reset(new DynamicBloom(
&arena_,
options.memtable_prefix_bloom_bits, options.bloom_locality,
options.memtable_prefix_bloom_probes, nullptr,
options.memtable_prefix_bloom_huge_page_tlb_size,
options.info_log.get()));
moptions.memtable_prefix_bloom_bits, ioptions.bloom_locality,
moptions.memtable_prefix_bloom_probes, nullptr,
moptions.memtable_prefix_bloom_huge_page_tlb_size,
ioptions.info_log));
}
}

Expand Down Expand Up @@ -97,14 +113,16 @@ bool MemTable::ShouldFlushNow() const {
// if we can still allocate one more block without exceeding the
// over-allocation ratio, then we should not flush.
if (allocated_memory + kArenaBlockSize <
kWriteBufferSize + kArenaBlockSize * kAllowOverAllocationRatio) {
moptions_.write_buffer_size +
kArenaBlockSize * kAllowOverAllocationRatio) {
return false;
}

// if user keeps adding entries that exceeds kWriteBufferSize, we need to
// flush earlier even though we still have much available memory left.
if (allocated_memory >
kWriteBufferSize + kArenaBlockSize * kAllowOverAllocationRatio) {
// if user keeps adding entries that exceeds moptions.write_buffer_size,
// we need to flush earlier even though we still have much available
// memory left.
if (allocated_memory > moptions_.write_buffer_size +
kArenaBlockSize * kAllowOverAllocationRatio) {
return true;
}

Expand Down Expand Up @@ -175,12 +193,12 @@ const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator {
public:
MemTableIterator(
const MemTable& mem, const ReadOptions& options, Arena* arena)
const MemTable& mem, const ReadOptions& read_options, Arena* arena)
: bloom_(nullptr),
prefix_extractor_(mem.prefix_extractor_),
valid_(false),
arena_mode_(arena != nullptr) {
if (prefix_extractor_ != nullptr && !options.total_order_seek) {
if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
bloom_ = mem.prefix_bloom_.get();
iter_ = mem.table_->GetDynamicPrefixIterator(arena);
} else {
Expand Down Expand Up @@ -248,10 +266,10 @@ class MemTableIterator: public Iterator {
void operator=(const MemTableIterator&);
};

Iterator* MemTable::NewIterator(const ReadOptions& options, Arena* arena) {
Iterator* MemTable::NewIterator(const ReadOptions& read_options, Arena* arena) {
assert(arena != nullptr);
auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
return new (mem) MemTableIterator(*this, options, arena);
return new (mem) MemTableIterator(*this, read_options, arena);
}

port::RWMutex* MemTable::GetLock(const Slice& key) {
Expand Down Expand Up @@ -412,7 +430,7 @@ static bool SaveValue(void* arg, const char* entry) {
}

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options) {
MergeContext& merge_context) {
// The sequence number is updated synchronously in version_set.h
if (IsEmpty()) {
// Avoiding recording stats for speed.
Expand All @@ -437,10 +455,10 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
saver.status = s;
saver.mem = this;
saver.merge_context = &merge_context;
saver.merge_operator = options.merge_operator.get();
saver.logger = options.info_log.get();
saver.inplace_update_support = options.inplace_update_support;
saver.statistics = options.statistics.get();
saver.merge_operator = ioptions_.merge_operator;
saver.logger = ioptions_.info_log;
saver.inplace_update_support = moptions_.inplace_update_support;
saver.statistics = ioptions_.statistics;
table_->Get(key, &saver, SaveValue);
}

Expand Down Expand Up @@ -512,8 +530,7 @@ void MemTable::Update(SequenceNumber seq,

bool MemTable::UpdateCallback(SequenceNumber seq,
const Slice& key,
const Slice& delta,
const Options& options) {
const Slice& delta) {
LookupKey lkey(key, seq);
Slice memkey = lkey.memtable_key();

Expand Down Expand Up @@ -548,8 +565,8 @@ bool MemTable::UpdateCallback(SequenceNumber seq,

std::string str_value;
WriteLock wl(GetLock(lkey.user_key()));
auto status = options.inplace_callback(prev_buffer, &new_prev_size,
delta, &str_value);
auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
delta, &str_value);
if (status == UpdateStatus::UPDATED_INPLACE) {
// Value already updated by callback.
assert(new_prev_size <= prev_size);
Expand All @@ -562,12 +579,12 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
memcpy(p, prev_buffer, new_prev_size);
}
}
RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED);
RecordTick(ioptions_.statistics, NUMBER_KEYS_UPDATED);
should_flush_ = ShouldFlushNow();
return true;
} else if (status == UpdateStatus::UPDATED) {
Add(seq, kTypeValue, key, Slice(str_value));
RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN);
RecordTick(ioptions_.statistics, NUMBER_KEYS_WRITTEN);
should_flush_ = ShouldFlushNow();
return true;
} else if (status == UpdateStatus::UPDATE_FAILED) {
Expand Down
34 changes: 28 additions & 6 deletions db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "db/version_edit.h"
#include "rocksdb/db.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/immutable_options.h"
#include "util/arena.h"
#include "util/dynamic_bloom.h"

Expand All @@ -26,6 +27,23 @@ class Mutex;
class MemTableIterator;
class MergeContext;

struct MemTableOptions {
explicit MemTableOptions(const Options& options);
size_t write_buffer_size;
size_t arena_block_size;
uint32_t memtable_prefix_bloom_bits;
uint32_t memtable_prefix_bloom_probes;
size_t memtable_prefix_bloom_huge_page_tlb_size;
bool inplace_update_support;
size_t inplace_update_num_locks;
UpdateStatus (*inplace_callback)(char* existing_value,
uint32_t* existing_value_size,
Slice delta_value,
std::string* merged_value);
size_t max_successive_merges;
bool filter_deletes;
};

class MemTable {
public:
struct KeyComparator : public MemTableRep::KeyComparator {
Expand All @@ -40,7 +58,8 @@ class MemTable {
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator,
const Options& options);
const ImmutableCFOptions& ioptions,
const MemTableOptions& moptions);

~MemTable();

Expand Down Expand Up @@ -81,7 +100,7 @@ class MemTable {
// arena: If not null, the arena needs to be used to allocate the Iterator.
// Calling ~Iterator of the iterator will destroy all the states but
// those allocated in arena.
Iterator* NewIterator(const ReadOptions& options, Arena* arena);
Iterator* NewIterator(const ReadOptions& read_options, Arena* arena);

// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.
Expand All @@ -99,7 +118,7 @@ class MemTable {
// store MergeInProgress in s, and return false.
// Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options);
MergeContext& merge_context);

// Attempts to update the new_value inplace, else does normal Add
// Pseudocode
Expand All @@ -123,8 +142,7 @@ class MemTable {
// else return false
bool UpdateCallback(SequenceNumber seq,
const Slice& key,
const Slice& delta,
const Options& options);
const Slice& delta);

// Returns the number of successive merge entries starting from the newest
// entry for the key up to the last non-merge entry or last entry for the
Expand Down Expand Up @@ -172,6 +190,9 @@ class MemTable {

const Arena& TEST_GetArena() const { return arena_; }

const ImmutableCFOptions* GetImmutableOptions() const { return &ioptions_; }
const MemTableOptions* GetMemTableOptions() const { return &moptions_; }

private:
// Dynamically check if we can add more incoming entries.
bool ShouldFlushNow() const;
Expand All @@ -181,9 +202,10 @@ class MemTable {
friend class MemTableList;

KeyComparator comparator_;
const ImmutableCFOptions& ioptions_;
const MemTableOptions moptions_;
int refs_;
const size_t kArenaBlockSize;
const size_t kWriteBufferSize;
Arena arena_;
unique_ptr<MemTableRep> table_;

Expand Down
5 changes: 2 additions & 3 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ int MemTableList::size() const {
// Return the most recent value found, if any.
// Operands stores the list of merge operations to apply, so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
Status* s, MergeContext& merge_context,
const Options& options) {
Status* s, MergeContext& merge_context) {
for (auto& memtable : memlist_) {
if (memtable->Get(key, value, s, merge_context, options)) {
if (memtable->Get(key, value, s, merge_context)) {
return true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class MemTableListVersion {
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options);
MergeContext& merge_context);

void AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iterator_list, Arena* arena);
Expand Down
2 changes: 1 addition & 1 deletion db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class Repairer {
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = new MemTable(icmp_, options_);
MemTable* mem = new MemTable(icmp_, ioptions_, MemTableOptions(options_));
auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_);
mem->Ref();
int counter = 0;
Expand Down
Loading

0 comments on commit 5231146

Please sign in to comment.