Skip to content

Commit

Permalink
Logger: Add reporting capabilities to the WBM (#556)
Browse files Browse the repository at this point in the history
Add Logging capabilities to the WriteController and the WriteBufferManager 
by saving the Loggers of the dbs which are using them internally and issuing
WARN msgs to these Loggers whenever the state of the WC and WBM 
changes in regards to delaying.
  • Loading branch information
Yuval-Ariel authored and udi-speedb committed Dec 6, 2023
1 parent e50643d commit 376321b
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 106 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Fix RepeatableThread to work properly with on thread start callback feature (htt

### Enhancements
* Unit Testing: Expose the disallow_trivial_move flag in the MoveFilesToLevel testing utility (#677).
* LOG Reporting: add reporting capabilities to the WriteController and the WriteBufferManager by saving the Loggers of the dbs which are using them internally and issuing WARN msgs to these Loggers whenever the state of the WC and WBM changes in regards to delaying (#556).

### Bug Fixes
* db_bench: fix SeekRandomWriteRandom valid check. Use key and value only after checking iterator is valid.
Expand Down
8 changes: 6 additions & 2 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1764,11 +1764,15 @@ ColumnFamilySet::ColumnFamilySet(
// initialize linked list
dummy_cfd_->prev_ = dummy_cfd_;
dummy_cfd_->next_ = dummy_cfd_;
write_buffer_manager_->RegisterWriteController(write_controller_);
wbm_client_id_ = write_buffer_manager_->RegisterWCAndLogger(
write_controller_, db_options_->info_log);
wc_client_id_ = write_controller_->RegisterLogger(db_options_->info_log);
}

ColumnFamilySet::~ColumnFamilySet() {
write_buffer_manager_->DeregisterWriteController(write_controller_);
write_buffer_manager_->DeregisterWCAndLogger(
write_controller_, db_options_->info_log, wbm_client_id_);
write_controller_->DeregisterLogger(db_options_->info_log, wc_client_id_);
while (column_family_data_.size() > 0) {
// cfd destructor will delete itself from column_family_data_
auto cfd = column_family_data_.begin()->second;
Expand Down
2 changes: 2 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,8 @@ class ColumnFamilySet {
std::shared_ptr<IOTracer> io_tracer_;
const std::string& db_id_;
std::string db_session_id_;
uint64_t wbm_client_id_ = 0;
uint64_t wc_client_id_ = 0;
};

// A wrapper for ColumnFamilySet that supports releasing DB mutex during each
Expand Down
2 changes: 2 additions & 0 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,8 @@ class CompactionJobTestBase : public testing::Test {
DBOptions db_opts = BuildDBOptions(db_options_, mutable_db_options_);
Status s = CreateLoggerFromOptions(dbname_, db_opts, &info_log);
ASSERT_OK(s);
// calling reset() before changing immutable db options.
versions_.reset();
db_options_.info_log = info_log;

versions_.reset(
Expand Down
117 changes: 98 additions & 19 deletions db/write_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,28 @@
#include <atomic>
#include <cassert>
#include <chrono>
#include <cinttypes>
#include <ratio>

#include "db/error_handler.h"
#include "logging/logging.h"
#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"

namespace ROCKSDB_NAMESPACE {

std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
++total_stopped_;
if (total_stopped_ == 0) {
std::lock_guard<std::mutex> lock(loggers_map_mu_);
for (auto& logger_and_clients : loggers_to_client_ids_map_) {
ROCKS_LOG_WARN(logger_and_clients.first.get(),
"WC enforcing stop writes");
}
}
{
std::lock_guard<std::mutex> lock(stop_mu_);
++total_stopped_;
}
return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
}

Expand All @@ -53,6 +65,31 @@ std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
}

WriteController::WCClientId WriteController::RegisterLogger(
std::shared_ptr<Logger> logger) {
uint64_t client_id = 0;
{
std::lock_guard<std::mutex> lock(loggers_map_mu_);
assert(next_client_id_ != std::numeric_limits<uint64_t>::max());
client_id = next_client_id_++;
loggers_to_client_ids_map_[logger].insert(client_id);
}
return client_id;
}

void WriteController::DeregisterLogger(std::shared_ptr<Logger> logger,
WCClientId wc_client_id) {
std::lock_guard<std::mutex> lock(loggers_map_mu_);
assert(wc_client_id > 0); // value of 0 means the logger wasn`t registered.
assert(loggers_to_client_ids_map_.count(logger));
assert(loggers_to_client_ids_map_[logger].empty() == false);
assert(loggers_to_client_ids_map_[logger].count(wc_client_id));
loggers_to_client_ids_map_[logger].erase(wc_client_id);
if (loggers_to_client_ids_map_[logger].empty()) {
loggers_to_client_ids_map_.erase(logger);
}
}

uint64_t WriteController::TEST_GetMapMinRate() { return GetMapMinRate(); }

uint64_t WriteController::GetMapMinRate() {
Expand Down Expand Up @@ -91,22 +128,34 @@ bool WriteController::IsInRateMap(void* client_id) {
// its write_rate is higher than the delayed_write_rate_ so we need to find a
// new min from all clients via GetMapMinRate()
void WriteController::HandleNewDelayReq(void* client_id,
uint64_t cf_write_rate) {
uint64_t client_write_rate) {
assert(is_dynamic_delay());
std::lock_guard<std::mutex> lock(map_mu_);
std::unique_lock<std::mutex> lock(map_mu_);
bool was_min = IsMinRate(client_id);
bool inserted =
id_to_write_rate_map_.insert_or_assign(client_id, cf_write_rate).second;
id_to_write_rate_map_.insert_or_assign(client_id, client_write_rate)
.second;
if (inserted) {
total_delayed_++;
}
uint64_t min_rate = delayed_write_rate();
if (cf_write_rate <= min_rate) {
min_rate = cf_write_rate;
if (client_write_rate <= min_rate) {
min_rate = client_write_rate;
} else if (was_min) {
min_rate = GetMapMinRate();
}
set_delayed_write_rate(min_rate);
lock.unlock();

{
std::lock_guard<std::mutex> logger_lock(loggers_map_mu_);
for (auto& logger_and_clients : loggers_to_client_ids_map_) {
ROCKS_LOG_WARN(logger_and_clients.first.get(),
"WC setting delay of %" PRIu64
", client_id: %p, client rate: %" PRIu64,
min_rate, client_id, client_write_rate);
}
}
}

// Checks if the client is in the id_to_write_rate_map_ , if it is:
Expand All @@ -116,14 +165,26 @@ void WriteController::HandleNewDelayReq(void* client_id,
// 4. if total_delayed_ == 0, reset next_refill_time_ and credit_in_bytes_
void WriteController::HandleRemoveDelayReq(void* client_id) {
assert(is_dynamic_delay());
std::unique_lock<std::mutex> lock(map_mu_);
if (!IsInRateMap(client_id)) {
return;
}
bool was_min = RemoveDelayReq(client_id);
uint64_t min_rate = 0;
if (was_min) {
min_rate = GetMapMinRate();
set_delayed_write_rate(min_rate);
}
lock.unlock();

{
std::lock_guard<std::mutex> lock(map_mu_);
if (!IsInRateMap(client_id)) {
return;
}
bool was_min = RemoveDelayReq(client_id);
if (was_min) {
set_delayed_write_rate(GetMapMinRate());
std::string if_min_str =
was_min ? "WC setting delay of " + std::to_string(min_rate) : "";
std::lock_guard<std::mutex> logger_lock(loggers_map_mu_);
for (auto& logger_and_clients : loggers_to_client_ids_map_) {
ROCKS_LOG_WARN(logger_and_clients.first.get(),
"WC removed client_id: %p . %s", client_id,
if_min_str.c_str());
}
}
MaybeResetCounters();
Expand All @@ -138,11 +199,22 @@ bool WriteController::RemoveDelayReq(void* client_id) {
}

void WriteController::MaybeResetCounters() {
std::lock_guard<std::mutex> lock(metrics_mu_);
if (total_delayed_ == 0) {
// reset counters.
next_refill_time_ = 0;
credit_in_bytes_ = 0;
bool zero_delayed = false;
{
std::lock_guard<std::mutex> lock(metrics_mu_);
if (total_delayed_ == 0) {
// reset counters.
next_refill_time_ = 0;
credit_in_bytes_ = 0;
zero_delayed = true;
}
}
if (zero_delayed) {
std::lock_guard<std::mutex> logger_lock(loggers_map_mu_);
for (auto& logger_and_clients : loggers_to_client_ids_map_) {
ROCKS_LOG_WARN(logger_and_clients.first.get(),
"WC no longer enforcing delay");
}
}
}

Expand Down Expand Up @@ -235,7 +307,14 @@ void WriteController::NotifyCV() {
std::lock_guard<std::mutex> lock(stop_mu_);
--total_stopped_;
}
stop_cv_.notify_all();
if (total_stopped_ == 0) {
stop_cv_.notify_all();
std::lock_guard<std::mutex> lock(loggers_map_mu_);
for (auto& logger_and_clients : loggers_to_client_ids_map_) {
ROCKS_LOG_WARN(logger_and_clients.first.get(),
"WC no longer enforcing stop writes");
}
}
}

StopWriteToken::~StopWriteToken() { controller_->NotifyCV(); }
Expand Down
65 changes: 48 additions & 17 deletions include/rocksdb/write_buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class CacheReservationManager;
class InstrumentedMutex;
class InstrumentedCondVar;
class WriteController;
class Logger;

// Interface to block and signal DB instances, intended for RocksDB
// internal use only. Each DB instance contains ptr to StallInterface.
Expand Down Expand Up @@ -304,12 +305,22 @@ class WriteBufferManager final {
public:
uint16_t get_start_delay_percent() const { return start_delay_percent_; }

// Add this Write Controller(WC) to controllers_to_refcount_map_
// which the WBM is responsible for updating (when stalling is allowed).
// each time db is opened with this WC-WBM, add a ref count so we know when
// to remove this WC from the WBM when the last is no longer used.
void RegisterWriteController(std::shared_ptr<WriteController> wc);
void DeregisterWriteController(std::shared_ptr<WriteController> wc);
using WBMClientId = uint64_t;
using WBMClientIds = std::unordered_set<WBMClientId>;

// Add this WriteController(WC) and Logger to controllers_to_client_ids_map_
// and loggers_to_client_ids_map_ respectively.
// The WBM is responsible for updating (when stalling is allowed) these WCs
// and report through the Loggers.
// The connection between the WC and the Loggers can be looked up through
// controllers_to_loggers_map_ which this method also populates.
// When registering, a WBMClientId is returned which is later required for
// deregistering.
WBMClientId RegisterWCAndLogger(std::shared_ptr<WriteController> wc,
std::shared_ptr<Logger> logger);
void DeregisterWCAndLogger(std::shared_ptr<WriteController> wc,
std::shared_ptr<Logger> logger,
WBMClientId wbm_client_id);

private:
// The usage + delay factor are coded in a single (atomic) uint64_t value as
Expand Down Expand Up @@ -342,24 +353,44 @@ class WriteBufferManager final {
std::atomic<uint64_t> coded_usage_state_ = kNoneCodedUsageState;

private:
// returns true if wc was removed from controllers_to_refcount_map_
// which means its ref count reached 0.
bool RemoveFromControllersMap(std::shared_ptr<WriteController> wc);
// When Closing the db, remove this WC/Logger - wbm_client_id from its
// corresponding map. Returns true if the ptr (WC or Logger) is removed from
// the map when it has no more wbm_client_id. Meaning no db is using this
// WC/Logger with this WBM.
template <typename SharedPtrType>
bool RemoveFromMap(const SharedPtrType& ptr, WBMClientId wbm_client_id,
std::mutex& mutex,
std::unordered_map<SharedPtrType, WBMClientIds>& map);

void UpdateControllerDelayState();

void ResetDelay();
void ResetDelay(UsageState usage_state, WriteController* wc,
const std::unordered_set<Logger*>& loggers);

void WBMSetupDelay(uint64_t delay_factor);
void WBMSetupDelay(uint64_t delay_factor, WriteController* wc,
const std::unordered_set<Logger*>& loggers);

// a list of all write controllers which are associated with this WBM.
// the WBM needs to update them when its delay requirements change.
// the key is the WC to update and the value is a ref count of how many dbs
// are using this WC with the WBM.
std::unordered_map<std::shared_ptr<WriteController>, uint64_t>
controllers_to_refcount_map_;
// A map of all write controllers which are associated with this WBM.
// The WBM needs to update them when its delay requirements change.
// The key is the WC to update and the value is an unordered_set of all
// wbm_client_ids opened with the WC. The WBMClientIds are used as unique
// identifiers of the connection between the WC and the db.
std::unordered_map<std::shared_ptr<WriteController>, WBMClientIds>
controllers_to_client_ids_map_;
std::mutex controllers_map_mutex_;

// a map of Loggers similar to the above controllers_to_client_ids_map_.
std::unordered_map<std::shared_ptr<Logger>, WBMClientIds>
loggers_to_client_ids_map_;
std::mutex loggers_map_mutex_;

WBMClientId next_client_id_ = 1;
using Loggers = std::unordered_set<Logger*>;
// a map used to bind the Loggers to a specific WC so that the reports
// regarding a specific WC are sent through the right Logger.
// protected with controllers_map_mutex_
std::unordered_map<WriteController*, Loggers> controllers_to_loggers_map_;

private:
std::atomic<size_t> buffer_size_;
std::atomic<size_t> mutable_limit_;
Expand Down
21 changes: 20 additions & 1 deletion include/rocksdb/write_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ namespace ROCKSDB_NAMESPACE {
class SystemClock;
class WriteControllerToken;
class ErrorHandler;
class Logger;

// WriteController is controlling write stalls in our write code-path. Write
// stalls happen when compaction can't keep up with write rate.
// All of the methods here (including WriteControllerToken's destructors) need
Expand Down Expand Up @@ -89,6 +91,13 @@ class WriteController {
// Prerequisite: DB mutex held.
uint64_t GetDelay(SystemClock* clock, uint64_t num_bytes);

using WCClientId = uint64_t;
using WCClientIds = std::unordered_set<WCClientId>;

WCClientId RegisterLogger(std::shared_ptr<Logger> logger);
void DeregisterLogger(std::shared_ptr<Logger> logger,
WCClientId wc_client_id);

void set_delayed_write_rate(uint64_t write_rate) {
std::lock_guard<std::mutex> lock(metrics_mu_);
// avoid divide 0
Expand Down Expand Up @@ -126,7 +135,7 @@ class WriteController {
// and the Id (void*) is simply the pointer to their obj
using ClientIdToRateMap = std::unordered_map<void*, uint64_t>;

void HandleNewDelayReq(void* client_id, uint64_t cf_write_rate);
void HandleNewDelayReq(void* client_id, uint64_t client_write_rate);

// Removes a client's delay and updates the Write Controller's effective
// delayed write rate if applicable
Expand Down Expand Up @@ -159,6 +168,15 @@ class WriteController {
// The mutex used by stop_cv_
std::mutex stop_mu_;
std::condition_variable stop_cv_;

WCClientId next_client_id_ = 1;
// a map of Loggers to report to. The same Logger can be passed to several dbs
// so its required to save all the WCClientIds that were opened with this
// Logger.
std::unordered_map<std::shared_ptr<Logger>, WCClientIds>
loggers_to_client_ids_map_;
std::mutex loggers_map_mu_;

/////// end of methods and members used when dynamic_delay_ == true. ///////

uint64_t NowMicrosMonotonic(SystemClock* clock);
Expand All @@ -174,6 +192,7 @@ class WriteController {

// mutex to protect below 4 members which is required when WriteController is
// shared across several dbs.
// Sometimes taken under map_mu_ So never take metrics_mu_ and then map_mu_
std::mutex metrics_mu_;
// Number of bytes allowed to write without delay
std::atomic<uint64_t> credit_in_bytes_;
Expand Down
Loading

0 comments on commit 376321b

Please sign in to comment.