diff --git a/src/proxy/inbound.rs b/src/proxy/inbound.rs index a05844dc..9d307c9f 100644 --- a/src/proxy/inbound.rs +++ b/src/proxy/inbound.rs @@ -27,8 +27,9 @@ use hyper::service::service_fn; use hyper::{Method, Request, Response, StatusCode}; use tokio::net::{TcpListener, TcpStream}; +use tokio::time::timeout; -use tracing::{debug, error, info, instrument, trace, trace_span, warn, Instrument}; +use tracing::{debug, error, info, instrument, trace_span, warn, Instrument}; use super::connection_manager::ConnectionManager; use super::{Error, SocketFactory}; @@ -82,7 +83,6 @@ impl Inbound { } pub(super) async fn run(self) { - // let (tx, rx) = oneshot::channel(); let acceptor = InboundCertProvider { state: self.pi.state.clone(), cert_manager: self.pi.cert_manager.clone(), @@ -102,6 +102,7 @@ impl Inbound { let connection_manager = self.pi.connection_manager.clone(); let drain = sub_drain.clone(); let network = self.pi.cfg.network.clone(); + let drain_deadline = self.pi.cfg.self_termination_deadline; tokio::task::spawn(async move { let conn = Connection { src_identity, @@ -134,12 +135,29 @@ impl Inbound { match futures_util::future::select(Box::pin(drain.signaled()), serve).await { // We got a shutdown request. Start gracful shutdown and wait for the pending requests to complete. futures_util::future::Either::Left((_shutdown, mut server)) => { + debug!("inbound serve got drain {:?}", server); let drain = std::pin::Pin::new(&mut server); drain.graceful_shutdown(); - server.await + // There are scenarios where the http2 server never resolves after + // `graceful_shutdown`, which will hang the whole task. + // + // This seems to be a hyper bug, but either way, it's safer to have a deadline. + let timeout_res = timeout(drain_deadline, server).await; + let res = match timeout_res { + Ok(res) => res, + Err(e) => { + error!("inbound serve drain err: {e}"); + Ok(()) + }, + }; + debug!("inbound serve drain done"); + res } // Serving finished, just return the result. - futures_util::future::Either::Right((server, _shutdown)) => server, + futures_util::future::Either::Right((server, _shutdown)) => { + debug!("inbound serve done {:?}", server); + server + } } }); } @@ -163,16 +181,15 @@ impl Inbound { rbac_ctx: crate::state::ProxyRbacContext, ) -> Result<(), std::io::Error> { let start = Instant::now(); - let stream = super::freebind_connect(orig_src, addr, socket_factory).await; - match stream { + let out_stream = super::freebind_connect(orig_src, addr, socket_factory).await; + match out_stream { Err(err) => { warn!(dur=?start.elapsed(), "connection to {} failed: {}", addr, err); Err(err) } - Ok(stream) => { - let mut stream = stream; + Ok(mut stream) => { stream.set_nodelay(true)?; - trace!(dur=?start.elapsed(), "connected to: {addr}"); + debug!(dur=?start.elapsed(), "connected to: {addr}"); tokio::task::spawn( (async move { let close = match connection_manager.track(&rbac_ctx).await {