Skip to content

Commit

Permalink
ARROW-10334: [Rust] [Parquet] NullArray roundtrip
Browse files Browse the repository at this point in the history
This allows writing an Arrow NullArray to Parquet.
Support was added a few years ago in Parquet, and the C++ implementation supports writing null arrays.
The array is stored as an int32 which has all values set as null.
To implement this, we introduce a `null -> int32` cast, which creates an all-null int32 array of the same length.
Semantically, the write is the same as writing an int32 that's all null, but we create a null writer to preserve the data type.

Closes #8484 from nevi-me/ARROW-10334

Authored-by: Neville Dipale <nevilledips@gmail.com>
Signed-off-by: Neville Dipale <nevilledips@gmail.com>
  • Loading branch information
nevi-me committed Oct 17, 2020
1 parent ead5e14 commit 453f978
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 64 deletions.
8 changes: 7 additions & 1 deletion rust/arrow/src/array/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl From<ArrayDataRef> for NullArray {

impl fmt::Debug for NullArray {
// NOTE(review): this hunk comes from a rendered diff, so the two `write!` lines below look like
// the removed version (bare "NullArray") and the added version ("NullArray(<len>)") of the same
// statement rather than two consecutive statements — confirm against the real file.
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NullArray")
write!(f, "NullArray({})", self.len())
}
}

Expand Down Expand Up @@ -146,4 +146,10 @@ mod tests {
assert_eq!(array2.null_count(), 16);
assert_eq!(array2.offset(), 8);
}

#[test]
fn test_debug_null_array() {
    // 1024 * 1024 slots; the Debug output is expected to report that length.
    let rendered = format!("{:?}", NullArray::new(1024 * 1024));
    assert_eq!(rendered, "NullArray(1048576)");
}
}
81 changes: 50 additions & 31 deletions rust/arrow/src/compute/kernels/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
(Timestamp(_, _), Date32(_)) => true,
(Timestamp(_, _), Date64(_)) => true,
// date64 to timestamp might not make sense,

// end temporal casts
(Null, Int32) => true,
(_, _) => false,
}
}
Expand Down Expand Up @@ -729,25 +728,31 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result<ArrayRef> {
// single integer operation, but need to avoid integer
// math rounding down to zero

if to_size > from_size {
let time_array = Date64Array::from(array.data());
Ok(Arc::new(multiply(
&time_array,
&Date64Array::from(vec![to_size / from_size; array.len()]),
)?) as ArrayRef)
} else if to_size < from_size {
let time_array = Date64Array::from(array.data());
Ok(Arc::new(divide(
&time_array,
&Date64Array::from(vec![from_size / to_size; array.len()]),
)?) as ArrayRef)
} else {
cast_array_data::<Date64Type>(array, to_type.clone())
match to_size.cmp(&from_size) {
std::cmp::Ordering::Less => {
let time_array = Date64Array::from(array.data());
Ok(Arc::new(divide(
&time_array,
&Date64Array::from(vec![from_size / to_size; array.len()]),
)?) as ArrayRef)
}
std::cmp::Ordering::Equal => {
cast_array_data::<Date64Type>(array, to_type.clone())
}
std::cmp::Ordering::Greater => {
let time_array = Date64Array::from(array.data());
Ok(Arc::new(multiply(
&time_array,
&Date64Array::from(vec![to_size / from_size; array.len()]),
)?) as ArrayRef)
}
}
}
// date64 to timestamp might not make sense,

// end temporal casts
// null to primitive/flat types
(Null, Int32) => Ok(Arc::new(Int32Array::from(vec![None; array.len()]))),

(_, _) => Err(ArrowError::ComputeError(format!(
"Casting from {:?} to {:?} not supported",
from_type, to_type,
Expand Down Expand Up @@ -2476,44 +2481,44 @@ mod tests {

// Test casting TO StringArray
let cast_type = Utf8;
let cast_array = cast(&array, &cast_type).expect("cast to UTF-8 succeeded");
let cast_array = cast(&array, &cast_type).expect("cast to UTF-8 failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

// Test casting TO Dictionary (with different index sizes)

let cast_type = Dictionary(Box::new(Int16), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

let cast_type = Dictionary(Box::new(Int32), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

let cast_type = Dictionary(Box::new(Int64), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

let cast_type = Dictionary(Box::new(UInt8), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

let cast_type = Dictionary(Box::new(UInt16), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

let cast_type = Dictionary(Box::new(UInt32), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

let cast_type = Dictionary(Box::new(UInt64), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);
}
Expand Down Expand Up @@ -2598,11 +2603,11 @@ mod tests {
let expected = vec!["1", "null", "3"];

// Test casting TO PrimitiveArray, different dictionary type
let cast_array = cast(&array, &Utf8).expect("cast to UTF-8 succeeded");
let cast_array = cast(&array, &Utf8).expect("cast to UTF-8 failed");
assert_eq!(array_to_strings(&cast_array), expected);
assert_eq!(cast_array.data_type(), &Utf8);

let cast_array = cast(&array, &Int64).expect("cast to int64 succeeded");
let cast_array = cast(&array, &Int64).expect("cast to int64 failed");
assert_eq!(array_to_strings(&cast_array), expected);
assert_eq!(cast_array.data_type(), &Int64);
}
Expand All @@ -2621,13 +2626,13 @@ mod tests {

// Cast to a dictionary (same value type, Int32)
let cast_type = Dictionary(Box::new(UInt8), Box::new(Int32));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

// Cast to a dictionary (different value type, Int8)
let cast_type = Dictionary(Box::new(UInt8), Box::new(Int8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);
}
Expand All @@ -2646,11 +2651,25 @@ mod tests {

// Cast to a dictionary (same value type, Utf8)
let cast_type = Dictionary(Box::new(UInt8), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);
}

#[test]
fn test_cast_null_array_to_int32() {
    // Input: a six-slot NullArray behind an ArrayRef.
    let null_input: ArrayRef = Arc::new(NullArray::new(6));
    let target_type = DataType::Int32;

    // Cast Null -> Int32 and view the result as a primitive Int32 array.
    let casted = cast(&null_input, &target_type).expect("cast failed");
    let casted = as_primitive_array::<Int32Type>(&casted);

    // The result must be an Int32 array of the same length with every slot null.
    let all_nulls = Int32Array::from(vec![None; 6]);
    assert_eq!(casted.data_type(), &target_type);
    assert_eq!(casted, &all_nulls);
}

/// Print the `DictionaryArray` `array` as a vector of strings
fn array_to_strings(array: &ArrayRef) -> Vec<String> {
(0..array.len())
Expand Down Expand Up @@ -2768,7 +2787,7 @@ mod tests {
)),
Arc::new(TimestampNanosecondArray::from_vec(
vec![1000, 2000],
Some(tz_name.clone()),
Some(tz_name),
)),
Arc::new(Date32Array::from(vec![1000, 2000])),
Arc::new(Date64Array::from(vec![1000, 2000])),
Expand Down
142 changes: 120 additions & 22 deletions rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,97 @@ pub trait ArrayReader {
fn get_rep_levels(&self) -> Option<&[i16]>;
}

/// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow
/// NullArray type.
///
/// Only the number of records read is used to build the returned array; the
/// definition and repetition levels of the last batch are kept so that
/// `get_def_levels` / `get_rep_levels` can expose them.
pub struct NullArrayReader<T: DataType> {
/// Arrow type reported by `get_data_type`; `new` sets it to `ArrowType::Null`.
data_type: ArrowType,
/// Source of page readers; the next one is pulled when the record reader runs dry.
pages: Box<dyn PageIterator>,
/// Definition levels taken from the record reader at the end of the last `next_batch`.
def_levels_buffer: Option<Buffer>,
/// Repetition levels taken from the record reader at the end of the last `next_batch`.
rep_levels_buffer: Option<Buffer>,
/// Descriptor of the column being read; a clone is handed to the record reader in `new`.
column_desc: ColumnDescPtr,
/// Reads records from the current page reader.
record_reader: RecordReader<T>,
/// Zero-sized marker for the Parquet physical type `T`.
_type_marker: PhantomData<T>,
}

impl<T: DataType> NullArrayReader<T> {
    /// Builds a reader over `pages` for the column described by `column_desc`.
    ///
    /// If the iterator yields a first page reader it is installed on the record
    /// reader right away; an error from the iterator or from installing the page
    /// reader is returned to the caller.
    pub fn new(
        mut pages: Box<dyn PageIterator>,
        column_desc: ColumnDescPtr,
    ) -> Result<Self> {
        let mut record_reader = RecordReader::<T>::new(column_desc.clone());

        // `Option<Result<_>>` -> `Result<Option<_>>`, so a failing first page surfaces via `?`.
        let first_page = pages.next().transpose()?;
        if let Some(reader) = first_page {
            record_reader.set_page_reader(reader)?;
        }

        let reader = Self {
            data_type: ArrowType::Null,
            pages,
            def_levels_buffer: None,
            rep_levels_buffer: None,
            column_desc,
            record_reader,
            _type_marker: PhantomData,
        };
        Ok(reader)
    }
}

/// `ArrayReader` implementation that produces Arrow `NullArray` batches.
impl<T: DataType> ArrayReader for NullArrayReader<T> {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the Arrow data type stored on this reader.
    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Reads at most `batch_size` records and returns a `NullArray` whose length
    /// equals the number of records actually read.
    ///
    /// The definition and repetition levels of the batch are saved on `self`
    /// before the record reader is reset.
    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
        let mut total_read = 0usize;
        while total_read < batch_size {
            let wanted = batch_size - total_read;

            // May be 0 when the current page has already been drained.
            let got = self.record_reader.read_records(wanted)?;
            total_read += got;

            // Request fully satisfied: the loop condition ends the loop.
            if got >= wanted {
                continue;
            }

            // Record reader ran dry: move on to the next page, or stop if there is none.
            match self.pages.next() {
                Some(next_reader) => {
                    self.record_reader.set_page_reader(next_reader?)?;
                }
                None => break,
            }
        }

        // Only the record count matters for a null array.
        let nulls = arrow::array::NullArray::new(total_read);

        // Keep the levels of this batch, then clear the record reader for the next one.
        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
        self.record_reader.reset();

        Ok(Arc::new(nulls))
    }

    /// Definition levels saved by the last `next_batch` call, if any.
    fn get_def_levels(&self) -> Option<&[i16]> {
        let levels = self.def_levels_buffer.as_ref()?;
        // SAFETY: presumably the record reader fills this buffer with `i16` levels, so
        // viewing it as `&[i16]` is valid — TODO confirm against `RecordReader`.
        Some(unsafe { levels.typed_data() })
    }

    /// Repetition levels saved by the last `next_batch` call, if any.
    fn get_rep_levels(&self) -> Option<&[i16]> {
        let levels = self.rep_levels_buffer.as_ref()?;
        // SAFETY: presumably the record reader fills this buffer with `i16` levels, so
        // viewing it as `&[i16]` is valid — TODO confirm against `RecordReader`.
        Some(unsafe { levels.typed_data() })
    }
}

/// Primitive array readers are leaves of array reader tree. They accept page iterator
/// and read them into primitive arrays.
pub struct PrimitiveArrayReader<T: DataType> {
Expand Down Expand Up @@ -859,10 +950,19 @@ impl<'a> ArrayReaderBuilder {
page_iterator,
column_desc,
)?)),
PhysicalType::INT32 => Ok(Box::new(PrimitiveArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
)?)),
PhysicalType::INT32 => {
if let Some(ArrowType::Null) = arrow_type {
Ok(Box::new(NullArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
)?))
} else {
Ok(Box::new(PrimitiveArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
)?))
}
}
PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::<Int64Type>::new(
page_iterator,
column_desc,
Expand Down Expand Up @@ -903,25 +1003,23 @@ impl<'a> ArrayReaderBuilder {
page_iterator, column_desc, converter
)?))
}
} else if let Some(ArrowType::LargeBinary) = arrow_type {
let converter =
LargeBinaryConverter::new(LargeBinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
LargeBinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
} else {
if let Some(ArrowType::LargeBinary) = arrow_type {
let converter =
LargeBinaryConverter::new(LargeBinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
LargeBinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
} else {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
}
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
}
}
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
Expand Down
Loading

0 comments on commit 453f978

Please sign in to comment.