Skip to content

Commit

Permalink
Add CSV Decoder::capacity (#3674) (#3677)
Browse files Browse the repository at this point in the history
* Add CSV Decoder::capacity (#3674)

* Add test

* Remove unnecessary extern

* Add docs
  • Loading branch information
tustvold committed Feb 10, 2023
1 parent 5b1821e commit 3e08a75
Showing 1 changed file with 81 additions and 2 deletions.
83 changes: 81 additions & 2 deletions arrow-csv/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,15 @@ impl<R: BufRead> BufReader<R> {
loop {
let buf = self.reader.fill_buf()?;
let decoded = self.decoder.decode(buf)?;
if decoded == 0 {
self.reader.consume(decoded);
// Yield if no bytes were decoded or the decoder is full
//
// The capacity check avoids looping around and potentially
// blocking in fill_buf to read data that isn't needed
// to flush the next batch
if decoded == 0 || self.decoder.capacity() == 0 {
break;
}
self.reader.consume(decoded);
}

self.decoder.flush()
Expand Down Expand Up @@ -574,6 +579,11 @@ impl Decoder {
self.line_number += rows.len();
Ok(Some(batch))
}

/// Returns how many more records can be read before a call to [`Self::flush`] is required
///
/// This is the configured batch size less the record decoder's current `len()`
pub fn capacity(&self) -> usize {
    let buffered = self.record_decoder.len();
    self.batch_size - buffered
}
}

/// Parses a slice of [`StringRecords`] into a [RecordBatch]
Expand Down Expand Up @@ -2269,4 +2279,73 @@ mod tests {
"Csv error: Encountered invalid UTF-8 data for line 1 and field 1",
);
}

/// Test helper that wraps a reader and records every `fill_buf` call made through it
struct InstrumentedRead<R> {
    /// The wrapped reader that every operation is forwarded to
    r: R,
    /// Number of times `fill_buf` has been invoked, counted before the inner call
    fill_count: usize,
    /// Length of the buffer returned by each successful `fill_buf` call, in call order
    fill_sizes: Vec<usize>,
}

impl<R> InstrumentedRead<R> {
    /// Wraps `r` with zeroed instrumentation
    fn new(r: R) -> Self {
        let fill_sizes = Vec::new();
        Self {
            r,
            fill_count: 0,
            fill_sizes,
        }
    }
}

impl<R: Seek> Seek for InstrumentedRead<R> {
    /// Forwards to the wrapped reader without recording anything
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        let inner = &mut self.r;
        inner.seek(pos)
    }
}

impl<R: BufRead> Read for InstrumentedRead<R> {
    /// Forwards to the wrapped reader without recording anything
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let inner = &mut self.r;
        inner.read(buf)
    }
}

impl<R: BufRead> BufRead for InstrumentedRead<R> {
    /// Counts the call, forwards it, and records the returned buffer length on success
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
        // The call is counted even if the wrapped reader returns an error
        self.fill_count += 1;
        match self.r.fill_buf() {
            Ok(filled) => {
                self.fill_sizes.push(filled.len());
                Ok(filled)
            }
            Err(e) => Err(e),
        }
    }

    /// Forwards to the wrapped reader without recording anything
    fn consume(&mut self, amt: usize) {
        let inner = &mut self.r;
        inner.consume(amt)
    }
}

/// Checks how often the buffered reader calls `fill_buf` while producing batches
#[test]
fn test_io() {
    let fields = vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
    ];
    let schema = Arc::new(Schema::new(fields));

    // Four rows with a batch size of three: one full batch plus one trailing row
    let csv = "foo,bar\nbaz,foo\na,b\nc,d";
    let mut read = InstrumentedRead::new(Cursor::new(csv.as_bytes()));

    let builder = ReaderBuilder::new()
        .with_schema(schema)
        .with_batch_size(3);
    let reader = builder.build_buffered(&mut read).unwrap();

    let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(batches.len(), 2);
    assert_eq!(batches[0].num_rows(), 3);
    assert_eq!(batches[1].num_rows(), 1);

    // fill_buf should be called exactly four times:
    // 1. Whole input is visible, first 3 rows are decoded
    // 2. Remaining bytes of the final row
    // 3. Empty buffer: delimits and flushes the final row
    // 4. Empty buffer: iterator is finished
    assert_eq!(&read.fill_sizes, &[23, 3, 0, 0]);
    assert_eq!(read.fill_count, 4);
}
}

0 comments on commit 3e08a75

Please sign in to comment.