Skip to content

Commit

Permalink
Code Review Updates #2
Browse files Browse the repository at this point in the history
  • Loading branch information
udi-speedb committed Aug 25, 2022
1 parent d9ff532 commit 5410240
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 59 deletions.
2 changes: 1 addition & 1 deletion db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ class MemTable {
void SetFlushInProgress(bool in_progress) {
if (in_progress && (flush_in_progress_ == false)) {
assert(!flush_completed_);
mem_tracker_.FreeMemBegin();
mem_tracker_.FreeMemStarted();
// In case flush is aborted, notify the memory tracker
} else if ((in_progress == false) && flush_in_progress_) {
mem_tracker_.FreeMemAborted();
Expand Down
4 changes: 2 additions & 2 deletions include/rocksdb/write_buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class WriteBufferManager final {
return memory_inactive_.load(std::memory_order_relaxed);
}

// Returns the total memory used by active memtables.
// Returns the total memory marked to be freed but not yet actually freed
size_t memtable_memory_being_freed_usage() const {
return memory_being_freed_.load(std::memory_order_relaxed);
}
Expand Down Expand Up @@ -166,7 +166,7 @@ class WriteBufferManager final {
// The process may complete successfully and FreeMem() will be called to
// notifiy successfull completion, or, aborted, and FreeMemCancelled() will be
// called to notify that.
void FreeMemBegin(size_t mem);
void FreeMemStarted(size_t mem);

// Freeing 'mem' bytes was aborted and that memory is no longer in the process
// of being freed
Expand Down
34 changes: 25 additions & 9 deletions memory/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,36 @@ class AllocTracker {
// Call when we're finished allocating memory so we can free it from
// the write buffer's limit.
void DoneAllocating();

void FreeMemBegin();
void FreeMemStarted();
void FreeMemAborted();
void FreeMem();

bool is_being_freed() const { return is_memory_being_freed_; }
bool is_freed() const { return write_buffer_manager_ == nullptr || freed_; }
bool HasMemoryFreeingStarted() const {
return (state_ == State::kFreeMemStarted);
}

bool IsMemoryFreed() const { return (state_ == State::kFreed); }

private:
enum class State {
kNone,
kAllocating,
kDoneAllocating,
kFreeMemStarted,
kFreed
};

private:
bool ShouldUpdateWriteBufferManager() const {
return ((write_buffer_manager_ != nullptr) &&
(write_buffer_manager_->enabled() ||
write_buffer_manager_->cost_to_cache()));
}

private:
WriteBufferManager* write_buffer_manager_;
std::atomic<size_t> bytes_allocated_;
bool done_allocating_;
bool is_memory_being_freed_ = false;
bool freed_ = false;
WriteBufferManager* write_buffer_manager_ = nullptr;
State state_ = State::kNone;
std::atomic<size_t> bytes_allocated_ = 0U;
};

} // namespace ROCKSDB_NAMESPACE
2 changes: 1 addition & 1 deletion memory/arena.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Arena::Arena(size_t block_size, AllocTracker* tracker, size_t huge_page_size)

Arena::~Arena() {
if (tracker_ != nullptr) {
assert(tracker_->is_freed());
assert(tracker_->IsMemoryFreed());
tracker_->FreeMem();
}
for (const auto& block : blocks_) {
Expand Down
85 changes: 41 additions & 44 deletions memtable/alloc_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,81 +15,78 @@
namespace ROCKSDB_NAMESPACE {

AllocTracker::AllocTracker(WriteBufferManager* write_buffer_manager)
: write_buffer_manager_(write_buffer_manager),
bytes_allocated_(0),
done_allocating_(false),
is_memory_being_freed_(false),
freed_(false) {}
: write_buffer_manager_(write_buffer_manager), bytes_allocated_(0) {}

AllocTracker::~AllocTracker() { FreeMem(); }

void AllocTracker::Allocate(size_t bytes) {
assert(write_buffer_manager_ != nullptr);
if (write_buffer_manager_->enabled() ||
write_buffer_manager_->cost_to_cache()) {
assert((state_ == State::kNone) || (state_ == State::kAllocating));

if (ShouldUpdateWriteBufferManager() &&
((state_ == State::kNone) || (state_ == State::kAllocating))) {
bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed);
write_buffer_manager_->ReserveMem(bytes);
}
state_ = State::kAllocating;
}

void AllocTracker::DoneAllocating() {
if (write_buffer_manager_ != nullptr && !done_allocating_) {
if (write_buffer_manager_->enabled() ||
write_buffer_manager_->cost_to_cache()) {
write_buffer_manager_->ScheduleFreeMem(
bytes_allocated_.load(std::memory_order_relaxed));
} else {
assert(bytes_allocated_.load(std::memory_order_relaxed) == 0);
}
done_allocating_ = true;
assert(write_buffer_manager_ != nullptr);

if (ShouldUpdateWriteBufferManager() && (state_ == State::kAllocating)) {
write_buffer_manager_->ScheduleFreeMem(
bytes_allocated_.load(std::memory_order_relaxed));
} else {
assert(bytes_allocated_.load(std::memory_order_relaxed) == 0);
}
state_ = State::kDoneAllocating;
}

void AllocTracker::FreeMemBegin() {
assert(!is_memory_being_freed_);
assert(!freed_);
assert(done_allocating_);
void AllocTracker::FreeMemStarted() {
assert(write_buffer_manager_ != nullptr);
assert(state_ == State::kDoneAllocating);

is_memory_being_freed_ = true;
write_buffer_manager_->FreeMemBegin(
bytes_allocated_.load(std::memory_order_relaxed));
if (ShouldUpdateWriteBufferManager() && (state_ == State::kDoneAllocating)) {
write_buffer_manager_->FreeMemStarted(
bytes_allocated_.load(std::memory_order_relaxed));
}
state_ = State::kFreeMemStarted;
}

void AllocTracker::FreeMemAborted() {
assert(done_allocating_);
assert(is_memory_being_freed_);
assert(!freed_);
assert(write_buffer_manager_ != nullptr);
assert(state_ == State::kFreeMemStarted);

is_memory_being_freed_ = false;
if (write_buffer_manager_ != nullptr) {
if (ShouldUpdateWriteBufferManager() && (state_ == State::kFreeMemStarted)) {
write_buffer_manager_->FreeMemAborted(
bytes_allocated_.load(std::memory_order_relaxed));
}
state_ = State::kDoneAllocating;
}

void AllocTracker::FreeMem() {
if (freed_) {
return;
}
if (state_ != State::kNone) {
if (state_ == State::kAllocating) {
DoneAllocating();
}

if (!done_allocating_) {
DoneAllocating();
}
// This is necessary so that the WBM will not decrease the memory being freed
// twice in case memory freeing was aborted and then freed via this call
if (!is_memory_being_freed_) {
FreeMemBegin();
}
if (write_buffer_manager_ != nullptr && !freed_) {
if (write_buffer_manager_->enabled() ||
write_buffer_manager_->cost_to_cache()) {
// This is necessary so that the WBM will not decrease the memory being
// freed twice in case memory freeing was aborted and then freed via this
// call
if (state_ == State::kDoneAllocating) {
FreeMemStarted();
}

if (ShouldUpdateWriteBufferManager() &&
(state_ == State::kFreeMemStarted)) {
write_buffer_manager_->FreeMem(
bytes_allocated_.load(std::memory_order_relaxed));
} else {
assert(bytes_allocated_.load(std::memory_order_relaxed) == 0);
}
is_memory_being_freed_ = false;
freed_ = true;
}
bytes_allocated_ = 0U;
state_ = State::kFreed;
}
} // namespace ROCKSDB_NAMESPACE
2 changes: 1 addition & 1 deletion memtable/write_buffer_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void WriteBufferManager::ScheduleFreeMem(size_t mem) {
}
}

void WriteBufferManager::FreeMemBegin(size_t mem) {
void WriteBufferManager::FreeMemStarted(size_t mem) {
if (enabled()) {
memory_being_freed_.fetch_add(mem, std::memory_order_relaxed);
}
Expand Down
2 changes: 1 addition & 1 deletion memtable/write_buffer_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const size_t kSizeDummyEntry = 256 * 1024;

namespace {
void BeginAndFree(WriteBufferManager& wbf, size_t size) {
wbf.FreeMemBegin(size);
wbf.FreeMemStarted(size);
wbf.FreeMem(size);
}

Expand Down

0 comments on commit 5410240

Please sign in to comment.