This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for deserializing JSON from iterator #989

Merged
merged 12 commits into from
May 17, 2022

Conversation

cjermain
Contributor

This PR adds support for deserializing JSON for each row in a Utf8Array. It mimics the interfaces of arrow2::io::json::read and arrow2::io::ndjson::read, allowing you to apply the transformation to the full array. Currently polars supports JsonPath traversal of an underlying Utf8Array in DataFrame.str.json_path_match, but it cannot return a properly typed Series -- it can only handle the first element as a string type. This change opens support for getting the correct type. It also raises the question of whether the reverse operations would be valuable. For now, the scope of this PR is to support Utf8Array => ArrayRef of the appropriate underlying type, based on the inferred or supplied schema.
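The overall shape -- a string array of JSON documents in, typed values out, with nulls treated as JSON `null` -- can be sketched with plain iterators. The names below (`deserialize_rows`, the `parse` closure) are illustrative stand-ins, not the crate's actual API, and the real deserializer is serde_json-based rather than a closure:

```rust
// Sketch of the per-row deserialization shape: each row is an Option<&str>
// holding a JSON document; a missing row is treated as the literal "null".
fn deserialize_rows<'a, I, T, E>(
    rows: I,
    parse: impl Fn(&str) -> Result<T, E>,
) -> Result<Vec<T>, E>
where
    I: Iterator<Item = Option<&'a str>>,
{
    rows.map(|row| parse(row.unwrap_or("null"))).collect()
}

fn main() {
    // Parse integers, mapping the literal "null" to None.
    let rows = [Some("1"), None, Some("3")].into_iter();
    let parsed = deserialize_rows(rows, |s| {
        if s == "null" {
            Ok::<_, std::num::ParseIntError>(None)
        } else {
            s.parse::<i64>().map(Some)
        }
    })
    .unwrap();
    assert_eq!(parsed, vec![Some(1), None, Some(3)]);
}
```

The key point is that the function is generic over the row iterator, so a Utf8Array, a `&[String]`, or any other source can feed the same code path.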

In 7f00de1, I also added validity support for StructArray in the JSON deserializer. This is important to keep consistency on the null count.

I'm still getting familiar with Rust, so happy to hear any suggestions for improvements.

@cjermain
Contributor Author

I've fixed the previous tests by adding the correct validity. I also moved the tests to be more consistent with the rest of the package.

@cjermain
Contributor Author

@jorgecarleitao, @ritchie46, what do you think?

@ritchie46
Collaborator

> @jorgecarleitao, @ritchie46, what do you think?

I am off for the weekend but can review after. I like the feature, so thanks for the PR @cjermain.

@codecov

codecov bot commented May 13, 2022

Codecov Report

Merging #989 (e9a1e30) into main (b9aa8e8) will increase coverage by 0.02%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##             main     #989      +/-   ##
==========================================
+ Coverage   71.37%   71.39%   +0.02%     
==========================================
  Files         357      357              
  Lines       19781    19795      +14     
==========================================
+ Hits        14118    14133      +15     
+ Misses       5663     5662       -1     
Impacted Files Coverage Δ
src/io/json/read/deserialize.rs 68.18% <100.00%> (+0.63%) ⬆️
src/io/ndjson/read/deserialize.rs 100.00% <100.00%> (ø)
src/io/ndjson/read/file.rs 83.33% <100.00%> (+3.33%) ⬆️
src/bitmap/utils/slice_iterator.rs 87.93% <0.00%> (+1.72%) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

Owner

@jorgecarleitao jorgecarleitao left a comment


Hey, really sorry for the delay, have been sick for the past 3 days and I am catching up.

This looks very good. Great work!

3 suggestions:

  1. The rest of the crate declares external formats under io/ (json, csv, etc.), and keeps array for the arrow array declarations. Would it be possible to move this implementation to under io/json/read like the rest?
  2. Could we move it outside the namespace of Utf8Array? JSON is quite a specific format, and maybe we could just keep it under io::json::read?
  3. Reading through both implementations, it seems to me that the only thing we need is an I: Iterator<Item=Option<&str>>. Thus, could it make sense to refactor our implementation of json to have two new public functions (infer and deserialize over I), and have our currently public API depend on them? Users would then use

let array: Utf8Array<i32> = ...;
let dt = infer(array.iter())?;
let array = deserialize(array.iter(), dt)?;

this way we do not need to commit to a specific implementation of the iterator (Utf8Array.iter() in this case).

@cjermain
Contributor Author

Thanks @jorgecarleitao! Hope you are feeling better. I agree that moving it into io::json::read makes sense. There is a lot of overlap with io::ndjson::read (much of the code is based on that). I looked at adapting io::ndjson::read::infer to support iterators broadly, but was not sure how to have the FallibleStreamingIterator coexist with Iterator -- I'm still getting familiar with Rust. In C++ I would implement this with overloaded functions -- I'm not sure of the best solution in Rust. Do you have any suggestions for function signatures?

@jorgecarleitao
Owner

So, this is what I was able to come up with on the deserialization path (we need something similar for the inference path):

use std::sync::Arc;

use serde_json::Value;

use crate::array::Array;
use crate::datatypes::DataType;
use crate::error::ArrowError;

use super::super::super::json::read::_deserialize;

/// Deserializes rows into an [`Array`] of [`DataType`].
/// # Implementation
/// This function is CPU-bounded.
/// This function is guaranteed to return an array of length equal to `rows.len()`.
/// # Errors
/// This function errors iff any of the rows is not valid JSON (i.e. the format is not valid NDJSON).
pub fn deserialize(rows: &[String], data_type: DataType) -> Result<Arc<dyn Array>, ArrowError> {
    deserialize_iter(rows.iter(), data_type)
}

/// Deserializes rows into an [`Array`] of [`DataType`].
/// # Implementation
/// This function is CPU-bounded.
/// This function is guaranteed to return an array of length equal to the length of `rows`.
/// # Errors
/// This function errors iff any of the rows is not valid JSON (i.e. the format is not valid NDJSON).
pub fn deserialize_iter<A: AsRef<str>>(
    rows: impl Iterator<Item = A>,
    data_type: DataType,
) -> Result<Arc<dyn Array>, ArrowError> {
    // deserialize strings to `Value`s
    let rows = rows
        .map(|row| serde_json::from_str(row.as_ref()).map_err(ArrowError::from))
        .collect::<Result<Vec<Value>, ArrowError>>()?;

    // deserialize &[Value] to Array
    Ok(_deserialize(&rows, data_type))
}


// ----------------------------
// test that it compiles for concrete Utf8Array ^_^; no need to expose it in the public API imo
// ----------------------------
fn des_array(
    array: &crate::array::Utf8Array<i32>,
    data_type: DataType,
) -> Result<Arc<dyn Array>, ArrowError> {
    deserialize_iter(array.iter().map(|x| x.unwrap_or("null")), data_type)
}

in other words, instead of requiring &[String], which assumes a very concrete memory layout (a pointer of pointers to strings in C++), expose a more generic version based on a generic (template in C++) over an iterator, so that users can "templatise" it according to their needs.


On a separate note: a StreamingIterator is an iterator-like idiom where the item is only exposed to the consumers by reference. A common use-case we use it is in decompressing data. There we usually have Fn(compressed: &[u8]) -> Vec<u8>. When we need to decompress multiple items, this is an issue because we need to allocate one vec per item.

We can avoid multiple allocations of Vec by writing "something" where an item compressed: &[u8] is decompressed to an internal buffer Vec<u8> and we only expose a reference to the internal buffer. I.e. we need an iterator of Option<&'a[u8]>, where 'a is "self". The signature of the Iterator trait does not support this atm in Rust (something solved by generic associated types / GAT). Thus, for now Rust de-facto uses a different trait for this (the StreamingIterator). FallibleStreamingIterator is the fallible version of StreamingIterator, which is a different trait because ... lack of GAT also does not allow to write a trait of StreamingIterator that supports Result<'a ...> xD
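That idea can be sketched with just the standard library. The trait and names below are illustrative stand-ins for the real `streaming-iterator` / `fallible-streaming-iterator` crates, and the "decompression" is faked by doubling each input byte -- the point is only that `get` borrows from an internal, reused buffer, which a plain `Iterator` cannot express without GATs:

```rust
// Illustrative streaming-iterator trait: `get` borrows from `self`,
// so items are exposed by reference into an internal buffer.
trait StreamingIter {
    fn advance(&mut self);
    fn get(&self) -> Option<&[u8]>;
}

// A "decompressor" that reuses one Vec<u8> across all items instead of
// allocating a fresh Vec per item. Decompression is faked: each input
// byte expands to two copies of itself.
struct Decompressor<I: Iterator<Item = u8>> {
    input: I,
    buffer: Vec<u8>,
    done: bool,
}

impl<I: Iterator<Item = u8>> StreamingIter for Decompressor<I> {
    fn advance(&mut self) {
        self.buffer.clear(); // reuse the allocation
        match self.input.next() {
            Some(b) => self.buffer.extend([b, b]),
            None => self.done = true,
        }
    }
    fn get(&self) -> Option<&[u8]> {
        if self.done {
            None
        } else {
            Some(self.buffer.as_slice())
        }
    }
}

fn main() {
    // Protocol: call `advance` first, then borrow the item via `get`.
    let mut it = Decompressor { input: [1u8, 2].into_iter(), buffer: Vec::new(), done: false };
    it.advance();
    assert_eq!(it.get(), Some(&[1u8, 1][..]));
    it.advance();
    assert_eq!(it.get(), Some(&[2u8, 2][..]));
    it.advance();
    assert_eq!(it.get(), None);
}
```

The advance-then-get split is exactly why the trait differs from `Iterator`: `next(&mut self) -> Option<Self::Item>` cannot return a reference whose lifetime is tied to `self`.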

There are some aux methods to convert Iterator -> StreamingIterator; but I am not following - where do we use them here?

@cjermain
Contributor Author

Thanks, this gives me a clear idea of what to change. The FallibleStreamingIterator is used in the FileReader in io::ndjson::read::infer, which is used in a similar way to an iterator of Option<&str>.

/// A [`FallibleStreamingIterator`] of NDJSON rows.
///
/// This iterator is used to read chunks of an NDJSON in batches.
/// This iterator is guaranteed to yield at least one row.
/// # Implementation
/// Advancing this iterator is IO-bounded, but does require parsing each byte to find end of lines.
/// # Error
/// Advancing this iterator errors iff the reader errors.
pub struct FileReader<R: BufRead> {
    reader: R,
    rows: Vec<String>,
    number_of_rows: usize,
    remaining: usize,
}

let mut reader = FileReader::new(reader, rows, number_of_rows);
let mut data_types = HashSet::new();
while let Some(rows) = reader.next()? {
    let value: Value = serde_json::from_str(&rows[0])?; // 0 because it is row by row
    // ...
}

@cjermain
Contributor Author

I've made the updates.

Owner

@jorgecarleitao jorgecarleitao left a comment


Thanks for this 🙇, it is super.

@jorgecarleitao jorgecarleitao added the feature A new feature label May 16, 2022
@jorgecarleitao jorgecarleitao merged commit aafba7b into jorgecarleitao:main May 17, 2022
@jorgecarleitao jorgecarleitao changed the title Support for deserializing JSON from Utf8Array Added support for deserializing JSON from iterator May 17, 2022
@cjermain cjermain deleted the utf8array_json branch May 20, 2022 11:50