Skip to content

Commit

Permalink
net: add try_read_buf and try_recv_buf (#3351)
Browse files Browse the repository at this point in the history
  • Loading branch information
cssivision authored Jan 2, 2021
1 parent 56272b2 commit 3b6bee8
Show file tree
Hide file tree
Showing 8 changed files with 770 additions and 1 deletion.
83 changes: 83 additions & 0 deletions tokio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

cfg_io_util! {
use bytes::BufMut;
}

cfg_net! {
/// A TCP stream between a local and a remote socket.
///
Expand Down Expand Up @@ -559,6 +563,85 @@ impl TcpStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}

cfg_io_util! {
/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
/// the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: TcpStream::readable()
/// [`ready()`]: TcpStream::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::error::Error;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// loop {
/// // Wait for the socket to be readable
/// stream.readable().await?;
///
/// let mut buf = Vec::with_capacity(4096);
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_read_buf(&mut buf) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
self.io.registration().try_io(Interest::READABLE, || {
use std::io::Read;

let dst = buf.chunk_mut();
let dst =
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

// Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
// buffer.
let n = (&*self.io).read(dst)?;

unsafe {
buf.advance_mut(n);
}

Ok(n)
})
}
}

/// Wait for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
Expand Down
136 changes: 135 additions & 1 deletion tokio/src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::task::{Context, Poll};

cfg_io_util! {
use bytes::BufMut;
}

cfg_net! {
/// A UDP socket
///
Expand Down Expand Up @@ -683,6 +687,137 @@ impl UdpSocket {
.try_io(Interest::READABLE, || self.io.recv(buf))
}

cfg_io_util! {
/// Try to receive data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// The function must be called with valid byte array buf of sufficient size
/// to hold the message bytes. If a message is too long to fit in the
/// supplied buffer, excess bytes may be discarded.
///
/// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
/// returned. This function is usually paired with `readable()`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UdpSocket;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Connect to a peer
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
/// socket.connect("127.0.0.1:8081").await?;
///
/// loop {
/// // Wait for the socket to be readable
/// socket.readable().await?;
///
/// let mut buf = Vec::with_capacity(1024);
///
/// // Try to recv data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match socket.try_recv_buf(&mut buf) {
/// Ok(n) => {
/// println!("GOT {:?}", &buf[..n]);
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e);
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
self.io.registration().try_io(Interest::READABLE, || {
let dst = buf.chunk_mut();
let dst =
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

// Safety: We trust `UdpSocket::recv` to have filled up `n` bytes in the
// buffer.
let n = (&*self.io).recv(dst)?;

unsafe {
buf.advance_mut(n);
}

Ok(n)
})
}

/// Try to receive a single datagram message on the socket. On success,
/// returns the number of bytes read and the origin.
///
/// The function must be called with valid byte array buf of sufficient size
/// to hold the message bytes. If a message is too long to fit in the
/// supplied buffer, excess bytes may be discarded.
///
/// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
/// returned. This function is usually paired with `readable()`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UdpSocket;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Connect to a peer
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
///
/// loop {
/// // Wait for the socket to be readable
/// socket.readable().await?;
///
/// let mut buf = Vec::with_capacity(1024);
///
/// // Try to recv data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match socket.try_recv_buf_from(&mut buf) {
/// Ok((n, _addr)) => {
/// println!("GOT {:?}", &buf[..n]);
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e);
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
self.io.registration().try_io(Interest::READABLE, || {
let dst = buf.chunk_mut();
let dst =
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

// Safety: We trust `UdpSocket::recv_from` to have filled up `n` bytes in the
// buffer.
let (n, addr) = (&*self.io).recv_from(dst)?;

unsafe {
buf.advance_mut(n);
}

Ok((n, addr))
})
}
}

/// Sends data on the socket to the given address. On success, returns the
/// number of bytes written.
///
Expand Down Expand Up @@ -904,7 +1039,6 @@ impl UdpSocket {
/// async fn main() -> io::Result<()> {
/// // Connect to a peer
/// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
/// socket.connect("127.0.0.1:8081").await?;
///
/// loop {
/// // Wait for the socket to be readable
Expand Down
128 changes: 128 additions & 0 deletions tokio/src/net/unix/datagram/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ use std::os::unix::net;
use std::path::Path;
use std::task::{Context, Poll};

cfg_io_util! {
use bytes::BufMut;
}

cfg_net_unix! {
/// An I/O object representing a Unix datagram socket.
///
Expand Down Expand Up @@ -652,6 +656,130 @@ impl UnixDatagram {
.try_io(Interest::READABLE, || self.io.recv(buf))
}

cfg_io_util! {
/// Try to receive data from the socket without waiting.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UnixDatagram;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Connect to a peer
/// let dir = tempfile::tempdir().unwrap();
/// let client_path = dir.path().join("client.sock");
/// let server_path = dir.path().join("server.sock");
/// let socket = UnixDatagram::bind(&client_path)?;
///
/// loop {
/// // Wait for the socket to be readable
/// socket.readable().await?;
///
/// let mut buf = Vec::with_capacity(1024);
///
/// // Try to recv data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match socket.try_recv_buf_from(&mut buf) {
/// Ok((n, _addr)) => {
/// println!("GOT {:?}", &buf[..n]);
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e);
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
let dst = buf.chunk_mut();
let dst =
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

// Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
// buffer.
let (n, addr) = (&*self.io).recv_from(dst)?;

unsafe {
buf.advance_mut(n);
}

Ok((n, addr))
})?;

Ok((n, SocketAddr(addr)))
}

/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UnixDatagram;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Connect to a peer
/// let dir = tempfile::tempdir().unwrap();
/// let client_path = dir.path().join("client.sock");
/// let server_path = dir.path().join("server.sock");
/// let socket = UnixDatagram::bind(&client_path)?;
/// socket.connect(&server_path)?;
///
/// loop {
/// // Wait for the socket to be readable
/// socket.readable().await?;
///
/// let mut buf = Vec::with_capacity(1024);
///
/// // Try to recv data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match socket.try_recv_buf(&mut buf) {
/// Ok(n) => {
/// println!("GOT {:?}", &buf[..n]);
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e);
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
self.io.registration().try_io(Interest::READABLE, || {
let dst = buf.chunk_mut();
let dst =
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

// Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
// buffer.
let n = (&*self.io).recv(dst)?;

unsafe {
buf.advance_mut(n);
}

Ok(n)
})
}
}

/// Sends data on the socket to the specified address.
///
/// # Examples
Expand Down
Loading

0 comments on commit 3b6bee8

Please sign in to comment.