Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Improved performance of CSV reading.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Dec 3, 2021
1 parent 4320687 commit 54e49dc
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/io/csv/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use super::super::read_utils::{
};

impl ByteRecordGeneric for ByteRecord {
#[inline]
fn get(&self, index: usize) -> Option<&[u8]> {
self.get(index)
}
Expand Down
1 change: 1 addition & 0 deletions src/io/csv/read_async/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use super::super::read_utils::{
};

impl ByteRecordGeneric for ByteRecord {
#[inline]
fn get(&self, index: usize) -> Option<&[u8]> {
self.get(index)
}
Expand Down
30 changes: 17 additions & 13 deletions src/io/csv/read_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ use crate::{

use super::utils::RFC3339;

#[inline]
fn to_utf8(bytes: &[u8]) -> Option<&str> {
simdutf8::basic::from_utf8(bytes).ok()
}

#[inline]
fn deserialize_primitive<T, B: ByteRecordGeneric, F>(
rows: &[B],
column: usize,
Expand Down Expand Up @@ -86,6 +92,7 @@ fn deserialize_decimal(bytes: &[u8], precision: usize, scale: usize) -> Option<i
}
}

#[inline]
fn deserialize_boolean<B, F>(rows: &[B], column: usize, op: F) -> Arc<dyn Array>
where
B: ByteRecordGeneric,
Expand All @@ -103,14 +110,16 @@ where
Arc::new(BooleanArray::from_trusted_len_iter(iter))
}

#[inline]
fn deserialize_utf8<O: Offset, B: ByteRecordGeneric>(rows: &[B], column: usize) -> Arc<dyn Array> {
let iter = rows.iter().map(|row| match row.get(column) {
Some(bytes) => simdutf8::basic::from_utf8(bytes).ok(),
Some(bytes) => to_utf8(bytes),
None => None,
});
Arc::new(Utf8Array::<O>::from_trusted_len_iter(iter))
}

#[inline]
fn deserialize_binary<O: Offset, B: ByteRecordGeneric>(
rows: &[B],
column: usize,
Expand All @@ -136,6 +145,7 @@ fn deserialize_datetime<T: chrono::TimeZone>(string: &str, tz: &T) -> Option<i64
}

/// Deserializes `column` of `rows` into an [`Array`] of [`DataType`] `datatype`.
#[inline]
pub(crate) fn deserialize_column<B: ByteRecordGeneric>(
rows: &[B],
column: usize,
Expand Down Expand Up @@ -184,36 +194,31 @@ pub(crate) fn deserialize_column<B: ByteRecordGeneric>(
lexical_core::parse::<f64>(bytes).ok()
}),
Date32 => deserialize_primitive(rows, column, datatype, |bytes| {
simdutf8::basic::from_utf8(bytes)
.ok()
to_utf8(bytes)
.and_then(|x| x.parse::<chrono::NaiveDate>().ok())
.map(|x| x.num_days_from_ce() - temporal_conversions::EPOCH_DAYS_FROM_CE)
}),
Date64 => deserialize_primitive(rows, column, datatype, |bytes| {
simdutf8::basic::from_utf8(bytes)
.ok()
to_utf8(bytes)
.and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
.map(|x| x.timestamp_millis())
}),
Timestamp(TimeUnit::Nanosecond, None) => {
deserialize_primitive(rows, column, datatype, |bytes| {
simdutf8::basic::from_utf8(bytes)
.ok()
to_utf8(bytes)
.and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
.map(|x| x.timestamp_nanos())
})
}
Timestamp(TimeUnit::Microsecond, None) => {
deserialize_primitive(rows, column, datatype, |bytes| {
simdutf8::basic::from_utf8(bytes)
.ok()
to_utf8(bytes)
.and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
.map(|x| x.timestamp_nanos() / 1000)
})
}
Timestamp(time_unit, None) => deserialize_primitive(rows, column, datatype, |bytes| {
simdutf8::basic::from_utf8(bytes)
.ok()
to_utf8(bytes)
.and_then(|x| x.parse::<chrono::NaiveDateTime>().ok())
.map(|x| x.timestamp_nanos())
.map(|x| match time_unit {
Expand All @@ -226,8 +231,7 @@ pub(crate) fn deserialize_column<B: ByteRecordGeneric>(
Timestamp(time_unit, Some(ref tz)) => {
let tz = temporal_conversions::parse_offset(tz)?;
deserialize_primitive(rows, column, datatype, |bytes| {
simdutf8::basic::from_utf8(bytes)
.ok()
to_utf8(bytes)
.and_then(|x| deserialize_datetime(x, &tz))
.map(|x| match time_unit {
TimeUnit::Second => x / 1_000_000_000,
Expand Down

0 comments on commit 54e49dc

Please sign in to comment.