Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WC: fix for stop while shutting down #499

Merged
merged 3 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ Based on RocksDB 8.1.1
* stress test: fix decoding error (#498)
* db_bench and stress: fix WBM initiation (#510)
* Sanitize max_num_parallel_flushes in WBM if 0 (#460)
* WriteController: fix for stop while shutting down (#499)
Also switch to a timed wait (one second at a time) on the condition variable (CV). This is required because a background error doesn't signal the CV in the WriteController.

### Miscellaneous
* disable failing unit tests and paired bloom filter stress testing
* version: update Speedb patch version to 2.4.1 (#503)
Expand Down
2 changes: 2 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,8 @@ class ColumnFamilyData {
const MutableCFOptions& mutable_cf_options,
WriteStallCause& write_stall_cause);

// Test-only hook: calls reset() on write_controller_token_, so a test can
// drop this column family's write-controller token on demand.
// NOTE(review): the member's type is not visible here; presumably a
// std::unique_ptr<WriteControllerToken> -- confirm.
void TEST_ResetWriteControllerToken() {
write_controller_token_.reset();
}

private:
std::unique_ptr<WriteControllerToken> DynamicSetupDelay(
WriteController* write_controller, uint64_t compaction_needed_bytes,
Expand Down
29 changes: 15 additions & 14 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1810,13 +1810,13 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
mutex_.AssertHeld();
uint64_t time_delayed = 0;
bool delayed = false;
bool stopped = false;
{
StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL,
&time_delayed);
// To avoid parallel timed delays (bad throttling), only support them
// on the primary write queue.
uint64_t delay;
// TODO: yuval - check whether db_mutex can be unlocked during GetDelay
if (&write_thread == &write_thread_) {
delay =
write_controller_->GetDelay(immutable_db_options_.clock, num_bytes);
Expand Down Expand Up @@ -1865,7 +1865,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
if (write_options.no_slowdown) {
return Status::Incomplete("Write stall");
}
delayed = true;
stopped = true;

// Notify write_thread about the stall so it can setup a barrier and
// fail any pending writers with no_slowdown
Expand All @@ -1876,16 +1876,19 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
TEST_SYNC_POINT("DBImpl::DelayWrite:NonmemWait");
}
mutex_.Unlock();

write_controller_->WaitOnCV(error_handler_);
auto continue_wait = [this]() -> bool {
return (this->error_handler_.GetBGError().ok() &&
!(this->shutting_down_.load(std::memory_order_relaxed)));
};
write_controller_->WaitOnCV(continue_wait);

mutex_.Lock();
TEST_SYNC_POINT_CALLBACK("DBImpl::DelayWrite:AfterWait", &mutex_);
write_thread.EndWriteStall();
}
}
assert(!delayed || !write_options.no_slowdown);
if (delayed) {
assert((!delayed && !stopped) || !write_options.no_slowdown);
if (delayed || stopped) {
default_cf_internal_stats_->AddDBStats(
InternalStats::kIntStatsWriteStallMicros, time_delayed);
RecordTick(stats_, STALL_MICROS, time_delayed);
Expand All @@ -1895,14 +1898,12 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
// writes, we can ignore any background errors and allow the write to
// proceed
Status s;
if (write_controller_->IsStopped()) {
if (!shutting_down_.load(std::memory_order_relaxed)) {
// If writes are still stopped and db not shutdown, it means we bailed
// due to a background error
s = Status::Incomplete(error_handler_.GetBGError().ToString());
} else {
s = Status::ShutdownInProgress("stalled writes");
}
if (stopped && shutting_down_.load(std::memory_order_relaxed)) {
s = Status::ShutdownInProgress("stalled writes");
} else if (write_controller_->IsStopped()) {
// If writes are still stopped and db not shutdown, it means we bailed
// due to a background error
s = Status::Incomplete(error_handler_.GetBGError().ToString());
}
if (error_handler_.IsDBStopped()) {
s = error_handler_.GetBGError();
Expand Down
6 changes: 5 additions & 1 deletion db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7340,7 +7340,7 @@ TEST_F(DBTest, MemoryUsageWithMaxWriteBufferSizeToMaintain) {
}
}

TEST_F(DBTest, DISABLED_ShuttingDownNotBlockStalledWrites) {
TEST_F(DBTest, ShuttingDownNotBlockStalledWrites) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
Reopen(options);
Expand Down Expand Up @@ -7377,6 +7377,10 @@ TEST_F(DBTest, DISABLED_ShuttingDownNotBlockStalledWrites) {

TEST_SYNC_POINT("DBTest::ShuttingDownNotBlockStalledWrites");
CancelAllBackgroundWork(db_, true);
// In addition to raising the shutting_down_ flag, we need to reset the Write
// Controller tokens, since only the destructor of the StopWriteToken wakes up
// the condition variable that the stopped thread is waiting on.
ResetWriteControllerTokens(dbfull());

thd.join();
}
Expand Down
7 changes: 7 additions & 0 deletions db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,13 @@ void VerifySstUniqueIds(const TablePropertiesCollection& props) {
}
}

// Test helper: walks every column family yielded by the version set's
// GetRefedColumnFamilySet() and asks each one to reset its write-controller
// token via TEST_ResetWriteControllerToken().
void DBTestBase::ResetWriteControllerTokens(DBImpl* db) {
for (auto* column_family : db->GetVersionSet()->GetRefedColumnFamilySet()) {
column_family->TEST_ResetWriteControllerToken();
}
}

template <CacheEntryRole R>
TargetCacheChargeTrackingCache<R>::TargetCacheChargeTrackingCache(
std::shared_ptr<Cache> target)
Expand Down
2 changes: 2 additions & 0 deletions db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,8 @@ class DBTestBase : public testing::Test {
// supported
void SetTimeElapseOnlySleepOnReopen(DBOptions* options);

// Test helper: calls TEST_ResetWriteControllerToken() on every column family
// in `db`'s version set.
void ResetWriteControllerTokens(DBImpl* db);

private: // Prone to error on direct use
void MaybeInstallTimeElapseOnlySleep(const DBOptions& options);

Expand Down
14 changes: 10 additions & 4 deletions db/write_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <mutex>
#include <ratio>

Expand Down Expand Up @@ -106,11 +107,13 @@ uint64_t WriteController::InsertToMapAndGetMinRate(uint32_t id,
}
}

void WriteController::WaitOnCV(const ErrorHandler& error_handler) {
void WriteController::WaitOnCV(std::function<bool()> continue_wait) {
std::unique_lock<std::mutex> lock(stop_mu_);
while (error_handler.GetBGError().ok() && IsStopped()) {
while (continue_wait() && IsStopped()) {
TEST_SYNC_POINT("WriteController::WaitOnCV");
stop_cv_.wait(lock);
// Use a timed wait: stop_cv_ is not signalled when a bg error is raised, so
// the loop condition has to be re-checked periodically.
stop_cv_.wait_for(lock, std::chrono::seconds(1));
}
}

Expand All @@ -125,7 +128,10 @@ bool WriteController::IsStopped() const {
return total_stopped_.load(std::memory_order_relaxed) > 0;
}

// This function is no longer under db mutex since credit_in_bytes_ is atomic
// This is inside the calling DB mutex, so we can't sleep and need to minimize
// frequency to get time.
// If it turns out to be a performance issue, we can redesign the thread
// synchronization model here.
// The function trusts that the caller will sleep for the micros returned.
uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
if (total_stopped_.load(std::memory_order_relaxed) > 0) {
Expand Down
2 changes: 1 addition & 1 deletion db/write_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class WriteController {

uint64_t TEST_GetMapMinRate();

void WaitOnCV(const ErrorHandler& error_handler);
void WaitOnCV(std::function<bool()> continue_wait);
void NotifyCV();

private:
Expand Down