diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index d125cf6924bb..1beb7f1d3506 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -380,6 +380,18 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> { } Arc::new(builder.finish()) as ArrayRef } + ArrowType::UInt64 => match array.data_type() { + ArrowType::Int64 => { + // follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map + // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX` + let values = array.as_any().downcast_ref::<Int64Array>().unwrap(); + Arc::new(arrow::compute::unary::<_, _, ArrowUInt64Type>( + values, + |x| x as u64, + )) + } + _ => arrow::compute::cast(&array, target_type)?, + }, _ => arrow::compute::cast(&array, target_type)?, }; diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index 2543efa2d34b..9b3b260edd41 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -248,6 +248,19 @@ fn write_leaf( .expect("Unable to get i64 array"); get_numeric_array_slice::<Int64Type, _>(&array, &indices) } + ArrowDataType::UInt64 => { + // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map + // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0` + let array = column + .as_any() + .downcast_ref::<UInt64Array>() + .expect("Unable to get u64 array"); + let array = arrow::compute::unary::<_, _, arrow::datatypes::Int64Type>( + array, + |x| x as i64, + ); + get_numeric_array_slice::<Int64Type, _>(&array, &indices) + } _ => { let array = arrow::compute::cast(column, &ArrowDataType::Int64)?; let array = array @@ -507,7 +520,11 @@ mod tests { use arrow::{array::*, buffer::Buffer}; use crate::arrow::{ArrowReader, ParquetFileArrowReader}; - use crate::file::{reader::SerializedFileReader, writer::InMemoryWriteableCursor}; + use crate::file::{ + reader::{FileReader, SerializedFileReader}, + statistics::Statistics, + writer::InMemoryWriteableCursor, + }; + use 
crate::util::test_common::get_temp_file; #[test] @@ -1451,4 +1468,35 @@ mod tests { expected_batch, ); } + + #[test] + fn u64_min_max() { + // check values roundtrip through parquet + let values = Arc::new(UInt64Array::from_iter_values(vec![ + u64::MIN, + u64::MIN + 1, + (i64::MAX as u64) - 1, + i64::MAX as u64, + (i64::MAX as u64) + 1, + u64::MAX - 1, + u64::MAX, + ])); + let file = one_column_roundtrip("u64_min_max_single_column", values, false); + + // check statistics are valid + let reader = SerializedFileReader::new(file).unwrap(); + let metadata = reader.metadata(); + assert_eq!(metadata.num_row_groups(), 1); + let row_group = metadata.row_group(0); + assert_eq!(row_group.num_columns(), 1); + let column = row_group.column(0); + let stats = column.statistics().unwrap(); + assert!(stats.has_min_max_set()); + if let Statistics::Int64(stats) = stats { + assert_eq!(*stats.min() as u64, u64::MIN); + assert_eq!(*stats.max() as u64, u64::MAX); + } else { + panic!("Statistics::Int64 missing") + } + } } diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index 64e4880bbd65..57ccda3a3e61 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -18,9 +18,10 @@ //! Contains column writer API. 
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc}; -use crate::basic::{Compression, Encoding, PageType, Type}; +use crate::basic::{Compression, Encoding, LogicalType, PageType, Type}; use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter}; use crate::compression::{create_codec, Codec}; +use crate::data_type::private::ParquetValueType; use crate::data_type::AsBytes; use crate::data_type::*; use crate::encodings::{ @@ -300,10 +301,18 @@ impl<T: DataType> ColumnWriterImpl<T> { // Process pre-calculated statistics match (min, max) { (Some(min), Some(max)) => { - if self.min_column_value.as_ref().map_or(true, |v| v > min) { + if self + .min_column_value + .as_ref() + .map_or(true, |v| self.compare_greater(v, min)) + { self.min_column_value = Some(min.clone()); } - if self.max_column_value.as_ref().map_or(true, |v| v < max) { + if self + .max_column_value + .as_ref() + .map_or(true, |v| self.compare_greater(max, v)) + { self.max_column_value = Some(max.clone()); } } @@ -925,31 +934,51 @@ impl<T: DataType> ColumnWriterImpl<T> { fn update_page_min_max(&mut self, val: &T::T) { // simple "isNaN" check that works for all types if val == val { - if self.min_page_value.as_ref().map_or(true, |min| min > val) { + if self + .min_page_value + .as_ref() + .map_or(true, |min| self.compare_greater(min, val)) + { self.min_page_value = Some(val.clone()); } - if self.max_page_value.as_ref().map_or(true, |max| max < val) { + if self + .max_page_value + .as_ref() + .map_or(true, |max| self.compare_greater(val, max)) + { self.max_page_value = Some(val.clone()); } } } fn update_column_min_max(&mut self) { - if self - .min_column_value - .as_ref() - .map_or(true, |min| min > self.min_page_value.as_ref().unwrap()) - { + let update_min = self.min_column_value.as_ref().map_or(true, |min| { + let page_value = self.min_page_value.as_ref().unwrap(); + self.compare_greater(min, page_value) + }); + if update_min { self.min_column_value = self.min_page_value.clone(); } - if 
self - .max_column_value - .as_ref() - .map_or(true, |max| max < self.max_page_value.as_ref().unwrap()) - { + + let update_max = self.max_column_value.as_ref().map_or(true, |max| { + let page_value = self.max_page_value.as_ref().unwrap(); + self.compare_greater(page_value, max) + }); + if update_max { self.max_column_value = self.max_page_value.clone(); } } + + /// Evaluate `a > b` according to underlying logical type. + fn compare_greater(&self, a: &T::T, b: &T::T) -> bool { + if let Some(LogicalType::INTEGER(int_type)) = self.descr.logical_type() { + if !int_type.is_signed { + // need to compare unsigned + return a.as_u64().unwrap() > b.as_u64().unwrap(); + } + } + a > b + } } // ----------------------------------------------------------------------