diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index 70dfc785..4b2a08c1 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -173,6 +173,11 @@ impl Counts { self.max_send_streams } + /// Returns the maximum number of streams. + pub(crate) fn max_streams(&self) -> usize { + self.max_recv_streams + self.max_reset_streams + } + /// Returns the maximum number of streams that can be initiated by the /// remote peer. pub(crate) fn max_recv_streams(&self) -> usize { diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 3e34b7cb..271c3579 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -212,7 +212,7 @@ impl Store { self.ids.len() } - #[cfg(feature = "unstable")] + /// Returns the number of streams that are held in slab. pub fn num_wired_streams(&self) -> usize { self.slab.len() } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 3e7ae97d..9c6752fe 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -312,7 +312,14 @@ where impl DynStreams<'_, B> { pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); - + if me.store.num_wired_streams() > me.counts.max_streams() { + tracing::error!("HEADERS: number of streams exceeds the upper limit."); + return Err(Error::GoAway( + Bytes::new(), + Reason::PROTOCOL_ERROR, + Initiator::Remote, + )); + } me.recv_headers(self.peer, &self.send_buffer, frame) } @@ -349,6 +356,14 @@ impl DynStreams<'_, B> { pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); + if me.store.num_wired_streams() > me.counts.max_streams() { + tracing::error!("PUSH_PROMISE: number of streams exceeds the upper limit."); + return Err(Error::GoAway( + Bytes::new(), + Reason::PROTOCOL_ERROR, + Initiator::Remote, + )); + } me.recv_push_promise(&self.send_buffer, frame) }