diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs index a53146f009191..7dff5f74f4ba0 100644 --- a/src/libstd/io/comm_adapters.rs +++ b/src/libstd/io/comm_adapters.rs @@ -10,12 +10,10 @@ use prelude::*; -use comm::{GenericPort, GenericChan, GenericSmartChan}; -use cmp; +use comm::{GenericPort, GenericChan, GenericSmartChan, RecvIterator}; use io; -use option::{None, Option, Some}; use super::{Reader, Writer}; -use vec::{bytes, CopyableVector, MutableVector, ImmutableVector}; +use vec::CopyableVector; /// Allows reading from a port. /// @@ -30,55 +28,23 @@ use vec::{bytes, CopyableVector, MutableVector, ImmutableVector}; /// None => println!("At the end of the stream!") /// } /// ``` -pub struct PortReader

{ - priv buf: Option<~[u8]>, // A buffer of bytes received but not consumed. - priv pos: uint, // How many of the buffered bytes have already be consumed. - priv port: P, // The port to pull data from. - priv closed: bool, // Whether the pipe this port connects to has been closed. +pub struct PortReader<'a, P> { + priv reader: io::extensions::BytesIterReader>, } -impl> PortReader

{ - pub fn new(port: P) -> PortReader

{ +impl<'a, P: GenericPort<~[u8]>> PortReader<'a, P> { + pub fn new(port: &'a P) -> PortReader<'a, P> { PortReader { - buf: None, - pos: 0, - port: port, - closed: false, + reader: io::extensions::BytesIterReader::new(port.recv_iter()), } } } -impl> Reader for PortReader

{ - fn read(&mut self, buf: &mut [u8]) -> Option { - let mut num_read = 0; - loop { - match self.buf { - Some(ref prev) => { - let dst = buf.mut_slice_from(num_read); - let src = prev.slice_from(self.pos); - let count = cmp::min(dst.len(), src.len()); - bytes::copy_memory(dst, src, count); - num_read += count; - self.pos += count; - }, - None => (), - }; - if num_read == buf.len() || self.closed { - break; - } - self.pos = 0; - self.buf = self.port.try_recv(); - self.closed = self.buf.is_none(); - } - if self.closed && num_read == 0 { - io::io_error::cond.raise(io::standard_error(io::EndOfFile)); - None - } else { - Some(num_read) - } - } +impl<'self, P: GenericPort<~[u8]>> Reader for PortReader<'self, P> { + + fn read(&mut self, buf: &mut [u8]) -> Option { self.reader.read(buf) } - fn eof(&mut self) -> bool { self.closed } + fn eof(&mut self) -> bool { self.reader.eof() } } /// Allows writing to a chan. @@ -153,7 +119,7 @@ mod test { chan.send(~[7u8, 8u8]); } - let mut reader = PortReader::new(port); + let mut reader = PortReader::new(&port); let mut buf = ~[0u8, ..3]; assert_eq!(false, reader.eof()); diff --git a/src/libstd/io/extensions.rs b/src/libstd/io/extensions.rs index 564e664027f73..80b0a595b5252 100644 --- a/src/libstd/io/extensions.rs +++ b/src/libstd/io/extensions.rs @@ -13,9 +13,11 @@ // XXX: Not sure how this should be structured // XXX: Iteration should probably be considered separately +use cmp::min; +use io::{EndOfFile, Decorator, Reader, io_error, standard_error}; use iter::Iterator; -use option::Option; -use io::{Reader, Decorator}; +use option::{None, Option, Some}; +use vec::bytes; /// An iterator that reads a single byte on each iteration, /// until `.read_byte()` returns `None`. @@ -54,6 +56,73 @@ impl<'self, R: Reader> Iterator for ByteIterator { } } + +/// Allows reading from an iterator of byte vectors. +/// +/// # Example +/// +/// ``` +/// let chunks = ~[~[1u8, 2u8], ~[3u8], ~[4u8, 5u8]].move_iter().filter(|vals| vals.len() > 1); +/// let mut reader = BytesIterReader::new(chunks); +/// +/// let mut buf = ~[0u8, ..3]; +/// match reader.read(buf) { +/// Some(nread) => println!("Read {} bytes", nread), +/// None => println!("At the end of the stream!") +/// } +/// ``` +pub struct BytesIterReader { + priv buf: Option<~[u8]>, // A buffer of bytes received but not consumed. + priv pos: uint, // How many of the buffered bytes have already be consumed. + priv iter: I, // The iter to pull data from. + priv finished: bool, // Whether the wrapped iterator has been exhausted. +} + +impl> BytesIterReader { + pub fn new(iter: I) -> BytesIterReader { + BytesIterReader { + buf: None, + pos: 0, + iter: iter, + finished: false, + } + } +} + +impl> Reader for BytesIterReader { + + fn read(&mut self, buf: &mut [u8]) -> Option { + let mut num_read = 0; + loop { + match self.buf { + Some(ref prev) => { + let dst = buf.mut_slice_from(num_read); + let src = prev.slice_from(self.pos); + let count = min(dst.len(), src.len()); + bytes::copy_memory(dst, src, count); + num_read += count; + self.pos += count; + }, + None => (), + }; + if num_read == buf.len() || self.finished { + break; + } + self.pos = 0; + self.buf = self.iter.next(); + self.finished = self.buf.is_none(); + } + if self.finished && num_read == 0 { + io_error::cond.raise(standard_error(EndOfFile)); + None + } else { + Some(num_read) + } + } + + fn eof(&mut self) -> bool { self.finished } +} + pub fn u64_to_le_bytes(n: u64, size: uint, f: |v: &[u8]| -> T) -> T { assert!(size <= 8u); match size { @@ -138,7 +207,7 @@ pub fn u64_from_be_bytes(data: &[u8], mod test { use option::{None, Option, Some}; use io::mem::{MemReader, MemWriter}; - use io::{Reader, io_error, placeholder_error}; + use io::{EndOfFile, Reader, io_error, placeholder_error, standard_error}; use vec::ImmutableVector; struct InitialZeroByteReader { @@ -486,4 +555,59 @@ mod test { assert!(reader.read_le_f32() == 8.1250); } + #[test] + fn test_bytes_iter_reader() { + let input = (~[ + ~[1u8, 2u8], + ~[], + ~[42], // Should be skipped. + ~[3u8, 4u8], + ~[5u8, 6u8], + ~[7u8, 8u8], + ]).move_iter().filter(|x| x.len() != 1); + + let mut reader = super::BytesIterReader::new(input); + let mut buf = ~[0u8, ..3]; + + assert_eq!(false, reader.eof()); + + assert_eq!(Some(0), reader.read(~[])); + assert_eq!(false, reader.eof()); + + assert_eq!(Some(3), reader.read(buf)); + assert_eq!(false, reader.eof()); + assert_eq!(~[1,2,3], buf); + + assert_eq!(Some(3), reader.read(buf)); + assert_eq!(false, reader.eof()); + assert_eq!(~[4,5,6], buf); + + assert_eq!(Some(2), reader.read(buf)); + assert_eq!(~[7,8,6], buf); + assert_eq!(true, reader.eof()); + + let mut err = None; + let result = io_error::cond.trap(|standard_error(k, _, _)| { + err = Some(k) + }).inside(|| { + reader.read(buf) + }); + assert_eq!(Some(EndOfFile), err); + assert_eq!(None, result); + assert_eq!(true, reader.eof()); + assert_eq!(~[7,8,6], buf); + + // Ensure it continues to fail in the same way. + err = None; + let result = io_error::cond.trap(|standard_error(k, _, _)| { + err = Some(k) + }).inside(|| { + reader.read(buf) + }); + assert_eq!(Some(EndOfFile), err); + assert_eq!(None, result); + assert_eq!(true, reader.eof()); + assert_eq!(~[7,8,6], buf); + } + }