diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index b3c11740abf9..0d701eaad283 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -30,7 +30,7 @@ use crate::cast::{ }; use crate::error::{DataFusionError, Result, _internal_err, _not_impl_err}; use crate::hash_utils::create_hashes; -use crate::utils::wrap_into_list_array; +use crate::utils::array_into_list_array; use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::compute::kernels::numeric::*; use arrow::datatypes::{i256, FieldRef, Fields, SchemaBuilder}; @@ -1667,7 +1667,7 @@ impl ScalarValue { } else { Self::iter_to_array(values.iter().cloned()).unwrap() }; - Arc::new(wrap_into_list_array(values)) + Arc::new(array_into_list_array(values)) } /// Converts a scalar value into an array of `size` rows. @@ -2058,7 +2058,7 @@ impl ScalarValue { let list_array = as_list_array(array); let nested_array = list_array.value(index); // Produces a single element `ListArray` with the value at `index`. - let arr = Arc::new(wrap_into_list_array(nested_array)); + let arr = Arc::new(array_into_list_array(nested_array)); ScalarValue::List(arr) } @@ -2067,7 +2067,7 @@ impl ScalarValue { let list_array = as_fixed_size_list_array(array)?; let nested_array = list_array.value(index); // Produces a single element `ListArray` with the value at `index`. - let arr = Arc::new(wrap_into_list_array(nested_array)); + let arr = Arc::new(array_into_list_array(nested_array)); ScalarValue::List(arr) } @@ -3052,7 +3052,7 @@ mod tests { let array = ScalarValue::new_list(scalars.as_slice(), &DataType::Utf8); - let expected = wrap_into_list_array(Arc::new(StringArray::from(vec![ + let expected = array_into_list_array(Arc::new(StringArray::from(vec![ "rust", "arrow", "data-fusion", @@ -3091,9 +3091,9 @@ mod tests { #[test] fn iter_to_array_string_test() { let arr1 = - wrap_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"]))); + array_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"]))); let arr2 = - wrap_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"]))); + array_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"]))); let scalars = vec![ ScalarValue::List(Arc::new(arr1)), @@ -4335,13 +4335,13 @@ mod tests { // Define list-of-structs scalars let nl0_array = ScalarValue::iter_to_array(vec![s0.clone(), s1.clone()]).unwrap(); - let nl0 = ScalarValue::List(Arc::new(wrap_into_list_array(nl0_array))); + let nl0 = ScalarValue::List(Arc::new(array_into_list_array(nl0_array))); let nl1_array = ScalarValue::iter_to_array(vec![s2.clone()]).unwrap(); - let nl1 = ScalarValue::List(Arc::new(wrap_into_list_array(nl1_array))); + let nl1 = ScalarValue::List(Arc::new(array_into_list_array(nl1_array))); let nl2_array = ScalarValue::iter_to_array(vec![s1.clone()]).unwrap(); - let nl2 = ScalarValue::List(Arc::new(wrap_into_list_array(nl2_array))); + let nl2 = ScalarValue::List(Arc::new(array_into_list_array(nl2_array))); // iter_to_array for list-of-struct let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap(); diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs index b2f71e86f21e..f031f7880436 100644 --- a/datafusion/common/src/utils.rs +++ b/datafusion/common/src/utils.rs @@ -17,6 +17,7 @@ //! This module provides the bisect function, which implements binary search. +use crate::error::_internal_err; use crate::{DataFusionError, Result, ScalarValue}; use arrow::array::{ArrayRef, PrimitiveArray}; use arrow::buffer::OffsetBuffer; @@ -24,7 +25,7 @@ use arrow::compute; use arrow::compute::{partition, SortColumn, SortOptions}; use arrow::datatypes::{Field, SchemaRef, UInt32Type}; use arrow::record_batch::RecordBatch; -use arrow_array::ListArray; +use arrow_array::{Array, ListArray}; use sqlparser::ast::Ident; use sqlparser::dialect::GenericDialect; use sqlparser::parser::Parser; @@ -338,7 +339,7 @@ pub fn longest_consecutive_prefix>( /// Wrap an array into a single element `ListArray`. /// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]` -pub fn wrap_into_list_array(arr: ArrayRef) -> ListArray { +pub fn array_into_list_array(arr: ArrayRef) -> ListArray { let offsets = OffsetBuffer::from_lengths([arr.len()]); ListArray::new( Arc::new(Field::new("item", arr.data_type().to_owned(), true)), @@ -348,6 +349,47 @@ pub fn wrap_into_list_array(arr: ArrayRef) -> ListArray { ) } +/// Wrap arrays into a single element `ListArray`. +/// +/// Example: +/// ``` +/// use arrow::array::{Int32Array, ListArray, ArrayRef}; +/// use arrow::datatypes::{Int32Type, Field}; +/// use std::sync::Arc; +/// +/// let arr1 = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; +/// let arr2 = Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef; +/// +/// let list_arr = datafusion_common::utils::arrays_into_list_array([arr1, arr2]).unwrap(); +/// +/// let expected = ListArray::from_iter_primitive::( +/// vec![ +/// Some(vec![Some(1), Some(2), Some(3)]), +/// Some(vec![Some(4), Some(5), Some(6)]), +/// ] +/// ); +/// +/// assert_eq!(list_arr, expected); +pub fn arrays_into_list_array( + arr: impl IntoIterator, +) -> Result { + let arr = arr.into_iter().collect::>(); + if arr.is_empty() { + return _internal_err!("Cannot wrap empty array into list array"); + } + + let lens = arr.iter().map(|x| x.len()).collect::>(); + // Assume data type is consistent + let data_type = arr[0].data_type().to_owned(); + let values = arr.iter().map(|x| x.as_ref()).collect::>(); + Ok(ListArray::new( + Arc::new(Field::new("item", data_type, true)), + OffsetBuffer::from_lengths(lens), + arrow::compute::concat(values.as_slice())?, + None, + )) +} + /// An extension trait for smart pointers. Provides an interface to get a /// raw pointer to the data (with metadata stripped away). /// diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs index 834925b8d554..4dccbfef07f8 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg.rs @@ -24,7 +24,7 @@ use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field}; use arrow_array::Array; use datafusion_common::cast::as_list_array; -use datafusion_common::utils::wrap_into_list_array; +use datafusion_common::utils::array_into_list_array; use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::Accumulator; @@ -161,7 +161,7 @@ impl Accumulator for ArrayAggAccumulator { } let concated_array = arrow::compute::concat(&element_arrays)?; - let list_array = wrap_into_list_array(concated_array); + let list_array = array_into_list_array(concated_array); Ok(ScalarValue::List(Arc::new(list_array))) } diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs index 21143ce54a20..9b391b0c42cf 100644 --- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs @@ -185,7 +185,7 @@ mod tests { use arrow_array::types::Int32Type; use arrow_array::{Array, ListArray}; use arrow_buffer::OffsetBuffer; - use datafusion_common::utils::wrap_into_list_array; + use datafusion_common::utils::array_into_list_array; use datafusion_common::{internal_err, DataFusionError}; // arrow::compute::sort cann't sort ListArray directly, so we need to sort the inner primitive array and wrap it back into ListArray. @@ -201,7 +201,7 @@ mod tests { }; let arr = arrow::compute::sort(&arr, None).unwrap(); - let list_arr = wrap_into_list_array(arr); + let list_arr = array_into_list_array(arr); ScalarValue::List(Arc::new(list_arr)) } diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 84fd301b84de..18d8c60fe7bd 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -29,7 +29,7 @@ use arrow_buffer::NullBuffer; use datafusion_common::cast::{ as_generic_string_array, as_int64_array, as_list_array, as_string_array, }; -use datafusion_common::utils::wrap_into_list_array; +use datafusion_common::utils::array_into_list_array; use datafusion_common::{ exec_err, internal_err, not_impl_err, plan_err, DataFusionError, Result, }; @@ -412,7 +412,7 @@ pub fn make_array(arrays: &[ArrayRef]) -> Result { // Either an empty array or all nulls: DataType::Null => { let array = new_null_array(&DataType::Null, arrays.len()); - Ok(Arc::new(wrap_into_list_array(array))) + Ok(Arc::new(array_into_list_array(array))) } data_type => array_array(arrays, data_type), }