@@ -330,6 +330,7 @@ pub use self::{
330330} ;
331331use crate :: mem:: take;
332332use crate :: ops:: { Deref , DerefMut } ;
333+ use crate :: sys:: anonymous_pipe:: { AnonPipe , pipe as pipe_inner} ;
333334use crate :: { cmp, fmt, slice, str, sys} ;
334335
335336mod buffered;
@@ -3250,3 +3251,251 @@ impl<B: BufRead> Iterator for Lines<B> {
32503251 }
32513252 }
32523253}
3254+
3255+ /// Create anonymous pipe that is close-on-exec and blocking.
3256+ ///
3257+ /// # Behavior
3258+ ///
3259+ /// A pipe is a synchronous, unidirectional data channel between two or more processes, like an
3260+ /// interprocess [`mpsc`](crate::sync::mpsc) provided by the OS. In particular:
3261+ ///
3262+ /// * A read on a [`PipeReader`] blocks until the pipe is non-empty.
3263+ /// * A write on a [`PipeWriter`] blocks when the pipe is full.
3264+ /// * When all copies of a [`PipeWriter`] are closed, a read on the corresponding [`PipeReader`]
3265+ /// returns EOF.
3266+ /// * [`PipeReader`] can be shared, but only one process will consume the data in the pipe.
3267+ ///
3268+ /// # Capacity
3269+ ///
3270+ /// Pipe capacity is platform dependent. To quote the Linux [man page]:
3271+ ///
3272+ /// > Different implementations have different limits for the pipe capacity. Applications should
3273+ /// > not rely on a particular capacity: an application should be designed so that a reading process
3274+ /// > consumes data as soon as it is available, so that a writing process does not remain blocked.
3275+ ///
3276+ /// # Examples
3277+ ///
3278+ /// ```no_run
3279+ /// #![feature(anonymous_pipe)]
3280+ /// # #[cfg(miri)] fn main() {}
3281+ /// # #[cfg(not(miri))]
3282+ /// # fn main() -> std::io::Result<()> {
3283+ /// # use std::process::Command;
3284+ /// # use std::io::{Read, Write};
3285+ /// let (ping_rx, mut ping_tx) = std::io::pipe()?;
3286+ /// let (mut pong_rx, pong_tx) = std::io::pipe()?;
3287+ ///
3288+ /// // Spawn a process that echoes its input.
3289+ /// let mut echo_server = Command::new("cat").stdin(ping_rx).stdout(pong_tx).spawn()?;
3290+ ///
3291+ /// ping_tx.write_all(b"hello")?;
3292+ /// // Close to unblock echo_server's reader.
3293+ /// drop(ping_tx);
3294+ ///
3295+ /// let mut buf = String::new();
3296+ /// // Block until echo_server's writer is closed.
3297+ /// pong_rx.read_to_string(&mut buf)?;
3298+ /// assert_eq!(&buf, "hello");
3299+ ///
3300+ /// echo_server.wait()?;
3301+ /// # Ok(())
3302+ /// # }
3303+ /// ```
3304+ /// [pipe]: https://man7.org/linux/man-pages/man2/pipe.2.html
3305+ /// [CreatePipe]: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe
3306+ /// [man page]: https://man7.org/linux/man-pages/man7/pipe.7.html
3307+ #[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
3308+ #[ inline]
3309+ pub fn pipe ( ) -> Result < ( PipeReader , PipeWriter ) > {
3310+ pipe_inner ( ) . map ( |( reader, writer) | ( PipeReader ( reader) , PipeWriter ( writer) ) )
3311+ }
3312+
3313+ /// Read end of the anonymous pipe.
3314+ #[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
3315+ #[ derive( Debug ) ]
3316+ pub struct PipeReader ( pub ( crate ) AnonPipe ) ;
3317+
3318+ /// Write end of the anonymous pipe.
3319+ #[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
3320+ #[ derive( Debug ) ]
3321+ pub struct PipeWriter ( pub ( crate ) AnonPipe ) ;
3322+
3323+ impl PipeReader {
3324+ /// Create a new [`PipeReader`] instance that shares the same underlying file description.
3325+ ///
3326+ /// # Examples
3327+ ///
3328+ /// ```no_run
3329+ /// #![feature(anonymous_pipe)]
3330+ /// # #[cfg(miri)] fn main() {}
3331+ /// # #[cfg(not(miri))]
3332+ /// # fn main() -> std::io::Result<()> {
3333+ /// # use std::fs;
3334+ /// # use std::io::Write;
3335+ /// # use std::process::Command;
3336+ /// const NUM_SLOT: u8 = 2;
3337+ /// const NUM_PROC: u8 = 5;
3338+ /// const OUTPUT: &str = "work.txt";
3339+ ///
3340+ /// let mut jobs = vec![];
3341+ /// let (reader, mut writer) = std::io::pipe()?;
3342+ ///
3343+ /// // Write NUM_SLOT characters the pipe.
3344+ /// writer.write_all(&[b'|'; NUM_SLOT as usize])?;
3345+ ///
3346+ /// // Spawn several processes that read a character from the pipe, do some work, then
3347+ /// // write back to the pipe. When the pipe is empty, the processes block, so only
3348+ /// // NUM_SLOT processes can be working at any given time.
3349+ /// for _ in 0..NUM_PROC {
3350+ /// jobs.push(
3351+ /// Command::new("bash")
3352+ /// .args(["-c",
3353+ /// &format!(
3354+ /// "read -n 1\n\
3355+ /// echo -n 'x' >> '{OUTPUT}'\n\
3356+ /// echo -n '|'",
3357+ /// ),
3358+ /// ])
3359+ /// .stdin(reader.try_clone()?)
3360+ /// .stdout(writer.try_clone()?)
3361+ /// .spawn()?,
3362+ /// );
3363+ /// }
3364+ ///
3365+ /// // Wait for all jobs to finish.
3366+ /// for mut job in jobs {
3367+ /// job.wait()?;
3368+ /// }
3369+ ///
3370+ /// // Check our work and clean up.
3371+ /// let xs = fs::read_to_string(OUTPUT)?;
3372+ /// fs::remove_file(OUTPUT)?;
3373+ /// assert_eq!(xs, "x".repeat(NUM_PROC.into()));
3374+ /// # Ok(())
3375+ /// # }
3376+ /// ```
3377+ #[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
3378+ pub fn try_clone ( & self ) -> Result < Self > {
3379+ self . 0 . try_clone ( ) . map ( Self )
3380+ }
3381+ }
3382+
3383+ impl PipeWriter {
3384+ /// Create a new [`PipeWriter`] instance that shares the same underlying file description.
3385+ ///
3386+ /// # Examples
3387+ ///
3388+ /// ```no_run
3389+ /// #![feature(anonymous_pipe)]
3390+ /// # #[cfg(miri)] fn main() {}
3391+ /// # #[cfg(not(miri))]
3392+ /// # fn main() -> std::io::Result<()> {
3393+ /// # use std::process::Command;
3394+ /// # use std::io::Read;
3395+ /// let (mut reader, writer) = std::io::pipe()?;
3396+ ///
3397+ /// // Spawn a process that writes to stdout and stderr.
3398+ /// let mut peer = Command::new("bash")
3399+ /// .args([
3400+ /// "-c",
3401+ /// "echo -n foo\n\
3402+ /// echo -n bar >&2"
3403+ /// ])
3404+ /// .stdout(writer.try_clone()?)
3405+ /// .stderr(writer)
3406+ /// .spawn()?;
3407+ ///
3408+ /// // Read and check the result.
3409+ /// let mut msg = String::new();
3410+ /// reader.read_to_string(&mut msg)?;
3411+ /// assert_eq!(&msg, "foobar");
3412+ ///
3413+ /// peer.wait()?;
3414+ /// # Ok(())
3415+ /// # }
3416+ /// ```
3417+ #[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
3418+ pub fn try_clone ( & self ) -> Result < Self > {
3419+ self . 0 . try_clone ( ) . map ( Self )
3420+ }
3421+ }
3422+
3423+ #[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
3424+ impl Read for & PipeReader {
3425+ fn read ( & mut self , buf : & mut [ u8 ] ) -> Result < usize > {
3426+ self . 0 . read ( buf)
3427+ }
3428+ fn read_vectored ( & mut self , bufs : & mut [ IoSliceMut < ' _ > ] ) -> Result < usize > {
3429+ self . 0 . read_vectored ( bufs)
3430+ }
3431+ #[ inline]
3432+ fn is_read_vectored ( & self ) -> bool {
3433+ self . 0 . is_read_vectored ( )
3434+ }
3435+ fn read_to_end ( & mut self , buf : & mut Vec < u8 > ) -> Result < usize > {
3436+ self . 0 . read_to_end ( buf)
3437+ }
3438+ fn read_buf ( & mut self , buf : BorrowedCursor < ' _ > ) -> Result < ( ) > {
3439+ self . 0 . read_buf ( buf)
3440+ }
3441+ }
3442+
3443+ #[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
3444+ impl Read for PipeReader {
3445+ fn read ( & mut self , buf : & mut [ u8 ] ) -> Result < usize > {
3446+ self . 0 . read ( buf)
3447+ }
3448+ fn read_vectored ( & mut self , bufs : & mut [ IoSliceMut < ' _ > ] ) -> Result < usize > {
3449+ self . 0 . read_vectored ( bufs)
3450+ }
3451+ #[ inline]
3452+ fn is_read_vectored ( & self ) -> bool {
3453+ self . 0 . is_read_vectored ( )
3454+ }
3455+ fn read_to_end ( & mut self , buf : & mut Vec < u8 > ) -> Result < usize > {
3456+ self . 0 . read_to_end ( buf)
3457+ }
3458+ fn read_buf ( & mut self , buf : BorrowedCursor < ' _ > ) -> Result < ( ) > {
3459+ self . 0 . read_buf ( buf)
3460+ }
3461+ }
3462+
3463+ #[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
3464+ impl Write for & PipeWriter {
3465+ fn write ( & mut self , buf : & [ u8 ] ) -> Result < usize > {
3466+ self . 0 . write ( buf)
3467+ }
3468+ #[ inline]
3469+ fn flush ( & mut self ) -> Result < ( ) > {
3470+ Ok ( ( ) )
3471+ }
3472+
3473+ fn write_vectored ( & mut self , bufs : & [ IoSlice < ' _ > ] ) -> Result < usize > {
3474+ self . 0 . write_vectored ( bufs)
3475+ }
3476+
3477+ #[ inline]
3478+ fn is_write_vectored ( & self ) -> bool {
3479+ self . 0 . is_write_vectored ( )
3480+ }
3481+ }
3482+
3483+ #[ unstable( feature = "anonymous_pipe" , issue = "127154" ) ]
3484+ impl Write for PipeWriter {
3485+ fn write ( & mut self , buf : & [ u8 ] ) -> Result < usize > {
3486+ self . 0 . write ( buf)
3487+ }
3488+ #[ inline]
3489+ fn flush ( & mut self ) -> Result < ( ) > {
3490+ Ok ( ( ) )
3491+ }
3492+
3493+ fn write_vectored ( & mut self , bufs : & [ IoSlice < ' _ > ] ) -> Result < usize > {
3494+ self . 0 . write_vectored ( bufs)
3495+ }
3496+
3497+ #[ inline]
3498+ fn is_write_vectored ( & self ) -> bool {
3499+ self . 0 . is_write_vectored ( )
3500+ }
3501+ }
0 commit comments