diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs
index f9cf847621e1e..a53146f009191 100644
--- a/src/libstd/io/comm_adapters.rs
+++ b/src/libstd/io/comm_adapters.rs
@@ -8,30 +8,107 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use option::Option;
-use comm::{GenericPort, GenericChan};
+use prelude::*;
+
+use comm::{GenericPort, GenericChan, GenericSmartChan};
+use cmp;
+use io;
+use option::{None, Option, Some};
use super::{Reader, Writer};
+use vec::{bytes, CopyableVector, MutableVector, ImmutableVector};
-pub struct PortReader
;
+/// Allows reading from a port.
+///
+/// # Example
+///
+/// ```
+/// let reader = PortReader::new(port);
+///
+/// let mut buf = ~[0u8, ..100];
+/// match reader.read(buf) {
+/// Some(nread) => println!("Read {} bytes", nread),
+/// None => println!("At the end of the stream!")
+/// }
+/// ```
+pub struct PortReader
{
+ priv buf: Option<~[u8]>, // A buffer of bytes received but not consumed.
+ priv pos: uint, // How many of the buffered bytes have already be consumed.
+ priv port: P, // The port to pull data from.
+ priv closed: bool, // Whether the pipe this port connects to has been closed.
+}
impl> PortReader {
- pub fn new(_port: P) -> PortReader
{ fail!() }
+ pub fn new(port: P) -> PortReader
{
+ PortReader {
+ buf: None,
+ pos: 0,
+ port: port,
+ closed: false,
+ }
+ }
}
impl> Reader for PortReader {
- fn read(&mut self, _buf: &mut [u8]) -> Option { fail!() }
+ fn read(&mut self, buf: &mut [u8]) -> Option {
+ let mut num_read = 0;
+ loop {
+ match self.buf {
+ Some(ref prev) => {
+ let dst = buf.mut_slice_from(num_read);
+ let src = prev.slice_from(self.pos);
+ let count = cmp::min(dst.len(), src.len());
+ bytes::copy_memory(dst, src, count);
+ num_read += count;
+ self.pos += count;
+ },
+ None => (),
+ };
+ if num_read == buf.len() || self.closed {
+ break;
+ }
+ self.pos = 0;
+ self.buf = self.port.try_recv();
+ self.closed = self.buf.is_none();
+ }
+ if self.closed && num_read == 0 {
+ io::io_error::cond.raise(io::standard_error(io::EndOfFile));
+ None
+ } else {
+ Some(num_read)
+ }
+ }
- fn eof(&mut self) -> bool { fail!() }
+ fn eof(&mut self) -> bool { self.closed }
}
-pub struct ChanWriter;
+/// Allows writing to a chan.
+///
+/// # Example
+///
+/// ```
+/// let writer = ChanWriter::new(chan);
+/// writer.write("hello, world".as_bytes());
+/// ```
+pub struct ChanWriter {
+ chan: C,
+}
-impl> ChanWriter {
- pub fn new(_chan: C) -> ChanWriter { fail!() }
+impl> ChanWriter {
+ pub fn new(chan: C) -> ChanWriter {
+ ChanWriter { chan: chan }
+ }
}
-impl> Writer for ChanWriter {
- fn write(&mut self, _buf: &[u8]) { fail!() }
+impl> Writer for ChanWriter {
+ fn write(&mut self, buf: &[u8]) {
+ if !self.chan.try_send(buf.to_owned()) {
+ io::io_error::cond.raise(io::IoError {
+ kind: io::BrokenPipe,
+ desc: "Pipe closed",
+ detail: None
+ });
+ }
+ }
}
pub struct ReaderPort;
@@ -55,3 +132,87 @@ impl WriterChan {
impl GenericChan<~[u8]> for WriterChan {
fn send(&self, _x: ~[u8]) { fail!() }
}
+
+
+#[cfg(test)]
+mod test {
+ use prelude::*;
+ use super::*;
+ use io;
+ use comm;
+ use task;
+
+ #[test]
+ fn test_port_reader() {
+ let (port, chan) = comm::stream();
+ do task::spawn {
+ chan.send(~[1u8, 2u8]);
+ chan.send(~[]);
+ chan.send(~[3u8, 4u8]);
+ chan.send(~[5u8, 6u8]);
+ chan.send(~[7u8, 8u8]);
+ }
+
+ let mut reader = PortReader::new(port);
+ let mut buf = ~[0u8, ..3];
+
+ assert_eq!(false, reader.eof());
+
+ assert_eq!(Some(0), reader.read(~[]));
+ assert_eq!(false, reader.eof());
+
+ assert_eq!(Some(3), reader.read(buf));
+ assert_eq!(false, reader.eof());
+ assert_eq!(~[1,2,3], buf);
+
+ assert_eq!(Some(3), reader.read(buf));
+ assert_eq!(false, reader.eof());
+ assert_eq!(~[4,5,6], buf);
+
+ assert_eq!(Some(2), reader.read(buf));
+ assert_eq!(~[7,8,6], buf);
+ assert_eq!(true, reader.eof());
+
+ let mut err = None;
+ let result = io::io_error::cond.trap(|io::standard_error(k, _, _)| {
+ err = Some(k)
+ }).inside(|| {
+ reader.read(buf)
+ });
+ assert_eq!(Some(io::EndOfFile), err);
+ assert_eq!(None, result);
+ assert_eq!(true, reader.eof());
+ assert_eq!(~[7,8,6], buf);
+
+ // Ensure it continues to fail in the same way.
+ err = None;
+ let result = io::io_error::cond.trap(|io::standard_error(k, _, _)| {
+ err = Some(k)
+ }).inside(|| {
+ reader.read(buf)
+ });
+ assert_eq!(Some(io::EndOfFile), err);
+ assert_eq!(None, result);
+ assert_eq!(true, reader.eof());
+ assert_eq!(~[7,8,6], buf);
+ }
+
+ #[test]
+ fn test_chan_writer() {
+ let (port, chan) = comm::stream();
+ let mut writer = ChanWriter::new(chan);
+ writer.write_be_u32(42);
+
+ let wanted = ~[0u8, 0u8, 0u8, 42u8];
+ let got = do task::try { port.recv() }.unwrap();
+ assert_eq!(wanted, got);
+
+ let mut err = None;
+ io::io_error::cond.trap(|io::IoError { kind, .. } | {
+ err = Some(kind)
+ }).inside(|| {
+ writer.write_u8(1)
+ });
+ assert_eq!(Some(io::BrokenPipe), err);
+ }
+}