Skip to content

Commit

Permalink
Deadline on inbound "graceful shutdown"
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Leggett <benjamin.leggett@solo.io>
  • Loading branch information
bleggett committed Apr 5, 2024
1 parent ec22ac4 commit 4b15661
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions src/proxy/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};

use tokio::net::{TcpListener, TcpStream};
use tokio::time::timeout;

use tracing::{debug, error, info, instrument, trace, trace_span, warn, Instrument};
use tracing::{debug, error, info, instrument, trace_span, warn, Instrument};

use super::connection_manager::ConnectionManager;
use super::{Error, SocketFactory};
Expand Down Expand Up @@ -82,7 +83,6 @@ impl Inbound {
}

pub(super) async fn run(self) {
// let (tx, rx) = oneshot::channel();
let acceptor = InboundCertProvider {
state: self.pi.state.clone(),
cert_manager: self.pi.cert_manager.clone(),
Expand All @@ -102,6 +102,7 @@ impl Inbound {
let connection_manager = self.pi.connection_manager.clone();
let drain = sub_drain.clone();
let network = self.pi.cfg.network.clone();
let drain_deadline = self.pi.cfg.self_termination_deadline;
tokio::task::spawn(async move {
let conn = Connection {
src_identity,
Expand Down Expand Up @@ -134,12 +135,29 @@ impl Inbound {
match futures_util::future::select(Box::pin(drain.signaled()), serve).await {
// We got a shutdown request. Start gracful shutdown and wait for the pending requests to complete.
futures_util::future::Either::Left((_shutdown, mut server)) => {
debug!("inbound serve got drain {:?}", server);
let drain = std::pin::Pin::new(&mut server);
drain.graceful_shutdown();
server.await
// There are scenarios where the http2 server never resolves after
// `graceful_shutdown`, which will hang the whole task.
//
// This seems to be a hyper bug, but either way, it's safer to have a deadline.
let timeout_res = timeout(drain_deadline, server).await;
let res = match timeout_res {
Ok(res) => res,
Err(e) => {
error!("inbound serve drain err: {e}");
Ok(())
},
};
debug!("inbound serve drain done");
res
}
// Serving finished, just return the result.
futures_util::future::Either::Right((server, _shutdown)) => server,
futures_util::future::Either::Right((server, _shutdown)) => {
debug!("inbound serve done {:?}", server);
server
}
}
});
}
Expand All @@ -163,16 +181,15 @@ impl Inbound {
rbac_ctx: crate::state::ProxyRbacContext,
) -> Result<(), std::io::Error> {
let start = Instant::now();
let stream = super::freebind_connect(orig_src, addr, socket_factory).await;
match stream {
let out_stream = super::freebind_connect(orig_src, addr, socket_factory).await;
match out_stream {
Err(err) => {
warn!(dur=?start.elapsed(), "connection to {} failed: {}", addr, err);
Err(err)
}
Ok(stream) => {
let mut stream = stream;
Ok(mut stream) => {
stream.set_nodelay(true)?;
trace!(dur=?start.elapsed(), "connected to: {addr}");
debug!(dur=?start.elapsed(), "connected to: {addr}");
tokio::task::spawn(
(async move {
let close = match connection_manager.track(&rbac_ctx).await {
Expand Down

0 comments on commit 4b15661

Please sign in to comment.