Support Rust structures --> RecordBatch by adding Serde support to RawDecoder (#3949) #3979

Merged
merged 8 commits into from
Apr 5, 2023
Changes from 1 commit
68 changes: 55 additions & 13 deletions arrow-json/src/raw/mod.rs
@@ -29,6 +29,7 @@ use crate::raw::struct_array::StructArrayDecoder;
use crate::raw::tape::{Tape, TapeDecoder, TapeElement};
use crate::raw::timestamp_array::TimestampArrayDecoder;
use arrow_array::timezone::Tz;
use arrow_array::types::Float32Type;
use arrow_array::types::*;
use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader};
use arrow_data::ArrayData;
@@ -237,12 +238,14 @@ impl RawDecoder {

/// Serialize `rows` to this [`RawDecoder`]
Contributor

I think a more compelling example would be to implement some struct

struct MyRow {
    field: i32,
    name: String,
}

And demonstrate how to turn that into a RecordBatch
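
The conversion requested here is, at its core, a transposition from a slice of rows into one column per field. A minimal sketch of that shape using only the standard library follows. `MyColumns` and `rows_to_columns` are hypothetical names introduced for illustration; `MyColumns` stands in for a `RecordBatch` and is not an Arrow type, and this is not the code added by the PR.

```rust
/// Hypothetical row type, mirroring the struct suggested in the comment above.
#[derive(Debug, Clone, PartialEq)]
pub struct MyRow {
    pub field: i32,
    pub name: String,
}

/// Hypothetical column-oriented container: one `Vec` per struct field.
/// This is a stand-in for a `RecordBatch`, not an Arrow type.
#[derive(Debug, Default, PartialEq)]
pub struct MyColumns {
    pub field: Vec<i32>,
    pub name: Vec<String>,
}

/// Transpose a slice of rows into columns, visiting each row exactly once.
pub fn rows_to_columns(rows: &[MyRow]) -> MyColumns {
    let mut columns = MyColumns {
        field: Vec::with_capacity(rows.len()),
        name: Vec::with_capacity(rows.len()),
    };
    for row in rows {
        columns.field.push(row.field);
        columns.name.push(row.name.clone());
    }
    columns
}
```

Hand-written code of this kind has to be updated whenever the struct changes, which is the maintenance cost a serde-driven path avoids.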

Contributor Author

Added a fairly comprehensive example

Contributor

This example is very cool. However, I think it is very hard to find, given that it lives on `RawDecoder`, which is part of `arrow_json`.

I suggest we add a sentence / link to this example from the front page (and maybe even bring a simpler example to the front page): https://docs.rs/arrow/latest/arrow/index.html#io

///
/// Whilst this provides a simple way to convert [`serde`]-compatible datastructures into arrow
/// [`RecordBatch`], performance-critical applications, especially where the schema is known at
/// compile-time, may benefit from instead implementing custom conversion logic as described
/// in [arrow_array::builder]
/// This provides a simple way to convert [serde]-compatible datastructures into arrow
/// [`RecordBatch`].
///
/// This can be used with [`serde_json::Value`]
/// Custom conversion logic as described in [arrow_array::builder] will likely outperform this,
/// especially where the schema is known at compile-time, however, this provides a mechanism
/// to get something up and running quickly
///
/// It can be used with [`serde_json::Value`]
///
/// ```
/// # use std::sync::Arc;
@@ -264,17 +267,56 @@ impl RawDecoder {
/// assert_eq!(values, &[2.3, 5.7])
/// ```
///
/// It can also be used with arbitrary [`Serialize`] types
/// Or with arbitrary [`Serialize`] types
///
/// ```
/// use std::collections::BTreeMap;
/// use std::sync::Arc;
/// use arrow_array::StructArray;
/// use arrow_cast::display::{ArrayFormatter, FormatOptions};
/// use arrow_json::RawReaderBuilder;
/// use arrow_schema::{DataType, Field, Fields, Schema};
/// use serde::Serialize;
/// # use std::sync::Arc;
/// # use arrow_json::RawReaderBuilder;
/// # use arrow_schema::{DataType, Field, Schema};
/// # use serde::Serialize;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::types::{Float32Type, Int32Type};
/// #
/// #[derive(Serialize)]
/// struct MyStruct {
/// int32: i32,
/// float: f32,
/// }
///
/// let schema = Schema::new(vec![
/// Field::new("int32", DataType::Int32, false),
/// Field::new("float", DataType::Float32, false),
/// ]);
///
/// let rows = vec![
/// MyStruct{ int32: 0, float: 3. },
/// MyStruct{ int32: 4, float: 67.53 },
/// ];
///
/// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
/// decoder.serialize(&rows).unwrap();
///
/// let batch = decoder.flush().unwrap().unwrap();
///
/// // Expect batch containing two columns
/// let int32 = batch.column(0).as_primitive::<Int32Type>();
/// assert_eq!(int32.values(), &[0, 4]);
///
/// let float = batch.column(1).as_primitive::<Float32Type>();
/// assert_eq!(float.values(), &[3., 67.53]);
/// ```
///
/// Or even complex nested types
///
/// ```
/// # use std::collections::BTreeMap;
/// # use std::sync::Arc;
/// # use arrow_array::StructArray;
/// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
/// # use arrow_json::RawReaderBuilder;
/// # use arrow_schema::{DataType, Field, Fields, Schema};
/// # use serde::Serialize;
/// #
/// #[derive(Serialize)]
/// struct MyStruct {
/// int32: i32,
47 changes: 26 additions & 21 deletions arrow-json/src/raw/serializer.rs
@@ -70,9 +70,14 @@ impl<'a> TapeSerializer<'a> {
}
}

/// The tape stores all values as strings, and so must serialize numeric types
///
/// Formatting to a string only to parse it back again is rather wasteful,
/// it may be possible to tweak the tape representation to avoid this
///
/// Need to use macro as const generic expressions are unstable
/// <https://github.com/rust-lang/rust/issues/76560>
macro_rules! serialize_lexical {
macro_rules! serialize_numeric {
($s:ident, $t:ty, $v:ident) => {{
let mut buffer = [0_u8; <$t>::FORMATTED_SIZE];
let s = lexical_core::write($v, &mut buffer);
@@ -102,43 +107,43 @@ impl<'a, 'b> Serializer for &'a mut TapeSerializer<'b> {
}

fn serialize_i8(self, v: i8) -> Result<(), SerializerError> {
serialize_lexical!(self, i8, v)
serialize_numeric!(self, i8, v)
}

fn serialize_i16(self, v: i16) -> Result<(), SerializerError> {
serialize_lexical!(self, i16, v)
serialize_numeric!(self, i16, v)
}

fn serialize_i32(self, v: i32) -> Result<(), SerializerError> {
serialize_lexical!(self, i32, v)
serialize_numeric!(self, i32, v)
}

fn serialize_i64(self, v: i64) -> Result<(), SerializerError> {
serialize_lexical!(self, i64, v)
serialize_numeric!(self, i64, v)
}

fn serialize_u8(self, v: u8) -> Result<(), SerializerError> {
serialize_lexical!(self, u8, v)
serialize_numeric!(self, u8, v)
}

fn serialize_u16(self, v: u16) -> Result<(), SerializerError> {
serialize_lexical!(self, u16, v)
serialize_numeric!(self, u16, v)
}

fn serialize_u32(self, v: u32) -> Result<(), SerializerError> {
serialize_lexical!(self, u32, v)
serialize_numeric!(self, u32, v)
}

fn serialize_u64(self, v: u64) -> Result<(), SerializerError> {
serialize_lexical!(self, u64, v)
serialize_numeric!(self, u64, v)
}

fn serialize_f32(self, v: f32) -> Result<(), SerializerError> {
serialize_lexical!(self, f32, v)
serialize_numeric!(self, f32, v)
}

fn serialize_f64(self, v: f64) -> Result<(), SerializerError> {
serialize_lexical!(self, f64, v)
serialize_numeric!(self, f64, v)
}

fn serialize_char(self, v: char) -> Result<(), SerializerError> {
@@ -238,14 +243,14 @@ impl<'a, 'b> Serializer for &'a mut TapeSerializer<'b> {

fn serialize_tuple_variant(
self,
_name: &'static str,
name: &'static str,
_variant_index: u32,
_variant: &'static str,
variant: &'static str,
_len: usize,
) -> Result<Self::SerializeTupleVariant, SerializerError> {
Err(SerializerError(
"serializing tuple variants is not currently supported".to_string(),
))
Err(SerializerError(format!(
"serializing tuple variants is not currently supported: {name}::{variant}"
)))
}

// Maps are represented in JSON as `{ K: V, K: V, ... }`.
@@ -266,14 +271,14 @@ impl<'a, 'b> Serializer for &'a mut TapeSerializer<'b> {

fn serialize_struct_variant(
self,
_name: &'static str,
name: &'static str,
_variant_index: u32,
_variant: &'static str,
variant: &'static str,
_len: usize,
) -> Result<Self::SerializeStructVariant, SerializerError> {
Err(SerializerError(
"serializing struct variants is not currently supported".to_string(),
))
Err(SerializerError(format!(
"serializing struct variants is not currently supported: {name}::{variant}"
)))
}
}
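
The doc comment on `serialize_numeric!` in this file notes that the tape stores every value as a string, so numbers are formatted on the way in and parsed again on the way out. The sketch below illustrates that round trip with a much-simplified, hypothetical `MiniTape`. It assumes standard-library formatting and a heap-allocated `String`, whereas the real macro uses `lexical_core::write` into a fixed-size stack buffer; none of the names below exist in `arrow-json`.

```rust
/// Hypothetical, much-simplified tape: every value is stored as text in one
/// shared buffer, and `offsets` records where each value ends.
#[derive(Debug, Default)]
pub struct MiniTape {
    bytes: String,
    offsets: Vec<usize>,
}

impl MiniTape {
    /// Format a numeric value to its decimal text form and append it.
    pub fn push_numeric<T: std::fmt::Display>(&mut self, value: T) {
        use std::fmt::Write;
        // Writing into a `String` cannot fail.
        write!(self.bytes, "{value}").unwrap();
        self.offsets.push(self.bytes.len());
    }

    /// Return the text of the `idx`-th value, if present.
    pub fn get(&self, idx: usize) -> Option<&str> {
        let end = *self.offsets.get(idx)?;
        let start = if idx == 0 { 0 } else { self.offsets[idx - 1] };
        Some(&self.bytes[start..end])
    }

    /// Parse the `idx`-th value back into a numeric type.
    pub fn parse<T: std::str::FromStr>(&self, idx: usize) -> Option<T> {
        self.get(idx)?.parse().ok()
    }
}
```

Every value pays for one format and one parse, which is the waste the doc comment describes; a tape that stored numeric values directly would avoid it at the cost of a more complex element representation.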

1 change: 1 addition & 0 deletions arrow-json/src/raw/tape.rs
@@ -454,6 +454,7 @@ impl TapeDecoder {
Ok(buf.len() - iter.len())
}

/// Writes any type that implements [`Serialize`] into this [`TapeDecoder`]
pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
if let Some(b) = self.stack.last() {
return Err(ArrowError::JsonError(format!(
1 change: 1 addition & 0 deletions arrow/Cargo.toml
@@ -99,6 +99,7 @@ criterion = { version = "0.4", default-features = false }
half = { version = "2.1", default-features = false }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
tempfile = { version = "3", default-features = false }
serde = { version = "1.0", default-features = false, features = ["derive"] }

[build-dependencies]

46 changes: 46 additions & 0 deletions arrow/src/lib.rs
@@ -271,6 +271,52 @@
//!
//! Parquet is published as a [separate crate](https://crates.io/crates/parquet)
//!
//! # Serde Compatibility
Contributor

❤️

//!
//! [`arrow_json::RawDecoder`] provides a mechanism to convert arbitrary, serde-compatible
//! structures into [`RecordBatch`].
//!
//! Whilst likely less performant than implementing a custom builder, as described in
//! [arrow_array::builder], this provides a simple mechanism to get up and running quickly
//!
//! ```
//! # use std::sync::Arc;
//! # use arrow_json::RawReaderBuilder;
//! # use arrow_schema::{DataType, Field, Schema};
//! # use serde::Serialize;
//! # use arrow_array::cast::AsArray;
//! # use arrow_array::types::{Float32Type, Int32Type};
//! #
//! #[derive(Serialize)]
//! struct MyStruct {
//! int32: i32,
//! string: String,
//! }
//!
//! let schema = Schema::new(vec![
//! Field::new("int32", DataType::Int32, false),
//! Field::new("string", DataType::Utf8, false),
//! ]);
//!
//! let rows = vec![
//! MyStruct{ int32: 5, string: "bar".to_string() },
//! MyStruct{ int32: 8, string: "foo".to_string() },
//! ];
//!
//! let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
//! decoder.serialize(&rows).unwrap();
//!
//! let batch = decoder.flush().unwrap().unwrap();
//!
//! // Expect batch containing two columns
//! let int32 = batch.column(0).as_primitive::<Int32Type>();
//! assert_eq!(int32.values(), &[5, 8]);
//!
//! let string = batch.column(1).as_string::<i32>();
//! assert_eq!(string.value(0), "bar");
//! assert_eq!(string.value(1), "foo");
//! ```
//!
//! # Memory and Buffers
//!
//! Advanced users may wish to interact with the underlying buffers of an [`Array`], for example,