diff --git a/src/tools/miri/src/shims/unix/socket.rs b/src/tools/miri/src/shims/unix/socket.rs index 7eb7e28ea367d..c14cb750146f2 100644 --- a/src/tools/miri/src/shims/unix/socket.rs +++ b/src/tools/miri/src/shims/unix/socket.rs @@ -1,15 +1,38 @@ +use std::cell::RefCell; +use std::collections::VecDeque; use std::io; +use std::io::{Error, ErrorKind, Read}; +use std::rc::{Rc, Weak}; use crate::shims::unix::*; -use crate::*; +use crate::{concurrency::VClock, *}; use self::fd::FileDescriptor; +/// The maximum capacity of the socketpair buffer in bytes. +/// This number is arbitrary as the value can always +/// be configured in the real system. +const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 212992; + /// Pair of connected sockets. -/// -/// We currently don't allow sending any data through this pair, so this can be just a dummy. #[derive(Debug)] -struct SocketPair; +struct SocketPair { + // By making the write link weak, a `write` can detect when all readers are + // gone, and trigger EPIPE as appropriate. + writebuf: Weak>, + readbuf: Rc>, + is_nonblock: bool, +} + +#[derive(Debug)] +struct Buffer { + buf: VecDeque, + clock: VClock, + /// Indicates if there is at least one active writer to this buffer. + /// If all writers of this buffer are dropped, buf_has_writer becomes false and we + /// indicate EOF instead of blocking. + buf_has_writer: bool, +} impl FileDescription for SocketPair { fn name(&self) -> &'static str { @@ -20,17 +43,102 @@ impl FileDescription for SocketPair { self: Box, _communicate_allowed: bool, ) -> InterpResult<'tcx, io::Result<()>> { + // This is used to signal socketfd of other side that there is no writer to its readbuf. + // If the upgrade fails, there is no need to update as all read ends have been dropped. + if let Some(writebuf) = self.writebuf.upgrade() { + writebuf.borrow_mut().buf_has_writer = false; + }; Ok(Ok(())) } + + fn read<'tcx>( + &mut self, + _communicate_allowed: bool, + bytes: &mut [u8], + ecx: &mut MiriInterpCx<'tcx>, + ) -> InterpResult<'tcx, io::Result> { + let request_byte_size = bytes.len(); + let mut readbuf = self.readbuf.borrow_mut(); + + // Always succeed on read size 0. + if request_byte_size == 0 { + return Ok(Ok(0)); + } + + if readbuf.buf.is_empty() { + if !readbuf.buf_has_writer { + // Socketpair with no writer and empty buffer. + // 0 bytes successfully read indicates end-of-file. + return Ok(Ok(0)); + } else { + if self.is_nonblock { + // Non-blocking socketpair with writer and empty buffer. + // https://linux.die.net/man/2/read + // EAGAIN or EWOULDBLOCK can be returned for socket, + // POSIX.1-2001 allows either error to be returned for this case. + // Since there is no ErrorKind for EAGAIN, WouldBlock is used. + return Ok(Err(Error::from(ErrorKind::WouldBlock))); + } else { + // Blocking socketpair with writer and empty buffer. + // FIXME: blocking is currently not supported + throw_unsup_format!("socketpair read: blocking isn't supported yet"); + } + } + } + + // Synchronize with all previous writes to this buffer. + // FIXME: this over-synchronizes; a more precise approach would be to + // only sync with the writes whose data we will read. + ecx.acquire_clock(&readbuf.clock); + // Do full read / partial read based on the space available. + // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior. + let actual_read_size = readbuf.buf.read(bytes).unwrap(); + return Ok(Ok(actual_read_size)); + } + + fn write<'tcx>( + &mut self, + _communicate_allowed: bool, + bytes: &[u8], + ecx: &mut MiriInterpCx<'tcx>, + ) -> InterpResult<'tcx, io::Result> { + let write_size = bytes.len(); + // Always succeed on write size 0. + // ("If count is zero and fd refers to a file other than a regular file, the results are not specified.") + if write_size == 0 { + return Ok(Ok(0)); + } + + let Some(writebuf) = self.writebuf.upgrade() else { + // If the upgrade from Weak to Rc fails, it indicates that all read ends have been + // closed. + return Ok(Err(Error::from(ErrorKind::BrokenPipe))); + }; + let mut writebuf = writebuf.borrow_mut(); + let data_size = writebuf.buf.len(); + let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.checked_sub(data_size).unwrap(); + if available_space == 0 { + if self.is_nonblock { + // Non-blocking socketpair with a full buffer. + return Ok(Err(Error::from(ErrorKind::WouldBlock))); + } else { + // Blocking socketpair with a full buffer. + throw_unsup_format!("socketpair write: blocking isn't supported yet"); + } + } + // Remember this clock so `read` can synchronize with us. + if let Some(clock) = &ecx.release_clock() { + writebuf.clock.join(clock); + } + // Do full write / partial write based on the space available. + let actual_write_size = write_size.min(available_space); + writebuf.buf.extend(&bytes[..actual_write_size]); + return Ok(Ok(actual_write_size)); + } } impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {} pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { - /// Currently this function this function is a stub. Eventually we need to - /// properly implement an FD type for sockets and have this function create - /// two sockets and associated FDs such that writing to one will produce - /// data that can be read from the other. - /// /// For more information on the arguments see the socketpair manpage: /// fn socketpair( @@ -42,17 +150,80 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { ) -> InterpResult<'tcx, Scalar> { let this = self.eval_context_mut(); - let _domain = this.read_scalar(domain)?.to_i32()?; - let _type_ = this.read_scalar(type_)?.to_i32()?; - let _protocol = this.read_scalar(protocol)?.to_i32()?; + let domain = this.read_scalar(domain)?.to_i32()?; + let mut type_ = this.read_scalar(type_)?.to_i32()?; + let protocol = this.read_scalar(protocol)?.to_i32()?; let sv = this.deref_pointer(sv)?; - // FIXME: fail on unsupported inputs + let mut is_sock_nonblock = false; + + // Parse and remove the type flags that we support. If type != 0 after removing, + // unsupported flags are used. + if type_ & this.eval_libc_i32("SOCK_STREAM") == this.eval_libc_i32("SOCK_STREAM") { + type_ &= !(this.eval_libc_i32("SOCK_STREAM")); + } + + // SOCK_NONBLOCK only exists on Linux. + if this.tcx.sess.target.os == "linux" { + if type_ & this.eval_libc_i32("SOCK_NONBLOCK") == this.eval_libc_i32("SOCK_NONBLOCK") { + is_sock_nonblock = true; + type_ &= !(this.eval_libc_i32("SOCK_NONBLOCK")); + } + if type_ & this.eval_libc_i32("SOCK_CLOEXEC") == this.eval_libc_i32("SOCK_CLOEXEC") { + type_ &= !(this.eval_libc_i32("SOCK_CLOEXEC")); + } + } + + // Fail on unsupported input. + // AF_UNIX and AF_LOCAL are synonyms, so we accept both in case + // their values differ. + if domain != this.eval_libc_i32("AF_UNIX") && domain != this.eval_libc_i32("AF_LOCAL") { + throw_unsup_format!( + "socketpair: Unsupported domain {:#x} is used, only AF_UNIX \ + and AF_LOCAL are allowed", + domain + ); + } else if type_ != 0 { + throw_unsup_format!( + "socketpair: Unsupported type {:#x} is used, only SOCK_STREAM, \ + SOCK_CLOEXEC and SOCK_NONBLOCK are allowed", + type_ + ); + } else if protocol != 0 { + throw_unsup_format!( + "socketpair: Unsupported socket protocol {protocol} is used, \ + only 0 is allowed", + ); + } + + let buffer1 = Rc::new(RefCell::new(Buffer { + buf: VecDeque::new(), + clock: VClock::default(), + buf_has_writer: true, + })); + + let buffer2 = Rc::new(RefCell::new(Buffer { + buf: VecDeque::new(), + clock: VClock::default(), + buf_has_writer: true, + })); + + let socketpair_0 = SocketPair { + writebuf: Rc::downgrade(&buffer1), + readbuf: Rc::clone(&buffer2), + is_nonblock: is_sock_nonblock, + }; + + let socketpair_1 = SocketPair { + writebuf: Rc::downgrade(&buffer2), + readbuf: Rc::clone(&buffer1), + is_nonblock: is_sock_nonblock, + }; let fds = &mut this.machine.fds; - let sv0 = fds.insert_fd(FileDescriptor::new(SocketPair)); + let sv0 = fds.insert_fd(FileDescriptor::new(socketpair_0)); let sv0 = Scalar::try_from_int(sv0, sv.layout.size).unwrap(); - let sv1 = fds.insert_fd(FileDescriptor::new(SocketPair)); + let sv1 = fds.insert_fd(FileDescriptor::new(socketpair_1)); let sv1 = Scalar::try_from_int(sv1, sv.layout.size).unwrap(); this.write_scalar(sv0, &sv)?; diff --git a/src/tools/miri/tests/fail-dep/libc/socketpair_read_blocking.rs b/src/tools/miri/tests/fail-dep/libc/socketpair_read_blocking.rs new file mode 100644 index 0000000000000..c28a6d966fe55 --- /dev/null +++ b/src/tools/miri/tests/fail-dep/libc/socketpair_read_blocking.rs @@ -0,0 +1,12 @@ +//@ignore-target-windows: no libc socketpair on Windows + +// This is temporarily here because blocking on fd is not supported yet. +// When blocking is eventually supported, this will be moved to pass-dep/libc/libc-socketpair + +fn main() { + let mut fds = [-1, -1]; + let _ = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + // The read below will be blocked because the buffer is empty. + let mut buf: [u8; 3] = [0; 3]; + let _res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; //~ERROR: blocking isn't supported +} diff --git a/src/tools/miri/tests/fail-dep/libc/socketpair_read_blocking.stderr b/src/tools/miri/tests/fail-dep/libc/socketpair_read_blocking.stderr new file mode 100644 index 0000000000000..b5ed72d9f1b18 --- /dev/null +++ b/src/tools/miri/tests/fail-dep/libc/socketpair_read_blocking.stderr @@ -0,0 +1,14 @@ +error: unsupported operation: socketpair read: blocking isn't supported yet + --> $DIR/socketpair_read_blocking.rs:LL:CC + | +LL | let _res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ socketpair read: blocking isn't supported yet + | + = help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support + = note: BACKTRACE: + = note: inside `main` at $DIR/socketpair_read_blocking.rs:LL:CC + +note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace + +error: aborting due to 1 previous error + diff --git a/src/tools/miri/tests/fail-dep/libc/socketpair_write_blocking.rs b/src/tools/miri/tests/fail-dep/libc/socketpair_write_blocking.rs new file mode 100644 index 0000000000000..e2fbc0ae4b426 --- /dev/null +++ b/src/tools/miri/tests/fail-dep/libc/socketpair_write_blocking.rs @@ -0,0 +1,16 @@ +//@ignore-target-windows: no libc socketpair on Windows +// This is temporarily here because blocking on fd is not supported yet. +// When blocking is eventually supported, this will be moved to pass-dep/libc/libc-socketpair +fn main() { + let mut fds = [-1, -1]; + let _ = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + // Write size > buffer capacity + // Used up all the space in the buffer. + let arr1: [u8; 212992] = [1; 212992]; + let _ = unsafe { libc::write(fds[0], arr1.as_ptr() as *const libc::c_void, 212992) }; + let data = "abc".as_bytes().as_ptr(); + // The write below will be blocked as the buffer is full. + let _ = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; //~ERROR: blocking isn't supported + let mut buf: [u8; 3] = [0; 3]; + let _res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; +} diff --git a/src/tools/miri/tests/fail-dep/libc/socketpair_write_blocking.stderr b/src/tools/miri/tests/fail-dep/libc/socketpair_write_blocking.stderr new file mode 100644 index 0000000000000..7b3a0d27636de --- /dev/null +++ b/src/tools/miri/tests/fail-dep/libc/socketpair_write_blocking.stderr @@ -0,0 +1,14 @@ +error: unsupported operation: socketpair write: blocking isn't supported yet + --> $DIR/socketpair_write_blocking.rs:LL:CC + | +LL | let _ = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ socketpair write: blocking isn't supported yet + | + = help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support + = note: BACKTRACE: + = note: inside `main` at $DIR/socketpair_write_blocking.rs:LL:CC + +note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace + +error: aborting due to 1 previous error + diff --git a/src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs b/src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs new file mode 100644 index 0000000000000..324c0127ee976 --- /dev/null +++ b/src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs @@ -0,0 +1,124 @@ +//@ignore-target-windows: No libc socketpair on Windows +// test_race depends on a deterministic schedule. +//@compile-flags: -Zmiri-preemption-rate=0 +use std::thread; +fn main() { + test_socketpair(); + test_socketpair_threaded(); + test_race(); +} + +fn test_socketpair() { + let mut fds = [-1, -1]; + let mut res = + unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + + // Read size == data available in buffer. + let data = "abcde".as_bytes().as_ptr(); + res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() }; + assert_eq!(res, 5); + let mut buf: [u8; 5] = [0; 5]; + res = unsafe { + libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap() + }; + assert_eq!(res, 5); + assert_eq!(buf, "abcde".as_bytes()); + + // Read size > data available in buffer. + let data = "abc".as_bytes().as_ptr(); + res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3).try_into().unwrap() }; + assert_eq!(res, 3); + let mut buf2: [u8; 5] = [0; 5]; + res = unsafe { + libc::read(fds[1], buf2.as_mut_ptr().cast(), buf2.len() as libc::size_t).try_into().unwrap() + }; + assert_eq!(res, 3); + assert_eq!(&buf2[0..3], "abc".as_bytes()); + + // Test read and write from another direction. + // Read size == data available in buffer. + let data = "12345".as_bytes().as_ptr(); + res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5).try_into().unwrap() }; + assert_eq!(res, 5); + let mut buf3: [u8; 5] = [0; 5]; + res = unsafe { + libc::read(fds[0], buf3.as_mut_ptr().cast(), buf3.len() as libc::size_t).try_into().unwrap() + }; + assert_eq!(res, 5); + assert_eq!(buf3, "12345".as_bytes()); + + // Read size > data available in buffer. + let data = "123".as_bytes().as_ptr(); + res = unsafe { libc::write(fds[1], data as *const libc::c_void, 3).try_into().unwrap() }; + assert_eq!(res, 3); + let mut buf4: [u8; 5] = [0; 5]; + res = unsafe { + libc::read(fds[0], buf4.as_mut_ptr().cast(), buf4.len() as libc::size_t).try_into().unwrap() + }; + assert_eq!(res, 3); + assert_eq!(&buf4[0..3], "123".as_bytes()); +} + +fn test_socketpair_threaded() { + let mut fds = [-1, -1]; + let mut res = + unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + + let data = "abcde".as_bytes().as_ptr(); + res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() }; + assert_eq!(res, 5); + let thread1 = thread::spawn(move || { + let mut buf: [u8; 5] = [0; 5]; + let res: i64 = unsafe { + libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) + .try_into() + .unwrap() + }; + assert_eq!(res, 5); + assert_eq!(buf, "abcde".as_bytes()); + }); + thread1.join().unwrap(); + + // Read and write from different direction + let thread2 = thread::spawn(move || { + let data = "12345".as_bytes().as_ptr(); + let res: i64 = + unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() }; + assert_eq!(res, 5); + }); + thread2.join().unwrap(); + let mut buf: [u8; 5] = [0; 5]; + res = unsafe { + libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap() + }; + assert_eq!(res, 5); + assert_eq!(buf, "12345".as_bytes()); +} +fn test_race() { + static mut VAL: u8 = 0; + let mut fds = [-1, -1]; + let mut res = + unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + let thread1 = thread::spawn(move || { + let mut buf: [u8; 1] = [0; 1]; + // write() from the main thread will occur before the read() here + // because preemption is disabled and the main thread yields after write(). + let res: i32 = unsafe { + libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) + .try_into() + .unwrap() + }; + assert_eq!(res, 1); + assert_eq!(buf, "a".as_bytes()); + unsafe { assert_eq!(VAL, 1) }; + }); + unsafe { VAL = 1 }; + let data = "a".as_bytes().as_ptr(); + res = unsafe { libc::write(fds[0], data as *const libc::c_void, 1).try_into().unwrap() }; + assert_eq!(res, 1); + thread::yield_now(); + thread1.join().unwrap(); +}