Merged
135 changes: 75 additions & 60 deletions src/server/db_slice.cc
@@ -1332,8 +1332,12 @@ int32_t DbSlice::GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) c
db_arr_[db_ind]->prime.GetSegmentCount();
}

pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t starting_segment_id,
size_t increase_goal_bytes) {
pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStepAtomic(DbIndex db_ind,
size_t starting_segment_id,
size_t increase_goal_bytes) {
// Disable journal flushing to prevent preemption
journal::JournalFlushGuard journal_flush_guard(shard_owner()->journal());
Contributor (review comment):

The caller to this function also uses journal::JournalFlushGuard.
In this class's destructor we do journal_->SetFlushMode(true);
This means the logic is now broken if there are other calls to the journal that are not inside FreeMemWithEvictionStepAtomic.

  1. I would add a DCHECK in JournalFlushGuard to verify that we do not have nested calls to it.
  2. Why did you add it here if it is also in the caller?
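A minimal sketch of the DCHECK proposed in point 1, assuming a thread-local nesting_ counter added just for the check (the PR ultimately makes nesting legal instead, via the counter_ member shown in the journal.h diff further down):

```cpp
// Sketch only: a guard that DCHECKs against nested use instead of counting.
// `nesting_` is illustrative and would need the usual out-of-line definition:
//   thread_local size_t JournalFlushGuard::nesting_ = 0;
class JournalFlushGuard {
 public:
  explicit JournalFlushGuard(Journal* journal) : journal_(journal) {
    DCHECK_EQ(0u, nesting_) << "Nested JournalFlushGuard on this thread";
    ++nesting_;
    if (journal_) {
      journal_->SetFlushMode(false);
    }
    util::fb2::detail::EnterFiberAtomicSection();
  }

  ~JournalFlushGuard() {
    util::fb2::detail::LeaveFiberAtomicSection();
    --nesting_;
    if (journal_) {
      journal_->SetFlushMode(true);
    }
  }

 private:
  Journal* journal_;
  static thread_local size_t nesting_;
};
```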

FiberAtomicGuard guard;
DCHECK(!owner_->IsReplica());

size_t evicted_items = 0, evicted_bytes = 0;
@@ -1360,60 +1364,55 @@ pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t s
bool record_keys = owner_->journal() != nullptr || expired_keys_events_recording_;
vector<string> keys_to_journal;

{
FiberAtomicGuard guard;
for (int32_t slot_id = num_slots - 1; slot_id >= 0; --slot_id) {
for (int32_t bucket_id = num_buckets - 1; bucket_id >= 0; --bucket_id) {
// pick a random segment to start with in each eviction,
// as segment_id does not imply any recency, and random selection should be fair enough
int32_t segment_id = starting_segment_id;
for (size_t num_seg_visited = 0; num_seg_visited < max_segment_to_consider;
++num_seg_visited, segment_id = GetNextSegmentForEviction(segment_id, db_ind)) {
const auto& bucket = db_table->prime.GetSegment(segment_id)->GetBucket(bucket_id);
if (bucket.IsEmpty())
continue;

if (!bucket.IsBusy(slot_id))
continue;

auto evict_it = db_table->prime.GetIterator(segment_id, bucket_id, slot_id);
if (evict_it->first.IsSticky() || !evict_it->second.HasAllocated())
continue;

// check if the key is locked by looking up transaction table.
const auto& lt = db_table->trans_locks;
string_view key = evict_it->first.GetSlice(&tmp);
if (lt.Find(LockTag(key)).has_value())
continue;

if (record_keys)
keys_to_journal.emplace_back(key);

evicted_bytes += evict_it->first.MallocUsed() + evict_it->second.MallocUsed();
++evicted_items;
PerformDeletion(Iterator(evict_it, StringOrView::FromView(key)), db_table.get());

// returns when whichever condition is met first
if ((evicted_items == max_eviction_per_hb) || (evicted_bytes >= increase_goal_bytes))
goto finish;
}
for (int32_t slot_id = num_slots - 1; slot_id >= 0; --slot_id) {
Contributor Author (@kostasrim, Apr 29, 2025):

No functional changes in this block of code; I moved the FiberAtomicGuard above.

Contributor Author (@kostasrim):

@adiholden I can also move the guard back locally so the diff is cleaner.

for (int32_t bucket_id = num_buckets - 1; bucket_id >= 0; --bucket_id) {
// pick a random segment to start with in each eviction,
// as segment_id does not imply any recency, and random selection should be fair enough
int32_t segment_id = starting_segment_id;
for (size_t num_seg_visited = 0; num_seg_visited < max_segment_to_consider;
++num_seg_visited, segment_id = GetNextSegmentForEviction(segment_id, db_ind)) {
const auto& bucket = db_table->prime.GetSegment(segment_id)->GetBucket(bucket_id);
if (bucket.IsEmpty() || !bucket.IsBusy(slot_id))
continue;
Contributor Author (@kostasrim):

The only difference is that I combined those two statements.


auto evict_it = db_table->prime.GetIterator(segment_id, bucket_id, slot_id);
if (evict_it->first.IsSticky() || !evict_it->second.HasAllocated())
continue;

// check if the key is locked by looking up transaction table.
const auto& lt = db_table->trans_locks;
string_view key = evict_it->first.GetSlice(&tmp);
if (lt.Find(LockTag(key)).has_value())
continue;

if (record_keys)
keys_to_journal.emplace_back(key);

evicted_bytes += evict_it->first.MallocUsed() + evict_it->second.MallocUsed();
++evicted_items;
PerformDeletion(Iterator(evict_it, StringOrView::FromView(key)), db_table.get());

// returns when whichever condition is met first
if ((evicted_items == max_eviction_per_hb) || (evicted_bytes >= increase_goal_bytes))
goto finish;
}
}
} // FiberAtomicGuard
}

finish:

// send the deletion to the replicas.
// fiber preemption could happen in this phase.
for (string_view key : keys_to_journal) {
if (auto journal = owner_->journal(); journal)
// Won't block because we disabled journal flushing. See first line of this function.
RecordExpiryBlocking(db_ind, key);

if (expired_keys_events_recording_)
db_table->expired_keys_events_.emplace_back(key);
}

SendQueuedInvalidationMessages();
// This might not always be atomic on exceptional cases -- see comments on the function
// declaration.
SendQueuedInvalidationMessagesAsync();
auto time_finish = absl::GetCurrentTimeNanos();
events_.evicted_keys += evicted_items;
DVLOG(2) << "Eviction time (us): " << (time_finish - time_start) / 1000;
@@ -1529,33 +1528,50 @@ void DbSlice::QueueInvalidationTrackingMessageAtomic(std::string_view key) {
}
}

void DbSlice::SendQueuedInvalidationMessagesCb(const TrackingMap& track_map, unsigned idx) const {
for (auto& [key, client_list] : track_map) {
for (auto& client : client_list) {
if (client.IsExpired() || (client.Thread() != idx)) {
continue;
}
auto* conn = client.Get();
auto* cntx = static_cast<ConnectionContext*>(conn->cntx());
if (cntx && cntx->conn_state.tracking_info_.IsTrackingOn()) {
conn->SendInvalidationMessageAsync({key});
}
}
}
}

void DbSlice::SendQueuedInvalidationMessages() {
// We run a while loop because, when we block below, new items might be added to
// pending_send_map_.
while (!pending_send_map_.empty()) {
auto local_map = std::move(pending_send_map_);

// Notify all the clients. This function is not efficient, because it
// broadcasts to all threads, even those unrelated to the key's subscribers.
auto local_map = std::move(pending_send_map_);
auto cb = [&](unsigned idx, util::ProactorBase*) {
for (auto& [key, client_list] : local_map) {
for (auto& client : client_list) {
if (client.IsExpired() || (client.Thread() != idx)) {
continue;
}
auto* conn = client.Get();
auto* cntx = static_cast<ConnectionContext*>(conn->cntx());
if (cntx && cntx->conn_state.tracking_info_.IsTrackingOn()) {
conn->SendInvalidationMessageAsync({key});
}
}
}
SendQueuedInvalidationMessagesCb(local_map, idx);
};

shard_set->pool()->AwaitBrief(std::move(cb));
}
}

// This function might preempt if the task queue within DispatchBrief is full and we can't
// enqueue the callback. Although a rare case, this code might not be atomic.
void DbSlice::SendQueuedInvalidationMessagesAsync() {
Contributor Author (@kostasrim):

We drained it before because we blocked; now we don't.

TODO investigate: we should also call this from heartbeat, so that on the second iteration it drains the map if items were added in between.

Contributor Author (@kostasrim, Apr 11, 2025):

After looking at this, the while loop was redundant in the heartbeat flow. Why? Because heartbeat was preempted (it blocked while sending the queued invalidation messages). pending_send_map_ can only change in two places: (1) during evictions from heartbeat, and (2) through db_slice, where it is drained at the end via OnCbFinish(). The former (1) is guaranteed not to evict, because while one heartbeat is preempted another heartbeat will not run on that thread until it completes, while the latter (2) drains pending_send_map_ via OnCbFinish(). Therefore there is no correctness issue of missing items or failing to drain all of pending_send_map_.

Contributor Author (@kostasrim):

As for DispatchBrief, I also don't see any issue whatsoever; the messages will reach the connections eventually, and that should be enough.

Collaborator:

DispatchBrief can also block, by the way; it's just a much rarer event.

Contributor Author (@kostasrim, Apr 28, 2025):

@romange the rationale was that this doesn't happen in practice, because we don't easily reach the state where the internal task queues are full. Your comment is very valid, though, and maybe we should move this outside of the non-preemptive critical section. I think it should be a simple change; I will update on this.

Contributor Author (@kostasrim):

@romange would it be worth adding a function that won't dispatch if the task queue is full, but also won't preempt and instead returns to the caller?

That way, we could send the pending items later if we can't dispatch because the internal queues are full.
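A minimal sketch of what such an API could look like; TryDispatchBrief and ProactorBase::TryEnqueue are hypothetical names invented here for illustration, not part of the existing util::fb2 interface:

```cpp
#include "util/proactor_pool.h"

// Hypothetical non-preempting variant of DispatchBrief: instead of blocking
// when a thread's task queue is full, it reports failure so the caller can
// keep its pending work and retry later (e.g. on the next heartbeat).
template <typename Cb>
bool TryDispatchBrief(util::ProactorPool* pool, Cb cb) {
  bool all_enqueued = true;
  for (unsigned i = 0; i < pool->size(); ++i) {
    // TryEnqueue is an assumed non-blocking enqueue that fails fast when full.
    if (!pool->at(i)->TryEnqueue([cb, i]() mutable { cb(i); })) {
      all_enqueued = false;  // Caller keeps pending_send_map_ and retries.
    }
  }
  return all_enqueued;
}
```

With this shape, SendQueuedInvalidationMessagesAsync could leave pending_send_map_ untouched on failure and let a later heartbeat iteration drain it, matching the TODO above.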

if (pending_send_map_.empty()) {
return;
}
// DispatchBrief will copy local_map
auto cb = [lm = std::move(pending_send_map_), this](unsigned idx, util::ProactorBase*) {
SendQueuedInvalidationMessagesCb(lm, idx);
};

shard_set->pool()->DispatchBrief(std::move(cb));
}

void DbSlice::StartSampleTopK(DbIndex db_ind, uint32_t min_freq) {
auto& db = *db_arr_[db_ind];
if (db.top_keys) {
@@ -1725,9 +1741,8 @@ void DbSlice::OnCbFinishBlocking() {
}
}

if (!pending_send_map_.empty()) {
SendQueuedInvalidationMessages();
}
// Sends only if !pending_send_map_.empty()
SendQueuedInvalidationMessages();
}

void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const {
19 changes: 12 additions & 7 deletions src/server/db_slice.h
@@ -452,9 +452,10 @@ class DbSlice {

// Evicts items with dynamically allocated data from the primary table.
// Does not shrink tables.
// Returnes number of (elements,bytes) freed due to evictions.
std::pair<uint64_t, size_t> FreeMemWithEvictionStep(DbIndex db_indx, size_t starting_segment_id,
size_t increase_goal_bytes);
// Returns number of (elements,bytes) freed due to evictions.
std::pair<uint64_t, size_t> FreeMemWithEvictionStepAtomic(DbIndex db_indx,
size_t starting_segment_id,
size_t increase_goal_bytes);

int32_t GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) const;

@@ -564,6 +565,7 @@
// Queues invalidation message to the clients that are tracking the change to a key.
void QueueInvalidationTrackingMessageAtomic(std::string_view key);
void SendQueuedInvalidationMessages();
void SendQueuedInvalidationMessagesAsync();

void CreateDb(DbIndex index);

@@ -673,10 +675,13 @@

using AllocatorType = PMR_NS::polymorphic_allocator<std::pair<std::string, ConnectionHashSet>>;

absl::flat_hash_map<std::string, ConnectionHashSet,
absl::container_internal::hash_default_hash<std::string>,
absl::container_internal::hash_default_eq<std::string>, AllocatorType>
client_tracking_map_, pending_send_map_;
using TrackingMap =
absl::flat_hash_map<std::string, ConnectionHashSet,
absl::container_internal::hash_default_hash<std::string>,
absl::container_internal::hash_default_eq<std::string>, AllocatorType>;
TrackingMap client_tracking_map_, pending_send_map_;

void SendQueuedInvalidationMessagesCb(const TrackingMap& track_map, unsigned idx) const;

class PrimeBumpPolicy;
};
2 changes: 1 addition & 1 deletion src/server/engine_shard.cc
@@ -825,7 +825,7 @@ void EngineShard::RetireExpiredAndEvict() {
if (eviction_goal) {
uint32_t starting_segment_id = rand() % pt->GetSegmentCount();
auto [evicted_items, evicted_bytes] =
db_slice.FreeMemWithEvictionStep(i, starting_segment_id, eviction_goal);
db_slice.FreeMemWithEvictionStepAtomic(i, starting_segment_id, eviction_goal);

DVLOG(2) << "Heartbeat eviction: Expected to evict " << eviction_goal
<< " bytes. Actually evicted " << evicted_items << " items, " << evicted_bytes
2 changes: 2 additions & 0 deletions src/server/journal/journal.cc
@@ -91,5 +91,7 @@ void Journal::SetFlushMode(bool allow_flush) {
journal_slice.SetFlushMode(allow_flush);
}

size_t thread_local JournalFlushGuard::counter_ = 0;

} // namespace journal
} // namespace dfly
5 changes: 4 additions & 1 deletion src/server/journal/journal.h
@@ -50,11 +50,13 @@ class JournalFlushGuard {
journal_->SetFlushMode(false);
}
util::fb2::detail::EnterFiberAtomicSection();
++counter_;
}

~JournalFlushGuard() {
util::fb2::detail::LeaveFiberAtomicSection();
if (journal_) {
--counter_;
if (journal_ && counter_ == 0) {
journal_->SetFlushMode(true); // Restore the state on destruction
}
}
@@ -64,6 +66,7 @@

private:
Journal* journal_;
static size_t thread_local counter_;
};

} // namespace journal
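
A minimal usage sketch of the nesting behavior this counter enables; EvictionPath is a hypothetical caller, but the counter_ transitions follow the guard code above:

```cpp
// Illustrative only: nested guards on the same thread. Flushing is re-enabled
// only when the outermost guard is destroyed and counter_ drops back to 0.
void EvictionPath(dfly::journal::Journal* journal) {
  dfly::journal::JournalFlushGuard outer(journal);  // counter_: 0 -> 1, flush off
  {
    // e.g. FreeMemWithEvictionStepAtomic installs its own guard internally.
    dfly::journal::JournalFlushGuard inner(journal);  // counter_: 1 -> 2
  }  // inner destroyed: counter_ 2 -> 1, flush mode stays disabled
}    // outer destroyed: counter_ 1 -> 0, SetFlushMode(true) re-enables flushing
```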