Skip to content

Commit

Permalink
feat: support UnixSeqpacket and its listener (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
ihciah authored Jul 7, 2023
1 parent d3a0689 commit c9e8c8c
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 1 deletion.
4 changes: 4 additions & 0 deletions monoio/src/net/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ mod split;
mod stream;
mod ucred;

#[cfg(target_os = "linux")]
mod seq_packet;
pub use datagram::UnixDatagram;
pub use listener::UnixListener;
pub use pipe::{new_pipe, Pipe};
#[cfg(target_os = "linux")]
pub use seq_packet::{UnixSeqpacket, UnixSeqpacketListener};
pub use socket_addr::SocketAddr;
pub use split::{UnixOwnedReadHalf, UnixOwnedWriteHalf};
pub use stream::UnixStream;
Expand Down
92 changes: 92 additions & 0 deletions monoio/src/net/unix/seq_packet/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::{
future::Future,
io,
os::fd::{AsRawFd, RawFd},
path::Path,
};

use super::UnixSeqpacket;
use crate::{
driver::{op::Op, shared_fd::SharedFd},
io::stream::Stream,
net::{
new_socket,
unix::{socket_addr::socket_addr, SocketAddr},
},
};

const DEFAULT_BACKLOG: libc::c_int = 128;

/// Listner for UnixSeqpacket
pub struct UnixSeqpacketListener {
fd: SharedFd,
}

impl UnixSeqpacketListener {
/// Creates a new `UnixSeqpacketListener` bound to the specified path with custom backlog
pub fn bind_with_backlog<P: AsRef<Path>>(path: P, backlog: libc::c_int) -> io::Result<Self> {
let (addr, addr_len) = socket_addr(path.as_ref())?;
let socket = new_socket(libc::AF_UNIX, libc::SOCK_SEQPACKET)?;
crate::syscall!(bind(socket, &addr as *const _ as *const _, addr_len))?;
crate::syscall!(listen(socket, backlog))?;
Ok(Self {
fd: SharedFd::new(socket)?,
})
}

/// Creates a new `UnixSeqpacketListener` bound to the specified path with default backlog(128)
#[inline]
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Self> {
Self::bind_with_backlog(path, DEFAULT_BACKLOG)
}

/// Accept a UnixSeqpacket
pub async fn accept(&self) -> io::Result<(UnixSeqpacket, SocketAddr)> {
let op = Op::accept(&self.fd)?;

// Await the completion of the event
let completion = op.await;

// Convert fd
let fd = completion.meta.result?;

// Construct stream
let stream = UnixSeqpacket::from_shared_fd(SharedFd::new(fd as _)?);

// Construct SocketAddr
let mut storage = unsafe { std::mem::MaybeUninit::assume_init(completion.data.addr.0) };
let storage: *mut libc::sockaddr_storage = &mut storage as *mut _;
let raw_addr_un: libc::sockaddr_un = unsafe { *storage.cast() };
let raw_addr_len = completion.data.addr.1;

let addr = SocketAddr::from_parts(raw_addr_un, raw_addr_len);

Ok((stream, addr))
}
}

impl AsRawFd for UnixSeqpacketListener {
#[inline]
fn as_raw_fd(&self) -> RawFd {
self.fd.raw_fd()
}
}

impl std::fmt::Debug for UnixSeqpacketListener {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("UnixSeqpacketListener")
.field("fd", &self.fd)
.finish()
}
}

impl Stream for UnixSeqpacketListener {
type Item = io::Result<(UnixSeqpacket, SocketAddr)>;

type NextFuture<'a> = impl Future<Output = Option<Self::Item>> + 'a;

#[inline]
fn next(&mut self) -> Self::NextFuture<'_> {
async move { Some(self.accept().await) }
}
}
161 changes: 161 additions & 0 deletions monoio/src/net/unix/seq_packet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
//! UnixSeqpacket related.
//! Only available on linux.

use std::{
io,
os::unix::prelude::{AsRawFd, RawFd},
path::Path,
};

use super::{
socket_addr::{local_addr, pair, peer_addr, socket_addr},
SocketAddr,
};
use crate::{
buf::{IoBuf, IoBufMut},
driver::{op::Op, shared_fd::SharedFd},
net::new_socket,
};

mod listener;
pub use listener::UnixSeqpacketListener;

/// UnixSeqpacket
pub struct UnixSeqpacket {
fd: SharedFd,
}

impl UnixSeqpacket {
pub(crate) fn from_shared_fd(fd: SharedFd) -> Self {
Self { fd }
}

/// Creates an unnamed pair of connected sockets.
pub fn pair() -> io::Result<(Self, Self)> {
let (a, b) = pair(libc::SOCK_SEQPACKET)?;
Ok((
Self::from_shared_fd(SharedFd::new(a)?),
Self::from_shared_fd(SharedFd::new(b)?),
))
}

/// Connects the socket to the specified address.
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let (addr, addr_len) = socket_addr(path.as_ref())?;
Self::inner_connect(addr, addr_len).await
}

/// Connects the socket to an address.
pub async fn connect_addr(addr: SocketAddr) -> io::Result<Self> {
let (addr, addr_len) = addr.into_parts();
Self::inner_connect(addr, addr_len).await
}

#[inline(always)]
async fn inner_connect(
sockaddr: libc::sockaddr_un,
socklen: libc::socklen_t,
) -> io::Result<Self> {
let socket = new_socket(libc::AF_UNIX, libc::SOCK_SEQPACKET)?;
let op = Op::connect_unix(SharedFd::new(socket)?, sockaddr, socklen)?;
let completion = op.await;
completion.meta.result?;

Ok(Self::from_shared_fd(completion.data.fd))
}

/// Returns the socket address of the local half of this connection.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
local_addr(self.as_raw_fd())
}

/// Returns the socket address of the remote half of this connection.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
peer_addr(self.as_raw_fd())
}

/// Wait for read readiness.
/// Note: Do not use it before every io. It is different from other runtimes!
///
/// Everytime call to this method may pay a syscall cost.
/// In uring impl, it will push a PollAdd op; in epoll impl, it will use use
/// inner readiness state; if !relaxed, it will call syscall poll after that.
///
/// If relaxed, on legacy driver it may return false positive result.
/// If you want to do io by your own, you must maintain io readiness and wait
/// for io ready with relaxed=false.
pub async fn readable(&self, relaxed: bool) -> io::Result<()> {
let op = Op::poll_read(&self.fd, relaxed).unwrap();
op.wait().await
}

/// Wait for write readiness.
/// Note: Do not use it before every io. It is different from other runtimes!
///
/// Everytime call to this method may pay a syscall cost.
/// In uring impl, it will push a PollAdd op; in epoll impl, it will use use
/// inner readiness state; if !relaxed, it will call syscall poll after that.
///
/// If relaxed, on legacy driver it may return false positive result.
/// If you want to do io by your own, you must maintain io readiness and wait
/// for io ready with relaxed=false.
pub async fn writable(&self, relaxed: bool) -> io::Result<()> {
let op = Op::poll_write(&self.fd, relaxed).unwrap();
op.wait().await
}

/// Sends data on the socket to the given address. On success, returns the
/// number of bytes written.
pub async fn send_to<T: IoBuf, P: AsRef<Path>>(
&self,
buf: T,
path: P,
) -> crate::BufResult<usize, T> {
let addr = match crate::net::unix::socket_addr::socket_addr(path.as_ref()) {
Ok(addr) => addr,
Err(e) => return (Err(e), buf),
};
let op = Op::send_msg_unix(
self.fd.clone(),
buf,
Some(SocketAddr::from_parts(addr.0, addr.1)),
)
.unwrap();
op.wait().await
}

/// Receives a single datagram message on the socket. On success, returns the number
/// of bytes read and the origin.
pub async fn recv_from<T: IoBufMut>(&self, buf: T) -> crate::BufResult<(usize, SocketAddr), T> {
let op = Op::recv_msg_unix(self.fd.clone(), buf).unwrap();
op.wait().await
}

/// Sends data on the socket to the remote address to which it is connected.
pub async fn send<T: IoBuf>(&self, buf: T) -> crate::BufResult<usize, T> {
let op = Op::send_msg_unix(self.fd.clone(), buf, None).unwrap();
op.wait().await
}

/// Receives a single datagram message on the socket from the remote address to
/// which it is connected. On success, returns the number of bytes read.
pub async fn recv<T: IoBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
let op = Op::recv(self.fd.clone(), buf).unwrap();
op.read().await
}
}

impl AsRawFd for UnixSeqpacket {
#[inline]
fn as_raw_fd(&self) -> RawFd {
self.fd.raw_fd()
}
}

impl std::fmt::Debug for UnixSeqpacket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("UnixSeqpacket")
.field("fd", &self.fd)
.finish()
}
}
18 changes: 17 additions & 1 deletion monoio/src/net/unix/socket_addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,25 @@ pub(crate) fn pair<T>(flags: libc::c_int) -> io::Result<(T, T)>
where
T: FromRawFd,
{
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "illumos",
target_os = "netbsd",
target_os = "openbsd"
))]
let flags = flags | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;

#[cfg(target_os = "linux")]
let flags = {
if crate::driver::op::is_legacy() {
flags | libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK
} else {
flags | libc::SOCK_CLOEXEC
}
};

let mut fds = [-1; 2];
crate::syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?;
let pair = unsafe { (T::from_raw_fd(fds[0]), T::from_raw_fd(fds[1])) };
Expand Down
22 changes: 22 additions & 0 deletions monoio/tests/unix_seqpacket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#[cfg(target_os = "linux")]
#[monoio::test_all]
async fn test_seqpacket() -> std::io::Result<()> {
use monoio::net::unix::{UnixSeqpacket, UnixSeqpacketListener};

let dir = tempfile::Builder::new()
.prefix("monoio-unix-seqpacket-tests")
.tempdir()
.unwrap();
let sock_path = dir.path().join("seqpacket.sock");

let listener = UnixSeqpacketListener::bind(&sock_path).unwrap();
monoio::spawn(async move {
let (conn, _addr) = listener.accept().await.unwrap();
let (res, buf) = conn.recv(vec![0; 100]).await;
assert_eq!(res.unwrap(), 5);
assert_eq!(buf, b"hello");
});
let conn = UnixSeqpacket::connect(&sock_path).await.unwrap();
conn.send(b"hello").await.0.unwrap();
Ok(())
}

0 comments on commit c9e8c8c

Please sign in to comment.