Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-12411: [Rust] Create RecordBatches from Iterators #7

Merged
merged 1 commit into from
Apr 27, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 131 additions & 1 deletion arrow/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,90 @@ impl RecordBatch {
pub fn columns(&self) -> &[ArrayRef] {
&self.columns[..]
}

/// Create a `RecordBatch` from an iterable list of pairs of the
/// form `(field_name, array)`, with the same requirements on
/// fields and arrays as [`RecordBatch::try_new`]. This method is
/// often used to create a single `RecordBatch` from arrays,
/// e.g. for testing.
///
/// The resulting schema is marked as nullable for each column if
/// the array for that column is has any nulls. To explicitly
/// specify nullibility, use [`RecordBatch::try_from_iter_with_nullable`]
///
/// Example:
/// ```
/// use std::sync::Arc;
/// use arrow::array::{ArrayRef, Int32Array, StringArray};
/// use arrow::datatypes::{Schema, Field, DataType};
/// use arrow::record_batch::RecordBatch;
///
/// let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
/// let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
///
/// let record_batch = RecordBatch::try_from_iter(vec![
/// ("a", a),
/// ("b", b),
/// ]);
///
pub fn try_from_iter<I, F>(value: I) -> Result<Self>
where
I: IntoIterator<Item = (F, ArrayRef)>,
F: AsRef<str>,
{
// TODO: implement `TryFrom` trait, once
// https://github.com/rust-lang/rust/issues/50133 is no longer an
// issue
let iter = value.into_iter().map(|(field_name, array)| {
let nullable = array.null_count() > 0;
(field_name, array, nullable)
});

Self::try_from_iter_with_nullable(iter)
}

/// Create a `RecordBatch` from an iterable list of tuples of the
/// form `(field_name, array, nullable)`, with the same requirements on
/// fields and arrays as [`RecordBatch::try_new`]. This method is often
/// used to create a single `RecordBatch` from arrays, e.g. for
/// testing.
///
/// Example:
/// ```
/// use std::sync::Arc;
/// use arrow::array::{ArrayRef, Int32Array, StringArray};
/// use arrow::datatypes::{Schema, Field, DataType};
/// use arrow::record_batch::RecordBatch;
///
/// let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
/// let b: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("b")]));
///
/// // Note neither `a` nor `b` has any actual nulls, but we mark
/// // b an nullable
/// let record_batch = RecordBatch::try_from_iter_with_nullable(vec![
/// ("a", a, false),
/// ("b", b, true),
/// ]);
pub fn try_from_iter_with_nullable<I, F>(value: I) -> Result<Self>
where
I: IntoIterator<Item = (F, ArrayRef, bool)>,
F: AsRef<str>,
{
// TODO: implement `TryFrom` trait, once
// https://github.com/rust-lang/rust/issues/50133 is no longer an
// issue
let (fields, columns) = value
.into_iter()
.map(|(field_name, array, nullable)| {
let field_name = field_name.as_ref();
let field = Field::new(field_name, array.data_type().clone(), nullable);
(field, array)
})
.unzip();

let schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(schema, columns)
}
}

/// Options that control the behaviour used when creating a [`RecordBatch`].
Expand All @@ -261,7 +345,8 @@ impl Default for RecordBatchOptions {
}

impl From<&StructArray> for RecordBatch {
/// Create a record batch from struct array.
/// Create a record batch from struct array, where each field of
/// the `StructArray` becomes a `Field` in the schema.
///
/// This currently does not flatten and nested struct types
fn from(struct_array: &StructArray) -> Self {
Expand Down Expand Up @@ -328,7 +413,10 @@ mod tests {
let record_batch =
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])
.unwrap();
check_batch(record_batch)
}

fn check_batch(record_batch: RecordBatch) {
assert_eq!(5, record_batch.num_rows());
assert_eq!(2, record_batch.num_columns());
assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type());
Expand All @@ -337,6 +425,48 @@ mod tests {
assert_eq!(5, record_batch.column(1).data().len());
}

#[test]
fn create_record_batch_try_from_iter() {
let a: ArrayRef = Arc::new(Int32Array::from(vec![
Some(1),
Some(2),
None,
Some(4),
Some(5),
]));
let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));

let record_batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])
.expect("valid conversion");

let expected_schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Utf8, false),
]);
assert_eq!(record_batch.schema().as_ref(), &expected_schema);
check_batch(record_batch);
}

#[test]
fn create_record_batch_try_from_iter_with_nullable() {
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"]));

// Note there are no nulls in a or b, but we specify that b is nullable
let record_batch = RecordBatch::try_from_iter_with_nullable(vec![
("a", a, false),
("b", b, true),
])
.expect("valid conversion");

let expected_schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, true),
]);
assert_eq!(record_batch.schema().as_ref(), &expected_schema);
check_batch(record_batch);
}

#[test]
fn create_record_batch_schema_mismatch() {
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
Expand Down