Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pending memtable should be updated with the current mutable cf option… #73

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,10 @@ ColumnFamilyData::ColumnFamilyData(
}

RecalculateWriteStallConditions(mutable_cf_options_);
switch_memtable_thread_ =
std::thread(&ColumnFamilyData::PrepareSwitchMemTable, this);
if (ioptions_.memtable_factory->IsPrepareMemtableCreationSupported()) {
switch_memtable_thread_ =
std::thread(&ColumnFamilyData::PrepareSwitchMemTable, this);
}
}

void ColumnFamilyData::PrepareSwitchMemTable() {
Expand All @@ -630,26 +632,27 @@ void ColumnFamilyData::PrepareSwitchMemTable() {
switch_memtable_thread_cv_.wait(lck);
}
}

// Construct new memtable with an empty initial sequence
switch_mem_.store(ConstructNewMemtable(mutable_cf_options_, 0, true),
switch_mem_.store(ConstructNewMemtable(mutable_cf_options_, 0, false),
std::memory_order_release);
}
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
assert(refs_.load(std::memory_order_relaxed) == 0);
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
terminate_switch_memtable_ = true;
}
switch_memtable_thread_cv_.notify_one();
switch_memtable_thread_.join();
if (ioptions_.memtable_factory->IsPrepareMemtableCreationSupported()) {
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
terminate_switch_memtable_ = true;
}
switch_memtable_thread_cv_.notify_one();
switch_memtable_thread_.join();

const MemTable* memtable = switch_mem_.exchange(nullptr);
if (memtable != nullptr) {
delete memtable;
const MemTable* memtable = switch_mem_.exchange(nullptr);
if (memtable != nullptr) {
delete memtable;
}
}

// remove from linked list
Expand Down Expand Up @@ -1107,17 +1110,17 @@ MemTable* ColumnFamilyData::GetSwitchMemtable(SequenceNumber sn) {
// No point in suspending, just construct the memtable here
switch_mem = ConstructNewMemtable(mutable_cf_options_, sn, false);
} else {
switch_mem->Activate(sn);
switch_mem->Activate(sn, mutable_cf_options_);
}

return switch_mem;
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq,
bool pending) {
bool active) {
return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
write_buffer_manager_, earliest_seq, id_, pending);
write_buffer_manager_, earliest_seq, id_, active);
}

void ColumnFamilyData::CreateNewMemtable(
Expand Down
2 changes: 1 addition & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ class ColumnFamilyData {
// See Memtable constructor for explanation of earliest_seq param.
MemTable* ConstructNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq,
bool pending = false);
bool active = true);
void CreateNewMemtable(const MutableCFOptions& mutable_cf_options,
SequenceNumber earliest_seq);

Expand Down
94 changes: 55 additions & 39 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,35 +47,46 @@ namespace ROCKSDB_NAMESPACE {
ImmutableMemTableOptions::ImmutableMemTableOptions(
const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options)
: arena_block_size(mutable_cf_options.arena_block_size),
memtable_prefix_bloom_bits(
static_cast<uint32_t>(
static_cast<double>(mutable_cf_options.write_buffer_size) *
mutable_cf_options.memtable_prefix_bloom_size_ratio) *
8u),
memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size),
memtable_whole_key_filtering(
mutable_cf_options.memtable_whole_key_filtering),
:
inplace_update_support(ioptions.inplace_update_support),
inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
inplace_callback(ioptions.inplace_callback),
max_successive_merges(mutable_cf_options.max_successive_merges),
statistics(ioptions.stats),
merge_operator(ioptions.merge_operator.get()),
info_log(ioptions.logger),
allow_data_in_errors(ioptions.allow_data_in_errors) {}
allow_data_in_errors(ioptions.allow_data_in_errors) {
SetMutableOptions(mutable_cf_options);
}

void ImmutableMemTableOptions::SetMutableOptions(const MutableCFOptions& mutable_cf_options)
{
arena_block_size = mutable_cf_options.arena_block_size;
memtable_prefix_bloom_bits = (static_cast<uint32_t>(
static_cast<double>(mutable_cf_options.write_buffer_size) *
mutable_cf_options.memtable_prefix_bloom_size_ratio) *
8u);
memtable_huge_page_size = mutable_cf_options.memtable_huge_page_size;
memtable_whole_key_filtering =
mutable_cf_options.memtable_whole_key_filtering;
inplace_update_num_locks = mutable_cf_options.inplace_update_num_locks;
max_successive_merges = mutable_cf_options.max_successive_merges;
}

MemTable::MemTable(const InternalKeyComparator& cmp,
const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager,
SequenceNumber latest_seq, uint32_t column_family_id,
bool pending)
bool active)
: comparator_(cmp),
moptions_(ioptions, mutable_cf_options),
refs_(0),
kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
mem_tracker_(write_buffer_manager, pending),
arena_block_size_(mutable_cf_options.arena_block_size),
mem_tracker_(write_buffer_manager,
(write_buffer_manager != nullptr &&
(write_buffer_manager->enabled() ||
write_buffer_manager->cost_to_cache()))
? active
: false),
arena_(moptions_.arena_block_size,
(write_buffer_manager != nullptr &&
(write_buffer_manager->enabled() ||
Expand All @@ -98,6 +109,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
flush_completed_(false),
file_number_(0),
first_seqno_(0),
earliest_seqno_(latest_seq),
creation_seq_(latest_seq),
mem_next_logfile_number_(0),
min_prep_log_referenced_(0),
locks_(moptions_.inplace_update_support
Expand All @@ -111,18 +124,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
atomic_flush_seqno_(kMaxSequenceNumber),
approximate_memory_usage_(0) {
SetInitialSeq(latest_seq);
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());

// use bloom_filter_ for both whole key and prefix bloom filter
if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) &&
moptions_.memtable_prefix_bloom_bits > 0) {
bloom_filter_.reset(
new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits,
6 /* hard coded 6 probes */,
moptions_.memtable_huge_page_size, ioptions.logger));
if (active) {
Activate(latest_seq, mutable_cf_options);
}
}

Expand All @@ -131,15 +134,28 @@ MemTable::~MemTable() {
assert(refs_ == 0);
}

void MemTable::SetInitialSeq(SequenceNumber sn) {


void MemTable::Activate(SequenceNumber sn, const MutableCFOptions& mutable_cf_options) {
assert(!active_);
active_ = true;
earliest_seqno_ = sn;
creation_seq_ = sn;
}

void MemTable::Activate(SequenceNumber sn) {
SetInitialSeq(sn);
moptions_.SetMutableOptions(mutable_cf_options);
arena_block_size_ = OptimizeBlockSize(moptions_.arena_block_size);
arena_.Activate();

write_buffer_size_ = mutable_cf_options.write_buffer_size;
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
// use bloom_filter_ for both whole key and prefix bloom filter
if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) &&
moptions_.memtable_prefix_bloom_bits > 0) {
bloom_filter_.reset(
new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits,
6 /* hard coded 6 probes */,
moptions_.memtable_huge_page_size, moptions_.info_log));
}
}

size_t MemTable::ApproximateMemoryUsage() {
Expand Down Expand Up @@ -167,7 +183,7 @@ bool MemTable::ShouldFlushNow() {
// buffer size. Thus we have to decide if we should over-allocate or
// under-allocate.
// This constant variable can be interpreted as: if we still have more than
// "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over
// "kAllowOverAllocationRatio * arena_block_size_" space left, we'd try to over
// allocate one more block.
const double kAllowOverAllocationRatio = 0.6;

Expand All @@ -181,15 +197,15 @@ bool MemTable::ShouldFlushNow() {

// if we can still allocate one more block without exceeding the
// over-allocation ratio, then we should not flush.
if (allocated_memory + kArenaBlockSize <
write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
if (allocated_memory + arena_block_size_ <
write_buffer_size + arena_block_size_ * kAllowOverAllocationRatio) {
return false;
}

// if user keeps adding entries that exceeds write_buffer_size, we need to
// flush earlier even though we still have much available memory left.
if (allocated_memory >
write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
write_buffer_size + arena_block_size_ * kAllowOverAllocationRatio) {
return true;
}

Expand All @@ -208,7 +224,7 @@ bool MemTable::ShouldFlushNow() {
// bigger than AllocatedAndUnused()?
//
// The answer is: if the entry size is also bigger than 0.25 *
// kArenaBlockSize, a dedicated block will be allocated for it; otherwise
// arena_block_size_, a dedicated block will be allocated for it; otherwise
// arena will anyway skip the AllocatedAndUnused() and allocate a new, empty
// and regular block. In either case, we *overly* over-allocated.
//
Expand All @@ -218,7 +234,7 @@ bool MemTable::ShouldFlushNow() {
// NOTE: the average percentage of waste space of this approach can be counted
// as: "arena block size * 0.25 / write buffer size". User who specify a small
// write buffer size and/or big arena block size may suffer.
return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
return arena_.AllocatedAndUnused() < arena_block_size_ / 4;
}

void MemTable::UpdateFlushState() {
Expand Down
13 changes: 9 additions & 4 deletions db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class SystemClock;
struct ImmutableMemTableOptions {
explicit ImmutableMemTableOptions(const ImmutableOptions& ioptions,
const MutableCFOptions& mutable_cf_options);
void SetMutableOptions(const MutableCFOptions& mutable_cf_options);
size_t arena_block_size;
uint32_t memtable_prefix_bloom_bits;
size_t memtable_huge_page_size;
Expand Down Expand Up @@ -108,7 +109,7 @@ class MemTable {
const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager,
SequenceNumber earliest_seq, uint32_t column_family_id,
bool pending = false);
bool active = true);
// No copying allowed
MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;
Expand Down Expand Up @@ -410,7 +411,7 @@ class MemTable {
// Sets the initial sequence number for lazy initialization of the memtable,
// applies the supplied mutable CF options, and activates mem_tracker_ if
// needed.
// NOTE: should only be called once before any other operation on the memtable
void Activate(SequenceNumber sn);
void Activate(SequenceNumber sn, const MutableCFOptions& mutable_cf_options);

// Returns the next active logfile number when this memtable is about to
// be flushed to storage
Expand Down Expand Up @@ -488,6 +489,8 @@ class MemTable {

uint64_t GetID() const { return id_; }

void UpdateMutableOptions(const MutableCFOptions& mutable_cf_options);

void SetFlushCompleted(bool completed) { flush_completed_ = completed; }

uint64_t GetFileNumber() const { return file_number_; }
Expand Down Expand Up @@ -519,9 +522,9 @@ class MemTable {
friend class MemTableList;

KeyComparator comparator_;
const ImmutableMemTableOptions moptions_;
ImmutableMemTableOptions moptions_;
int refs_;
const size_t kArenaBlockSize;
ayulas marked this conversation as resolved.
Show resolved Hide resolved
size_t arena_block_size_;
AllocTracker mem_tracker_;
ConcurrentArena arena_;
std::unique_ptr<MemTableRep> table_;
Expand Down Expand Up @@ -593,6 +596,8 @@ class MemTable {
// Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow`
std::atomic<uint64_t> approximate_memory_usage_;

bool active_ = false;

#ifndef ROCKSDB_LITE
// Flush job info of the current memtable.
std::unique_ptr<FlushJobInfo> flush_job_info_;
Expand Down
6 changes: 4 additions & 2 deletions include/rocksdb/memtablerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,9 @@ class MemTableRepFactory : public Customizable {
// false if the <key,seq> already exists.
// Default: false
virtual bool CanHandleDuplicatedKey() const { return false; }
};

virtual bool IsPrepareMemtableCreationSupported() const { return false; }
};

// This uses a skip list to store keys. It is the default.
//
Expand Down Expand Up @@ -351,7 +353,7 @@ class SkipListFactory : public MemTableRepFactory {
bool IsInsertConcurrentlySupported() const override { return true; }

bool CanHandleDuplicatedKey() const override { return true; }

private:
size_t lookahead_;
};
Expand Down