Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Improved performance of writing to CSV (20-25%) (#382)
Browse files Browse the repository at this point in the history
* Added bench of csv write.

* Optimized writing of CSV.
  • Loading branch information
jorgecarleitao authored Sep 5, 2021
1 parent 3af5ffe commit 4f8d793
Show file tree
Hide file tree
Showing 13 changed files with 248 additions and 63 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ hash_hasher = "^2.0.3"
csv = { version = "^1.1", optional = true }
regex = { version = "^1.3", optional = true }
lazy_static = { version = "^1.4", optional = true }
streaming-iterator = { version = "0.1", optional = true }

serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
Expand Down Expand Up @@ -86,7 +87,7 @@ full = [
"compute",
]
merge_sort = ["itertools"]
io_csv = ["csv", "lazy_static", "regex", "lexical-core"]
io_csv = ["csv", "lazy_static", "regex", "lexical-core", "streaming-iterator"]
io_json = ["serde", "serde_json", "indexmap"]
io_ipc = ["flatbuffers"]
io_ipc_compression = ["lz4", "zstd"]
Expand Down Expand Up @@ -187,3 +188,7 @@ harness = false
[[bench]]
name = "bitmap_ops"
harness = false

[[bench]]
name = "write_csv"
harness = false
4 changes: 2 additions & 2 deletions benches/comparison_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ fn add_benchmark(c: &mut Criterion) {
b.iter(|| bench_op_scalar(&arr_a, &BooleanScalar::from(Some(true)), Operator::Eq))
});

let arr_a = create_string_array::<i32>(size, 0.1, 42);
let arr_b = create_string_array::<i32>(size, 0.1, 43);
let arr_a = create_string_array::<i32>(size, 4, 0.1, 42);
let arr_b = create_string_array::<i32>(size, 4, 0.1, 43);
c.bench_function(&format!("utf8 2^{}", log2_size), |b| {
b.iter(|| bench_op(&arr_a, &arr_b, Operator::Eq))
});
Expand Down
2 changes: 1 addition & 1 deletion benches/filter_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ fn add_benchmark(c: &mut Criterion) {
b.iter(|| bench_built_filter(&sparse_filter, &data_array))
});

let data_array = create_string_array::<i32>(size, 0.5, 42);
let data_array = create_string_array::<i32>(size, 4, 0.5, 42);
c.bench_function("filter context string", |b| {
b.iter(|| bench_built_filter(&filter, &data_array))
});
Expand Down
2 changes: 1 addition & 1 deletion benches/sort_kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn add_benchmark(c: &mut Criterion) {
b.iter(|| bench_lexsort(&arr_a, &arr_b))
});

let arr_a = create_string_array::<i32>(size, 0.1, 42);
let arr_a = create_string_array::<i32>(size, 4, 0.1, 42);
c.bench_function(&format!("sort utf8 null 2^{}", log2_size), |b| {
b.iter(|| bench_sort(&arr_a))
});
Expand Down
12 changes: 6 additions & 6 deletions benches/take_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,37 +58,37 @@ fn add_benchmark(c: &mut Criterion) {
b.iter(|| bench_take(&values, &indices_nulls))
});

let values = create_string_array::<i32>(512, 0.0, 42);
let values = create_string_array::<i32>(512, 4, 0.0, 42);
c.bench_function(&format!("take str 2^{}", log2_size), |b| {
b.iter(|| bench_take(&values, &indices))
});

let values = create_string_array::<i32>(512, 0.0, 42);
let values = create_string_array::<i32>(512, 4, 0.0, 42);
c.bench_function(&format!("take str nulls 2^{}", log2_size), |b| {
b.iter(|| bench_take(&values, &indices_nulls))
});
});

let values = create_string_array::<i32>(512, 0.0, 42);
let values = create_string_array::<i32>(512, 4, 0.0, 42);
let indices = create_random_index(512, 0.5);
c.bench_function("take str null indices 512", |b| {
b.iter(|| bench_take(&values, &indices))
});

let values = create_string_array::<i32>(1024, 0.0, 42);
let values = create_string_array::<i32>(1024, 4, 0.0, 42);
let indices = create_random_index(1024, 0.5);
c.bench_function("take str null indices 1024", |b| {
b.iter(|| bench_take(&values, &indices))
});

let values = create_string_array::<i32>(1024, 0.5, 42);
let values = create_string_array::<i32>(1024, 4, 0.5, 42);

let indices = create_random_index(1024, 0.0);
c.bench_function("take str null values 1024", |b| {
b.iter(|| bench_take(&values, &indices))
});

let values = create_string_array::<i32>(1024, 0.5, 42);
let values = create_string_array::<i32>(1024, 4, 0.5, 42);
let indices = create_random_index(1024, 0.5);
c.bench_function("take str null values null indices 1024", |b| {
b.iter(|| bench_take(&values, &indices))
Expand Down
58 changes: 58 additions & 0 deletions benches/write_csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::sync::Arc;

use arrow2::util::bench_util::*;
use criterion::{criterion_group, criterion_main, Criterion};

use arrow2::array::*;
use arrow2::datatypes::*;
use arrow2::error::Result;
use arrow2::io::csv::write;
use arrow2::record_batch::RecordBatch;

/// Serializes `batch` to CSV — header first, then every row — into an in-memory
/// byte buffer that is dropped when the function returns.
///
/// Returns any error reported while writing the header or the rows.
fn write_batch(batch: &RecordBatch) -> Result<()> {
    let options = write::SerializeOptions::default();
    // The output itself is irrelevant for the benchmark, so write into a throwaway `Vec`.
    let mut writer = write::WriterBuilder::new().from_writer(Vec::new());

    write::write_header(&mut writer, batch.schema())?;
    write::write_batch(&mut writer, batch, &options)
}

/// Wraps a single array into a one-column [`RecordBatch`].
///
/// The column is named `"a"`, is declared nullable and takes its data type from `array`.
///
/// # Panics
///
/// Panics if `RecordBatch::try_new` rejects the schema/column combination.
fn make_batch(array: impl Array + 'static) -> RecordBatch {
    let field = Field::new("a", array.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![field]));
    RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
}

/// Registers the CSV-writing benchmarks.
///
/// For every array length `2^10, 2^12, ..., 2^18`, three single-column batches are
/// benchmarked in this order: `i32`, utf8 and `f64`.
fn add_benchmark(c: &mut Criterion) {
    for log2_size in (10..=18).step_by(2) {
        let size = 2usize.pow(log2_size);

        // NOTE(review): the `0.1` argument is presumably the null density — confirm in `bench_util`.
        let ints = make_batch(create_primitive_array::<i32>(size, DataType::Int32, 0.1));
        c.bench_function(&format!("csv write i32 2^{}", log2_size), |b| {
            b.iter(|| write_batch(&ints))
        });

        // NOTE(review): `100` is presumably the string length and `42` the RNG seed — confirm.
        let strings = make_batch(create_string_array::<i32>(size, 100, 0.1, 42));
        c.bench_function(&format!("csv write utf8 2^{}", log2_size), |b| {
            b.iter(|| write_batch(&strings))
        });

        let floats = make_batch(create_primitive_array::<f64>(size, DataType::Float64, 0.1));
        c.bench_function(&format!("csv write f64 2^{}", log2_size), |b| {
            b.iter(|| write_batch(&floats))
        });
    }
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion benches/write_ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn add_benchmark(c: &mut Criterion) {
});

(0..=10).step_by(2).for_each(|i| {
let array = &create_string_array::<i32>(1024 * 2usize.pow(i), 0.1, 42);
let array = &create_string_array::<i32>(1024 * 2usize.pow(i), 4, 0.1, 42);
let a = format!("write utf8 2^{}", 10 + i);
c.bench_function(&a, |b| b.iter(|| write(array).unwrap()));
});
Expand Down
4 changes: 2 additions & 2 deletions benches/write_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ fn add_benchmark(c: &mut Criterion) {
});

(0..=10).step_by(2).for_each(|i| {
let array = &create_string_array::<i32>(1024 * 2usize.pow(i), 0.1, 42);
let array = &create_string_array::<i32>(1024 * 2usize.pow(i), 4, 0.1, 42);
let a = format!("write utf8 2^{}", 10 + i);
c.bench_function(&a, |b| b.iter(|| write(array, Encoding::Plain).unwrap()));
});

(0..=10).step_by(2).for_each(|i| {
let array = &create_string_array::<i32>(1024 * 2usize.pow(i), 0.1, 42);
let array = &create_string_array::<i32>(1024 * 2usize.pow(i), 4, 0.1, 42);
let a = format!("write utf8 delta 2^{}", 10 + i);
c.bench_function(&a, |b| {
b.iter(|| write(array, Encoding::DeltaLengthByteArray).unwrap())
Expand Down
65 changes: 65 additions & 0 deletions src/io/csv/write/iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
pub use streaming_iterator::StreamingIterator;

/// A [`StreamingIterator`] with an internal buffer of [`Vec<u8>`] used to efficiently
/// present items of type `T` as `&[u8]`.
///
/// It is generic over the item type `T` and over the transformation
/// `F: Fn(T, &mut Vec<u8>)`, which writes the byte representation of an item into the
/// buffer it is handed. The same buffer is cleared and refilled on every advance, so
/// its allocation is reused across items.
pub struct BufStreamingIterator<I, F, T>
where
I: Iterator<Item = T>,
F: Fn(T, &mut Vec<u8>),
{
// Source of the items to present as bytes.
iterator: I,
// Writes one item into the buffer.
f: F,
// Holds the bytes of the current item; only meaningful while `is_valid` is `true`.
buffer: Vec<u8>,
// Whether the last advance produced an item, i.e. whether `buffer` may be exposed.
is_valid: bool,
}

impl<I, F, T> BufStreamingIterator<I, F, T>
where
    F: Fn(T, &mut Vec<u8>),
    I: Iterator<Item = T>,
{
    /// Creates a new [`BufStreamingIterator`].
    ///
    /// * `iterator` - yields the items to present as bytes.
    /// * `f` - writes one item into the byte buffer it receives.
    /// * `buffer` - scratch space reused for every item. Its initial contents are never
    ///   exposed: the iterator starts without a current item, and the buffer is cleared
    ///   before each item is written.
    #[inline]
    pub fn new(iterator: I, f: F, buffer: Vec<u8>) -> Self {
        // Nothing has been written yet, so there is no current item.
        let is_valid = false;
        Self {
            iterator,
            f,
            buffer,
            is_valid,
        }
    }
}

impl<I, F, T> StreamingIterator for BufStreamingIterator<I, F, T>
where
    I: Iterator<Item = T>,
    F: Fn(T, &mut Vec<u8>),
{
    type Item = [u8];

    /// Pulls the next item from the inner iterator and writes it into the buffer,
    /// or marks the iterator as having no current item when the source is exhausted.
    #[inline]
    fn advance(&mut self) {
        match self.iterator.next() {
            Some(item) => {
                self.is_valid = true;
                // Reuse the allocation: drop the previous item's bytes, then write the new ones.
                self.buffer.clear();
                (self.f)(item, &mut self.buffer);
            }
            None => {
                self.is_valid = false;
            }
        }
    }

    /// Returns the bytes of the current item, or `None` when there is no current item.
    #[inline]
    fn get(&self) -> Option<&Self::Item> {
        if !self.is_valid {
            return None;
        }
        Some(self.buffer.as_slice())
    }

    /// Forwards the size hint of the inner iterator.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iterator.size_hint()
    }
}
9 changes: 6 additions & 3 deletions src/io/csv/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
mod iterator;
mod serialize;

use iterator::StreamingIterator;

use std::io::Write;

// re-export necessary public APIs from csv
Expand All @@ -15,7 +18,7 @@ use crate::{datatypes::Schema, error::Result};
fn new_serializers<'a>(
batch: &'a RecordBatch,
options: &'a SerializeOptions,
) -> Result<Vec<Box<dyn Iterator<Item = Vec<u8>> + 'a>>> {
) -> Result<Vec<Box<dyn StreamingIterator<Item = [u8]> + 'a>>> {
batch
.columns()
.iter()
Expand All @@ -34,7 +37,7 @@ pub fn serialize(batch: &RecordBatch, options: &SerializeOptions) -> Result<Vec<
serializers
.iter_mut()
// `unwrap` is infallible because `array.len()` equals `num_rows` on a `RecordBatch`
.for_each(|iter| record.push_field(&iter.next().unwrap()));
.for_each(|iter| record.push_field(iter.next().unwrap()));
});
Ok(records)
}
Expand All @@ -54,7 +57,7 @@ pub fn write_batch<W: Write>(
serializers
.iter_mut()
// `unwrap` is infallible because `array.len()` equals `num_rows` on a `RecordBatch`
.for_each(|iter| record.push_field(&iter.next().unwrap()));
.for_each(|iter| record.push_field(iter.next().unwrap()));
writer.write_byte_record(&record)?;
record.clear();
Result::Ok(())
Expand Down
Loading

0 comments on commit 4f8d793

Please sign in to comment.