Skip to content

Commit

Permalink
(Preliminary) - WBM-Initiated Flushes (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
udi-speedb committed Sep 29, 2022
1 parent 8f8bf6d commit 1be56d4
Show file tree
Hide file tree
Showing 12 changed files with 1,353 additions and 17 deletions.
7 changes: 7 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,13 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
}

Status DBImpl::CloseHelper() {
if (is_registered_for_flush_initiation_rqsts_) {
assert(write_buffer_manager_);
assert(write_buffer_manager_->IsInitiatingFlushes());
write_buffer_manager_->DeregisterFlushInitiator(this);
is_registered_for_flush_initiation_rqsts_ = false;
}

// Guarantee that there is no background error recovery in progress before
// continuing with the shutdown
mutex_.Lock();
Expand Down
5 changes: 5 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ class DBImpl : public DB {
virtual Status LockWAL() override;
virtual Status UnlockWAL() override;

// Flush initiated by the write buffer manager (WBM) to free some space.
//
// min_size_to_flush: size threshold used when choosing what to flush. Without
// atomic flush, a column family qualifies if it has unflushed immutable
// memtables or if its non-empty active memtable uses at least this many
// bytes. With atomic flush, the active memtables of the selected column
// families must add up to at least this many bytes, unless one of them
// already has unflushed immutable memtables.
//
// Returns true if a (non-waiting) flush request was issued and returned an OK
// status. Returns false if shutdown was initiated, nothing qualified for
// flushing, or the flush request failed.
bool InitiateMemoryManagerFlushRequest(size_t min_size_to_flush);

virtual SequenceNumber GetLatestSequenceNumber() const override;

// IncreaseFullHistoryTsLow(ColumnFamilyHandle*, std::string) will acquire
Expand Down Expand Up @@ -2404,6 +2407,8 @@ class DBImpl : public DB {
WriteBufferManager::UsageState::kNone;
uint64_t wbm_spdb_delayed_write_factor_ =
WriteBufferManager::kNoneDelayedWriteFactor;

// True while this DB is registered with the write buffer manager as a flush
// initiator. Set in DBImpl::Open() right after RegisterFlushInitiator() and
// cleared in CloseHelper() right after DeregisterFlushInitiator().
bool is_registered_for_flush_initiation_rqsts_ = false;
};

extern Options SanitizeOptions(const std::string& db, const Options& src,
Expand Down
105 changes: 105 additions & 0 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2816,6 +2816,11 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {

Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
&reason, thread_pri);
if (write_buffer_manager_) {
write_buffer_manager_->FlushStarted(
reason == FlushReason::kSpeedbWriteBufferManager);
}

if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
reason != FlushReason::kErrorRecovery) {
// Wait a little bit before retrying background flush in
Expand Down Expand Up @@ -2865,6 +2870,10 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
assert(num_running_flushes_ > 0);
num_running_flushes_--;
bg_flush_scheduled_--;
if (write_buffer_manager_) {
write_buffer_manager_->FlushEnded(reason ==
FlushReason::kSpeedbWriteBufferManager);
}
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
atomic_flush_install_cv_.SignalAll();
Expand Down Expand Up @@ -3786,4 +3795,100 @@ Status DBImpl::WaitForCompact(bool wait_unscheduled) {
return error_handler_.GetBGError();
}

// Starts a flush on behalf of the write buffer manager (WBM) to free memory.
//
// The column families to flush are chosen while holding the DB mutex; the
// flush itself is then requested without waiting for completion and is tagged
// with FlushReason::kSpeedbWriteBufferManager.
//
// min_size_to_flush: size threshold used when choosing what to flush (see the
// selection logic below for the exact rule in each mode).
//
// Returns true if the flush request returned an OK status. Returns false if
// shutdown was initiated, nothing qualified for flushing, or the request
// failed.
bool DBImpl::InitiateMemoryManagerFlushRequest(size_t min_size_to_flush) {
  // Lives outside the locked scope because, with atomic flush, the whole
  // selection (not just its first element) is passed to the flush call below.
  autovector<ColumnFamilyData*> cfds;

  {
    InstrumentedMutexLock l(&mutex_);

    if (shutdown_initiated_) {
      return false;
    }

    if (immutable_db_options_.atomic_flush) {
      SelectColumnFamiliesForAtomicFlush(&cfds);
      size_t total_size_to_flush = 0U;
      for (const auto& cfd : cfds) {
        if (cfd->imm()->NumNotFlushed() > 0) {
          // Unflushed immutable memtables are enough to justify the flush,
          // regardless of the active memtables' sizes.
          total_size_to_flush = min_size_to_flush;
          break;
        } else {
          total_size_to_flush += cfd->mem()->ApproximateMemoryUsage();
        }
      }
      if (total_size_to_flush < min_size_to_flush) {
        return false;
      }
    } else {
      ColumnFamilyData* cfd_picked = nullptr;
      SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;

      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (cfd->IsDropped()) {
          continue;
        }
        // A column family qualifies if it still has unflushed immutable
        // memtables, or if its non-empty active memtable alone uses at least
        // min_size_to_flush bytes. Among the qualifying ones, pick the one
        // whose active memtable has the smallest creation sequence number.
        if ((cfd->imm()->NumNotFlushed() != 0) ||
            (!cfd->mem()->IsEmpty() &&
             (cfd->mem()->ApproximateMemoryUsage() >= min_size_to_flush))) {
          uint64_t seq = cfd->mem()->GetCreationSeq();
          if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
            cfd_picked = cfd;
            seq_num_for_cf_picked = seq;
          }
        }
      }
      if (cfd_picked != nullptr) {
        cfds.push_back(cfd_picked);
      }

      // // MaybeFlushStatsCF(&cfds);
    }

    if (cfds.empty()) {
      return false;
    }
  }

  // Used for the non-atomic flush call and for naming the flush in the log.
  // With atomic flush this is merely the first of the selected column
  // families.
  // NOTE(review): the selected column families are used below without the DB
  // mutex and without taking a reference here - confirm they cannot be
  // destroyed before the flush call picks them up.
  ColumnFamilyData* cfd_to_flush = cfds.front();

  // The values are cast so that the format specifier matches the argument on
  // every platform ("%lu" is only correct where unsigned long has the same
  // width as the reported sizes).
  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "[%s] write buffer manager flush started current usage %llu out of %llu",
      cfd_to_flush->GetName().c_str(),
      static_cast<unsigned long long>(
          cfd_to_flush->write_buffer_mgr()->memory_usage()),
      static_cast<unsigned long long>(
          cfd_to_flush->write_buffer_mgr()->buffer_size()));

  TEST_SYNC_POINT("DBImpl::InitiateMemoryManagerFlushRequest::BeforeFlush");

  // URQ - When the memory manager picks a cf for flushing, it considers its
  // mutable memtable as well. However, here we request a flush of only the
  // immutable ones => the freed memory by flushing is not the same as the one
  // causing this cf to be picked
  FlushOptions flush_options;
  flush_options.allow_write_stall = true;
  flush_options.wait = false;
  Status s;
  if (immutable_db_options_.atomic_flush) {
    // Flush every selected column family: the size check above was made over
    // the whole selection, so flushing only the first one could free far less
    // memory than min_size_to_flush.
    s = AtomicFlushMemTables(cfds, flush_options,
                             FlushReason::kSpeedbWriteBufferManager);
  } else {
    s = FlushMemTable(cfd_to_flush, flush_options,
                      FlushReason::kSpeedbWriteBufferManager);
  }

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "[%s] write buffer manager intialize flush finished, status: %s\n",
      cfd_to_flush->GetName().c_str(), s.ToString().c_str());

  return s.ok();
}

} // namespace ROCKSDB_NAMESPACE
17 changes: 17 additions & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,23 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
handles->clear();
}

if (s.ok()) {
auto wbm = db_options.write_buffer_manager.get();
auto db_impl = static_cast<DBImpl*>(*dbptr);

if (wbm && wbm->IsInitiatingFlushes()) {
// Registering regardless of wbm->enabled() since the buffer size may be
// set later making the WBM enabled, but we will not re-register again
// However, notifications will only be received when the wbm is enabled
auto cb = [db_impl](size_t min_size_to_flush) {
return db_impl->InitiateMemoryManagerFlushRequest(min_size_to_flush);
};
wbm->RegisterFlushInitiator(db_impl, cb);
db_impl->is_registered_for_flush_initiation_rqsts_ = true;
}
}

return s;
}
} // namespace ROCKSDB_NAMESPACE
18 changes: 18 additions & 0 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1103,11 +1103,26 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
(wbm_spdb_delayed_write_factor_ != new_delayed_write_factor))) {
if (new_usage_state != WriteBufferManager::UsageState::kDelay) {
write_controller_token_.reset();
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Reset WBM Delay Token");
} else if ((wbm_spdb_usage_state_ !=
WriteBufferManager::UsageState::kDelay) ||
(wbm_spdb_delayed_write_factor_ != new_delayed_write_factor)) {
write_controller_token_ =
SetupDelayFromFactor(write_controller_, new_delayed_write_factor);
{
auto wbm_memory_used = write_buffer_manager_->memory_usage();
auto wbm_quota = write_buffer_manager_->buffer_size();
assert(wbm_quota > 0U);
auto wbm_used_percentage = (100 * wbm_memory_used) / wbm_quota;
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Delaying writes due to WBM's usage relative to quota "
"which is %" PRIu64 "%%(%" PRIu64 "/%" PRIu64
"). "
"Factor:%" PRIu64 ", Controller-Rate:%" PRIu64 ", ",
wbm_used_percentage, wbm_memory_used, wbm_quota,
new_delayed_write_factor,
write_controller_.delayed_write_rate());
}
}

wbm_spdb_usage_state_ = new_usage_state;
Expand Down Expand Up @@ -1137,6 +1152,9 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
if (write_options.no_slowdown) {
status = Status::Incomplete("Write stall");
} else {
// Initiating a flush to avoid blocking forever
WaitForPendingWrites();
status = HandleWriteBufferManagerFlush(write_context);
WriteBufferManagerStallWrites();
}
}
Expand Down
63 changes: 63 additions & 0 deletions db/db_write_buffer_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <chrono>
#include <thread>

#include "db/db_test_util.h"
#include "db/write_thread.h"
#include "port/stack_trace.h"
Expand Down Expand Up @@ -897,12 +900,72 @@ TEST_P(DBWriteBufferManagerTest1, WbmDelaySharedWriteBufferAcrossDBs) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// ===============================================================================================================
// Fixture for tests of flushes initiated by the write buffer manager.
//
// Parameterized on a bool that is copied into cost_cache_ by SetUp(). Tests
// use it to decide whether the WriteBufferManager is constructed with a cache.
class DBWriteBufferManagerFlushTests
    : public DBTestBase,
      public ::testing::WithParamInterface<bool> {
 public:
  DBWriteBufferManagerFlushTests()
      : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {}

  // Captures the test parameter before each test body runs.
  void SetUp() override { cost_cache_ = GetParam(); }

  // Initialized so the member never holds an indeterminate value, even if it
  // is read before SetUp() has run.
  bool cost_cache_ = false;
};

// Single DB, single column family: writes enough data to reach one WBM flush
// step and then blocks until DBImpl::InitiateMemoryManagerFlushRequest()
// reaches its "BeforeFlush" sync point, i.e. until the WBM asked this DB to
// flush.
TEST_P(DBWriteBufferManagerFlushTests, WbmFlushesSingleDBSingleCf) {
  constexpr size_t kQuota = 100 * 1000;

  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = kQuota;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);

  // The WBM gets the cache only when the test parameter asks for it.
  // NOTE(review): the trailing `true` presumably enables WBM-initiated
  // flushes - confirm against the WriteBufferManager constructor.
  const bool allow_delays_and_stalls = false;
  std::shared_ptr<Cache> wbm_cache;
  if (cost_cache_) {
    wbm_cache = cache;
  }
  options.write_buffer_manager.reset(
      new WriteBufferManager(kQuota, wbm_cache, allow_delays_and_stalls, true));

  auto* manager = options.write_buffer_manager.get();
  const size_t step_size =
      kQuota / manager->GetFlushInitiationOptions().max_num_parallel_flushes;

  WriteOptions write_opts;
  write_opts.disableWAL = true;

  DestroyAndReopen(options);

  auto* sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sync_point->EnableProcessing();
  sync_point->LoadDependency(
      {{"DBImpl::InitiateMemoryManagerFlushRequest::BeforeFlush",
        "DBWriteBufferManagerFlushTests::WbmFlushesSingleDBSingleCf::"
        "Flushing"}});

  // Two writes of half a flush step each, both to the same key.
  ASSERT_OK(Put(Key(1), DummyString(step_size / 2), write_opts));
  ASSERT_OK(Put(Key(1), DummyString(step_size / 2), write_opts));

  // Per the dependency loaded above, this does not return before the
  // flush-initiation sync point has been processed.
  TEST_SYNC_POINT(
      "DBWriteBufferManagerFlushTests::WbmFlushesSingleDBSingleCf::Flushing");

  sync_point->DisableProcessing();
  sync_point->ClearAllCallBacks();
}

// Test-suite instantiations. The bool parameter becomes each fixture's
// cost_cache_ value.
INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
testing::Bool());

INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest1, DBWriteBufferManagerTest1,
::testing::Bool());

// NOTE(review): unlike the two suites above, the flush tests are instantiated
// with `false` only, so the variant that passes a cache to the
// WriteBufferManager is not exercised - confirm whether that is intentional.
INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerFlushTests,
DBWriteBufferManagerFlushTests,
::testing::Values(false));

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
2 changes: 2 additions & 0 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
return "Error Recovery";
case FlushReason::kWalFull:
return "WAL Full";
case FlushReason::kSpeedbWriteBufferManager:
return "Speedb Write Buffer Manager";
default:
return "Invalid";
}
Expand Down
1 change: 1 addition & 0 deletions include/rocksdb/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ enum class FlushReason : int {
// will not be called to avoid many small immutable memtables.
kErrorRecoveryRetryFlush = 0xc,
kWalFull = 0xd,
kSpeedbWriteBufferManager = 0xe,
};

// TODO: In the future, BackgroundErrorReason will only be used to indicate
Expand Down
Loading

0 comments on commit 1be56d4

Please sign in to comment.