Skip to content

Commit

Permalink
Report WBM
Browse files Browse the repository at this point in the history
This commit adds reporting capabilities to the WriteBufferManager (WBM):
it now logs whenever it sets or resets a delay request on the WriteController (WC).
This is done by registering the Logger with the WBM during DB open,
in the constructor of ColumnFamilySet, together with the WC.
  • Loading branch information
Yuval-Ariel committed Jun 13, 2023
1 parent 6bf9458 commit c21c4e4
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 64 deletions.
6 changes: 4 additions & 2 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1674,11 +1674,13 @@ ColumnFamilySet::ColumnFamilySet(
// initialize linked list
dummy_cfd_->prev_ = dummy_cfd_;
dummy_cfd_->next_ = dummy_cfd_;
write_buffer_manager_->RegisterWriteController(write_controller_);
write_buffer_manager_->RegisterWCAndLogger(
write_controller_, db_options_->info_log, db_session_id_);
}

ColumnFamilySet::~ColumnFamilySet() {
write_buffer_manager_->DeregisterWriteController(write_controller_);
write_buffer_manager_->DeregisterWCAndLogger(
write_controller_, db_options_->info_log, db_session_id_);
while (column_family_data_.size() > 0) {
// cfd destructor will delete itself from column_family_data_
auto cfd = column_family_data_.begin()->second;
Expand Down
11 changes: 7 additions & 4 deletions db/write_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,24 @@ bool WriteController::IsInRateMap(void* client_id) {
// its write_rate is higher than the delayed_write_rate_ so we need to find a
// new min from all clients via GetMapMinRate()
void WriteController::HandleNewDelayReq(void* client_id,
uint64_t cf_write_rate) {
uint64_t client_write_rate) {
assert(is_dynamic_delay());
std::lock_guard<std::mutex> lock(map_mu_);
bool was_min = IsMinRate(client_id);
bool inserted =
id_to_write_rate_map_.insert_or_assign(client_id, cf_write_rate).second;
id_to_write_rate_map_.insert_or_assign(client_id, client_write_rate)
.second;
if (inserted) {
total_delayed_++;
}
uint64_t min_rate = delayed_write_rate();
if (cf_write_rate <= min_rate) {
min_rate = cf_write_rate;
if (client_write_rate <= min_rate) {
min_rate = client_write_rate;
} else if (was_min) {
min_rate = GetMapMinRate();
}
// TODO: print client delay request, whether was_min, the previous
// delayed_write_rate and the final rate to set.
set_delayed_write_rate(min_rate);
}

Expand Down
2 changes: 1 addition & 1 deletion db/write_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class WriteController {
// and the Id (void*) is simply the pointer to their obj
using ClientIdToRateMap = std::unordered_map<void*, uint64_t>;

void HandleNewDelayReq(void* client_id, uint64_t cf_write_rate);
void HandleNewDelayReq(void* client_id, uint64_t client_write_rate);

// Removes a client's delay and updates the Write Controller's effective
// delayed write rate if applicable
Expand Down
56 changes: 40 additions & 16 deletions include/rocksdb/write_buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,18 @@ class WriteBufferManager final {
public:
uint16_t get_start_delay_percent() const { return start_delay_percent_; }

// Add this Write Controller(WC) to controllers_to_refcount_map_
// which the WBM is responsible for updating (when stalling is allowed).
// each time db is opened with this WC-WBM, add a ref count so we know when
// to remove this WC from the WBM when the last is no longer used.
void RegisterWriteController(std::shared_ptr<WriteController> wc);
void DeregisterWriteController(std::shared_ptr<WriteController> wc);
// Add this WriteController (WC) and Logger to controllers_to_session_ids_map_
// and loggers_to_session_ids_map_ respectively.
// The WBM is responsible for updating these WCs (when stalling is allowed)
// and for reporting through the Loggers.
// The connection between a WC and its Loggers can be looked up through
// controllers_to_loggers_map_, which this method also populates.
void RegisterWCAndLogger(std::shared_ptr<WriteController> wc,
std::shared_ptr<Logger> logger,
std::string db_session_id);
void DeregisterWCAndLogger(std::shared_ptr<WriteController> wc,
std::shared_ptr<Logger> logger,
std::string db_session_id);

private:
// The usage + delay factor are coded in a single (atomic) uint64_t value as
Expand Down Expand Up @@ -320,24 +326,42 @@ class WriteBufferManager final {
std::atomic<uint64_t> coded_usage_state_ = kNoneCodedUsageState;

private:
// returns true if wc was removed from controllers_to_refcount_map_
// which means its ref count reached 0.
bool RemoveFromControllersMap(std::shared_ptr<WriteController> wc);
using DbSessionIDs = std::unordered_set<std::string>;
// When closing the db, remove this db_session_id from the entry of the given
// WC/Logger in its corresponding map. Returns true if the ptr (WC or Logger)
// was itself removed from the map because it has no more db_session_ids,
// meaning that no db is using this WC/Logger with this WBM any longer.
template <typename SharedPtrType>
bool RemoveFromMap(SharedPtrType ptr, std::string db_session_id,
std::mutex& mutex,
std::unordered_map<SharedPtrType, DbSessionIDs>& map);

void UpdateControllerDelayState();

void ResetDelay();
void ResetDelay(UsageState usage_state);

void WBMSetupDelay(uint64_t delay_factor);

// a list of all write controllers which are associated with this WBM.
// the WBM needs to update them when its delay requirements change.
// the key is the WC to update and the value is a ref count of how many dbs
// are using this WC with the WBM.
std::unordered_map<std::shared_ptr<WriteController>, uint64_t>
controllers_to_refcount_map_;
// a map of all write controllers which are associated with this WBM.
// The WBM needs to update them when its delay requirements change.
// The key is the WC to update and the value is an unordered_set of all
// db_session_ids opened with the WC. The DbSessionIDs are used as unique
// identifiers of the connection between the WC and the db.
std::unordered_map<std::shared_ptr<WriteController>, DbSessionIDs>
controllers_to_session_ids_map_;
std::mutex controllers_map_mutex_;

// a map of Loggers similar to the above controllers_to_session_ids_map_.
std::unordered_map<std::shared_ptr<Logger>, DbSessionIDs>
loggers_to_session_ids_map_;
std::mutex loggers_map_mutex_;

using Loggers = std::unordered_set<Logger*>;
// a map used to bind the Loggers to a specific WC so that the reports
// regarding a specific WC are sent through the right Logger. uses
// controllers_map_mutex_
std::unordered_map<WriteController*, Loggers> controllers_to_loggers_map_;

private:
std::atomic<size_t> buffer_size_;
std::atomic<size_t> mutable_limit_;
Expand Down
113 changes: 77 additions & 36 deletions memtable/write_buffer_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
#include "cache/cache_entry_roles.h"
#include "cache/cache_reservation_manager.h"
#include "db/db_impl/db_impl.h"
#include "db/write_controller.h"
#include "logging/logging.h"
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
Expand Down Expand Up @@ -317,35 +320,57 @@ std::string WriteBufferManager::GetPrintableOptions() const {
return ret;
}

void WriteBufferManager::RegisterWriteController(
std::shared_ptr<WriteController> wc) {
std::lock_guard<std::mutex> lock(controllers_map_mutex_);
if (controllers_to_refcount_map_.count(wc)) {
++controllers_to_refcount_map_[wc];
} else {
controllers_to_refcount_map_.insert({wc, 1});
void WriteBufferManager::RegisterWCAndLogger(
std::shared_ptr<WriteController> wc, std::shared_ptr<Logger> logger,
std::string db_session_id) {
{
std::lock_guard<std::mutex> lock(controllers_map_mutex_);
controllers_to_session_ids_map_[wc].insert(db_session_id);
controllers_to_loggers_map_[wc.get()].insert(logger.get());
}
}

void WriteBufferManager::DeregisterWriteController(
std::shared_ptr<WriteController> wc) {
bool last_entry = RemoveFromControllersMap(wc);
if (last_entry && wc->is_dynamic_delay()) {
wc->HandleRemoveDelayReq(this);
{
std::lock_guard<std::mutex> lock(loggers_map_mutex_);
loggers_to_session_ids_map_[logger].insert(db_session_id);
}
}

// Removes `db_session_id` from the set of session ids stored under `ptr` in
// `map`, holding `mutex` for the duration of the call. If that leaves the set
// empty, the `ptr` entry itself is erased from `map` and true is returned;
// otherwise the entry is kept and false is returned.
// This is a member function rather than a free function so that DbSessionIDs
// is recognized.
template <typename SharedPtrType>
bool WriteBufferManager::RemoveFromMap(
    SharedPtrType ptr, std::string db_session_id, std::mutex& mutex,
    std::unordered_map<SharedPtrType, DbSessionIDs>& map) {
  std::lock_guard<std::mutex> guard(mutex);
  // The caller must have registered this (ptr, db_session_id) pair earlier.
  assert(map.count(ptr));
  auto& session_ids = map[ptr];
  assert(!session_ids.empty());
  assert(session_ids.count(db_session_id));
  session_ids.erase(db_session_id);
  // Record the result before erasing: `session_ids` refers into `map` and
  // must not be touched once its entry is gone.
  const bool no_sessions_left = session_ids.empty();
  if (no_sessions_left) {
    map.erase(ptr);
  }
  return no_sessions_left;
}

bool WriteBufferManager::RemoveFromControllersMap(
std::shared_ptr<WriteController> wc) {
void WriteBufferManager::DeregisterWCAndLogger(
std::shared_ptr<WriteController> wc, std::shared_ptr<Logger> logger,
std::string db_session_id) {
bool last_logger = RemoveFromMap(logger, db_session_id, loggers_map_mutex_,
loggers_to_session_ids_map_);
bool last_controller =
RemoveFromMap(wc, db_session_id, controllers_map_mutex_,
controllers_to_session_ids_map_);
std::lock_guard<std::mutex> lock(controllers_map_mutex_);
assert(controllers_to_refcount_map_.count(wc));
assert(controllers_to_refcount_map_[wc] > 0);
--controllers_to_refcount_map_[wc];
if (controllers_to_refcount_map_[wc] == 0) {
controllers_to_refcount_map_.erase(wc);
return true;
} else {
return false;
if (last_controller) {
assert(wc.unique() == false);
if (wc->is_dynamic_delay()) {
wc->HandleRemoveDelayReq(this);
}
controllers_to_loggers_map_.erase(wc.get());
} else if (last_logger) {
assert(controllers_to_loggers_map_.count(wc.get()));
controllers_to_loggers_map_[wc.get()].erase(logger.get());
}
}

Expand Down Expand Up @@ -390,30 +415,46 @@ uint64_t CalcDelayFromFactor(uint64_t max_write_rate, uint64_t delay_factor) {

void WriteBufferManager::WBMSetupDelay(uint64_t delay_factor) {
std::lock_guard<std::mutex> lock(controllers_map_mutex_);
for (auto& wc_and_ref_count : controllers_to_refcount_map_) {
// make sure that controllers_to_refcount_map_ does not hold
for (auto& wc_and_session_id : controllers_to_session_ids_map_) {
// make sure that controllers_to_session_ids_map_ does not hold
// the last ref to the WC.
assert(wc_and_ref_count.first.unique() == false);
assert(wc_and_session_id.first.unique() == false);
// the final rate depends on the write controllers max rate so
// each wc can receive a different delay requirement.
WriteController* wc = wc_and_ref_count.first.get();
WriteController* wc = wc_and_session_id.first.get();
if (wc->is_dynamic_delay()) {
uint64_t wbm_write_rate =
CalcDelayFromFactor(wc->max_delayed_write_rate(), delay_factor);
for (auto logger : controllers_to_loggers_map_[wc]) {
ROCKS_LOG_WARN(logger,
"WBM (%p) sets a delay requirement of %" PRIu64
" using WC (%p) ",
this, wbm_write_rate, wc);
}
wc->HandleNewDelayReq(this, wbm_write_rate);
}
}
}

void WriteBufferManager::ResetDelay() {
void WriteBufferManager::ResetDelay(UsageState usage_state) {
std::lock_guard<std::mutex> lock(controllers_map_mutex_);
for (auto& wc_and_ref_count : controllers_to_refcount_map_) {
// make sure that controllers_to_refcount_map_ does not hold the last ref to
// the WC since holding the last ref means that the last DB that was using
// this WC has destructed and using this WC is no longer valid.
assert(wc_and_ref_count.first.unique() == false);
WriteController* wc = wc_and_ref_count.first.get();
for (auto& wc_and_session_id : controllers_to_session_ids_map_) {
// make sure that controllers_to_session_ids_map_ does not hold the last ref
// to the WC since holding the last ref means that the last DB that was
// using this WC has destructed and using this WC is no longer valid.
assert(wc_and_session_id.first.unique() == false);
WriteController* wc = wc_and_session_id.first.get();
if (wc->is_dynamic_delay()) {
auto usage_state_str = "No Delay";
if (usage_state == UsageState::kStop) {
usage_state_str = "Max memory reached";
}
for (auto logger : controllers_to_loggers_map_[wc]) {
ROCKS_LOG_WARN(logger,
"WBM (%p) resets its delay requirement using WC - %p. "
"UsageState is: %s",
this, wc, usage_state_str);
}
wc->HandleRemoveDelayReq(this);
}
}
Expand All @@ -427,7 +468,7 @@ void WriteBufferManager::UpdateControllerDelayState() {
} else {
// check if this WBM has an active delay request.
// if yes, remove it and maybe set a different rate.
ResetDelay();
ResetDelay(usage_state);
}
// TODO: things to report:
// 1. that WBM initiated reset/delay.
Expand Down
12 changes: 7 additions & 5 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4956,12 +4956,14 @@ class Benchmark {
}
}

if (FLAGS_use_dynamic_delay && FLAGS_num_multi_db > 1) {
if (options.delayed_write_rate <= 0) {
options.delayed_write_rate = 16 * 1024 * 1024;
if (options.write_controller == nullptr) {
if (FLAGS_use_dynamic_delay && FLAGS_num_multi_db > 1) {
if (options.delayed_write_rate <= 0) {
options.delayed_write_rate = 16 * 1024 * 1024;
}
options.write_controller.reset(new WriteController(
options.use_dynamic_delay, options.delayed_write_rate));
}
options.write_controller.reset(new WriteController(
options.use_dynamic_delay, options.delayed_write_rate));
}

// Integrated BlobDB
Expand Down

0 comments on commit c21c4e4

Please sign in to comment.