diff --git a/examples/json_read.rs b/examples/json_read.rs
index 308646e001b..b6a25a74ef6 100644
--- a/examples/json_read.rs
+++ b/examples/json_read.rs
@@ -3,37 +3,23 @@ use std::io::BufReader;
 use std::sync::Arc;
 
 use arrow2::array::Array;
-use arrow2::chunk::Chunk;
-use arrow2::error::Result;
+use arrow2::error::{ArrowError, Result};
 use arrow2::io::json::read;
 
-fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Chunk<Arc<dyn Array>>> {
+fn read_path(path: &str) -> Result<Arc<dyn Array>> {
     // Example of reading a JSON file.
-    let mut reader = BufReader::new(File::open(path)?);
+    let reader = BufReader::new(File::open(path)?);
+    let data = serde_json::from_reader(reader)?;
 
-    let fields = read::infer_and_reset(&mut reader, None)?;
-
-    let fields = if let Some(projection) = projection {
-        fields
-            .into_iter()
-            .filter(|field| projection.contains(&field.name.as_ref()))
-            .collect()
+    let values = if let serde_json::Value::Array(values) = data {
+        Ok(values)
     } else {
-        fields
-    };
-
-    // at most 1024 rows. This container can be re-used across batches.
-    let mut rows = vec![String::default(); 1024];
+        Err(ArrowError::InvalidArgumentError("".to_string()))
+    }?;
 
-    // Reads up to 1024 rows.
-    // this is IO-intensive and performs minimal CPU work. In particular,
-    // no deserialization is performed.
-    let read = read::read_rows(&mut reader, &mut rows)?;
-    let rows = &rows[..read];
+    let data_type = read::infer_rows(&values)?;
 
-    // deserialize `rows` into `Chunk`. This is CPU-intensive, has no IO,
-    // and can be performed on a different thread pool via a channel.
-    read::deserialize(rows, &fields)
+    Ok(read::deserialize_json(&values, data_type))
 }
 
 fn main() -> Result<()> {
@@ -42,7 +28,7 @@ fn main() -> Result<()> {
 
     let file_path = &args[1];
 
-    let batch = read_path(file_path, None)?;
+    let batch = read_path(file_path)?;
     println!("{:#?}", batch);
     Ok(())
 }
diff --git a/examples/ndjson_read.rs b/examples/ndjson_read.rs
new file mode 100644
index 00000000000..1df6e2b6e59
--- /dev/null
+++ b/examples/ndjson_read.rs
@@ -0,0 +1,48 @@
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::Arc;
+
+use arrow2::array::Array;
+use arrow2::chunk::Chunk;
+use arrow2::error::Result;
+use arrow2::io::json::read;
+
+fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Chunk<Arc<dyn Array>>> {
+    // Example of reading a NDJSON file.
+    let mut reader = BufReader::new(File::open(path)?);
+
+    let fields = read::infer_and_reset(&mut reader, None)?;
+
+    let fields = if let Some(projection) = projection {
+        fields
+            .into_iter()
+            .filter(|field| projection.contains(&field.name.as_ref()))
+            .collect()
+    } else {
+        fields
+    };
+
+    // at most 1024 rows. This container can be re-used across batches.
+    let mut rows = vec![String::default(); 1024];
+
+    // Reads up to 1024 rows.
+    // this is IO-intensive and performs minimal CPU work. In particular,
+    // no deserialization is performed.
+    let read = read::read_rows(&mut reader, &mut rows)?;
+    let rows = &rows[..read];
+
+    // deserialize `rows` into `Chunk`. This is CPU-intensive, has no IO,
+    // and can be performed on a different thread pool via a channel.
+    read::deserialize(rows, &fields)
+}
+
+fn main() -> Result<()> {
+    use std::env;
+    let args: Vec<String> = env::args().collect();
+
+    let file_path = &args[1];
+
+    let batch = read_path(file_path, None)?;
+    println!("{:#?}", batch);
+    Ok(())
+}
diff --git a/guide/src/io/README.md b/guide/src/io/README.md
index cac3d6017fe..f8e8bb8ca64 100644
--- a/guide/src/io/README.md
+++ b/guide/src/io/README.md
@@ -5,7 +5,7 @@ This crate offers optional features that enable interoperability with different
 * Arrow (`io_ipc`)
 * CSV (`io_csv`)
 * Parquet (`io_parquet`)
-* Json (`io_json`)
+* JSON and NDJSON (`io_json`)
 * Avro (`io_avro` and `io_avro_async`)
 
 In this section you can find a guide and examples for each one of them.
diff --git a/guide/src/io/json_read.md b/guide/src/io/json_read.md
index 66b70952707..cd3a19f4b22 100644
--- a/guide/src/io/json_read.md
+++ b/guide/src/io/json_read.md
@@ -1,10 +1,16 @@
 # JSON read
 
-When compiled with feature `io_json`, you can use this crate to read JSON files.
+When compiled with feature `io_json`, you can use this crate to read NDJSON files:
 
 ```rust
-{{#include ../../../examples/json_read.rs}}
+{{#include ../../../examples/ndjson_read.rs}}
 ```
 
 Note how deserialization can be performed on a separate thread pool to avoid
 blocking the runtime (see also [here](https://ryhl.io/blog/async-what-is-blocking/)).
+
+This crate also supports reading JSON, at the expense of being unable to read the file in chunks.
+
+```rust
+{{#include ../../../examples/json_read.rs}}
+```
diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs
index 297dfdc36fa..4a1b2255b29 100644
--- a/src/io/json/read/deserialize.rs
+++ b/src/io/json/read/deserialize.rs
@@ -271,3 +271,10 @@ pub fn deserialize<A: AsRef<str>>(
     let (_, columns, _) = deserialize_struct(&rows, data_type).into_data();
     Ok(Chunk::new(columns))
 }
+
+/// Deserializes a slice of [`Value`] to an Array of logical type [`DataType`].
+///
+/// This function allows consuming deserialized JSON to Arrow.
+pub fn deserialize_json(rows: &[Value], data_type: DataType) -> Arc<dyn Array> {
+    _deserialize(rows, data_type)
+}
diff --git a/src/io/json/read/infer_schema.rs b/src/io/json/read/infer_schema.rs
index 150cd5f26de..81f48335d01 100644
--- a/src/io/json/read/infer_schema.rs
+++ b/src/io/json/read/infer_schema.rs
@@ -104,18 +104,21 @@ fn infer_value(value: &Value) -> Result<DataType> {
     })
 }
 
-fn infer_array(array: &[Value]) -> Result<DataType> {
-    let types = array.iter().map(|a| {
+/// Infers a [`DataType`] from a list of JSON values
+pub fn infer_rows(rows: &[Value]) -> Result<DataType> {
+    let types = rows.iter().map(|a| {
         Ok(match a {
             Value::Null => None,
             Value::Number(n) => Some(infer_number(n)),
             Value::Bool(_) => Some(DataType::Boolean),
             Value::String(_) => Some(DataType::Utf8),
             Value::Array(array) => Some(infer_array(array)?),
-            Value::Object(_) => {
-                return Err(ArrowError::NotYetImplemented(
-                    "Nested structs not yet supported".to_string(),
-                ))
+            Value::Object(inner) => {
+                let fields = inner
+                    .iter()
+                    .map(|(key, value)| infer_value(value).map(|dt| Field::new(key, dt, true)))
+                    .collect::<Result<Vec<_>>>()?;
+                Some(DataType::Struct(fields))
             }
         })
     });
@@ -126,17 +129,26 @@ fn infer_array(array: &[Value]) -> Result<DataType> {
         .flatten()
         .collect::<Result<HashSet<_>>>()?;
 
-    // if a record contains only nulls, it is not
-    // added to values
     Ok(if !types.is_empty() {
         let types = types.into_iter().collect::<Vec<_>>();
-        let dt = coerce_data_type(&types);
-        DataType::List(Box::new(Field::new(ITEM_NAME, dt, true)))
+        coerce_data_type(&types)
     } else {
         DataType::Null
     })
 }
 
+fn infer_array(values: &[Value]) -> Result<DataType> {
+    let dt = infer_rows(values)?;
+
+    // if a record contains only nulls, it is not
+    // added to values
+    Ok(if dt == DataType::Null {
+        dt
+    } else {
+        DataType::List(Box::new(Field::new(ITEM_NAME, dt, true)))
+    })
+}
+
 fn infer_number(n: &serde_json::Number) -> DataType {
     if n.is_f64() {
         DataType::Float64
diff --git a/src/io/json/read/mod.rs b/src/io/json/read/mod.rs
index 0e534a78358..741f375480d 100644
--- a/src/io/json/read/mod.rs
+++ b/src/io/json/read/mod.rs
@@ -5,7 +5,7 @@ mod iterator;
 
 use crate::error::{ArrowError, Result};
 
-pub use deserialize::deserialize;
+pub use deserialize::{deserialize, deserialize_json};
 pub use infer_schema::*;
 
 /// Reads rows from `reader` into `rows`. Returns the number of read items.
diff --git a/tests/it/io/json/read.rs b/tests/it/io/json/read.rs
index cab14a58380..66caf83a302 100644
--- a/tests/it/io/json/read.rs
+++ b/tests/it/io/json/read.rs
@@ -2,6 +2,7 @@ use std::io::Cursor;
 
 use arrow2::array::*;
 use arrow2::datatypes::*;
+use arrow2::error::ArrowError;
 use arrow2::error::Result;
 use arrow2::io::json::read;
 
@@ -171,3 +172,40 @@ fn infer_nested_struct() -> Result<()> {
     assert_eq!(result, fields);
     Ok(())
 }
+
+#[test]
+fn read_json() -> Result<()> {
+    let data = r#"[
+        {
+            "a": 1
+        },
+        {
+            "a": 2
+        },
+        {
+            "a": 3
+        }
+    ]"#;
+
+    let data = serde_json::from_slice(data.as_bytes())?;
+
+    let values = if let serde_json::Value::Array(values) = data {
+        Ok(values)
+    } else {
+        Err(ArrowError::InvalidArgumentError("".to_string()))
+    }?;
+
+    let data_type = read::infer_rows(&values)?;
+
+    let result = read::deserialize_json(&values, data_type);
+
+    let expected = StructArray::from_data(
+        DataType::Struct(vec![Field::new("a", DataType::Int64, true)]),
+        vec![Arc::new(Int64Array::from_slice([1, 2, 3])) as _],
+        None,
+    );
+
+    assert_eq!(expected, result.as_ref());
+
+    Ok(())
+}