From 2805aeab552c071f976ffdcc420072e4e720bd63 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Fri, 25 Feb 2022 11:31:57 +0100 Subject: [PATCH] update examples --- examples/csv_write.rs | 4 ++-- examples/csv_write_parallel.rs | 13 ++++++------- src/io/csv/write/mod.rs | 33 +++++++++++++++++++++++---------- src/io/csv/write/serialize.rs | 2 +- 4 files changed, 32 insertions(+), 20 deletions(-) diff --git a/examples/csv_write.rs b/examples/csv_write.rs index 70bc4f61835..732840383f4 100644 --- a/examples/csv_write.rs +++ b/examples/csv_write.rs @@ -6,10 +6,10 @@ use arrow2::{ }; fn write_batch>(path: &str, columns: &[Chunk]) -> Result<()> { - let writer = &mut write::WriterBuilder::new().from_path(path)?; + let mut writer = std::fs::File::create(path)?; let options = write::SerializeOptions::default(); - write::write_header(writer, &["c1"], &options)?; + write::write_header(&mut writer, &["c1"], &options)?; columns .iter() diff --git a/examples/csv_write_parallel.rs b/examples/csv_write_parallel.rs index f616a1b8292..5611d39b1b3 100644 --- a/examples/csv_write_parallel.rs +++ b/examples/csv_write_parallel.rs @@ -1,3 +1,4 @@ +use std::io::Write; use std::sync::mpsc; use std::sync::mpsc::{Receiver, Sender}; use std::sync::Arc; @@ -14,8 +15,8 @@ fn parallel_write(path: &str, batches: [Chunk>; 2]) -> Result<()> let options = write::SerializeOptions::default(); // write a header - let writer = &mut write::WriterBuilder::new().from_path(path)?; - write::write_header(writer, &["c1"])?; + let mut writer = std::fs::File::create(path)?; + write::write_header(&mut writer, &["c1"], &options)?; // prepare a channel to send serialized records from threads let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel(); @@ -28,8 +29,8 @@ fn parallel_write(path: &str, batches: [Chunk>; 2]) -> Result<()> let options = options.clone(); let batch = batches[id].clone(); // note: this is cheap let child = thread::spawn(move || { - let records = write::serialize(&batch, &options).unwrap(); - thread_tx.send(records).unwrap(); + let rows = write::serialize(&batch, &options).unwrap(); + thread_tx.send(rows).unwrap(); }); children.push(child); @@ -38,9 +39,7 @@ fn parallel_write(path: &str, batches: [Chunk>; 2]) -> Result<()> for _ in 0..2 { // block: assumes that the order of batches matter. let records = rx.recv().unwrap(); - records - .iter() - .try_for_each(|record| writer.write_byte_record(record))? + records.iter().try_for_each(|row| writer.write_all(&row))? } for child in children { diff --git a/src/io/csv/write/mod.rs b/src/io/csv/write/mod.rs index 72369db040c..2a1fb313ae2 100644 --- a/src/io/csv/write/mod.rs +++ b/src/io/csv/write/mod.rs @@ -26,24 +26,37 @@ fn new_serializers<'a, A: AsRef>( .collect() } -/// Serializes [`Chunk`] to a vector of `ByteRecord`. +/// Serializes [`Chunk`] to a vector of rows. /// The vector is guaranteed to have `columns.len()` entries. -/// Each `ByteRecord` is guaranteed to have `columns.array().len()` fields. +/// Each `row` is guaranteed to have `columns.array().len()` fields. pub fn serialize>( columns: &Chunk, options: &SerializeOptions, -) -> Result> { +) -> Result>> { let mut serializers = new_serializers(columns, options)?; - let rows = columns.len(); - let mut records = vec![ByteRecord::with_capacity(0, columns.arrays().len()); rows]; - records.iter_mut().for_each(|record| { + let mut rows = Vec::with_capacity(columns.len()); + let mut row = vec![]; + + // this is where the (expensive) transposition happens: the outer loop is on rows, the inner on columns + (0..columns.len()).try_for_each(|_| { serializers .iter_mut() - // `unwrap` is infalible because `array.len()` equals `len` in `Chunk::len` - .for_each(|iter| record.push_field(iter.next().unwrap())); - }); - Ok(records) + // `unwrap` is infalible because `array.len()` equals `Chunk::len` + .for_each(|iter| { + let field = iter.next().unwrap(); + row.extend_from_slice(field); + row.push(options.delimiter); + }); + // replace last delimiter with new line + let last_byte = row.len() - 1; + row[last_byte] = b'\n'; + rows.push(row.clone()); + row.clear(); + Result::Ok(()) + })?; + + Ok(rows) } /// Writes [`Chunk`] to `writer` according to the serialization options `options`. diff --git a/src/io/csv/write/serialize.rs b/src/io/csv/write/serialize.rs index 9ac2581a20c..f1857f91025 100644 --- a/src/io/csv/write/serialize.rs +++ b/src/io/csv/write/serialize.rs @@ -31,7 +31,7 @@ pub struct SerializeOptions { pub timestamp_format: Option, /// used as separator/delimiter pub delimiter: u8, - /// used as separator/delimiter + /// quoting character pub quote: u8, }