Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Optimized writing of CSV.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Sep 5, 2021
1 parent ded5266 commit 964ca91
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 46 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ hash_hasher = "^2.0.3"
csv = { version = "^1.1", optional = true }
regex = { version = "^1.3", optional = true }
lazy_static = { version = "^1.4", optional = true }
streaming-iterator = { version = "0.1", optional = true }

serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
Expand Down Expand Up @@ -86,7 +87,7 @@ full = [
"compute",
]
merge_sort = ["itertools"]
io_csv = ["csv", "lazy_static", "regex", "lexical-core"]
io_csv = ["csv", "lazy_static", "regex", "lexical-core", "streaming-iterator"]
io_json = ["serde", "serde_json", "indexmap"]
io_ipc = ["flatbuffers"]
io_ipc_compression = ["lz4", "zstd"]
Expand Down
65 changes: 65 additions & 0 deletions src/io/csv/write/iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
pub use streaming_iterator::StreamingIterator;

/// Adapter that exposes the items of an ordinary [`Iterator`] as byte slices
/// through the [`StreamingIterator`] interface.
///
/// Every item of type `T` is handed to `f` together with a single, reused
/// `Vec<u8>`; `f` appends the item's byte representation to that buffer, and
/// the buffer is what gets lent out as `&[u8]`. Reusing one buffer avoids
/// allocating a fresh `Vec<u8>` per item.
///
/// Type parameters:
/// * `I` - the wrapped iterator, yielding items of type `T`;
/// * `F` - the serialization callback `Fn(T, &mut Vec<u8>)`;
/// * `T` - the item type produced by `I` and consumed by `F`.
pub struct BufStreamingIterator<I, F, T>
where
    I: Iterator<Item = T>,
    F: Fn(T, &mut Vec<u8>),
{
    /// Source of the items to serialize.
    iterator: I,
    /// Callback that writes one item into the buffer.
    f: F,
    /// Scratch space holding the bytes of the current item.
    buffer: Vec<u8>,
    /// Whether `buffer` currently holds an item that may be handed out.
    is_valid: bool,
}

impl<I, F, T> BufStreamingIterator<I, F, T>
where
I: Iterator<Item = T>,
F: Fn(T, &mut Vec<u8>),
{
#[inline]
pub fn new(iterator: I, f: F, buffer: Vec<u8>) -> Self {
Self {
iterator,
f,
buffer,
is_valid: false,
}
}
}

impl<I, F, T> StreamingIterator for BufStreamingIterator<I, F, T>
where
    I: Iterator<Item = T>,
    F: Fn(T, &mut Vec<u8>),
{
    type Item = [u8];

    /// Pulls the next item from the wrapped iterator and serializes it into
    /// the internal buffer, or marks the adapter as not valid when the
    /// wrapped iterator yields `None`.
    #[inline]
    fn advance(&mut self) {
        match self.iterator.next() {
            Some(item) => {
                self.is_valid = true;
                // Drop the previous item's bytes before `f` writes the new ones.
                self.buffer.clear();
                (self.f)(item, &mut self.buffer);
            }
            None => self.is_valid = false,
        }
    }

    /// Returns the bytes of the current item, or `None` when the adapter is
    /// not flagged as valid.
    #[inline]
    fn get(&self) -> Option<&Self::Item> {
        if !self.is_valid {
            return None;
        }
        Some(self.buffer.as_slice())
    }

    /// Forwards the size hint of the wrapped iterator unchanged.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iterator.size_hint()
    }
}
9 changes: 6 additions & 3 deletions src/io/csv/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
mod iterator;
mod serialize;

use iterator::StreamingIterator;

use std::io::Write;

// re-export necessary public APIs from csv
Expand All @@ -15,7 +18,7 @@ use crate::{datatypes::Schema, error::Result};
fn new_serializers<'a>(
batch: &'a RecordBatch,
options: &'a SerializeOptions,
) -> Result<Vec<Box<dyn Iterator<Item = Vec<u8>> + 'a>>> {
) -> Result<Vec<Box<dyn StreamingIterator<Item = [u8]> + 'a>>> {
batch
.columns()
.iter()
Expand All @@ -34,7 +37,7 @@ pub fn serialize(batch: &RecordBatch, options: &SerializeOptions) -> Result<Vec<
serializers
.iter_mut()
// `unwrap` is infallible because `array.len()` equals `num_rows` on a `RecordBatch`
.for_each(|iter| record.push_field(&iter.next().unwrap()));
.for_each(|iter| record.push_field(iter.next().unwrap()));
});
Ok(records)
}
Expand All @@ -54,7 +57,7 @@ pub fn write_batch<W: Write>(
serializers
.iter_mut()
// `unwrap` is infallible because `array.len()` equals `num_rows` on a `RecordBatch`
.for_each(|iter| record.push_field(&iter.next().unwrap()));
.for_each(|iter| record.push_field(iter.next().unwrap()));
writer.write_byte_record(&record)?;
record.clear();
Result::Ok(())
Expand Down
123 changes: 82 additions & 41 deletions src/io/csv/write/serialize.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use lexical_core::ToLexical;

use crate::temporal_conversions;
use crate::util::lexical_to_bytes;
use crate::types::NativeType;
use crate::util::lexical_to_bytes_mut;
use crate::{
array::{Array, BinaryArray, BooleanArray, PrimitiveArray, Utf8Array},
datatypes::{DataType, TimeUnit},
error::Result,
};

use super::iterator::{BufStreamingIterator, StreamingIterator};

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct SerializeOptions {
pub date_format: String,
Expand All @@ -23,16 +28,24 @@ impl Default for SerializeOptions {
}
}

/// Returns a boxed streaming serializer over the slots of a primitive array.
///
/// A present value is written into the shared buffer via
/// `lexical_to_bytes_mut`; for an absent (`None`) slot the callback writes
/// nothing.
fn primitive_write<'a, T>(
    array: &'a PrimitiveArray<T>,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a>
where
    T: NativeType + ToLexical,
{
    let serializer = BufStreamingIterator::new(
        array.iter(),
        |slot, bytes| {
            if let Some(value) = slot {
                lexical_to_bytes_mut(*value, bytes)
            }
        },
        Vec::new(),
    );
    Box::new(serializer)
}

macro_rules! dyn_primitive {
($ty:ident, $array:expr) => {{
let array = $array
.as_any()
.downcast_ref::<PrimitiveArray<$ty>>()
.unwrap();
let iter = array
.iter()
.map(move |x| x.map(|x| lexical_to_bytes(*x)).unwrap_or(vec![]));
Box::new(iter)
($ty:ty, $array:expr) => {{
let array = $array.as_any().downcast_ref().unwrap();
primitive_write::<$ty>(array)
}};
}

Expand All @@ -42,11 +55,15 @@ macro_rules! dyn_date {
.as_any()
.downcast_ref::<PrimitiveArray<$ty>>()
.unwrap();
let iter = array.iter().map(move |x| {
x.map(|x| ($fn)(*x).format($format).to_string().into_bytes())
.unwrap_or_default()
});
Box::new(iter)
Box::new(BufStreamingIterator::new(
array.iter(),
move |x, buf| {
if let Some(x) = x {
buf.extend_from_slice(($fn)(*x).format($format).to_string().as_bytes())
}
},
vec![],
))
}};
}

Expand All @@ -62,15 +79,23 @@ macro_rules! dyn_date {
pub fn new_serializer<'a>(
array: &'a dyn Array,
options: &'a SerializeOptions,
) -> Result<Box<dyn Iterator<Item = Vec<u8>> + 'a>> {
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>> {
Ok(match array.data_type() {
DataType::Boolean => {
let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
Box::new(
array
.iter()
.map(|x| x.map(|x| x.to_string().into_bytes()).unwrap_or_default()),
)
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
if let Some(x) = x {
if x {
buf.extend_from_slice(b"true");
} else {
buf.extend_from_slice(b"false");
}
}
},
vec![],
))
}
DataType::UInt8 => {
dyn_primitive!(u8, array)
Expand Down Expand Up @@ -184,35 +209,51 @@ pub fn new_serializer<'a>(
}
DataType::Utf8 => {
let array = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
Box::new(
array
.iter()
.map(|x| x.map(|x| x.to_string().into_bytes()).unwrap_or_default()),
)
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
if let Some(x) = x {
buf.extend_from_slice(x.as_bytes());
}
},
vec![],
))
}
DataType::LargeUtf8 => {
let array = array.as_any().downcast_ref::<Utf8Array<i64>>().unwrap();
Box::new(
array
.iter()
.map(|x| x.map(|x| x.to_string().into_bytes()).unwrap_or_default()),
)
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
if let Some(x) = x {
buf.extend_from_slice(x.as_bytes());
}
},
vec![],
))
}
DataType::Binary => {
let array = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
Box::new(
array
.iter()
.map(|x| x.map(|x| x.to_vec()).unwrap_or_default()),
)
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
if let Some(x) = x {
buf.extend_from_slice(x);
}
},
vec![],
))
}
DataType::LargeBinary => {
let array = array.as_any().downcast_ref::<BinaryArray<i64>>().unwrap();
Box::new(
array
.iter()
.map(|x| x.map(|x| x.to_vec()).unwrap_or_default()),
)
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
if let Some(x) = x {
buf.extend_from_slice(x);
}
},
vec![],
))
}
_ => todo!(),
})
Expand Down
10 changes: 9 additions & 1 deletion src/util/lexical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
#[inline]
pub fn lexical_to_bytes<N: lexical_core::ToLexical>(n: N) -> Vec<u8> {
    // Allocate the output up front with `N::FORMATTED_SIZE_DECIMAL` bytes of
    // capacity, then delegate the actual formatting to the in-place variant.
    let mut bytes = Vec::with_capacity(N::FORMATTED_SIZE_DECIMAL);
    lexical_to_bytes_mut(n, &mut bytes);
    bytes
}

/// Serializes a numeric value into `buf`, discarding any previous contents of the buffer.
#[inline]
pub fn lexical_to_bytes_mut<N: lexical_core::ToLexical>(n: N, buf: &mut Vec<u8>) {
buf.clear();
buf.reserve(N::FORMATTED_SIZE_DECIMAL);
unsafe {
// JUSTIFICATION
// Benefit
Expand All @@ -13,7 +22,6 @@ pub fn lexical_to_bytes<N: lexical_core::ToLexical>(n: N) -> Vec<u8> {
let len = lexical_core::write(n, slice).len();
buf.set_len(len);
}
buf
}

/// Converts numeric type to a `String`
Expand Down

0 comments on commit 964ca91

Please sign in to comment.