Skip to content

Commit 9911160

Browse files
committed
Auto merge of #73761 - rijenkii:master, r=KodrAus
Add `peek` and `peek_from` to `UnixStream` and `UnixDatagram` This is my first PR, so I'm sure I've done some things wrong. This PR: * adds `peek` function to `UnixStream`; * adds `peek` and `peek_from` to `UnixDatagram`; * moves `UnixDatagram::recv_from` implementation to a private function `recv_from_flags`, as `peek_from` uses the same code, just with different flags. I've taken the documentation from `TcpStream` and `UdpStream`, so it may or may not make sense (I'm bad with english words). Also, I'm not sure what I should write in the `unstable` attribute, so I've made up the name and set the issue to "none". Closes #68565.
2 parents bc57bd8 + 64b8fd7 commit 9911160

File tree

2 files changed

+192
-20
lines changed

2 files changed

+192
-20
lines changed

library/std/src/sys/unix/ext/net.rs

+112-20
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,32 @@ impl UnixStream {
594594
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
595595
self.0.shutdown(how)
596596
}
597+
598+
/// Receives data on the socket from the remote address to which it is
599+
/// connected, without removing that data from the queue. On success,
600+
/// returns the number of bytes peeked.
601+
///
602+
/// Successive calls return the same data. This is accomplished by passing
603+
/// `MSG_PEEK` as a flag to the underlying `recv` system call.
604+
///
605+
/// # Examples
606+
///
607+
/// ```no_run
608+
/// #![feature(unix_socket_peek)]
609+
///
610+
/// use std::os::unix::net::UnixStream;
611+
///
612+
/// fn main() -> std::io::Result<()> {
613+
/// let socket = UnixStream::connect("/tmp/sock")?;
614+
/// let mut buf = [0; 10];
615+
/// let len = socket.peek(&mut buf).expect("peek failed");
616+
/// Ok(())
617+
/// }
618+
/// ```
619+
#[unstable(feature = "unix_socket_peek", issue = "none")]
620+
pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
621+
self.0.peek(buf)
622+
}
597623
}
598624

599625
#[stable(feature = "unix_socket", since = "1.10.0")]
@@ -1291,6 +1317,33 @@ impl UnixDatagram {
12911317
SocketAddr::new(|addr, len| unsafe { libc::getpeername(*self.0.as_inner(), addr, len) })
12921318
}
12931319

1320+
fn recv_from_flags(
1321+
&self,
1322+
buf: &mut [u8],
1323+
flags: libc::c_int,
1324+
) -> io::Result<(usize, SocketAddr)> {
1325+
let mut count = 0;
1326+
let addr = SocketAddr::new(|addr, len| unsafe {
1327+
count = libc::recvfrom(
1328+
*self.0.as_inner(),
1329+
buf.as_mut_ptr() as *mut _,
1330+
buf.len(),
1331+
flags,
1332+
addr,
1333+
len,
1334+
);
1335+
if count > 0 {
1336+
1
1337+
} else if count == 0 {
1338+
0
1339+
} else {
1340+
-1
1341+
}
1342+
})?;
1343+
1344+
Ok((count as usize, addr))
1345+
}
1346+
12941347
/// Receives data from the socket.
12951348
///
12961349
/// On success, returns the number of bytes read and the address from
@@ -1311,26 +1364,7 @@ impl UnixDatagram {
13111364
/// ```
13121365
#[stable(feature = "unix_socket", since = "1.10.0")]
13131366
pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1314-
let mut count = 0;
1315-
let addr = SocketAddr::new(|addr, len| unsafe {
1316-
count = libc::recvfrom(
1317-
*self.0.as_inner(),
1318-
buf.as_mut_ptr() as *mut _,
1319-
buf.len(),
1320-
0,
1321-
addr,
1322-
len,
1323-
);
1324-
if count > 0 {
1325-
1
1326-
} else if count == 0 {
1327-
0
1328-
} else {
1329-
-1
1330-
}
1331-
})?;
1332-
1333-
Ok((count as usize, addr))
1367+
self.recv_from_flags(buf, 0)
13341368
}
13351369

13361370
/// Receives data from the socket.
@@ -1601,6 +1635,64 @@ impl UnixDatagram {
16011635
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
16021636
self.0.shutdown(how)
16031637
}
1638+
1639+
/// Receives data on the socket from the remote address to which it is
1640+
/// connected, without removing that data from the queue. On success,
1641+
/// returns the number of bytes peeked.
1642+
///
1643+
/// Successive calls return the same data. This is accomplished by passing
1644+
/// `MSG_PEEK` as a flag to the underlying `recv` system call.
1645+
///
1646+
/// # Examples
1647+
///
1648+
/// ```no_run
1649+
/// #![feature(unix_socket_peek)]
1650+
///
1651+
/// use std::os::unix::net::UnixDatagram;
1652+
///
1653+
/// fn main() -> std::io::Result<()> {
1654+
/// let socket = UnixDatagram::bind("/tmp/sock")?;
1655+
/// let mut buf = [0; 10];
1656+
/// let len = socket.peek(&mut buf).expect("peek failed");
1657+
/// Ok(())
1658+
/// }
1659+
/// ```
1660+
#[unstable(feature = "unix_socket_peek", issue = "none")]
1661+
pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1662+
self.0.peek(buf)
1663+
}
1664+
1665+
/// Receives a single datagram message on the socket, without removing it from the
1666+
/// queue. On success, returns the number of bytes read and the origin.
1667+
///
1668+
/// The function must be called with valid byte array `buf` of sufficient size to
1669+
/// hold the message bytes. If a message is too long to fit in the supplied buffer,
1670+
/// excess bytes may be discarded.
1671+
///
1672+
/// Successive calls return the same data. This is accomplished by passing
1673+
/// `MSG_PEEK` as a flag to the underlying `recvfrom` system call.
1674+
///
1675+
/// Do not use this function to implement busy waiting, instead use `libc::poll` to
1676+
/// synchronize IO events on one or more sockets.
1677+
///
1678+
/// # Examples
1679+
///
1680+
/// ```no_run
1681+
/// #![feature(unix_socket_peek)]
1682+
///
1683+
/// use std::os::unix::net::UnixDatagram;
1684+
///
1685+
/// fn main() -> std::io::Result<()> {
1686+
/// let socket = UnixDatagram::bind("/tmp/sock")?;
1687+
/// let mut buf = [0; 10];
1688+
/// let (len, addr) = socket.peek_from(&mut buf).expect("peek failed");
1689+
/// Ok(())
1690+
/// }
1691+
/// ```
1692+
#[unstable(feature = "unix_socket_peek", issue = "none")]
1693+
pub fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1694+
self.recv_from_flags(buf, libc::MSG_PEEK)
1695+
}
16041696
}
16051697

16061698
#[stable(feature = "unix_socket", since = "1.10.0")]

library/std/src/sys/unix/ext/net/tests.rs

+80
Original file line numberDiff line numberDiff line change
@@ -372,3 +372,83 @@ fn test_unix_datagram_timeout_zero_duration() {
372372
fn abstract_namespace_not_allowed() {
373373
assert!(UnixStream::connect("\0asdf").is_err());
374374
}
375+
376+
#[test]
377+
fn test_unix_stream_peek() {
378+
let (txdone, rxdone) = crate::sync::mpsc::channel();
379+
380+
let dir = tmpdir();
381+
let path = dir.path().join("sock");
382+
383+
let listener = or_panic!(UnixListener::bind(&path));
384+
let thread = thread::spawn(move || {
385+
let mut stream = or_panic!(listener.accept()).0;
386+
or_panic!(stream.write_all(&[1, 3, 3, 7]));
387+
or_panic!(rxdone.recv());
388+
});
389+
390+
let mut stream = or_panic!(UnixStream::connect(&path));
391+
let mut buf = [0; 10];
392+
for _ in 0..2 {
393+
assert_eq!(or_panic!(stream.peek(&mut buf)), 4);
394+
}
395+
assert_eq!(or_panic!(stream.read(&mut buf)), 4);
396+
397+
or_panic!(stream.set_nonblocking(true));
398+
match stream.peek(&mut buf) {
399+
Ok(_) => panic!("expected error"),
400+
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
401+
Err(e) => panic!("unexpected error: {}", e),
402+
}
403+
404+
or_panic!(txdone.send(()));
405+
thread.join().unwrap();
406+
}
407+
408+
#[test]
409+
fn test_unix_datagram_peek() {
410+
let dir = tmpdir();
411+
let path1 = dir.path().join("sock");
412+
413+
let sock1 = or_panic!(UnixDatagram::bind(&path1));
414+
let sock2 = or_panic!(UnixDatagram::unbound());
415+
or_panic!(sock2.connect(&path1));
416+
417+
let msg = b"hello world";
418+
or_panic!(sock2.send(msg));
419+
for _ in 0..2 {
420+
let mut buf = [0; 11];
421+
let size = or_panic!(sock1.peek(&mut buf));
422+
assert_eq!(size, 11);
423+
assert_eq!(msg, &buf[..]);
424+
}
425+
426+
let mut buf = [0; 11];
427+
let size = or_panic!(sock1.recv(&mut buf));
428+
assert_eq!(size, 11);
429+
assert_eq!(msg, &buf[..]);
430+
}
431+
432+
#[test]
433+
fn test_unix_datagram_peek_from() {
434+
let dir = tmpdir();
435+
let path1 = dir.path().join("sock");
436+
437+
let sock1 = or_panic!(UnixDatagram::bind(&path1));
438+
let sock2 = or_panic!(UnixDatagram::unbound());
439+
or_panic!(sock2.connect(&path1));
440+
441+
let msg = b"hello world";
442+
or_panic!(sock2.send(msg));
443+
for _ in 0..2 {
444+
let mut buf = [0; 11];
445+
let (size, _) = or_panic!(sock1.peek_from(&mut buf));
446+
assert_eq!(size, 11);
447+
assert_eq!(msg, &buf[..]);
448+
}
449+
450+
let mut buf = [0; 11];
451+
let size = or_panic!(sock1.recv(&mut buf));
452+
assert_eq!(size, 11);
453+
assert_eq!(msg, &buf[..]);
454+
}

0 commit comments

Comments
 (0)