
Added support to read JSON
jorgecarleitao committed Jan 11, 2022
1 parent 79bb05c commit e2720e4
Showing 8 changed files with 136 additions and 39 deletions.
36 changes: 11 additions & 25 deletions examples/json_read.rs
@@ -3,37 +3,23 @@ use std::io::BufReader;
 use std::sync::Arc;
 
 use arrow2::array::Array;
-use arrow2::chunk::Chunk;
-use arrow2::error::Result;
+use arrow2::error::{ArrowError, Result};
 use arrow2::io::json::read;
 
-fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Chunk<Arc<dyn Array>>> {
+fn read_path(path: &str) -> Result<Arc<dyn Array>> {
     // Example of reading a JSON file.
-    let mut reader = BufReader::new(File::open(path)?);
+    let reader = BufReader::new(File::open(path)?);
+    let data = serde_json::from_reader(reader)?;
 
-    let fields = read::infer_and_reset(&mut reader, None)?;
-
-    let fields = if let Some(projection) = projection {
-        fields
-            .into_iter()
-            .filter(|field| projection.contains(&field.name.as_ref()))
-            .collect()
+    let values = if let serde_json::Value::Array(values) = data {
+        Ok(values)
     } else {
-        fields
-    };
-
-    // at most 1024 rows. This container can be re-used across batches.
-    let mut rows = vec![String::default(); 1024];
+        Err(ArrowError::InvalidArgumentError("".to_string()))
+    }?;
 
-    // Reads up to 1024 rows.
-    // this is IO-intensive and performs minimal CPU work. In particular,
-    // no deserialization is performed.
-    let read = read::read_rows(&mut reader, &mut rows)?;
-    let rows = &rows[..read];
+    let data_type = read::infer_rows(&values)?;
 
-    // deserialize `rows` into `Chunk`. This is CPU-intensive, has no IO,
-    // and can be performed on a different thread pool via a channel.
-    read::deserialize(rows, &fields)
+    Ok(read::deserialize_json(&values, data_type))
 }
 
 fn main() -> Result<()> {
@@ -42,7 +28,7 @@ fn main() -> Result<()> {
 
     let file_path = &args[1];
 
-    let batch = read_path(file_path, None)?;
+    let batch = read_path(file_path)?;
     println!("{:#?}", batch);
     Ok(())
 }
48 changes: 48 additions & 0 deletions examples/ndjson_read.rs
@@ -0,0 +1,48 @@
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::json::read;

fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Chunk<Arc<dyn Array>>> {
    // Example of reading a NDJSON file.
    let mut reader = BufReader::new(File::open(path)?);

    let fields = read::infer_and_reset(&mut reader, None)?;

    let fields = if let Some(projection) = projection {
        fields
            .into_iter()
            .filter(|field| projection.contains(&field.name.as_ref()))
            .collect()
    } else {
        fields
    };

    // at most 1024 rows. This container can be re-used across batches.
    let mut rows = vec![String::default(); 1024];

    // Reads up to 1024 rows.
    // this is IO-intensive and performs minimal CPU work. In particular,
    // no deserialization is performed.
    let read = read::read_rows(&mut reader, &mut rows)?;
    let rows = &rows[..read];

    // deserialize `rows` into `Chunk`. This is CPU-intensive, has no IO,
    // and can be performed on a different thread pool via a channel.
    read::deserialize(rows, &fields)
}

fn main() -> Result<()> {
    use std::env;
    let args: Vec<String> = env::args().collect();

    let file_path = &args[1];

    let batch = read_path(file_path, None)?;
    println!("{:#?}", batch);
    Ok(())
}
2 changes: 1 addition & 1 deletion guide/src/io/README.md
@@ -5,7 +5,7 @@ This crate offers optional features that enable interoperability with different
 * Arrow (`io_ipc`)
 * CSV (`io_csv`)
 * Parquet (`io_parquet`)
-* Json (`io_json`)
+* JSON and NDJSON (`io_json`)
 * Avro (`io_avro` and `io_avro_async`)
 
 In this section you can find a guide and examples for each one of them.
10 changes: 8 additions & 2 deletions guide/src/io/json_read.md
@@ -1,10 +1,16 @@
 # JSON read
 
-When compiled with feature `io_json`, you can use this crate to read JSON files.
+When compiled with feature `io_json`, you can use this crate to read NDJSON files:
 
 ```rust
-{{#include ../../../examples/json_read.rs}}
+{{#include ../../../examples/ndjson_read.rs}}
 ```
 
 Note how deserialization can be performed on a separate thread pool to avoid
 blocking the runtime (see also [here](https://ryhl.io/blog/async-what-is-blocking/)).
+
+This crate also supports reading JSON, at the expense of being unable to read the file in chunks.
+
+```rust
+{{#include ../../../examples/json_read.rs}}
+```
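
The guide note above suggests running deserialization on a separate thread pool. As a rough sketch (not part of this commit), the snippet below pairs the `infer_and_reset` / `read_rows` / `deserialize` API from `examples/ndjson_read.rs` with a plain `std::thread` worker and a channel; the file name `rows.ndjson` and the 1024-row batch size are placeholders.

```rust
use std::fs::File;
use std::io::BufReader;
use std::sync::mpsc::channel;
use std::thread;

use arrow2::error::Result;
use arrow2::io::json::read;

fn main() -> Result<()> {
    // IO on this thread: open the file and infer the schema.
    let mut reader = BufReader::new(File::open("rows.ndjson")?);
    let fields = read::infer_and_reset(&mut reader, None)?;

    let (tx, rx) = channel::<Vec<String>>();

    // CPU-bound worker: owns `fields` and deserializes each batch into a `Chunk`.
    let worker = thread::spawn(move || -> Result<()> {
        for rows in rx {
            let chunk = read::deserialize(&rows, &fields)?;
            println!("deserialized a chunk with {} rows", chunk.len());
        }
        Ok(())
    });

    // IO-bound loop: read up to 1024 rows at a time and hand them to the worker
    // (assuming `read_rows` returns 0 once the reader is exhausted).
    let mut rows = vec![String::default(); 1024];
    loop {
        let read = read::read_rows(&mut reader, &mut rows)?;
        if read == 0 {
            break;
        }
        tx.send(rows[..read].to_vec()).expect("worker is alive");
    }
    drop(tx); // closing the channel lets the worker finish

    worker.join().expect("worker did not panic")?;
    Ok(())
}
```

Any thread pool or async runtime would work equally well; the point is only that `read_rows` (IO) and `deserialize` (CPU) can run on different threads.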
7 changes: 7 additions & 0 deletions src/io/json/read/deserialize.rs
@@ -271,3 +271,10 @@ pub fn deserialize<A: AsRef<str>>(
     let (_, columns, _) = deserialize_struct(&rows, data_type).into_data();
     Ok(Chunk::new(columns))
 }
+
+/// Deserializes a slice of [`Value`] to an Array of logical type [`DataType`].
+///
+/// This function allows consuming deserialized JSON to Arrow.
+pub fn deserialize_json(rows: &[Value], data_type: DataType) -> Arc<dyn Array> {
+    _deserialize(rows, data_type)
+}
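
As a small usage sketch (not part of the diff), the new `deserialize_json` pairs with `infer_rows` exactly as in the updated `examples/json_read.rs` above; the helper name `json_to_array` is made up for illustration.

```rust
use std::sync::Arc;

use arrow2::array::Array;
use arrow2::error::Result;
use arrow2::io::json::read;

// Hypothetical helper: JSON text -> inferred DataType -> Arrow array.
fn json_to_array(json: &str) -> Result<Arc<dyn Array>> {
    let values: Vec<serde_json::Value> = serde_json::from_str(json)?;
    let data_type = read::infer_rows(&values)?;
    Ok(read::deserialize_json(&values, data_type))
}

fn main() -> Result<()> {
    let array = json_to_array(r#"[{"a": 1}, {"a": 2}]"#)?;
    println!("{:#?}", array);
    Ok(())
}
```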
32 changes: 22 additions & 10 deletions src/io/json/read/infer_schema.rs
@@ -104,18 +104,21 @@ fn infer_value(value: &Value) -> Result<DataType> {
     })
 }
 
-fn infer_array(array: &[Value]) -> Result<DataType> {
-    let types = array.iter().map(|a| {
+/// Infers a [`DataType`] from a list of JSON values
+pub fn infer_rows(rows: &[Value]) -> Result<DataType> {
+    let types = rows.iter().map(|a| {
         Ok(match a {
             Value::Null => None,
             Value::Number(n) => Some(infer_number(n)),
             Value::Bool(_) => Some(DataType::Boolean),
             Value::String(_) => Some(DataType::Utf8),
             Value::Array(array) => Some(infer_array(array)?),
-            Value::Object(_) => {
-                return Err(ArrowError::NotYetImplemented(
-                    "Nested structs not yet supported".to_string(),
-                ))
+            Value::Object(inner) => {
+                let fields = inner
+                    .iter()
+                    .map(|(key, value)| infer_value(value).map(|dt| Field::new(key, dt, true)))
+                    .collect::<Result<Vec<_>>>()?;
+                Some(DataType::Struct(fields))
             }
         })
     });
@@ -126,17 +129,26 @@ fn infer_array(array: &[Value]) -> Result<DataType> {
         .flatten()
         .collect::<Result<HashSet<_>>>()?;
 
-    // if a record contains only nulls, it is not
-    // added to values
     Ok(if !types.is_empty() {
         let types = types.into_iter().collect::<Vec<_>>();
-        let dt = coerce_data_type(&types);
-        DataType::List(Box::new(Field::new(ITEM_NAME, dt, true)))
+        coerce_data_type(&types)
     } else {
        DataType::Null
     })
 }
 
+fn infer_array(values: &[Value]) -> Result<DataType> {
+    let dt = infer_rows(values)?;
+
+    // if a record contains only nulls, it is not
+    // added to values
+    Ok(if dt == DataType::Null {
+        dt
+    } else {
+        DataType::List(Box::new(Field::new(ITEM_NAME, dt, true)))
+    })
+}
+
 fn infer_number(n: &serde_json::Number) -> DataType {
     if n.is_f64() {
         DataType::Float64
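For a sense of what the new `infer_rows` does on scalars (hypothetical snippet, not from the commit): nulls are skipped during inference and the remaining types are coerced, so a mixed integer/float list is expected to resolve to a single floating-point type.

```rust
use arrow2::error::Result;
use arrow2::io::json::read;

fn main() -> Result<()> {
    // Mixed integers and floats; the null is ignored during inference.
    let values: Vec<serde_json::Value> = serde_json::from_str("[1, 2.5, null]")?;
    let inferred = read::infer_rows(&values)?;
    // Under the usual numeric coercion this is expected to print `Float64`.
    println!("{:?}", inferred);
    Ok(())
}
```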
2 changes: 1 addition & 1 deletion src/io/json/read/mod.rs
@@ -5,7 +5,7 @@ mod iterator;
 
 use crate::error::{ArrowError, Result};
 
-pub use deserialize::deserialize;
+pub use deserialize::{deserialize, deserialize_json};
 pub use infer_schema::*;
 
 /// Reads rows from `reader` into `rows`. Returns the number of read items.
38 changes: 38 additions & 0 deletions tests/it/io/json/read.rs
@@ -2,6 +2,7 @@ use std::io::Cursor;
 
 use arrow2::array::*;
 use arrow2::datatypes::*;
+use arrow2::error::ArrowError;
 use arrow2::error::Result;
 use arrow2::io::json::read;
 
@@ -171,3 +172,40 @@ fn infer_nested_struct() -> Result<()> {
     assert_eq!(result, fields);
     Ok(())
 }
+
+#[test]
+fn read_json() -> Result<()> {
+    let data = r#"[
+        {
+            "a": 1
+        },
+        {
+            "a": 2
+        },
+        {
+            "a": 3
+        }
+    ]"#;
+
+    let data = serde_json::from_slice(data.as_bytes())?;
+
+    let values = if let serde_json::Value::Array(values) = data {
+        Ok(values)
+    } else {
+        Err(ArrowError::InvalidArgumentError("".to_string()))
+    }?;
+
+    let data_type = read::infer_rows(&values)?;
+
+    let result = read::deserialize_json(&values, data_type);
+
+    let expected = StructArray::from_data(
+        DataType::Struct(vec![Field::new("a", DataType::Int64, true)]),
+        vec![Arc::new(Int64Array::from_slice([1, 2, 3])) as _],
+        None,
+    );
+
+    assert_eq!(expected, result.as_ref());
+
+    Ok(())
+}
