Skip to content

Commit

Permalink
feat: add max_pending_accept_reset_streams(n) options
Browse files Browse the repository at this point in the history
The new option is available to both client and server `Builder`s.
  • Loading branch information
seanmonstar committed Apr 13, 2023
1 parent e2723c9 commit a0d4949
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 4 deletions.
37 changes: 37 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ pub struct Builder {
/// Maximum number of locally reset streams to keep at a time.
reset_stream_max: usize,

/// Maximum number of remotely reset streams to allow in the pending
/// accept queue.
pending_accept_reset_stream_max: usize,

/// Initial `Settings` frame to send as part of the handshake.
settings: Settings,

Expand Down Expand Up @@ -634,6 +638,7 @@ impl Builder {
max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
initial_target_connection_window_size: None,
initial_max_send_streams: usize::MAX,
settings: Default::default(),
Expand Down Expand Up @@ -966,6 +971,37 @@ impl Builder {
self
}

/// Sets the maximum number of pending-accept remotely-reset streams.
///
/// When a
///
/// The default value is 20.
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client::*;
/// # use bytes::Bytes;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
/// # {
/// // `client_fut` is a future representing the completion of the HTTP/2
/// // handshake.
/// let client_fut = Builder::new()
/// .max_pending_accept_reset_streams(100)
/// .handshake(my_io);
/// # client_fut.await
/// # }
/// #
/// # pub fn main() {}
/// ```
pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
self.pending_accept_reset_stream_max = max;
self
}

/// Sets the maximum send buffer size per stream.
///
/// Once a stream has buffered up to (or over) the maximum, the stream's
Expand Down Expand Up @@ -1209,6 +1245,7 @@ where
max_send_buffer_size: builder.max_send_buffer_size,
reset_stream_duration: builder.reset_stream_duration,
reset_stream_max: builder.reset_stream_max,
remote_reset_stream_max: builder.pending_accept_reset_stream_max,
settings: builder.settings.clone(),
},
);
Expand Down
5 changes: 2 additions & 3 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};

const DEFAULT_MAX_REMOTE_RESET_STREAMS: usize = 20;

/// An H2 connection
#[derive(Debug)]
pub(crate) struct Connection<T, P, B: Buf = Bytes>
Expand Down Expand Up @@ -82,6 +80,7 @@ pub(crate) struct Config {
pub max_send_buffer_size: usize,
pub reset_stream_duration: Duration,
pub reset_stream_max: usize,
pub remote_reset_stream_max: usize,
pub settings: frame::Settings,
}

Expand Down Expand Up @@ -120,7 +119,7 @@ where
.unwrap_or(false),
local_reset_duration: config.reset_stream_duration,
local_reset_max: config.reset_stream_max,
remote_reset_max: DEFAULT_MAX_REMOTE_RESET_STREAMS,
remote_reset_max: config.remote_reset_stream_max,
remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
remote_max_initiated: config
.settings
Expand Down
1 change: 1 addition & 0 deletions src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub type WindowSize = u32;

// Constants
pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
pub const DEFAULT_REMOTE_RESET_STREAM_MAX: usize = 20;
pub const DEFAULT_RESET_STREAM_MAX: usize = 10;
pub const DEFAULT_RESET_STREAM_SECS: u64 = 30;
pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400;
49 changes: 49 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ pub struct Builder {
/// Maximum number of locally reset streams to keep at a time.
reset_stream_max: usize,

/// Maximum number of remotely reset streams to allow in the pending
/// accept queue.
pending_accept_reset_stream_max: usize,

/// Initial `Settings` frame to send as part of the handshake.
settings: Settings,

Expand Down Expand Up @@ -642,6 +646,7 @@ impl Builder {
Builder {
reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
settings: Settings::default(),
initial_target_connection_window_size: None,
max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
Expand Down Expand Up @@ -882,6 +887,49 @@ impl Builder {
self
}

/// Sets the maximum number of pending-accept remotely-reset streams.
///
/// Streams that have been received by the peer, but not accepted by the
/// user, can also receive a RST_STREAM. This is a legitimate pattern: one
/// could send a request and then shortly after, realize it is not needed,
/// sending a CANCEL.
///
/// However, since those streams are now "closed", they don't count towards
/// the max concurrent streams. So, they will sit in the accept queue,
/// using memory.
///
/// When the number of remotely-reset streams sitting in the pending-accept
/// queue reaches this maximum value, a connection error with the code of
/// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
/// `Future`.
///
/// The default value is 20.
///
/// # Examples
///
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server::*;
/// #
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<T>
/// # {
/// // `server_fut` is a future representing the completion of the HTTP/2
/// // handshake.
/// let server_fut = Builder::new()
/// .max_pending_accept_reset_streams(100)
/// .handshake(my_io);
/// # server_fut
/// # }
/// #
/// # pub fn main() {}
/// ```
pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
self.pending_accept_reset_stream_max = max;
self
}

/// Sets the maximum send buffer size per stream.
///
/// Once a stream has buffered up to (or over) the maximum, the stream's
Expand Down Expand Up @@ -1312,6 +1360,7 @@ where
max_send_buffer_size: self.builder.max_send_buffer_size,
reset_stream_duration: self.builder.reset_stream_duration,
reset_stream_max: self.builder.reset_stream_max,
remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
settings: self.builder.settings.clone(),
},
);
Expand Down
4 changes: 3 additions & 1 deletion tests/h2-tests/tests/stream_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ async fn reset_streams_dont_grow_memory_continuously() {
let (io, mut client) = mock::new();

const N: u32 = 50;
const MAX: usize = 20;

let client = async move {
let settings = client.assert_server_handshake().await;
Expand All @@ -212,14 +213,15 @@ async fn reset_streams_dont_grow_memory_continuously() {
}
tokio::time::timeout(
std::time::Duration::from_secs(1),
client.recv_frame(frames::go_away(41).calm()),
client.recv_frame(frames::go_away((MAX * 2 + 1) as u32).calm()),
)
.await
.expect("client goaway");
};

let srv = async move {
let mut srv = server::Builder::new()
.max_pending_accept_reset_streams(MAX)
.handshake::<_, Bytes>(io)
.await
.expect("handshake");
Expand Down

0 comments on commit a0d4949

Please sign in to comment.