From 5bc8e72e5fcbd8ae2d3d9bc78a1c0ef0040bcc39 Mon Sep 17 00:00:00 2001
From: Sean McArthur
Date: Wed, 12 Apr 2023 12:23:56 -0400
Subject: [PATCH] fix: limit the amount of pending-accept reset streams

Streams that have been received from the peer, but not yet accepted by
the user, can also receive a RST_STREAM. This is a legitimate pattern: a
peer could send a request and then, shortly after, realize it is no
longer needed and send a CANCEL.

However, since those streams are now "closed", they no longer count
towards the maximum number of concurrent streams. So they sit in the
accept queue, consuming memory.

In most cases, the user calls `accept` in a loop and drains reset
requests quickly enough that this isn't an issue in practice. But if the
peer can flood the network faster than the server's accept loop runs
(the loop only accepts requests; processing tends to happen in a
separate task), memory use could grow without bound.

This change introduces a maximum count for streams that are in the
pending-accept but remotely-reset state. If the maximum is reached, a
GOAWAY frame with the error code ENHANCE_YOUR_CALM is sent, and the
connection marks itself as errored.

ref CVE-2023-26964
ref GHSA-f8vr-r385-rh5r

Closes https://github.com/hyperium/hyper/issues/2877
---
Note: two illustrative sketches of the new accounting are appended after
the diff.

 src/proto/connection.rs               |  8 +++++
 src/proto/streams/counts.rs           | 51 ++++++++++++++++++++++-----
 src/proto/streams/mod.rs              |  4 +++
 src/proto/streams/recv.rs             | 30 ++++++++++++++--
 src/proto/streams/state.rs            |  7 ++++
 src/proto/streams/streams.rs          |  8 ++++-
 src/server.rs                         |  7 ++++
 tests/h2-support/src/frames.rs        |  4 +++
 tests/h2-tests/tests/stream_states.rs | 41 ++++++++++++++++++++-
 9 files changed, 148 insertions(+), 12 deletions(-)

diff --git a/src/proto/connection.rs b/src/proto/connection.rs
index 59883cf33..1fec23102 100644
--- a/src/proto/connection.rs
+++ b/src/proto/connection.rs
@@ -14,6 +14,8 @@ use std::task::{Context, Poll};
 use std::time::Duration;
 use tokio::io::{AsyncRead, AsyncWrite};
 
+const DEFAULT_MAX_REMOTE_RESET_STREAMS: usize = 20;
+
 /// An H2 connection
 #[derive(Debug)]
 pub(crate) struct Connection<T, P, B: Buf = Bytes>
@@ -118,6 +120,7 @@ where
                     .unwrap_or(false),
                 local_reset_duration: config.reset_stream_duration,
                 local_reset_max: config.reset_stream_max,
+                remote_reset_max: DEFAULT_MAX_REMOTE_RESET_STREAMS,
                 remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
                 remote_max_initiated: config
                     .settings
@@ -172,6 +175,11 @@ where
         self.inner.streams.max_recv_streams()
     }
 
+    #[cfg(feature = "unstable")]
+    pub fn num_wired_streams(&self) -> usize {
+        self.inner.streams.num_wired_streams()
+    }
+
     /// Returns `Ready` when the connection is ready to receive a frame.
     ///
     /// Returns `Error` as this may raise errors that are caused by delayed
diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs
index 70dfc7851..6a5aa9ccd 100644
--- a/src/proto/streams/counts.rs
+++ b/src/proto/streams/counts.rs
@@ -21,10 +21,16 @@ pub(super) struct Counts {
     num_recv_streams: usize,
 
     /// Maximum number of pending locally reset streams
-    max_reset_streams: usize,
+    max_local_reset_streams: usize,
 
     /// Current number of pending locally reset streams
-    num_reset_streams: usize,
+    num_local_reset_streams: usize,
+
+    /// Max number of "pending accept" streams that were remotely reset
+    max_remote_reset_streams: usize,
+
+    /// Current number of "pending accept" streams that were remotely reset
+    num_remote_reset_streams: usize,
 }
 
 impl Counts {
@@ -36,8 +42,10 @@ impl Counts {
             num_send_streams: 0,
             max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
             num_recv_streams: 0,
-            max_reset_streams: config.local_reset_max,
-            num_reset_streams: 0,
+            max_local_reset_streams: config.local_reset_max,
+            num_local_reset_streams: 0,
+            max_remote_reset_streams: config.remote_reset_max,
+            num_remote_reset_streams: 0,
         }
     }
 
@@ -90,7 +98,7 @@ impl Counts {
 
     /// Returns true if the number of pending reset streams can be incremented.
     pub fn can_inc_num_reset_streams(&self) -> bool {
-        self.max_reset_streams > self.num_reset_streams
+        self.max_local_reset_streams > self.num_local_reset_streams
     }
 
     /// Increments the number of pending reset streams.
@@ -101,7 +109,34 @@
     pub fn inc_num_reset_streams(&mut self) {
         assert!(self.can_inc_num_reset_streams());
 
-        self.num_reset_streams += 1;
+        self.num_local_reset_streams += 1;
+    }
+
+    pub(crate) fn max_remote_reset_streams(&self) -> usize {
+        self.max_remote_reset_streams
+    }
+
+    /// Returns true if the number of pending REMOTE reset streams can be
+    /// incremented.
+    pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
+        self.max_remote_reset_streams > self.num_remote_reset_streams
+    }
+
+    /// Increments the number of pending REMOTE reset streams.
+    ///
+    /// # Panics
+    ///
+    /// Panics on failure as this should have been validated beforehand.
+    pub(crate) fn inc_num_remote_reset_streams(&mut self) {
+        assert!(self.can_inc_num_remote_reset_streams());
+
+        self.num_remote_reset_streams += 1;
+    }
+
+    pub(crate) fn dec_num_remote_reset_streams(&mut self) {
+        assert!(self.num_remote_reset_streams > 0);
+
+        self.num_remote_reset_streams -= 1;
     }
 
     pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
@@ -194,8 +229,8 @@ impl Counts {
     }
 
     fn dec_num_reset_streams(&mut self) {
-        assert!(self.num_reset_streams > 0);
-        self.num_reset_streams -= 1;
+        assert!(self.num_local_reset_streams > 0);
+        self.num_local_reset_streams -= 1;
     }
 }
diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs
index 0ff8131c1..fbe32c7b0 100644
--- a/src/proto/streams/mod.rs
+++ b/src/proto/streams/mod.rs
@@ -60,6 +60,10 @@ pub struct Config {
     /// Maximum number of locally reset streams to keep at a time
     pub local_reset_max: usize,
 
+    /// Maximum number of remotely reset "pending accept" streams to keep at a
+    /// time. Going over this number results in a connection error.
+    pub remote_reset_max: usize,
+
     /// Initial window size of remote initiated streams
     pub remote_init_window_sz: WindowSize,
 
diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs
index 497efc9bc..0fe2bdd57 100644
--- a/src/proto/streams/recv.rs
+++ b/src/proto/streams/recv.rs
@@ -741,12 +741,39 @@ impl Recv {
     }
 
     /// Handle remote sending an explicit RST_STREAM.
-    pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) {
+    pub fn recv_reset(
+        &mut self,
+        frame: frame::Reset,
+        stream: &mut Stream,
+        counts: &mut Counts,
+    ) -> Result<(), Error> {
+        // Resetting a stream that the user hasn't accepted is possible,
+        // but should be done with care. These streams will continue
+        // to take up memory in the accept queue, but will no longer be
+        // counted as "concurrent" streams.
+        //
+        // So, we have a separate limit for these.
+        //
+        // See https://github.com/hyperium/hyper/issues/2877
+        if stream.is_pending_accept {
+            if counts.can_inc_num_remote_reset_streams() {
+                counts.inc_num_remote_reset_streams();
+            } else {
+                tracing::warn!(
+                    "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
+                    counts.max_remote_reset_streams(),
+                );
+                return Err(Error::library_go_away(Reason::ENHANCE_YOUR_CALM));
+            }
+        }
+
         // Notify the stream
         stream.state.recv_reset(frame, stream.is_pending_send);
 
         stream.notify_send();
         stream.notify_recv();
+
+        Ok(())
     }
 
     /// Handle a connection-level error
@@ -1033,7 +1060,6 @@ impl Recv {
         cx: &Context,
         stream: &mut Stream,
     ) -> Poll<Option<Result<Bytes, proto::Error>>> {
-        // TODO: Return error when the stream is reset
         match stream.pending_recv.pop_front(&mut self.buffer) {
             Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
             Some(event) => {
diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs
index db37831f8..b9612addc 100644
--- a/src/proto/streams/state.rs
+++ b/src/proto/streams/state.rs
@@ -360,6 +360,13 @@ impl State {
         }
     }
 
+    pub fn is_remote_reset(&self) -> bool {
+        match self.inner {
+            Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote))) => true,
+            _ => false,
+        }
+    }
+
     /// Returns true if the stream is already reset.
     pub fn is_reset(&self) -> bool {
         match self.inner {
diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs
index e1a2e3fe7..dbaebfa7a 100644
--- a/src/proto/streams/streams.rs
+++ b/src/proto/streams/streams.rs
@@ -140,6 +140,12 @@
         // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
        // the lock, so it can't.
        me.refs += 1;
+
+        // Pending-accepted remotely-reset streams are counted; handing one
+        // out here frees its slot against the remote-reset limit.
+        if stream.state.is_remote_reset() {
+            me.counts.dec_num_remote_reset_streams();
+        }
+
         StreamRef {
             opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
             send_buffer: self.send_buffer.clone(),
@@ -601,7 +607,7 @@ impl Inner {
         let actions = &mut self.actions;
 
         self.counts.transition(stream, |counts, stream| {
-            actions.recv.recv_reset(frame, stream);
+            actions.recv.recv_reset(frame, stream, counts)?;
             actions.send.handle_error(send_buffer, stream, counts);
             assert!(stream.state.is_closed());
             Ok(())
diff --git a/src/server.rs b/src/server.rs
index bb3d3cf86..6f2455e0b 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -576,6 +576,13 @@ where
     pub fn max_concurrent_recv_streams(&self) -> usize {
         self.connection.max_recv_streams()
     }
+
+    // Could disappear at any time.
+    #[doc(hidden)]
+    #[cfg(feature = "unstable")]
+    pub fn num_wired_streams(&self) -> usize {
+        self.connection.num_wired_streams()
+    }
 }
 
 #[cfg(feature = "stream")]
diff --git a/tests/h2-support/src/frames.rs b/tests/h2-support/src/frames.rs
index 862e0c629..bc4e2e708 100644
--- a/tests/h2-support/src/frames.rs
+++ b/tests/h2-support/src/frames.rs
@@ -297,6 +297,10 @@ impl Mock<frame::GoAway> {
         self.reason(frame::Reason::FRAME_SIZE_ERROR)
     }
 
+    pub fn calm(self) -> Self {
+        self.reason(frame::Reason::ENHANCE_YOUR_CALM)
+    }
+
     pub fn no_error(self) -> Self {
         self.reason(frame::Reason::NO_ERROR)
     }
diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs
index 9f348d5f2..610d3a530 100644
--- a/tests/h2-tests/tests/stream_states.rs
+++ b/tests/h2-tests/tests/stream_states.rs
@@ -1,6 +1,6 @@
 #![deny(warnings)]
 
-use futures::future::{join, join3, lazy, try_join};
+use futures::future::{join, join3, lazy, poll_fn, try_join};
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use h2_support::prelude::*;
 use h2_support::util::yield_once;
@@ -194,6 +194,45 @@ async fn closed_streams_are_released() {
     join(srv, h2).await;
 }
 
+#[tokio::test]
+async fn reset_streams_dont_grow_memory_continuously() {
+    //h2_support::trace_init!();
+    let (io, mut client) = mock::new();
+
+    const N: u32 = 50;
+
+    let client = async move {
+        let settings = client.assert_server_handshake().await;
+        assert_default_settings!(settings);
+        for n in (1..(N * 2)).step_by(2) {
+            client
+                .send_frame(frames::headers(n).request("GET", "https://a.b/").eos())
+                .await;
+            client.send_frame(frames::reset(n).protocol_error()).await;
+        }
+        tokio::time::timeout(
+            std::time::Duration::from_secs(1),
+            client.recv_frame(frames::go_away(41).calm()),
+        )
+        .await
+        .expect("client goaway");
+    };
+
+    let srv = async move {
+        let mut srv = server::Builder::new()
+            .handshake::<_, Bytes>(io)
+            .await
+            .expect("handshake");
+
+        poll_fn(|cx| srv.poll_closed(cx))
+            .await
+            .expect_err("server should error");
+        // specifically 21 (the limit of 20, plus the stream that tripped
+        // it), not 50
+        assert_eq!(21, srv.num_wired_streams());
+    };
+    join(srv, client).await;
+}
+
 #[tokio::test]
 async fn errors_if_recv_frame_exceeds_max_frame_size() {
     h2_support::trace_init!();
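
Editorial note, not part of the commit: the accounting above is easiest
to follow in isolation. Below is a minimal, self-contained Rust model of
the rule the patch enforces; the names (`PendingAcceptCounts`,
`CalmError`, `recv_reset_before_accept`, `accept_remote_reset`) are
invented for illustration and are not h2's actual types or methods.

    // Standalone model of the pending-accept/remote-reset accounting.
    const MAX_REMOTE_RESET_STREAMS: usize = 20; // mirrors DEFAULT_MAX_REMOTE_RESET_STREAMS

    /// Stands in for "send GOAWAY { ENHANCE_YOUR_CALM } and error the connection".
    #[derive(Debug)]
    struct CalmError;

    #[derive(Default)]
    struct PendingAcceptCounts {
        num_remote_reset_streams: usize,
    }

    impl PendingAcceptCounts {
        // RST_STREAM arrived for a stream the user has not accepted yet.
        fn recv_reset_before_accept(&mut self) -> Result<(), CalmError> {
            if self.num_remote_reset_streams < MAX_REMOTE_RESET_STREAMS {
                self.num_remote_reset_streams += 1;
                Ok(())
            } else {
                Err(CalmError)
            }
        }

        // The user accepted a stream that had been remotely reset.
        fn accept_remote_reset(&mut self) {
            assert!(self.num_remote_reset_streams > 0);
            self.num_remote_reset_streams -= 1;
        }
    }

    fn main() {
        let mut counts = PendingAcceptCounts::default();

        // The first 20 resets-before-accept are tolerated...
        for _ in 0..MAX_REMOTE_RESET_STREAMS {
            counts.recv_reset_before_accept().unwrap();
        }
        // ...and the next one trips the limit. This is the same arithmetic
        // the test above relies on: GOAWAY at stream id 41 (the 21st
        // client-initiated stream) and 21 wired streams, not 50.
        assert!(counts.recv_reset_before_accept().is_err());

        // Accepting a reset stream frees its slot again, which is why a
        // server that keeps calling `accept` never hits the limit.
        counts.accept_remote_reset();
        assert!(counts.recv_reset_before_accept().is_ok());
    }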
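
For consumers of the server API, nothing changes on the happy path: the
limit only becomes visible as a connection error surfaced by the accept
loop. A minimal sketch of the accept loop the commit message describes,
assuming the `h2`, `http`, and `tokio` crates (error handling and real
request processing elided):

    use tokio::net::TcpStream;

    // If a peer floods HEADERS+RST_STREAM pairs faster than this loop runs,
    // `accept` now surfaces a connection error (after h2 sends the
    // ENHANCE_YOUR_CALM GOAWAY) instead of letting the queue grow unbounded.
    async fn serve(socket: TcpStream) -> Result<(), h2::Error> {
        let mut connection = h2::server::handshake(socket).await?;

        while let Some(result) = connection.accept().await {
            // This is where the new limit becomes visible to users.
            let (request, mut respond) = result?;

            // Process the request in a separate task, as the commit message
            // assumes; the accept loop itself stays cheap.
            tokio::spawn(async move {
                let _ = request;
                let response = http::Response::new(());
                let _ = respond.send_response(response, true);
            });
        }
        Ok(())
    }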