use expected age instead of size to avoid spurious flush (tikv#337)
Also added a new option that aborts a flush if manual compaction has been disabled. In practice we use this to avoid blocking on flushing a tablet that will be destroyed shortly after.
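
For context, a minimal sketch of the intended call pattern, assuming `db` and `cf` stand for an open DB and the column family backing the soon-to-be-destroyed tablet (illustrative names, not part of this commit):

// Hedged sketch: check_if_compaction_disabled is the option added below.
db->DisableManualCompaction();  // the tablet is scheduled for destruction
rocksdb::FlushOptions fo;
fo.wait = true;
fo.check_if_compaction_disabled = true;
rocksdb::Status s = db->Flush(fo, cf);
if (s.IsIncomplete()) {
  // Flush was rejected because manual compaction is disabled; skip it
  // and proceed with destroying the tablet instead of blocking here.
}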

---------

Signed-off-by: tabokie <xy.tao@outlook.com>
tabokie authored Jun 6, 2023
1 parent cd9aa99 commit 8a9c10e
Showing 5 changed files with 165 additions and 26 deletions.
21 changes: 21 additions & 0 deletions db/db_flush_test.cc
@@ -2661,6 +2661,27 @@ TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) {
SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(DBAtomicFlushTest, DisableManualCompaction) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.atomic_flush = GetParam();
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_EQ(2, handles_.size());
ASSERT_OK(dbfull()->PauseBackgroundWork());
ASSERT_OK(Put(0, "key00", "value00"));
ASSERT_OK(Put(1, "key10", "value10"));
dbfull()->DisableManualCompaction();
FlushOptions flush_opts;
flush_opts.wait = true;
flush_opts.check_if_compaction_disabled = true;
ASSERT_TRUE(dbfull()->Flush(flush_opts, handles_).IsIncomplete());
ASSERT_OK(Put(0, "key01", "value01"));
ASSERT_OK(db_->ContinueBackgroundWork());
dbfull()->EnableManualCompaction();
ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
Close();
}

INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());

27 changes: 20 additions & 7 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -2028,6 +2028,20 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
{
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);
// Need to check inside lock to avoid [flush()] -> [disable] -> [schedule].
if (flush_options.check_if_compaction_disabled &&
manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
if (flush_options.expected_oldest_key_time != 0 &&
cfd->mem()->ApproximateOldestKeyTime() !=
flush_options.expected_oldest_key_time) {
std::ostringstream oss;
oss << "Oldest key time doesn't match. expected="
<< flush_options.expected_oldest_key_time
<< ", actual=" << cfd->mem()->ApproximateOldestKeyTime();
return Status::Incomplete(oss.str());
}

WriteThread::Writer w;
WriteThread::Writer nonmem_w;
@@ -2040,9 +2054,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
WaitForPendingWrites();

if (flush_reason != FlushReason::kErrorRecoveryRetryFlush &&
- (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) &&
- (cfd->mem()->ApproximateMemoryUsageFast() >=
- flush_options.min_size_to_flush)) {
+ (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load())) {
// Note that, when flush reason is kErrorRecoveryRetryFlush, during the
// auto retry resume, we want to avoid creating new small memtables.
// Therefore, SwitchMemtable will not be called. Also, since ResumeImpl
@@ -2174,6 +2186,11 @@ Status DBImpl::AtomicFlushMemTables(
{
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);
// Need to check inside lock to avoid [flush()] -> [disable] -> [schedule].
if (flush_options.check_if_compaction_disabled &&
manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}

WriteThread::Writer w;
WriteThread::Writer nonmem_w;
@@ -2199,10 +2216,6 @@
flush_reason == FlushReason::kErrorRecoveryRetryFlush) {
continue;
}
- if (cfd->mem()->ApproximateMemoryUsageFast() <
- flush_options.min_size_to_flush) {
- continue;
- }
cfd->Ref();
s = SwitchMemtable(cfd, &context);
cfd->UnrefAndTryDelete();
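
Together with GetApproximateActiveMemTableStats() (used in write_buffer_manager.cc below), the expected_oldest_key_time check behaves like a compare-and-flush: measure the active memtable first, then flush only if that same memtable is still active. A hedged sketch, assuming the out-parameter order seen later in this diff:

uint64_t active_bytes = 0;
uint64_t oldest_key_time = 0;
db->GetApproximateActiveMemTableStats(cf, &active_bytes, &oldest_key_time);
rocksdb::FlushOptions fo;
fo.wait = false;
fo.allow_write_stall = true;
fo.expected_oldest_key_time = oldest_key_time;  // 0 disables the check
rocksdb::Status s = db->Flush(fo, cf);
if (s.IsIncomplete()) {
  // The memtable was switched between the measurement and this call,
  // so the flush would have been spurious and is skipped.
}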
80 changes: 80 additions & 0 deletions db/db_write_buffer_manager_test.cc
@@ -917,6 +917,86 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Test that writes can progress even if manual compaction and background
// work are paused.
TEST_P(DBWriteBufferManagerTest, BackgroundWorkPaused) {
std::vector<std::string> dbnames;
std::vector<DB*> dbs;
int num_dbs = 4;

for (int i = 0; i < num_dbs; i++) {
dbs.push_back(nullptr);
dbnames.push_back(
test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i)));
}

Options options = CurrentOptions();
options.arena_block_size = 4096;
options.write_buffer_size = 500000; // this is never hit
options.avoid_flush_during_shutdown = true; // avoid blocking destroy forever
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 256 * 1024);
cost_cache_ = GetParam();

// Do not enable write stall.
if (cost_cache_) {
options.write_buffer_manager.reset(
new WriteBufferManager(100000, cache, 0.0));
} else {
options.write_buffer_manager.reset(
new WriteBufferManager(100000, nullptr, 0.0));
}
DestroyAndReopen(options);

for (int i = 0; i < num_dbs; i++) {
ASSERT_OK(DestroyDB(dbnames[i], options));
ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i])));
}

dbfull()->DisableManualCompaction();
ASSERT_OK(dbfull()->PauseBackgroundWork());
for (int i = 0; i < num_dbs; i++) {
dbs[i]->DisableManualCompaction();
ASSERT_OK(dbs[i]->PauseBackgroundWork());
}

WriteOptions wo;
wo.disableWAL = true;

// Arrange the score like this: (this)2000, (0-th)100000, (1-th)1, ...
ASSERT_OK(Put(Key(1), DummyString(2000), wo));
for (int i = 1; i < num_dbs; i++) {
ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(1)));
}
// Exceed the limit.
ASSERT_OK(dbs[0]->Put(wo, Key(1), DummyString(100000)));
// Write another one to trigger the flush.
ASSERT_OK(Put(Key(3), DummyString(1), wo));

for (int i = 0; i < num_dbs; i++) {
ASSERT_OK(dbs[i]->ContinueBackgroundWork());
ASSERT_OK(
static_cast_with_check<DBImpl>(dbs[i])->TEST_WaitForFlushMemTable());
std::string property;
EXPECT_TRUE(dbs[i]->GetProperty("rocksdb.num-files-at-level0", &property));
int num = atoi(property.c_str());
ASSERT_EQ(num, 0);
}
ASSERT_OK(dbfull()->ContinueBackgroundWork());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
std::string property;
EXPECT_TRUE(dbfull()->GetProperty("rocksdb.num-files-at-level0", &property));
int num = atoi(property.c_str());
ASSERT_EQ(num, 1);

// Clean up DBs.
for (int i = 0; i < num_dbs; i++) {
ASSERT_OK(dbs[i]->Close());
ASSERT_OK(DestroyDB(dbnames[i], options));
delete dbs[i];
}
}

INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
testing::Bool());

12 changes: 8 additions & 4 deletions include/rocksdb/options.h
@@ -1716,18 +1716,22 @@ struct FlushOptions {
// is performed by someone else (foreground call or background thread).
// Default: false
bool allow_write_stall;
- // Only switch mutable memtable if its size is no smaller than this parameter.
- // Zero is no-op.
+ // Only flush memtable if it has the expected oldest key time.
+ // This option is ignored for atomic flush. Zero means disabling the check.
// Default: 0
- uint64_t min_size_to_flush;
+ uint64_t expected_oldest_key_time;
+ // Abort flush if compaction is disabled via `DisableManualCompaction`.
+ // Default: false
+ bool check_if_compaction_disabled;
// Used by RocksDB internally.
// Default: false
bool _write_stopped;

FlushOptions()
: wait(true),
allow_write_stall(false),
- min_size_to_flush(0),
+ expected_oldest_key_time(0),
+ check_if_compaction_disabled(false),
_write_stopped(false) {}
};
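
Both new checks surface as Status::Incomplete(); a caller that needs to distinguish them can inspect the subcode, since only the compaction-disabled path sets a dedicated one (a sketch using the standard Status API):

rocksdb::Status s = db->Flush(fo, cf);
if (s.IsIncomplete()) {
  if (s.subcode() == rocksdb::Status::SubCode::kManualCompactionPaused) {
    // Rejected because DisableManualCompaction() is in effect.
  } else {
    // Oldest key time mismatch: the memtable was already switched.
  }
}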

51 changes: 36 additions & 15 deletions memtable/write_buffer_manager.cc
@@ -12,6 +12,7 @@
#include "cache/cache_entry_roles.h"
#include "cache/cache_reservation_manager.h"
#include "db/db_impl/db_impl.h"
#include "logging/logging.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "util/coding.h"
@@ -177,16 +178,19 @@ void WriteBufferManager::MaybeFlushLocked(DB* this_db) {
if (!ShouldFlush()) {
return;
}
- // We only flush at most one column family at a time.
- // This is enough to keep size under control except when flush_size is
- // dynamically decreased. That case is managed in `SetFlushSize`.
- WriteBufferSentinel* candidate = nullptr;
- uint64_t candidate_size = 0;
- uint64_t max_score = 0;
- uint64_t current_score = 0;
+ // Have at least one candidate to flush with
+ // check_if_compaction_disabled=false when all others failed.
+ constexpr size_t kCandidateSize = 2;
+ // (score, age).
+ using Candidate = std::tuple<WriteBufferSentinel*, uint64_t, uint64_t>;
+ auto cmp = [](const Candidate& a, const Candidate& b) {
+ return std::get<1>(a) <= std::get<1>(b);
+ };
+ std::set<Candidate, decltype(cmp)> candidates(cmp);

for (auto& s : sentinels_) {
// TODO: move this calculation to a callback.
uint64_t current_score = 0;
uint64_t current_memory_bytes = std::numeric_limits<uint64_t>::max();
uint64_t oldest_time = std::numeric_limits<uint64_t>::max();
s->db->GetApproximateActiveMemTableStats(s->cf, &current_memory_bytes,
@@ -217,20 +221,37 @@
current_score = current_score * (100 - factor) / factor;
}
}
- if (current_score > max_score) {
- candidate = s.get();
- max_score = current_score;
- candidate_size = current_memory_bytes;
+ candidates.insert({s.get(), current_score, oldest_time});
+ if (candidates.size() > kCandidateSize) {
+ candidates.erase(candidates.begin());
}
}

- if (candidate != nullptr) {
+ // We only flush at most one column family at a time.
+ // This is enough to keep size under control except when flush_size is
+ // dynamically decreased. That case is managed in `SetFlushSize`.
+ auto candidate = candidates.rbegin();
+ while (candidate != candidates.rend()) {
+ auto sentinel = std::get<0>(*candidate);
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
flush_opts.wait = false;
- flush_opts._write_stopped = (candidate->db == this_db);
- flush_opts.min_size_to_flush = candidate_size;
- candidate->db->Flush(flush_opts, candidate->cf);
+ flush_opts._write_stopped = (sentinel->db == this_db);
+ flush_opts.expected_oldest_key_time = std::get<2>(*candidate);
+ candidate++;
+ if (candidate != candidates.rend()) {
+ // Don't check it for the last candidate. Otherwise we could end up
+ // never progressing.
+ flush_opts.check_if_compaction_disabled = true;
+ }
+ auto s = sentinel->db->Flush(flush_opts, sentinel->cf);
+ if (s.ok()) {
+ return;
+ }
+ auto opts = sentinel->db->GetDBOptions();
+ ROCKS_LOG_WARN(opts.info_log, "WriteBufferManager fails to flush: %s",
+ s.ToString().c_str());
+ // Fallback to the next best candidate.
}
}
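
The selection loop above amounts to keeping a bounded top-K-by-score set and walking it best-first with fallback. A standalone sketch of that pattern with a hypothetical Item type (note that std::set requires a strict weak ordering, so this sketch compares with `<`, under which score ties are deduplicated):

#include <cstdint>
#include <cstdio>
#include <set>
#include <tuple>

// (id, score, age): a scored flush candidate; higher score = better.
using Item = std::tuple<int, uint64_t, uint64_t>;

int main() {
  constexpr size_t kKeep = 2;  // mirrors kCandidateSize above
  auto by_score = [](const Item& a, const Item& b) {
    return std::get<1>(a) < std::get<1>(b);  // strict weak ordering
  };
  std::set<Item, decltype(by_score)> top(by_score);
  const uint64_t scores[] = {7, 42, 3, 19, 25};
  for (int id = 0; id < 5; id++) {
    top.insert({id, scores[id], 0});
    if (top.size() > kKeep) {
      top.erase(top.begin());  // evict the current lowest score
    }
  }
  // Walk candidates best-first; stop at the first one that succeeds.
  for (auto it = top.rbegin(); it != top.rend(); ++it) {
    std::printf("try id=%d score=%llu\n", std::get<0>(*it),
                static_cast<unsigned long long>(std::get<1>(*it)));
    if (std::get<0>(*it) % 2 == 0) {  // stand-in for a successful Flush()
      break;
    }
  }
  return 0;
}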
