diff --git a/arrow/src/compute/mod.rs b/arrow/src/compute/mod.rs
index be1aa277ca4f..166f1568359c 100644
--- a/arrow/src/compute/mod.rs
+++ b/arrow/src/compute/mod.rs
@@ -23,6 +23,7 @@ mod util;
 
 pub use self::kernels::aggregate::*;
 pub use self::kernels::arithmetic::*;
+pub use self::kernels::arity::*;
 pub use self::kernels::boolean::*;
 pub use self::kernels::cast::*;
 pub use self::kernels::comparison::*;
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index d125cf6924bb..f209b8baabf0 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -268,10 +268,29 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
             }
         }
 
+        let target_type = self.get_data_type().clone();
         let arrow_data_type = match T::get_physical_type() {
             PhysicalType::BOOLEAN => ArrowBooleanType::DATA_TYPE,
-            PhysicalType::INT32 => ArrowInt32Type::DATA_TYPE,
-            PhysicalType::INT64 => ArrowInt64Type::DATA_TYPE,
+            PhysicalType::INT32 => {
+                match target_type {
+                    ArrowType::UInt32 => {
+                        // follow C++ implementation and use overflow/reinterpret cast from i32 to u32 which will map
+                        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
+                        ArrowUInt32Type::DATA_TYPE
+                    }
+                    _ => ArrowInt32Type::DATA_TYPE,
+                }
+            }
+            PhysicalType::INT64 => {
+                match target_type {
+                    ArrowType::UInt64 => {
+                        // follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map
+                        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
+                        ArrowUInt64Type::DATA_TYPE
+                    }
+                    _ => ArrowInt64Type::DATA_TYPE,
+                }
+            }
             PhysicalType::FLOAT => ArrowFloat32Type::DATA_TYPE,
             PhysicalType::DOUBLE => ArrowFloat64Type::DATA_TYPE,
             PhysicalType::INT96
@@ -343,15 +362,14 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
         // are datatypes which we must convert explicitly.
         // These are:
         // - date64: we should cast int32 to date32, then date32 to date64.
-        let target_type = self.get_data_type();
         let array = match target_type {
             ArrowType::Date64 => {
                 // this is cheap as it internally reinterprets the data
                 let a = arrow::compute::cast(&array, &ArrowType::Date32)?;
-                arrow::compute::cast(&a, target_type)?
+                arrow::compute::cast(&a, &target_type)?
             }
             ArrowType::Decimal(p, s) => {
-                let mut builder = DecimalBuilder::new(array.len(), *p, *s);
+                let mut builder = DecimalBuilder::new(array.len(), p, s);
                 match array.data_type() {
                     ArrowType::Int32 => {
                         let values = array.as_any().downcast_ref::<Int32Array>().unwrap();
@@ -380,7 +398,7 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
                 }
                 Arc::new(builder.finish()) as ArrayRef
             }
-            _ => arrow::compute::cast(&array, target_type)?,
+            _ => arrow::compute::cast(&array, &target_type)?,
         };
 
         // save definition and repetition buffers
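The reader comments above describe a reinterpret (wrapping) cast rather than a value-preserving conversion: the physical INT32/INT64 data is left untouched and only re-labelled as UInt32/UInt64. A minimal standalone sketch of that mapping in plain Rust, outside this diff and purely illustrative:

```rust
fn main() {
    // Parquet stores a UInt32 column as physical INT32 values; recovering the
    // unsigned values is a bit-pattern reinterpretation (Rust's `as` cast
    // between integers of the same width), not a checked conversion.
    let stored: [i32; 3] = [0, -1, i32::MIN];
    let logical: Vec<u32> = stored.iter().map(|&v| v as u32).collect();

    // `i32::MIN..0` maps to `(i32::MAX as u32)..u32::MAX`, exactly as the
    // comments in the INT32/INT64 match arms state.
    assert_eq!(logical, vec![0, u32::MAX, (i32::MAX as u32) + 1]);
}
```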
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index b4915a3111d3..df6ce98c6d04 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -211,19 +211,45 @@ fn write_leaf(
     let indices = levels.filter_array_indices();
     let written = match writer {
         ColumnWriter::Int32ColumnWriter(ref mut typed) => {
-            // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
-            let array = if let ArrowDataType::Date64 = column.data_type() {
-                let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
-                arrow::compute::cast(&array, &ArrowDataType::Int32)?
-            } else {
-                arrow::compute::cast(column, &ArrowDataType::Int32)?
+            let values = match column.data_type() {
+                ArrowDataType::Date64 => {
+                    // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
+                    let array = if let ArrowDataType::Date64 = column.data_type() {
+                        let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
+                        arrow::compute::cast(&array, &ArrowDataType::Int32)?
+                    } else {
+                        arrow::compute::cast(column, &ArrowDataType::Int32)?
+                    };
+                    let array = array
+                        .as_any()
+                        .downcast_ref::<arrow_array::Int32Array>()
+                        .expect("Unable to get int32 array");
+                    get_numeric_array_slice::<Int32Type, _>(&array, &indices)
+                }
+                ArrowDataType::UInt32 => {
+                    // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
+                    // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
+                    let array = column
+                        .as_any()
+                        .downcast_ref::<arrow_array::UInt32Array>()
+                        .expect("Unable to get u32 array");
+                    let array = arrow::compute::unary::<_, _, arrow::datatypes::Int32Type>(
+                        array,
+                        |x| x as i32,
+                    );
+                    get_numeric_array_slice::<Int32Type, _>(&array, &indices)
+                }
+                _ => {
+                    let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
+                    let array = array
+                        .as_any()
+                        .downcast_ref::<arrow_array::Int32Array>()
+                        .expect("Unable to get i32 array");
+                    get_numeric_array_slice::<Int32Type, _>(&array, &indices)
+                }
             };
-            let array = array
-                .as_any()
-                .downcast_ref::<arrow_array::Int32Array>()
-                .expect("Unable to get int32 array");
             typed.write_batch(
-                get_numeric_array_slice::<Int32Type, _>(&array, &indices).as_slice(),
+                values.as_slice(),
                 Some(levels.definition.as_slice()),
                 levels.repetition.as_deref(),
             )?
@@ -248,6 +274,19 @@
                         .expect("Unable to get i64 array");
                     get_numeric_array_slice::<Int64Type, _>(&array, &indices)
                 }
+                ArrowDataType::UInt64 => {
+                    // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
+                    // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
+                    let array = column
+                        .as_any()
+                        .downcast_ref::<arrow_array::UInt64Array>()
+                        .expect("Unable to get u64 array");
+                    let array = arrow::compute::unary::<_, _, arrow::datatypes::Int64Type>(
+                        array,
+                        |x| x as i64,
+                    );
+                    get_numeric_array_slice::<Int64Type, _>(&array, &indices)
+                }
                 _ => {
                     let array = arrow::compute::cast(column, &ArrowDataType::Int64)?;
                     let array = array
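The new UInt32/UInt64 arms above use `arrow::compute::unary` (the arity kernel re-exported by the first hunk of this diff) to reinterpret each value, instead of `arrow::compute::cast`, which cannot map values above `i32::MAX`/`i64::MAX` onto the signed types losslessly. A hedged sketch of that call in isolation; the sample data is made up for illustration:

```rust
use arrow::array::{Int32Array, UInt32Array};
use arrow::compute::unary;
use arrow::datatypes::Int32Type;

fn main() {
    // A u32 column containing values above i32::MAX (made-up sample data).
    let column = UInt32Array::from(vec![0u32, 1, (i32::MAX as u32) + 1, u32::MAX]);

    // Reinterpret every element as i32, as write_leaf now does for UInt32
    // columns before handing the slice to the Int32 column writer.
    let reinterpreted: Int32Array = unary::<_, _, Int32Type>(&column, |x| x as i32);

    assert_eq!(reinterpreted.value(2), i32::MIN);
    assert_eq!(reinterpreted.value(3), -1);
}
```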
@@ -498,8 +537,8 @@ fn get_fsb_array_slice(
 mod tests {
     use super::*;
 
-    use std::io::Seek;
     use std::sync::Arc;
+    use std::{fs::File, io::Seek};
 
     use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
@@ -507,7 +546,11 @@
     use arrow::{array::*, buffer::Buffer};
 
     use crate::arrow::{ArrowReader, ParquetFileArrowReader};
-    use crate::file::{reader::SerializedFileReader, writer::InMemoryWriteableCursor};
+    use crate::file::{
+        reader::{FileReader, SerializedFileReader},
+        statistics::Statistics,
+        writer::InMemoryWriteableCursor,
+    };
     use crate::util::test_common::get_temp_file;
 
     #[test]
@@ -956,7 +999,7 @@ mod tests {
 
     const SMALL_SIZE: usize = 4;
 
-    fn roundtrip(filename: &str, expected_batch: RecordBatch) {
+    fn roundtrip(filename: &str, expected_batch: RecordBatch) -> File {
         let file = get_temp_file(filename, &[]);
 
         let mut writer = ArrowWriter::try_new(
@@ -968,7 +1011,7 @@ mod tests {
         writer.write(&expected_batch).unwrap();
         writer.close().unwrap();
 
-        let reader = SerializedFileReader::new(file).unwrap();
+        let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
         let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
 
@@ -986,9 +1029,11 @@
 
             assert_eq!(expected_data, actual_data);
         }
+
+        file
     }
 
-    fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) {
+    fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) -> File {
         let schema = Schema::new(vec![Field::new(
             "col",
             values.data_type().clone(),
@@ -997,7 +1042,7 @@
         let expected_batch =
             RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
 
-        roundtrip(filename, expected_batch);
+        roundtrip(filename, expected_batch)
     }
 
     fn values_required<A, I>(iter: I, filename: &str)
@@ -1449,4 +1494,66 @@ mod tests {
             expected_batch,
         );
     }
+
+    #[test]
+    fn u32_min_max() {
+        // check values roundtrip through parquet
+        let values = Arc::new(UInt32Array::from_iter_values(vec![
+            u32::MIN,
+            u32::MIN + 1,
+            (i32::MAX as u32) - 1,
+            i32::MAX as u32,
+            (i32::MAX as u32) + 1,
+            u32::MAX - 1,
+            u32::MAX,
+        ]));
+        let file = one_column_roundtrip("u32_min_max_single_column", values, false);
+
+        // check statistics are valid
+        let reader = SerializedFileReader::new(file).unwrap();
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        let row_group = metadata.row_group(0);
+        assert_eq!(row_group.num_columns(), 1);
+        let column = row_group.column(0);
+        let stats = column.statistics().unwrap();
+        assert!(stats.has_min_max_set());
+        if let Statistics::Int32(stats) = stats {
+            assert_eq!(*stats.min() as u32, u32::MIN);
+            assert_eq!(*stats.max() as u32, u32::MAX);
+        } else {
+            panic!("Statistics::Int32 missing")
+        }
+    }
+
+    #[test]
+    fn u64_min_max() {
+        // check values roundtrip through parquet
+        let values = Arc::new(UInt64Array::from_iter_values(vec![
+            u64::MIN,
+            u64::MIN + 1,
+            (i64::MAX as u64) - 1,
+            i64::MAX as u64,
+            (i64::MAX as u64) + 1,
+            u64::MAX - 1,
+            u64::MAX,
+        ]));
+        let file = one_column_roundtrip("u64_min_max_single_column", values, false);
+
+        // check statistics are valid
+        let reader = SerializedFileReader::new(file).unwrap();
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        let row_group = metadata.row_group(0);
+        assert_eq!(row_group.num_columns(), 1);
+        let column = row_group.column(0);
+        let stats = column.statistics().unwrap();
+        assert!(stats.has_min_max_set());
+        if let Statistics::Int64(stats) = stats {
+            assert_eq!(*stats.min() as u64, u64::MIN);
+            assert_eq!(*stats.max() as u64, u64::MAX);
+        } else {
+            panic!("Statistics::Int64 missing")
+        }
+    }
 }
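Once unsigned values are stored as reinterpreted `i32`/`i64`, the writer's min/max tracking can no longer rely on signed `PartialOrd` ordering, which is what the `column/writer.rs` changes below address. A small illustration of the problem, not part of the diff:

```rust
fn main() {
    // Two u32 values and the i32 bit patterns the writer stores for them.
    let small: u32 = 1;
    let large: u32 = u32::MAX;
    let small_stored = small as i32; // 1
    let large_stored = large as i32; // -1

    // Signed comparison of the stored representation inverts the order, so
    // naive min/max tracking would record min = u32::MAX and max = 1.
    assert!(small_stored > large_stored);

    // Comparing the unsigned interpretation restores the intended order.
    assert!((large_stored as u32) > (small_stored as u32));
}
```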
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index 64e4880bbd65..57ccda3a3e61 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -18,9 +18,10 @@
 //! Contains column writer API.
 use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc};
 
-use crate::basic::{Compression, Encoding, PageType, Type};
+use crate::basic::{Compression, Encoding, LogicalType, PageType, Type};
 use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
 use crate::compression::{create_codec, Codec};
+use crate::data_type::private::ParquetValueType;
 use crate::data_type::AsBytes;
 use crate::data_type::*;
 use crate::encodings::{
@@ -300,10 +301,18 @@ impl<T: DataType> ColumnWriterImpl<T> {
         // Process pre-calculated statistics
         match (min, max) {
             (Some(min), Some(max)) => {
-                if self.min_column_value.as_ref().map_or(true, |v| v > min) {
+                if self
+                    .min_column_value
+                    .as_ref()
+                    .map_or(true, |v| self.compare_greater(v, min))
+                {
                     self.min_column_value = Some(min.clone());
                 }
-                if self.max_column_value.as_ref().map_or(true, |v| v < max) {
+                if self
+                    .max_column_value
+                    .as_ref()
+                    .map_or(true, |v| self.compare_greater(max, v))
+                {
                     self.max_column_value = Some(max.clone());
                 }
             }
@@ -925,31 +934,51 @@ impl<T: DataType> ColumnWriterImpl<T> {
     fn update_page_min_max(&mut self, val: &T::T) {
         // simple "isNaN" check that works for all types
         if val == val {
-            if self.min_page_value.as_ref().map_or(true, |min| min > val) {
+            if self
+                .min_page_value
+                .as_ref()
+                .map_or(true, |min| self.compare_greater(min, val))
+            {
                 self.min_page_value = Some(val.clone());
             }
-            if self.max_page_value.as_ref().map_or(true, |max| max < val) {
+            if self
+                .max_page_value
+                .as_ref()
+                .map_or(true, |max| self.compare_greater(val, max))
+            {
                 self.max_page_value = Some(val.clone());
             }
         }
     }
 
     fn update_column_min_max(&mut self) {
-        if self
-            .min_column_value
-            .as_ref()
-            .map_or(true, |min| min > self.min_page_value.as_ref().unwrap())
-        {
+        let update_min = self.min_column_value.as_ref().map_or(true, |min| {
+            let page_value = self.min_page_value.as_ref().unwrap();
+            self.compare_greater(min, page_value)
+        });
+        if update_min {
             self.min_column_value = self.min_page_value.clone();
         }
-        if self
-            .max_column_value
-            .as_ref()
-            .map_or(true, |max| max < self.max_page_value.as_ref().unwrap())
-        {
+
+        let update_max = self.max_column_value.as_ref().map_or(true, |max| {
+            let page_value = self.max_page_value.as_ref().unwrap();
+            self.compare_greater(page_value, max)
+        });
+        if update_max {
             self.max_column_value = self.max_page_value.clone();
         }
     }
+
+    /// Evaluate `a > b` according to underlying logical type.
+    fn compare_greater(&self, a: &T::T, b: &T::T) -> bool {
+        if let Some(LogicalType::INTEGER(int_type)) = self.descr.logical_type() {
+            if !int_type.is_signed {
+                // need to compare unsigned
+                return a.as_u64().unwrap() > b.as_u64().unwrap();
+            }
+        }
+        a > b
+    }
 }
 
 // ----------------------------------------------------------------------
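`compare_greater` only switches to unsigned ordering when the column's logical type is an unsigned `INTEGER`; every other column keeps the existing `PartialOrd` comparison. A standalone sketch of the same idea for the INT32 case; the helper name and the plain `i32` parameters are hypothetical, and the real code dispatches through `ParquetValueType::as_u64` on `T::T`:

```rust
/// Hypothetical mirror of `compare_greater` for physical i32 values whose
/// logical type may be an unsigned integer.
fn compare_greater_i32(is_signed: bool, a: i32, b: i32) -> bool {
    if !is_signed {
        // Same ordering the column writer obtains by widening both values
        // via `as_u64`: reinterpret the stored bits as unsigned before comparing.
        return (a as u32) > (b as u32);
    }
    a > b
}

fn main() {
    let one = 1u32 as i32; // 1
    let max = u32::MAX as i32; // -1

    // Signed ordering would call `1` the maximum of {1, u32::MAX}...
    assert!(compare_greater_i32(true, one, max));
    // ...while the unsigned ordering used for UInt32 columns picks u32::MAX.
    assert!(compare_greater_i32(false, max, one));
}
```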