Skip to content

Commit

Permalink
[WIP] try to refactor into streaming iterator "let ... = x.next()" style
Browse files Browse the repository at this point in the history
  • Loading branch information
Roderick Bovee committed Aug 28, 2019
1 parent 4aef928 commit 572bd74
Show file tree
Hide file tree
Showing 4 changed files with 585 additions and 578 deletions.
92 changes: 49 additions & 43 deletions src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ use std::marker::PhantomData;

use crate::util::ParseError;

pub struct RecReader<'a> {
file: &'a mut dyn io::Read,
last: bool,
pub struct RecBuffer<'a, T> {
record_type: PhantomData<T>,
file: Option<&'a mut dyn io::Read>,
pub buf: Vec<u8>,
pub pos: usize,
pub last: bool,
pub count: usize,
}

/// A buffer that wraps an object with the `Read` trait and allows extracting
/// a set of slices to data. Acts as a lower-level primitive for our FASTX
/// readers.
impl<'a> RecReader<'a> {
impl<'a: 'b, 'b, T> RecBuffer<'a, T> where T: RecordFormat<'b> {
/// Instantiate a new buffer.
///
/// # Panics
Expand All @@ -23,7 +26,7 @@ impl<'a> RecReader<'a> {
file: &'a mut dyn io::Read,
buf_size: usize,
header: &[u8],
) -> Result<RecReader<'a>, ParseError> {
) -> Result<Self, ParseError> {
let mut buf = Vec::with_capacity(buf_size + header.len());
unsafe {
buf.set_len(buf_size + header.len());
Expand All @@ -34,27 +37,45 @@ impl<'a> RecReader<'a> {
buf.set_len(amt_read + header.len());
}

Ok(RecReader {
file,
last: false,
Ok(RecBuffer {
record_type: PhantomData,
file: Some(file),
buf,
pos: 0,
last: false,
count: 0,
})
}

pub fn from_bytes(data: &'a [u8]) -> Self {
RecBuffer {
record_type: PhantomData,
file: None,
buf: data.to_vec(),
pos: 0,
last: true,
count: 0,
}
}

/// Refill the buffer and increase its capacity if it's not big enough
pub fn refill(&mut self, used: usize) -> Result<bool, ParseError> {
if used == 0 && self.last {
pub fn refill(&mut self) -> Result<bool, ParseError> {
if self.pos == 0 && self.last {
return Ok(true);
}
let cur_length = self.buf.len() - used;
let cur_length = self.buf.len() - self.pos;
let new_length = cur_length + self.buf.capacity();

let mut new_buf = Vec::with_capacity(new_length);
unsafe {
new_buf.set_len(new_length);
}
new_buf[..cur_length].copy_from_slice(&self.buf[used..]);
let amt_read = self.file.read(&mut new_buf[cur_length..])?;
new_buf[..cur_length].copy_from_slice(&self.buf[self.pos..]);
let amt_read = if let Some(file) = &mut self.file {
file.read(&mut new_buf[cur_length..])?
} else {
0
};
unsafe {
new_buf.set_len(cur_length + amt_read);
}
Expand All @@ -63,47 +84,32 @@ impl<'a> RecReader<'a> {
Ok(false)
}

pub fn get_buffer<T>(&self, record_count: usize) -> RecBuffer<T> {
RecBuffer {
buf: &self.buf,
pos: 0,
last: self.last,
record_type: PhantomData,
count: record_count,
}
}
}

#[derive(Debug)]
pub struct RecBuffer<'a, T> {
pub buf: &'a [u8],
pub pos: usize,
pub last: bool,
record_type: PhantomData<T>,
pub count: usize,
}

impl<'a, T> RecBuffer<'a, T> {
pub fn from_bytes(data: &'a [u8]) -> Self {
RecBuffer {
buf: data,
pos: 0,
last: true,
record_type: PhantomData,
count: 0,
pub fn next(&'a mut self) -> Option<Result<T, ParseError>> {
loop {
if let Some(x) = T::parse(self) {
return Some(x);
}
match self.refill() {
Err(e) => return Some(Err(e)),
Ok(true) => return None,
Ok(false) => {},
};
}
}
}

#[test]
fn test_from_bytes() {
// this is not a useful test, but it does get the compiler to shut up
// about `from_bytes` not being used
// this is not a great test
let rb: RecBuffer<String> = RecBuffer::from_bytes(b"test");
assert_eq!(rb.pos, 0);
assert_eq!(rb.buf, b"test");
}

pub trait RecordFormat<'b> {
fn parse(rbuf: &'b mut RecBuffer<Self>) -> Option<Result<Self, ParseError>> where Self: Sized;
}

// pub fn parse<T, E, F>(reader: &'s mut io::Read, header: &[u8], ref mut callback: F) -> Result<(), E> where
// E: From<ParseError>,
// F: FnMut(T) -> Result<(), E>,
Expand Down
Loading

0 comments on commit 572bd74

Please sign in to comment.