Skip to content

Commit

Permalink
api: add byte record oriented IO functions
Browse files Browse the repository at this point in the history
This mirrors the line iterating methods, but permits the caller to
specify an arbitrary terminator.

Closes #41
  • Loading branch information
Freaky authored and BurntSushi committed May 10, 2020
1 parent 7f785ec commit 342e443
Showing 1 changed file with 214 additions and 19 deletions.
233 changes: 214 additions & 19 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,49 @@ pub trait BufReadExt: io::BufRead {
ByteLines { buf: self }
}

/// Creates an iterator over the records of this reader, where records are
/// delimited by the given `terminator` byte and each record is yielded as
/// a byte string.
///
/// Every item produced by the iterator is an `io::Result<Vec<u8>>`. An
/// error item means there was a problem reading from the underlying
/// reader.
///
/// A successfully read record does *not* include its trailing terminator.
///
/// Note that `byte_records(b'\n')` is not the same as `byte_lines()`: this
/// method has no special handling for `\r`.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// use std::io;
///
/// use bstr::io::BufReadExt;
///
/// # fn example() -> Result<(), io::Error> {
/// let cursor = io::Cursor::new(b"lorem\x00ipsum\x00dolor");
///
/// let mut records = vec![];
/// for result in cursor.byte_records(b'\x00') {
///     let record = result?;
///     records.push(record);
/// }
/// assert_eq!(records.len(), 3);
/// assert_eq!(records[0], "lorem".as_bytes());
/// assert_eq!(records[1], "ipsum".as_bytes());
/// assert_eq!(records[2], "dolor".as_bytes());
/// # Ok(()) }; example().unwrap()
/// ```
fn byte_records(self, terminator: u8) -> ByteRecords<Self>
where
    Self: Sized,
{
    // The iterator takes ownership of the reader and remembers which byte
    // ends a record.
    ByteRecords { buf: self, terminator }
}

/// Executes the given closure on each line in the underlying reader.
///
/// If the closure returns an error (or if the underlying reader returns an
Expand Down Expand Up @@ -98,7 +141,58 @@ pub trait BufReadExt: io::BufRead {
F: FnMut(&[u8]) -> io::Result<bool>,
{
self.for_byte_line_with_terminator(|line| {
for_each_line(&trim_slice(&line))
for_each_line(&trim_line_slice(&line))
})
}

/// Calls the given closure once for every byte-terminated record in the
/// underlying reader.
///
/// Iteration stops early in two situations. If the closure (or the
/// underlying reader) returns an error, that error is returned. If the
/// closure returns `Ok(false)`, iteration stops and no error is returned.
///
/// The closure sees exactly the same values that the
/// [`byte_records`](trait.BufReadExt.html#method.byte_records)
/// iterator yields. In particular, records do _not_ contain a trailing
/// terminator byte.
///
/// This routine is useful for iterating over records as quickly as
/// possible. Namely, a single allocation is reused for each record.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// use std::io;
///
/// use bstr::io::BufReadExt;
///
/// # fn example() -> Result<(), io::Error> {
/// let cursor = io::Cursor::new(b"lorem\x00ipsum\x00dolor");
///
/// let mut records = vec![];
/// cursor.for_byte_record(b'\x00', |record| {
///     records.push(record.to_vec());
///     Ok(true)
/// })?;
/// assert_eq!(records.len(), 3);
/// assert_eq!(records[0], "lorem".as_bytes());
/// assert_eq!(records[1], "ipsum".as_bytes());
/// assert_eq!(records[2], "dolor".as_bytes());
/// # Ok(()) }; example().unwrap()
/// ```
fn for_byte_record<F>(
    self,
    terminator: u8,
    mut for_each_record: F,
) -> io::Result<()>
where
    Self: Sized,
    F: FnMut(&[u8]) -> io::Result<bool>,
{
    self.for_byte_record_with_terminator(terminator, |raw| {
        // The terminator-preserving variant hands us the raw record; drop
        // the trailing terminator (if present) before calling the user's
        // closure.
        let record = trim_record_slice(raw, terminator);
        for_each_record(record)
    })
}

Expand All @@ -116,6 +210,9 @@ pub trait BufReadExt: io::BufRead {
/// This routine is useful for iterating over lines as quickly as
/// possible. Namely, a single allocation is reused for each line.
///
/// This is identical to `for_byte_record_with_terminator` with a
/// terminator of `\n`.
///
/// # Examples
///
/// Basic usage:
Expand All @@ -140,8 +237,59 @@ pub trait BufReadExt: io::BufRead {
/// # Ok(()) }; example().unwrap()
/// ```
fn for_byte_line_with_terminator<F>(
    self,
    for_each_line: F,
) -> io::Result<()>
where
    Self: Sized,
    F: FnMut(&[u8]) -> io::Result<bool>,
{
    // A line is just a record whose terminator is `\n`, so delegate to the
    // general record routine with that byte.
    let newline = b'\n';
    self.for_byte_record_with_terminator(newline, for_each_line)
}

/// Executes the given closure on each byte-terminated record in the
/// underlying reader.
///
/// If the closure returns an error (or if the underlying reader returns an
/// error), then iteration is stopped and the error is returned. If false
/// is returned, then iteration is stopped and no error is returned.
///
/// Unlike
/// [`for_byte_record`](trait.BufReadExt.html#method.for_byte_record),
/// the records given to the closure *do* include the record terminator, if
/// one exists.
///
/// This routine is useful for iterating over records as quickly as
/// possible. Namely, a single allocation is reused for each record.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// use std::io;
///
/// use bstr::B;
/// use bstr::io::BufReadExt;
///
/// # fn example() -> Result<(), io::Error> {
/// let cursor = io::Cursor::new(b"lorem\x00ipsum\x00dolor");
///
/// let mut records = vec![];
/// cursor.for_byte_record_with_terminator(b'\x00', |record| {
/// records.push(record.to_vec());
/// Ok(true)
/// })?;
/// assert_eq!(records.len(), 3);
/// assert_eq!(records[0], B(b"lorem\x00"));
/// assert_eq!(records[1], B("ipsum\x00"));
/// assert_eq!(records[2], B("dolor"));
/// # Ok(()) }; example().unwrap()
/// ```
fn for_byte_record_with_terminator<F>(
mut self,
mut for_each_line: F,
terminator: u8,
mut for_each_record: F,
) -> io::Result<()>
where
Self: Sized,
Expand All @@ -151,14 +299,14 @@ pub trait BufReadExt: io::BufRead {
let mut res = Ok(());
let mut consumed = 0;
'outer: loop {
// Lend out complete line slices from our buffer
// Lend out complete record slices from our buffer
{
let mut buf = self.fill_buf()?;
while let Some(index) = buf.find_byte(b'\n') {
let (line, rest) = buf.split_at(index + 1);
while let Some(index) = buf.find_byte(terminator) {
let (record, rest) = buf.split_at(index + 1);
buf = rest;
consumed += line.len();
match for_each_line(&line) {
consumed += record.len();
match for_each_record(&record) {
Ok(false) => break 'outer,
Err(err) => {
res = Err(err);
Expand All @@ -168,9 +316,9 @@ pub trait BufReadExt: io::BufRead {
}
}

// Copy the final line fragment to our local buffer. This saves
// read_until() from re-scanning a buffer we know contains no
// remaining newlines.
// Copy the final record fragment to our local buffer. This
// saves read_until() from re-scanning a buffer we know
// contains no remaining terminators.
bytes.extend_from_slice(&buf);
consumed += buf.len();
}
Expand All @@ -180,10 +328,10 @@ pub trait BufReadExt: io::BufRead {

// N.B. read_until uses a different version of memchr that may
// be slower than the memchr crate that bstr uses. However, this
// should only run for a fairly small number of lines, assuming a
// should only run for a fairly small number of records, assuming a
// decent buffer size.
self.read_until(b'\n', &mut bytes)?;
if bytes.is_empty() || !for_each_line(&bytes)? {
self.read_until(terminator, &mut bytes)?;
if bytes.is_empty() || !for_each_record(&bytes)? {
break;
}
bytes.clear();
Expand All @@ -208,6 +356,24 @@ pub struct ByteLines<B> {
buf: B,
}

/// An iterator over records from an instance of
/// [`std::io::BufRead`](https://doc.rust-lang.org/std/io/trait.BufRead.html).
///
/// A byte record is any sequence of bytes terminated by a particular byte
/// chosen by the caller. For example, NUL separated byte strings are said to
/// be NUL-terminated byte records.
///
/// Each item yielded is an `io::Result<Vec<u8>>` holding one record with its
/// trailing terminator (if any) removed.
///
/// This iterator is generally created by calling the
/// [`byte_records`](trait.BufReadExt.html#method.byte_records)
/// method on the
/// [`BufReadExt`](trait.BufReadExt.html)
/// trait.
#[derive(Debug)]
pub struct ByteRecords<B> {
// The reader that records are pulled from.
buf: B,
// The byte that marks the end of a record; it is passed to `read_until`
// and then stripped from each yielded record.
terminator: u8,
}

impl<B: io::BufRead> Iterator for ByteLines<B> {
type Item = io::Result<Vec<u8>>;

Expand All @@ -224,14 +390,20 @@ impl<B: io::BufRead> Iterator for ByteLines<B> {
}
}

fn trim_slice(mut line: &[u8]) -> &[u8] {
if line.last_byte() == Some(b'\n') {
line = &line[..line.len() - 1];
if line.last_byte() == Some(b'\r') {
line = &line[..line.len() - 1];
impl<B: io::BufRead> Iterator for ByteRecords<B> {
type Item = io::Result<Vec<u8>>;

/// Reads the next record from the underlying reader.
///
/// Yields `None` once the reader reports that zero bytes were read,
/// `Some(Err(..))` if reading failed, and otherwise the record with its
/// trailing terminator removed.
fn next(&mut self) -> Option<io::Result<Vec<u8>>> {
    let mut record = Vec::new();
    let read = match self.buf.read_until(self.terminator, &mut record) {
        Ok(count) => count,
        Err(err) => return Some(Err(err)),
    };
    // Zero bytes read means the reader is exhausted.
    if read == 0 {
        return None;
    }
    trim_record(&mut record, self.terminator);
    Some(Ok(record))
}
line
}

fn trim_line(line: &mut Vec<u8>) {
Expand All @@ -243,6 +415,29 @@ fn trim_line(line: &mut Vec<u8>) {
}
}

/// Returns `line` without its trailing line terminator.
///
/// A single trailing `\n` is removed if present. If a `\n` was removed and
/// the byte before it is `\r`, that `\r` is removed as well. A `\r` that is
/// not followed by `\n` is left alone.
fn trim_line_slice(line: &[u8]) -> &[u8] {
    match line.split_last() {
        // The line ends in `\n`: drop it, then look for a preceding `\r`.
        Some((&b'\n', head)) => match head.split_last() {
            Some((&b'\r', inner)) => inner,
            _ => head,
        },
        // No trailing `\n` (or empty input): nothing to trim.
        _ => line,
    }
}

/// Removes a single trailing `terminator` byte from `record` in place.
///
/// The record is left untouched when it is empty or does not end with
/// `terminator`. At most one byte is removed.
fn trim_record(record: &mut Vec<u8>, terminator: u8) {
    let ends_with_terminator = record.ends_with(&[terminator]);
    if ends_with_terminator {
        record.pop();
    }
}

/// Returns `record` without a single trailing `terminator` byte.
///
/// If `record` is empty or does not end with `terminator`, it is returned
/// unchanged. At most one byte is removed.
fn trim_record_slice(record: &[u8], terminator: u8) -> &[u8] {
    match record.split_last() {
        Some((&last, head)) if last == terminator => head,
        _ => record,
    }
}

#[cfg(test)]
mod tests {
use super::BufReadExt;
Expand Down

0 comments on commit 342e443

Please sign in to comment.