Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.1.x] rpc/transport: fix temporary dispatch loop stalls #11184

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 102 additions & 70 deletions src/v/rpc/transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ transport::send(netbuf b, rpc::client_opts opts) {
}

ss::future<result<std::unique_ptr<streaming_context>>>
transport::make_response_handler(
netbuf& b, rpc::client_opts& opts, sequence_t seq) {
transport::make_response_handler(netbuf& b, rpc::client_opts& opts) {
if (_correlations.find(_correlation_idx + 1) != _correlations.end()) {
_probe.client_correlation_error();
vlog(
Expand Down Expand Up @@ -169,49 +168,48 @@ transport::make_response_handler(
throw std::logic_error(
fmt::format("Tried to reuse correlation id: {}", idx));
}
handler_raw_ptr->with_timeout(
opts.timeout, [this, method = b.name(), idx, seq] {
auto format_ms = [](clock_type::duration d) {
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
d);
return fmt::format("{} ms", ms.count());
};

auto from_now =
[now = timing_info::clock_type::now(), format_ms](
timing_info::clock_type::time_point earlier) -> std::string {
if (earlier == timing_info::unset) {
return "unset";
}
return format_ms(now - earlier);
};
_requests_queue.erase(seq);
auto it = _correlations.find(idx);
// The timeout may race with the completion of the request (and
// removal from _correlations map) in which case we treat this as a
// not-timed-out request.
if (likely(it != _correlations.end())) {
auto& timing = it->second->timing;
vlog(
rpclog.info,
"RPC timeout ({}) to {}, method: {}, correlation id: {}, {} "
"in flight, time since: {{init: {}, enqueue: {}, dispatch: "
"{}, written: {}}}, flushed: {}",
format_ms(timing.timeout.timeout_period),
server_address(),
method,
idx,
_correlations.size(),
from_now(
timing.timeout.timeout_at() - timing.timeout.timeout_period),
from_now(timing.enqueued_at),
from_now(timing.dispatched_at),
from_now(timing.written_at),
timing.flushed);
_probe.request_timeout();
_correlations.erase(it);
}
});
handler_raw_ptr->with_timeout(opts.timeout, [this, method = b.name(), idx] {
auto format_ms = [](clock_type::duration d) {
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(d);
return fmt::format("{} ms", ms.count());
};

auto from_now =
[now = timing_info::clock_type::now(), format_ms](
timing_info::clock_type::time_point earlier) -> std::string {
if (earlier == timing_info::unset) {
return "unset";
}
return format_ms(now - earlier);
};
auto it = _correlations.find(idx);
// The timeout may race with the completion of the request (and
// removal from _correlations map) in which case we treat this as a
// not-timed-out request.
if (likely(it != _correlations.end())) {
auto& timing = it->second->timing;
vlog(
rpclog.info,
"RPC timeout ({}) to {}, method: {}, correlation id: {}, {} "
"in flight, time since: {{init: {}, enqueue: {}, "
"memory_reserved: {} dispatch: "
"{}, written: {}}}, flushed: {}",
format_ms(timing.timeout.timeout_period),
server_address(),
method,
idx,
_correlations.size(),
from_now(
timing.timeout.timeout_at() - timing.timeout.timeout_period),
from_now(timing.enqueued_at),
from_now(timing.memory_reserved_at),
from_now(timing.dispatched_at),
from_now(timing.written_at),
timing.flushed);
_probe.request_timeout();
_correlations.erase(it);
}
});

return response_future;
}
Expand All @@ -222,19 +220,24 @@ transport::do_send(sequence_t seq, netbuf b, rpc::client_opts opts) {
// hold invariant of always having a valid connection _and_ a working
// dispatch gate where we can wait for async futures
if (!is_valid() || _dispatch_gate.is_closed()) {
_last_seq = std::max(_last_seq, seq);
return ss::make_ready_future<ret_t>(errc::disconnected_endpoint);
}
return ss::with_gate(
_dispatch_gate,
[this, b = std::move(b), opts = std::move(opts), seq]() mutable {
auto f = make_response_handler(b, opts, seq);
auto f = make_response_handler(b, opts);

// send
auto sz = b.buffer().size_bytes();
auto corr = b.correlation_id();
return get_units(_memory, sz)
.then([b = std::move(b)](ssx::semaphore_units units) mutable {
.then([b = std::move(b), corr, this](
ssx::semaphore_units units) mutable {
auto it = _correlations.find(corr);
if (likely(it != _correlations.end())) {
auto& timing = it->second->timing;
timing.memory_reserved_at = clock_type::now();
}
return std::move(b).as_scattered().then(
[u = std::move(units)](
ss::scattered_message<char> scattered_message) mutable {
Expand All @@ -246,27 +249,35 @@ transport::do_send(sequence_t seq, netbuf b, rpc::client_opts opts) {
[this, f = std::move(f), seq, corr](
ssx::semaphore_units units,
ss::scattered_message<char> scattered_message) mutable {
// Check that the request hasn't been finished yet.
// If it has (due to timeout or disconnect), we don't need to
// send it.
if (_correlations.contains(corr)) {
auto e = entry{
.scattered_message
= std::make_unique<ss::scattered_message<char>>(
std::move(scattered_message)),
.correlation_id = corr};

_requests_queue.emplace(
seq, std::make_unique<entry>(std::move(e)));
dispatch_send();
}
auto e = entry{
.scattered_message
= std::make_unique<ss::scattered_message<char>>(
std::move(scattered_message)),
.correlation_id = corr};

_requests_queue.emplace(
seq, std::make_unique<entry>(std::move(e)));
// By this point the request may already have timed out, but
// we still call dispatch_send(), where that case is handled.
// This is needed for two reasons:
// - Monotonic updates to _last_seq
// - Draining of the request_queue, which could otherwise be
// stalled by a missing sequence number.
dispatch_send();
return std::move(f).finally([u = std::move(units)] {});
})
.finally([this, seq] {
// update last sequence to make progress, for successful
// dispatches this will be a noop, as _last_seq was already updated
// before sending data
_last_seq = std::max(_last_seq, seq);
.handle_exception([this, seq, corr](std::exception_ptr eptr) {
// This is unlikely but may potentially mean dispatch_send()
// is not called, stalling the sequence number.
vlog(
rpclog.error,
"Exception {} dispatching rpc with sequence: {}, "
"correlation_idx: {}, last_seq: {}",
eptr,
seq,
corr,
_last_seq);
return ss::make_exception_future<ret_t>(eptr);
});
});
}
Expand All @@ -276,9 +287,21 @@ void transport::dispatch_send() {
= ssx::spawn_with_gate_then(_dispatch_gate, [this]() mutable {
return ss::do_until(
[this] {
return _requests_queue.empty()
|| _requests_queue.begin()->first
> (_last_seq + sequence_t(1));
if (_requests_queue.empty()) {
return true;
}
auto queue_begin_sequence = _requests_queue.begin()->first;
auto out_of_order = queue_begin_sequence
> (_last_seq + sequence_t(1));
if (unlikely(out_of_order)) {
vlog(
rpclog.debug,
"Dispatch request queue out of order. Last seq: {}, "
"queue begin seq: {}",
_last_seq,
queue_begin_sequence);
}
return out_of_order;
},
// Be careful adding any scheduling points in the lambda below.
//
Expand Down Expand Up @@ -317,6 +340,15 @@ void transport::dispatch_send() {

auto f = _out.write(std::move(v));
resp_entry->timing.dispatched_at = clock_type::now();
vlog(
rpclog.trace,
"Dispatched request with sequence: {}, "
"correlation_idx: {}, "
"pending queue_size: {}, target_address: {}",
_last_seq,
corr,
_requests_queue.size(),
server_address());
return std::move(f)
.then([this, corr](bool flushed) {
if (auto maybe_timing = get_timing(corr)) {
Expand Down
9 changes: 8 additions & 1 deletion src/v/rpc/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ struct timing_info {
*/
time_point enqueued_at = unset;

/**
* The moment in time at which the semaphore units needed for the request
* buffer are reserved. The request is not dispatched until the required
* units are acquired.
*/
time_point memory_reserved_at = unset;

/**
* The moment in time we dispatched the request: that is, it was the next
* request to be sent and we called .write on the output stream: note that
Expand Down Expand Up @@ -156,7 +163,7 @@ class transport final : public net::base_transport {
void dispatch_send();

ss::future<result<std::unique_ptr<streaming_context>>>
make_response_handler(netbuf&, rpc::client_opts&, sequence_t);
make_response_handler(netbuf&, rpc::client_opts&);

ssx::semaphore _memory;

Expand Down