Skip to content

Commit

Permalink
[call-v3] Fix leak with cq-based server (#37972)
Browse files Browse the repository at this point in the history
Three problems:

1. We have an owning waker, but on the `Expire` path we never wake it, leading to calls being stranded until the pending timer runs out. Instead, we now call `Finish` and have it always wake things up (slightly more expensive in the shutdown case, but not on the fast path).
2. Avoid a race condition whereby two threads could wake the same waker
3. Don't add new requests to the pending queue after we've removed all requests

Closes #37972

COPYBARA_INTEGRATE_REVIEW=#37972 from ctiller:flake-fightas-21 2bbd1cf
PiperOrigin-RevId: 688310530
  • Loading branch information
ctiller authored and copybara-github committed Oct 21, 2024
1 parent eacb2f7 commit c5999db
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions src/core/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
pending_promises_.front()->Finish(absl::InternalError("Server closed"));
pending_promises_.pop();
}
zombified_ = true;
}

void KillRequests(grpc_error_handle error) override {
Expand Down Expand Up @@ -468,6 +469,9 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
return Immediate(absl::ResourceExhaustedError(
"Too many pending requests for this server"));
}
if (zombified_) {
return Immediate(absl::InternalError("Server closed"));
}
auto w = std::make_shared<ActivityWaiter>(
GetContext<Activity>()->MakeOwningWaker());
pending_promises_.push(w);
Expand All @@ -478,7 +482,7 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
if (r == nullptr) return Pending{};
return std::move(*r);
},
[w]() { w->Expire(); });
[w]() { w->Finish(absl::CancelledError()); });
}
}
return Immediate(MatchResult(server(), cq_idx, rc));
Expand All @@ -498,8 +502,14 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {}
~ActivityWaiter() { delete result.load(std::memory_order_acquire); }
void Finish(absl::Status status) {
delete result.exchange(new ResultType(std::move(status)),
std::memory_order_acq_rel);
ResultType* expected = nullptr;
ResultType* new_value = new ResultType(std::move(status));
if (!result.compare_exchange_strong(expected, new_value,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
delete new_value;
return;
}
waker.WakeupAsync();
}
// Returns true if requested_call consumed, false otherwise.
Expand All @@ -518,10 +528,6 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
waker.WakeupAsync();
return true;
}
void Expire() {
delete result.exchange(new ResultType(absl::CancelledError()),
std::memory_order_acq_rel);
}
Duration Age() { return Timestamp::Now() - created; }
Waker waker;
std::atomic<ResultType*> result{nullptr};
Expand All @@ -531,6 +537,7 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
std::queue<PendingCallFilterStack> pending_filter_stack_;
std::queue<PendingCallPromises> pending_promises_;
std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
bool zombified_ = false;
};

// AllocatingRequestMatchers don't allow the application to request an RPC in
Expand Down

0 comments on commit c5999db

Please sign in to comment.