Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sendmmsg() / recvmmsg() #1017

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
fc23427
Add sendmmsg() / recvmmsg()
dholroyd Jan 23, 2019
ba40139
Add missing cfg gate on the impl
dholroyd Jan 23, 2019
83dc6c2
std::ptr is already in scope, so use that
dholroyd Jan 23, 2019
1e2dd8d
Use timespec directly, don't convert Duration
dholroyd Jan 23, 2019
898baa2
Use mem::uninitialized for mmsghdr
dholroyd Jan 23, 2019
21f6644
Use convention for msg_iovlen in place elsewhere
dholroyd Jan 23, 2019
0b571b7
Make imports conditional
dholroyd Jan 23, 2019
ada4296
Minimal docs
dholroyd Jan 23, 2019
65b2ae0
Add PR link
dholroyd Jan 24, 2019
a4ade88
Add msg_name / msg_control support
dholroyd Jan 25, 2019
defb24a
Don's assume int size
dholroyd Jan 25, 2019
b2ffef3
Reformat long lines
dholroyd Jan 25, 2019
22c9a90
mmsg on additional targets
dholroyd Jan 25, 2019
8b5d16a
More cmsg
dholroyd Jan 26, 2019
a61d2ef
Remove threading from test
dholroyd Jan 26, 2019
dadfe3f
Deduplicate
dholroyd Jan 26, 2019
de7df0e
mmsg not supported on android it seems
dholroyd Jan 26, 2019
1e79bf0
Might be u32 rather than usize on some targets
dholroyd Jan 26, 2019
4a3b203
Add missing 'use'
dholroyd Jan 26, 2019
ca4611d
Remove freebsd/netbsd again since the build fails
dholroyd Jan 26, 2019
c709020
Add freebsd/netbsd back, having added libc support
dholroyd Apr 1, 2019
8182cb3
mmsg bits for android
dholroyd Apr 1, 2019
481c286
Catch up with latest nix API changes
dholroyd Apr 1, 2019
2dc8647
Need cfg(android) on imports too
dholroyd Apr 1, 2019
5490901
Remove non-working cmsg testing
dholroyd Apr 1, 2019
66b9292
Fix soundness issue due to missing lifetime
dholroyd Apr 7, 2019
52b081d
Fix some clippy lints
dholroyd Apr 7, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).
([#969](https://github.com/nix-rust/nix/pull/969))
- Add several errno constants from OpenBSD 6.2
([#1036](https://github.com/nix-rust/nix/pull/1036))
- Added support for sendmmsg() / recvmmsg() on Linux
dholroyd marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description is blisfully out-of-date. You can remove the "on Linux" part.

([#1017](https://github.com/nix-rust/nix/pull/1017))

### Changed
- `PollFd` event flags renamed to `PollFlags` ([#1024](https://github.com/nix-rust/nix/pull/1024/))
Expand Down
23 changes: 23 additions & 0 deletions src/sys/socket/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,29 @@ impl SockAddr {
SockAddr::Link(LinkAddr(ref ether_addr)) => (mem::transmute(ether_addr), mem::size_of::<libc::sockaddr_dl>() as libc::socklen_t),
}
}

pub unsafe fn as_ffi_pair_mut(&mut self) -> (&mut libc::sockaddr, libc::socklen_t) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is duplicative. Can you think of any way to share code between this method and as_ffi_pair? Also, you should wrap to 80 cols.

match *self {
SockAddr::Inet(InetAddr::V4(ref mut addr)) => (mem::transmute(addr), mem::size_of::<libc::sockaddr_in>() as libc::socklen_t),
SockAddr::Inet(InetAddr::V6(ref mut addr)) => (mem::transmute(addr), mem::size_of::<libc::sockaddr_in6>() as libc::socklen_t),
SockAddr::Unix(UnixAddr(ref mut addr, len)) => (mem::transmute(addr), (len + offset_of!(libc::sockaddr_un, sun_path)) as libc::socklen_t),
#[cfg(any(target_os = "android", target_os = "linux"))]
SockAddr::Netlink(NetlinkAddr(ref mut sa)) => (mem::transmute(sa), mem::size_of::<libc::sockaddr_nl>() as libc::socklen_t),
#[cfg(any(target_os = "android", target_os = "linux"))]
SockAddr::Alg(AlgAddr(ref mut sa)) => (mem::transmute(sa), mem::size_of::<libc::sockaddr_alg>() as libc::socklen_t),
#[cfg(any(target_os = "ios", target_os = "macos"))]
SockAddr::SysControl(SysControlAddr(ref mut sa)) => (mem::transmute(sa), mem::size_of::<libc::sockaddr_ctl>() as libc::socklen_t),
#[cfg(any(target_os = "android", target_os = "linux"))]
SockAddr::Link(LinkAddr(ref mut ether_addr)) => (mem::transmute(ether_addr), mem::size_of::<libc::sockaddr_ll>() as libc::socklen_t),
#[cfg(any(target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"))]
SockAddr::Link(LinkAddr(ref mut ether_addr)) => (mem::transmute(ether_addr), mem::size_of::<libc::sockaddr_dl>() as libc::socklen_t),
}
}
}

impl PartialEq for SockAddr {
Expand Down
244 changes: 244 additions & 0 deletions src/sys/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@ use std::{fmt, mem, ptr, slice};
use std::os::unix::io::RawFd;
use sys::time::TimeVal;
use sys::uio::IoVec;
#[cfg(any(
target_os = "linux",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indent for consistency's sake.

target_os = "freebsd",
target_os = "netbsd",
target_os = "android",
))]
use sys::time::TimeSpec;
#[cfg(any(
target_os = "linux",
target_os = "freebsd",
target_os = "netbsd",
target_os = "android",
))]
use std::marker::PhantomData;

mod addr;
pub mod sockopt;
Expand Down Expand Up @@ -1334,3 +1348,233 @@ pub fn shutdown(df: RawFd, how: Shutdown) -> Result<()> {
Errno::result(shutdown(df, how)).map(drop)
}
}

#[cfg(any(
target_os = "linux",
target_os = "freebsd",
target_os = "netbsd",
target_os = "android",
))]
#[repr(C)]
#[allow(missing_debug_implementations)]
pub struct SendMMsgHdr<'a>(libc::mmsghdr, PhantomData<&'a ()>);

#[cfg(any(
target_os = "linux",
target_os = "freebsd",
target_os = "netbsd",
target_os = "android",
))]
impl<'a> SendMMsgHdr<'a> {
pub fn new(iov: &mut[IoVec<&'a mut [u8]>],
cmsgs: &[ControlMessage],
flags: MsgFlags,
addr: Option<&'a mut SockAddr>) -> SendMMsgHdr<'a> {
let capacity = cmsgs.iter().map(|c| c.space()).sum();

// First size the buffer needed to hold the cmsgs. It must be zeroed,
// because subsequent code will not clear the padding bytes.
let cmsg_buffer = vec![0u8; capacity];

// Next encode the sending address, if provided
let (name, namelen) = match addr {
Some(addr) => {
let (x, y) = unsafe { addr.as_ffi_pair_mut() };
(x as *mut libc::sockaddr as *mut libc::c_void, y)
},
None => (ptr::null_mut(), 0),
};

// The message header must be initialized before the individual cmsgs.
let cmsg_ptr = if capacity > 0 {
cmsg_buffer.as_ptr() as *mut c_void
} else {
ptr::null_mut()
};

let mhdr = {
// Musl's msghdr has private fields, so this is the only way to
// initialize it.
let mut mhdr: msghdr = unsafe{mem::uninitialized()};
mhdr.msg_name = name as *mut _;
mhdr.msg_namelen = namelen;
// transmute iov into a mutable pointer. sendmsg doesn't really mutate
// the buffer, but the standard says that it takes a mutable pointer
mhdr.msg_iov = iov.as_ptr() as *mut _;
mhdr.msg_iovlen = iov.len() as _;
mhdr.msg_control = cmsg_ptr;
mhdr.msg_controllen = capacity as _;
mhdr.msg_flags = 0;
mhdr
};

// Encode each cmsg. This must happen after initializing the header because
// CMSG_NEXT_HDR and friends read the msg_control and msg_controllen fields.
// CMSG_FIRSTHDR is always safe
let mut pmhdr: *mut cmsghdr = unsafe{CMSG_FIRSTHDR(&mhdr as *const msghdr)};
for cmsg in cmsgs {
assert_ne!(pmhdr, ptr::null_mut());
// Safe because we know that pmhdr is valid, and we initialized it with
// sufficient space
unsafe { cmsg.encode_into(pmhdr) };
// Safe because mhdr is valid
pmhdr = unsafe{CMSG_NXTHDR(&mhdr as *const msghdr, pmhdr)};
}

let mut hdr: libc::mmsghdr = unsafe { mem::uninitialized() };
dholroyd marked this conversation as resolved.
Show resolved Hide resolved
hdr.msg_hdr.msg_control = cmsg_ptr;
hdr.msg_hdr.msg_controllen = capacity as _;
hdr.msg_hdr.msg_flags = flags.bits();
hdr.msg_hdr.msg_iov = iov.as_ptr() as *mut libc::iovec;
hdr.msg_hdr.msg_iovlen = iov.len() as _;
hdr.msg_hdr.msg_name = name;
hdr.msg_hdr.msg_namelen = namelen;
hdr.msg_len = 0;
SendMMsgHdr(hdr, PhantomData)
}

/// The number of bytes actually transferred, after a call to `sendmmsg()` or `recvmmsg()`.
pub fn msg_len(&self) -> usize {
self.0.msg_len as usize
}

pub fn address(&self) -> Result<SockAddr> {
unsafe { sockaddr_storage_to_addr(&*(self.0.msg_hdr.msg_name as *const libc::sockaddr_storage),
self.0.msg_hdr.msg_namelen as usize) }
}

pub fn cmsgs(&self) -> CmsgIterator {
let cmsghdr = unsafe {
if self.0.msg_hdr.msg_controllen > 0 {
// got control message(s)
debug_assert!(!self.0.msg_hdr.msg_control.is_null());
CMSG_FIRSTHDR(&self.0.msg_hdr as *const msghdr)
} else {
ptr::null()
}.as_ref()
};
CmsgIterator {
cmsghdr,
mhdr: &self.0.msg_hdr,
}
}
}

#[cfg(any(
target_os = "linux",
target_os = "freebsd",
target_os = "netbsd",
target_os = "android",
))]
#[repr(C)]
#[allow(missing_debug_implementations)]
pub struct RecvMMsgHdr<'a>(libc::mmsghdr, PhantomData<&'a ()>);

#[cfg(any(
target_os = "linux",
target_os = "freebsd",
target_os = "netbsd",
target_os = "android",
))]
impl<'a> RecvMMsgHdr<'a> {
pub fn new(iov: &'a mut[IoVec<&'a mut [u8]>],
cmsg_buffer: Option<&'a mut CmsgBuffer>,
flags: MsgFlags,
addr: Option<&'a mut SockAddr>) -> RecvMMsgHdr<'a> {
let (name, namelen) = match addr {
Some(addr) => {
let (x, y) = unsafe { addr.as_ffi_pair_mut() };
(x as *mut libc::sockaddr, y)
},
None => (ptr::null_mut(), 0),
};
let (msg_control, msg_controllen) = match cmsg_buffer {
Some(cmsgspace) => {
let msg_buf = cmsgspace.as_bytes_mut();
(msg_buf.as_mut_ptr(), msg_buf.len())
},
None => (ptr::null_mut(), 0),
};
let mut hdr: libc::mmsghdr = unsafe { mem::uninitialized() };
hdr.msg_hdr.msg_control = msg_control as *mut _;
hdr.msg_hdr.msg_controllen = msg_controllen as _;
hdr.msg_hdr.msg_flags = flags.bits();
hdr.msg_hdr.msg_iov = iov.as_mut_ptr() as *mut iovec;
hdr.msg_hdr.msg_iovlen = iov.len() as _;
hdr.msg_hdr.msg_name = name as _;
hdr.msg_hdr.msg_namelen = namelen;
hdr.msg_len = 0;
RecvMMsgHdr(hdr, PhantomData)
}

/// The number of bytes actually transferred, after a call to `sendmmsg()` or `recvmmsg()`.
pub fn msg_len(&self) -> usize {
self.0.msg_len as usize
}

pub fn address(&self) -> Result<SockAddr> {
unsafe { sockaddr_storage_to_addr(&*(self.0.msg_hdr.msg_name as *const libc::sockaddr_storage),
self.0.msg_hdr.msg_namelen as usize) }
}

pub fn cmsgs(&self) -> CmsgIterator {
let cmsghdr = unsafe {
if self.0.msg_hdr.msg_controllen > 0 {
// got control message(s)
debug_assert!(!self.0.msg_hdr.msg_control.is_null());
CMSG_FIRSTHDR(&self.0.msg_hdr as *const msghdr)
} else {
ptr::null()
}.as_ref()
};
CmsgIterator {
cmsghdr,
mhdr: &self.0.msg_hdr,
}
}
}

/// Receive multiple messages from a socket using a single system call.
#[cfg(any(
target_os = "linux",
target_os = "freebsd",
target_os = "netbsd",
target_os = "android",
))]
pub fn recvmmsg(fd: RawFd, msgvec: &mut[RecvMMsgHdr],
flags: MsgFlags,
mut timeout: Option<TimeSpec>) -> Result<usize> {
let tptr = match timeout {
Some(ref mut time) => time.as_ref() as *const libc::timespec,
None => ptr::null_mut(),
};
let ret = unsafe {
libc::recvmmsg(
fd,
msgvec.as_mut_ptr() as *mut libc::mmsghdr,
msgvec.len() as _,
flags.bits(),
Copy link

@gnzlbg gnzlbg May 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: on musl the flag argument is an unsigned int, while on gnu, it is a signed int, so one probably needs an as _ here as well.

tptr as *mut libc::timespec,
)
};
Ok(Errno::result(ret)? as usize)
}

/// Transmit multiple messages on a socket using a single system call.
#[cfg(any(
target_os = "linux",
target_os = "freebsd",
target_os = "netbsd",
target_os = "android",
))]
pub fn sendmmsg(fd: RawFd, msgvec: &mut[SendMMsgHdr]) -> Result<usize> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very complicated function to use. Could you provide an example? Also, why no flags argument?

let ret = unsafe {
libc::sendmmsg(
fd,
msgvec.as_mut_ptr() as *mut libc::mmsghdr,
msgvec.len() as _,
0,
)
};
Ok(Errno::result(ret)? as usize)
}
92 changes: 92 additions & 0 deletions test/sys/test_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -959,3 +959,95 @@ pub fn test_recv_ipv6pktinfo() {
);
}
}

#[cfg(any(
target_os = "linux",
target_os = "freebsd",
target_os = "netbsd",
target_os = "android",
))]
#[test]
pub fn test_mmsg() {
use nix::sys::socket::{
bind, connect, recvmmsg, sendmmsg, socket, AddressFamily,
InetAddr, MsgFlags, RecvMMsgHdr, SendMMsgHdr, SockAddr, SockFlag, SockType,
};
use nix::sys::uio::IoVec;
use std::thread;
use std::time;

let sender = socket(
AddressFamily::Inet,
SockType::Datagram,
SockFlag::empty(),
None,
)
.expect("send socket failed");
let sockaddr = SockAddr::new_inet(InetAddr::from_std(
&SocketAddr::from_str("127.0.0.1:3456").unwrap(),
));
connect(sender, &sockaddr).unwrap();
let mut a = [b'A'; 500];
let mut b = [b'B'; 500];
let mut c = [b'C'; 500];
let mut iov_a = [IoVec::from_mut_slice(&mut a[..])];
let mut iov_b = [IoVec::from_mut_slice(&mut b[..])];
let mut iov_c = [IoVec::from_mut_slice(&mut c[..])];
let mut msgs = [
SendMMsgHdr::new(&mut iov_a[..], &mut [], MsgFlags::empty(), None),
SendMMsgHdr::new(&mut iov_b[..], &mut [], MsgFlags::empty(), None),
SendMMsgHdr::new(&mut iov_c[..], &mut [], MsgFlags::empty(), None),
];

let receiver = socket(
AddressFamily::Inet,
SockType::Datagram,
SockFlag::empty(),
None,
)
.expect("recv socket failed");
bind(receiver, &sockaddr).unwrap();

// now that the receiver is bond, send the messages
sendmmsg(sender, &mut msgs[..]).unwrap();

thread::sleep(time::Duration::from_millis(200));
let mut a = [0u8; 1500];
let mut b = [0u8; 1500];
let mut c = [0u8; 1500];
let mut sockaddr_a = SockAddr::Inet(InetAddr::from_std(
&SocketAddr::from_str("0.0.0.0:0").unwrap(),
));
let mut sockaddr_b = SockAddr::Inet(InetAddr::from_std(
&SocketAddr::from_str("0.0.0.0:0").unwrap(),
));
let mut sockaddr_c = SockAddr::Inet(InetAddr::from_std(
&SocketAddr::from_str("0.0.0.0:0").unwrap(),
));
let mut iov_a = [IoVec::from_mut_slice(&mut a[..])];
let mut iov_b = [IoVec::from_mut_slice(&mut b[..])];
let mut iov_c = [IoVec::from_mut_slice(&mut c[..])];
let mut msgs = [
RecvMMsgHdr::new(
&mut iov_a[..],
None,
MsgFlags::empty(),
Some(&mut sockaddr_a),
),
RecvMMsgHdr::new(
&mut iov_b[..],
None,
MsgFlags::empty(),
Some(&mut sockaddr_b),
),
RecvMMsgHdr::new(
&mut iov_c[..],
None,
MsgFlags::empty(),
Some(&mut sockaddr_c),
),
];
let count = recvmmsg(receiver, &mut msgs[..], MsgFlags::empty(), None).unwrap();
assert_eq!(3, count);
assert_eq!(500, msgs[0].msg_len());
}