support full arrow u64 through parquet
- updates the Arrow-to-Parquet type mapping to use a reinterpret/overflow cast
  for u64<->i64, similar to what the C++ stack does
- changes the statistics calculation so that u64 values are compared unsigned
  (as required by the Parquet spec)

Fixes #254.
crepererum committed May 10, 2021
1 parent dceb539 commit 432299b
Showing 3 changed files with 105 additions and 16 deletions.
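To summarize the approach before the diffs: the overflow/reinterpret cast between u64 and i64 is a bijection, so every u64 value can be stored in a Parquet INT64 column and recovered losslessly on read. A minimal standalone sketch of that round trip (not part of the commit, plain std only):

```rust
fn main() {
    // Round-trip the interesting boundary values through the reinterpret cast.
    for &v in &[u64::MIN, 1, i64::MAX as u64, (i64::MAX as u64) + 1, u64::MAX] {
        let stored = v as i64; // what the writer puts into the INT64 column
        let restored = stored as u64; // what the reader reconstructs
        assert_eq!(v, restored);
        println!("{:>20} -> {:>20} -> {:>20}", v, stored, restored);
    }
}
```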
12 changes: 12 additions & 0 deletions parquet/src/arrow/array_reader.rs
@@ -380,6 +380,18 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
}
Arc::new(builder.finish()) as ArrayRef
}
+ ArrowType::UInt64 => match array.data_type() {
+ ArrowType::Int64 => {
+ // follow the C++ implementation and use an overflow/reinterpret cast from i64 to u64,
+ // which maps `i64::MIN..0` to `(i64::MAX as u64) + 1..=u64::MAX`
+ let values = array.as_any().downcast_ref::<Int64Array>().unwrap();
+ Arc::new(arrow::compute::unary::<_, _, ArrowUInt64Type>(
+ values,
+ |x| x as u64,
+ ))
+ }
+ _ => arrow::compute::cast(&array, target_type)?,
+ },
_ => arrow::compute::cast(&array, target_type)?,
};

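For readers unfamiliar with the `unary` kernel used above: a plain `cast` from Int64 to UInt64 would not reinterpret the bit pattern of negative values, so the reader applies the bit-level cast explicitly. Below is a minimal sketch of the same idea written against the arrow crate API used here (the `unary` kernel and `UInt64Type`); the helper name `reinterpret_i64_as_u64` is made up for illustration and is not part of the commit:

```rust
use arrow::array::{Int64Array, UInt64Array};
use arrow::compute::unary;
use arrow::datatypes::UInt64Type;

/// Reinterpret a signed 64-bit column as unsigned, element by element.
fn reinterpret_i64_as_u64(values: &Int64Array) -> UInt64Array {
    // `x as u64` keeps the bit pattern, so negative values land in the upper
    // half of the u64 range instead of being rejected or nulled out.
    unary::<_, _, UInt64Type>(values, |x| x as u64)
}

fn main() {
    let stored = Int64Array::from(vec![0, 1, -1, i64::MIN]);
    let restored = reinterpret_i64_as_u64(&stored);
    assert_eq!(restored.value(2), u64::MAX);
    assert_eq!(restored.value(3), (i64::MAX as u64) + 1);
}
```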
50 changes: 49 additions & 1 deletion parquet/src/arrow/arrow_writer.rs
@@ -248,6 +248,19 @@ fn write_leaf(
.expect("Unable to get i64 array");
get_numeric_array_slice::<Int64Type, _>(&array, &indices)
}
+ ArrowDataType::UInt64 => {
+ // follow the C++ implementation and use an overflow/reinterpret cast from u64 to i64,
+ // which maps `(i64::MAX as u64) + 1..=u64::MAX` to `i64::MIN..0`
+ let array = column
+ .as_any()
+ .downcast_ref::<arrow_array::UInt64Array>()
+ .expect("Unable to get u64 array");
+ let array = arrow::compute::unary::<_, _, arrow::datatypes::Int64Type>(
+ array,
+ |x| x as i64,
+ );
+ get_numeric_array_slice::<Int64Type, _>(&array, &indices)
+ }
_ => {
let array = arrow::compute::cast(column, &ArrowDataType::Int64)?;
let array = array
@@ -507,7 +520,11 @@ mod tests {
use arrow::{array::*, buffer::Buffer};

use crate::arrow::{ArrowReader, ParquetFileArrowReader};
- use crate::file::{reader::SerializedFileReader, writer::InMemoryWriteableCursor};
+ use crate::file::{
+ reader::{FileReader, SerializedFileReader},
+ statistics::Statistics,
+ writer::InMemoryWriteableCursor,
+ };
use crate::util::test_common::get_temp_file;

#[test]
@@ -1451,4 +1468,35 @@ mod tests {
expected_batch,
);
}

+ #[test]
+ fn u64_min_max() {
+ // check values roundtrip through parquet
+ let values = Arc::new(UInt64Array::from_iter_values(vec![
+ u64::MIN,
+ u64::MIN + 1,
+ (i64::MAX as u64) - 1,
+ i64::MAX as u64,
+ (i64::MAX as u64) + 1,
+ u64::MAX - 1,
+ u64::MAX,
+ ]));
+ let file = one_column_roundtrip("u64_min_max_single_column", values, false);
+
+ // check statistics are valid
+ let reader = SerializedFileReader::new(file).unwrap();
+ let metadata = reader.metadata();
+ assert_eq!(metadata.num_row_groups(), 1);
+ let row_group = metadata.row_group(0);
+ assert_eq!(row_group.num_columns(), 1);
+ let column = row_group.column(0);
+ let stats = column.statistics().unwrap();
+ assert!(stats.has_min_max_set());
+ if let Statistics::Int64(stats) = stats {
+ assert_eq!(*stats.min() as u64, u64::MIN);
+ assert_eq!(*stats.max() as u64, u64::MAX);
+ } else {
+ panic!("Statistics::Int64 missing")
+ }
+ }
}
59 changes: 44 additions & 15 deletions parquet/src/column/writer.rs
@@ -18,9 +18,10 @@
//! Contains column writer API.
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc};

- use crate::basic::{Compression, Encoding, PageType, Type};
+ use crate::basic::{Compression, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::compression::{create_codec, Codec};
+ use crate::data_type::private::ParquetValueType;
use crate::data_type::AsBytes;
use crate::data_type::*;
use crate::encodings::{
@@ -300,10 +301,18 @@ impl<T: DataType> ColumnWriterImpl<T> {
// Process pre-calculated statistics
match (min, max) {
(Some(min), Some(max)) => {
- if self.min_column_value.as_ref().map_or(true, |v| v > min) {
+ if self
+ .min_column_value
+ .as_ref()
+ .map_or(true, |v| self.compare_greater(v, min))
+ {
self.min_column_value = Some(min.clone());
}
- if self.max_column_value.as_ref().map_or(true, |v| v < max) {
+ if self
+ .max_column_value
+ .as_ref()
+ .map_or(true, |v| self.compare_greater(max, v))
+ {
self.max_column_value = Some(max.clone());
}
}
@@ -925,31 +934,51 @@ impl<T: DataType> ColumnWriterImpl<T> {
fn update_page_min_max(&mut self, val: &T::T) {
// simple "isNaN" check that works for all types
if val == val {
- if self.min_page_value.as_ref().map_or(true, |min| min > val) {
+ if self
+ .min_page_value
+ .as_ref()
+ .map_or(true, |min| self.compare_greater(min, val))
+ {
self.min_page_value = Some(val.clone());
}
- if self.max_page_value.as_ref().map_or(true, |max| max < val) {
+ if self
+ .max_page_value
+ .as_ref()
+ .map_or(true, |max| self.compare_greater(val, max))
+ {
self.max_page_value = Some(val.clone());
}
}
}

fn update_column_min_max(&mut self) {
- if self
- .min_column_value
- .as_ref()
- .map_or(true, |min| min > self.min_page_value.as_ref().unwrap())
- {
+ let update_min = self.min_column_value.as_ref().map_or(true, |min| {
+ let page_value = self.min_page_value.as_ref().unwrap();
+ self.compare_greater(min, page_value)
+ });
+ if update_min {
self.min_column_value = self.min_page_value.clone();
}
- if self
- .max_column_value
- .as_ref()
- .map_or(true, |max| max < self.max_page_value.as_ref().unwrap())
- {
+
+ let update_max = self.max_column_value.as_ref().map_or(true, |max| {
+ let page_value = self.max_page_value.as_ref().unwrap();
+ self.compare_greater(page_value, max)
+ });
+ if update_max {
self.max_column_value = self.max_page_value.clone();
}
}

+ /// Evaluate `a > b` according to the underlying logical type.
+ fn compare_greater(&self, a: &T::T, b: &T::T) -> bool {
+ if let Some(LogicalType::INTEGER(int_type)) = self.descr.logical_type() {
+ if !int_type.is_signed {
+ // need to compare unsigned
+ return a.as_u64().unwrap() > b.as_u64().unwrap();
+ }
+ }
+ a > b
+ }
}

// ----------------------------------------------------------------------
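Why the unsigned comparison in `compare_greater` matters, as a small standalone sketch (not part of the commit, plain std only): after the reinterpret cast, `u64::MAX` is stored as `-1i64`, so a signed min/max would report it as the column minimum.

```rust
fn main() {
    // u64 values as they appear in the INT64 column after the reinterpret cast.
    let stored: Vec<i64> = [1u64, 42, u64::MAX].iter().map(|&v| v as i64).collect();

    // A signed comparison picks -1 (really u64::MAX) as the minimum...
    let signed_min = *stored.iter().min().unwrap();
    assert_eq!(signed_min, -1);

    // ...while comparing as unsigned yields the statistics required by the spec.
    let unsigned_min = stored.iter().map(|&v| v as u64).min().unwrap();
    let unsigned_max = stored.iter().map(|&v| v as u64).max().unwrap();
    assert_eq!(unsigned_min, 1);
    assert_eq!(unsigned_max, u64::MAX);
}
```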
