Skip to content

Commit

Permalink
Use socket2 to support SO_NODELAY and SO_KEEPALIVE on incoming connec…
Browse files Browse the repository at this point in the history
…tions
  • Loading branch information
alexrudy committed Apr 3, 2024
1 parent 7759661 commit f4f57d2
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 14 deletions.
4 changes: 3 additions & 1 deletion tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ transport = [
"dep:hyper-util",
"tokio/net",
"tokio/time",
"dep:socket2",
"dep:tower",
"dep:hyper-timeout",
]
Expand Down Expand Up @@ -80,10 +81,12 @@ prost = { version = "0.12", default-features = false, features = [
async-trait = { version = "0.1.13", optional = true }

# transport
axum = { version = "0.7", default_features = false, optional = true }
h2 = { version = "0.4", optional = true }
hyper = { version = "1.0", features = ["full"], optional = true }
hyper-util = { version = "0.1", features = ["full"], optional = true }
hyper-timeout = { version = "0.5", optional = true }
socket2 = { version = ">=0.4.7, <0.6.0", optional = true, features = ["all"] }
tokio-stream = { version = "0.1", features = ["net"] }
tower = { version = "0.4.7", default-features = false, features = [
"balance",
Expand All @@ -95,7 +98,6 @@ tower = { version = "0.4.7", default-features = false, features = [
"timeout",
"util",
], optional = true }
axum = { version = "0.7", default_features = false, optional = true }

# rustls
async-stream = { version = "0.3", optional = true }
Expand Down
33 changes: 31 additions & 2 deletions tonic/src/transport/server/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::transport::service::ServerIo;
use std::{
net::{SocketAddr, TcpListener as StdTcpListener},
pin::Pin,
task::{Context, Poll},
task::{ready, Context, Poll},
time::Duration,
};
use tokio::{
Expand All @@ -12,6 +12,7 @@ use tokio::{
};
use tokio_stream::wrappers::TcpListenerStream;
use tokio_stream::{Stream, StreamExt};
use tracing::warn;

#[cfg(not(feature = "tls"))]
pub(crate) fn tcp_incoming<IO, IE, L>(
Expand Down Expand Up @@ -195,7 +196,35 @@ impl Stream for TcpIncoming {
type Item = Result<TcpStream, std::io::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
match ready!(Pin::new(&mut self.inner).poll_next(cx)) {
Some(Ok(stream)) => {
set_accept_socketoptions(&stream, self.tcp_nodelay, self.tcp_keepalive_timeout);
Some(Ok(stream)).into()
}
other => Poll::Ready(other),
}
}
}

// Consistent with hyper-0.14, this function does not return an error.
fn set_accept_socketoptions(
stream: &TcpStream,
tcp_nodelay: bool,
tcp_keepalive_timeout: Option<Duration>,
) {
if tcp_nodelay {
if let Err(e) = stream.set_nodelay(true) {
warn!("error trying to set TCP nodelay: {}", e);
}
}

if let Some(timeout) = tcp_keepalive_timeout {
let sock_ref = socket2::SockRef::from(&stream);
let sock_keepalive = socket2::TcpKeepalive::new().with_time(timeout);

if let Err(e) = sock_ref.set_tcp_keepalive(&sock_keepalive) {
warn!("error trying to set TCP keepalive: {}", e);
}
}
}

Expand Down
21 changes: 10 additions & 11 deletions tonic/src/transport/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,16 +523,17 @@ impl<L> Server<L> {
let timeout = self.timeout;
let max_frame_size = self.max_frame_size;

// FIXME: this requires additonal implementation here.
let http2_only = !self.accept_http1;
// TODO: Reqiures support from hyper-util
let _http2_only = !self.accept_http1;

let http2_keepalive_interval = self.http2_keepalive_interval;
let http2_keepalive_timeout = self
.http2_keepalive_timeout
.unwrap_or_else(|| Duration::new(DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS, 0));
let http2_adaptive_window = self.http2_adaptive_window;

let http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams;
// TODO: Requires a new release of hyper and hyper-util
let _http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams;

let make_service = self.service_builder.service(svc);

Expand All @@ -547,6 +548,9 @@ impl<L> Server<L> {

let mut builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());

//TODO: Set http2-only when available in hyper_util
//builder.http2_only(http2_only);

builder
.http2()
.initial_connection_window_size(init_connection_window_size)
Expand All @@ -555,8 +559,8 @@ impl<L> Server<L> {
.keep_alive_interval(http2_keepalive_interval)
.keep_alive_timeout(http2_keepalive_timeout)
.adaptive_window(http2_adaptive_window.unwrap_or_default())
// FIXME: wait for this to be added to hyper-util
// .max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams)
// TODO: wait for this to be added to hyper-util
//.max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams)
.max_frame_size(max_frame_size);

let (signal_tx, signal_rx) = tokio::sync::watch::channel(());
Expand Down Expand Up @@ -1068,18 +1072,13 @@ where
}
}

// A future which only yields `Poll::Ready` once, and thereafter yields `Poll::Pending`.
#[pin_project]
struct Fuse<F> {
#[pin]
inner: Option<F>,
}

impl<F> Fuse<F> {
fn is_terminated(self: &Pin<&mut Self>) -> bool {
self.inner.is_none()
}
}

impl<F> Future for Fuse<F>
where
F: Future,
Expand Down

0 comments on commit f4f57d2

Please sign in to comment.