Skip to content

Commit

Permalink
api: add record-oriented IO functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Freaky committed Feb 28, 2020
1 parent 2cfc51c commit af2cb60
Showing 1 changed file with 161 additions and 11 deletions.
172 changes: 161 additions & 11 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,49 @@ pub trait BufReadExt: io::BufRead {
ByteLines { buf: self }
}

/// Returns an iterator over byte-terminated records of this reader, where
/// each record is represented as a byte string.
///
/// Each item yielded by this iterator is a `io::Result<Vec<u8>>`, where
/// an error is yielded if there was a problem reading from the underlying
/// reader.
///
/// On success, the next record in the iterator is returned. The line does
/// *not* contain a trailing terminator.
///
/// Note this differs from `byte_lines()` in that it has no special handling
/// for `\r`.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// use std::io;
///
/// use bstr::io::BufReadExt;
///
/// # fn example() -> Result<(), io::Error> {
/// let cursor = io::Cursor::new(b"lorem\0ipsum\0dolor");
///
/// let mut records = vec![];
/// for result in cursor.byte_records(0) {
/// let record = result?;
/// records.push(record);
/// }
/// assert_eq!(records.len(), 3);
/// assert_eq!(records[0], "lorem".as_bytes());
/// assert_eq!(records[1], "ipsum".as_bytes());
/// assert_eq!(records[2], "dolor".as_bytes());
/// # Ok(()) }; example().unwrap()
/// ```
fn byte_records(self, terminator: u8) -> ByteRecords<Self>
where
Self: Sized,
{
ByteRecords { terminator, buf: self }
}

/// Executes the given closure on each line in the underlying reader.
///
/// If the closure returns an error (or if the underlying reader returns an
Expand Down Expand Up @@ -116,6 +159,9 @@ pub trait BufReadExt: io::BufRead {
/// This routine is useful for iterating over lines as quickly as
/// possible. Namely, a single allocation is reused for each line.
///
/// This is identical to `for_byte_terminated_record_with_terminator` with a
/// terminator of `\n`.
///
/// # Examples
///
/// Basic usage:
Expand All @@ -140,8 +186,75 @@ pub trait BufReadExt: io::BufRead {
/// # Ok(()) }; example().unwrap()
/// ```
fn for_byte_line_with_terminator<F>(
self,
for_each_line: F,
) -> io::Result<()>
where
Self: Sized,
F: FnMut(&[u8]) -> io::Result<bool>,
{
self.for_byte_terminated_record_with_terminator(b'\n', for_each_line)
}

/// Executes the given closure on each byte-terminated record in the
/// underlying reader.
///
/// If the closure returns an error (or if the underlying reader returns an
/// error), then iteration is stopped and the error is returned. If false
/// is returned, then iteration is stopped and no error is returned.
///
/// The closure given is called on exactly the same values as yielded by
/// the [`byte_records`](trait.BufReadExt.html#method.byte_records)
/// iterator. Namely, records do _not_ contain a trailing terminator byte.
///
/// This routine is useful for iterating over records as quickly as
/// possible. Namely, a single allocation is reused for each record.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// use std::io;
///
/// use bstr::io::BufReadExt;
///
/// # fn example() -> Result<(), io::Error> {
/// let cursor = io::Cursor::new(b"lorem\0ipsum\0dolor");
///
/// let mut records = vec![];
/// cursor.for_byte_terminated_record(0, |record| {
/// records.push(record.to_vec());
/// Ok(true)
/// })?;
/// assert_eq!(records.len(), 3);
/// assert_eq!(records[0], "lorem".as_bytes());
/// assert_eq!(records[1], "ipsum".as_bytes());
/// assert_eq!(records[2], "dolor".as_bytes());
/// # Ok(()) }; example().unwrap()
/// ```
fn for_byte_terminated_record<F>(
self,
terminator: u8,
mut for_each_record: F,
) -> io::Result<()>
where
Self: Sized,
F: FnMut(&[u8]) -> io::Result<bool>,
{
self.for_byte_terminated_record_with_terminator(terminator, |chunk| {
if chunk.last() == Some(&terminator) {
for_each_record(&chunk[0..chunk.len() - 1])
} else {
for_each_record(&chunk)
}
})
}

fn for_byte_terminated_record_with_terminator<F>(
mut self,
mut for_each_line: F,
terminator: u8,
mut for_each_record: F
) -> io::Result<()>
where
Self: Sized,
Expand All @@ -151,14 +264,14 @@ pub trait BufReadExt: io::BufRead {
let mut res = Ok(());
let mut consumed = 0;
'outer: loop {
// Lend out complete line slices from our buffer
// Lend out complete record slices from our buffer
{
let mut buf = self.fill_buf()?;
while let Some(index) = buf.find_byte(b'\n') {
let (line, rest) = buf.split_at(index + 1);
while let Some(index) = buf.find_byte(terminator) {
let (record, rest) = buf.split_at(index + 1);
buf = rest;
consumed += line.len();
match for_each_line(&line) {
consumed += record.len();
match for_each_record(&record) {
Ok(false) => break 'outer,
Err(err) => {
res = Err(err);
Expand All @@ -168,9 +281,9 @@ pub trait BufReadExt: io::BufRead {
}
}

// Copy the final line fragment to our local buffer. This saves
// Copy the final record fragment to our local buffer. This saves
// read_until() from re-scanning a buffer we know contains no
// remaining newlines.
// remaining terminators.
bytes.extend_from_slice(&buf);
consumed += buf.len();
}
Expand All @@ -180,10 +293,10 @@ pub trait BufReadExt: io::BufRead {

// N.B. read_until uses a different version of memchr that may
// be slower than the memchr crate that bstr uses. However, this
// should only run for a fairly small number of lines, assuming a
// should only run for a fairly small number of records, assuming a
// decent buffer size.
self.read_until(b'\n', &mut bytes)?;
if bytes.is_empty() || !for_each_line(&bytes)? {
self.read_until(terminator, &mut bytes)?;
if bytes.is_empty() || !for_each_record(&bytes)? {
break;
}
bytes.clear();
Expand All @@ -208,6 +321,21 @@ pub struct ByteLines<B> {
buf: B,
}


/// An iterator over records from an instance of
/// [`std::io::BufRead`](https://doc.rust-lang.org/std/io/trait.BufRead.html).
///
/// This iterator is generally created by calling the
/// [`byte_records`](trait.BufReadExt.html#method.byte_records)
/// method on the
/// [`BufReadExt`](trait.BufReadExt.html)
/// trait.
#[derive(Debug)]
pub struct ByteRecords<B> {
buf: B,
terminator: u8,
}

impl<B: io::BufRead> Iterator for ByteLines<B> {
type Item = io::Result<Vec<u8>>;

Expand All @@ -224,6 +352,22 @@ impl<B: io::BufRead> Iterator for ByteLines<B> {
}
}

impl<B: io::BufRead> Iterator for ByteRecords<B> {
type Item = io::Result<Vec<u8>>;

fn next(&mut self) -> Option<io::Result<Vec<u8>>> {
let mut bytes = vec![];
match self.buf.read_until(self.terminator, &mut bytes) {
Err(e) => Some(Err(e)),
Ok(0) => None,
Ok(_) => {
trim_record(&mut bytes, self.terminator);
Some(Ok(bytes))
}
}
}
}

fn trim_slice(mut line: &[u8]) -> &[u8] {
if line.last_byte() == Some(b'\n') {
line = &line[..line.len() - 1];
Expand All @@ -243,6 +387,12 @@ fn trim_line(line: &mut Vec<u8>) {
}
}

fn trim_record(record: &mut Vec<u8>, terminator: u8) {
if record.last_byte() == Some(terminator) {
record.pop_byte();
}
}

#[cfg(test)]
mod tests {
use super::BufReadExt;
Expand Down

0 comments on commit af2cb60

Please sign in to comment.