Skip to content

Commit

Permalink
Avoid spurious wakeups when stream capacity is not available (#661)
Browse files Browse the repository at this point in the history
Fixes #628

Sometimes `poll_capacity` returns `Ready(Some(0))` - in which case
caller will have no way to wait for the stream capacity to become available.
The previous attempt on the fix has addressed only a part of the problem.

The root cause - in a nutshell - is the race condition between the
application tasks that performs stream I/O and the task that serves
the underlying HTTP/2 connection. The application thread that is about
to send data calls `reserve_capacity/poll_capacity`, is provided
with some send capacity and proceeds to `send_data`.

Meanwhile the service thread may send some buffered data and/or
receive some window updates - either way the stream's effective
allocated send capacity may not change, but, since the capacity still
available, `send_capacity_inc` flag may be set.

The sending task calls `send_data` and uses the entire allocated
capacity, leaving the flag set. Next time `poll_capacity` returns
`Ready(Some(0))`.

This change sets the flag and dispatches the wakeup event only in
cases when the effective capacity reported by `poll_capacity` actually
increases.
  • Loading branch information
vadim-eg authored Feb 20, 2023
1 parent 73bea23 commit 7323190
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 33 deletions.
20 changes: 6 additions & 14 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,11 @@ impl Prioritize {
/// connection
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
let available = stream.send_flow.available().as_size();
stream.send_flow.claim_capacity(available);
// Re-assign all capacity to the connection
self.assign_connection_capacity(available, stream, counts);
if available > 0 {
stream.send_flow.claim_capacity(available);
// Re-assign all capacity to the connection
self.assign_connection_capacity(available, stream, counts);
}
}

/// Reclaim just reserved capacity, not buffered capacity, and re-assign
Expand Down Expand Up @@ -756,17 +758,7 @@ impl Prioritize {

// Update the flow control
tracing::trace_span!("updating stream flow").in_scope(|| {
stream.send_flow.send_data(len);

// Decrement the stream's buffered data counter
debug_assert!(stream.buffered_send_data >= len as usize);
stream.buffered_send_data -= len as usize;
stream.requested_send_capacity -= len;

// If the capacity was limited because of the
// max_send_buffer_size, then consider waking
// the send task again...
stream.notify_if_can_buffer_more(self.max_buffer_size);
stream.send_data(len, self.max_buffer_size);

// Assign the capacity back to the connection that
// was just consumed from the stream in the previous
Expand Down
7 changes: 1 addition & 6 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,7 @@ impl Send {

/// Current available stream send capacity
pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize {
let available = stream.send_flow.available().as_size() as usize;
let buffered = stream.buffered_send_data;

available
.min(self.prioritize.max_buffer_size())
.saturating_sub(buffered) as WindowSize
stream.capacity(self.prioritize.max_buffer_size())
}

pub fn poll_reset(
Expand Down
56 changes: 43 additions & 13 deletions src/proto/streams/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,35 +264,65 @@ impl Stream {
self.ref_count == 0 && !self.state.is_closed()
}

/// Current available stream send capacity
pub fn capacity(&self, max_buffer_size: usize) -> WindowSize {
let available = self.send_flow.available().as_size() as usize;
let buffered = self.buffered_send_data;

available.min(max_buffer_size).saturating_sub(buffered) as WindowSize
}

pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
let prev_capacity = self.capacity(max_buffer_size);
debug_assert!(capacity > 0);
self.send_flow.assign_capacity(capacity);

tracing::trace!(
" assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={}",
" assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
self.send_flow.available(),
self.buffered_send_data,
self.id,
max_buffer_size
max_buffer_size,
prev_capacity,
);

self.notify_if_can_buffer_more(max_buffer_size);
if prev_capacity < self.capacity(max_buffer_size) {
self.notify_capacity();
}
}

/// If the capacity was limited because of the max_send_buffer_size,
/// then consider waking the send task again...
pub fn notify_if_can_buffer_more(&mut self, max_buffer_size: usize) {
let available = self.send_flow.available().as_size() as usize;
let buffered = self.buffered_send_data;
pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
let prev_capacity = self.capacity(max_buffer_size);

self.send_flow.send_data(len);

// Only notify if the capacity exceeds the amount of buffered data
if available.min(max_buffer_size) > buffered {
self.send_capacity_inc = true;
tracing::trace!(" notifying task");
self.notify_send();
// Decrement the stream's buffered data counter
debug_assert!(self.buffered_send_data >= len as usize);
self.buffered_send_data -= len as usize;
self.requested_send_capacity -= len;

tracing::trace!(
" sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
self.send_flow.available(),
self.buffered_send_data,
self.id,
max_buffer_size,
prev_capacity,
);

if prev_capacity < self.capacity(max_buffer_size) {
self.notify_capacity();
}
}

/// If the capacity was limited because of the max_send_buffer_size,
/// then consider waking the send task again...
pub fn notify_capacity(&mut self) {
self.send_capacity_inc = true;
tracing::trace!(" notifying task");
self.notify_send();
}

/// Returns `Err` when the decrement cannot be completed due to overflow.
pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
match self.content_length {
Expand Down
61 changes: 61 additions & 0 deletions tests/h2-tests/tests/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1797,3 +1797,64 @@ async fn max_send_buffer_size_poll_capacity_wakes_task() {

join(srv, client).await;
}

#[tokio::test]
async fn poll_capacity_wakeup_after_window_update() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
let settings = srv
.assert_client_handshake_with_settings(frames::settings().initial_window_size(10))
.await;
assert_default_settings!(settings);
srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
.await;
srv.send_frame(frames::headers(1).response(200)).await;
srv.recv_frame(frames::data(1, &b"abcde"[..])).await;
srv.send_frame(frames::window_update(1, 5)).await;
srv.send_frame(frames::window_update(1, 5)).await;
srv.recv_frame(frames::data(1, &b"abcde"[..])).await;
srv.recv_frame(frames::data(1, &b""[..]).eos()).await;
};

let h2 = async move {
let (mut client, mut h2) = client::Builder::new()
.max_send_buffer_size(5)
.handshake::<_, Bytes>(io)
.await
.unwrap();
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (response, mut stream) = client.send_request(request, false).unwrap();

let response = h2.drive(response).await.unwrap();
assert_eq!(response.status(), StatusCode::OK);

stream.send_data("abcde".into(), false).unwrap();

stream.reserve_capacity(10);
assert_eq!(stream.capacity(), 0);

let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await;
h2.drive(idle_ms(10)).await;
stream.send_data("abcde".into(), false).unwrap();

stream.reserve_capacity(5);
assert_eq!(stream.capacity(), 0);

// This will panic if there is a bug causing h2 to return Ok(0) from poll_capacity.
let mut stream = h2.drive(util::wait_for_capacity(stream, 5)).await;

stream.send_data("".into(), true).unwrap();

// Wait for the connection to close
h2.await.unwrap();
};

join(srv, h2).await;
}

0 comments on commit 7323190

Please sign in to comment.