change csv-writer (jorgecarleitao#866)
ritchie46 authored and Dexter Duckworth committed Mar 2, 2022
1 parent e76775e commit e174a23
Showing 7 changed files with 153 additions and 65 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -30,6 +30,7 @@ simdutf8 = "0.1.3"

# for csv io
csv = { version = "^1.1", optional = true }
+csv-core = { version = "0.1", optional = true }

# for csv async io
csv-async = { version = "^1.1", optional = true }
@@ -129,7 +130,7 @@ io_csv = ["io_csv_read", "io_csv_write"]
io_csv_async = ["io_csv_read_async"]
io_csv_read = ["csv", "lexical-core"]
io_csv_read_async = ["csv-async", "lexical-core", "futures"]
io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
io_csv_write = ["csv", "csv-core", "streaming-iterator", "lexical-core"]
io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
8 changes: 4 additions & 4 deletions benches/write_csv.rs
@@ -11,13 +11,13 @@ use arrow2::util::bench_util::*;
type ChunkArc = Chunk<Arc<dyn Array>>;

fn write_batch(columns: &ChunkArc) -> Result<()> {
-    let writer = &mut write::WriterBuilder::new().from_writer(vec![]);
+    let mut writer = vec![];

    assert_eq!(columns.arrays().len(), 1);
-    write::write_header(writer, &["a"])?;

    let options = write::SerializeOptions::default();
-    write::write_chunk(writer, columns, &options)
+    write::write_header(&mut writer, &["a"], &options)?;
+
+    write::write_chunk(&mut writer, columns, &options)
}

fn make_chunk(array: impl Array + 'static) -> Chunk<Arc<dyn Array>> {
8 changes: 4 additions & 4 deletions examples/csv_write.rs
@@ -6,14 +6,14 @@ use arrow2::{
};

fn write_batch<A: AsRef<dyn Array>>(path: &str, columns: &[Chunk<A>]) -> Result<()> {
-    let writer = &mut write::WriterBuilder::new().from_path(path)?;
-
-    write::write_header(writer, &["c1"])?;
+    let mut writer = std::fs::File::create(path)?;

    let options = write::SerializeOptions::default();
+    write::write_header(&mut writer, &["c1"], &options)?;

    columns
        .iter()
-        .try_for_each(|batch| write::write_chunk(writer, batch, &options))
+        .try_for_each(|batch| write::write_chunk(&mut writer, batch, &options))
}

fn main() -> Result<()> {
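Since the delimiter now lives on `SerializeOptions`, the same example extends to non-comma output. A minimal sketch of that (not part of this commit; the column values are made up):

use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::csv::write;

fn write_semicolon_csv(path: &str) -> Result<()> {
    let c1 = Int32Array::from_slice([1, 2, 3]);
    let c2 = Int32Array::from_slice([4, 5, 6]);
    let chunk = Chunk::new(vec![
        Arc::new(c1) as Arc<dyn Array>,
        Arc::new(c2) as Arc<dyn Array>,
    ]);

    // default formats, but ';' instead of ','
    let options = write::SerializeOptions {
        delimiter: b';',
        ..Default::default()
    };

    let mut writer = std::fs::File::create(path)?;
    write::write_header(&mut writer, &["c1", "c2"], &options)?;
    write::write_chunk(&mut writer, &chunk, &options)
    // resulting file: "c1;c2\n1;4\n2;5\n3;6\n"
}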
13 changes: 6 additions & 7 deletions examples/csv_write_parallel.rs
@@ -1,3 +1,4 @@
+use std::io::Write;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;
@@ -14,8 +15,8 @@ fn parallel_write(path: &str, batches: [Chunk<Arc<dyn Array>>; 2]) -> Result<()>
    let options = write::SerializeOptions::default();

    // write a header
-    let writer = &mut write::WriterBuilder::new().from_path(path)?;
-    write::write_header(writer, &["c1"])?;
+    let mut writer = std::fs::File::create(path)?;
+    write::write_header(&mut writer, &["c1"], &options)?;

    // prepare a channel to send serialized records from threads
    let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel();
@@ -28,8 +29,8 @@ fn parallel_write(path: &str, batches: [Chunk<Arc<dyn Array>>; 2]) -> Result<()>
        let options = options.clone();
        let batch = batches[id].clone(); // note: this is cheap
        let child = thread::spawn(move || {
-            let records = write::serialize(&batch, &options).unwrap();
-            thread_tx.send(records).unwrap();
+            let rows = write::serialize(&batch, &options).unwrap();
+            thread_tx.send(rows).unwrap();
        });

        children.push(child);
@@ -38,9 +39,7 @@ fn parallel_write(path: &str, batches: [Chunk<Arc<dyn Array>>; 2]) -> Result<()>
    for _ in 0..2 {
        // block: assumes that the order of batches matters.
        let records = rx.recv().unwrap();
-        records
-            .iter()
-            .try_for_each(|record| writer.write_byte_record(record))?
+        records.iter().try_for_each(|row| writer.write_all(&row))?
    }

    for child in children {
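This fan-out works because `serialize` now returns plain, newline-terminated byte rows (`Vec<Vec<u8>>`) instead of `csv::ByteRecord`s: worker threads carry no writer state, and the receiving thread can write the buffers verbatim. A small sketch of that contract (illustrative values, not part of the commit):

use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::csv::write;

fn main() -> Result<()> {
    let array = Int32Array::from_slice([1, 2]);
    let chunk = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);

    let rows = write::serialize(&chunk, &write::SerializeOptions::default())?;

    // one independent buffer per row, already delimited and newline-terminated
    assert_eq!(rows, vec![b"1\n".to_vec(), b"2\n".to_vec()]);
    Ok(())
}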
68 changes: 50 additions & 18 deletions src/io/csv/write/mod.rs
@@ -6,7 +6,7 @@ use super::super::iterator::StreamingIterator;
use std::io::Write;

// re-export necessary public APIs from csv
-pub use csv::{ByteRecord, Writer, WriterBuilder};
+pub use csv::{ByteRecord, WriterBuilder};

pub use serialize::*;

@@ -26,55 +26,87 @@ fn new_serializers<'a, A: AsRef<dyn Array>>(
        .collect()
}

-/// Serializes [`Chunk`] to a vector of `ByteRecord`.
+/// Serializes [`Chunk`] to a vector of rows.
/// The vector is guaranteed to have `columns.len()` entries.
-/// Each `ByteRecord` is guaranteed to have `columns.arrays().len()` fields.
+/// Each `row` is guaranteed to have `columns.arrays().len()` fields.
pub fn serialize<A: AsRef<dyn Array>>(
    columns: &Chunk<A>,
    options: &SerializeOptions,
-) -> Result<Vec<ByteRecord>> {
+) -> Result<Vec<Vec<u8>>> {
    let mut serializers = new_serializers(columns, options)?;

-    let rows = columns.len();
-    let mut records = vec![ByteRecord::with_capacity(0, columns.arrays().len()); rows];
-    records.iter_mut().for_each(|record| {
+    let mut rows = Vec::with_capacity(columns.len());
+    let mut row = vec![];
+
+    // this is where the (expensive) transposition happens: the outer loop is on rows, the inner on columns
+    (0..columns.len()).try_for_each(|_| {
        serializers
            .iter_mut()
-            // `unwrap` is infallible because `array.len()` equals `len` in `Chunk::len`
-            .for_each(|iter| record.push_field(iter.next().unwrap()));
-    });
-    Ok(records)
+            // `unwrap` is infallible because `array.len()` equals `Chunk::len`
+            .for_each(|iter| {
+                let field = iter.next().unwrap();
+                row.extend_from_slice(field);
+                row.push(options.delimiter);
+            });
+        if !row.is_empty() {
+            // replace the last delimiter with a new line
+            let last_byte = row.len() - 1;
+            row[last_byte] = b'\n';
+            rows.push(row.clone());
+            row.clear();
+        }
+        Result::Ok(())
+    })?;
+
+    Ok(rows)
}

/// Writes [`Chunk`] to `writer` according to the serialization options `options`.
pub fn write_chunk<W: Write, A: AsRef<dyn Array>>(
-    writer: &mut Writer<W>,
+    writer: &mut W,
    columns: &Chunk<A>,
    options: &SerializeOptions,
) -> Result<()> {
    let mut serializers = new_serializers(columns.arrays(), options)?;

    let rows = columns.len();
-    let mut record = ByteRecord::with_capacity(0, columns.arrays().len());
+    let mut row = Vec::with_capacity(columns.arrays().len() * 10);

    // this is where the (expensive) transposition happens: the outer loop is on rows, the inner on columns
    (0..rows).try_for_each(|_| {
        serializers
            .iter_mut()
            // `unwrap` is infallible because `array.len()` equals `Chunk::len`
-            .for_each(|iter| record.push_field(iter.next().unwrap()));
-        writer.write_byte_record(&record)?;
-        record.clear();
+            .for_each(|iter| {
+                let field = iter.next().unwrap();
+                row.extend_from_slice(field);
+                row.push(options.delimiter);
+            });
+        // replace the last delimiter with a new line
+        let last_byte = row.len() - 1;
+        row[last_byte] = b'\n';
+        writer.write_all(&row)?;
+        row.clear();
        Result::Ok(())
    })?;
    Ok(())
}

/// Writes a CSV header to `writer`
-pub fn write_header<W: Write, T>(writer: &mut Writer<W>, names: &[T]) -> Result<()>
+pub fn write_header<W: Write, T>(
+    writer: &mut W,
+    names: &[T],
+    options: &SerializeOptions,
+) -> Result<()>
where
    T: AsRef<str>,
{
-    writer.write_record(names.iter().map(|x| x.as_ref().as_bytes()))?;
+    let names = names.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
+    writer.write_all(
+        names
+            .join(std::str::from_utf8(&[options.delimiter]).unwrap())
+            .as_bytes(),
+    )?;
+    writer.write_all(&[b'\n'])?;
    Ok(())
}
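With `write_header` and `write_chunk` generic over any `W: Write`, output no longer has to go through a `csv::Writer`; an in-memory `Vec<u8>` works directly. A sketch (not part of the commit) that also shows the empty-string quoting implemented in `serialize.rs` below:

use std::sync::Arc;

use arrow2::array::{Array, Utf8Array};
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::csv::write;

fn main() -> Result<()> {
    let array = Utf8Array::<i32>::from_slice(["a", ""]);
    let chunk = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);

    let options = write::SerializeOptions::default();
    let mut writer = vec![]; // any `impl Write` is accepted now
    write::write_header(&mut writer, &["c1"], &options)?;
    write::write_chunk(&mut writer, &chunk, &options)?;

    // the empty string is written as "" so that readers do not interpret it as null
    assert_eq!(writer, b"c1\na\n\"\"\n".to_vec());
    Ok(())
}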
75 changes: 56 additions & 19 deletions src/io/csv/write/serialize.rs
@@ -12,11 +12,12 @@ use crate::{

use super::super::super::iterator::{BufStreamingIterator, StreamingIterator};
use crate::array::{DictionaryArray, DictionaryKey, Offset};
+use csv_core::WriteResult;
use std::any::Any;

/// Options to serialize logical types to CSV
/// The default is to format times and dates as `chrono` crate formats them.
-#[derive(Debug, PartialEq, Eq, Hash, Clone, Default)]
+#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct SerializeOptions {
    /// used for [`DataType::Date32`]
    pub date32_format: Option<String>,
@@ -28,6 +29,24 @@ pub struct SerializeOptions {
    pub time64_format: Option<String>,
    /// used for [`DataType::Timestamp`]
    pub timestamp_format: Option<String>,
+    /// used as separator/delimiter
+    pub delimiter: u8,
+    /// quoting character
+    pub quote: u8,
}

+impl Default for SerializeOptions {
+    fn default() -> Self {
+        SerializeOptions {
+            date32_format: None,
+            date64_format: None,
+            time32_format: None,
+            time64_format: None,
+            timestamp_format: None,
+            delimiter: b',',
+            quote: b'"',
+        }
+    }
+}

fn primitive_write<'a, T: NativeType + ToLexical>(
@@ -187,6 +206,40 @@ fn timestamp_with_tz<'a>(
    }
}

+fn new_utf8_serializer<'a, O: Offset>(
+    array: &'a Utf8Array<O>,
+    options: &'a SerializeOptions,
+) -> Box<dyn StreamingIterator<Item = [u8]> + 'a> {
+    let mut local_buf = vec![0u8; 64];
+    let mut ser_writer = csv_core::WriterBuilder::new().quote(options.quote).build();
+
+    Box::new(BufStreamingIterator::new(
+        array.iter(),
+        move |x, buf| {
+            match x {
+                // Empty strings are quoted so that a CSV parser
+                // does not read them as missing values
+                // in a delimited field
+                Some("") => buf.extend_from_slice(b"\"\""),
+                Some(s) => loop {
+                    match ser_writer.field(s.as_bytes(), &mut local_buf) {
+                        (WriteResult::OutputFull, _, _) => {
+                            // the scratch buffer is too small: double it and retry
+                            let additional = local_buf.len();
+                            local_buf.extend(std::iter::repeat(0u8).take(additional))
+                        }
+                        (WriteResult::InputEmpty, _, n_out) => {
+                            buf.extend_from_slice(&local_buf[..n_out]);
+                            break;
+                        }
+                    }
+                },
+                _ => {}
+            }
+        },
+        vec![],
+    ))
+}

/// Returns a [`StreamingIterator`] that yields `&[u8]` serialized from `array` according to `options`.
/// For numeric types, this serializes as usual. For dates, times and timestamps, it uses `options` to
/// Supported types:
@@ -337,27 +390,11 @@
        }
        DataType::Utf8 => {
            let array = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
-            Box::new(BufStreamingIterator::new(
-                array.iter(),
-                |x, buf| {
-                    if let Some(x) = x {
-                        buf.extend_from_slice(x.as_bytes());
-                    }
-                },
-                vec![],
-            ))
+            new_utf8_serializer(array, options)
        }
        DataType::LargeUtf8 => {
            let array = array.as_any().downcast_ref::<Utf8Array<i64>>().unwrap();
-            Box::new(BufStreamingIterator::new(
-                array.iter(),
-                |x, buf| {
-                    if let Some(x) = x {
-                        buf.extend_from_slice(x.as_bytes());
-                    }
-                },
-                vec![],
-            ))
+            new_utf8_serializer(array, options)
        }
        DataType::Binary => {
            let array = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
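In `new_utf8_serializer`, quoting is delegated to `csv-core`: `Writer::field` copies the field into a scratch buffer and returns `OutputFull` when it runs out of space, in which case the buffer is doubled and the write retried. A standalone sketch of that grow-and-retry pattern (independent of this commit; unlike the code above it also tracks offsets and calls `finish`, which flushes the pending closing quote of a quoted field):

use csv_core::{WriteResult, Writer, WriterBuilder};

/// Quote `input` as a single CSV field, growing the output buffer on demand.
fn quote_field(wtr: &mut Writer, input: &[u8]) -> Vec<u8> {
    let mut out = vec![0u8; 4]; // deliberately tiny to force growth
    let (mut nread, mut nwritten) = (0, 0);
    loop {
        let (res, n_in, n_out) = wtr.field(&input[nread..], &mut out[nwritten..]);
        nread += n_in;
        nwritten += n_out;
        match res {
            WriteResult::InputEmpty => break,
            WriteResult::OutputFull => {
                let len = out.len();
                out.resize(len * 2, 0); // double the scratch space and continue
            }
        }
    }
    // flush the closing quote, if any
    loop {
        let (res, n_out) = wtr.finish(&mut out[nwritten..]);
        nwritten += n_out;
        match res {
            WriteResult::InputEmpty => break,
            WriteResult::OutputFull => {
                let len = out.len();
                out.resize(len * 2, 0);
            }
        }
    }
    out.truncate(nwritten);
    out
}

fn main() {
    let mut wtr = WriterBuilder::new().quote(b'"').build();
    let quoted = quote_field(&mut wtr, b"say \"hi\"");
    assert_eq!(quoted, b"\"say \"\"hi\"\"\"".to_vec());
}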