Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
change csv-writer
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 25, 2022
1 parent 10e6cd5 commit 35134f7
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 33 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ simdutf8 = "0.1.3"

# for csv io
csv = { version = "^1.1", optional = true }
csv-core = { version = "0.1", optional = true }

# for csv async io
csv-async = { version = "^1.1", optional = true }
Expand Down Expand Up @@ -129,7 +130,7 @@ io_csv = ["io_csv_read", "io_csv_write"]
io_csv_async = ["io_csv_read_async"]
io_csv_read = ["csv", "lexical-core"]
io_csv_read_async = ["csv-async", "lexical-core", "futures"]
io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
io_csv_write = ["csv", "csv-core", "streaming-iterator", "lexical-core"]
io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
Expand Down
4 changes: 2 additions & 2 deletions benches/write_csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ fn write_batch(columns: &ChunkArc) -> Result<()> {
let writer = &mut write::WriterBuilder::new().from_writer(vec![]);

assert_eq!(columns.arrays().len(), 1);
write::write_header(writer, &["a"])?;

let options = write::SerializeOptions::default();
write::write_header(writer, &["a"], &options)?;

write::write_chunk(writer, columns, &options)
}

Expand Down
4 changes: 2 additions & 2 deletions examples/csv_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use arrow2::{
fn write_batch<A: AsRef<dyn Array>>(path: &str, columns: &[Chunk<A>]) -> Result<()> {
let writer = &mut write::WriterBuilder::new().from_path(path)?;

write::write_header(writer, &["c1"])?;

let options = write::SerializeOptions::default();
write::write_header(writer, &["c1"], &options)?;

columns
.iter()
.try_for_each(|batch| write::write_chunk(writer, batch, &options))
Expand Down
33 changes: 25 additions & 8 deletions src/io/csv/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::super::iterator::StreamingIterator;
use std::io::Write;

// re-export necessary public APIs from csv
pub use csv::{ByteRecord, Writer, WriterBuilder};
pub use csv::{ByteRecord, WriterBuilder};

pub use serialize::*;

Expand Down Expand Up @@ -48,33 +48,50 @@ pub fn serialize<A: AsRef<dyn Array>>(

/// Writes [`Chunk`] to `writer` according to the serialization options `options`.
pub fn write_chunk<W: Write, A: AsRef<dyn Array>>(
writer: &mut Writer<W>,
writer: &mut W,
columns: &Chunk<A>,
options: &SerializeOptions,
) -> Result<()> {
let mut serializers = new_serializers(columns.arrays(), options)?;

let rows = columns.len();
let mut record = ByteRecord::with_capacity(0, columns.arrays().len());
let mut row = Vec::with_capacity(columns.arrays().len() * 10);

// this is where the (expensive) transposition happens: the outer loop is on rows, the inner on columns
(0..rows).try_for_each(|_| {
serializers
.iter_mut()
// `unwrap` is infallible because `array.len()` equals `Chunk::len`
.for_each(|iter| record.push_field(iter.next().unwrap()));
writer.write_byte_record(&record)?;
record.clear();
.for_each(|iter| {
let field = iter.next().unwrap();
row.extend_from_slice(field);
row.push(options.delimiter);
});
// replace last delimiter with new line
let last_byte = row.len() - 1;
row[last_byte] = b'\n';
writer.write_all(&row)?;
row.clear();
Result::Ok(())
})?;
Ok(())
}

/// Writes a CSV header to `writer`
pub fn write_header<W: Write, T>(writer: &mut Writer<W>, names: &[T]) -> Result<()>
pub fn write_header<W: Write, T>(
writer: &mut W,
names: &[T],
options: &SerializeOptions,
) -> Result<()>
where
T: AsRef<str>,
{
writer.write_record(names.iter().map(|x| x.as_ref().as_bytes()))?;
let names = names.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
writer.write_all(
names
.join(std::str::from_utf8(&[options.delimiter]).unwrap())
.as_bytes(),
)?;
writer.write_all(&[b'\n'])?;
Ok(())
}
56 changes: 48 additions & 8 deletions src/io/csv/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ use crate::{

use super::super::super::iterator::{BufStreamingIterator, StreamingIterator};
use crate::array::{DictionaryArray, DictionaryKey, Offset};
use csv_core::WriteResult;
use std::any::Any;

/// Options to serialize logical types to CSV
/// The default is to format times and dates as `chrono` crate formats them.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Default)]
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct SerializeOptions {
/// used for [`DataType::Date32`]
pub date32_format: Option<String>,
Expand All @@ -28,6 +29,21 @@ pub struct SerializeOptions {
pub time64_format: Option<String>,
/// used for [`DataType::Timestamp`]
pub timestamp_format: Option<String>,
/// used as separator/delimiter
pub delimiter: u8,
}

impl Default for SerializeOptions {
fn default() -> Self {
SerializeOptions {
date32_format: None,
date64_format: None,
time32_format: None,
time64_format: None,
timestamp_format: None,
delimiter: b',',
}
}
}

fn primitive_write<'a, T: NativeType + ToLexical>(
Expand Down Expand Up @@ -337,11 +353,35 @@ pub fn new_serializer<'a>(
}
DataType::Utf8 => {
let array = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
let mut local_buf = vec![0u8; 64];
let mut ser_writer = csv_core::Writer::new();

Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
if let Some(x) = x {
buf.extend_from_slice(x.as_bytes());
move |x, buf| {
match x {
// Empty strings are quoted.
// This will ensure a csv parser will not read them as missing
// in a delimited field
Some("") => buf.extend_from_slice(b"\"\""),
Some(s) => {
let bytes = s.as_bytes();
buf.reserve(bytes.len() * 2);

loop {
match ser_writer.field(s.as_bytes(), &mut local_buf) {
(WriteResult::OutputFull, _, _) => {
let additional = local_buf.len();
local_buf.extend(std::iter::repeat(0u8).take(additional))
}
(WriteResult::InputEmpty, _, n_out) => {
buf.extend_from_slice(&local_buf[..n_out]);
break;
}
}
}
}
_ => {}
}
},
vec![],
Expand All @@ -351,10 +391,10 @@ pub fn new_serializer<'a>(
let array = array.as_any().downcast_ref::<Utf8Array<i64>>().unwrap();
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
if let Some(x) = x {
buf.extend_from_slice(x.as_bytes());
}
|x, buf| match x {
Some("") => buf.extend_from_slice(b"\""),
Some(s) => buf.extend_from_slice(s.as_bytes()),
_ => {}
},
vec![],
))
Expand Down
43 changes: 31 additions & 12 deletions tests/it/io/csv/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,18 @@ fn data() -> Chunk<Box<dyn Array>> {
fn write_csv() -> Result<()> {
let columns = data();

let write = Cursor::new(Vec::<u8>::new());
let mut writer = WriterBuilder::new().from_writer(write);

write_header(&mut writer, &["c1", "c2", "c3", "c4", "c5", "c6", "c7"])?;
let mut writer = Cursor::new(Vec::<u8>::new());
let options = SerializeOptions::default();

write_header(
&mut writer,
&["c1", "c2", "c3", "c4", "c5", "c6", "c7"],
&options,
)?;
write_chunk(&mut writer, &columns, &options)?;

// check
let buffer = writer.into_inner().unwrap().into_inner();
let buffer = writer.into_inner();
assert_eq!(
r#"c1,c2,c3,c4,c5,c6,c7
a b,123.564532,3,true,,00:20:34,d
Expand All @@ -59,18 +62,18 @@ d,-556132.25,1,,2019-04-18 02:45:55.555,23:46:03,c
fn write_csv_custom_options() -> Result<()> {
let batch = data();

let write = Cursor::new(Vec::<u8>::new());
let mut writer = WriterBuilder::new().delimiter(b'|').from_writer(write);
let mut writer = Cursor::new(Vec::<u8>::new());

let options = SerializeOptions {
time32_format: Some("%r".to_string()),
time64_format: Some("%r".to_string()),
delimiter: b'|',
..Default::default()
};
write_chunk(&mut writer, &batch, &options)?;

// check
let buffer = writer.into_inner().unwrap().into_inner();
let buffer = writer.into_inner();
assert_eq!(
r#"a b|123.564532|3|true||12:20:34 AM|d
c||2|false|2019-04-18 10:54:47.378|06:51:20 AM|a b
Expand Down Expand Up @@ -230,14 +233,13 @@ fn test_array(
data: Vec<&'static str>,
options: SerializeOptions,
) -> Result<()> {
let write = Cursor::new(Vec::<u8>::new());
let mut writer = WriterBuilder::new().delimiter(b'|').from_writer(write);
let mut writer = Cursor::new(Vec::<u8>::new());

write_header(&mut writer, &["c1"])?;
write_header(&mut writer, &["c1"], &options)?;
write_chunk(&mut writer, &columns, &options)?;

// check
let buffer = writer.into_inner().unwrap().into_inner();
let buffer = writer.into_inner();

let mut expected = "c1\n".to_owned();
expected.push_str(&data.join("\n"));
Expand Down Expand Up @@ -314,3 +316,20 @@ fn write_tz_timezone_formatted_tz() -> Result<()> {
},
)
}

/// An empty string must round-trip as a quoted field (`""`) while a null
/// becomes a bare empty field, so a CSV reader can distinguish the two.
#[test]
fn write_empty_and_missing() {
    let empty_then_null = Utf8Array::<i32>::from(&[Some(""), None]);
    let null_then_empty = Utf8Array::<i32>::from(&[None, Some("")]);
    let columns = Chunk::new(vec![
        Arc::new(empty_then_null) as Arc<dyn Array>,
        Arc::new(null_then_empty) as Arc<dyn Array>,
    ]);

    let options = SerializeOptions::default();
    let mut buffer = Vec::new();
    write_chunk(&mut buffer, &columns, &options).unwrap();

    let csv = std::str::from_utf8(&buffer).unwrap();
    assert_eq!(csv, "\"\",\n,\"\"\n");
}

0 comments on commit 35134f7

Please sign in to comment.