Skip to content

Commit

Permalink
(Preliminary) - WBM-Initiated Flushes (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
udi-speedb committed Nov 27, 2022
1 parent 55f19c6 commit c573676
Show file tree
Hide file tree
Showing 12 changed files with 1,470 additions and 40 deletions.
7 changes: 7 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,13 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
}

Status DBImpl::CloseHelper() {
if (is_registered_for_flush_initiation_rqsts_) {
assert(write_buffer_manager_);
assert(write_buffer_manager_->IsInitiatingFlushes());
write_buffer_manager_->DeregisterFlushInitiator(this);
is_registered_for_flush_initiation_rqsts_ = false;
}

// Guarantee that there is no background error recovery in progress before
// continuing with the shutdown
mutex_.Lock();
Expand Down
5 changes: 5 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ class DBImpl : public DB {
virtual Status LockWAL() override;
virtual Status UnlockWAL() override;

// flush initiated by the write buffer manager to free some space
bool InitiateMemoryManagerFlushRequest(size_t min_size_to_flush);

virtual SequenceNumber GetLatestSequenceNumber() const override;

// IncreaseFullHistoryTsLow(ColumnFamilyHandle*, std::string) will acquire
Expand Down Expand Up @@ -2393,6 +2396,8 @@ class DBImpl : public DB {

// Pointer to WriteBufferManager stalling interface.
std::unique_ptr<StallInterface> wbm_stall_;

bool is_registered_for_flush_initiation_rqsts_ = false;
};

extern Options SanitizeOptions(const std::string& db, const Options& src,
Expand Down
104 changes: 103 additions & 1 deletion db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2779,12 +2779,18 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
bg_job_limits.max_compactions, bg_flush_scheduled_,
bg_compaction_scheduled_);
}
// HILIK: must move here
*reason = bg_flush_args[0].cfd_->GetFlushReason();
if (write_buffer_manager_) {
write_buffer_manager_->FlushStarted(
*reason == FlushReason::kSpeedbWriteBufferManager);
}

status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer, thread_pri);
TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
// All the CFDs in the FlushReq must have the same flush reason, so just
// grab the first one
*reason = bg_flush_args[0].cfd_->GetFlushReason();
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
if (cfd->UnrefAndTryDelete()) {
Expand Down Expand Up @@ -2869,6 +2875,10 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
assert(num_running_flushes_ > 0);
num_running_flushes_--;
bg_flush_scheduled_--;
if (write_buffer_manager_) {
write_buffer_manager_->FlushEnded(reason ==
FlushReason::kSpeedbWriteBufferManager);
}
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
atomic_flush_install_cv_.SignalAll();
Expand Down Expand Up @@ -3790,4 +3800,96 @@ Status DBImpl::WaitForCompact(bool wait_unscheduled) {
return error_handler_.GetBGError();
}

bool DBImpl::InitiateMemoryManagerFlushRequest(size_t min_size_to_flush) {
ColumnFamilyData* cfd_to_flush = nullptr;

{
InstrumentedMutexLock l(&mutex_);

if (shutdown_initiated_) {
return false;
}

autovector<ColumnFamilyData*> cfds;
if (immutable_db_options_.atomic_flush) {
SelectColumnFamiliesForAtomicFlush(&cfds);
size_t total_size_to_flush = 0U;
for (const auto& cfd : cfds) {
if (cfd->imm()->NumNotFlushed() > 0) {
total_size_to_flush = min_size_to_flush;
break;
} else {
total_size_to_flush += cfd->mem()->ApproximateMemoryUsage();
}
}
if (total_size_to_flush < min_size_to_flush) {
return false;
}
} else {
ColumnFamilyData* cfd_picked = nullptr;
SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;

for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if ((cfd->imm()->NumNotFlushed() != 0) ||
(!cfd->mem()->IsEmpty() &&
(cfd->mem()->ApproximateMemoryUsage() >= min_size_to_flush))) {
// We only consider active mem table, hoping immutable memtable is
// already in the process of flushing.
uint64_t seq = cfd->mem()->GetCreationSeq();
if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
cfd_picked = cfd;
seq_num_for_cf_picked = seq;
}
}
}
if (cfd_picked != nullptr) {
cfds.push_back(cfd_picked);
}

// // MaybeFlushStatsCF(&cfds);
}

if (cfds.empty()) {
return false;
}

cfd_to_flush = cfds.front();
}

ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"[%s] write buffer manager flush started current usage %lu out of %lu",
cfd_to_flush->GetName().c_str(),
cfd_to_flush->write_buffer_mgr()->memory_usage(),
cfd_to_flush->write_buffer_mgr()->buffer_size());

TEST_SYNC_POINT("DBImpl::InitiateMemoryManagerFlushRequest::BeforeFlush");

// URQ - When the memory manager picks a cf for flushing, it considers its
// mutable memtable as well. However, here we reuest a flush of only the
// immutable ones => the freed memory by flushing is not the same as the one
// causing this cf to be picked
FlushOptions flush_options;
flush_options.allow_write_stall = true;
flush_options.wait = false;
Status s;
if (immutable_db_options_.atomic_flush) {
s = AtomicFlushMemTables({cfd_to_flush}, flush_options,
FlushReason::kSpeedbWriteBufferManager);
} else {
s = FlushMemTable(cfd_to_flush, flush_options,
FlushReason::kSpeedbWriteBufferManager);
}

ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"[%s] write buffer manager initialize flush finished, status: %s\n",
cfd_to_flush->GetName().c_str(), s.ToString().c_str());

return s.ok();
}

} // namespace ROCKSDB_NAMESPACE
17 changes: 17 additions & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,23 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
handles->clear();
}

if (s.ok()) {
auto wbm = db_options.write_buffer_manager.get();
auto db_impl = static_cast<DBImpl*>(*dbptr);

if (wbm && wbm->IsInitiatingFlushes()) {
// Registering regardless of wbm->enabled() since the buffer size may be
// set later making the WBM enabled, but we will not re-register again
// However, notifications will only be received when the wbm is enabled
auto cb = [db_impl](size_t min_size_to_flush) {
return db_impl->InitiateMemoryManagerFlushRequest(min_size_to_flush);
};
wbm->RegisterFlushInitiator(db_impl, cb);
db_impl->is_registered_for_flush_initiation_rqsts_ = true;
}
}

return s;
}
} // namespace ROCKSDB_NAMESPACE
14 changes: 14 additions & 0 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,11 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
if (write_options.no_slowdown) {
status = Status::Incomplete("Write stall");
} else {
// Initiating a flush to avoid blocking forever
if (num_running_flushes_ == 0) {
WaitForPendingWrites();
status = HandleWriteBufferManagerFlush(write_context);
}
WriteBufferManagerStallWrites();
}
}
Expand Down Expand Up @@ -1709,6 +1714,10 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
void DBImpl::WriteBufferManagerStallWrites() {
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Write-Buffer-Manager Stalls Writes");

mutex_.AssertHeld();
// First block future writer threads who want to add themselves to the queue
// of WriteThread.
Expand All @@ -1723,7 +1732,12 @@ void DBImpl::WriteBufferManagerStallWrites() {
write_buffer_manager_->BeginWriteStall(wbm_stall_.get());
wbm_stall_->Block();

ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Write-Buffer-Manager Stall Writes END");

mutex_.Lock();

// Stall has ended. Signal writer threads so that they can add
// themselves to the WriteThread queue for writes.
write_thread_.EndWriteStall();
Expand Down
85 changes: 78 additions & 7 deletions db/db_write_buffer_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <chrono>
#include <thread>

#include "db/db_test_util.h"
#include "db/write_thread.h"
#include "port/stack_trace.h"

namespace ROCKSDB_NAMESPACE {

class DBWriteBufferManagerTest : public DBTestBase,
public testing::WithParamInterface<bool> {
public ::testing::WithParamInterface<bool> {
public:
DBWriteBufferManagerTest()
: DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {}

void SetUp() override { cost_cache_ = GetParam(); }
bool cost_cache_;
};

Expand All @@ -27,7 +32,6 @@ TEST_P(DBWriteBufferManagerTest, SharedBufferAcrossCFs1) {
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.reset(
Expand Down Expand Up @@ -70,7 +74,6 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) {
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.reset(
Expand Down Expand Up @@ -197,7 +200,6 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB) {
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.reset(
Expand Down Expand Up @@ -314,7 +316,6 @@ TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB1) {
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.reset(
Expand Down Expand Up @@ -456,7 +457,6 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsSingleDB) {
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.reset(
Expand Down Expand Up @@ -618,7 +618,6 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
options.write_buffer_size = 500000; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

if (cost_cache_) {
options.write_buffer_manager.reset(
Expand Down Expand Up @@ -780,9 +779,81 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

class DBWriteBufferManagerTest1 : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
DBWriteBufferManagerTest1()
: DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {}

void SetUp() override { cost_cache_ = GetParam(); }
bool cost_cache_;
};
// ===============================================================================================================
class DBWriteBufferManagerFlushTests
: public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
DBWriteBufferManagerFlushTests()
: DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {}

void SetUp() override { cost_cache_ = GetParam(); }
bool cost_cache_;
};

TEST_P(DBWriteBufferManagerFlushTests, WbmFlushesSingleDBSingleCf) {
constexpr size_t kQuota = 100 * 1000;

Options options = CurrentOptions();
options.arena_block_size = 4096;
options.write_buffer_size = kQuota; // this is never hit
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);

auto allow_stall_ = false;

if (cost_cache_) {
options.write_buffer_manager.reset(
new WriteBufferManager(kQuota, cache, allow_stall_, true));
} else {
options.write_buffer_manager.reset(new WriteBufferManager(
kQuota, nullptr, allow_stall_, true));
}
auto* wbm = options.write_buffer_manager.get();
size_t flush_step_size =
kQuota / wbm->GetFlushInitiationOptions().max_num_parallel_flushes;

WriteOptions wo;
wo.disableWAL = true;

DestroyAndReopen(options);

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::InitiateMemoryManagerFlushRequest::BeforeFlush",
"DBWriteBufferManagerFlushTests::WbmFlushesSingleDBSingleCf::"
"Flushing"}});

// Reach the flush step by writing to two cf-s, no flush
ASSERT_OK(Put(Key(1), DummyString(flush_step_size / 2), wo));
ASSERT_OK(Put(Key(1), DummyString(flush_step_size / 2), wo));

TEST_SYNC_POINT(
"DBWriteBufferManagerFlushTests::WbmFlushesSingleDBSingleCf::Flushing");

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
testing::Bool());

INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest1, DBWriteBufferManagerTest1,
::testing::Bool());

INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerFlushTests,
DBWriteBufferManagerFlushTests,
::testing::Values(false));

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
2 changes: 2 additions & 0 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
return "Error Recovery";
case FlushReason::kWalFull:
return "WAL Full";
case FlushReason::kSpeedbWriteBufferManager:
return "Speedb Write Buffer Manager";
default:
return "Invalid";
}
Expand Down
1 change: 1 addition & 0 deletions include/rocksdb/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ enum class FlushReason : int {
// will not be called to avoid many small immutable memtables.
kErrorRecoveryRetryFlush = 0xc,
kWalFull = 0xd,
kSpeedbWriteBufferManager = 0xe,
};

// TODO: In the future, BackgroundErrorReason will only be used to indicate
Expand Down
Loading

0 comments on commit c573676

Please sign in to comment.