Skip to content

Commit

Permalink
add MPTCP socket protocol (optional) (#466)
Browse files Browse the repository at this point in the history
Co-authored-by: Rob Ede <robjtede@icloud.com>
  • Loading branch information
Martichou and robjtede authored Jul 17, 2023
1 parent 8d5d1db commit 755b231
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 5 deletions.
1 change: 1 addition & 0 deletions actix-server/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Unreleased - 2023-xx-xx

- Add support for MultiPath TCP (MPTCP) with `MpTcp` enum and `ServerBuilder::mptcp()` method.
- Minimum supported Rust version (MSRV) is now 1.65.

## 2.2.0 - 2022-12-21
Expand Down
41 changes: 39 additions & 2 deletions actix-server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,30 @@ use crate::{
Server,
};

/// Multipath TCP (MPTCP) preference.
///
/// Also see [`ServerBuilder::mptcp()`].
#[derive(Debug, Clone)]
pub enum MpTcp {
/// MPTCP will not be used when binding sockets.
Disabled,

/// MPTCP will be attempted when binding sockets. If errors occur, regular TCP will be
/// attempted, too.
TcpFallback,

/// MPTCP will be used when binding sockets (with no fallback).
NoFallback,
}

/// [Server] builder.
pub struct ServerBuilder {
pub(crate) threads: usize,
pub(crate) token: usize,
pub(crate) backlog: u32,
pub(crate) factories: Vec<Box<dyn InternalServiceFactory>>,
pub(crate) sockets: Vec<(usize, String, MioListener)>,
pub(crate) mptcp: MpTcp,
pub(crate) exit: bool,
pub(crate) listen_os_signals: bool,
pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
Expand All @@ -43,6 +60,7 @@ impl ServerBuilder {
factories: Vec::new(),
sockets: Vec::new(),
backlog: 2048,
mptcp: MpTcp::Disabled,
exit: false,
listen_os_signals: true,
cmd_tx,
Expand Down Expand Up @@ -96,6 +114,24 @@ impl ServerBuilder {
self
}

/// Sets MultiPath TCP (MPTCP) preference on bound sockets.
///
/// Multipath TCP (MPTCP) builds on top of TCP to improve connection redundancy and performance
/// by sharing a network data stream across multiple underlying TCP sessions. See [mptcp.dev]
/// for more info about MPTCP itself.
///
/// MPTCP is available on Linux kernel version 5.6 and higher. In addition, you'll also need to
/// ensure the kernel option is enabled using `sysctl net.mptcp.enabled=1`.
///
/// This method will have no effect if called after a `bind()`.
///
/// [mptcp.dev]: https://www.mptcp.dev
#[cfg(target_os = "linux")]
pub fn mptcp(mut self, mptcp_enabled: MpTcp) -> Self {
self.mptcp = mptcp_enabled;
self
}

/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is reached for
Expand Down Expand Up @@ -144,7 +180,7 @@ impl ServerBuilder {
U: ToSocketAddrs,
N: AsRef<str>,
{
let sockets = bind_addr(addr, self.backlog)?;
let sockets = bind_addr(addr, self.backlog, &self.mptcp)?;

trace!("binding server to: {:?}", &sockets);

Expand Down Expand Up @@ -260,13 +296,14 @@ impl ServerBuilder {
pub(super) fn bind_addr<S: ToSocketAddrs>(
addr: S,
backlog: u32,
mptcp: &MpTcp,
) -> io::Result<Vec<MioTcpListener>> {
let mut opt_err = None;
let mut success = false;
let mut sockets = Vec::new();

for addr in addr.to_socket_addrs()? {
match create_mio_tcp_listener(addr, backlog) {
match create_mio_tcp_listener(addr, backlog, mptcp) {
Ok(lst) => {
success = true;
sockets.push(lst);
Expand Down
5 changes: 4 additions & 1 deletion actix-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ mod worker;
#[doc(hidden)]
pub use self::socket::FromStream;
pub use self::{
builder::ServerBuilder, handle::ServerHandle, server::Server, service::ServerServiceFactory,
builder::{MpTcp, ServerBuilder},
handle::ServerHandle,
server::Server,
service::ServerServiceFactory,
test_server::TestServer,
};

Expand Down
26 changes: 24 additions & 2 deletions actix-server/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub(crate) use {
mio::net::UnixListener as MioUnixListener, std::os::unix::net::UnixListener as StdUnixListener,
};

use crate::builder::MpTcp;

pub(crate) enum MioListener {
Tcp(MioTcpListener),
#[cfg(unix)]
Expand Down Expand Up @@ -223,10 +225,30 @@ mod unix_impl {
pub(crate) fn create_mio_tcp_listener(
addr: StdSocketAddr,
backlog: u32,
mptcp: &MpTcp,
) -> io::Result<MioTcpListener> {
use socket2::{Domain, Protocol, Socket, Type};

let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?;
#[cfg(not(target_os = "linux"))]
let protocol = Protocol::TCP;
#[cfg(target_os = "linux")]
let protocol = if matches!(mptcp, MpTcp::Disabled) {
Protocol::TCP
} else {
Protocol::MPTCP
};

let socket = match Socket::new(Domain::for_address(addr), Type::STREAM, Some(protocol)) {
Ok(sock) => sock,

Err(err) if matches!(mptcp, MpTcp::TcpFallback) => {
tracing::warn!("binding socket as MPTCP failed: {err}");
tracing::warn!("falling back to TCP");
Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?
}

Err(err) => return Err(err),
};

socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?;
Expand All @@ -247,7 +269,7 @@ mod tests {
assert_eq!(format!("{}", addr), "127.0.0.1:8080");

let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
let lst = create_mio_tcp_listener(addr, 128).unwrap();
let lst = create_mio_tcp_listener(addr, 128, &MpTcp::Disabled).unwrap();
let lst = MioListener::Tcp(lst);
assert!(format!("{:?}", lst).contains("TcpListener"));
assert!(format!("{}", lst).contains("127.0.0.1"));
Expand Down

0 comments on commit 755b231

Please sign in to comment.