diff --git a/src/server/conn/http1.rs b/src/server/conn/http1.rs index 23f91472ef..e3737edbae 100644 --- a/src/server/conn/http1.rs +++ b/src/server/conn/http1.rs @@ -482,7 +482,11 @@ where /// This `Connection` should continue to be polled until shutdown /// can finish. pub fn graceful_shutdown(mut self: Pin<&mut Self>) { - Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown() + // Connection (`inner`) is `None` if it was upgraded (and `poll` is `Ready`). + // In that case, we don't need to call `graceful_shutdown`. + if let Some(conn) = self.inner.as_mut() { + Pin::new(conn).graceful_shutdown() + } } } diff --git a/tests/server.rs b/tests/server.rs index dcbe669565..661b98d180 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -1256,6 +1256,67 @@ async fn disable_keep_alive_post_request() { child.join().unwrap(); } +#[tokio::test] +async fn http1_graceful_shutdown_after_upgrade() { + let (listener, addr) = setup_tcp_listener(); + let (read_101_tx, read_101_rx) = oneshot::channel(); + + thread::spawn(move || { + let mut tcp = connect(&addr); + tcp.write_all( + b"\ + GET / HTTP/1.1\r\n\ + Upgrade: foobar\r\n\ + Connection: upgrade\r\n\ + \r\n\ + eagerly optimistic\ + ", + ) + .expect("write 1"); + let mut buf = [0; 256]; + tcp.read(&mut buf).expect("read 1"); + + let response = s(&buf); + assert!(response.starts_with("HTTP/1.1 101 Switching Protocols\r\n")); + assert!(!has_header(&response, "content-length")); + let _ = read_101_tx.send(()); + }); + + let (upgrades_tx, upgrades_rx) = mpsc::channel(); + let svc = service_fn(move |req: Request| { + let on_upgrade = hyper::upgrade::on(req); + let _ = upgrades_tx.send(on_upgrade); + future::ok::<_, hyper::Error>( + Response::builder() + .status(101) + .header("upgrade", "foobar") + .body(Empty::::new()) + .unwrap(), + ) + }); + + let (socket, _) = listener.accept().await.unwrap(); + let socket = TokioIo::new(socket); + + let mut conn = http1::Builder::new() + .serve_connection(socket, svc) + .with_upgrades(); + (&mut conn).await.unwrap(); + + let on_upgrade = upgrades_rx.recv().unwrap(); + + // wait so that we don't write until other side saw 101 response + read_101_rx.await.unwrap(); + + let upgraded = on_upgrade.await.expect("on_upgrade"); + let parts = upgraded.downcast::>().unwrap(); + assert_eq!(parts.read_buf, "eagerly optimistic"); + + pin!(conn); + // graceful shutdown doesn't cause issues or panic. It should be ignored after upgrade + conn.as_mut().graceful_shutdown(); +} + #[tokio::test] async fn empty_parse_eof_does_not_return_error() { let (listener, addr) = setup_tcp_listener();