This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for async csv reading. #562

Merged
3 commits merged on Oct 31, 2021
14 changes: 13 additions & 1 deletion Cargo.toml
@@ -26,9 +26,13 @@ hash_hasher = "^2.0.3"
# For SIMD utf8 validation
simdutf8 = "0.1.3"

# for csv io
csv = { version = "^1.1", optional = true }

# for csv async io
csv-async = { version = "^1.1", optional = true }

regex = { version = "^1.3", optional = true }
lazy_static = { version = "^1.4", optional = true }
streaming-iterator = { version = "0.1", optional = true }

serde = { version = "^1.0", features = ["rc"], optional = true }
@@ -78,6 +82,9 @@ criterion = "0.3"
flate2 = "1"
doc-comment = "0.3"
crossbeam-channel = "0.5.1"
# used to test async readers
tokio = { version = "1", features = ["macros", "rt", "fs"] }
tokio-util = { version = "0.6", features = ["compat"] }
# used to run formal property testing
proptest = { version = "1", default_features = false, features = ["std"] }

@@ -89,6 +96,7 @@ rustdoc-args = ["--cfg", "docsrs"]
default = []
full = [
"io_csv",
"io_csv_async",
"io_json",
"io_ipc",
"io_flight",
@@ -106,7 +114,9 @@ full = [
]
merge_sort = ["itertools"]
io_csv = ["io_csv_read", "io_csv_write"]
io_csv_async = ["io_csv_read_async"]
io_csv_read = ["csv", "lexical-core"]
io_csv_read_async = ["csv-async", "lexical-core", "futures"]
io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
io_json = ["serde", "serde_json", "indexmap"]
io_ipc = ["arrow-format"]
@@ -146,6 +156,8 @@ skip_feature_sets = [
["io_csv"],
["io_csv_read"],
["io_csv_write"],
["io_csv_async"],
["io_csv_read_async"],
["io_avro"],
["io_json"],
["io_flight"],
36 changes: 36 additions & 0 deletions examples/csv_read_async.rs
@@ -0,0 +1,36 @@
use std::sync::Arc;

use tokio::fs::File;
use tokio_util::compat::*;

use arrow2::array::*;
use arrow2::error::Result;
use arrow2::io::csv::read_async::*;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();

let file_path = &args[1];

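// Open the file with tokio and adapt it to a futures-compatible reader with .compat().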
let file = File::open(file_path).await?.compat();

let mut reader = AsyncReaderBuilder::new().create_reader(file);

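// Infer the schema (field names and types) from the CSV records.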
let schema = Arc::new(infer_schema(&mut reader, None, true, &infer).await?);

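// Read up to 100 rows into pre-allocated byte records; read_rows returns how many were filled.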
let mut rows = vec![ByteRecord::default(); 100];
let rows_read = read_rows(&mut reader, 0, &mut rows).await?;

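// Deserialize the byte records into a RecordBatch. This step is CPU-bound and,
// in a real application, is best done on a separate thread (see the guide).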
let batch = deserialize_batch(
&rows[..rows_read],
schema.fields(),
None,
0,
deserialize_column,
)?;
println!("{}", batch.column(0));
Ok(())
}
13 changes: 13 additions & 0 deletions guide/src/io/csv_reader.md
@@ -30,6 +30,19 @@ thereby maximizing IO throughput. The example below shows how to do just that:
{{#include ../../../examples/csv_read_parallel.rs}}
```

## Async

This crate also supports reading CSV files asynchronously via the `csv-async` crate.
The example below demonstrates this:

```rust
{{#include ../../../examples/csv_read_async.rs}}
```

Note that deserialization _should_ be performed on a separate thread so that it
does not block the executor (see also
[this post](https://ryhl.io/blog/async-what-is-blocking/)); the example above does not
show this.
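
One way to do that is with `tokio::task::spawn_blocking`. The sketch below is
illustrative only, not part of this crate: the helper name `deserialize_off_thread`
is hypothetical, and it assumes the rows and schema were obtained as in the example
above.

```rust
use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::csv::read_async::{deserialize_batch, deserialize_column, ByteRecord};
use arrow2::record_batch::RecordBatch;

/// Deserializes already-read rows on tokio's blocking thread pool so that the
/// CPU-bound work of building columns does not stall the async executor.
async fn deserialize_off_thread(rows: Vec<ByteRecord>, fields: Vec<Field>) -> Result<RecordBatch> {
    tokio::task::spawn_blocking(move || {
        // Same call as in the example, just moved off the async executor.
        deserialize_batch(&rows, &fields, None, 0, deserialize_column)
    })
    .await
    .expect("blocking task panicked")
}
```

A caller would pass owned data, e.g. `rows[..rows_read].to_vec()` and
`schema.fields().to_vec()`, since the closure must be `'static`.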

## Customization

In the code above, `parser` and `infer` allow for customization: they declare
16 changes: 12 additions & 4 deletions src/io/csv/mod.rs
@@ -1,12 +1,16 @@
#![deny(missing_docs)]
//! Transfer data between the Arrow memory format and CSV (comma-separated values).
//! Convert data between the Arrow memory format and CSV (comma-separated values).

use crate::error::ArrowError;

pub use csv::Error as CSVError;
#[cfg(any(feature = "io_csv_read_async", feature = "io_csv_read"))]
mod read_utils;
#[cfg(any(feature = "io_csv_read_async", feature = "io_csv_read"))]
mod utils;

impl From<CSVError> for ArrowError {
fn from(error: CSVError) -> Self {
#[cfg(any(feature = "io_csv_read", feature = "io_csv_write"))]
impl From<csv::Error> for ArrowError {
fn from(error: csv::Error) -> Self {
ArrowError::External("".to_string(), Box::new(error))
}
}
@@ -23,3 +27,7 @@ pub mod read;
#[cfg(feature = "io_csv_write")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_csv_write")))]
pub mod write;

#[cfg(feature = "io_csv_read_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_csv_read_async")))]
pub mod read_async;
214 changes: 13 additions & 201 deletions src/io/csv/read/deserialize.rs
@@ -1,83 +1,22 @@
use std::sync::Arc;

use chrono::Datelike;
use csv::ByteRecord;

use crate::{
array::*,
datatypes::*,
error::{ArrowError, Result},
array::Array,
datatypes::{DataType, Field},
error::Result,
record_batch::RecordBatch,
temporal_conversions,
types::{NativeType, NaturalDataType},
};

use super::infer_schema::RFC3339;

fn deserialize_primitive<T, F>(
rows: &[ByteRecord],
column: usize,
datatype: DataType,
op: F,
) -> Arc<dyn Array>
where
T: NativeType + NaturalDataType + lexical_core::FromLexical,
F: Fn(&[u8]) -> Option<T>,
{
let iter = rows.iter().map(|row| match row.get(column) {
Some(bytes) => {
if bytes.is_empty() {
return None;
}
op(bytes)
}
None => None,
});
Arc::new(PrimitiveArray::<T>::from_trusted_len_iter(iter).to(datatype))
}

fn deserialize_boolean<F>(rows: &[ByteRecord], column: usize, op: F) -> Arc<dyn Array>
where
F: Fn(&[u8]) -> Option<bool>,
{
let iter = rows.iter().map(|row| match row.get(column) {
Some(bytes) => {
if bytes.is_empty() {
return None;
}
op(bytes)
}
None => None,
});
Arc::new(BooleanArray::from_trusted_len_iter(iter))
}

fn deserialize_utf8<O: Offset>(rows: &[ByteRecord], column: usize) -> Arc<dyn Array> {
let iter = rows.iter().map(|row| match row.get(column) {
Some(bytes) => simdutf8::basic::from_utf8(bytes).ok(),
None => None,
});
Arc::new(Utf8Array::<O>::from_trusted_len_iter(iter))
}

fn deserialize_binary<O: Offset>(rows: &[ByteRecord], column: usize) -> Arc<dyn Array> {
let iter = rows.iter().map(|row| row.get(column));
Arc::new(BinaryArray::<O>::from_trusted_len_iter(iter))
}
use super::super::read_utils::{
deserialize_batch as deserialize_batch_gen, deserialize_column as deserialize_column_gen,
ByteRecordGeneric,
};

#[inline]
fn deserialize_datetime<T: chrono::TimeZone>(string: &str, tz: &T) -> Option<i64> {
let mut parsed = chrono::format::Parsed::new();
let fmt = chrono::format::StrftimeItems::new(RFC3339);
if chrono::format::parse(&mut parsed, string, fmt).is_ok() {
parsed
.to_datetime()
.map(|x| x.naive_utc())
.map(|x| tz.from_utc_datetime(&x))
.map(|x| x.timestamp_nanos())
.ok()
} else {
None
impl ByteRecordGeneric for ByteRecord {
fn get(&self, index: usize) -> Option<&[u8]> {
self.get(index)
}
}

@@ -86,114 +25,9 @@ pub fn deserialize_column(
rows: &[ByteRecord],
column: usize,
datatype: DataType,
_line_number: usize,
line_number: usize,
) -> Result<Arc<dyn Array>> {
use DataType::*;
Ok(match datatype {
Boolean => deserialize_boolean(rows, column, |bytes| {
if bytes.eq_ignore_ascii_case(b"false") {
Some(false)
} else if bytes.eq_ignore_ascii_case(b"true") {
Some(true)
} else {
None
}
}),
Int8 => deserialize_primitive(rows, column, datatype, |bytes| {
lexical_core::parse::<i8>(bytes).ok()
}),
Int16 => deserialize_primitive(rows, column, datatype, |bytes| {
lexical_core::parse::<i16>(bytes).ok()
}),
Int32 => deserialize_primitive(rows, column, datatype, |bytes| {
lexical_core::parse::<i32>(bytes).ok()
}),
Int64 => deserialize_primitive(rows, column, datatype, |bytes| {
lexical_core::parse::<i64>(bytes).ok()
}),
UInt8 => deserialize_primitive(rows, column, datatype, |bytes| {
lexical_core::parse::<u8>(bytes).ok()
}),
UInt16 => deserialize_primitive(rows, column, datatype, |bytes| {
lexical_core::parse::<u16>(bytes).ok()
}),
UInt32 => deserialize_primitive(rows, column, datatype, |bytes| {
lexical_core::parse::<u32>(bytes).ok()
}),
UInt64 => deserialize_primitive(rows, column, datatype, |bytes| {
lexical_core::parse::<u64>(bytes).ok()
}),
Float32 => deserialize_primitive(rows, column, datatype, |bytes| {
lexical_core::parse::<f32>(bytes).ok()
}),
Float64 => deserialize_primitive(rows, column, datatype, |bytes| {
lexical_core::parse::<f64>(bytes).ok()
}),
Date32 => deserialize_primitive(rows, column, datatype, |bytes| {
simdutf8::basic::from_utf8(bytes)
.ok()
.and_then(|x| x.parse::<chrono::NaiveDate>().ok())
.map(|x| x.num_days_from_ce() - temporal_conversions::EPOCH_DAYS_FROM_CE)
}),
Date64 => deserialize_primitive(rows, column, datatype, |bytes| {
simdutf8::basic::from_utf8(bytes)
.ok()
.and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
.map(|x| x.timestamp_millis())
}),
Timestamp(TimeUnit::Nanosecond, None) => {
deserialize_primitive(rows, column, datatype, |bytes| {
simdutf8::basic::from_utf8(bytes)
.ok()
.and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
.map(|x| x.timestamp_nanos())
})
}
Timestamp(TimeUnit::Microsecond, None) => {
deserialize_primitive(rows, column, datatype, |bytes| {
simdutf8::basic::from_utf8(bytes)
.ok()
.and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
.map(|x| x.timestamp_nanos() / 1000)
})
}
Timestamp(time_unit, None) => deserialize_primitive(rows, column, datatype, |bytes| {
simdutf8::basic::from_utf8(bytes)
.ok()
.and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
.map(|x| x.timestamp_nanos())
.map(|x| match time_unit {
TimeUnit::Second => x / 1_000_000_000,
TimeUnit::Millisecond => x / 1_000_000,
TimeUnit::Microsecond => x / 1_000,
TimeUnit::Nanosecond => x,
})
}),
Timestamp(time_unit, Some(ref tz)) => {
let tz = temporal_conversions::parse_offset(tz)?;
deserialize_primitive(rows, column, datatype, |bytes| {
simdutf8::basic::from_utf8(bytes)
.ok()
.and_then(|x| deserialize_datetime(x, &tz))
.map(|x| match time_unit {
TimeUnit::Second => x / 1_000_000_000,
TimeUnit::Millisecond => x / 1_000_000,
TimeUnit::Microsecond => x / 1_000,
TimeUnit::Nanosecond => x,
})
})
}
Utf8 => deserialize_utf8::<i32>(rows, column),
LargeUtf8 => deserialize_utf8::<i64>(rows, column),
Binary => deserialize_binary::<i32>(rows, column),
LargeBinary => deserialize_binary::<i64>(rows, column),
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Deserializing type \"{:?}\" is not implemented",
other
)))
}
})
deserialize_column_gen(rows, column, datatype, line_number)
}

/// Deserializes rows [`ByteRecord`] into a [`RecordBatch`].
@@ -209,27 +43,5 @@ pub fn deserialize_batch<F>(
where
F: Fn(&[ByteRecord], usize, DataType, usize) -> Result<Arc<dyn Array>>,
{
let projection: Vec<usize> = match projection {
Some(v) => v.to_vec(),
None => fields.iter().enumerate().map(|(i, _)| i).collect(),
};
let projected_fields: Vec<Field> = projection.iter().map(|i| fields[*i].clone()).collect();

let schema = Arc::new(Schema::new(projected_fields));

if rows.is_empty() {
return Ok(RecordBatch::new_empty(schema));
}

let columns = projection
.iter()
.map(|column| {
let column = *column;
let field = &fields[column];
let data_type = field.data_type();
deserialize_column(rows, column, data_type.clone(), line_number)
})
.collect::<Result<Vec<_>>>()?;

RecordBatch::try_new(schema, columns)
deserialize_batch_gen(rows, fields, projection, line_number, deserialize_column)
}