Skip to content

Commit

Permalink
feat: expose tcp_nodelay for clients and servers (#145)
Browse files Browse the repository at this point in the history
* expose tcp_nodelay

* do not depend on difference versions of rand
  • Loading branch information
alce authored and LucioFranco committed Dec 6, 2019
1 parent 6b43f63 commit 0eb9991
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 24 deletions.
2 changes: 1 addition & 1 deletion tonic-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ tower = { git = "https://github.com/tower-rs/tower" }
# Required for routeguide
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.7.2"
rand = "0.6"

# Required for wellknown types
prost-types = "0.5"
Expand Down
2 changes: 1 addition & 1 deletion tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ rustls-native-certs = { version = "0.1", optional = true }
[dev-dependencies]
tokio = { version = "0.2", features = ["rt-core", "macros"] }
static_assertions = "1.0"
rand = "0.7.2"
rand = "0.6"
criterion = "0.3"

[package.metadata.docs.rs]
Expand Down
10 changes: 10 additions & 0 deletions tonic/src/transport/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct Endpoint {
pub(super) init_stream_window_size: Option<u32>,
pub(super) init_connection_window_size: Option<u32>,
pub(super) tcp_keepalive: Option<Duration>,
pub(super) tcp_nodelay: bool,
}

impl Endpoint {
Expand Down Expand Up @@ -171,6 +172,14 @@ impl Endpoint {
}
}

/// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default.
pub fn tcp_nodelay(self, enabled: bool) -> Self {
Endpoint {
tcp_nodelay: enabled,
..self
}
}

/// Create a channel from this config.
pub async fn connect(&self) -> Result<Channel, super::Error> {
Channel::connect(self.clone()).await
Expand All @@ -191,6 +200,7 @@ impl From<Uri> for Endpoint {
init_stream_window_size: None,
init_connection_window_size: None,
tcp_keepalive: None,
tcp_nodelay: true,
}
}
}
Expand Down
36 changes: 28 additions & 8 deletions tonic/src/transport/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub struct Server {
init_connection_window_size: Option<u32>,
max_concurrent_streams: Option<u32>,
tcp_keepalive: Option<Duration>,
tcp_nodelay: bool,
}

/// A stack based `Service` router.
Expand All @@ -77,7 +78,10 @@ pub trait ServiceName {
impl Server {
/// Create a new server builder that can configure a [`Server`].
pub fn builder() -> Self {
Default::default()
Server {
tcp_nodelay: true,
..Default::default()
}
}
}

Expand Down Expand Up @@ -164,6 +168,14 @@ impl Server {
}
}

/// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default.
pub fn tcp_nodelay(self, enabled: bool) -> Self {
Server {
tcp_nodelay: enabled,
..self
}
}

/// Intercept the execution of gRPC methods.
///
/// ```
Expand Down Expand Up @@ -221,12 +233,13 @@ impl Server {
let init_connection_window_size = self.init_connection_window_size;
let init_stream_window_size = self.init_stream_window_size;
let max_concurrent_streams = self.max_concurrent_streams;
let tcp_keepalive = self.tcp_keepalive;
// let timeout = self.timeout.clone();

let incoming = hyper::server::accept::from_stream::<_, _, crate::Error>(
async_stream::try_stream! {
let mut tcp = TcpIncoming::bind(addr, tcp_keepalive)?;
let mut tcp = TcpIncoming::bind(addr)?
.set_nodelay(self.tcp_nodelay)
.set_keepalive(self.tcp_keepalive);

while let Some(stream) = tcp.try_next().await? {
#[cfg(feature = "tls")]
Expand Down Expand Up @@ -418,13 +431,20 @@ struct TcpIncoming {
}

impl TcpIncoming {
fn bind(addr: SocketAddr, tcp_keepalive: Option<Duration>) -> Result<Self, crate::Error> {
let mut inner = conn::AddrIncoming::bind(&addr).map_err(Box::new)?;
inner.set_nodelay(true);
inner.set_keepalive(tcp_keepalive);

fn bind(addr: SocketAddr) -> Result<Self, crate::Error> {
let inner = conn::AddrIncoming::bind(&addr).map_err(Box::new)?;
Ok(Self { inner })
}

fn set_nodelay(mut self, enabled: bool) -> Self {
self.inner.set_nodelay(enabled);
self
}

fn set_keepalive(mut self, tcp_keepalive: Option<Duration>) -> Self {
self.inner.set_keepalive(tcp_keepalive);
self
}
}

impl Stream for TcpIncoming {
Expand Down
8 changes: 6 additions & 2 deletions tonic/src/transport/service/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ pub(crate) struct Connection {
impl Connection {
pub(crate) async fn new(endpoint: Endpoint) -> Result<Self, crate::Error> {
#[cfg(feature = "tls")]
let connector = connector(endpoint.tls.clone(), endpoint.tcp_keepalive);
let connector = connector(endpoint.tls.clone())
.set_keepalive(endpoint.tcp_keepalive)
.set_nodelay(endpoint.tcp_nodelay);

#[cfg(not(feature = "tls"))]
let connector = connector(endpoint.tcp_keepalive);
let connector = connector()
.set_keepalive(endpoint.tcp_keepalive)
.set_nodelay(endpoint.tcp_nodelay);

let settings = Builder::new()
.http2_initial_stream_window_size(endpoint.init_stream_window_size)
Expand Down
42 changes: 30 additions & 12 deletions tonic/src/transport/service/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,13 @@ use tower_make::MakeConnection;
use tower_service::Service;

#[cfg(not(feature = "tls"))]
pub(crate) fn connector(tcp_keepalive: Option<Duration>) -> HttpConnector {
let mut http = HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(true);
http.set_keepalive(tcp_keepalive);
http
pub(crate) fn connector() -> Connector {
Connector::new()
}

#[cfg(feature = "tls")]
pub(crate) fn connector(tls: Option<TlsConnector>, tcp_keepalive: Option<Duration>) -> Connector {
Connector::new(tls, tcp_keepalive)
pub(crate) fn connector(tls: Option<TlsConnector>) -> Connector {
Connector::new(tls)
}

pub(crate) struct Connector {
Expand All @@ -31,13 +27,35 @@ pub(crate) struct Connector {
}

impl Connector {
#[cfg(not(feature = "tls"))]
pub(crate) fn new() -> Self {
Self {
http: Self::http_connector(),
}
}

#[cfg(feature = "tls")]
pub(crate) fn new(tls: Option<TlsConnector>, tcp_keepalive: Option<Duration>) -> Self {
fn new(tls: Option<TlsConnector>) -> Self {
Self {
http: Self::http_connector(),
tls,
}
}

pub(crate) fn set_nodelay(mut self, enabled: bool) -> Self {
self.http.set_nodelay(enabled);
self
}

pub(crate) fn set_keepalive(mut self, duration: Option<Duration>) -> Self {
self.http.set_keepalive(duration);
self
}

fn http_connector() -> HttpConnector {
let mut http = HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(true);
http.set_keepalive(tcp_keepalive);
Self { http, tls }
http
}
}

Expand Down

0 comments on commit 0eb9991

Please sign in to comment.