diff --git a/rust/arrow/src/array/null.rs b/rust/arrow/src/array/null.rs index 190d2fa78fc81..08c7cf1f21ef8 100644 --- a/rust/arrow/src/array/null.rs +++ b/rust/arrow/src/array/null.rs @@ -113,7 +113,7 @@ impl From for NullArray { impl fmt::Debug for NullArray { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "NullArray") + write!(f, "NullArray({})", self.len()) } } @@ -146,4 +146,10 @@ mod tests { assert_eq!(array2.null_count(), 16); assert_eq!(array2.offset(), 8); } + + #[test] + fn test_debug_null_array() { + let array = NullArray::new(1024 * 1024); + assert_eq!(format!("{:?}", array), "NullArray(1048576)"); + } } diff --git a/rust/arrow/src/compute/kernels/cast.rs b/rust/arrow/src/compute/kernels/cast.rs index 4e1bc852d426d..ab34c6a095007 100644 --- a/rust/arrow/src/compute/kernels/cast.rs +++ b/rust/arrow/src/compute/kernels/cast.rs @@ -200,8 +200,7 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool { (Timestamp(_, _), Date32(_)) => true, (Timestamp(_, _), Date64(_)) => true, // date64 to timestamp might not make sense, - - // end temporal casts + (Null, Int32) => true, (_, _) => false, } } @@ -729,25 +728,31 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result { // single integer operation, but need to avoid integer // math rounding down to zero - if to_size > from_size { - let time_array = Date64Array::from(array.data()); - Ok(Arc::new(multiply( - &time_array, - &Date64Array::from(vec![to_size / from_size; array.len()]), - )?) as ArrayRef) - } else if to_size < from_size { - let time_array = Date64Array::from(array.data()); - Ok(Arc::new(divide( - &time_array, - &Date64Array::from(vec![from_size / to_size; array.len()]), - )?) as ArrayRef) - } else { - cast_array_data::(array, to_type.clone()) + match to_size.cmp(&from_size) { + std::cmp::Ordering::Less => { + let time_array = Date64Array::from(array.data()); + Ok(Arc::new(divide( + &time_array, + &Date64Array::from(vec![from_size / to_size; array.len()]), + )?) as ArrayRef) + } + std::cmp::Ordering::Equal => { + cast_array_data::(array, to_type.clone()) + } + std::cmp::Ordering::Greater => { + let time_array = Date64Array::from(array.data()); + Ok(Arc::new(multiply( + &time_array, + &Date64Array::from(vec![to_size / from_size; array.len()]), + )?) as ArrayRef) + } } } // date64 to timestamp might not make sense, - // end temporal casts + // null to primitive/flat types + (Null, Int32) => Ok(Arc::new(Int32Array::from(vec![None; array.len()]))), + (_, _) => Err(ArrowError::ComputeError(format!( "Casting from {:?} to {:?} not supported", from_type, to_type, @@ -2476,44 +2481,44 @@ mod tests { // Test casting TO StringArray let cast_type = Utf8; - let cast_array = cast(&array, &cast_type).expect("cast to UTF-8 succeeded"); + let cast_array = cast(&array, &cast_type).expect("cast to UTF-8 failed"); assert_eq!(cast_array.data_type(), &cast_type); assert_eq!(array_to_strings(&cast_array), expected); // Test casting TO Dictionary (with different index sizes) let cast_type = Dictionary(Box::new(Int16), Box::new(Utf8)); - let cast_array = cast(&array, &cast_type).expect("cast succeeded"); + let cast_array = cast(&array, &cast_type).expect("cast failed"); assert_eq!(cast_array.data_type(), &cast_type); assert_eq!(array_to_strings(&cast_array), expected); let cast_type = Dictionary(Box::new(Int32), Box::new(Utf8)); - let cast_array = cast(&array, &cast_type).expect("cast succeeded"); + let cast_array = cast(&array, &cast_type).expect("cast failed"); assert_eq!(cast_array.data_type(), &cast_type); assert_eq!(array_to_strings(&cast_array), expected); let cast_type = Dictionary(Box::new(Int64), Box::new(Utf8)); - let cast_array = cast(&array, &cast_type).expect("cast succeeded"); + let cast_array = cast(&array, &cast_type).expect("cast failed"); assert_eq!(cast_array.data_type(), &cast_type); assert_eq!(array_to_strings(&cast_array), expected); let cast_type = Dictionary(Box::new(UInt8), Box::new(Utf8)); - let cast_array = cast(&array, &cast_type).expect("cast succeeded"); + let cast_array = cast(&array, &cast_type).expect("cast failed"); assert_eq!(cast_array.data_type(), &cast_type); assert_eq!(array_to_strings(&cast_array), expected); let cast_type = Dictionary(Box::new(UInt16), Box::new(Utf8)); - let cast_array = cast(&array, &cast_type).expect("cast succeeded"); + let cast_array = cast(&array, &cast_type).expect("cast failed"); assert_eq!(cast_array.data_type(), &cast_type); assert_eq!(array_to_strings(&cast_array), expected); let cast_type = Dictionary(Box::new(UInt32), Box::new(Utf8)); - let cast_array = cast(&array, &cast_type).expect("cast succeeded"); + let cast_array = cast(&array, &cast_type).expect("cast failed"); assert_eq!(cast_array.data_type(), &cast_type); assert_eq!(array_to_strings(&cast_array), expected); let cast_type = Dictionary(Box::new(UInt64), Box::new(Utf8)); - let cast_array = cast(&array, &cast_type).expect("cast succeeded"); + let cast_array = cast(&array, &cast_type).expect("cast failed"); assert_eq!(cast_array.data_type(), &cast_type); assert_eq!(array_to_strings(&cast_array), expected); } @@ -2598,11 +2603,11 @@ mod tests { let expected = vec!["1", "null", "3"]; // Test casting TO PrimitiveArray, different dictionary type - let cast_array = cast(&array, &Utf8).expect("cast to UTF-8 succeeded"); + let cast_array = cast(&array, &Utf8).expect("cast to UTF-8 failed"); assert_eq!(array_to_strings(&cast_array), expected); assert_eq!(cast_array.data_type(), &Utf8); - let cast_array = cast(&array, &Int64).expect("cast to int64 succeeded"); + let cast_array = cast(&array, &Int64).expect("cast to int64 failed"); assert_eq!(array_to_strings(&cast_array), expected); assert_eq!(cast_array.data_type(), &Int64); } @@ -2621,13 +2626,13 @@ mod tests { // Cast to a dictionary (same value type, Int32) let cast_type = Dictionary(Box::new(UInt8), Box::new(Int32)); - let cast_array = cast(&array, &cast_type).expect("cast succeeded"); + let cast_array = cast(&array, &cast_type).expect("cast failed"); assert_eq!(cast_array.data_type(), &cast_type); assert_eq!(array_to_strings(&cast_array), expected); // Cast to a dictionary (different value type, Int8) let cast_type = Dictionary(Box::new(UInt8), Box::new(Int8)); - let cast_array = cast(&array, &cast_type).expect("cast succeeded"); + let cast_array = cast(&array, &cast_type).expect("cast failed"); assert_eq!(cast_array.data_type(), &cast_type); assert_eq!(array_to_strings(&cast_array), expected); } @@ -2646,11 +2651,25 @@ mod tests { // Cast to a dictionary (same value type, Utf8) let cast_type = Dictionary(Box::new(UInt8), Box::new(Utf8)); - let cast_array = cast(&array, &cast_type).expect("cast succeeded"); + let cast_array = cast(&array, &cast_type).expect("cast failed"); assert_eq!(cast_array.data_type(), &cast_type); assert_eq!(array_to_strings(&cast_array), expected); } + #[test] + fn test_cast_null_array_to_int32() { + let array = Arc::new(NullArray::new(6)) as ArrayRef; + + let expected = Int32Array::from(vec![None; 6]); + + // Cast to a dictionary (same value type, Utf8) + let cast_type = DataType::Int32; + let cast_array = cast(&array, &cast_type).expect("cast failed"); + let cast_array = as_primitive_array::(&cast_array); + assert_eq!(cast_array.data_type(), &cast_type); + assert_eq!(cast_array, &expected); + } + /// Print the `DictionaryArray` `array` as a vector of strings fn array_to_strings(array: &ArrayRef) -> Vec { (0..array.len()) @@ -2768,7 +2787,7 @@ mod tests { )), Arc::new(TimestampNanosecondArray::from_vec( vec![1000, 2000], - Some(tz_name.clone()), + Some(tz_name), )), Arc::new(Date32Array::from(vec![1000, 2000])), Arc::new(Date64Array::from(vec![1000, 2000])), diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 49e652d8cb347..bee608c53154f 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -82,6 +82,97 @@ pub trait ArrayReader { fn get_rep_levels(&self) -> Option<&[i16]>; } +/// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow +/// NullArray type. +pub struct NullArrayReader { + data_type: ArrowType, + pages: Box, + def_levels_buffer: Option, + rep_levels_buffer: Option, + column_desc: ColumnDescPtr, + record_reader: RecordReader, + _type_marker: PhantomData, +} + +impl NullArrayReader { + /// Construct null array reader. + pub fn new( + mut pages: Box, + column_desc: ColumnDescPtr, + ) -> Result { + let mut record_reader = RecordReader::::new(column_desc.clone()); + if let Some(page_reader) = pages.next() { + record_reader.set_page_reader(page_reader?)?; + } + + Ok(Self { + data_type: ArrowType::Null, + pages, + def_levels_buffer: None, + rep_levels_buffer: None, + column_desc, + record_reader, + _type_marker: PhantomData, + }) + } +} + +/// Implementation of primitive array reader. +impl ArrayReader for NullArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns data type of primitive array. + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + /// Reads at most `batch_size` records into array. + fn next_batch(&mut self, batch_size: usize) -> Result { + let mut records_read = 0usize; + while records_read < batch_size { + let records_to_read = batch_size - records_read; + + // NB can be 0 if at end of page + let records_read_once = self.record_reader.read_records(records_to_read)?; + records_read += records_read_once; + + // Record reader exhausted + if records_read_once < records_to_read { + if let Some(page_reader) = self.pages.next() { + // Read from new page reader + self.record_reader.set_page_reader(page_reader?)?; + } else { + // Page reader also exhausted + break; + } + } + } + + // convert to arrays + let array = arrow::array::NullArray::new(records_read); + + // save definition and repetition buffers + self.def_levels_buffer = self.record_reader.consume_def_levels()?; + self.rep_levels_buffer = self.record_reader.consume_rep_levels()?; + self.record_reader.reset(); + Ok(Arc::new(array)) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels_buffer + .as_ref() + .map(|buf| unsafe { buf.typed_data() }) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels_buffer + .as_ref() + .map(|buf| unsafe { buf.typed_data() }) + } +} + /// Primitive array readers are leaves of array reader tree. They accept page iterator /// and read them into primitive arrays. pub struct PrimitiveArrayReader { @@ -859,10 +950,19 @@ impl<'a> ArrayReaderBuilder { page_iterator, column_desc, )?)), - PhysicalType::INT32 => Ok(Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - )?)), + PhysicalType::INT32 => { + if let Some(ArrowType::Null) = arrow_type { + Ok(Box::new(NullArrayReader::::new( + page_iterator, + column_desc, + )?)) + } else { + Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + )?)) + } + } PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::::new( page_iterator, column_desc, @@ -903,25 +1003,23 @@ impl<'a> ArrayReaderBuilder { page_iterator, column_desc, converter )?)) } + } else if let Some(ArrowType::LargeBinary) = arrow_type { + let converter = + LargeBinaryConverter::new(LargeBinaryArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + LargeBinaryConverter, + >::new( + page_iterator, column_desc, converter + )?)) } else { - if let Some(ArrowType::LargeBinary) = arrow_type { - let converter = - LargeBinaryConverter::new(LargeBinaryArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - LargeBinaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } else { - let converter = BinaryConverter::new(BinaryArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - BinaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } + let converter = BinaryConverter::new(BinaryArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + BinaryConverter, + >::new( + page_iterator, column_desc, converter + )?)) } } PhysicalType::FIXED_LEN_BYTE_ARRAY => { diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index a17e4244d3566..ff535dcb0a702 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -128,7 +128,8 @@ fn write_leaves( mut levels: &mut Vec, ) -> Result<()> { match array.data_type() { - ArrowDataType::Int8 + ArrowDataType::Null + | ArrowDataType::Int8 | ArrowDataType::Int16 | ArrowDataType::Int32 | ArrowDataType::Int64 @@ -179,7 +180,6 @@ fn write_leaves( "Float16 arrays not supported".to_string(), )), ArrowDataType::FixedSizeList(_, _) - | ArrowDataType::Null | ArrowDataType::Boolean | ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Union(_) @@ -279,7 +279,10 @@ fn get_levels( parent_rep_levels: Option<&[i16]>, ) -> Vec { match array.data_type() { - ArrowDataType::Null => unimplemented!(), + ArrowDataType::Null => vec![Levels { + definition: parent_def_levels.iter().map(|v| (v - 1).max(0)).collect(), + repetition: None, + }], ArrowDataType::Boolean | ArrowDataType::Int8 | ArrowDataType::Int16 @@ -356,7 +359,11 @@ fn get_levels( // if datatype is a primitive, we can construct levels of the child array match child_array.data_type() { - ArrowDataType::Null => unimplemented!(), + // TODO: The behaviour of a > is untested + ArrowDataType::Null => vec![Levels { + definition: list_def_levels, + repetition: Some(list_rep_levels), + }], ArrowDataType::Boolean => unimplemented!(), ArrowDataType::Int8 | ArrowDataType::Int16 @@ -701,7 +708,7 @@ mod tests { expected_batch.schema(), None, ) - .unwrap(); + .expect("Unable to write file"); writer.write(&expected_batch).unwrap(); writer.close().unwrap(); @@ -709,7 +716,10 @@ mod tests { let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader)); let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); - let actual_batch = record_batch_reader.next().unwrap().unwrap(); + let actual_batch = record_batch_reader + .next() + .expect("No batch found") + .expect("Unable to get batch"); assert_eq!(expected_batch.schema(), actual_batch.schema()); assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); @@ -778,11 +788,15 @@ mod tests { } #[test] - #[should_panic(expected = "Null arrays not supported")] + fn all_null_primitive_single_column() { + let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE])); + one_column_roundtrip("all_null_primitive_single_column", values, true); + } + #[test] fn null_single_column() { let values = Arc::new(NullArray::new(SMALL_SIZE)); - one_column_roundtrip("null_single_column", values.clone(), true); - one_column_roundtrip("null_single_column", values, false); + one_column_roundtrip("null_single_column", values, true); + // null arrays are always nullable, a test with non-nullable nulls fails } #[test] diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index 0cd41fe592539..10270fff4644c 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -308,7 +308,10 @@ fn arrow_to_parquet_type(field: &Field) -> Result { }; // create type from field match field.data_type() { - DataType::Null => Err(ArrowError("Null arrays not supported".to_string())), + DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32) + .with_logical_type(LogicalType::NONE) + .with_repetition(repetition) + .build(), DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN) .with_repetition(repetition) .build(), @@ -1501,6 +1504,7 @@ mod tests { // )))), // true, // ), + Field::new("c35", DataType::Null, true), ], metadata, );