Skip to content

Commit

Permalink
Auto merge of rust-lang#3609 - tiif:feat/socketpair, r=RalfJung
Browse files Browse the repository at this point in the history
Add socketpair shim

Fixes rust-lang#3442
Design proposal: https://hackmd.io/`@tiif/Skhc1t0-C`
  • Loading branch information
bors committed Jun 10, 2024
2 parents ad85a20 + c28e606 commit 0d66db9
Show file tree
Hide file tree
Showing 6 changed files with 366 additions and 15 deletions.
201 changes: 186 additions & 15 deletions src/tools/miri/src/shims/unix/socket.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,38 @@
use std::cell::RefCell;
use std::collections::VecDeque;
use std::io;
use std::io::{Error, ErrorKind, Read};
use std::rc::{Rc, Weak};

use crate::shims::unix::*;
use crate::*;
use crate::{concurrency::VClock, *};

use self::fd::FileDescriptor;

/// The maximum capacity of the socketpair buffer in bytes.
/// This number is arbitrary as the value can always
/// be configured in the real system.
const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 212992;

/// Pair of connected sockets.
///
/// We currently don't allow sending any data through this pair, so this can be just a dummy.
#[derive(Debug)]
struct SocketPair;
struct SocketPair {
// By making the write link weak, a `write` can detect when all readers are
// gone, and trigger EPIPE as appropriate.
writebuf: Weak<RefCell<Buffer>>,
readbuf: Rc<RefCell<Buffer>>,
is_nonblock: bool,
}

#[derive(Debug)]
struct Buffer {
buf: VecDeque<u8>,
clock: VClock,
/// Indicates if there is at least one active writer to this buffer.
/// If all writers of this buffer are dropped, buf_has_writer becomes false and we
/// indicate EOF instead of blocking.
buf_has_writer: bool,
}

impl FileDescription for SocketPair {
fn name(&self) -> &'static str {
Expand All @@ -20,17 +43,102 @@ impl FileDescription for SocketPair {
self: Box<Self>,
_communicate_allowed: bool,
) -> InterpResult<'tcx, io::Result<()>> {
// This is used to signal socketfd of other side that there is no writer to its readbuf.
// If the upgrade fails, there is no need to update as all read ends have been dropped.
if let Some(writebuf) = self.writebuf.upgrade() {
writebuf.borrow_mut().buf_has_writer = false;
};
Ok(Ok(()))
}

fn read<'tcx>(
&mut self,
_communicate_allowed: bool,
bytes: &mut [u8],
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx, io::Result<usize>> {
let request_byte_size = bytes.len();
let mut readbuf = self.readbuf.borrow_mut();

// Always succeed on read size 0.
if request_byte_size == 0 {
return Ok(Ok(0));
}

if readbuf.buf.is_empty() {
if !readbuf.buf_has_writer {
// Socketpair with no writer and empty buffer.
// 0 bytes successfully read indicates end-of-file.
return Ok(Ok(0));
} else {
if self.is_nonblock {
// Non-blocking socketpair with writer and empty buffer.
// https://linux.die.net/man/2/read
// EAGAIN or EWOULDBLOCK can be returned for socket,
// POSIX.1-2001 allows either error to be returned for this case.
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
return Ok(Err(Error::from(ErrorKind::WouldBlock)));
} else {
// Blocking socketpair with writer and empty buffer.
// FIXME: blocking is currently not supported
throw_unsup_format!("socketpair read: blocking isn't supported yet");
}
}
}

// Synchronize with all previous writes to this buffer.
// FIXME: this over-synchronizes; a more precise approach would be to
// only sync with the writes whose data we will read.
ecx.acquire_clock(&readbuf.clock);
// Do full read / partial read based on the space available.
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
let actual_read_size = readbuf.buf.read(bytes).unwrap();
return Ok(Ok(actual_read_size));
}

fn write<'tcx>(
&mut self,
_communicate_allowed: bool,
bytes: &[u8],
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx, io::Result<usize>> {
let write_size = bytes.len();
// Always succeed on write size 0.
// ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
if write_size == 0 {
return Ok(Ok(0));
}

let Some(writebuf) = self.writebuf.upgrade() else {
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
// closed.
return Ok(Err(Error::from(ErrorKind::BrokenPipe)));
};
let mut writebuf = writebuf.borrow_mut();
let data_size = writebuf.buf.len();
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.checked_sub(data_size).unwrap();
if available_space == 0 {
if self.is_nonblock {
// Non-blocking socketpair with a full buffer.
return Ok(Err(Error::from(ErrorKind::WouldBlock)));
} else {
// Blocking socketpair with a full buffer.
throw_unsup_format!("socketpair write: blocking isn't supported yet");
}
}
// Remember this clock so `read` can synchronize with us.
if let Some(clock) = &ecx.release_clock() {
writebuf.clock.join(clock);
}
// Do full write / partial write based on the space available.
let actual_write_size = write_size.min(available_space);
writebuf.buf.extend(&bytes[..actual_write_size]);
return Ok(Ok(actual_write_size));
}
}

impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}
pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
/// Currently this function this function is a stub. Eventually we need to
/// properly implement an FD type for sockets and have this function create
/// two sockets and associated FDs such that writing to one will produce
/// data that can be read from the other.
///
/// For more information on the arguments see the socketpair manpage:
/// <https://linux.die.net/man/2/socketpair>
fn socketpair(
Expand All @@ -42,17 +150,80 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
) -> InterpResult<'tcx, Scalar> {
let this = self.eval_context_mut();

let _domain = this.read_scalar(domain)?.to_i32()?;
let _type_ = this.read_scalar(type_)?.to_i32()?;
let _protocol = this.read_scalar(protocol)?.to_i32()?;
let domain = this.read_scalar(domain)?.to_i32()?;
let mut type_ = this.read_scalar(type_)?.to_i32()?;
let protocol = this.read_scalar(protocol)?.to_i32()?;
let sv = this.deref_pointer(sv)?;

// FIXME: fail on unsupported inputs
let mut is_sock_nonblock = false;

// Parse and remove the type flags that we support. If type != 0 after removing,
// unsupported flags are used.
if type_ & this.eval_libc_i32("SOCK_STREAM") == this.eval_libc_i32("SOCK_STREAM") {
type_ &= !(this.eval_libc_i32("SOCK_STREAM"));
}

// SOCK_NONBLOCK only exists on Linux.
if this.tcx.sess.target.os == "linux" {
if type_ & this.eval_libc_i32("SOCK_NONBLOCK") == this.eval_libc_i32("SOCK_NONBLOCK") {
is_sock_nonblock = true;
type_ &= !(this.eval_libc_i32("SOCK_NONBLOCK"));
}
if type_ & this.eval_libc_i32("SOCK_CLOEXEC") == this.eval_libc_i32("SOCK_CLOEXEC") {
type_ &= !(this.eval_libc_i32("SOCK_CLOEXEC"));
}
}

// Fail on unsupported input.
// AF_UNIX and AF_LOCAL are synonyms, so we accept both in case
// their values differ.
if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") {
throw_unsup_format!(
"socketpair: Unsupported domain {:#x} is used, only AF_UNIX \
and AF_LOCAL are allowed",
domain
);
} else if type_ != 0 {
throw_unsup_format!(
"socketpair: Unsupported type {:#x} is used, only SOCK_STREAM, \
SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
type_
);
} else if protocol != 0 {
throw_unsup_format!(
"socketpair: Unsupported socket protocol {protocol} is used, \
only 0 is allowed",
);
}

let buffer1 = Rc::new(RefCell::new(Buffer {
buf: VecDeque::new(),
clock: VClock::default(),
buf_has_writer: true,
}));

let buffer2 = Rc::new(RefCell::new(Buffer {
buf: VecDeque::new(),
clock: VClock::default(),
buf_has_writer: true,
}));

let socketpair_0 = SocketPair {
writebuf: Rc::downgrade(&buffer1),
readbuf: Rc::clone(&buffer2),
is_nonblock: is_sock_nonblock,
};

let socketpair_1 = SocketPair {
writebuf: Rc::downgrade(&buffer2),
readbuf: Rc::clone(&buffer1),
is_nonblock: is_sock_nonblock,
};

let fds = &mut this.machine.fds;
let sv0 = fds.insert_fd(FileDescriptor::new(SocketPair));
let sv0 = fds.insert_fd(FileDescriptor::new(socketpair_0));
let sv0 = Scalar::try_from_int(sv0, sv.layout.size).unwrap();
let sv1 = fds.insert_fd(FileDescriptor::new(SocketPair));
let sv1 = fds.insert_fd(FileDescriptor::new(socketpair_1));
let sv1 = Scalar::try_from_int(sv1, sv.layout.size).unwrap();

this.write_scalar(sv0, &sv)?;
Expand Down
12 changes: 12 additions & 0 deletions src/tools/miri/tests/fail-dep/libc/socketpair_read_blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//@ignore-target-windows: no libc socketpair on Windows

// This is temporarily here because blocking on fd is not supported yet.
// When blocking is eventually supported, this will be moved to pass-dep/libc/libc-socketpair

fn main() {
let mut fds = [-1, -1];
let _ = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
// The read below will be blocked because the buffer is empty.
let mut buf: [u8; 3] = [0; 3];
let _res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; //~ERROR: blocking isn't supported
}
14 changes: 14 additions & 0 deletions src/tools/miri/tests/fail-dep/libc/socketpair_read_blocking.stderr
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
error: unsupported operation: socketpair read: blocking isn't supported yet
--> $DIR/socketpair_read_blocking.rs:LL:CC
|
LL | let _res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ socketpair read: blocking isn't supported yet
|
= help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support
= note: BACKTRACE:
= note: inside `main` at $DIR/socketpair_read_blocking.rs:LL:CC

note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace

error: aborting due to 1 previous error

16 changes: 16 additions & 0 deletions src/tools/miri/tests/fail-dep/libc/socketpair_write_blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//@ignore-target-windows: no libc socketpair on Windows
// This is temporarily here because blocking on fd is not supported yet.
// When blocking is eventually supported, this will be moved to pass-dep/libc/libc-socketpair
fn main() {
let mut fds = [-1, -1];
let _ = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
// Write size > buffer capacity
// Used up all the space in the buffer.
let arr1: [u8; 212992] = [1; 212992];
let _ = unsafe { libc::write(fds[0], arr1.as_ptr() as *const libc::c_void, 212992) };
let data = "abc".as_bytes().as_ptr();
// The write below will be blocked as the buffer is full.
let _ = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; //~ERROR: blocking isn't supported
let mut buf: [u8; 3] = [0; 3];
let _res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
error: unsupported operation: socketpair write: blocking isn't supported yet
--> $DIR/socketpair_write_blocking.rs:LL:CC
|
LL | let _ = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) };
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ socketpair write: blocking isn't supported yet
|
= help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support
= note: BACKTRACE:
= note: inside `main` at $DIR/socketpair_write_blocking.rs:LL:CC

note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace

error: aborting due to 1 previous error

Loading

0 comments on commit 0d66db9

Please sign in to comment.