From e174a23fe32d1c2227787cc4a3d29f9ac4c64192 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sat, 26 Feb 2022 11:14:05 +0100 Subject: [PATCH] change csv-writer (#866) --- Cargo.toml | 3 +- benches/write_csv.rs | 8 ++-- examples/csv_write.rs | 8 ++-- examples/csv_write_parallel.rs | 13 +++--- src/io/csv/write/mod.rs | 68 ++++++++++++++++++++++-------- src/io/csv/write/serialize.rs | 75 +++++++++++++++++++++++++--------- tests/it/io/csv/write.rs | 43 +++++++++++++------ 7 files changed, 153 insertions(+), 65 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c51dd6f4479..17ae94193c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ simdutf8 = "0.1.3" # for csv io csv = { version = "^1.1", optional = true } +csv-core = { version = "0.1", optional = true } # for csv async io csv-async = { version = "^1.1", optional = true } @@ -129,7 +130,7 @@ io_csv = ["io_csv_read", "io_csv_write"] io_csv_async = ["io_csv_read_async"] io_csv_read = ["csv", "lexical-core"] io_csv_read_async = ["csv-async", "lexical-core", "futures"] -io_csv_write = ["csv", "streaming-iterator", "lexical-core"] +io_csv_write = ["csv", "csv-core", "streaming-iterator", "lexical-core"] io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"] io_ipc = ["arrow-format"] io_ipc_write_async = ["io_ipc", "futures"] diff --git a/benches/write_csv.rs b/benches/write_csv.rs index 0d341cea9d7..faba0701c65 100644 --- a/benches/write_csv.rs +++ b/benches/write_csv.rs @@ -11,13 +11,13 @@ use arrow2::util::bench_util::*; type ChunkArc = Chunk>; fn write_batch(columns: &ChunkArc) -> Result<()> { - let writer = &mut write::WriterBuilder::new().from_writer(vec![]); + let mut writer = vec![]; assert_eq!(columns.arrays().len(), 1); - write::write_header(writer, &["a"])?; - let options = write::SerializeOptions::default(); - write::write_chunk(writer, columns, &options) + write::write_header(&mut writer, &["a"], &options)?; + + write::write_chunk(&mut writer, columns, &options) } fn make_chunk(array: impl Array + 'static) -> Chunk> { diff --git a/examples/csv_write.rs b/examples/csv_write.rs index f9f73cfef53..6a40fb7b515 100644 --- a/examples/csv_write.rs +++ b/examples/csv_write.rs @@ -6,14 +6,14 @@ use arrow2::{ }; fn write_batch>(path: &str, columns: &[Chunk]) -> Result<()> { - let writer = &mut write::WriterBuilder::new().from_path(path)?; - - write::write_header(writer, &["c1"])?; + let mut writer = std::fs::File::create(path)?; let options = write::SerializeOptions::default(); + write::write_header(&mut writer, &["c1"], &options)?; + columns .iter() - .try_for_each(|batch| write::write_chunk(writer, batch, &options)) + .try_for_each(|batch| write::write_chunk(&mut writer, batch, &options)) } fn main() -> Result<()> { diff --git a/examples/csv_write_parallel.rs b/examples/csv_write_parallel.rs index f616a1b8292..5611d39b1b3 100644 --- a/examples/csv_write_parallel.rs +++ b/examples/csv_write_parallel.rs @@ -1,3 +1,4 @@ +use std::io::Write; use std::sync::mpsc; use std::sync::mpsc::{Receiver, Sender}; use std::sync::Arc; @@ -14,8 +15,8 @@ fn parallel_write(path: &str, batches: [Chunk>; 2]) -> Result<()> let options = write::SerializeOptions::default(); // write a header - let writer = &mut write::WriterBuilder::new().from_path(path)?; - write::write_header(writer, &["c1"])?; + let mut writer = std::fs::File::create(path)?; + write::write_header(&mut writer, &["c1"], &options)?; // prepare a channel to send serialized records from threads let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel(); @@ -28,8 +29,8 @@ fn parallel_write(path: &str, batches: [Chunk>; 2]) -> Result<()> let options = options.clone(); let batch = batches[id].clone(); // note: this is cheap let child = thread::spawn(move || { - let records = write::serialize(&batch, &options).unwrap(); - thread_tx.send(records).unwrap(); + let rows = write::serialize(&batch, &options).unwrap(); + thread_tx.send(rows).unwrap(); }); children.push(child); @@ -38,9 +39,7 @@ fn parallel_write(path: &str, batches: [Chunk>; 2]) -> Result<()> for _ in 0..2 { // block: assumes that the order of batches matter. let records = rx.recv().unwrap(); - records - .iter() - .try_for_each(|record| writer.write_byte_record(record))? + records.iter().try_for_each(|row| writer.write_all(&row))? } for child in children { diff --git a/src/io/csv/write/mod.rs b/src/io/csv/write/mod.rs index 37d926f482e..275705c744e 100644 --- a/src/io/csv/write/mod.rs +++ b/src/io/csv/write/mod.rs @@ -6,7 +6,7 @@ use super::super::iterator::StreamingIterator; use std::io::Write; // re-export necessary public APIs from csv -pub use csv::{ByteRecord, Writer, WriterBuilder}; +pub use csv::{ByteRecord, WriterBuilder}; pub use serialize::*; @@ -26,55 +26,87 @@ fn new_serializers<'a, A: AsRef>( .collect() } -/// Serializes [`Chunk`] to a vector of `ByteRecord`. +/// Serializes [`Chunk`] to a vector of rows. /// The vector is guaranteed to have `columns.len()` entries. -/// Each `ByteRecord` is guaranteed to have `columns.array().len()` fields. +/// Each `row` is guaranteed to have `columns.array().len()` fields. pub fn serialize>( columns: &Chunk, options: &SerializeOptions, -) -> Result> { +) -> Result>> { let mut serializers = new_serializers(columns, options)?; - let rows = columns.len(); - let mut records = vec![ByteRecord::with_capacity(0, columns.arrays().len()); rows]; - records.iter_mut().for_each(|record| { + let mut rows = Vec::with_capacity(columns.len()); + let mut row = vec![]; + + // this is where the (expensive) transposition happens: the outer loop is on rows, the inner on columns + (0..columns.len()).try_for_each(|_| { serializers .iter_mut() - // `unwrap` is infalible because `array.len()` equals `len` in `Chunk::len` - .for_each(|iter| record.push_field(iter.next().unwrap())); - }); - Ok(records) + // `unwrap` is infalible because `array.len()` equals `Chunk::len` + .for_each(|iter| { + let field = iter.next().unwrap(); + row.extend_from_slice(field); + row.push(options.delimiter); + }); + if !row.is_empty() { + // replace last delimiter with new line + let last_byte = row.len() - 1; + row[last_byte] = b'\n'; + rows.push(row.clone()); + row.clear(); + } + Result::Ok(()) + })?; + + Ok(rows) } /// Writes [`Chunk`] to `writer` according to the serialization options `options`. pub fn write_chunk>( - writer: &mut Writer, + writer: &mut W, columns: &Chunk, options: &SerializeOptions, ) -> Result<()> { let mut serializers = new_serializers(columns.arrays(), options)?; let rows = columns.len(); - let mut record = ByteRecord::with_capacity(0, columns.arrays().len()); + let mut row = Vec::with_capacity(columns.arrays().len() * 10); // this is where the (expensive) transposition happens: the outer loop is on rows, the inner on columns (0..rows).try_for_each(|_| { serializers .iter_mut() // `unwrap` is infalible because `array.len()` equals `Chunk::len` - .for_each(|iter| record.push_field(iter.next().unwrap())); - writer.write_byte_record(&record)?; - record.clear(); + .for_each(|iter| { + let field = iter.next().unwrap(); + row.extend_from_slice(field); + row.push(options.delimiter); + }); + // replace last delimiter with new line + let last_byte = row.len() - 1; + row[last_byte] = b'\n'; + writer.write_all(&row)?; + row.clear(); Result::Ok(()) })?; Ok(()) } /// Writes a CSV header to `writer` -pub fn write_header(writer: &mut Writer, names: &[T]) -> Result<()> +pub fn write_header( + writer: &mut W, + names: &[T], + options: &SerializeOptions, +) -> Result<()> where T: AsRef, { - writer.write_record(names.iter().map(|x| x.as_ref().as_bytes()))?; + let names = names.iter().map(|x| x.as_ref()).collect::>(); + writer.write_all( + names + .join(std::str::from_utf8(&[options.delimiter]).unwrap()) + .as_bytes(), + )?; + writer.write_all(&[b'\n'])?; Ok(()) } diff --git a/src/io/csv/write/serialize.rs b/src/io/csv/write/serialize.rs index 660bde3da1b..22f26e5a27a 100644 --- a/src/io/csv/write/serialize.rs +++ b/src/io/csv/write/serialize.rs @@ -12,11 +12,12 @@ use crate::{ use super::super::super::iterator::{BufStreamingIterator, StreamingIterator}; use crate::array::{DictionaryArray, DictionaryKey, Offset}; +use csv_core::WriteResult; use std::any::Any; /// Options to serialize logical types to CSV /// The default is to format times and dates as `chrono` crate formats them. -#[derive(Debug, PartialEq, Eq, Hash, Clone, Default)] +#[derive(Debug, PartialEq, Eq, Hash, Clone)] pub struct SerializeOptions { /// used for [`DataType::Date32`] pub date32_format: Option, @@ -28,6 +29,24 @@ pub struct SerializeOptions { pub time64_format: Option, /// used for [`DataType::Timestamp`] pub timestamp_format: Option, + /// used as separator/delimiter + pub delimiter: u8, + /// quoting character + pub quote: u8, +} + +impl Default for SerializeOptions { + fn default() -> Self { + SerializeOptions { + date32_format: None, + date64_format: None, + time32_format: None, + time64_format: None, + timestamp_format: None, + delimiter: b',', + quote: b'"', + } + } } fn primitive_write<'a, T: NativeType + ToLexical>( @@ -187,6 +206,40 @@ fn timestamp_with_tz<'a>( } } +fn new_utf8_serializer<'a, O: Offset>( + array: &'a Utf8Array, + options: &'a SerializeOptions, +) -> Box + 'a> { + let mut local_buf = vec![0u8; 64]; + let mut ser_writer = csv_core::WriterBuilder::new().quote(options.quote).build(); + + Box::new(BufStreamingIterator::new( + array.iter(), + move |x, buf| { + match x { + // Empty strings are quoted. + // This will ensure a csv parser will not read them as missing + // in a delimited field + Some("") => buf.extend_from_slice(b"\"\""), + Some(s) => loop { + match ser_writer.field(s.as_bytes(), &mut local_buf) { + (WriteResult::OutputFull, _, _) => { + let additional = local_buf.len(); + local_buf.extend(std::iter::repeat(0u8).take(additional)) + } + (WriteResult::InputEmpty, _, n_out) => { + buf.extend_from_slice(&local_buf[..n_out]); + break; + } + } + }, + _ => {} + } + }, + vec![], + )) +} + /// Returns a [`StreamingIterator`] that yields `&[u8]` serialized from `array` according to `options`. /// For numeric types, this serializes as usual. For dates, times and timestamps, it uses `options` to /// Supported types: @@ -337,27 +390,11 @@ pub fn new_serializer<'a>( } DataType::Utf8 => { let array = array.as_any().downcast_ref::>().unwrap(); - Box::new(BufStreamingIterator::new( - array.iter(), - |x, buf| { - if let Some(x) = x { - buf.extend_from_slice(x.as_bytes()); - } - }, - vec![], - )) + new_utf8_serializer(array, options) } DataType::LargeUtf8 => { let array = array.as_any().downcast_ref::>().unwrap(); - Box::new(BufStreamingIterator::new( - array.iter(), - |x, buf| { - if let Some(x) = x { - buf.extend_from_slice(x.as_bytes()); - } - }, - vec![], - )) + new_utf8_serializer(array, options) } DataType::Binary => { let array = array.as_any().downcast_ref::>().unwrap(); diff --git a/tests/it/io/csv/write.rs b/tests/it/io/csv/write.rs index 656a04f7839..6d8e27b9660 100644 --- a/tests/it/io/csv/write.rs +++ b/tests/it/io/csv/write.rs @@ -34,15 +34,18 @@ fn data() -> Chunk> { fn write_csv() -> Result<()> { let columns = data(); - let write = Cursor::new(Vec::::new()); - let mut writer = WriterBuilder::new().from_writer(write); - - write_header(&mut writer, &["c1", "c2", "c3", "c4", "c5", "c6", "c7"])?; + let mut writer = Cursor::new(Vec::::new()); let options = SerializeOptions::default(); + + write_header( + &mut writer, + &["c1", "c2", "c3", "c4", "c5", "c6", "c7"], + &options, + )?; write_chunk(&mut writer, &columns, &options)?; // check - let buffer = writer.into_inner().unwrap().into_inner(); + let buffer = writer.into_inner(); assert_eq!( r#"c1,c2,c3,c4,c5,c6,c7 a b,123.564532,3,true,,00:20:34,d @@ -59,18 +62,18 @@ d,-556132.25,1,,2019-04-18 02:45:55.555,23:46:03,c fn write_csv_custom_options() -> Result<()> { let batch = data(); - let write = Cursor::new(Vec::::new()); - let mut writer = WriterBuilder::new().delimiter(b'|').from_writer(write); + let mut writer = Cursor::new(Vec::::new()); let options = SerializeOptions { time32_format: Some("%r".to_string()), time64_format: Some("%r".to_string()), + delimiter: b'|', ..Default::default() }; write_chunk(&mut writer, &batch, &options)?; // check - let buffer = writer.into_inner().unwrap().into_inner(); + let buffer = writer.into_inner(); assert_eq!( r#"a b|123.564532|3|true||12:20:34 AM|d c||2|false|2019-04-18 10:54:47.378|06:51:20 AM|a b @@ -230,14 +233,13 @@ fn test_array( data: Vec<&'static str>, options: SerializeOptions, ) -> Result<()> { - let write = Cursor::new(Vec::::new()); - let mut writer = WriterBuilder::new().delimiter(b'|').from_writer(write); + let mut writer = Cursor::new(Vec::::new()); - write_header(&mut writer, &["c1"])?; + write_header(&mut writer, &["c1"], &options)?; write_chunk(&mut writer, &columns, &options)?; // check - let buffer = writer.into_inner().unwrap().into_inner(); + let buffer = writer.into_inner(); let mut expected = "c1\n".to_owned(); expected.push_str(&data.join("\n")); @@ -314,3 +316,20 @@ fn write_tz_timezone_formatted_tz() -> Result<()> { }, ) } + +#[test] +fn write_empty_and_missing() { + let a = Utf8Array::::from(&[Some(""), None]); + let b = Utf8Array::::from(&[None, Some("")]); + let columns = Chunk::new(vec![ + Arc::new(a) as Arc, + Arc::new(b) as Arc, + ]); + + let mut writer = vec![]; + let options = SerializeOptions::default(); + write_chunk(&mut writer, &columns, &options).unwrap(); + let csv = std::str::from_utf8(&writer).unwrap(); + + assert_eq!(csv, "\"\",\n,\"\"\n"); +}