From fea46151929a84b44cfc88c9224b458e5a860d43 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 19 Apr 2021 07:34:34 -0400 Subject: [PATCH] ARROW-12411: [Rust] Create RecordBatches from Iterators --- arrow/src/record_batch.rs | 132 +++++++++++++++++++++++++++++++++++++- 1 file changed, 131 insertions(+), 1 deletion(-) diff --git a/arrow/src/record_batch.rs b/arrow/src/record_batch.rs index 93abb909d020..39ef227f37cd 100644 --- a/arrow/src/record_batch.rs +++ b/arrow/src/record_batch.rs @@ -243,6 +243,90 @@ impl RecordBatch { pub fn columns(&self) -> &[ArrayRef] { &self.columns[..] } + + /// Create a `RecordBatch` from an iterable list of pairs of the + /// form `(field_name, array)`, with the same requirements on + /// fields and arrays as [`RecordBatch::try_new`]. This method is + /// often used to create a single `RecordBatch` from arrays, + /// e.g. for testing. + /// + /// The resulting schema is marked as nullable for each column if + /// the array for that column is has any nulls. To explicitly + /// specify nullibility, use [`RecordBatch::try_from_iter_with_nullable`] + /// + /// Example: + /// ``` + /// use std::sync::Arc; + /// use arrow::array::{ArrayRef, Int32Array, StringArray}; + /// use arrow::datatypes::{Schema, Field, DataType}; + /// use arrow::record_batch::RecordBatch; + /// + /// let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); + /// let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"])); + /// + /// let record_batch = RecordBatch::try_from_iter(vec![ + /// ("a", a), + /// ("b", b), + /// ]); + /// + pub fn try_from_iter(value: I) -> Result + where + I: IntoIterator, + F: AsRef, + { + // TODO: implement `TryFrom` trait, once + // https://github.com/rust-lang/rust/issues/50133 is no longer an + // issue + let iter = value.into_iter().map(|(field_name, array)| { + let nullable = array.null_count() > 0; + (field_name, array, nullable) + }); + + Self::try_from_iter_with_nullable(iter) + } + + /// Create a `RecordBatch` from an iterable list of tuples of the + /// form `(field_name, array, nullable)`, with the same requirements on + /// fields and arrays as [`RecordBatch::try_new`]. This method is often + /// used to create a single `RecordBatch` from arrays, e.g. for + /// testing. + /// + /// Example: + /// ``` + /// use std::sync::Arc; + /// use arrow::array::{ArrayRef, Int32Array, StringArray}; + /// use arrow::datatypes::{Schema, Field, DataType}; + /// use arrow::record_batch::RecordBatch; + /// + /// let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); + /// let b: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("b")])); + /// + /// // Note neither `a` nor `b` has any actual nulls, but we mark + /// // b an nullable + /// let record_batch = RecordBatch::try_from_iter_with_nullable(vec![ + /// ("a", a, false), + /// ("b", b, true), + /// ]); + pub fn try_from_iter_with_nullable(value: I) -> Result + where + I: IntoIterator, + F: AsRef, + { + // TODO: implement `TryFrom` trait, once + // https://github.com/rust-lang/rust/issues/50133 is no longer an + // issue + let (fields, columns) = value + .into_iter() + .map(|(field_name, array, nullable)| { + let field_name = field_name.as_ref(); + let field = Field::new(field_name, array.data_type().clone(), nullable); + (field, array) + }) + .unzip(); + + let schema = Arc::new(Schema::new(fields)); + RecordBatch::try_new(schema, columns) + } } /// Options that control the behaviour used when creating a [`RecordBatch`]. @@ -261,7 +345,8 @@ impl Default for RecordBatchOptions { } impl From<&StructArray> for RecordBatch { - /// Create a record batch from struct array. + /// Create a record batch from struct array, where each field of + /// the `StructArray` becomes a `Field` in the schema. /// /// This currently does not flatten and nested struct types fn from(struct_array: &StructArray) -> Self { @@ -328,7 +413,10 @@ mod tests { let record_batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) .unwrap(); + check_batch(record_batch) + } + fn check_batch(record_batch: RecordBatch) { assert_eq!(5, record_batch.num_rows()); assert_eq!(2, record_batch.num_columns()); assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type()); @@ -337,6 +425,48 @@ mod tests { assert_eq!(5, record_batch.column(1).data().len()); } + #[test] + fn create_record_batch_try_from_iter() { + let a: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + Some(2), + None, + Some(4), + Some(5), + ])); + let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])); + + let record_batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]) + .expect("valid conversion"); + + let expected_schema = Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, false), + ]); + assert_eq!(record_batch.schema().as_ref(), &expected_schema); + check_batch(record_batch); + } + + #[test] + fn create_record_batch_try_from_iter_with_nullable() { + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])); + let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])); + + // Note there are no nulls in a or b, but we specify that b is nullable + let record_batch = RecordBatch::try_from_iter_with_nullable(vec![ + ("a", a, false), + ("b", b, true), + ]) + .expect("valid conversion"); + + let expected_schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + ]); + assert_eq!(record_batch.schema().as_ref(), &expected_schema); + check_batch(record_batch); + } + #[test] fn create_record_batch_schema_mismatch() { let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);