diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc
index 63137493f86..6a5c2406d66 100644
--- a/db/db_flush_test.cc
+++ b/db/db_flush_test.cc
@@ -2661,6 +2661,27 @@ TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) {
   SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
+TEST_P(DBAtomicFlushTest, DisableManualCompaction) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.atomic_flush = GetParam();
+  CreateAndReopenWithCF({"pikachu"}, options);
+  ASSERT_EQ(2, handles_.size());
+  ASSERT_OK(dbfull()->PauseBackgroundWork());
+  ASSERT_OK(Put(0, "key00", "value00"));
+  ASSERT_OK(Put(1, "key10", "value10"));
+  dbfull()->DisableManualCompaction();
+  FlushOptions flush_opts;
+  flush_opts.wait = true;
+  flush_opts.check_if_compaction_disabled = true;
+  ASSERT_TRUE(dbfull()->Flush(flush_opts, handles_).IsIncomplete());
+  ASSERT_OK(Put(0, "key01", "value01"));
+  ASSERT_OK(db_->ContinueBackgroundWork());
+  dbfull()->EnableManualCompaction();
+  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
+  Close();
+}
+
 INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
                         testing::Bool());
 
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index da86e361237..4191665301c 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -2028,6 +2028,20 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
   {
     WriteContext context;
     InstrumentedMutexLock guard_lock(&mutex_);
+    // Need to check inside lock to avoid [flush()] -> [disable] -> [schedule].
+    if (flush_options.check_if_compaction_disabled &&
+        manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
+      return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
+    }
+    if (flush_options.expected_oldest_key_time != 0 &&
+        cfd->mem()->ApproximateOldestKeyTime() !=
+            flush_options.expected_oldest_key_time) {
+      std::ostringstream oss;
+      oss << "Oldest key time doesn't match. expected="
+          << flush_options.expected_oldest_key_time
+          << ", actual=" << cfd->mem()->ApproximateOldestKeyTime();
+      return Status::Incomplete(oss.str());
+    }
 
     WriteThread::Writer w;
     WriteThread::Writer nonmem_w;
@@ -2040,9 +2054,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
       WaitForPendingWrites();
 
       if (flush_reason != FlushReason::kErrorRecoveryRetryFlush &&
-          (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) &&
-          (cfd->mem()->ApproximateMemoryUsageFast() >=
-           flush_options.min_size_to_flush)) {
+          (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load())) {
         // Note that, when flush reason is kErrorRecoveryRetryFlush, during the
         // auto retry resume, we want to avoid creating new small memtables.
         // Therefore, SwitchMemtable will not be called. Also, since ResumeImpl
@@ -2174,6 +2186,11 @@ Status DBImpl::AtomicFlushMemTables(
   {
     WriteContext context;
     InstrumentedMutexLock guard_lock(&mutex_);
+    // Need to check inside lock to avoid [flush()] -> [disable] -> [schedule].
+    if (flush_options.check_if_compaction_disabled &&
+        manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
+      return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
+    }
 
     WriteThread::Writer w;
     WriteThread::Writer nonmem_w;
@@ -2199,10 +2216,6 @@ Status DBImpl::AtomicFlushMemTables(
           flush_reason == FlushReason::kErrorRecoveryRetryFlush) {
         continue;
       }
-      if (cfd->mem()->ApproximateMemoryUsageFast() <
-          flush_options.min_size_to_flush) {
-        continue;
-      }
       cfd->Ref();
       s = SwitchMemtable(cfd, &context);
       cfd->UnrefAndTryDelete();
diff --git a/db/db_write_buffer_manager_test.cc b/db/db_write_buffer_manager_test.cc
index d54810ee39f..afb504fc2b1 100644
--- a/db/db_write_buffer_manager_test.cc
+++ b/db/db_write_buffer_manager_test.cc
@@ -917,6 +917,86 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
+// Test write can progress even if manual compaction and background work is
+// paused.
+TEST_P(DBWriteBufferManagerTest, BackgroundWorkPaused) {
+  std::vector<std::string> dbnames;
+  std::vector<DB*> dbs;
+  int num_dbs = 4;
+
+  for (int i = 0; i < num_dbs; i++) {
+    dbs.push_back(nullptr);
+    dbnames.push_back(
+        test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i)));
+  }
+
+  Options options = CurrentOptions();
+  options.arena_block_size = 4096;
+  options.write_buffer_size = 500000;  // this is never hit
+  options.avoid_flush_during_shutdown = true;  // avoid blocking destroy forever
+  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
+  ASSERT_LT(cache->GetUsage(), 256 * 1024);
+  cost_cache_ = GetParam();
+
+  // Do not enable write stall.
+  if (cost_cache_) {
+    options.write_buffer_manager.reset(
+        new WriteBufferManager(100000, cache, 0.0));
+  } else {
+    options.write_buffer_manager.reset(
+        new WriteBufferManager(100000, nullptr, 0.0));
+  }
+  DestroyAndReopen(options);
+
+  for (int i = 0; i < num_dbs; i++) {
+    ASSERT_OK(DestroyDB(dbnames[i], options));
+    ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i])));
+  }
+
+  dbfull()->DisableManualCompaction();
+  ASSERT_OK(dbfull()->PauseBackgroundWork());
+  for (int i = 0; i < num_dbs; i++) {
+    dbs[i]->DisableManualCompaction();
+    ASSERT_OK(dbs[i]->PauseBackgroundWork());
+  }
+
+  WriteOptions wo;
+  wo.disableWAL = true;
+
+  // Arrange the score like this: (this)2000, (0-th)100000, (1-th)1, ...
+  ASSERT_OK(Put(Key(1), DummyString(2000), wo));
+  for (int i = 1; i < num_dbs; i++) {
+    ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(1)));
+  }
+  // Exceed the limit.
+  ASSERT_OK(dbs[0]->Put(wo, Key(1), DummyString(100000)));
+  // Write another one to trigger the flush.
+  ASSERT_OK(Put(Key(3), DummyString(1), wo));
+
+  for (int i = 0; i < num_dbs; i++) {
+    ASSERT_OK(dbs[i]->ContinueBackgroundWork());
+    ASSERT_OK(
+        static_cast_with_check<DBImpl>(dbs[i])->TEST_WaitForFlushMemTable());
+    std::string property;
+    EXPECT_TRUE(dbs[i]->GetProperty("rocksdb.num-files-at-level0", &property));
+    int num = atoi(property.c_str());
+    ASSERT_EQ(num, 0);
+  }
+  ASSERT_OK(dbfull()->ContinueBackgroundWork());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  std::string property;
+  EXPECT_TRUE(dbfull()->GetProperty("rocksdb.num-files-at-level0", &property));
+  int num = atoi(property.c_str());
+  ASSERT_EQ(num, 1);
+
+  // Clean up DBs.
+  for (int i = 0; i < num_dbs; i++) {
+    ASSERT_OK(dbs[i]->Close());
+    ASSERT_OK(DestroyDB(dbnames[i], options));
+    delete dbs[i];
+  }
+}
+
 INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
                         testing::Bool());
 
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 146b096e751..ee8d89a3b3a 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1716,10 +1716,13 @@ struct FlushOptions {
   // is performed by someone else (foreground call or background thread).
   // Default: false
   bool allow_write_stall;
-  // Only switch mutable memtable if its size is no smaller than this parameter.
-  // Zero is no-op.
+  // Only flush memtable if it has the expected oldest key time.
+  // This option is ignored for atomic flush. Zero means disabling the check.
   // Default: 0
-  uint64_t min_size_to_flush;
+  uint64_t expected_oldest_key_time;
+  // Abort flush if compaction is disabled via `DisableManualCompaction`.
+  // Default: false
+  bool check_if_compaction_disabled;
   // Used by RocksDB internally.
   // Default: false
   bool _write_stopped;
@@ -1727,7 +1730,8 @@ struct FlushOptions {
   FlushOptions()
       : wait(true),
         allow_write_stall(false),
-        min_size_to_flush(0),
+        expected_oldest_key_time(0),
+        check_if_compaction_disabled(false),
         _write_stopped(false) {}
 };
 
diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc
index 2fec88dd9aa..6a7e199b33c 100644
--- a/memtable/write_buffer_manager.cc
+++ b/memtable/write_buffer_manager.cc
@@ -12,6 +12,7 @@
 #include "cache/cache_entry_roles.h"
 #include "cache/cache_reservation_manager.h"
 #include "db/db_impl/db_impl.h"
+#include "logging/logging.h"
 #include "rocksdb/options.h"
 #include "rocksdb/status.h"
 #include "util/coding.h"
@@ -177,16 +178,19 @@ void WriteBufferManager::MaybeFlushLocked(DB* this_db) {
   if (!ShouldFlush()) {
     return;
   }
-  // We only flush at most one column family at a time.
-  // This is enough to keep size under control except when flush_size is
-  // dynamically decreased. That case is managed in `SetFlushSize`.
-  WriteBufferSentinel* candidate = nullptr;
-  uint64_t candidate_size = 0;
-  uint64_t max_score = 0;
-  uint64_t current_score = 0;
+  // Have at least one candidate to flush with
+  // check_if_compaction_disabled=false when all others failed.
+  constexpr size_t kCandidateSize = 2;
+  // (score, age).
+  using Candidate = std::tuple<WriteBufferSentinel*, uint64_t, uint64_t>;
+  auto cmp = [](const Candidate& a, const Candidate& b) {
+    return std::get<1>(a) <= std::get<1>(b);
+  };
+  std::set<Candidate, decltype(cmp)> candidates(cmp);
 
   for (auto& s : sentinels_) {
     // TODO: move this calculation to a callback.
+    uint64_t current_score = 0;
     uint64_t current_memory_bytes = std::numeric_limits<uint64_t>::max();
     uint64_t oldest_time = std::numeric_limits<uint64_t>::max();
     s->db->GetApproximateActiveMemTableStats(s->cf, &current_memory_bytes,
@@ -217,20 +221,37 @@ void WriteBufferManager::MaybeFlushLocked(DB* this_db) {
         current_score = current_score * (100 - factor) / factor;
       }
     }
-    if (current_score > max_score) {
-      candidate = s.get();
-      max_score = current_score;
-      candidate_size = current_memory_bytes;
+    candidates.insert({s.get(), current_score, oldest_time});
+    if (candidates.size() > kCandidateSize) {
+      candidates.erase(candidates.begin());
     }
   }
 
-  if (candidate != nullptr) {
+  // We only flush at most one column family at a time.
+  // This is enough to keep size under control except when flush_size is
+  // dynamically decreased. That case is managed in `SetFlushSize`.
+  auto candidate = candidates.rbegin();
+  while (candidate != candidates.rend()) {
+    auto sentinel = std::get<0>(*candidate);
     FlushOptions flush_opts;
     flush_opts.allow_write_stall = true;
     flush_opts.wait = false;
-    flush_opts._write_stopped = (candidate->db == this_db);
-    flush_opts.min_size_to_flush = candidate_size;
-    candidate->db->Flush(flush_opts, candidate->cf);
+    flush_opts._write_stopped = (sentinel->db == this_db);
+    flush_opts.expected_oldest_key_time = std::get<2>(*candidate);
+    candidate++;
+    if (candidate != candidates.rend()) {
+      // Don't check it for the last candidate. Otherwise we could end up
+      // never progressing.
+      flush_opts.check_if_compaction_disabled = true;
+    }
+    auto s = sentinel->db->Flush(flush_opts, sentinel->cf);
+    if (s.ok()) {
+      return;
+    }
+    auto opts = sentinel->db->GetDBOptions();
+    ROCKS_LOG_WARN(opts.info_log, "WriteBufferManager fails to flush: %s",
+                   s.ToString().c_str());
+    // Fallback to the next best candidate.
   }
 }
 