diff --git a/polars/polars-io/src/csv.rs b/polars/polars-io/src/csv.rs
index cdeeff6fde20..8ffbd5fb7a2c 100644
--- a/polars/polars-io/src/csv.rs
+++ b/polars/polars-io/src/csv.rs
@@ -41,7 +41,7 @@
 //! }
 //! ```
 //!
-use crate::csv_core::csv::{build_csv_reader, SequentialReader};
+use crate::csv_core::csv::{build_csv_reader, CoreReader};
 use crate::mmap::MmapBytesReader;
 use crate::utils::{resolve_homedir, to_arrow_compatible_df};
 use crate::{SerReader, SerWriter};
@@ -345,7 +345,7 @@ where
         self
     }
 
-    pub fn build_inner_reader(self) -> Result<SequentialReader<R>> {
+    pub fn build_inner_reader(self) -> Result<CoreReader<R>> {
         build_csv_reader(
             self.reader,
             self.stop_after_n_rows,
diff --git a/polars/polars-io/src/csv_core/csv.rs b/polars/polars-io/src/csv_core/csv.rs
index feadec6e4a79..a856151037c2 100644
--- a/polars/polars-io/src/csv_core/csv.rs
+++ b/polars/polars-io/src/csv_core/csv.rs
@@ -4,7 +4,6 @@ use crate::csv_core::{buffer::*, parser::*};
 use crate::mmap::MmapBytesReader;
 use crate::PhysicalIoExpr;
 use crate::ScanAggregation;
-use csv::ByteRecordsIntoIter;
 use polars_arrow::array::*;
 use polars_core::utils::accumulate_dataframes_vertical;
 use polars_core::{prelude::*, POOL};
@@ -18,13 +17,13 @@ use std::sync::atomic::Ordering;
 use std::sync::{atomic::AtomicUsize, Arc};
 
 /// CSV file reader
-pub struct SequentialReader<R: Read> {
+pub struct CoreReader<R: Read> {
     /// Explicit schema for the CSV file
     schema: SchemaRef,
     /// Optional projection for which columns to load (zero-based column indices)
     projection: Option<Vec<usize>>,
     /// File reader
-    record_iter: Option<ByteRecordsIntoIter<R>>,
+    reader: Option<R>,
     /// Current line number, used in error reporting
     line_number: usize,
     ignore_parser_errors: bool,
@@ -42,7 +41,7 @@ pub struct SequentialReader<R: Read> {
     null_values: Option<Vec<String>>,
 }
 
-impl<R> fmt::Debug for SequentialReader<R>
+impl<R> fmt::Debug for CoreReader<R>
 where
     R: Read + MmapBytesReader,
 {
@@ -103,7 +102,7 @@ impl RunningSize {
     }
 }
 
-impl<R: Read + Sync + Send + MmapBytesReader> SequentialReader<R> {
+impl<R: Read + Sync + Send + MmapBytesReader> CoreReader<R> {
     /// Returns the schema of the reader, useful for getting the schema without reading
     /// record batches
    pub fn schema(&self) -> SchemaRef {
@@ -442,34 +441,30 @@ impl<R: Read + Sync + Send + MmapBytesReader> SequentialReader<R> {
     ) -> Result<DataFrame> {
         let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
 
-        let mut df = match (&self.path, self.record_iter.is_some()) {
+        let mut df = match &self.path {
             // we have a path so we can mmap
-            (Some(p), _) => {
+            Some(p) => {
                 let file = std::fs::File::open(p)?;
                 let mmap = unsafe { memmap::Mmap::map(&file)? };
                 let bytes = mmap[..].as_ref();
                 self.parse_csv(n_threads, bytes, predicate.as_ref())?
             }
-            (None, true) => {
-                // get a hold of the reader + mmapreader
-                let mut r = std::mem::take(&mut self.record_iter)
-                    .unwrap()
-                    .into_reader()
-                    .into_inner();
+            None => {
+                let mut reader = self.reader.take().unwrap();
 
                 // we have a file so we can mmap
-                if let Some(file) = r.to_file() {
+                if let Some(file) = reader.to_file() {
                     let mmap = unsafe { memmap::Mmap::map(file)? };
                     let bytes = mmap[..].as_ref();
                     self.parse_csv(n_threads, bytes, predicate.as_ref())?
                 } else {
                     // we can get the bytes for free
-                    let bytes = if let Some(bytes) = r.to_bytes() {
+                    let bytes = if let Some(bytes) = reader.to_bytes() {
                         Cow::Borrowed(bytes)
                     } else {
                         // we have to read to an owned buffer to get the bytes.
                         let mut bytes = Vec::with_capacity(1024 * 128);
-                        r.read_to_end(&mut bytes)?;
+                        reader.read_to_end(&mut bytes)?;
                         if !bytes.is_empty()
                             && (bytes[bytes.len() - 1] != b'\n' || bytes[bytes.len() - 1] != b'\r')
                         {
@@ -478,10 +473,12 @@ impl<R: Read + Sync + Send + MmapBytesReader> SequentialReader<R> {
                         Cow::Owned(bytes)
                     };
 
-                    self.parse_csv(n_threads, &bytes, predicate.as_ref())?
+                    let out = self.parse_csv(n_threads, &bytes, predicate.as_ref())?;
+                    // restore reader for a potential second run
+                    self.reader = Some(reader);
+                    out
                 }
             }
-            _ => return Err(PolarsError::Other("file or reader must be set".into())),
         };
 
         if let Some(aggregate) = aggregate {
@@ -524,7 +521,7 @@ pub fn build_csv_reader(
     low_memory: bool,
     comment_char: Option<u8>,
     null_values: Option<NullValues>,
-) -> Result<SequentialReader<R>> {
+) -> Result<CoreReader<R>> {
     // check if schema should be inferred
     let delimiter = delimiter.unwrap_or(b',');
     let schema = match schema {
@@ -554,13 +551,10 @@ pub fn build_csv_reader(
         projection = Some(prj);
     }
 
-    let csv_reader = init_csv_reader(reader, has_header, delimiter, comment_char);
-    let record_iter = Some(csv_reader.into_byte_records());
-
-    Ok(SequentialReader {
+    Ok(CoreReader {
         schema,
         projection,
-        record_iter,
+        reader: Some(reader),
         line_number: if has_header { 1 } else { 0 },
         ignore_parser_errors,
         skip_rows,
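For context, here is a minimal standalone sketch (not part of the patch, and using hypothetical helper names such as `count_records`) of the byte-sourcing strategy the new `CoreReader` switches to: memory-map the input when a real file handle is available, otherwise fall back to reading the stream into an owned buffer, and hand the raw bytes straight to the parser instead of going through `csv::ByteRecordsIntoIter`. It assumes the `memmap` crate that the patch itself calls is available as a dependency.

```rust
use std::fs::File;
use std::io::{self, Read};

/// Stand-in for the real CSV parser: count newline-terminated records.
fn count_records(bytes: &[u8]) -> usize {
    bytes.iter().filter(|&&b| b == b'\n').count()
}

/// Zero-copy path: map the file and parse the mapped bytes directly.
fn records_from_file(file: &File) -> io::Result<usize> {
    // SAFETY: the underlying file must not be truncated while the map is alive.
    let mmap = unsafe { memmap::Mmap::map(file)? };
    Ok(count_records(&mmap[..]))
}

/// Fallback path: no file handle, so buffer the whole stream in memory first.
fn records_from_reader<R: Read>(reader: &mut R) -> io::Result<usize> {
    let mut bytes = Vec::with_capacity(1024 * 128);
    reader.read_to_end(&mut bytes)?;
    Ok(count_records(&bytes))
}

fn main() -> io::Result<()> {
    let file = File::open("example.csv")?;
    println!("mmap path: {} records", records_from_file(&file)?);

    let mut cursor = io::Cursor::new(b"a,b\n1,2\n".to_vec());
    println!("buffered path: {} records", records_from_reader(&mut cursor)?);
    Ok(())
}
```

Either way the parser only ever sees a `&[u8]`, which is what lets the patch drop the `csv` crate's record iterator and keep a plain `Option<R>` on the reader struct.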