diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index d8f682ac28..f341ac279d 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -513,6 +513,13 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
 }
 
 Status DBImpl::CloseHelper() {
+  if (is_registered_for_flush_initiation_rqsts_) {
+    assert(write_buffer_manager_);
+    assert(write_buffer_manager_->IsInitiatingFlushes());
+    write_buffer_manager_->DeregisterFlushInitiator(this);
+    is_registered_for_flush_initiation_rqsts_ = false;
+  }
+
   // Guarantee that there is no background error recovery in progress before
   // continuing with the shutdown
   mutex_.Lock();
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 88640bf7b0..b14a0feeb7 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -363,6 +363,9 @@ class DBImpl : public DB {
   virtual Status LockWAL() override;
   virtual Status UnlockWAL() override;
 
+  // Flush request initiated by the write buffer manager to free some memory.
+  // Returns true iff a flush was successfully initiated.
+  bool InitiateMemoryManagerFlushRequest(size_t min_size_to_flush);
+
   virtual SequenceNumber GetLatestSequenceNumber() const override;
 
   // IncreaseFullHistoryTsLow(ColumnFamilyHandle*, std::string) will acquire
@@ -2404,6 +2407,8 @@ class DBImpl : public DB {
       WriteBufferManager::UsageState::kNone;
   uint64_t wbm_spdb_delayed_write_factor_ =
       WriteBufferManager::kNoneDelayedWriteFactor;
+
+  bool is_registered_for_flush_initiation_rqsts_ = false;
 };
 
 extern Options SanitizeOptions(const std::string& db, const Options& src,
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index fe43d3c629..67002f453f 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -2816,6 +2816,11 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
 
     Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
                                &reason, thread_pri);
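+    // Notify the WBM that a flush has started (whether or not it was
+    // WBM-initiated) so it can account for it in its flush initiation
+    // decisions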
+    if (write_buffer_manager_) {
+      write_buffer_manager_->FlushStarted(
+          reason == FlushReason::kSpeedbWriteBufferManager);
+    }
+
     if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
         reason != FlushReason::kErrorRecovery) {
       // Wait a little bit before retrying background flush in
@@ -2865,6 +2870,10 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
     assert(num_running_flushes_ > 0);
     num_running_flushes_--;
     bg_flush_scheduled_--;
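+    // Notify the WBM that a flush has ended so it may initiate another flush
+    // if memory usage still warrants it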
+    if (write_buffer_manager_) {
+      write_buffer_manager_->FlushEnded(reason ==
+                                        FlushReason::kSpeedbWriteBufferManager);
+    }
     // See if there's more work to be done
     MaybeScheduleFlushOrCompaction();
     atomic_flush_install_cv_.SignalAll();
@@ -3786,4 +3795,100 @@ Status DBImpl::WaitForCompact(bool wait_unscheduled) {
   return error_handler_.GetBGError();
 }
 
+bool DBImpl::InitiateMemoryManagerFlushRequest(size_t min_size_to_flush) {
+  ColumnFamilyData* cfd_to_flush = nullptr;
+
+  {
+    InstrumentedMutexLock l(&mutex_);
+
+    if (shutdown_initiated_) {
+      return false;
+    }
+
+    autovector<ColumnFamilyData*> cfds;
+    if (immutable_db_options_.atomic_flush) {
+      SelectColumnFamiliesForAtomicFlush(&cfds);
+      size_t total_size_to_flush = 0U;
+      for (const auto& cfd : cfds) {
+        if (cfd->imm()->NumNotFlushed() > 0) {
+          total_size_to_flush = min_size_to_flush;
+          break;
+        } else {
+          total_size_to_flush += cfd->mem()->ApproximateMemoryUsage();
+        }
+      }
+      if (total_size_to_flush < min_size_to_flush) {
+        return false;
+      }
+    } else {
+      ColumnFamilyData* cfd_picked = nullptr;
+      SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
+
+      for (auto cfd : *versions_->GetColumnFamilySet()) {
+        if (cfd->IsDropped()) {
+          continue;
+        }
+        if ((cfd->imm()->NumNotFlushed() != 0) ||
+            (!cfd->mem()->IsEmpty() &&
+             (cfd->mem()->ApproximateMemoryUsage() >= min_size_to_flush))) {
+          // We only consider active mem table, hoping immutable memtable is
+          // already in the process of flushing.
+          uint64_t seq = cfd->mem()->GetCreationSeq();
+          if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
+            cfd_picked = cfd;
+            seq_num_for_cf_picked = seq;
+          }
+        }
+      }
+      if (cfd_picked != nullptr) {
+        cfds.push_back(cfd_picked);
+      }
+
+      // // MaybeFlushStatsCF(&cfds);
+    }
+
+    if (cfds.empty()) {
+      return false;
+    }
+
+    cfd_to_flush = cfds.front();
+  }
+
+  ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                 "[%s] write buffer manager initiating flush; current usage "
+                 "%lu out of %lu",
+                 cfd_to_flush->GetName().c_str(),
+                 cfd_to_flush->write_buffer_mgr()->memory_usage(),
+                 cfd_to_flush->write_buffer_mgr()->buffer_size());
+
+  TEST_SYNC_POINT("DBImpl::InitiateMemoryManagerFlushRequest::BeforeFlush");
+
+  // URQ - When the memory manager picks a cf for flushing, it considers its
+  // mutable memtable as well. However, here we request a flush of only the
+  // immutable ones => the memory freed by the flush is not necessarily the
+  // same memory that caused this cf to be picked
+  FlushOptions flush_options;
+  flush_options.allow_write_stall = true;
+  flush_options.wait = false;
+  Status s;
+  if (immutable_db_options_.atomic_flush) {
+    s = AtomicFlushMemTables({cfd_to_flush}, flush_options,
+                             FlushReason::kSpeedbWriteBufferManager);
+  } else {
+    s = FlushMemTable(cfd_to_flush, flush_options,
+                      FlushReason::kSpeedbWriteBufferManager);
+  }
+
+  ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                 "[%s] write buffer manager initiated flush finished, "
+                 "status: %s",
+                 cfd_to_flush->GetName().c_str(), s.ToString().c_str());
+
+  return s.ok();
+}
+
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index 64896cba47..8f835a781d 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -1985,6 +1985,23 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
     }
     handles->clear();
   }
+
+  if (s.ok()) {
+    auto wbm = db_options.write_buffer_manager.get();
+    auto db_impl = static_cast<DBImpl*>(*dbptr);
+
+    if (wbm && wbm->IsInitiatingFlushes()) {
+      // Registering regardless of wbm->enabled() since the buffer size may be
+      // set later (making the WBM enabled), and we would not get another
+      // chance to register. However, notifications will only be received
+      // while the wbm is enabled
+      auto cb = [db_impl](size_t min_size_to_flush) {
+        return db_impl->InitiateMemoryManagerFlushRequest(min_size_to_flush);
+      };
+      wbm->RegisterFlushInitiator(db_impl, cb);
+      db_impl->is_registered_for_flush_initiation_rqsts_ = true;
+    }
+  }
+
   return s;
 }
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index ca30b4e039..ffb7766ca2 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -1103,11 +1103,26 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
             (wbm_spdb_delayed_write_factor_ != new_delayed_write_factor))) {
       if (new_usage_state != WriteBufferManager::UsageState::kDelay) {
         write_controller_token_.reset();
+        ROCKS_LOG_INFO(immutable_db_options_.info_log, "Reset WBM Delay Token");
       } else if ((wbm_spdb_usage_state_ !=
                   WriteBufferManager::UsageState::kDelay) ||
                  (wbm_spdb_delayed_write_factor_ != new_delayed_write_factor)) {
         write_controller_token_ =
             SetupDelayFromFactor(write_controller_, new_delayed_write_factor);
+        {
+          auto wbm_memory_used = write_buffer_manager_->memory_usage();
+          auto wbm_quota = write_buffer_manager_->buffer_size();
+          assert(wbm_quota > 0U);
+          auto wbm_used_percentage = (100 * wbm_memory_used) / wbm_quota;
+          ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                         "Delaying writes due to WBM's usage relative to quota "
+                         "which is %" PRIu64 "%%(%" PRIu64 "/%" PRIu64
+                         "). "
+                         "Factor:%" PRIu64 ", Controller-Rate:%" PRIu64 ", ",
+                         wbm_used_percentage, wbm_memory_used, wbm_quota,
+                         new_delayed_write_factor,
+                         write_controller_.delayed_write_rate());
+        }
       }
 
       wbm_spdb_usage_state_ = new_usage_state;
@@ -1137,6 +1152,9 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
     if (write_options.no_slowdown) {
       status = Status::Incomplete("Write stall");
     } else {
+      // Initiating a flush to avoid blocking forever
+      WaitForPendingWrites();
+      status = HandleWriteBufferManagerFlush(write_context);
       WriteBufferManagerStallWrites();
     }
   }
diff --git a/db/db_write_buffer_manager_test.cc b/db/db_write_buffer_manager_test.cc
index af723d193b..49cfc827e5 100644
--- a/db/db_write_buffer_manager_test.cc
+++ b/db/db_write_buffer_manager_test.cc
@@ -7,6 +7,9 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
+#include <chrono>
+#include <thread>
+
 #include "db/db_test_util.h"
 #include "db/write_thread.h"
 #include "port/stack_trace.h"
@@ -897,12 +900,72 @@ TEST_P(DBWriteBufferManagerTest1, WbmDelaySharedWriteBufferAcrossDBs) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
+// ===============================================================================================================
+class DBWriteBufferManagerFlushTests
+    : public DBTestBase,
+      public ::testing::WithParamInterface<bool> {
+ public:
+  DBWriteBufferManagerFlushTests()
+      : DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {}
+
+  void SetUp() override { cost_cache_ = GetParam(); }
+  bool cost_cache_;
+};
+
+TEST_P(DBWriteBufferManagerFlushTests, WbmFlushesSingleDBSingleCf) {
+  constexpr size_t kQuota = 100 * 1000;
+
+  Options options = CurrentOptions();
+  options.arena_block_size = 4096;
+  options.write_buffer_size = kQuota;  // this is never hit
+  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
+  ASSERT_LT(cache->GetUsage(), 256 * 1024);
+
+  auto allow_delays_and_stalls_ = false;
+
+  if (cost_cache_) {
+    options.write_buffer_manager.reset(
+        new WriteBufferManager(kQuota, cache, allow_delays_and_stalls_, true));
+  } else {
+    options.write_buffer_manager.reset(new WriteBufferManager(
+        kQuota, nullptr, allow_delays_and_stalls_, true));
+  }
+  auto* wbm = options.write_buffer_manager.get();
+  size_t flush_step_size =
+      kQuota / wbm->GetFlushInitiationOptions().max_num_parallel_flushes;
+
+  WriteOptions wo;
+  wo.disableWAL = true;
+
+  DestroyAndReopen(options);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::InitiateMemoryManagerFlushRequest::BeforeFlush",
+        "DBWriteBufferManagerFlushTests::WbmFlushesSingleDBSingleCf::"
+        "Flushing"}});
+
+  // Reach the flush step size via two writes; crossing the step should
+  // trigger a WBM-initiated flush
+  ASSERT_OK(Put(Key(1), DummyString(flush_step_size / 2), wo));
+  ASSERT_OK(Put(Key(1), DummyString(flush_step_size / 2), wo));
+
+  TEST_SYNC_POINT(
+      "DBWriteBufferManagerFlushTests::WbmFlushesSingleDBSingleCf::Flushing");
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
                         testing::Bool());
 
 INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest1, DBWriteBufferManagerTest1,
                         ::testing::Bool());
 
+INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerFlushTests,
+                        DBWriteBufferManagerFlushTests,
+                        ::testing::Values(false));
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 96d62d7891..199e3f219f 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -77,6 +77,8 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
       return "Error Recovery";
     case FlushReason::kWalFull:
       return "WAL Full";
+    case FlushReason::kSpeedbWriteBufferManager:
+      return "Speedb Write Buffer Manager";
     default:
       return "Invalid";
   }
diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h
index 5f6f890599..9d88327eba 100644
--- a/include/rocksdb/listener.h
+++ b/include/rocksdb/listener.h
@@ -169,6 +169,7 @@ enum class FlushReason : int {
   // will not be called to avoid many small immutable memtables.
   kErrorRecoveryRetryFlush = 0xc,
   kWalFull = 0xd,
+  kSpeedbWriteBufferManager = 0xe,
 };
 
 // TODO: In the future, BackgroundErrorReason will only be used to indicate
diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h
index e7e3ca3184..63f273fd98 100644
--- a/include/rocksdb/write_buffer_manager.h
+++ b/include/rocksdb/write_buffer_manager.h
@@ -17,11 +17,15 @@
 #include <cstddef>
 #include <list>
 #include <mutex>
+#include <thread>
+#include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "rocksdb/cache.h"
 
 namespace ROCKSDB_NAMESPACE {
+class Options;
 class CacheReservationManager;
 
 // Interface to block and signal DB instances, intended for RocksDB
@@ -46,6 +50,11 @@ class WriteBufferManager final {
   static constexpr uint64_t kMaxDelayedWriteFactor = 200U;
   static constexpr uint64_t kStopDelayedWriteFactor = kMaxDelayedWriteFactor;
 
+  struct FlushInitiationOptions {
+    FlushInitiationOptions() {}
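+    // Max total number of flushes (WBM-initiated and others) that may run in
+    // parallel before the WBM refrains from initiating additional flushes.
+    // The quota is also divided by this number to derive the flush step size.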
+    size_t max_num_parallel_flushes = 4U;
+  };
+
  public:
   // Parameters:
   // _buffer_size: _buffer_size = 0 indicates no limit. Memory won't be capped.
@@ -68,9 +77,11 @@ class WriteBufferManager final {
   //  Stalls: stalling of writes when memory_usage() exceeds buffer_size. It
   //  will wait for flush to complete and
   //   memory usage to drop down.
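+  //  initiate_flushes: if true, the WBM proactively initiates flushes via its
+  //   registered flush initiators (see RegisterFlushInitiator()) instead of
+  //   relying on callers polling ShouldFlush().
+  //  flush_initiation_options: tuning of the flush initiation mechanism, e.g.,
+  //   the max number of flushes allowed to run in parallel.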
-  explicit WriteBufferManager(size_t _buffer_size,
-                              std::shared_ptr<Cache> cache = {},
-                              bool allow_delays_and_stalls = true);
+  explicit WriteBufferManager(
+      size_t _buffer_size, std::shared_ptr<Cache> cache = {},
+      bool allow_delays_and_stalls = true, bool initiate_flushes = false,
+      const FlushInitiationOptions& flush_initiation_options =
+          FlushInitiationOptions());
 
   // No copying allowed
   WriteBufferManager(const WriteBufferManager&) = delete;
@@ -132,6 +143,10 @@ class WriteBufferManager final {
     MaybeEndWriteStall();
     if (enabled()) {
       UpdateUsageState(memory_usage(), 0 /* mem_changed_size */, new_size);
+
+      if (initiate_flushes_) {
+        InitFlushInitiationVars(new_size);
+      }
     }
   }
 
@@ -139,7 +154,7 @@ class WriteBufferManager final {
 
   // Should only be called from write thread
   bool ShouldFlush() const {
-    if (enabled()) {
+    if ((initiate_flushes_ == false) && enabled()) {
       if (mutable_memtable_memory_usage() >
           mutable_limit_.load(std::memory_order_relaxed)) {
         return true;
@@ -215,6 +230,11 @@ class WriteBufferManager final {
 
  public:
   bool IsDelayAllowed() const { return allow_delays_and_stalls_; }
+  bool IsInitiatingFlushes() const { return initiate_flushes_; }
+  const FlushInitiationOptions& GetFlushInitiationOptions() const {
+    return flush_initiation_options_;
+  }
+
   std::pair<UsageState, uint64_t> GetUsageStateInfo() const {
     return ParseCodedUsageState(GetCodedUsageState());
   }
@@ -227,9 +247,6 @@ class WriteBufferManager final {
   static constexpr uint64_t kNoneCodedUsageState = 0U;
   static constexpr uint64_t kStopCodedUsageState = kMaxDelayedWriteFactor + 1;
 
-  void UpdateUsageState(size_t new_memory_used, ssize_t mem_changed_size,
-                        size_t quota);
-
   uint64_t CalcNewCodedUsageState(size_t new_memory_used,
                                   ssize_t memory_changed_size, size_t quota,
                                   uint64_t old_coded_usage_state);
@@ -243,6 +260,30 @@ class WriteBufferManager final {
   static std::pair<UsageState, uint64_t> ParseCodedUsageState(
       uint64_t coded_usage_state);
 
+ private:
+  void UpdateUsageState(size_t new_memory_used, ssize_t mem_changed_size,
+                        size_t quota);
+
+ public:
+  using InitiateFlushRequestCb = std::function<bool(size_t min_size_to_flush)>;
+
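+  // Register an entity (e.g., a DBImpl) that is able to initiate flushes on
+  // the WBM's behalf. The callback receives the minimum amount of memory that
+  // the initiated flush should aim to free and returns true iff a flush was
+  // actually initiated.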
+  void RegisterFlushInitiator(void* initiator, InitiateFlushRequestCb request);
+  void DeregisterFlushInitiator(void* initiator);
+
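+  // To be called whenever a flush starts / ends. wbm_initiated indicates
+  // whether the flush was requested by this WBM.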
+  void FlushStarted(bool wbm_initiated);
+  void FlushEnded(bool wbm_initiated);
+
+ public:
+  size_t TEST_GetNumFlushesToInitiate() const {
+    return num_flushes_to_initiate_;
+  }
+  size_t TEST_GetNumRunningFlushes() const { return num_running_flushes_; }
+  size_t TEST_GetNextCandidateInitiatorIdx() const {
+    return next_candidate_initiator_idx_;
+  }
+
+  void TEST_WakeupFlushInitiationThread();
+
  private:
   std::atomic<size_t> buffer_size_;
   std::atomic<size_t> mutable_limit_;
@@ -267,5 +308,93 @@ class WriteBufferManager final {
   // Return the new memory usage
   size_t ReserveMemWithCache(size_t mem);
   size_t FreeMemWithCache(size_t mem);
+
+ private:
+  struct InitiatorInfo {
+    void* initiator;
+    InitiateFlushRequestCb cb;
+  };
+
+  static constexpr uint64_t kInvalidInitiatorIdx =
+      std::numeric_limits<uint64_t>::max();
+
+ private:
+  void InitFlushInitiationVars(size_t quota);
+  void InitiateFlushesThread();
+  bool InitiateAdditionalFlush();
+  void WakeUpFlushesThread();
+  void TerminateFlushesThread();
+  void RecalcFlushInitiationSize();
+  void ReevaluateNeedForMoreFlushes(size_t curr_memory_used);
+  uint64_t FindInitiator(void* initiator) const;
+
+  // Assumes flushes_mu_ is held
+  void WakeupFlushInitiationThread() {
+    new_flushes_wakeup_ = true;
+    flushes_wakeup_cv_.notify_one();
+  }
+
+  // This is used outside the flushes_mu_ lock => only
+  // additional_flush_initiation_size_ needs to be atomic
+  bool ShouldInitiateAnotherFlushMemOnly(size_t curr_memory_used) const {
+    return (curr_memory_used >= additional_flush_initiation_size_);
+  }
+
+  // This should be called only under the flushes_mu_ lock
+  bool ShouldInitiateAnotherFlush(size_t curr_memory_used) const {
+    return (((num_running_flushes_ + num_flushes_to_initiate_) <
+             flush_initiation_options_.max_num_parallel_flushes) &&
+            ShouldInitiateAnotherFlushMemOnly(curr_memory_used));
+  }
+
+  // Assumes flushes_initiators_mu_ is held
+  uint64_t CalcNextCandidateInitiatorIdx() const {
+    // The index is irrelevant when there are no initiators so we might as well
+    // set it to 0
+    return (flush_initiators_.empty() ? 0U
+                                      : (next_candidate_initiator_idx_ + 1) %
+                                            flush_initiators_.size());
+  }
+
+  bool IsInitiatorIdxValid(uint64_t initiator_idx) const {
+    return (initiator_idx != kInvalidInitiatorIdx);
+  }
+
+ private:
+  // Flush Initiation Data Members
+
+  const bool initiate_flushes_ = false;
+  const FlushInitiationOptions flush_initiation_options_ =
+      FlushInitiationOptions();
+
+  std::vector<InitiatorInfo> flush_initiators_;
+  std::atomic<size_t> num_initiators_ = 0U;
+  uint64_t next_candidate_initiator_idx_ = 0U;
+
+  // Consider if this needs to be atomic
+  size_t num_flushes_to_initiate_ = 0U;
+  size_t num_running_flushes_ = 0U;
+  size_t flush_initiation_start_size_ = 0U;
+  size_t additional_flush_step_size_ = 0U;
+  std::atomic<size_t> additional_flush_initiation_size_ = 0U;
+  size_t min_flush_size_ = 0U;
+
+  std::mutex flushes_mu_;
+  std::mutex flushes_initiators_mu_;
+  std::condition_variable flushes_wakeup_cv_;
+  bool new_flushes_wakeup_ = false;
+
+  std::thread flushes_thread_;
+  bool terminate_flushes_thread_ = false;
 };
+
+// This is a convenience utility for users of the WriteBufferManager that
+// wish to use Speedb's WBM flush initiation mechanism.
+// For such users, Speedb recommends effectively disabling the existing
+// mechanisms that initiate flushes based on the write buffers' configuration
+// (size, number, etc.). Calling this function overrides the applicable
+// options so that the WBM becomes the sole automatic initiator of flushes.
+extern void SanitizeOptionsToDisableFlushesBasedOnWriteBufferOptions(
+    Options& options);
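+//
+// A minimal usage sketch (the sizes below are illustrative only):
+//
+//   Options options;
+//   options.db_write_buffer_size = 256 << 20;  // 256 MB shared quota
+//   options.write_buffer_manager.reset(new WriteBufferManager(
+//       options.db_write_buffer_size, nullptr /* cache */,
+//       true /* allow_delays_and_stalls */, true /* initiate_flushes */));
+//   SanitizeOptionsToDisableFlushesBasedOnWriteBufferOptions(options);
+//   // DB::Open() registers the DB as a flush initiator with this WBM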
+
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc
index 70779fa99c..d23e78f02f 100644
--- a/memtable/write_buffer_manager.cc
+++ b/memtable/write_buffer_manager.cc
@@ -9,18 +9,21 @@
 
 #include "rocksdb/write_buffer_manager.h"
 
+#include <array>
 #include <memory>
 
 #include "cache/cache_entry_roles.h"
 #include "cache/cache_reservation_manager.h"
 #include "db/db_impl/db_impl.h"
 #include "rocksdb/status.h"
+#include "test_util/sync_point.h"
 #include "util/coding.h"
 
 namespace ROCKSDB_NAMESPACE {
-WriteBufferManager::WriteBufferManager(size_t _buffer_size,
-                                       std::shared_ptr<Cache> cache,
-                                       bool allow_delays_and_stalls)
+WriteBufferManager::WriteBufferManager(
+    size_t _buffer_size, std::shared_ptr<Cache> cache,
+    bool allow_delays_and_stalls, bool initiate_flushes,
+    const FlushInitiationOptions& flush_initiation_options)
     : buffer_size_(_buffer_size),
       mutable_limit_(buffer_size_ * 7 / 8),
       memory_used_(0),
@@ -28,7 +31,9 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size,
       memory_being_freed_(0U),
       cache_res_mgr_(nullptr),
       allow_delays_and_stalls_(allow_delays_and_stalls),
-      stall_active_(false) {
+      stall_active_(false),
+      initiate_flushes_(initiate_flushes),
+      flush_initiation_options_(flush_initiation_options) {
 #ifndef ROCKSDB_LITE
   if (cache) {
     // Memtable's memory usage tends to fluctuate frequently
@@ -41,6 +46,10 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size,
 #else
   (void)cache;
 #endif  // ROCKSDB_LITE
+
+  if (initiate_flushes_) {
+    InitFlushInitiationVars(buffer_size());
+  }
 }
 
 WriteBufferManager::~WriteBufferManager() {
@@ -48,6 +57,7 @@ WriteBufferManager::~WriteBufferManager() {
   std::unique_lock<std::mutex> lock(mu_);
   assert(queue_.empty());
 #endif
+  TerminateFlushesThread();
 }
 
 std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const {
@@ -71,6 +81,13 @@ void WriteBufferManager::ReserveMem(size_t mem) {
   }
   if (is_enabled) {
     UpdateUsageState(new_memory_used, mem, buffer_size());
+
+    // Checking outside the lock is not reliable, but it avoids unnecessarily
+    // taking the lock, which is expensive
+    if (UNLIKELY(ShouldInitiateAnotherFlushMemOnly(new_memory_used))) {
+      std::unique_lock<std::mutex> lock(flushes_mu_);
+      ReevaluateNeedForMoreFlushes(new_memory_used);
+    }
   }
 }
 
@@ -153,6 +170,13 @@ void WriteBufferManager::FreeMem(size_t mem) {
 
   if (is_enabled) {
     UpdateUsageState(new_memory_used, -mem, buffer_size());
+
+    // Checking outside the lock is not reliable, but it avoids unnecessarily
+    // taking the lock, which is expensive
+    if (UNLIKELY(ShouldInitiateAnotherFlushMemOnly(new_memory_used))) {
+      std::unique_lock<std::mutex> lock(flushes_mu_);
+      ReevaluateNeedForMoreFlushes(new_memory_used);
+    }
   }
 }
 
@@ -407,4 +431,234 @@ void WriteBufferManager::UpdateUsageState(size_t new_memory_used,
   }
 }
 
+// ================================================================================================
+void WriteBufferManager::RegisterFlushInitiator(
+    void* initiator, InitiateFlushRequestCb request) {
+  {
+    std::unique_lock<std::mutex> lock(flushes_initiators_mu_);
+    assert(IsInitiatorIdxValid(FindInitiator(initiator)) == false);
+    flush_initiators_.push_back({initiator, request});
+    num_initiators_ = flush_initiators_.size();
+  }
+
+  WakeupFlushInitiationThread();
+}
+
+void WriteBufferManager::DeregisterFlushInitiator(void* initiator) {
+  {
+    std::unique_lock<std::mutex> lock(flushes_initiators_mu_);
+
+    auto initiator_idx = FindInitiator(initiator);
+    assert(IsInitiatorIdxValid(initiator_idx));
+
+    // If the deregistered initiator is the next candidate, advance the
+    // candidate to the next initiator (if there is one) so it remains valid
+    if (initiator_idx == next_candidate_initiator_idx_) {
+      next_candidate_initiator_idx_ = CalcNextCandidateInitiatorIdx();
+    }
+
+    flush_initiators_.erase(flush_initiators_.begin() + initiator_idx);
+    num_initiators_ = flush_initiators_.size();
+  }
+
+  WakeupFlushInitiationThread();
+}
+
+void WriteBufferManager::InitFlushInitiationVars(size_t quota) {
+  assert(initiate_flushes_);
+
+  {
+    std::unique_lock<std::mutex> lock(flushes_mu_);
+    additional_flush_step_size_ =
+        quota / flush_initiation_options_.max_num_parallel_flushes;
+    flush_initiation_start_size_ = additional_flush_step_size_;
+    // TODO - Update this to a formula. If it depends on the number of
+    // initiators => update when that number changes
+    min_flush_size_ = 4 * 1024U * 1024U;  // 4 MB
+    RecalcFlushInitiationSize();
+  }
+
+  if (flushes_thread_.joinable() == false) {
+    flushes_thread_ =
+        std::thread(&WriteBufferManager::InitiateFlushesThread, this);
+  }
+}
+
+void WriteBufferManager::InitiateFlushesThread() {
+  while (true) {
+    // Returns true when the waiting should stop (using a predicate also
+    // guards against spurious wakeups)
+    auto StopWaiting = [this]() {
+      return (new_flushes_wakeup_ &&
+              (terminate_flushes_thread_ || (num_flushes_to_initiate_ > 0U)));
+    };
+
+    std::unique_lock<std::mutex> lock(flushes_mu_);
+    flushes_wakeup_cv_.wait(lock, StopWaiting);
+    new_flushes_wakeup_ = false;
+
+    if (terminate_flushes_thread_) {
+      break;
+    }
+
+    // The code below tries to initiate num_flushes_to_initiate_ flushes by
+    // invoking its registered initiators, requesting each to initiate a flush
+    // of a certain minimum size. The initiation is done in iterations. An
+    // iteration is an attempt to give every initiator an opportunity to
+    // flush, in a round-robin ordering. An initiator may or may not be able
+    // to initiate a flush. Reasons for not initiating could be:
+    // - The initiator has less than the specified minimum size to flush.
+    // - The initiator is in the process of shutting down or being disposed of.
+    //
+    // The assumption is that if this loop ends while num_flushes_to_initiate_
+    // is still greater than 0, some future event will wake up this thread and
+    // the initiation attempts will be retried:
+    // - An initiator will be registered
+    // - A flush in progress will end
+    // - memory_usage() will increase above additional_flush_initiation_size_
+
+    // Two iterations:
+    // 1. Flushes of a min size.
+    // 2. Flushes of any size
+    constexpr size_t kNumIters = 2U;
+    const std::array<size_t, kNumIters> kMinFlushSizes{min_flush_size_, 0U};
+
+    auto iter = 0U;
+    while ((iter < kMinFlushSizes.size()) && (num_flushes_to_initiate_ > 0U)) {
+      auto num_repeated_failures_to_initiate = 0U;
+      while ((num_repeated_failures_to_initiate < num_initiators_) &&
+             (num_flushes_to_initiate_ > 0U)) {
+        bool was_flush_initiated = false;
+        // Unlock flushes_mu_ since flushing (via the initiator cb) may call a
+        // WBM service (e.g., ReserveMem()) that, in turn, needs to lock the
+        // same mutex => would deadlock
+        lock.unlock();
+
+        {
+          std::unique_lock<std::mutex> initiators_lock(flushes_initiators_mu_);
+          auto& initiator = flush_initiators_[next_candidate_initiator_idx_];
+          next_candidate_initiator_idx_ = CalcNextCandidateInitiatorIdx();
+          was_flush_initiated = initiator.cb(kMinFlushSizes[iter]);
+        }
+
+        lock.lock();
+        if (was_flush_initiated) {
+          // Not recalculating flush initiation size since the increment &
+          // decrement cancel each other with respect to the recalc
+          assert(num_running_flushes_ <
+                 flush_initiation_options_.max_num_parallel_flushes);
+          ++num_running_flushes_;
+          --num_flushes_to_initiate_;
+          num_repeated_failures_to_initiate = 0U;
+        } else {
+          ++num_repeated_failures_to_initiate;
+        }
+      }
+      ++iter;
+    }
+    TEST_SYNC_POINT_CALLBACK(
+        "WriteBufferManager::InitiateFlushesThread::DoneInitiationsAttempt",
+        &num_flushes_to_initiate_);
+  }
+}
+
+void WriteBufferManager::TerminateFlushesThread() {
+  {
+    std::unique_lock<std::mutex> lock(flushes_mu_);
+    terminate_flushes_thread_ = true;
+    WakeupFlushInitiationThread();
+  }
+
+  if (flushes_thread_.joinable()) {
+    flushes_thread_.join();
+  }
+}
+
+void WriteBufferManager::FlushStarted(bool wbm_initiated) {
+  // num_running_flushes_ is incremented in our thread when initiating flushes
+  // => Already accounted for
+  if (wbm_initiated || !enabled()) {
+    return;
+  }
+
+  std::unique_lock<std::mutex> lock(flushes_mu_);
+
+  ++num_running_flushes_;
+  size_t curr_memory_used = memory_usage();
+  RecalcFlushInitiationSize();
+  ReevaluateNeedForMoreFlushes(curr_memory_used);
+}
+
+void WriteBufferManager::FlushEnded(bool /* wbm_initiated */) {
+  if (!enabled()) {
+    return;
+  }
+
+  std::unique_lock<std::mutex> lock(flushes_mu_);
+
+  assert(num_running_flushes_ > 0U);
+  --num_running_flushes_;
+  size_t curr_memory_used = memory_usage();
+  RecalcFlushInitiationSize();
+  ReevaluateNeedForMoreFlushes(curr_memory_used);
+}
+
+void WriteBufferManager::RecalcFlushInitiationSize() {
+  additional_flush_initiation_size_ =
+      flush_initiation_start_size_ +
+      additional_flush_step_size_ *
+          (num_running_flushes_ + num_flushes_to_initiate_);
+}
+
+void WriteBufferManager::ReevaluateNeedForMoreFlushes(size_t curr_memory_used) {
+  // TODO - Assert flushes_mu_ is held at this point
+  assert(enabled());
+
+  // URQ - I suggest using here the amount of memory still not marked for
+  // flush (mutable + immutable)
+  if (ShouldInitiateAnotherFlush(curr_memory_used)) {
+    // need to schedule more
+    ++num_flushes_to_initiate_;
+    RecalcFlushInitiationSize();
+    WakeupFlushInitiationThread();
+  }
+}
+
+uint64_t WriteBufferManager::FindInitiator(void* initiator) const {
+  // Assumes flushes_initiators_mu_ is held
+
+  auto initiator_idx = kInvalidInitiatorIdx;
+  for (auto i = 0U; i < flush_initiators_.size(); ++i) {
+    if (flush_initiators_[i].initiator == initiator) {
+      initiator_idx = i;
+      break;
+    }
+  }
+
+  return initiator_idx;
+}
+
+void WriteBufferManager::TEST_WakeupFlushInitiationThread() {
+  std::unique_lock<std::mutex> lock(flushes_mu_);
+  WakeupFlushInitiationThread();
+}
+
+void SanitizeOptionsToDisableFlushesBasedOnWriteBufferOptions(
+    Options& options) {
+  if (options.write_buffer_size == 0) {
+    options.write_buffer_size = options.db_write_buffer_size / 2;
+  }
+  options.max_write_buffer_number =
+      (options.db_write_buffer_size / options.write_buffer_size + 1) * 2;
+  options.min_write_buffer_number_to_merge =
+      options.max_write_buffer_number / 2;
+  int compaction_trigger = options.level0_file_num_compaction_trigger;
+  if (compaction_trigger == 0) {
+    compaction_trigger = options.level0_file_num_compaction_trigger = 4;
+  }
+  // URQ - Why? And should this also be part of the options sanitization?
+  // // options.max_background_compactions = options.max_background_flushes * 2;
+  // // options.max_background_jobs =
+  // //   options.max_background_flushes + options.max_background_compactions;
+}
+
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/memtable/write_buffer_manager_test.cc b/memtable/write_buffer_manager_test.cc
index 5487a4acd0..743656833b 100644
--- a/memtable/write_buffer_manager_test.cc
+++ b/memtable/write_buffer_manager_test.cc
@@ -9,7 +9,15 @@
 
 #include "rocksdb/write_buffer_manager.h"
 
+#include <chrono>
+#include <condition_variable>
+#include <functional>
+#include <mutex>
+#include <optional>
+#include <string>
+
 #include "rocksdb/cache.h"
+#include "test_util/sync_point.h"
 #include "test_util/testharness.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -460,10 +468,713 @@ TEST_P(WriteBufferManagerTestWithParams, UsageNotifications) {
                        WriteBufferManager::kNoneDelayedWriteFactor);
 }
 
+// ==========================================================================
+#define CALL_WRAPPER(func) \
+  func;                    \
+  ASSERT_FALSE(HasFailure());
+
+// Test parameters:
+// #1: Quota (size_t). 0 == WBM disabled
+// #2: Cost to cache (Boolean)
+// #3: Allow delays and stalls (Boolean)
+class WriteBufferManagerFlushInitiationTest
+    : public WriteBufferManagerTest,
+      public ::testing::WithParamInterface<std::tuple<size_t, bool, bool>> {
+ public:
+  void SetUp() override {
+    quota_ = std::get<0>(GetParam());
+    cost_cache_ = std::get<1>(GetParam());
+    allow_delay_and_stalls_ = std::get<2>(GetParam());
+
+    wbm_enabled_ = (quota_ > 0U);
+    cache_ = NewLRUCache(4 * 1024 * 1024, 2);
+    max_num_parallel_flushes_ =
+        WriteBufferManager::FlushInitiationOptions().max_num_parallel_flushes;
+
+    CreateWbm();
+    SetupAndEnableTestPoints();
+
+    actual_num_cbs_ = 0U;
+    expected_num_cbs_ = 0U;
+    validation_num_ = 0U;
+    expected_num_flushes_to_initiate_ = 0U;
+    expected_num_running_flushes_ = 0U;
+  }
+
+  void TearDown() override {
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+
+    ASSERT_TRUE(expected_cb_initiators_.empty());
+    ASSERT_TRUE(expected_cb_min_size_to_flush_.empty());
+    ASSERT_TRUE(flush_cb_results_.empty());
+
+    initiators_.clear();
+  }
+
+  bool IsWbmDisabled() const { return (wbm_enabled_ == false); }
+
+  void CreateWbm() {
+    auto wbm_quota = (wbm_enabled_ ? quota_ : 0U);
+    WriteBufferManager::FlushInitiationOptions initiation_options;
+    initiation_options.max_num_parallel_flushes = max_num_parallel_flushes_;
+
+    ASSERT_GT(max_num_parallel_flushes_, 0U);
+    flush_step_size_ = quota_ / max_num_parallel_flushes_;
+
+    if (cost_cache_) {
+      wbm_.reset(new WriteBufferManager(wbm_quota, cache_,
+                                        allow_delay_and_stalls_, true,
+                                        initiation_options));
+    } else {
+      wbm_.reset(new WriteBufferManager(wbm_quota, nullptr,
+                                        allow_delay_and_stalls_, true,
+                                        initiation_options));
+    }
+    ASSERT_EQ(wbm_->enabled(), wbm_enabled_);
+    ASSERT_TRUE(wbm_->IsInitiatingFlushes());
+  }
+
+  uint64_t CreateInitiator() {
+    auto initiator = std::make_unique<uint64_t>(++next_initiator_id_);
+    auto initiator_id = *initiator;
+    initiators_.push_back(std::move(initiator));
+    return initiator_id;
+  }
+
+  void RegisterInitiator(uint64_t initiator_id) {
+    auto initiator = FindInitiator(initiator_id);
+    ASSERT_NE(initiator, nullptr);
+    if (initiator != nullptr) {
+      auto cb =
+          std::bind(&WriteBufferManagerFlushInitiationTest::FlushRequestCb,
+                    this, std::placeholders::_1, initiator);
+      wbm_->RegisterFlushInitiator(initiator, cb);
+    }
+  }
+
+  uint64_t CreateAndRegisterInitiator() {
+    auto initiator_id = CreateInitiator();
+    RegisterInitiator(initiator_id);
+    return initiator_id;
+  }
+
+  std::optional<uint64_t> FindInitiatorIdx(uint64_t initiator_id) {
+    for (auto i = 0U; i < initiators_.size(); ++i) {
+      if (*initiators_[i] == initiator_id) {
+        return i;
+      }
+    }
+
+    return {};
+  }
+
+  uint64_t* FindInitiator(uint64_t initiator_id) {
+    auto initiator_idx = FindInitiatorIdx(initiator_id);
+    if (initiator_idx.has_value()) {
+      return initiators_[initiator_idx.value()].get();
+    } else {
+      ADD_FAILURE();
+      return nullptr;
+    }
+  }
+
+  void DeregisterInitiator(uint64_t initiator_id) {
+    auto initiator_idx = FindInitiatorIdx(initiator_id);
+    ASSERT_TRUE(initiator_idx.has_value());
+
+    if (initiator_idx.has_value()) {
+      wbm_->DeregisterFlushInitiator(initiators_[initiator_idx.value()].get());
+      initiators_.erase(initiators_.begin() + initiator_idx.value());
+    }
+  }
+
+  struct ExpectedCbInfo {
+    uint64_t initiator_id;
+    size_t min_size_to_flush;
+    bool flush_cb_result;
+  };
+
+  void AddExpectedCbsInfos(const std::vector<ExpectedCbInfo>& cbs_infos) {
+    ASSERT_TRUE(expected_cb_initiators_.empty());
+    ASSERT_TRUE(expected_cb_min_size_to_flush_.empty());
+    ASSERT_TRUE(flush_cb_results_.empty());
+
+    if (IsWbmDisabled()) {
+      return;
+    }
+
+    for (const auto& cb_info : cbs_infos) {
+      auto initiator = FindInitiator(cb_info.initiator_id);
+      ASSERT_NE(initiator, nullptr);
+      expected_cb_initiators_.push_back(initiator);
+
+      expected_cb_min_size_to_flush_.push_back(cb_info.min_size_to_flush);
+      flush_cb_results_.push_back(cb_info.flush_cb_result);
+    }
+    actual_num_cbs_ = 0U;
+    expected_num_cbs_ = cbs_infos.size();
+
+    ++validation_num_;
+    std::string test_point_name_suffix = std::to_string(validation_num_);
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+        {{"DoneInitiationsAttemptTestPointCb::ExpectedNumAttempts:" +
+              test_point_name_suffix,
+          "ValidateState::WaitUntilValidationPossible:" +
+              test_point_name_suffix}});
+  }
+
+  // Flush initiation callback registered with the WBM
+  bool FlushRequestCb(size_t min_size_to_flush, void* initiator) {
+    EXPECT_TRUE(wbm_enabled_);
+
+    ++actual_num_cbs_;
+
+    if (expected_cb_min_size_to_flush_.empty() == false) {
+      EXPECT_EQ(expected_cb_min_size_to_flush_[0], min_size_to_flush);
+      expected_cb_min_size_to_flush_.erase(
+          expected_cb_min_size_to_flush_.begin());
+    } else {
+      EXPECT_FALSE(expected_cb_min_size_to_flush_.empty());
+    }
+
+    if (expected_cb_initiators_.empty() == false) {
+      EXPECT_EQ(expected_cb_initiators_[0], initiator);
+      expected_cb_initiators_.erase(expected_cb_initiators_.begin());
+    } else {
+      EXPECT_FALSE(expected_cb_initiators_.empty());
+    }
+
+    if (flush_cb_results_.empty() == false) {
+      bool result = flush_cb_results_[0];
+      flush_cb_results_.erase(flush_cb_results_.begin());
+      return result;
+    } else {
+      EXPECT_FALSE(flush_cb_results_.empty());
+      // Arbitrarily return true as we must return a bool to compile
+      return true;
+    }
+  };
+
+  // Sync point callback called when the flush initiation thread completes
+  // initiating all flushes and resumes waiting for the condition variable to
+  // be signalled again
+  void DoneInitiationsAttemptTestPointCb(void* /*  arg */) {
+    if (actual_num_cbs_ == expected_num_cbs_) {
+      auto sync_point_name =
+          "DoneInitiationsAttemptTestPointCb::ExpectedNumAttempts:" +
+          std::to_string(validation_num_);
+      TEST_SYNC_POINT(sync_point_name);
+    }
+  }
+
+  void SetupAndEnableTestPoints() {
+    if (IsWbmDisabled()) {
+      return;
+    }
+
+    SyncPoint::GetInstance()->SetCallBack(
+        "WriteBufferManager::InitiateFlushesThread::DoneInitiationsAttempt",
+        [&](void* arg) { DoneInitiationsAttemptTestPointCb(arg); });
+
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  }
+
+  void ValidateState(bool wait_on_sync_point) {
+    if (wbm_enabled_ && wait_on_sync_point) {
+      auto sync_point_name = "ValidateState::WaitUntilValidationPossible:" +
+                             std::to_string(validation_num_);
+      TEST_SYNC_POINT(sync_point_name);
+    }
+
+    ASSERT_EQ(wbm_->TEST_GetNumFlushesToInitiate(),
+              expected_num_flushes_to_initiate_);
+    ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), expected_num_running_flushes_);
+
+    ASSERT_TRUE(expected_cb_initiators_.empty())
+        << "Num entries:" << expected_cb_initiators_.size();
+    ASSERT_TRUE(expected_cb_min_size_to_flush_.empty())
+        << "Num entries:" << expected_cb_min_size_to_flush_.size();
+    ASSERT_TRUE(flush_cb_results_.empty())
+        << "Num entries:" << flush_cb_results_.size();
+  }
+
+  void EndFlush(bool wbm_initiated, size_t released_size,
+                bool wait_on_sync_point = false) {
+    wbm_->FreeMem(released_size);
+    wbm_->FlushEnded(wbm_initiated /* wbm_initiated */);
+    DecNumRunningFlushes();
+    ValidateState(wait_on_sync_point);
+  }
+
+  void StartAndEndFlush(bool wbm_initiated, size_t released_size) {
+    // "Run" the flush to completion & release the memory
+    wbm_->FlushStarted(wbm_initiated /* wbm_initiated */);
+    if ((wbm_initiated == false) && wbm_enabled_) {
+      ++expected_num_running_flushes_;
+    }
+    EndFlush(wbm_initiated, released_size);
+  }
+
+  void IncNumRunningFlushes() {
+    if (wbm_enabled_) {
+      ++expected_num_running_flushes_;
+    }
+  }
+
+  void DecNumRunningFlushes() {
+    if (wbm_enabled_) {
+      --expected_num_running_flushes_;
+    }
+  }
+
+  void IncNumFlushesToInitiate() {
+    if (wbm_enabled_) {
+      ++expected_num_flushes_to_initiate_;
+    }
+  }
+
+  void DecNumFlushesToInitiate() {
+    if (wbm_enabled_) {
+      --expected_num_flushes_to_initiate_;
+    }
+  }
+
+ protected:
+  static constexpr size_t kDeafaultMinSizeToFlush = 4 * 1024 * 1024;
+
+ protected:
+  std::unique_ptr<WriteBufferManager> wbm_;
+
+  size_t quota_ = 0U;
+  bool wbm_enabled_;
+  bool cost_cache_;
+  std::shared_ptr<Cache> cache_;
+  bool allow_delay_and_stalls_ = false;
+  size_t max_num_parallel_flushes_;
+  size_t flush_step_size_ = 0U;
+
+  std::vector<std::unique_ptr<uint64_t>> initiators_;
+  uint64_t next_initiator_id_ = 0U;
+  std::vector<void*> expected_cb_initiators_;
+  std::vector<size_t> expected_cb_min_size_to_flush_;
+  std::vector<bool> flush_cb_results_;
+  size_t actual_num_cbs_ = 0;
+  size_t expected_num_cbs_ = 0U;
+  size_t expected_num_flushes_to_initiate_ = 0U;
+  size_t expected_num_running_flushes_ = 0U;
+  size_t validation_num_ = 0U;
+};
+
+TEST_P(WriteBufferManagerFlushInitiationTest, Basic) {
+  // Register a single initiator
+  auto initiator_id = CreateAndRegisterInitiator();
+
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+
+  // Reach the 1st step => expecting a single flush to be initiated
+  wbm_->ReserveMem(flush_step_size_);
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(true));
+
+  // "Run" the flush to completion & release the memory
+  CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_));
+
+  DeregisterInitiator(initiator_id);
+}
+
+TEST_P(WriteBufferManagerFlushInitiationTest, NonWbmInitiatedFlush) {
+  // Register a single initiator
+  auto initiator_id = CreateAndRegisterInitiator();
+
+  wbm_->FlushStarted(false /* wbm_initiated */);
+  IncNumRunningFlushes();
+
+  // Reach the 1st step => No need to initiate a flush (one is already running)
+  wbm_->ReserveMem(flush_step_size_);
+  CALL_WRAPPER(ValidateState(false));
+
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+
+  // End the non-wbm flush without releasing memory, just for testing purposes
+  // Expecting a wbm-initiated flush request since we are still over the step
+  wbm_->FlushEnded(false /* wbm_initiated */);
+  CALL_WRAPPER(ValidateState(true));
+
+  // "Run" the wbm-initiated flush to completion & release the memory
+  CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_));
+
+  DeregisterInitiator(initiator_id);
+}
+
+TEST_P(WriteBufferManagerFlushInitiationTest, MaxNumParallelFlushes) {
+  // Replace the WBM with a new WBM that is configured with our max num of
+  // parallel flushes
+  max_num_parallel_flushes_ = 3U;
+  ASSERT_NE(max_num_parallel_flushes_,
+            wbm_->GetFlushInitiationOptions().max_num_parallel_flushes);
+  CreateWbm();
+  ASSERT_EQ(wbm_->GetFlushInitiationOptions().max_num_parallel_flushes,
+            max_num_parallel_flushes_);
+
+  // Register a single initiator
+  auto initiator_id = CreateAndRegisterInitiator();
+
+  // Start 3 (max) number of non-wbm flushes
+  for (auto i = 0U; i < max_num_parallel_flushes_; ++i) {
+    wbm_->FlushStarted(false /* wbm_initiated */);
+    IncNumRunningFlushes();
+  }
+
+  // Reserve memory to allow for up to 3 (max) wbm-initiated flushes
+  // However, 3 (max) are already running => no wbm-initiated flush expected
+  wbm_->ReserveMem(max_num_parallel_flushes_ * flush_step_size_);
+  CALL_WRAPPER(ValidateState(false));
+
+  // Start another (total of 4 > max) non-wbm flush
+  wbm_->ReserveMem(2 * flush_step_size_);
+  wbm_->FlushStarted(false /* wbm_initiated */);
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(false));
+
+  // End one of the non-wbm flushes => 3 (max) are still running, and usage
+  // still requires max flushes
+  CALL_WRAPPER(EndFlush(false /* wbm_initiated */, flush_step_size_));
+
+  // End another one of the non-wbm flushes => 2 (< max) running =>
+  // Expecting one wbm-initiated
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+  // Increasing since expecting the wbm to initiate it
+  IncNumRunningFlushes();
+  CALL_WRAPPER(EndFlush(false /* wbm_initiated */, flush_step_size_,
+                        true /* wait_on_sync_point */));
+
+  wbm_->ReserveMem(2 * flush_step_size_);
+  CALL_WRAPPER(ValidateState(false));
+
+  // End a wbm-initiated flushes => 2 (< max) running => Expecting one
+  // wbm-initiated
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+  // Increasing since expecting the wbm to initiate it
+  IncNumRunningFlushes();
+  CALL_WRAPPER(EndFlush(true /* wbm_initiated */, flush_step_size_,
+                        true /* wait_on_sync_point */));
+
+  DeregisterInitiator(initiator_id);
+}
+
+TEST_P(WriteBufferManagerFlushInitiationTest, JumpToQuota) {
+  // Register a single initiator
+  auto initiator_id = CreateAndRegisterInitiator();
+
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+
+  // Jump straight to the full quota => expecting a single flush to be
+  // initiated
+  wbm_->ReserveMem(quota_);
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(true));
+
+  // "Run" the flush to completion & release the memory
+  CALL_WRAPPER(StartAndEndFlush(true, quota_));
+
+  DeregisterInitiator(initiator_id);
+}
+
+TEST_P(WriteBufferManagerFlushInitiationTest,
+       FailureToStartFlushWhenRequested) {
+  // Register a single initiator
+  auto initiator_id = CreateAndRegisterInitiator();
+
+  // Setup two cb-s to fail to start the flush (flush_cb_result == false)
+  // First with kDeafaultMinSizeToFlush size, Second with 0
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id, kDeafaultMinSizeToFlush, false /* flush_cb_result */},
+       {initiator_id, 0U, false /* flush_cb_result */}}));
+
+  // Reach the 1st step => expecting the 2 requests set up above
+  wbm_->ReserveMem(flush_step_size_);
+  IncNumFlushesToInitiate();
+  CALL_WRAPPER(ValidateState(true));
+
+  // Setup another two identical cb-s
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id, kDeafaultMinSizeToFlush, false /* flush_cb_result */},
+       {initiator_id, 0U, false /* flush_cb_result */}}));
+
+  // Wake up the flush initiation thread to retry. This will issue the next 2
+  // requests set up just above
+  wbm_->TEST_WakeupFlushInitiationThread();
+  CALL_WRAPPER(ValidateState(true));
+
+  // Now, allow the second request to succeed
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id, kDeafaultMinSizeToFlush, false /* flush_cb_result */},
+       {initiator_id, 0U, true /* flush_cb_result */}}));
+
+  // Wake up the flush initiation thread to retry. This time the second
+  // request (min size 0) succeeds
+  wbm_->TEST_WakeupFlushInitiationThread();
+  DecNumFlushesToInitiate();
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(true));
+
+  DeregisterInitiator(initiator_id);
+}
+
+TEST_P(WriteBufferManagerFlushInitiationTest, FlushInitiationSteps) {
+  // Too much (useless) effort to adapt to the disabled case so just skipping
+  if (IsWbmDisabled()) {
+    return;
+  }
+  auto initiator_id = CreateAndRegisterInitiator();
+
+  // Increase the usage gradually in half-steps, each time expecting another
+  // flush to be initiated
+  for (auto i = 0U; i < max_num_parallel_flushes_; ++i) {
+    wbm_->ReserveMem(flush_step_size_ / 2);
+    CALL_WRAPPER(ValidateState(true));
+
+    CALL_WRAPPER(AddExpectedCbsInfos(
+        {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+    IncNumRunningFlushes();
+    wbm_->ReserveMem(flush_step_size_ / 2);
+    CALL_WRAPPER(ValidateState(true));
+  }
+  ASSERT_EQ(wbm_->memory_usage(), quota_);
+  ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), max_num_parallel_flushes_);
+
+  // Increase the usage over the quota. Not expecting any initiation activity
+  wbm_->ReserveMem(flush_step_size_ / 2);
+  wbm_->ReserveMem(flush_step_size_ / 2);
+  CALL_WRAPPER(ValidateState(false));
+
+  // Start all of the WBM flushes + some more that are NOT WBM flushes.
+  // No new flush should initiate
+  auto wbm_initiated = true;
+  size_t num_non_wbm_running_flushes = 0U;
+  for (auto i = 0U; i < 2 * max_num_parallel_flushes_; ++i) {
+    wbm_->FlushStarted(wbm_initiated);
+    if (wbm_initiated == false) {
+      IncNumRunningFlushes();
+      ++num_non_wbm_running_flushes;
+    }
+    wbm_initiated = !wbm_initiated;
+  }
+  ASSERT_EQ(expected_num_running_flushes_ - num_non_wbm_running_flushes,
+            max_num_parallel_flushes_);
+  CALL_WRAPPER(ValidateState(false));
+
+  // Release flushes + memory so that we are at the quota with max num
+  // of parallel flushes
+  while (expected_num_running_flushes_ > max_num_parallel_flushes_) {
+    EndFlush(wbm_initiated, 0U /* released_size */);
+    wbm_initiated = !wbm_initiated;
+  }
+  wbm_->FreeMem(flush_step_size_);
+  ASSERT_EQ(wbm_->memory_usage(), quota_);
+  ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), max_num_parallel_flushes_);
+  CALL_WRAPPER(ValidateState(false));
+
+  // Decrease just below the current flush step size
+  wbm_->FreeMem(1U);
+
+  while (wbm_->memory_usage() >= flush_step_size_) {
+    EndFlush(true, 0U /* released_size */);
+    CALL_WRAPPER(ValidateState(false));
+
+    CALL_WRAPPER(AddExpectedCbsInfos(
+        {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+    IncNumRunningFlushes();
+    EndFlush(false, 0U /* released_size */, true /* wait_on_sync_point */);
+
+    wbm_->FreeMem(flush_step_size_);
+  }
+  ASSERT_EQ(wbm_->memory_usage(), flush_step_size_ - 1);
+  ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), 1U);
+
+  // End the last remaining flush and release all used memory
+  EndFlush(true, flush_step_size_ - 1 /* released_size */);
+  ASSERT_EQ(wbm_->memory_usage(), 0U);
+  ASSERT_EQ(wbm_->TEST_GetNumRunningFlushes(), 0U);
+
+  DeregisterInitiator(initiator_id);
+}
+
+TEST_P(WriteBufferManagerFlushInitiationTest, RegisteringLate) {
+  // Reach the 1st step, but no registered initiators
+  wbm_->ReserveMem(flush_step_size_);
+  IncNumFlushesToInitiate();
+  CALL_WRAPPER(ValidateState(false));
+
+  // Register an initiator and expect it to receive the initiation request
+  auto initiator_id = CreateInitiator();
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+  RegisterInitiator(initiator_id);
+  DecNumFlushesToInitiate();
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(true));
+
+  // "Run" the flush to completion & release the memory
+  CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_));
+
+  DeregisterInitiator(initiator_id);
+}
+
+TEST_P(WriteBufferManagerFlushInitiationTest, Deregistering) {
+  // Register a single initiator
+  auto initiator_id1 = CreateAndRegisterInitiator();
+
+  // initiator1 fails to initiate
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id1, kDeafaultMinSizeToFlush, false /* flush_cb_result */},
+       {initiator_id1, 0U, false /* flush_cb_result */}}));
+
+  // Reach the 1st step => expecting a single flush to be initiated
+  wbm_->ReserveMem(flush_step_size_);
+  IncNumFlushesToInitiate();
+  CALL_WRAPPER(ValidateState(true));
+
+  // initiator1 deregisters and initiator2 takes its place
+  DeregisterInitiator(initiator_id1);
+  auto initiator_id2 = CreateInitiator();
+
+  // Set initiator2 to initiate the flush
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id2, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+  RegisterInitiator(initiator_id2);
+
+  DecNumFlushesToInitiate();
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(true));
+
+  // "Run" the flush to completion & release the memory
+  CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_));
+
+  DeregisterInitiator(initiator_id2);
+}
+
+TEST_P(WriteBufferManagerFlushInitiationTest, TwoInitiatorsBasic) {
+  // Register two initiators
+  auto initiator_id1 = CreateAndRegisterInitiator();
+  auto initiator_id2 = CreateAndRegisterInitiator();
+
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id1, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+
+  // Expect the 1st request to reach initiator1
+  wbm_->ReserveMem(flush_step_size_);
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(true));
+
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id2, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+
+  // Expect the 2nd request to reach initiator2
+  wbm_->ReserveMem(flush_step_size_);
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(true));
+
+  // "Run" the flush of initiator1 to completion & release the memory
+  CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_));
+
+  // "Run" the flush of initiator2 to completion & release the memory
+  CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_));
+
+  DeregisterInitiator(initiator_id2);
+  DeregisterInitiator(initiator_id1);
+}
+
+TEST_P(WriteBufferManagerFlushInitiationTest,
+       TwoInitiatorsFirstFailsToInitiate) {
+  // Register two initiators
+  auto initiator_id1 = CreateAndRegisterInitiator();
+  auto initiator_id2 = CreateAndRegisterInitiator();
+
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id1, kDeafaultMinSizeToFlush, false /* flush_cb_result */},
+       {initiator_id2, kDeafaultMinSizeToFlush, false /* flush_cb_result */},
+       {initiator_id1, 0U, false /* flush_cb_result */},
+       {initiator_id2, 0U, true /* flush_cb_result */}}));
+
+  // Expect the 1st request to reach initiator2
+  wbm_->ReserveMem(flush_step_size_);
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(true));
+
+  // "Run" the flush of initiator1 to completion & release the memory
+  CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_));
+
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id1, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+
+  // Expect the 2nd request to reach initiator1
+  wbm_->ReserveMem(flush_step_size_);
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(true));
+
+  // "Run" the flush of initiator2 to completion & release the memory
+  CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_));
+
+  DeregisterInitiator(initiator_id2);
+  DeregisterInitiator(initiator_id1);
+}
+
+TEST_P(WriteBufferManagerFlushInitiationTest,
+       TwoInitiatorsDeregisteringWhileBeingNextToFlush) {
+  // Register two initiators
+  auto initiator_id1 = CreateAndRegisterInitiator();
+  auto initiator_id2 = CreateAndRegisterInitiator();
+
+  // Initiator1 initiates, initiator2 is next
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id1, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+
+  wbm_->ReserveMem(flush_step_size_);
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(true));
+  if (wbm_enabled_) {
+    ASSERT_EQ(wbm_->TEST_GetNextCandidateInitiatorIdx(), 1U);
+  }
+
+  // Initiator2 will be deregistered => prepare another initiation for
+  // initiator1
+  CALL_WRAPPER(AddExpectedCbsInfos(
+      {{initiator_id1, kDeafaultMinSizeToFlush, true /* flush_cb_result */}}));
+
+  DeregisterInitiator(initiator_id2);
+  ASSERT_EQ(wbm_->TEST_GetNextCandidateInitiatorIdx(), 0U);
+
+  wbm_->ReserveMem(flush_step_size_);
+  IncNumRunningFlushes();
+  CALL_WRAPPER(ValidateState(true));
+  ASSERT_EQ(wbm_->TEST_GetNextCandidateInitiatorIdx(), 0U);
+
+  // "Run" both flushes to completion & release the memory
+  for (auto i = 0U; i < 2; ++i) {
+    CALL_WRAPPER(StartAndEndFlush(true, flush_step_size_));
+  }
+
+  DeregisterInitiator(initiator_id1);
+}
+
 INSTANTIATE_TEST_CASE_P(WriteBufferManagerTestWithParams,
                         WriteBufferManagerTestWithParams,
-                        ::testing::Combine(testing::Bool(), testing::Bool(),
-                                           testing::Bool()));
+                        ::testing::Combine(::testing::Bool(), ::testing::Bool(),
+                                           ::testing::Bool()));
+
+// Run the flush initiation tests in all combinations of:
+// 1. WBM Enabled (buffer size > 0) / WBM Disabled (0 buffer size)
+// 2. With and without costing to cache
+// 3. Allow / Disallow delays and stalls
+INSTANTIATE_TEST_CASE_P(WriteBufferManagerFlushInitiationTest,
+                        WriteBufferManagerFlushInitiationTest,
+                        ::testing::Combine(::testing::Values(10 * 1000, 0),
+                                           ::testing::Bool(),
+                                           ::testing::Bool()));
 
 }  // namespace ROCKSDB_NAMESPACE
 
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 04d610e3b4..7feab56342 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -327,6 +327,22 @@ DEFINE_int64(max_scan_distance, 0,
 
 DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");
 
+DEFINE_bool(allow_wbm_delays_and_stalls, true,
+            "Enable WBM write stalls and delays");
+DEFINE_bool(initiate_wbm_flushes, true,
+            "WBM will proactively initiate flushes (Speedb). "
+            "If false, WBM-related flushes will be initiated using the "
+            "ShouldFlush() service of the WBM.");
+DEFINE_bool(disable_non_wbm_initated_auto_flushes, true,
+            "In case FLAGS_initiate_wbm_flushes is true, this flag will "
+            "override the applicable options so that only the WBM will "
+            "initiate automatic flushes.");
+DEFINE_uint32(max_num_parallel_flushes, 0,
+              "In case FLAGS_initiate_wbm_flushes is true, this flag will "
+              "override the default max number of parallel flushes.");
+
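+// Example invocation (sketch; sizes and the benchmark choice are illustrative
+// only):
+//   db_bench --benchmarks=fillrandom --db_write_buffer_size=268435456 \
+//            --initiate_wbm_flushes=true --max_num_parallel_flushes=8
+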
 DEFINE_int64(batch_size, 1, "Batch size");
 
 static bool ValidateKeySize(const char* /*flagname*/, int32_t /*value*/) {
@@ -3879,10 +3895,6 @@ class Benchmark {
 
     options.env = FLAGS_env;
     options.max_open_files = FLAGS_open_files;
-    if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) {
-      options.write_buffer_manager.reset(
-          new WriteBufferManager(FLAGS_db_write_buffer_size, cache_));
-    }
     options.arena_block_size = FLAGS_arena_block_size;
     options.write_buffer_size = FLAGS_write_buffer_size;
     options.max_write_buffer_number = FLAGS_max_write_buffer_number;
@@ -4288,6 +4300,18 @@ class Benchmark {
       options.comparator = test::BytewiseComparatorWithU64TsWrapper();
     }
 
+    if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) {
+      WriteBufferManager::FlushInitiationOptions flush_initiation_options;
+      if (FLAGS_max_num_parallel_flushes > 0U) {
+        flush_initiation_options.max_num_parallel_flushes =
+            FLAGS_max_num_parallel_flushes;
+      }
+
+      options.write_buffer_manager.reset(new WriteBufferManager(
+          FLAGS_db_write_buffer_size, cache_, FLAGS_allow_wbm_delays_and_stalls,
+          FLAGS_initiate_wbm_flushes, flush_initiation_options));
+    }
+
     // Integrated BlobDB
     options.enable_blob_files = FLAGS_enable_blob_files;
     options.min_blob_size = FLAGS_min_blob_size;