Skip to content

Commit

Permalink
Add ArrayData::new_null and DataType::primitive_width (#3676)
Browse files Browse the repository at this point in the history
* Add ArrayData::new_null and DataType::primitive_width

* Add FixedSizeBinary test

* Update arrow-data/src/data.rs

Co-authored-by: askoa <112126368+askoa@users.noreply.github.com>

* Only generate nulls for first UnionArray child

---------

Co-authored-by: askoa <112126368+askoa@users.noreply.github.com>
  • Loading branch information
tustvold and askoa authored Feb 9, 2023
1 parent 6ec7226 commit 7cd29d7
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 434 deletions.
279 changes: 75 additions & 204 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
mod binary_array;

use crate::types::*;
use arrow_buffer::{Buffer, MutableBuffer, ToByteSlice};
use arrow_data::ArrayData;
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
use std::any::Any;
Expand Down Expand Up @@ -634,207 +633,7 @@ pub fn new_empty_array(data_type: &DataType) -> ArrayRef {
/// assert_eq!(&array, &null_array);
/// ```
pub fn new_null_array(data_type: &DataType, length: usize) -> ArrayRef {
// context: https://github.com/apache/arrow/pull/9469#discussion_r574761687
match data_type {
DataType::Null => Arc::new(NullArray::new(length)),
DataType::Boolean => {
let null_buf: Buffer = MutableBuffer::new_null(length).into();
make_array(unsafe {
ArrayData::new_unchecked(
data_type.clone(),
length,
Some(length),
Some(null_buf.clone()),
0,
vec![null_buf],
vec![],
)
})
}
DataType::Int8 => new_null_sized_array::<Int8Type>(data_type, length),
DataType::UInt8 => new_null_sized_array::<UInt8Type>(data_type, length),
DataType::Int16 => new_null_sized_array::<Int16Type>(data_type, length),
DataType::UInt16 => new_null_sized_array::<UInt16Type>(data_type, length),
DataType::Float16 => new_null_sized_array::<Float16Type>(data_type, length),
DataType::Int32 => new_null_sized_array::<Int32Type>(data_type, length),
DataType::UInt32 => new_null_sized_array::<UInt32Type>(data_type, length),
DataType::Float32 => new_null_sized_array::<Float32Type>(data_type, length),
DataType::Date32 => new_null_sized_array::<Date32Type>(data_type, length),
// expanding this into Date23{unit}Type results in needless branching
DataType::Time32(_) => new_null_sized_array::<Int32Type>(data_type, length),
DataType::Int64 => new_null_sized_array::<Int64Type>(data_type, length),
DataType::UInt64 => new_null_sized_array::<UInt64Type>(data_type, length),
DataType::Float64 => new_null_sized_array::<Float64Type>(data_type, length),
DataType::Date64 => new_null_sized_array::<Date64Type>(data_type, length),
// expanding this into Timestamp{unit}Type results in needless branching
DataType::Timestamp(_, _) => new_null_sized_array::<Int64Type>(data_type, length),
DataType::Time64(_) => new_null_sized_array::<Int64Type>(data_type, length),
DataType::Duration(_) => new_null_sized_array::<Int64Type>(data_type, length),
DataType::Interval(unit) => match unit {
IntervalUnit::YearMonth => {
new_null_sized_array::<IntervalYearMonthType>(data_type, length)
}
IntervalUnit::DayTime => {
new_null_sized_array::<IntervalDayTimeType>(data_type, length)
}
IntervalUnit::MonthDayNano => {
new_null_sized_array::<IntervalMonthDayNanoType>(data_type, length)
}
},
DataType::FixedSizeBinary(value_len) => make_array(unsafe {
ArrayData::new_unchecked(
data_type.clone(),
length,
Some(length),
Some(MutableBuffer::new_null(length).into()),
0,
vec![Buffer::from(vec![0u8; *value_len as usize * length])],
vec![],
)
}),
DataType::Binary | DataType::Utf8 => {
new_null_binary_array::<i32>(data_type, length)
}
DataType::LargeBinary | DataType::LargeUtf8 => {
new_null_binary_array::<i64>(data_type, length)
}
DataType::List(field) => {
new_null_list_array::<i32>(data_type, field.data_type(), length)
}
DataType::LargeList(field) => {
new_null_list_array::<i64>(data_type, field.data_type(), length)
}
DataType::FixedSizeList(field, value_len) => make_array(unsafe {
ArrayData::new_unchecked(
data_type.clone(),
length,
Some(length),
Some(MutableBuffer::new_null(length).into()),
0,
vec![],
vec![
new_null_array(field.data_type(), *value_len as usize * length)
.data()
.clone(),
],
)
}),
DataType::Struct(fields) => {
let fields: Vec<_> = fields
.iter()
.map(|field| (field.clone(), new_null_array(field.data_type(), length)))
.collect();

let null_buffer = MutableBuffer::new_null(length);
Arc::new(StructArray::from((fields, null_buffer.into())))
}
DataType::Map(field, _keys_sorted) => {
new_null_list_array::<i32>(data_type, field.data_type(), length)
}
DataType::Union(_, _, _) => {
unimplemented!("Creating null Union array not yet supported")
}
DataType::Dictionary(key, value) => {
let keys = new_null_array(key, length);
let keys = keys.data();

make_array(unsafe {
ArrayData::new_unchecked(
data_type.clone(),
length,
Some(length),
keys.null_buffer().cloned(),
0,
keys.buffers().into(),
vec![new_empty_array(value.as_ref()).into_data()],
)
})
}
DataType::Decimal128(_, _) => {
new_null_sized_decimal(data_type, length, std::mem::size_of::<i128>())
}
DataType::Decimal256(_, _) => new_null_sized_decimal(data_type, length, 32),
DataType::RunEndEncoded(_, _) => todo!(),
}
}

#[inline]
fn new_null_list_array<OffsetSize: OffsetSizeTrait>(
data_type: &DataType,
child_data_type: &DataType,
length: usize,
) -> ArrayRef {
make_array(unsafe {
ArrayData::new_unchecked(
data_type.clone(),
length,
Some(length),
Some(MutableBuffer::new_null(length).into()),
0,
vec![Buffer::from(
vec![OffsetSize::zero(); length + 1].to_byte_slice(),
)],
vec![ArrayData::new_empty(child_data_type)],
)
})
}

#[inline]
fn new_null_binary_array<OffsetSize: OffsetSizeTrait>(
data_type: &DataType,
length: usize,
) -> ArrayRef {
make_array(unsafe {
ArrayData::new_unchecked(
data_type.clone(),
length,
Some(length),
Some(MutableBuffer::new_null(length).into()),
0,
vec![
Buffer::from(vec![OffsetSize::zero(); length + 1].to_byte_slice()),
MutableBuffer::new(0).into(),
],
vec![],
)
})
}

#[inline]
fn new_null_sized_array<T: ArrowPrimitiveType>(
data_type: &DataType,
length: usize,
) -> ArrayRef {
make_array(unsafe {
ArrayData::new_unchecked(
data_type.clone(),
length,
Some(length),
Some(MutableBuffer::new_null(length).into()),
0,
vec![Buffer::from(vec![0u8; length * T::get_byte_width()])],
vec![],
)
})
}

#[inline]
fn new_null_sized_decimal(
data_type: &DataType,
length: usize,
byte_width: usize,
) -> ArrayRef {
make_array(unsafe {
ArrayData::new_unchecked(
data_type.clone(),
length,
Some(length),
Some(MutableBuffer::new_null(length).into()),
0,
vec![Buffer::from(vec![0u8; length * byte_width])],
vec![],
)
})
make_array(ArrayData::new_null(data_type, length))
}

// Helper function for printing potentially long arrays.
Expand Down Expand Up @@ -881,8 +680,10 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::cast::downcast_array;
use arrow_schema::Field;
use crate::cast::{as_union_array, downcast_array};
use crate::downcast_run_array;
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_schema::{Field, UnionMode};

#[test]
fn test_empty_primitive() {
Expand Down Expand Up @@ -1012,6 +813,76 @@ mod tests {
);
}

#[test]
fn test_null_union() {
for mode in [UnionMode::Sparse, UnionMode::Dense] {
let data_type = DataType::Union(
vec![
Field::new("foo", DataType::Int32, true),
Field::new("bar", DataType::Int64, true),
],
vec![2, 1],
mode,
);
let array = new_null_array(&data_type, 4);

let array = as_union_array(array.as_ref());
assert_eq!(array.len(), 4);
assert_eq!(array.null_count(), 0);

for i in 0..4 {
let a = array.value(i);
assert_eq!(a.len(), 1);
assert_eq!(a.null_count(), 1);
assert!(a.is_null(0))
}
}
}

#[test]
#[allow(unused_parens)]
fn test_null_runs() {
for r in [DataType::Int16, DataType::Int32, DataType::Int64] {
let data_type = DataType::RunEndEncoded(
Box::new(Field::new("run_ends", r, false)),
Box::new(Field::new("values", DataType::Utf8, true)),
);

let array = new_null_array(&data_type, 4);
let array = array.as_ref();

downcast_run_array! {
array => {
assert_eq!(array.len(), 4);
assert_eq!(array.null_count(), 0);
assert_eq!(array.values().len(), 1);
assert_eq!(array.values().null_count(), 1);
assert_eq!(array.run_ends().values(), &[4]);

let idx = array.get_physical_indices(&[0, 1, 2, 3]).unwrap();
assert_eq!(idx, &[0,0,0,0]);
}
d => unreachable!("{d}")
}
}
}

#[test]
fn test_null_fixed_size_binary() {
for size in [1, 2, 7] {
let array = new_null_array(&DataType::FixedSizeBinary(size), 6);
let array = array
.as_ref()
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.unwrap();

assert_eq!(array.len(), 6);
assert_eq!(array.null_count(), 6);
array.iter().for_each(|x| assert!(x.is_none()));
}
}

#[test]
fn test_memory_size_null() {
let null_arr = NullArray::new(32);
Expand Down
Loading

0 comments on commit 7cd29d7

Please sign in to comment.