Minor: refactor streaming CSV inference code #4717

Merged: 1 commit, Dec 23, 2022
174 changes: 105 additions & 69 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -25,11 +25,11 @@ use std::sync::Arc;
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
-use bytes::Buf;
+use bytes::{Buf, Bytes};
 
 use datafusion_common::DataFusionError;
 
-use futures::{pin_mut, StreamExt, TryStreamExt};
+use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{ObjectMeta, ObjectStore};
 
 use super::FileFormat;
@@ -125,75 +125,16 @@ impl FileFormat for CsvFormat {
 
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
 
-        'iterating_objects: for object in objects {
+        for object in objects {
             // stream to only read as many rows as needed into memory
-            let stream = store
-                .get(&object.location)
-                .await?
-                .into_stream()
-                .map_err(|e| DataFusionError::External(Box::new(e)));
-            let stream = newline_delimited_stream(stream);
-            pin_mut!(stream);
-
-            let mut column_names = vec![];
-            let mut column_type_possibilities = vec![];
-            let mut first_chunk = true;
-
-            'reading_object: while let Some(data) = stream.next().await.transpose()? {
-                let (Schema { fields, .. }, records_read) =
-                    arrow::csv::reader::infer_reader_schema(
-                        self.file_compression_type.convert_read(data.reader())?,
-                        self.delimiter,
-                        Some(records_to_read),
-                        // only consider header for first chunk
-                        self.has_header && first_chunk,
-                    )?;
-                records_to_read -= records_read;
-
-                if first_chunk {
-                    // set up initial structures for recording inferred schema across chunks
-                    (column_names, column_type_possibilities) = fields
-                        .into_iter()
-                        .map(|field| {
-                            let mut possibilities = HashSet::new();
-                            if records_read > 0 {
-                                // at least 1 data row read, record the inferred datatype
-                                possibilities.insert(field.data_type().clone());
-                            }
-                            (field.name().clone(), possibilities)
-                        })
-                        .unzip();
-                    first_chunk = false;
-                } else {
-                    if fields.len() != column_type_possibilities.len() {
-                        return Err(DataFusionError::Execution(
-                            format!(
-                                "Encountered unequal lengths between records on CSV file whilst inferring schema. \
-                                 Expected {} fields, found {} fields",
-                                column_type_possibilities.len(),
-                                fields.len()
-                            )
-                        ));
-                    }
-
-                    column_type_possibilities.iter_mut().zip(fields).for_each(
-                        |(possibilities, field)| {
-                            possibilities.insert(field.data_type().clone());
-                        },
-                    );
-                }
-
-                if records_to_read == 0 {
-                    break 'reading_object;
-                }
-            }
-
-            schemas.push(build_schema_helper(
-                column_names,
-                &column_type_possibilities,
-            ));
+            let stream = read_to_delimited_chunks(store, object).await;
+            let (schema, records_read) = self
+                .infer_schema_from_stream(records_to_read, stream)
+                .await?;
+            records_to_read -= records_read;
+            schemas.push(schema);
             if records_to_read == 0 {
-                break 'iterating_objects;
+                break;
             }
         }
 
@@ -227,6 +168,101 @@ impl FileFormat for CsvFormat {
     }
 }
 
+/// Return a newline-delimited stream from the specified file on
+/// object store
+///
+/// Each returned `Bytes` has a whole number of newline-delimited rows
+async fn read_to_delimited_chunks(
+    store: &Arc<dyn ObjectStore>,
+    object: &ObjectMeta,
+) -> impl Stream<Item = Result<Bytes>> {
+    // stream to only read as many rows as needed into memory
+    let stream = store
+        .get(&object.location)
+        .await
+        .map_err(DataFusionError::ObjectStore);
+
+    match stream {
+        Ok(s) => newline_delimited_stream(
+            s.into_stream()
+                .map_err(|e| DataFusionError::External(Box::new(e))),
+        )
+        .left_stream(),
+        Err(e) => futures::stream::iter(vec![Err(e)]).right_stream(),
+    }
+}
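The two match arms above produce different concrete stream types, so neither could be returned directly as the function's single `impl Stream`. `left_stream()` and `right_stream()` (from `futures::StreamExt`) wrap the arms in the two halves of `futures::future::Either`, which itself implements `Stream` when both halves do. Here is a minimal, self-contained sketch of the same pattern (illustrative only, not code from this PR; `rows_or_error` is a made-up example function):

```rust
use futures::stream::{self, Stream, StreamExt};

// Two branches produce different concrete stream types; Either (via
// left_stream / right_stream) unifies them into one return type.
fn rows_or_error(ok: bool) -> impl Stream<Item = Result<String, String>> {
    if ok {
        // success branch: an iterator-backed stream of rows
        stream::iter(vec![Ok("row 1".to_string()), Ok("row 2".to_string())])
            .left_stream()
    } else {
        // failure branch: a one-shot stream carrying the error
        stream::once(async { Err("store unavailable".to_string()) }).right_stream()
    }
}

#[tokio::main]
async fn main() {
    let rows: Vec<Result<String, String>> = rows_or_error(true).collect().await;
    println!("{rows:?}"); // [Ok("row 1"), Ok("row 2")]
}
```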

+impl CsvFormat {
+    /// Infer the schema by reading up to `records_to_read` records from a
+    /// stream of newline-delimited chunks, returning the inferred schema
+    /// and the number of records actually read
+    async fn infer_schema_from_stream(
+        &self,
+        mut records_to_read: usize,
+        stream: impl Stream<Item = Result<Bytes>>,
+    ) -> Result<(Schema, usize)> {
+        let mut total_records_read = 0;
+        let mut column_names = vec![];
+        let mut column_type_possibilities = vec![];
+        let mut first_chunk = true;
+
+        pin_mut!(stream);
+
+        while let Some(chunk) = stream.next().await.transpose()? {
+            let (Schema { fields, .. }, records_read) =
+                arrow::csv::reader::infer_reader_schema(
+                    self.file_compression_type.convert_read(chunk.reader())?,
+                    self.delimiter,
+                    Some(records_to_read),
+                    // only consider header for first chunk
+                    self.has_header && first_chunk,
+                )?;
+            records_to_read -= records_read;
+            total_records_read += records_read;
+
+            if first_chunk {
+                // set up initial structures for recording inferred schema across chunks
+                (column_names, column_type_possibilities) = fields
+                    .into_iter()
+                    .map(|field| {
+                        let mut possibilities = HashSet::new();
+                        if records_read > 0 {
+                            // at least 1 data row read, record the inferred datatype
+                            possibilities.insert(field.data_type().clone());
+                        }
+                        (field.name().clone(), possibilities)
+                    })
+                    .unzip();
+                first_chunk = false;
+            } else {
+                if fields.len() != column_type_possibilities.len() {
+                    return Err(DataFusionError::Execution(
+                        format!(
+                            "Encountered unequal lengths between records on CSV file whilst inferring schema. \
+                             Expected {} fields, found {} fields",
+                            column_type_possibilities.len(),
+                            fields.len()
+                        )
+                    ));
+                }
+
+                column_type_possibilities.iter_mut().zip(fields).for_each(
+                    |(possibilities, field)| {
+                        possibilities.insert(field.data_type().clone());
+                    },
+                );
+            }
+
+            if records_to_read == 0 {
+                break;
+            }
+        }
+
+        let schema = build_schema_helper(column_names, &column_type_possibilities);
+        Ok((schema, total_records_read))
+    }
+}
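For intuition, here is a hypothetical in-crate test sketch of how `infer_schema_from_stream` merges types across chunks. The builder calls (`CsvFormat::default()`, `with_has_header`) follow `CsvFormat`'s usual API but should be treated as assumptions; this test is not part of the PR:

```rust
// Hypothetical test (not from this PR). Each Bytes chunk contains only
// whole lines, mimicking the output of read_to_delimited_chunks.
#[tokio::test]
async fn infers_schema_across_chunks() -> Result<()> {
    let format = CsvFormat::default().with_has_header(true);
    let chunks: Vec<Result<Bytes>> = vec![
        Ok(Bytes::from_static(b"c1,c2\n1,2\n")), // header + an all-integer row
        Ok(Bytes::from_static(b"1.5,hello\n")),  // widens c1 to Float64, c2 to Utf8
    ];
    let (schema, records_read) = format
        .infer_schema_from_stream(usize::MAX, futures::stream::iter(chunks))
        .await?;
    // two data rows were read across the two chunks
    assert_eq!(records_read, 2);
    // c1 saw {Int64, Float64} and c2 saw {Int64, Utf8}; build_schema_helper
    // collapses each set to a single resolved DataType
    assert_eq!(schema.fields().len(), 2);
    Ok(())
}
```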

 fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schema {
     let fields = names
         .into_iter()
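The rest of `build_schema_helper` is collapsed in the diff view above. As a rough sketch of what a helper with this signature could do (an assumption about the elided body, not the PR's actual code), each column's set of observed types is collapsed into one resolved type:

```rust
use std::collections::HashSet;

use arrow::datatypes::{DataType, Field, Schema};

// Illustrative sketch only; the real body is collapsed in the diff above.
fn build_schema_sketch(names: Vec<String>, types: &[HashSet<DataType>]) -> Schema {
    let fields = names
        .into_iter()
        .zip(types)
        .map(|(name, possibilities)| {
            let data_type = match possibilities.len() {
                // exactly one type observed across all chunks: use it
                1 => possibilities.iter().next().unwrap().clone(),
                // Int64 mixed with Float64 widens losslessly to Float64
                2 if possibilities.contains(&DataType::Int64)
                    && possibilities.contains(&DataType::Float64) =>
                {
                    DataType::Float64
                }
                // no data rows seen, or an irreconcilable mix: fall back to Utf8
                _ => DataType::Utf8,
            };
            Field::new(&name, data_type, true)
        })
        .collect::<Vec<_>>();
    Schema::new(fields)
}
```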