From 8978e106bc9774fac79c01634089ee967e24dcad Mon Sep 17 00:00:00 2001 From: Robin Seitz Date: Sun, 21 May 2023 10:44:17 -0700 Subject: [PATCH] fix(http1): http1 server graceful shutdown fix fix issue in the graceful shutdown logic which causes the connection future to hang when graceful shutdown is called prior to any requests being made. This fix checks to see if the connection is still in its initial state when disable_keep_alive is called, and starts the shutdown process if it is. This addresses issue #2730 --- src/proto/h1/conn.rs | 5 +++++ src/proto/h1/dispatch.rs | 12 +++++++++--- tests/server.rs | 26 ++++++++++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index b7c619683c..3e753c6f6a 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -175,6 +175,11 @@ where } } + #[cfg(feature = "server")] + pub(crate) fn has_initial_read_write_state(&self) -> bool { + matches!(self.state.reading, Reading::Init) && matches!(self.state.writing, Writing::Init) + } + fn should_error_on_eof(&self) -> bool { // If we're idle, it's probably just the connection closing gracefully. T::should_error_on_parse_eof() && !self.state.is_idle() diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index cd494581b9..beb8d0a914 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -28,7 +28,8 @@ pub(crate) trait Dispatch { self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll>>; - fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()>; + fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) + -> crate::Result<()>; fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; fn should_poll(&self) -> bool; } @@ -81,7 +82,11 @@ where #[cfg(feature = "server")] pub(crate) fn disable_keep_alive(&mut self) { self.conn.disable_keep_alive(); - if self.conn.is_write_closed() { + + // If keep alive has been disabled and no read or write has been seen on + // the connection yet, we must be in a state where the server is being asked to + // shut down before any data has been seen on the connection + if self.conn.is_write_closed() || self.conn.has_initial_read_write_state() { self.close(); } } @@ -249,7 +254,8 @@ where let body = match body_len { DecodedLength::ZERO => IncomingBody::empty(), other => { - let (tx, rx) = IncomingBody::new_channel(other, wants.contains(Wants::EXPECT)); + let (tx, rx) = + IncomingBody::new_channel(other, wants.contains(Wants::EXPECT)); self.body_tx = Some(tx); rx } diff --git a/tests/server.rs b/tests/server.rs index 632ce4839a..759843e291 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -31,6 +31,7 @@ use hyper::body::{Body, Incoming as IncomingBody}; use hyper::server::conn::{http1, http2}; use hyper::service::{service_fn, Service}; use hyper::{Method, Request, Response, StatusCode, Uri, Version}; +use tokio::pin; mod support; @@ -2136,6 +2137,31 @@ async fn max_buf_size() { .expect_err("should TooLarge error"); } +#[cfg(feature = "http1")] +#[tokio::test] +async fn graceful_shutdown_before_first_request_no_block() { + let (listener, addr) = setup_tcp_listener(); + + tokio::spawn(async move { + let socket = listener.accept().await.unwrap().0; + + let future = http1::Builder::new().serve_connection(socket, HelloWorld); + pin!(future); + future.as_mut().graceful_shutdown(); + + future.await.unwrap(); + }); + + let mut stream = TkTcpStream::connect(addr).await.unwrap(); + + let mut buf = vec![]; + + tokio::time::timeout(Duration::from_secs(5), stream.read_to_end(&mut buf)) + .await + .expect("timed out waiting for graceful shutdown") + .expect("error receiving response"); +} + #[test] fn streaming_body() { use futures_util::StreamExt;