From 964ca9194297f8eeaf76534df82bc48eff24c28f Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao" <jorgecarleitao@gmail.com>
Date: Sun, 5 Sep 2021 07:46:20 +0000
Subject: [PATCH] Optimized writing of CSV.

---
 Cargo.toml                    |   3 +-
 src/io/csv/write/iterator.rs  |  65 ++++++++++++++++++
 src/io/csv/write/mod.rs       |   9 ++-
 src/io/csv/write/serialize.rs | 123 ++++++++++++++++++++++------------
 src/util/lexical.rs           |  10 ++-
 5 files changed, 164 insertions(+), 46 deletions(-)
 create mode 100644 src/io/csv/write/iterator.rs

diff --git a/Cargo.toml b/Cargo.toml
index 06e97b1bf77..0ed0e091a2f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,6 +26,7 @@ hash_hasher = "^2.0.3"
 csv = { version = "^1.1", optional = true }
 regex = { version = "^1.3", optional = true }
 lazy_static = { version = "^1.4", optional = true }
+streaming-iterator = { version = "0.1", optional = true }
 serde = { version = "^1.0", features = ["rc"], optional = true }
 serde_derive = { version = "^1.0", optional = true }
@@ -86,7 +87,7 @@ full = [
     "compute",
 ]
 merge_sort = ["itertools"]
-io_csv = ["csv", "lazy_static", "regex", "lexical-core"]
+io_csv = ["csv", "lazy_static", "regex", "lexical-core", "streaming-iterator"]
 io_json = ["serde", "serde_json", "indexmap"]
 io_ipc = ["flatbuffers"]
 io_ipc_compression = ["lz4", "zstd"]
diff --git a/src/io/csv/write/iterator.rs b/src/io/csv/write/iterator.rs
new file mode 100644
index 00000000000..65e16b4d173
--- /dev/null
+++ b/src/io/csv/write/iterator.rs
@@ -0,0 +1,65 @@
+pub use streaming_iterator::StreamingIterator;
+
+/// A [`StreamingIterator`] with an internal buffer of [`Vec<u8>`] used to efficiently
+/// present items of type `T` as `&[u8]`.
+/// It is generic over the type `T` and the transformation `F: T -> &[u8]`.
+pub struct BufStreamingIterator<I, F, T>
+where
+    I: Iterator<Item = T>,
+    F: Fn(T, &mut Vec<u8>),
+{
+    iterator: I,
+    f: F,
+    buffer: Vec<u8>,
+    is_valid: bool,
+}
+
+impl<I, F, T> BufStreamingIterator<I, F, T>
+where
+    I: Iterator<Item = T>,
+    F: Fn(T, &mut Vec<u8>),
+{
+    #[inline]
+    pub fn new(iterator: I, f: F, buffer: Vec<u8>) -> Self {
+        Self {
+            iterator,
+            f,
+            buffer,
+            is_valid: false,
+        }
+    }
+}
+
+impl<I, F, T> StreamingIterator for BufStreamingIterator<I, F, T>
+where
+    I: Iterator<Item = T>,
+    F: Fn(T, &mut Vec<u8>),
+{
+    type Item = [u8];
+
+    #[inline]
+    fn advance(&mut self) {
+        let a = self.iterator.next();
+        if let Some(a) = a {
+            self.is_valid = true;
+            self.buffer.clear();
+            (self.f)(a, &mut self.buffer);
+        } else {
+            self.is_valid = false;
+        }
+    }
+
+    #[inline]
+    fn get(&self) -> Option<&Self::Item> {
+        if self.is_valid {
+            Some(&self.buffer)
+        } else {
+            None
+        }
+    }
+
+    #[inline]
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.iterator.size_hint()
+    }
+}
diff --git a/src/io/csv/write/mod.rs b/src/io/csv/write/mod.rs
index 61c37424a31..4bde82c24ea 100644
--- a/src/io/csv/write/mod.rs
+++ b/src/io/csv/write/mod.rs
@@ -1,5 +1,8 @@
+mod iterator;
 mod serialize;
 
+use iterator::StreamingIterator;
+
 use std::io::Write;
 
 // re-export necessary public APIs from csv
@@ -15,7 +18,7 @@ use crate::{datatypes::Schema, error::Result};
 fn new_serializers<'a>(
     batch: &'a RecordBatch,
     options: &'a SerializeOptions,
-) -> Result<Vec<Box<dyn Iterator<Item = Vec<u8>> + 'a>>> {
+) -> Result<Vec<Box<dyn StreamingIterator<Item = [u8]> + 'a>>> {
     batch
         .columns()
         .iter()
@@ -34,7 +37,7 @@ pub fn serialize(batch: &RecordBatch, options: &SerializeOptions) -> Result<Vec<ByteRecord>> {
         serializers
             .iter_mut()
             // `unwrap` is infalible because `array.len()` equals `num_rows` on a `RecordBatch`
-            .for_each(|iter| record.push_field(&iter.next().unwrap()));
+            .for_each(|iter| record.push_field(iter.next().unwrap()));
     });
     Ok(records)
 }
@@ -57,7 +60,7 @@ pub fn write<W: Write>(
         serializers
             .iter_mut()
             // `unwrap` is infalible because `array.len()` equals `num_rows` on a `RecordBatch`
-            .for_each(|iter| record.push_field(&iter.next().unwrap()));
+            .for_each(|iter| record.push_field(iter.next().unwrap()));
         writer.write_byte_record(&record)?;
         record.clear();
         Result::Ok(())
diff --git a/src/io/csv/write/serialize.rs b/src/io/csv/write/serialize.rs
index 44a184eecb0..60eb7863cf8 100644
--- a/src/io/csv/write/serialize.rs
+++ 
b/src/io/csv/write/serialize.rs
@@ -1,11 +1,16 @@
+use lexical_core::ToLexical;
+
 use crate::temporal_conversions;
-use crate::util::lexical_to_bytes;
+use crate::types::NativeType;
+use crate::util::lexical_to_bytes_mut;
 use crate::{
     array::{Array, BinaryArray, BooleanArray, PrimitiveArray, Utf8Array},
     datatypes::{DataType, TimeUnit},
     error::Result,
 };
 
+use super::iterator::{BufStreamingIterator, StreamingIterator};
+
 #[derive(Debug, PartialEq, Eq, Hash, Clone)]
 pub struct SerializeOptions {
     pub date_format: String,
@@ -23,16 +28,24 @@ impl Default for SerializeOptions {
     }
 }
 
+fn primitive_write<'a, T: NativeType + ToLexical>(
+    array: &'a PrimitiveArray<T>,
+) -> Box<dyn StreamingIterator<Item = [u8]> + 'a> {
+    Box::new(BufStreamingIterator::new(
+        array.iter(),
+        |x, buf| {
+            if let Some(x) = x {
+                lexical_to_bytes_mut(*x, buf)
+            }
+        },
+        vec![],
+    ))
+}
+
 macro_rules! dyn_primitive {
-    ($ty:ident, $array:expr) => {{
-        let array = $array
-            .as_any()
-            .downcast_ref::<PrimitiveArray<$ty>>()
-            .unwrap();
-        let iter = array
-            .iter()
-            .map(move |x| x.map(|x| lexical_to_bytes(*x)).unwrap_or(vec![]));
-        Box::new(iter)
+    ($ty:ty, $array:expr) => {{
+        let array = $array.as_any().downcast_ref().unwrap();
+        primitive_write::<$ty>(array)
     }};
 }
 
@@ -42,11 +55,15 @@ macro_rules! dyn_date {
             .as_any()
             .downcast_ref::<PrimitiveArray<$ty>>()
             .unwrap();
-        let iter = array.iter().map(move |x| {
-            x.map(|x| ($fn)(*x).format($format).to_string().into_bytes())
-                .unwrap_or_default()
-        });
-        Box::new(iter)
+        Box::new(BufStreamingIterator::new(
+            array.iter(),
+            move |x, buf| {
+                if let Some(x) = x {
+                    buf.extend_from_slice(($fn)(*x).format($format).to_string().as_bytes())
+                }
+            },
+            vec![],
+        ))
     }};
 }
 
@@ -62,15 +79,23 @@ macro_rules! 
dyn_date {
 pub fn new_serializer<'a>(
     array: &'a dyn Array,
     options: &'a SerializeOptions,
-) -> Result<Box<dyn Iterator<Item = Vec<u8>> + 'a>> {
+) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>> {
     Ok(match array.data_type() {
         DataType::Boolean => {
             let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
-            Box::new(
-                array
-                    .iter()
-                    .map(|x| x.map(|x| x.to_string().into_bytes()).unwrap_or_default()),
-            )
+            Box::new(BufStreamingIterator::new(
+                array.iter(),
+                |x, buf| {
+                    if let Some(x) = x {
+                        if x {
+                            buf.extend_from_slice(b"true");
+                        } else {
+                            buf.extend_from_slice(b"false");
+                        }
+                    }
+                },
+                vec![],
+            ))
         }
         DataType::UInt8 => {
             dyn_primitive!(u8, array)
@@ -184,35 +209,51 @@ pub fn new_serializer<'a>(
         }
         DataType::Utf8 => {
             let array = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
-            Box::new(
-                array
-                    .iter()
-                    .map(|x| x.map(|x| x.to_string().into_bytes()).unwrap_or_default()),
-            )
+            Box::new(BufStreamingIterator::new(
+                array.iter(),
+                |x, buf| {
+                    if let Some(x) = x {
+                        buf.extend_from_slice(x.as_bytes());
+                    }
+                },
+                vec![],
+            ))
         }
         DataType::LargeUtf8 => {
             let array = array.as_any().downcast_ref::<Utf8Array<i64>>().unwrap();
-            Box::new(
-                array
-                    .iter()
-                    .map(|x| x.map(|x| x.to_string().into_bytes()).unwrap_or_default()),
-            )
+            Box::new(BufStreamingIterator::new(
+                array.iter(),
+                |x, buf| {
+                    if let Some(x) = x {
+                        buf.extend_from_slice(x.as_bytes());
+                    }
+                },
+                vec![],
+            ))
         }
         DataType::Binary => {
             let array = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
-            Box::new(
-                array
-                    .iter()
-                    .map(|x| x.map(|x| x.to_vec()).unwrap_or_default()),
-            )
+            Box::new(BufStreamingIterator::new(
+                array.iter(),
+                |x, buf| {
+                    if let Some(x) = x {
+                        buf.extend_from_slice(x);
+                    }
+                },
+                vec![],
+            ))
         }
         DataType::LargeBinary => {
             let array = array.as_any().downcast_ref::<BinaryArray<i64>>().unwrap();
-            Box::new(
-                array
-                    .iter()
-                    .map(|x| x.map(|x| x.to_vec()).unwrap_or_default()),
-            )
+            Box::new(BufStreamingIterator::new(
+                array.iter(),
+                |x, buf| {
+                    if let Some(x) = x {
+                        buf.extend_from_slice(x);
+                    }
+                },
+                vec![],
+            ))
         }
         _ => todo!(),
     })
diff --git 
a/src/util/lexical.rs b/src/util/lexical.rs
index bbeeed6c15a..970c972e0af 100644
--- a/src/util/lexical.rs
+++ b/src/util/lexical.rs
@@ -2,6 +2,15 @@
 #[inline]
 pub fn lexical_to_bytes<N: lexical_core::ToLexical>(n: N) -> Vec<u8> {
     let mut buf = Vec::<u8>::with_capacity(N::FORMATTED_SIZE_DECIMAL);
+    lexical_to_bytes_mut(n, &mut buf);
+    buf
+}
+
+/// Converts numeric type to a `String`
+#[inline]
+pub fn lexical_to_bytes_mut<N: lexical_core::ToLexical>(n: N, buf: &mut Vec<u8>) {
+    buf.clear();
+    buf.reserve(N::FORMATTED_SIZE_DECIMAL);
     unsafe {
         // JUSTIFICATION
         //  Benefit
@@ -13,7 +22,6 @@ pub fn lexical_to_bytes<N: lexical_core::ToLexical>(n: N) -> Vec<u8> {
         let len = lexical_core::write(n, slice).len();
         buf.set_len(len);
     }
-    buf
 }
 
 /// Converts numeric type to a `String`