Proactive Flushes (#155)
udi-speedb authored and Yuval-Ariel committed May 4, 2023
1 parent cf427c9 commit 5a53078
Showing 19 changed files with 1,743 additions and 83 deletions.
18 changes: 17 additions & 1 deletion db/column_family.h
@@ -457,7 +457,14 @@ class ColumnFamilyData {
void ResetThreadLocalSuperVersions();

// Protected by DB mutex
void set_queued_for_flush(bool value) { queued_for_flush_ = value; }
void set_queued_for_flush(bool value) {
queued_for_flush_ = value;

if (value) {
++num_queued_for_flush_;
}
}

void set_queued_for_compaction(bool value) { queued_for_compaction_ = value; }
bool queued_for_flush() { return queued_for_flush_; }
bool queued_for_compaction() { return queued_for_compaction_; }
@@ -538,6 +545,11 @@ class ColumnFamilyData {
// Keep track of whether the mempurge feature was ever used.
void SetMempurgeUsed() { mempurge_used_ = true; }
bool GetMempurgeUsed() { return mempurge_used_; }
uint64_t GetNumQueuedForFlush() const { return num_queued_for_flush_; }

// TODO - Make it a CF option
static constexpr uint64_t kLaggingFlushesThreshold = 10U;
void SetNumTimedQueuedForFlush(uint64_t num) { num_queued_for_flush_ = num; }

// Allocate and return a new epoch number
uint64_t NewEpochNumber() { return next_epoch_number_.fetch_add(1); }
@@ -657,6 +669,10 @@ class ColumnFamilyData {
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_;
bool mempurge_used_;

// Used in the WBM's flush initiation heuristics.
// See DBImpl::InitiateMemoryManagerFlushRequest() for more details
uint64_t num_queued_for_flush_ = 0U;

std::atomic<uint64_t> next_epoch_number_;
};

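The new num_queued_for_flush_ counter and kLaggingFlushesThreshold feed the anti-starvation heuristic in DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush() (see the db_impl_compaction_flush.cc hunk below). A minimal sketch of the condition, where the IsLaggingBehind helper is a hypothetical restatement introduced purely for illustration:

```cpp
#include <cstdint>

// Hypothetical restatement of the lagging-CF condition used in
// DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(): a CF is
// considered lagging when its queued-for-flush count plus the threshold
// is still below the count of the CF originally picked for flushing.
constexpr uint64_t kLaggingFlushesThreshold = 10U;

constexpr bool IsLaggingBehind(uint64_t cf_num_queued,
                               uint64_t picked_num_queued) {
  return cf_num_queued + kLaggingFlushesThreshold < picked_num_queued;
}

// A CF queued 5 times lags one queued 20 times (5 + 10 < 20), but not one
// queued 15 times (5 + 10 == 15). After being picked, the lagging CF's
// counter is bumped to picked - 1 via SetNumTimedQueuedForFlush(), so it
// only counts as lagging again once it truly falls behind.
static_assert(IsLaggingBehind(5, 20), "5 queued flushes vs 20: lagging");
static_assert(!IsLaggingBehind(5, 15), "5 queued flushes vs 15: not lagging");
```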
7 changes: 7 additions & 0 deletions db/db_impl/db_impl.cc
@@ -544,6 +544,13 @@ Status DBImpl::MaybeReleaseTimestampedSnapshotsAndCheck() {
}

Status DBImpl::CloseHelper() {
if (is_registered_for_flush_initiation_rqsts_) {
assert(write_buffer_manager_);
assert(write_buffer_manager_->IsInitiatingFlushes());
write_buffer_manager_->DeregisterFlushInitiator(this);
is_registered_for_flush_initiation_rqsts_ = false;
}

// Guarantee that there is no background error recovery in progress before
// continuing with the shutdown
mutex_.Lock();
9 changes: 9 additions & 0 deletions db/db_impl/db_impl.h
@@ -438,6 +438,13 @@ class DBImpl : public DB {
virtual Status LockWAL() override;
virtual Status UnlockWAL() override;

// flush initiated by the write buffer manager to free some space
bool InitiateMemoryManagerFlushRequest(size_t min_size_to_flush);
bool InitiateMemoryManagerFlushRequestAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options);
bool InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options);

virtual SequenceNumber GetLatestSequenceNumber() const override;

// IncreaseFullHistoryTsLow(ColumnFamilyHandle*, std::string) will acquire
@@ -2696,6 +2703,8 @@ class DBImpl : public DB {
// thread safe, both read and write need db mutex hold.
SeqnoToTimeMapping seqno_time_mapping_;

bool is_registered_for_flush_initiation_rqsts_ = false;

// Stop write token that is acquired when first LockWAL() is called.
// Destroyed when last UnlockWAL() is called. Controlled by DB mutex.
// See lock_wal_count_
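The three methods declared above implement the DB side of the WBM's flush-initiation protocol: the WBM hands a registered DB a min_size_to_flush budget, and the callback returns true only if a flush was actually initiated. The WBM internals are not part of this diff; the sketch below is an assumption about the registry shape that RegisterFlushInitiator()/DeregisterFlushInitiator() imply, with FlushInitiatorRegistry and InitiateFlush being invented names:

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Assumed shape of the WBM-side initiator registry; names are illustrative.
class FlushInitiatorRegistry {
 public:
  using InitiateCb = std::function<bool(size_t /* min_size_to_flush */)>;

  void RegisterFlushInitiator(void* handle, InitiateCb cb) {
    std::lock_guard<std::mutex> lock(mu_);
    initiators_.emplace_back(handle, std::move(cb));
  }

  void DeregisterFlushInitiator(void* handle) {
    std::lock_guard<std::mutex> lock(mu_);
    for (auto it = initiators_.begin(); it != initiators_.end(); ++it) {
      if (it->first == handle) {
        initiators_.erase(it);
        break;
      }
    }
  }

  // Ask registered DBs, in turn, to free at least min_size_to_flush bytes.
  // Stops at the first DB that actually initiates a flush.
  bool InitiateFlush(size_t min_size_to_flush) {
    std::lock_guard<std::mutex> lock(mu_);
    for (auto& [handle, cb] : initiators_) {
      if (cb(min_size_to_flush)) {
        return true;  // a flush was initiated on this DB
      }
    }
    return false;  // no DB had enough flushable data
  }

 private:
  std::mutex mu_;
  std::vector<std::pair<void*, InitiateCb>> initiators_;
};
```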
184 changes: 183 additions & 1 deletion db/db_impl/db_impl_compaction_flush.cc
@@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cinttypes>
#include <deque>
#include <limits>

#include "db/builder.h"
#include "db/db_impl/db_impl.h"
@@ -2929,6 +2930,12 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
bg_job_limits.max_compactions, bg_flush_scheduled_,
bg_compaction_scheduled_);
}
*reason = bg_flush_args[0].flush_reason_;
if (write_buffer_manager_) {
write_buffer_manager_->FlushStarted(
*reason == FlushReason::kWriteBufferManagerInitiated);
}

status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer, thread_pri);
TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
@@ -2939,7 +2946,6 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
assert(bg_flush_arg.flush_reason_ == bg_flush_args[0].flush_reason_);
}
#endif /* !NDEBUG */
*reason = bg_flush_args[0].flush_reason_;
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
if (cfd->UnrefAndTryDelete()) {
@@ -3024,6 +3030,10 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
assert(num_running_flushes_ > 0);
num_running_flushes_--;
bg_flush_scheduled_--;
if (write_buffer_manager_) {
write_buffer_manager_->FlushEnded(
reason == FlushReason::kWriteBufferManagerInitiated);
}
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
atomic_flush_install_cv_.SignalAll();
@@ -3937,4 +3947,176 @@ Status DBImpl::WaitForCompact(bool wait_unscheduled) {
return error_handler_.GetBGError();
}

bool DBImpl::InitiateMemoryManagerFlushRequest(size_t min_size_to_flush) {
if (shutdown_initiated_) {
return false;
}

FlushOptions flush_options;
flush_options.allow_write_stall = true;
flush_options.wait = false;

if (immutable_db_options_.atomic_flush) {
return InitiateMemoryManagerFlushRequestAtomicFlush(min_size_to_flush,
flush_options);
} else {
return InitiateMemoryManagerFlushRequestNonAtomicFlush(min_size_to_flush,
flush_options);
}
}

bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options) {
assert(immutable_db_options_.atomic_flush);

autovector<ColumnFamilyData*> cfds;
{
InstrumentedMutexLock lock(&mutex_);

SelectColumnFamiliesForAtomicFlush(&cfds);
if (cfds.empty()) {
return false;
}

// min_size_to_flush may be 0.
// Proactive flushes are only active once recovery is complete, so
// SelectColumnFamiliesForAtomicFlush() keeps a CF in the cfds collection
// only if it has a non-empty mutable memtable or any immutable memtables.
// In that case, skip the checks and just flush the selected CF-s.
if (min_size_to_flush > 0) {
size_t total_size_to_flush = 0U;
for (const auto& cfd : cfds) {
// Once at least one CF has immutable memtables, we will flush
if (cfd->imm()->NumNotFlushed() > 0) {
// Guarantee an atomic flush will occur
total_size_to_flush = min_size_to_flush;
break;
} else if (cfd->mem()->IsEmpty() == false) {
total_size_to_flush += cfd->mem()->ApproximateMemoryUsage();
}
}
if (total_size_to_flush < min_size_to_flush) {
return false;
}
}
}

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"write buffer manager initiated atomic flush started, "
"current usage %lu out of %lu",
cfds.front()->write_buffer_mgr()->memory_usage(),
cfds.front()->write_buffer_mgr()->buffer_size());

TEST_SYNC_POINT(
"DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush::BeforeFlush");
auto s = AtomicFlushMemTables(
flush_options, FlushReason::kWriteBufferManagerInitiated, cfds);

ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"write buffer manager initiated atomic flush finished, status: %s",
s.ToString().c_str());
return s.ok();
}

bool DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options) {
assert(immutable_db_options_.atomic_flush == false);

// Pick the "oldest" CF that meets one of the following:
// 1. Has at least one IMMUTABLE memtable (=> already has a memtable that
// should be flushed); Or
// 2. Has a MUTABLE memtable > min size to flush
//
// However, care must be taken to avoid starving a CF which has data to
// flush (and an associated WAL) but to which there is little writing. If
// we find such a CF that is lagging far enough behind the originally
// picked CF in the number of flushes it has undergone, we pick it
// instead, regardless of its mutable memtable size.

// The CF picked based on min_size_to_flush
ColumnFamilyData* orig_cfd_to_flush = nullptr;
// The CF to actually flush (possibly == orig_cfd_to_flush)
ColumnFamilyData* cfd_to_flush = nullptr;
SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;

{
InstrumentedMutexLock lock(&mutex_);

// First pick the oldest CF with data to flush that meets
// the min_size_to_flush condition
for (auto* cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if ((cfd->imm()->NumNotFlushed() != 0) ||
((cfd->mem()->IsEmpty() == false) &&
(cfd->mem()->ApproximateMemoryUsage() >= min_size_to_flush))) {
uint64_t seq = cfd->mem()->GetCreationSeq();
if (cfd_to_flush == nullptr || seq < seq_num_for_cf_picked) {
cfd_to_flush = cfd;
seq_num_for_cf_picked = seq;
}
}
}

if (cfd_to_flush == nullptr) {
return false;
}

orig_cfd_to_flush = cfd_to_flush;

// A CF was picked. Now see if it should be replaced with a lagging CF
for (auto* cfd : *versions_->GetColumnFamilySet()) {
if (cfd == orig_cfd_to_flush) {
continue;
}

if ((cfd->imm()->NumNotFlushed() != 0) ||
(cfd->mem()->IsEmpty() == false)) {
// The first lagging CF found is picked. There may be an older lagging
// CF; that will be handled the next time we evaluate.
if (cfd->GetNumQueuedForFlush() +
ColumnFamilyData::kLaggingFlushesThreshold <
orig_cfd_to_flush->GetNumQueuedForFlush()) {
// Fix its counter so it is considered lagging again only when
// it is indeed lagging behind
cfd->SetNumTimedQueuedForFlush(
orig_cfd_to_flush->GetNumQueuedForFlush() - 1);
cfd_to_flush = cfd;
break;
}
}
}

autovector<ColumnFamilyData*> cfds{cfd_to_flush};
MaybeFlushStatsCF(&cfds);
}

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] write buffer manager initiated flush started, "
"current usage %lu out of %lu, min-size:%lu, seq:%lu, "
"num-flushes:%lu, orig-cf:%s num-flushes:%lu",
cfd_to_flush->GetName().c_str(),
cfd_to_flush->write_buffer_mgr()->memory_usage(),
cfd_to_flush->write_buffer_mgr()->buffer_size(),
min_size_to_flush, seq_num_for_cf_picked,
cfd_to_flush->GetNumQueuedForFlush(),
orig_cfd_to_flush->GetName().c_str(),
orig_cfd_to_flush->GetNumQueuedForFlush());

TEST_SYNC_POINT(
"DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush::BeforeFlush");
auto s = FlushMemTable(cfd_to_flush, flush_options,
FlushReason::kWriteBufferManagerInitiated);

ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"[%s] write buffer manager initiated flush finished, status: %s",
cfd_to_flush->GetName().c_str(), s.ToString().c_str());

return s.ok();
}

} // namespace ROCKSDB_NAMESPACE
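The FlushStarted()/FlushEnded() calls added to BackgroundFlush() and BackgroundCallFlush() above bracket every background flush, letting the WBM distinguish flushes it initiated from ordinary ones. The actual WriteBufferManager bookkeeping is not in this diff; a plausible sketch of the paired accounting those hooks suggest:

```cpp
#include <atomic>
#include <cstdint>

// Illustrative only: assumed WBM-side counters updated by the
// FlushStarted()/FlushEnded() pair added in this commit.
class FlushAccounting {
 public:
  // Called from DBImpl::BackgroundFlush() once the flush reason is known.
  void FlushStarted(bool wbm_initiated) {
    num_running_flushes_.fetch_add(1, std::memory_order_relaxed);
    if (wbm_initiated) {
      num_running_wbm_flushes_.fetch_add(1, std::memory_order_relaxed);
    }
  }

  // Called from DBImpl::BackgroundCallFlush() when the flush job completes,
  // with the same wbm_initiated value, so the counters stay balanced.
  void FlushEnded(bool wbm_initiated) {
    num_running_flushes_.fetch_sub(1, std::memory_order_relaxed);
    if (wbm_initiated) {
      num_running_wbm_flushes_.fetch_sub(1, std::memory_order_relaxed);
    }
  }

  uint64_t NumRunningWbmFlushes() const {
    return num_running_wbm_flushes_.load(std::memory_order_relaxed);
  }

 private:
  std::atomic<uint64_t> num_running_flushes_{0};
  std::atomic<uint64_t> num_running_wbm_flushes_{0};
};
```

A WBM keeping such counters could, for example, refrain from requesting another proactive flush while a previously requested one is still running.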
17 changes: 17 additions & 0 deletions db/db_impl/db_impl_open.cc
@@ -2089,6 +2089,23 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
delete impl;
*dbptr = nullptr;
}

if (s.ok()) {
auto wbm = db_options.write_buffer_manager.get();
auto db_impl = static_cast<DBImpl*>(*dbptr);

if (wbm && wbm->IsInitiatingFlushes()) {
// Register regardless of wbm->enabled() since the buffer size may be
// set later, making the WBM enabled; we will not re-register then.
// However, notifications will only be received while the WBM is
// enabled.
auto cb = [db_impl](size_t min_size_to_flush) {
return db_impl->InitiateMemoryManagerFlushRequest(min_size_to_flush);
};
wbm->RegisterFlushInitiator(db_impl, cb);
db_impl->is_registered_for_flush_initiation_rqsts_ = true;
}
}

return s;
}
} // namespace ROCKSDB_NAMESPACE
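From the application's point of view, nothing beyond constructing the WriteBufferManager is required: Open() registers the DB as a flush initiator automatically, and CloseHelper() deregisters it. A minimal usage sketch, assuming the four-argument constructor visible in the db_test2.cc changes below (the path and sizes are placeholders):

```cpp
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/write_buffer_manager.h"

int main() {
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;

  // Cap all memtables at 100 MB. The last argument enables proactive
  // (WBM-initiated) flushes; per the db_test2.cc comment below it defaults
  // to true, so it is spelled out here only for clarity.
  options.write_buffer_manager.reset(new ROCKSDB_NAMESPACE::WriteBufferManager(
      100 << 20, {} /* cache */,
      ROCKSDB_NAMESPACE::WriteBufferManager::kDfltAllowStall,
      true /* initiate_flushes */));

  ROCKSDB_NAMESPACE::DB* db = nullptr;
  auto s = ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/wbm_example", &db);
  // From here on, the WBM may call back into the DB (via the registered
  // InitiateMemoryManagerFlushRequest()) to flush proactively.
  delete db;
  return s.ok() ? 0 : 1;
}
```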
7 changes: 7 additions & 0 deletions db/db_impl/db_impl_write.cc
@@ -1901,6 +1901,9 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
void DBImpl::WriteBufferManagerStallWrites() {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Write-Buffer-Manager Stalls Writes");

mutex_.AssertHeld();
// First block future writer threads who want to add themselves to the queue
// of WriteThread.
@@ -1915,7 +1918,11 @@ void DBImpl::WriteBufferManagerStallWrites() {
write_buffer_manager_->BeginWriteStall(wbm_stall_.get());
wbm_stall_->Block();

ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Write-Buffer-Manager Stall Writes END");

mutex_.Lock();

// Stall has ended. Signal writer threads so that they can add
// themselves to the WriteThread queue for writes.
write_thread_.EndWriteStall();
20 changes: 17 additions & 3 deletions db/db_test2.cc
@@ -303,6 +303,14 @@ class DBTestSharedWriteBufferAcrossCFs
};

TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
// When using the old interface (configuring options.db_write_buffer_size
// rather than creating a WBM and setting options.write_buffer_manager),
// the WBM is created automatically by RocksDB and initiate_flushes is set
// to true (the default). This test fails in that case.
if (use_old_interface_) {
return;
}

Options options = CurrentOptions();
options.arena_block_size = 4096;
auto flush_listener = std::make_shared<FlushCounterListener>();
@@ -333,9 +341,13 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
if (use_old_interface_) {
options.db_write_buffer_size = 120000; // this is the real limit
} else if (!cost_cache_) {
options.write_buffer_manager.reset(new WriteBufferManager(114285));
options.write_buffer_manager.reset(
new WriteBufferManager(114285, {}, WriteBufferManager::kDfltAllowStall,
false /* initiate_flushes */));
} else {
options.write_buffer_manager.reset(new WriteBufferManager(114285, cache));
options.write_buffer_manager.reset(new WriteBufferManager(
114285, cache, WriteBufferManager::kDfltAllowStall,
false /* initiate_flushes */));
}
options.write_buffer_size = 500000; // this is never hit
CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
@@ -514,7 +526,9 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
options.write_buffer_size = 500000; // this is never hit
// Use a write buffer total size so that the soft limit is about
// 105000.
options.write_buffer_manager.reset(new WriteBufferManager(120000));
options.write_buffer_manager.reset(new WriteBufferManager(
120000, {} /* cache */, WriteBufferManager::kDfltAllowStall,
false /* initiate_flushes */));
CreateAndReopenWithCF({"cf1", "cf2"}, options);

ASSERT_OK(DestroyDB(dbname2, options));