Skip to content

Commit

Permalink
Drop sendmmsg support
Browse files Browse the repository at this point in the history
As we no longer buffer multiple transmits in memory, this complexity
is unused. GSO is expected to account for most, if not all, of the
performance benefit.
  • Loading branch information
Ralith committed Apr 26, 2024
1 parent 0722280 commit ee08826
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 300 deletions.
48 changes: 18 additions & 30 deletions quinn-udp/src/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,25 @@ impl UdpSocketState {
})
}

pub fn send(&self, socket: UdpSockRef<'_>, transmits: &[Transmit]) -> io::Result<usize> {
let mut sent = 0;
for transmit in transmits {
match socket.0.send_to(
&transmit.contents,
&socket2::SockAddr::from(transmit.destination),
) {
Ok(_) => {
sent += 1;
}
// We need to report that some packets were sent in this case, so we rely on
// errors being either harmlessly transient (in the case of WouldBlock) or
// recurring on the next call.
Err(_) if sent != 0 => return Ok(sent),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
return Err(e);
}

// Other errors are ignored, since they will usually be handled
// by higher level retransmits and timeouts.
// - PermissionDenied errors have been observed due to iptable rules.
// Those are not fatal errors, since the
// configuration can be dynamically changed.
// - Destination unreachable errors have been observed for other
log_sendmsg_error(&self.last_send_error, e, transmit);
sent += 1;
}
}
pub fn send(&self, socket: UdpSockRef<'_>, transmit: &Transmit) -> io::Result<()> {
let Err(e) = socket.0.send_to(
&transmit.contents,
&socket2::SockAddr::from(transmit.destination),
) else {
return Ok(());
};
if e.kind() == io::ErrorKind::WouldBlock {
return Err(e);
}
Ok(sent)

// Other errors are ignored, since they will usually be handled
// by higher level retransmits and timeouts.
// - PermissionDenied errors have been observed due to iptable rules.
// Those are not fatal errors, since the
// configuration can be dynamically changed.
// - Destination unreachable errors have been observed for other
log_sendmsg_error(&self.last_send_error, e, transmit);
Ok(())
}

pub fn recv(
Expand Down
203 changes: 53 additions & 150 deletions quinn-udp/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ impl UdpSocketState {
})
}

pub fn send(&self, socket: UdpSockRef<'_>, transmits: &[Transmit]) -> io::Result<usize> {
send(self, socket.0, transmits)
pub fn send(&self, socket: UdpSockRef<'_>, transmit: &Transmit) -> io::Result<()> {
send(self, socket.0, transmit)
}

pub fn recv(
Expand Down Expand Up @@ -213,8 +213,8 @@ fn send(
#[allow(unused_variables)] // only used on Linux
state: &UdpSocketState,
io: SockRef<'_>,
transmits: &[Transmit],
) -> io::Result<usize> {
transmit: &Transmit,
) -> io::Result<()> {
#[allow(unused_mut)] // only mutable on FreeBSD
let mut encode_src_ip = true;
#[cfg(target_os = "freebsd")]
Expand All @@ -227,41 +227,22 @@ fn send(
}
}
}
let mut msgs: [libc::mmsghdr; BATCH_SIZE] = unsafe { mem::zeroed() };
let mut iovecs: [libc::iovec; BATCH_SIZE] = unsafe { mem::zeroed() };
let mut cmsgs = [cmsg::Aligned([0u8; CMSG_LEN]); BATCH_SIZE];
// This assume_init looks a bit weird because one might think it
// assumes the SockAddr data to be initialized, but that call
// refers to the whole array, which itself is made up of MaybeUninit
// containers. Their presence protects the SockAddr inside from
// being assumed as initialized by the assume_init call.
// TODO: Replace this with uninit_array once it becomes MSRV-stable
let mut addrs: [MaybeUninit<socket2::SockAddr>; BATCH_SIZE] =
unsafe { MaybeUninit::uninit().assume_init() };
for (i, transmit) in transmits.iter().enumerate().take(BATCH_SIZE) {
let dst_addr = unsafe {
ptr::write(
addrs[i].as_mut_ptr(),
socket2::SockAddr::from(transmit.destination),
);
&*addrs[i].as_ptr()
};
prepare_msg(
transmit,
dst_addr,
&mut msgs[i].msg_hdr,
&mut iovecs[i],
&mut cmsgs[i],
encode_src_ip,
state.sendmsg_einval(),
);
}
let num_transmits = transmits.len().min(BATCH_SIZE);
let mut msg_hdr: libc::msghdr = unsafe { mem::zeroed() };
let mut iovec: libc::iovec = unsafe { mem::zeroed() };
let mut cmsgs = cmsg::Aligned([0u8; CMSG_LEN]);
let dst_addr = socket2::SockAddr::from(transmit.destination);
prepare_msg(
transmit,
&dst_addr,
&mut msg_hdr,
&mut iovec,
&mut cmsgs,
encode_src_ip,
state.sendmsg_einval(),
);

loop {
let n = unsafe {
sendmmsg_with_fallback(io.as_raw_fd(), msgs.as_mut_ptr(), num_transmits as _)
};
let n = unsafe { libc::sendmsg(io.as_raw_fd(), &msg_hdr, 0) };
if n == -1 {
let e = io::Error::last_os_error();
match e.kind() {
Expand Down Expand Up @@ -301,135 +282,57 @@ fn send(
// - EMSGSIZE is expected for MTU probes. Future work might be able to avoid
// these by automatically clamping the MTUD upper bound to the interface MTU.
if e.raw_os_error() != Some(libc::EMSGSIZE) {
log_sendmsg_error(&state.last_send_error, e, &transmits[0]);
log_sendmsg_error(&state.last_send_error, e, transmit);
}

// The ERRORS section in https://man7.org/linux/man-pages/man2/sendmmsg.2.html
// describes that errors will only be returned if no message could be transmitted
// at all. Therefore drop the first (problematic) message,
// and retry the remaining ones.
return Ok(num_transmits.min(1));
return Ok(());
}
}
}
return Ok(n as usize);
return Ok(());
}
}

#[cfg(any(target_os = "macos", target_os = "ios"))]
fn send(state: &UdpSocketState, io: SockRef<'_>, transmits: &[Transmit]) -> io::Result<usize> {
fn send(state: &UdpSocketState, io: SockRef<'_>, transmit: &Transmit) -> io::Result<()> {
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
let mut iov: libc::iovec = unsafe { mem::zeroed() };
let mut ctrl = cmsg::Aligned([0u8; CMSG_LEN]);
let mut sent = 0;

while sent < transmits.len() {
let addr = socket2::SockAddr::from(transmits[sent].destination);
prepare_msg(
&transmits[sent],
&addr,
&mut hdr,
&mut iov,
&mut ctrl,
// Only tested on macOS and iOS
cfg!(target_os = "macos") || cfg!(target_os = "ios"),
state.sendmsg_einval(),
);
let n = unsafe { libc::sendmsg(io.as_raw_fd(), &hdr, 0) };
if n == -1 {
let e = io::Error::last_os_error();
match e.kind() {
io::ErrorKind::Interrupted => {
// Retry the transmission
}
io::ErrorKind::WouldBlock if sent != 0 => return Ok(sent),
io::ErrorKind::WouldBlock => return Err(e),
_ => {
// Other errors are ignored, since they will usually be handled
// by higher level retransmits and timeouts.
// - PermissionDenied errors have been observed due to iptable rules.
// Those are not fatal errors, since the
// configuration can be dynamically changed.
// - Destination unreachable errors have been observed for other
// - EMSGSIZE is expected for MTU probes. Future work might be able to avoid
// these by automatically clamping the MTUD upper bound to the interface MTU.
if e.raw_os_error() != Some(libc::EMSGSIZE) {
log_sendmsg_error(&state.last_send_error, e, &transmits[sent]);
}
sent += 1;
let addr = socket2::SockAddr::from(transmit.destination);
prepare_msg(
transmit,
&addr,
&mut hdr,
&mut iov,
&mut ctrl,
// Only tested on macOS and iOS
cfg!(target_os = "macos") || cfg!(target_os = "ios"),
state.sendmsg_einval(),
);
let n = unsafe { libc::sendmsg(io.as_raw_fd(), &hdr, 0) };
if n == -1 {
let e = io::Error::last_os_error();
match e.kind() {
io::ErrorKind::Interrupted => {
// Retry the transmission
}
io::ErrorKind::WouldBlock => return Err(e),
_ => {
// Other errors are ignored, since they will usually be handled
// by higher level retransmits and timeouts.
// - PermissionDenied errors have been observed due to iptable rules.
// Those are not fatal errors, since the
// configuration can be dynamically changed.
// - Destination unreachable errors have been observed for other
// - EMSGSIZE is expected for MTU probes. Future work might be able to avoid
// these by automatically clamping the MTUD upper bound to the interface MTU.
if e.raw_os_error() != Some(libc::EMSGSIZE) {
log_sendmsg_error(&state.last_send_error, e, transmit);
}
}
} else {
sent += 1;
}
}
Ok(sent)
}

/// Implementation of `sendmmsg` with a fallback
/// to `sendmsg` if syscall is not available.
///
/// It uses [`libc::syscall`] instead of [`libc::sendmmsg`]
/// to avoid linking error on systems where libc does not contain `sendmmsg`.
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
unsafe fn sendmmsg_with_fallback(
sockfd: libc::c_int,
msgvec: *mut libc::mmsghdr,
vlen: libc::c_uint,
) -> libc::c_int {
let flags = 0;

#[cfg(not(target_os = "freebsd"))]
{
let ret = libc::syscall(libc::SYS_sendmmsg, sockfd, msgvec, vlen, flags) as libc::c_int;
if ret != -1 {
return ret;
}
}

// libc on FreeBSD implements `sendmmsg` as a high-level abstraction over `sendmsg`,
// thus `SYS_sendmmsg` constant and direct system call do not exist
#[cfg(target_os = "freebsd")]
{
let ret = libc::sendmmsg(sockfd, msgvec, vlen as usize, flags) as libc::c_int;
if ret != -1 {
return ret;
}
}

let e = io::Error::last_os_error();
match e.raw_os_error() {
Some(libc::ENOSYS) => {
// Fallback to `sendmsg`.
sendmmsg_fallback(sockfd, msgvec, vlen)
}
_ => -1,
}
}

/// Fallback implementation of `sendmmsg` using `sendmsg`
/// for systems which do not support `sendmmsg`
/// such as Linux <3.0.
#[cfg(not(any(target_os = "macos", target_os = "ios")))]
unsafe fn sendmmsg_fallback(
sockfd: libc::c_int,
msgvec: *mut libc::mmsghdr,
vlen: libc::c_uint,
) -> libc::c_int {
let flags = 0;
if vlen == 0 {
return 0;
}

let n = libc::sendmsg(sockfd, &(*msgvec).msg_hdr, flags);
if n == -1 {
-1
} else {
// type of `msg_len` field differs on Linux and FreeBSD,
// it is up to the compiler to infer and cast `n` to correct type
(*msgvec).msg_len = n as _;
1
}
Ok(())
}

#[cfg(not(any(target_os = "macos", target_os = "ios")))]
Expand Down
Loading

0 comments on commit ee08826

Please sign in to comment.