Skip to content

Commit

Permalink
Added support for grpc max_connection_age (#1865)
Browse files Browse the repository at this point in the history
- changes after review
  • Loading branch information
ionut-slaveanu authored Aug 27, 2024
1 parent 3d804e9 commit 75e5201
Showing 1 changed file with 43 additions and 1 deletion.
44 changes: 43 additions & 1 deletion tonic/src/transport/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use http::{Request, Response};
use http_body_util::BodyExt;
use hyper::{body::Incoming, service::Service as HyperService};
use pin_project::pin_project;
use std::future::pending;
use std::{
convert::Infallible,
fmt,
Expand All @@ -57,6 +58,7 @@ use std::{
time::Duration,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::sleep;
use tokio_stream::Stream;
use tower::{
layer::util::{Identity, Stack},
Expand Down Expand Up @@ -99,6 +101,7 @@ pub struct Server<L = Identity> {
max_frame_size: Option<u32>,
accept_http1: bool,
service_builder: ServiceBuilder<L>,
max_connection_age: Option<Duration>,
}

impl Default for Server<Identity> {
Expand All @@ -122,6 +125,7 @@ impl Default for Server<Identity> {
max_frame_size: None,
accept_http1: false,
service_builder: Default::default(),
max_connection_age: None,
}
}
}
Expand Down Expand Up @@ -234,6 +238,27 @@ impl<L> Server<L> {
}
}

/// Sets the maximum time option in milliseconds that a connection may exist
///
/// Default is no limit (`None`).
///
/// # Example
///
/// ```
/// # use tonic::transport::Server;
/// # use tower_service::Service;
/// # use std::time::Duration;
/// # let builder = Server::builder();
/// builder.max_connection_age(Duration::from_secs(60));
/// ```
#[must_use]
pub fn max_connection_age(self, max_connection_age: Duration) -> Self {
Server {
max_connection_age: Some(max_connection_age),
..self
}
}

/// Set whether HTTP2 Ping frames are enabled on accepted connections.
///
/// If `None` is specified, HTTP2 keepalive is disabled, otherwise the duration
Expand Down Expand Up @@ -498,6 +523,7 @@ impl<L> Server<L> {
http2_max_header_list_size: self.http2_max_header_list_size,
max_frame_size: self.max_frame_size,
accept_http1: self.accept_http1,
max_connection_age: self.max_connection_age,
}
}

Expand Down Expand Up @@ -538,6 +564,7 @@ impl<L> Server<L> {
.unwrap_or_else(|| Duration::new(DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS, 0));
let http2_adaptive_window = self.http2_adaptive_window;
let http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams;
let max_connection_age = self.max_connection_age;

let svc = self.service_builder.service(svc);

Expand Down Expand Up @@ -619,7 +646,7 @@ impl<L> Server<L> {
let hyper_io = TokioIo::new(io);
let hyper_svc = TowerToHyperService::new(req_svc.map_request(|req: Request<Incoming>| req.map(boxed)));

serve_connection(hyper_io, hyper_svc, server.clone(), graceful.then(|| signal_rx.clone()));
serve_connection(hyper_io, hyper_svc, server.clone(), graceful.then(|| signal_rx.clone()), max_connection_age);
}
}
}
Expand Down Expand Up @@ -647,6 +674,7 @@ fn serve_connection<B, IO, S, E>(
hyper_svc: S,
builder: ConnectionBuilder<E>,
mut watcher: Option<tokio::sync::watch::Receiver<()>>,
max_connection_age: Option<Duration>,
) where
B: http_body::Body + Send + 'static,
B::Data: Send,
Expand All @@ -665,6 +693,9 @@ fn serve_connection<B, IO, S, E>(

let mut conn = pin!(builder.serve_connection(hyper_io, hyper_svc));

let sleep = sleep_or_pending(max_connection_age);
tokio::pin!(sleep);

loop {
tokio::select! {
rv = &mut conn => {
Expand All @@ -673,6 +704,10 @@ fn serve_connection<B, IO, S, E>(
}
break;
},
_ = &mut sleep => {
conn.as_mut().graceful_shutdown();
sleep.set(sleep_or_pending(None));
},
_ = &mut sig => {
conn.as_mut().graceful_shutdown();
}
Expand All @@ -685,6 +720,13 @@ fn serve_connection<B, IO, S, E>(
});
}

async fn sleep_or_pending(wait_for: Option<Duration>) {
match wait_for {
Some(wait) => sleep(wait).await,
None => pending().await,
};
}

impl<L> Router<L> {
pub(crate) fn new(server: Server<L>, routes: Routes) -> Self {
Self { server, routes }
Expand Down

0 comments on commit 75e5201

Please sign in to comment.