From 3501592cc91809b6dece5b98284cf381a94cfc68 Mon Sep 17 00:00:00 2001 From: xd009642 Date: Mon, 9 Dec 2019 20:06:19 +0000 Subject: [PATCH 1/7] Started work on graceful shutdown Some error messages, one about hyper::server::Builder not existing and another about lack of clone on futures. Just putting this up as an initial WIP to make sure I'm on the right track :) --- tonic/src/transport/server.rs | 38 ++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/tonic/src/transport/server.rs b/tonic/src/transport/server.rs index 02701b44a..6a7126cfe 100644 --- a/tonic/src/transport/server.rs +++ b/tonic/src/transport/server.rs @@ -56,6 +56,7 @@ pub struct Server { init_connection_window_size: Option, max_concurrent_streams: Option, tcp_keepalive: Option, + shutdown: Option>>, tcp_nodelay: bool, } @@ -110,6 +111,14 @@ impl Server { } } + /// Set the graceful shutdown thing + pub fn graceful_shutdown(self, shutdown: Box>) -> Self { + Server { + shutdown: Some(shutdown), + ..self + } + } + // FIXME: tower-timeout currentlly uses `From` instead of `Into` for the error // so our services do not align. // pub fn timeout(&mut self, timeout: Duration) -> &mut Self { @@ -268,15 +277,26 @@ impl Server { concurrency_limit, // timeout, }; - - hyper::Server::builder(incoming) - .http2_only(true) - .http2_initial_connection_window_size(init_connection_window_size) - .http2_initial_stream_window_size(init_stream_window_size) - .http2_max_concurrent_streams(max_concurrent_streams) - .serve(svc) - .await - .map_err(map_err)?; + if let Some(rx) = self.shutdown { + hyper::server::builder(incoming) + .http2_only(true) + .http2_initial_connection_window_size(init_connection_window_size) + .http2_initial_stream_window_size(init_stream_window_size) + .http2_max_concurrent_streams(max_concurrent_streams) + .serve(svc) + .with_graceful_shutdown(rx) + .await + .map_err(map_err)?; + } else { + hyper::server::builder(incoming) + .http2_only(true) + .http2_initial_connection_window_size(init_connection_window_size) + .http2_initial_stream_window_size(init_stream_window_size) + .http2_max_concurrent_streams(max_concurrent_streams) + .serve(svc) + .await + .map_err(map_err)?; + } Ok(()) } From 70ec64ded3819da47f22033fcc239137eeec6bae Mon Sep 17 00:00:00 2001 From: xd009642 Date: Mon, 9 Dec 2019 20:21:19 +0000 Subject: [PATCH 2/7] Fix issues and respond to feedback Also fill in docs --- tonic/src/transport/server.rs | 53 +++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/tonic/src/transport/server.rs b/tonic/src/transport/server.rs index 6a7126cfe..1e3eee8f9 100644 --- a/tonic/src/transport/server.rs +++ b/tonic/src/transport/server.rs @@ -56,7 +56,7 @@ pub struct Server { init_connection_window_size: Option, max_concurrent_streams: Option, tcp_keepalive: Option, - shutdown: Option>>, + shutdown: Option>>, tcp_nodelay: bool, } @@ -111,10 +111,22 @@ impl Server { } } - /// Set the graceful shutdown thing - pub fn graceful_shutdown(self, shutdown: Box>) -> Self { + /// Prepares the server to handle graceful shutdown when the future completes + /// + /// ``` + /// # use tonic::transport::Server; + /// # use tower_service::Service; + /// # let mut builder = Server::builder(); + /// let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + /// builder.graceful_shutdown(async { + /// rx.await.ok(); + /// }); + /// + /// let _ = tx.send(()); + /// ``` + pub fn graceful_shutdown>(self, shutdown: F) -> Self { Server { - shutdown: Some(shutdown), + shutdown: Some(Arc::new(shutdown)), ..self } } @@ -277,26 +289,19 @@ impl Server { concurrency_limit, // timeout, }; - if let Some(rx) = self.shutdown { - hyper::server::builder(incoming) - .http2_only(true) - .http2_initial_connection_window_size(init_connection_window_size) - .http2_initial_stream_window_size(init_stream_window_size) - .http2_max_concurrent_streams(max_concurrent_streams) - .serve(svc) - .with_graceful_shutdown(rx) - .await - .map_err(map_err)?; - } else { - hyper::server::builder(incoming) - .http2_only(true) - .http2_initial_connection_window_size(init_connection_window_size) - .http2_initial_stream_window_size(init_stream_window_size) - .http2_max_concurrent_streams(max_concurrent_streams) - .serve(svc) - .await - .map_err(map_err)?; - } + let serve_fut = hyper::server::builder(incoming) + .http2_only(true) + .http2_initial_connection_window_size(init_connection_window_size) + .http2_initial_stream_window_size(init_stream_window_size) + .http2_max_concurrent_streams(max_concurrent_streams) + .serve(svc); + + let final_fut = match self.shutdown { + Some(rx) => serve_fut.with_graceful_shutdown(rx), + None => serve_fut, + }; + + final_fut.await.map_err(map_err)?; Ok(()) } From 2346ab01e308c6ed61e2e8657ecb0b76aad195c6 Mon Sep 17 00:00:00 2001 From: xd009642 Date: Mon, 9 Dec 2019 21:12:38 +0000 Subject: [PATCH 3/7] Update tonic/src/transport/server.rs Co-Authored-By: Lucio Franco --- tonic/src/transport/server.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tonic/src/transport/server.rs b/tonic/src/transport/server.rs index 1e3eee8f9..06bba6e00 100644 --- a/tonic/src/transport/server.rs +++ b/tonic/src/transport/server.rs @@ -289,6 +289,7 @@ impl Server { concurrency_limit, // timeout, }; + let serve_fut = hyper::server::builder(incoming) .http2_only(true) .http2_initial_connection_window_size(init_connection_window_size) From c7b2c934198929c3ef3f6cb0ece98862e2d26b33 Mon Sep 17 00:00:00 2001 From: xd009642 Date: Mon, 9 Dec 2019 21:35:13 +0000 Subject: [PATCH 4/7] Tried a different approach Issues wth sizes I can't know the size of so moved to try something else --- tonic/src/transport/server.rs | 42 ++++++++++------------------------- 1 file changed, 12 insertions(+), 30 deletions(-) diff --git a/tonic/src/transport/server.rs b/tonic/src/transport/server.rs index 1e3eee8f9..a4099a45e 100644 --- a/tonic/src/transport/server.rs +++ b/tonic/src/transport/server.rs @@ -56,7 +56,6 @@ pub struct Server { init_connection_window_size: Option, max_concurrent_streams: Option, tcp_keepalive: Option, - shutdown: Option>>, tcp_nodelay: bool, } @@ -111,26 +110,6 @@ impl Server { } } - /// Prepares the server to handle graceful shutdown when the future completes - /// - /// ``` - /// # use tonic::transport::Server; - /// # use tower_service::Service; - /// # let mut builder = Server::builder(); - /// let (tx, rx) = tokio::sync::oneshot::channel::<()>(); - /// builder.graceful_shutdown(async { - /// rx.await.ok(); - /// }); - /// - /// let _ = tx.send(()); - /// ``` - pub fn graceful_shutdown>(self, shutdown: F) -> Self { - Server { - shutdown: Some(Arc::new(shutdown)), - ..self - } - } - // FIXME: tower-timeout currentlly uses `From` instead of `Into` for the error // so our services do not align. // pub fn timeout(&mut self, timeout: Duration) -> &mut Self { @@ -243,11 +222,12 @@ impl Server { Router::new(self.clone(), svc) } - pub(crate) async fn serve(self, addr: SocketAddr, svc: S) -> Result<(), super::Error> + pub(crate) async fn serve(self, addr: SocketAddr, svc: S, shutdown_signal: Option) -> Result<(), super::Error> where S: Service, Response = Response> + Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into + Send, + F: Future, { let interceptor = self.interceptor.clone(); let concurrency_limit = self.concurrency_limit; @@ -289,19 +269,17 @@ impl Server { concurrency_limit, // timeout, }; - let serve_fut = hyper::server::builder(incoming) + let serve_fut = hyper::Server::builder(incoming) .http2_only(true) .http2_initial_connection_window_size(init_connection_window_size) .http2_initial_stream_window_size(init_stream_window_size) .http2_max_concurrent_streams(max_concurrent_streams) .serve(svc); - let final_fut = match self.shutdown { - Some(rx) => serve_fut.with_graceful_shutdown(rx), - None => serve_fut, - }; - - final_fut.await.map_err(map_err)?; + match shutdown_signal { + Some(rx) => serve_fut.with_graceful_shutdown(rx).await, + None => serve_fut.await, + }.map_err(map_err)?; Ok(()) } @@ -371,7 +349,11 @@ where /// /// [`Server`]: struct.Server.html pub async fn serve(self, addr: SocketAddr) -> Result<(), super::Error> { - self.server.serve(addr, self.routes).await + self.server.serve(addr, self.routes, None::>).await + } + + pub async fn with_graceful_shutdown>(self, addr: SocketAddr, signal: F) -> Result<(), super::Error> { + self.server.serve(addr, self.routes, Some(signal)).await } } From b0d7c1d86c33039c9fc732ceca8cc18eb7df245a Mon Sep 17 00:00:00 2001 From: xd009642 Date: Mon, 9 Dec 2019 22:01:56 +0000 Subject: [PATCH 5/7] Moved to a Pinned Box Going back to the old way but now with pins and boxes. Issues with Option requiring Clone and Unpin for dyn Future --- tonic/src/transport/server.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/tonic/src/transport/server.rs b/tonic/src/transport/server.rs index a4099a45e..7a8c1db15 100644 --- a/tonic/src/transport/server.rs +++ b/tonic/src/transport/server.rs @@ -56,6 +56,7 @@ pub struct Server { init_connection_window_size: Option, max_concurrent_streams: Option, tcp_keepalive: Option, + shutdown_signal: Option>>>, tcp_nodelay: bool, } @@ -110,6 +111,13 @@ impl Server { } } + pub fn graceful_shutdown(self, f: impl Future) -> Self { + Server { + shutdown_signal: Some(Pin::new(Box::new(f))), + ..self + } + } + // FIXME: tower-timeout currentlly uses `From` instead of `Into` for the error // so our services do not align. // pub fn timeout(&mut self, timeout: Duration) -> &mut Self { @@ -222,12 +230,11 @@ impl Server { Router::new(self.clone(), svc) } - pub(crate) async fn serve(self, addr: SocketAddr, svc: S, shutdown_signal: Option) -> Result<(), super::Error> + pub(crate) async fn serve(self, addr: SocketAddr, svc: S) -> Result<(), super::Error> where S: Service, Response = Response> + Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into + Send, - F: Future, { let interceptor = self.interceptor.clone(); let concurrency_limit = self.concurrency_limit; @@ -276,10 +283,11 @@ impl Server { .http2_max_concurrent_streams(max_concurrent_streams) .serve(svc); - match shutdown_signal { + match self.shutdown_signal { Some(rx) => serve_fut.with_graceful_shutdown(rx).await, None => serve_fut.await, - }.map_err(map_err)?; + } + .map_err(map_err)?; Ok(()) } @@ -349,11 +357,7 @@ where /// /// [`Server`]: struct.Server.html pub async fn serve(self, addr: SocketAddr) -> Result<(), super::Error> { - self.server.serve(addr, self.routes, None::>).await - } - - pub async fn with_graceful_shutdown>(self, addr: SocketAddr, signal: F) -> Result<(), super::Error> { - self.server.serve(addr, self.routes, Some(signal)).await + self.server.serve(addr, self.routes).await } } From 973ab61f99c0db948229583228e3ac6c0501a7b0 Mon Sep 17 00:00:00 2001 From: xd009642 Date: Mon, 9 Dec 2019 22:41:11 +0000 Subject: [PATCH 6/7] Move to serve and serve_with_shutdown --- tonic/src/transport/server.rs | 86 ++++++++++++++++++++++++----------- 1 file changed, 59 insertions(+), 27 deletions(-) diff --git a/tonic/src/transport/server.rs b/tonic/src/transport/server.rs index 7a8c1db15..b53a514d1 100644 --- a/tonic/src/transport/server.rs +++ b/tonic/src/transport/server.rs @@ -56,7 +56,6 @@ pub struct Server { init_connection_window_size: Option, max_concurrent_streams: Option, tcp_keepalive: Option, - shutdown_signal: Option>>>, tcp_nodelay: bool, } @@ -111,13 +110,6 @@ impl Server { } } - pub fn graceful_shutdown(self, f: impl Future) -> Self { - Server { - shutdown_signal: Some(Pin::new(Box::new(f))), - ..self - } - } - // FIXME: tower-timeout currentlly uses `From` instead of `Into` for the error // so our services do not align. // pub fn timeout(&mut self, timeout: Duration) -> &mut Self { @@ -230,14 +222,7 @@ impl Server { Router::new(self.clone(), svc) } - pub(crate) async fn serve(self, addr: SocketAddr, svc: S) -> Result<(), super::Error> - where - S: Service, Response = Response> + Clone + Send + 'static, - S::Future: Send + 'static, - S::Error: Into + Send, - { - let interceptor = self.interceptor.clone(); - let concurrency_limit = self.concurrency_limit; + fn serve_common(self, addr: SocketAddr) -> hyper::server::Builder { let init_connection_window_size = self.init_connection_window_size; let init_stream_window_size = self.init_stream_window_size; let max_concurrent_streams = self.max_concurrent_streams; @@ -269,6 +254,21 @@ impl Server { } }, ); + hyper::Server::builder(incoming) + .http2_only(true) + .http2_initial_connection_window_size(init_connection_window_size) + .http2_initial_stream_window_size(init_stream_window_size) + .http2_max_concurrent_streams(max_concurrent_streams) + } + + pub(crate) async fn serve(self, addr: SocketAddr, svc: S) -> Result<(), super::Error> + where + S: Service, Response = Response> + Clone + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + Send, + { + let interceptor = self.interceptor.clone(); + let concurrency_limit = self.concurrency_limit; let svc = MakeSvc { inner: svc, @@ -276,18 +276,37 @@ impl Server { concurrency_limit, // timeout, }; - let serve_fut = hyper::Server::builder(incoming) - .http2_only(true) - .http2_initial_connection_window_size(init_connection_window_size) - .http2_initial_stream_window_size(init_stream_window_size) - .http2_max_concurrent_streams(max_concurrent_streams) - .serve(svc); + self.serve_common(addr).serve(svc).await.map_err(map_err)?; - match self.shutdown_signal { - Some(rx) => serve_fut.with_graceful_shutdown(rx).await, - None => serve_fut.await, - } - .map_err(map_err)?; + Ok(()) + } + + pub(crate) async fn serve_with_shutdown( + self, + addr: SocketAddr, + svc: S, + signal: F, + ) -> Result<(), super::Error> + where + S: Service, Response = Response> + Clone + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + Send, + F: Future, + { + let interceptor = self.interceptor.clone(); + let concurrency_limit = self.concurrency_limit; + + let svc = MakeSvc { + inner: svc, + interceptor, + concurrency_limit, + // timeout, + }; + self.serve_common(addr) + .serve(svc) + .with_graceful_shutdown(signal) + .await + .map_err(map_err)?; Ok(()) } @@ -359,6 +378,19 @@ where pub async fn serve(self, addr: SocketAddr) -> Result<(), super::Error> { self.server.serve(addr, self.routes).await } + + /// Consume this [`Server`] creating a future that will execute the server + /// on [`tokio`]'s default executor. And shutdown when the provided signal + /// is received. + /// + /// [`Server`]: struct.Server.html + pub async fn serve_with_shutdown>( + self, + addr: SocketAddr, + f: F, + ) -> Result<(), super::Error> { + self.server.serve_with_shutdown(addr, self.routes, f).await + } } fn map_err(e: impl Into) -> super::Error { From fba82a0a9c32552cf58ca32aea29c18946d16a82 Mon Sep 17 00:00:00 2001 From: xd009642 Date: Wed, 11 Dec 2019 20:24:14 +0000 Subject: [PATCH 7/7] stop trying to be smart, just work Can't get past generic type restricts on async functions so just went for the code duplication approach in the interest of getting something working --- tonic/src/transport/server.rs | 72 ++++++++++++++++++++++++++--------- 1 file changed, 53 insertions(+), 19 deletions(-) diff --git a/tonic/src/transport/server.rs b/tonic/src/transport/server.rs index b53a514d1..a16657440 100644 --- a/tonic/src/transport/server.rs +++ b/tonic/src/transport/server.rs @@ -222,7 +222,14 @@ impl Server { Router::new(self.clone(), svc) } - fn serve_common(self, addr: SocketAddr) -> hyper::server::Builder { + pub(crate) async fn serve(self, addr: SocketAddr, svc: S) -> Result<(), super::Error> + where + S: Service, Response = Response> + Clone + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + Send, + { + let interceptor = self.interceptor.clone(); + let concurrency_limit = self.concurrency_limit; let init_connection_window_size = self.init_connection_window_size; let init_stream_window_size = self.init_stream_window_size; let max_concurrent_streams = self.max_concurrent_streams; @@ -254,29 +261,21 @@ impl Server { } }, ); - hyper::Server::builder(incoming) - .http2_only(true) - .http2_initial_connection_window_size(init_connection_window_size) - .http2_initial_stream_window_size(init_stream_window_size) - .http2_max_concurrent_streams(max_concurrent_streams) - } - - pub(crate) async fn serve(self, addr: SocketAddr, svc: S) -> Result<(), super::Error> - where - S: Service, Response = Response> + Clone + Send + 'static, - S::Future: Send + 'static, - S::Error: Into + Send, - { - let interceptor = self.interceptor.clone(); - let concurrency_limit = self.concurrency_limit; - let svc = MakeSvc { inner: svc, interceptor, concurrency_limit, // timeout, }; - self.serve_common(addr).serve(svc).await.map_err(map_err)?; + + hyper::Server::builder(incoming) + .http2_only(true) + .http2_initial_connection_window_size(init_connection_window_size) + .http2_initial_stream_window_size(init_stream_window_size) + .http2_max_concurrent_streams(max_concurrent_streams) + .serve(svc) + .await + .map_err(map_err)?; Ok(()) } @@ -295,6 +294,37 @@ impl Server { { let interceptor = self.interceptor.clone(); let concurrency_limit = self.concurrency_limit; + let init_connection_window_size = self.init_connection_window_size; + let init_stream_window_size = self.init_stream_window_size; + let max_concurrent_streams = self.max_concurrent_streams; + // let timeout = self.timeout.clone(); + + let incoming = hyper::server::accept::from_stream::<_, _, crate::Error>( + async_stream::try_stream! { + let mut tcp = TcpIncoming::bind(addr)? + .set_nodelay(self.tcp_nodelay) + .set_keepalive(self.tcp_keepalive); + + while let Some(stream) = tcp.try_next().await? { + #[cfg(feature = "tls")] + { + if let Some(tls) = &self.tls { + let io = match tls.connect(stream.into_inner()).await { + Ok(io) => io, + Err(error) => { + error!(message = "Unable to accept incoming connection.", %error); + continue + }, + }; + yield BoxedIo::new(io); + continue; + } + } + + yield BoxedIo::new(stream); + } + }, + ); let svc = MakeSvc { inner: svc, @@ -302,7 +332,11 @@ impl Server { concurrency_limit, // timeout, }; - self.serve_common(addr) + hyper::Server::builder(incoming) + .http2_only(true) + .http2_initial_connection_window_size(init_connection_window_size) + .http2_initial_stream_window_size(init_stream_window_size) + .http2_max_concurrent_streams(max_concurrent_streams) .serve(svc) .with_graceful_shutdown(signal) .await