Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Refactored JSON writing (5-10x faster) #709

Merged
merged 4 commits into from
Dec 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ io_csv_async = ["io_csv_read_async"]
io_csv_read = ["csv", "lexical-core"]
io_csv_read_async = ["csv-async", "lexical-core", "futures"]
io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
io_json = ["serde", "serde_json", "indexmap"]
io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_compression = ["lz4", "zstd"]
Expand Down Expand Up @@ -300,3 +300,7 @@ harness = false
[[bench]]
name = "bitwise"
harness = false

[[bench]]
name = "write_json"
harness = false
58 changes: 58 additions & 0 deletions benches/write_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, Criterion};

use arrow2::array::*;
use arrow2::error::Result;
use arrow2::io::json::write;
use arrow2::record_batch::RecordBatch;
use arrow2::util::bench_util::*;

/// Serializes `batch` as a JSON array into an in-memory buffer.
fn write_batch(batch: &RecordBatch) -> Result<()> {
    let format = write::JsonArray::default();
    let mut sink = Vec::new();

    // Advancing the serializer encodes the next batch into its internal
    // buffer (i.e. the CPU-bounded part of the pipeline).
    let source = std::iter::once(Ok(batch.clone()));
    let blocks = write::Serializer::new(source, Vec::new(), format);

    // Writing the pre-encoded blocks is the IO-bounded part.
    write::write(&mut sink, format, blocks)
}

/// Wraps a single array into a one-column [`RecordBatch`] whose column is named "a".
fn make_batch(array: impl Array + 'static) -> RecordBatch {
    let column: Arc<dyn Array> = Arc::new(array);
    RecordBatch::try_from_iter([("a", column)]).unwrap()
}

fn add_benchmark(c: &mut Criterion) {
(10..=18).step_by(2).for_each(|log2_size| {
let size = 2usize.pow(log2_size);

let array = create_primitive_array::<i32>(size, 0.1);
let batch = make_batch(array);

c.bench_function(&format!("json write i32 2^{}", log2_size), |b| {
b.iter(|| write_batch(&batch))
});

let array = create_string_array::<i32>(size, 100, 0.1, 42);
let batch = make_batch(array);

c.bench_function(&format!("json write utf8 2^{}", log2_size), |b| {
b.iter(|| write_batch(&batch))
});

let array = create_primitive_array::<f64>(size, 0.1);
let batch = make_batch(array);

c.bench_function(&format!("json write f64 2^{}", log2_size), |b| {
b.iter(|| write_batch(&batch))
});
});
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);
34 changes: 34 additions & 0 deletions examples/json_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::fs::File;
use std::sync::Arc;

use arrow2::{
array::Int32Array,
datatypes::{Field, Schema},
error::Result,
io::json::write,
record_batch::RecordBatch,
};

/// Serializes `batches` and writes them as a single JSON array to the file at `path`.
fn write_batches(path: &str, batches: &[RecordBatch]) -> Result<()> {
    let format = write::JsonArray::default();
    let mut file = File::create(path)?;

    // CPU-bounded step: the serializer encodes each incoming batch into its
    // internal buffer as the iterator is advanced.
    let source = batches.iter().map(|batch| Ok(batch.clone()));
    let blocks = write::Serializer::new(source, Vec::new(), format);

    // IO-bounded step: stream the encoded blocks to disk.
    write::write(&mut file, format, blocks)
}

fn main() -> Result<()> {
    // A single nullable Int32 column with one null at index 1.
    let array = Int32Array::from(&[Some(0), None, Some(2), Some(3), Some(4), Some(5), Some(6)]);
    let field = Field::new("c1", array.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![field]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(array)])?;

    // Write the same batch twice to demonstrate multi-batch output.
    write_batches("example.json", &[batch.clone(), batch])
}
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@
- [Read Avro](./io/avro_read.md)
- [Write Avro](./io/avro_write.md)
- [Read JSON](./io/json_read.md)
- [Write JSON](./io/json_write.md)
8 changes: 8 additions & 0 deletions guide/src/io/json_write.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Write JSON

When compiled with feature `io_json`, you can use this crate to write JSON files.
The following example writes record batches as a JSON file:

```rust
{{#include ../../../examples/json_write.rs}}
```
6 changes: 3 additions & 3 deletions src/io/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub use streaming_iterator::StreamingIterator;
pub struct BufStreamingIterator<I, F, T>
where
I: Iterator<Item = T>,
F: Fn(T, &mut Vec<u8>),
F: FnMut(T, &mut Vec<u8>),
{
iterator: I,
f: F,
Expand All @@ -17,7 +17,7 @@ where
impl<I, F, T> BufStreamingIterator<I, F, T>
where
I: Iterator<Item = T>,
F: Fn(T, &mut Vec<u8>),
F: FnMut(T, &mut Vec<u8>),
{
#[inline]
pub fn new(iterator: I, f: F, buffer: Vec<u8>) -> Self {
Expand All @@ -33,7 +33,7 @@ where
impl<I, F, T> StreamingIterator for BufStreamingIterator<I, F, T>
where
I: Iterator<Item = T>,
F: Fn(T, &mut Vec<u8>),
F: FnMut(T, &mut Vec<u8>),
{
type Item = [u8];

Expand Down
4 changes: 1 addition & 3 deletions src/io/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
//! Convert data between the Arrow memory format and JSON line-delimited records.

pub mod read;
mod write;

pub use write::*;
pub mod write;

use crate::error::ArrowError;

Expand Down
77 changes: 77 additions & 0 deletions src/io/json/write/format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::{fmt::Debug, io::Write};

use crate::error::Result;

/// Trait defining how to format a sequence of JSON objects to a byte stream.
///
/// All hooks default to no-ops, so a format only overrides the delimiters it
/// actually needs.
pub trait JsonFormat: Debug + Default + Copy {
    /// write any bytes needed at the start of the file to the writer
    #[inline]
    fn start_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
        Ok(())
    }

    /// write any bytes needed for the start of each row
    #[inline]
    fn start_row<W: Write>(&self, _writer: &mut W, _is_first_row: bool) -> Result<()> {
        Ok(())
    }

    /// write any bytes needed for the end of each row
    #[inline]
    fn end_row<W: Write>(&self, _writer: &mut W) -> Result<()> {
        Ok(())
    }

    /// write any bytes needed at the end of the file to the writer
    #[inline]
    fn end_stream<W: Write>(&self, _writer: &mut W) -> Result<()> {
        Ok(())
    }
}

/// Produces JSON output with one record per line. For example
///
/// ```json
/// {"foo":1}
/// {"bar":1}
///
/// ```
#[derive(Debug, Default, Clone, Copy)]
pub struct LineDelimited {}

impl JsonFormat for LineDelimited {
    /// Terminates every record with a newline; no stream-level delimiters.
    #[inline]
    fn end_row<W: Write>(&self, writer: &mut W) -> Result<()> {
        Ok(writer.write_all(b"\n")?)
    }
}

/// Produces JSON output as a single JSON array. For example
///
/// ```json
/// [{"foo":1},{"bar":1}]
/// ```
#[derive(Debug, Default, Clone, Copy)]
pub struct JsonArray {}

impl JsonFormat for JsonArray {
    /// Opens the top-level array.
    #[inline]
    fn start_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
        Ok(writer.write_all(b"[")?)
    }

    /// Separates consecutive records with a comma; nothing precedes the first.
    #[inline]
    fn start_row<W: Write>(&self, writer: &mut W, is_first_row: bool) -> Result<()> {
        if is_first_row {
            Ok(())
        } else {
            Ok(writer.write_all(b",")?)
        }
    }

    /// Closes the top-level array.
    #[inline]
    fn end_stream<W: Write>(&self, writer: &mut W) -> Result<()> {
        Ok(writer.write_all(b"]")?)
    }
}
103 changes: 83 additions & 20 deletions src/io/json/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! APIs to write to JSON
mod format;
mod serialize;
mod writer;
pub use serialize::write_record_batches;
pub use writer::*;
pub use fallible_streaming_iterator::*;
pub use format::*;
pub use serialize::serialize;

use crate::{
error::{ArrowError, Result},
record_batch::RecordBatch,
};

/// Writes blocks of JSON-encoded data into `writer`, ensuring that the written
/// JSON has the expected `format`
pub fn write<W, F, I>(writer: &mut W, format: F, mut blocks: I) -> Result<()>
where
    W: std::io::Write,
    F: JsonFormat,
    I: FallibleStreamingIterator<Item = [u8], Error = ArrowError>,
{
    format.start_stream(writer)?;
    // Track whether we are emitting the first block so the format can decide
    // whether a separator is required before this row.
    let mut first = true;
    while let Some(block) = blocks.next()? {
        format.start_row(writer, first)?;
        writer.write_all(block)?;
        first = false;
    }
    format.end_stream(writer)?;
    Ok(())
}

/// [`FallibleStreamingIterator`] that serializes a [`RecordBatch`] to bytes.
/// Advancing it is CPU-bounded
pub struct Serializer<F: JsonFormat, I: Iterator<Item = Result<RecordBatch>>> {
    // source of batches; each item may itself be an error
    iter: I,
    // scratch buffer holding the bytes of the most recently serialized batch;
    // cleared on every advance
    buffer: Vec<u8>,
    // output format (e.g. line-delimited vs. JSON array)
    format: F,
}

impl<F: JsonFormat, I: Iterator<Item = Result<RecordBatch>>> Serializer<F, I> {
    /// Creates a new [`Serializer`].
    ///
    /// `buffer` is used as the internal scratch buffer; passing a
    /// pre-allocated vector allows reusing an existing allocation.
    pub fn new(iter: I, buffer: Vec<u8>, format: F) -> Self {
        Self {
            iter,
            buffer,
            format,
        }
    }
}

impl<F: JsonFormat, I: Iterator<Item = Result<RecordBatch>>> FallibleStreamingIterator
    for Serializer<F, I>
{
    type Item = [u8];

    type Error = ArrowError;

    /// Pulls the next batch from the inner iterator and serializes it into
    /// `self.buffer`. When the inner iterator is exhausted the buffer stays
    /// empty, which `get` interprets as end-of-stream.
    fn advance(&mut self) -> Result<()> {
        self.buffer.clear();
        self.iter
            .next()
            .map(|maybe_batch| {
                maybe_batch.map(|batch| {
                    // the batch's field names become the JSON object keys
                    let names = batch
                        .schema()
                        .fields()
                        .iter()
                        .map(|f| f.name().as_str())
                        .collect::<Vec<_>>();
                    serialize(&names, batch.columns(), self.format, &mut self.buffer)
                })
            })
            // propagate an error from either the source iterator or serialization
            .transpose()?;
        Ok(())
    }

    fn get(&self) -> Option<&Self::Item> {
        // An empty buffer signals that `advance` found no further batch.
        // NOTE(review): a batch that serialized to zero bytes would also end
        // the stream here — presumably `serialize` never emits zero bytes for
        // a real batch; confirm.
        if !self.buffer.is_empty() {
            Some(&self.buffer)
        } else {
            None
        }
    }
}
Loading