Skip to content

Commit

Permalink
WriteController: fix for stop while shutting down
Browse files Browse the repository at this point in the history
Also switch to waiting a sec on the CV each time. This is required
since a bg error doesn't signal the CV in the WriteController.
  • Loading branch information
Yuval-Ariel committed May 14, 2023
1 parent 6c96280 commit c4efc92
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 20 deletions.
2 changes: 2 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,8 @@ class ColumnFamilyData {
const MutableCFOptions& mutable_cf_options,
WriteStallCause& write_stall_cause);

void TEST_ResetWriteControllerToken() { write_controller_token_.reset(); }

private:
std::unique_ptr<WriteControllerToken> DynamicSetupDelay(
WriteController* write_controller, uint64_t compaction_needed_bytes,
Expand Down
29 changes: 15 additions & 14 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1810,13 +1810,13 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
mutex_.AssertHeld();
uint64_t time_delayed = 0;
bool delayed = false;
bool stopped = false;
{
StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL,
&time_delayed);
// To avoid parallel timed delays (bad throttling), only support them
// on the primary write queue.
uint64_t delay;
// TODO: yuval - check whether db_mutex can be unlocked during GetDelay
if (&write_thread == &write_thread_) {
delay =
write_controller_->GetDelay(immutable_db_options_.clock, num_bytes);
Expand Down Expand Up @@ -1865,7 +1865,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
if (write_options.no_slowdown) {
return Status::Incomplete("Write stall");
}
delayed = true;
stopped = true;

// Notify write_thread about the stall so it can setup a barrier and
// fail any pending writers with no_slowdown
Expand All @@ -1876,16 +1876,19 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
TEST_SYNC_POINT("DBImpl::DelayWrite:NonmemWait");
}
mutex_.Unlock();

write_controller_->WaitOnCV(error_handler_);
auto continue_wait = [this]() -> bool {
return (this->error_handler_.GetBGError().ok() &&
!(this->shutting_down_.load(std::memory_order_relaxed)));
};
write_controller_->WaitOnCV(continue_wait);

mutex_.Lock();
TEST_SYNC_POINT_CALLBACK("DBImpl::DelayWrite:AfterWait", &mutex_);
write_thread.EndWriteStall();
}
}
assert(!delayed || !write_options.no_slowdown);
if (delayed) {
assert((!delayed && !stopped) || !write_options.no_slowdown);
if (delayed || stopped) {
default_cf_internal_stats_->AddDBStats(
InternalStats::kIntStatsWriteStallMicros, time_delayed);
RecordTick(stats_, STALL_MICROS, time_delayed);
Expand All @@ -1895,14 +1898,12 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
// writes, we can ignore any background errors and allow the write to
// proceed
Status s;
if (write_controller_->IsStopped()) {
if (!shutting_down_.load(std::memory_order_relaxed)) {
// If writes are still stopped and db not shutdown, it means we bailed
// due to a background error
s = Status::Incomplete(error_handler_.GetBGError().ToString());
} else {
s = Status::ShutdownInProgress("stalled writes");
}
if (stopped && shutting_down_.load(std::memory_order_relaxed)) {
s = Status::ShutdownInProgress("stalled writes");
} else if (write_controller_->IsStopped()) {
// If writes are still stopped and db not shutdown, it means we bailed
// due to a background error
s = Status::Incomplete(error_handler_.GetBGError().ToString());
}
if (error_handler_.IsDBStopped()) {
s = error_handler_.GetBGError();
Expand Down
6 changes: 5 additions & 1 deletion db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7340,7 +7340,7 @@ TEST_F(DBTest, MemoryUsageWithMaxWriteBufferSizeToMaintain) {
}
}

TEST_F(DBTest, DISABLED_ShuttingDownNotBlockStalledWrites) {
TEST_F(DBTest, ShuttingDownNotBlockStalledWrites) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
Reopen(options);
Expand Down Expand Up @@ -7377,6 +7377,10 @@ TEST_F(DBTest, DISABLED_ShuttingDownNotBlockStalledWrites) {

TEST_SYNC_POINT("DBTest::ShuttingDownNotBlockStalledWrites");
CancelAllBackgroundWork(db_, true);
// In addition to raising the shutting_down_ flag, we need to reset the Write
// Controller tokens since only the detor of the StopWriteToken wakes up the
// condition variable which the stopped thread is waiting on.
ResetWriteControllerTokens(dbfull());

thd.join();
}
Expand Down
7 changes: 7 additions & 0 deletions db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,13 @@ void VerifySstUniqueIds(const TablePropertiesCollection& props) {
}
}

void DBTestBase::ResetWriteControllerTokens(DBImpl* db) {
auto versions = db->GetVersionSet();
for (auto* cfd : versions->GetRefedColumnFamilySet()) {
cfd->TEST_ResetWriteControllerToken();
}
}

template <CacheEntryRole R>
TargetCacheChargeTrackingCache<R>::TargetCacheChargeTrackingCache(
std::shared_ptr<Cache> target)
Expand Down
2 changes: 2 additions & 0 deletions db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,8 @@ class DBTestBase : public testing::Test {
// supported
void SetTimeElapseOnlySleepOnReopen(DBOptions* options);

void ResetWriteControllerTokens(DBImpl* db);

private: // Prone to error on direct use
void MaybeInstallTimeElapseOnlySleep(const DBOptions& options);

Expand Down
14 changes: 10 additions & 4 deletions db/write_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <mutex>
#include <ratio>

Expand Down Expand Up @@ -106,11 +107,13 @@ uint64_t WriteController::InsertToMapAndGetMinRate(uint32_t id,
}
}

void WriteController::WaitOnCV(const ErrorHandler& error_handler) {
void WriteController::WaitOnCV(std::function<bool()> continue_wait) {
std::unique_lock<std::mutex> lock(stop_mu_);
while (error_handler.GetBGError().ok() && IsStopped()) {
while (continue_wait() && IsStopped()) {
TEST_SYNC_POINT("WriteController::WaitOnCV");
stop_cv_.wait(lock);
// need to time the wait since the stop_cv_ is not signalled if a bg error
// is raised.
stop_cv_.wait_for(lock, std::chrono::seconds(1));
}
}

Expand All @@ -125,7 +128,10 @@ bool WriteController::IsStopped() const {
return total_stopped_.load(std::memory_order_relaxed) > 0;
}

// This function is no longer under db mutex since credit_in_bytes_ is atomic
// This is inside DB mutex, so we can't sleep and need to minimize
// frequency to get time.
// If it turns out to be a performance issue, we can redesign the thread
// synchronization model here.
// The function trust caller will sleep micros returned.
uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
if (total_stopped_.load(std::memory_order_relaxed) > 0) {
Expand Down
2 changes: 1 addition & 1 deletion db/write_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class WriteController {

uint64_t TEST_GetMapMinRate();

void WaitOnCV(const ErrorHandler& error_handler);
void WaitOnCV(std::function<bool()> continue_wait);
void NotifyCV();

private:
Expand Down

0 comments on commit c4efc92

Please sign in to comment.