Skip to content

Commit

Permalink
io: remove unused csv::csv_core::Reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 20, 2021
1 parent 5692bbc commit bdab835
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 27 deletions.
4 changes: 2 additions & 2 deletions polars/polars-io/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
//! }
//! ```
//!
use crate::csv_core::csv::{build_csv_reader, SequentialReader};
use crate::csv_core::csv::{build_csv_reader, CoreReader};
use crate::mmap::MmapBytesReader;
use crate::utils::{resolve_homedir, to_arrow_compatible_df};
use crate::{SerReader, SerWriter};
Expand Down Expand Up @@ -349,7 +349,7 @@ where
self
}

pub fn build_inner_reader(self) -> Result<SequentialReader<R>> {
pub fn build_inner_reader(self) -> Result<CoreReader<R>> {
build_csv_reader(
self.reader,
self.stop_after_n_rows,
Expand Down
44 changes: 19 additions & 25 deletions polars/polars-io/src/csv_core/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use crate::csv_core::{buffer::*, parser::*};
use crate::mmap::MmapBytesReader;
use crate::PhysicalIoExpr;
use crate::ScanAggregation;
use csv::ByteRecordsIntoIter;
use polars_arrow::array::ValueSize;
use polars_arrow::array::*;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::{prelude::*, POOL};
use rayon::prelude::*;
Expand All @@ -18,13 +17,13 @@ use std::sync::atomic::Ordering;
use std::sync::{atomic::AtomicUsize, Arc};

/// CSV file reader
pub struct SequentialReader<R: Read + MmapBytesReader> {
pub struct CoreReader<R: Read + MmapBytesReader> {
/// Explicit schema for the CSV file
schema: SchemaRef,
/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<usize>>,
/// File reader
record_iter: Option<ByteRecordsIntoIter<R>>,
reader: Option<R>,
/// Current line number, used in error reporting
line_number: usize,
ignore_parser_errors: bool,
Expand All @@ -42,7 +41,7 @@ pub struct SequentialReader<R: Read + MmapBytesReader> {
null_values: Option<Vec<String>>,
}

impl<R> fmt::Debug for SequentialReader<R>
impl<R> fmt::Debug for CoreReader<R>
where
R: Read + MmapBytesReader,
{
Expand Down Expand Up @@ -103,7 +102,7 @@ impl RunningSize {
}
}

impl<R: Read + Sync + Send + MmapBytesReader> SequentialReader<R> {
impl<R: Read + Sync + Send + MmapBytesReader> CoreReader<R> {
/// Returns the schema of the reader, useful for getting the schema without reading
/// record batches
pub fn schema(&self) -> SchemaRef {
Expand Down Expand Up @@ -442,34 +441,30 @@ impl<R: Read + Sync + Send + MmapBytesReader> SequentialReader<R> {
) -> Result<DataFrame> {
let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

let mut df = match (&self.path, self.record_iter.is_some()) {
let mut df = match &self.path {
// we have a path so we can mmap
(Some(p), _) => {
Some(p) => {
let file = std::fs::File::open(p)?;
let mmap = unsafe { memmap::Mmap::map(&file)? };
let bytes = mmap[..].as_ref();
self.parse_csv(n_threads, bytes, predicate.as_ref())?
}
(None, true) => {
// get a hold of the reader + mmapreader
let mut r = std::mem::take(&mut self.record_iter)
.unwrap()
.into_reader()
.into_inner();
None => {
let mut reader = self.reader.take().unwrap();

// we have a file so we can mmap
if let Some(file) = r.to_file() {
if let Some(file) = reader.to_file() {
let mmap = unsafe { memmap::Mmap::map(file)? };
let bytes = mmap[..].as_ref();
self.parse_csv(n_threads, bytes, predicate.as_ref())?
} else {
// we can get the bytes for free
let bytes = if let Some(bytes) = r.to_bytes() {
let bytes = if let Some(bytes) = reader.to_bytes() {
Cow::Borrowed(bytes)
} else {
// we have to read to an owned buffer to get the bytes.
let mut bytes = Vec::with_capacity(1024 * 128);
r.read_to_end(&mut bytes)?;
reader.read_to_end(&mut bytes)?;
if !bytes.is_empty()
&& (bytes[bytes.len() - 1] != b'\n' || bytes[bytes.len() - 1] != b'\r')
{
Expand All @@ -478,10 +473,12 @@ impl<R: Read + Sync + Send + MmapBytesReader> SequentialReader<R> {
Cow::Owned(bytes)
};

self.parse_csv(n_threads, &bytes, predicate.as_ref())?
let out = self.parse_csv(n_threads, &bytes, predicate.as_ref())?;
// restore reader for a potential second run
self.reader = Some(reader);
out
}
}
_ => return Err(PolarsError::Other("file or reader must be set".into())),
};

if let Some(aggregate) = aggregate {
Expand Down Expand Up @@ -524,7 +521,7 @@ pub fn build_csv_reader<R: Read + Seek + Sync + Send + MmapBytesReader>(
low_memory: bool,
comment_char: Option<u8>,
null_values: Option<NullValues>,
) -> Result<SequentialReader<R>> {
) -> Result<CoreReader<R>> {
// check if schema should be inferred
let delimiter = delimiter.unwrap_or(b',');
let schema = match schema {
Expand Down Expand Up @@ -554,13 +551,10 @@ pub fn build_csv_reader<R: Read + Seek + Sync + Send + MmapBytesReader>(
projection = Some(prj);
}

let csv_reader = init_csv_reader(reader, has_header, delimiter, comment_char);
let record_iter = Some(csv_reader.into_byte_records());

Ok(SequentialReader {
Ok(CoreReader {
schema,
projection,
record_iter,
reader: Some(reader),
line_number: if has_header { 1 } else { 0 },
ignore_parser_errors,
skip_rows,
Expand Down

0 comments on commit bdab835

Please sign in to comment.