Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simplified API for writing to JSON
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Feb 25, 2022
1 parent 10e6cd5 commit 4b2dff6
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 30 deletions.
2 changes: 1 addition & 1 deletion benches/write_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arrow2::util::bench_util::*;

fn write_batch(columns: &Chunk<Arc<dyn Array>>) -> Result<()> {
let mut writer = vec![];
let format = write::JsonArray::default();
let format = write::Format::Json;

let batches = vec![Ok(columns.clone())].into_iter();

Expand Down
2 changes: 1 addition & 1 deletion examples/json_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arrow2::{

fn write_batches(path: &str, names: Vec<String>, batches: &[Chunk<Arc<dyn Array>>]) -> Result<()> {
let mut writer = File::create(path)?;
let format = write::JsonArray::default();
let format = write::Format::Json;

let batches = batches.iter().cloned().map(Ok);

Expand Down
43 changes: 31 additions & 12 deletions src/io/json/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
//! APIs to write to JSON
mod format;
mod serialize;

pub use fallible_streaming_iterator::*;
pub use format::*;
pub use serialize::serialize;

use crate::{
array::Array,
chunk::Chunk,
error::{ArrowError, Result},
};
use format::*;

/// The supported variations of JSON supported
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Format {
/// JSON
Json,
/// NDJSON (http://ndjson.org/)
NewlineDelimitedJson,
}

/// Writes blocks of JSON-encoded data into `writer`, ensuring that the written
/// JSON has the expected `format`
pub fn write<W, F, I>(writer: &mut W, format: F, mut blocks: I) -> Result<()>
fn _write<W, F, I>(writer: &mut W, format: F, mut blocks: I) -> Result<()>
where
W: std::io::Write,
F: JsonFormat,
Expand All @@ -30,28 +38,40 @@ where
Ok(())
}

/// Writes blocks of JSON-encoded data into `writer` according to format [`Format`].
/// # Implementation
/// This is IO-bounded
pub fn write<W, I>(writer: &mut W, format: Format, blocks: I) -> Result<()>
where
W: std::io::Write,
I: FallibleStreamingIterator<Item = [u8], Error = ArrowError>,
{
match format {
Format::Json => _write(writer, JsonArray::default(), blocks),
Format::NewlineDelimitedJson => _write(writer, LineDelimited::default(), blocks),
}
}

/// [`FallibleStreamingIterator`] that serializes a [`Chunk`] to bytes.
/// Advancing it is CPU-bounded
pub struct Serializer<F, A, I>
pub struct Serializer<A, I>
where
F: JsonFormat,
A: AsRef<dyn Array>,
I: Iterator<Item = Result<Chunk<A>>>,
{
batches: I,
names: Vec<String>,
buffer: Vec<u8>,
format: F,
format: Format,
}

impl<F, A, I> Serializer<F, A, I>
impl<A, I> Serializer<A, I>
where
F: JsonFormat,
A: AsRef<dyn Array>,
I: Iterator<Item = Result<Chunk<A>>>,
{
/// Creates a new [`Serializer`].
pub fn new(batches: I, names: Vec<String>, buffer: Vec<u8>, format: F) -> Self {
pub fn new(batches: I, names: Vec<String>, buffer: Vec<u8>, format: Format) -> Self {
Self {
batches,
names,
Expand All @@ -61,9 +81,8 @@ where
}
}

impl<F, A, I> FallibleStreamingIterator for Serializer<F, A, I>
impl<A, I> FallibleStreamingIterator for Serializer<A, I>
where
F: JsonFormat,
A: AsRef<dyn Array>,
I: Iterator<Item = Result<Chunk<A>>>,
{
Expand Down
20 changes: 18 additions & 2 deletions src/io/json/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use crate::temporal_conversions::{
use crate::util::lexical_to_bytes_mut;
use crate::{array::*, datatypes::DataType, types::NativeType};

use super::{JsonArray, JsonFormat};
use super::format::{JsonArray, JsonFormat, LineDelimited};
use super::Format;

fn boolean_serializer<'a>(
array: &'a BooleanArray,
Expand Down Expand Up @@ -249,7 +250,7 @@ fn serialize_item<F: JsonFormat>(

/// Serializes a (name, array) to a valid JSON to `buffer`
/// This is CPU-bounded
pub fn serialize<N, A, F>(names: &[N], columns: &Chunk<A>, format: F, buffer: &mut Vec<u8>)
fn _serialize<N, A, F>(names: &[N], columns: &Chunk<A>, format: F, buffer: &mut Vec<u8>)
where
N: AsRef<str>,
A: AsRef<dyn Array>,
Expand Down Expand Up @@ -278,3 +279,18 @@ where
is_first_row = false;
})
}

/// Serializes a (name, array) to a valid JSON to `buffer`
/// This is CPU-bounded
pub fn serialize<N, A>(names: &[N], columns: &Chunk<A>, format: Format, buffer: &mut Vec<u8>)
where
N: AsRef<str>,
A: AsRef<dyn Array>,
{
match format {
Format::Json => _serialize(names, columns, JsonArray::default(), buffer),
Format::NewlineDelimitedJson => {
_serialize(names, columns, LineDelimited::default(), buffer)
}
}
}
6 changes: 3 additions & 3 deletions tests/it/io/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ fn read_batch(data: String, fields: &[Field]) -> Result<Chunk<Arc<dyn Array>>> {
json_read::deserialize(rows, fields)
}

fn write_batch<F: json_write::JsonFormat, A: AsRef<dyn Array>>(
fn write_batch<A: AsRef<dyn Array>>(
batch: Chunk<A>,
names: Vec<String>,
format: F,
format: json_write::Format,
) -> Result<Vec<u8>> {
let batches = vec![Ok(batch)].into_iter();

Expand All @@ -46,7 +46,7 @@ fn round_trip(data: String) -> Result<()> {
let buf = write_batch(
columns.clone(),
fields.iter().map(|x| x.name.clone()).collect(),
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

let new_chunk = read_batch(String::from_utf8(buf).unwrap(), &fields)?;
Expand Down
22 changes: 11 additions & 11 deletions tests/it/io/json/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn write_simple_rows() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand All @@ -45,7 +45,7 @@ fn write_simple_rows_array() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::JsonArray::default(),
json_write::Format::Json,
)?;

assert_eq!(
Expand Down Expand Up @@ -88,7 +88,7 @@ fn write_nested_struct_with_validity() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand Down Expand Up @@ -133,7 +133,7 @@ fn write_nested_structs() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand Down Expand Up @@ -169,7 +169,7 @@ fn write_struct_with_list_field() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand Down Expand Up @@ -213,7 +213,7 @@ fn write_nested_list() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand Down Expand Up @@ -274,7 +274,7 @@ fn write_list_of_struct() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string(), "c2".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand All @@ -296,7 +296,7 @@ fn write_escaped_utf8() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand All @@ -315,7 +315,7 @@ fn write_quotation_marks_in_utf8() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand All @@ -334,7 +334,7 @@ fn write_date32() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand All @@ -356,7 +356,7 @@ fn write_timestamp() -> Result<()> {
let buf = write_batch(
batch,
vec!["c1".to_string()],
json_write::LineDelimited::default(),
json_write::Format::NewlineDelimitedJson,
)?;

assert_eq!(
Expand Down

0 comments on commit 4b2dff6

Please sign in to comment.