support full u32 and u64 roundtrip through parquet #258

Merged
merged 5 commits on May 10, 2021
1 change: 1 addition & 0 deletions arrow/src/compute/mod.rs
@@ -23,6 +23,7 @@ mod util;

pub use self::kernels::aggregate::*;
pub use self::kernels::arithmetic::*;
pub use self::kernels::arity::*;
pub use self::kernels::boolean::*;
pub use self::kernels::cast::*;
pub use self::kernels::comparison::*;
30 changes: 24 additions & 6 deletions parquet/src/arrow/array_reader.rs
@@ -268,10 +268,29 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
}
}

let target_type = self.get_data_type().clone();
let arrow_data_type = match T::get_physical_type() {
PhysicalType::BOOLEAN => ArrowBooleanType::DATA_TYPE,
PhysicalType::INT32 => ArrowInt32Type::DATA_TYPE,
PhysicalType::INT64 => ArrowInt64Type::DATA_TYPE,
PhysicalType::INT32 => {
match target_type {
ArrowType::UInt32 => {
// follow C++ implementation and use overflow/reinterpret cast from i32 to u32 which will map
// `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
ArrowUInt32Type::DATA_TYPE
}
_ => ArrowInt32Type::DATA_TYPE,
}
}
PhysicalType::INT64 => {
match target_type {
ArrowType::UInt64 => {
// follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map
// `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
ArrowUInt64Type::DATA_TYPE
}
_ => ArrowInt64Type::DATA_TYPE,
}
}
PhysicalType::FLOAT => ArrowFloat32Type::DATA_TYPE,
PhysicalType::DOUBLE => ArrowFloat64Type::DATA_TYPE,
PhysicalType::INT96
@@ -343,15 +362,14 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
// are datatypes which we must convert explicitly.
// These are:
// - date64: we should cast int32 to date32, then date32 to date64.
let target_type = self.get_data_type();
let array = match target_type {
ArrowType::Date64 => {
// this is cheap as it internally reinterprets the data
let a = arrow::compute::cast(&array, &ArrowType::Date32)?;
arrow::compute::cast(&a, target_type)?
arrow::compute::cast(&a, &target_type)?
}
ArrowType::Decimal(p, s) => {
let mut builder = DecimalBuilder::new(array.len(), *p, *s);
let mut builder = DecimalBuilder::new(array.len(), p, s);
match array.data_type() {
ArrowType::Int32 => {
let values = array.as_any().downcast_ref::<Int32Array>().unwrap();
@@ -380,7 +398,7 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
}
Arc::new(builder.finish()) as ArrayRef
}
_ => arrow::compute::cast(&array, target_type)?,
_ => arrow::compute::cast(&array, &target_type)?,
};

// save definition and repetition buffers
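The reader-side reinterpret casts above rely on Rust's wrapping `as` conversions between same-width signed and unsigned integers being bit-for-bit and self-inverse, which is exactly the mapping the comments describe. A minimal standalone sketch (plain Rust, illustration only, not part of the diff):

```rust
fn main() {
    // i32 <-> u32: the negative range i32::MIN..0 maps onto the upper half
    // ((i32::MAX as u32) + 1)..=u32::MAX, and casting back restores the value.
    assert_eq!(-1i32 as u32, u32::MAX);
    assert_eq!(i32::MIN as u32, (i32::MAX as u32) + 1);
    assert_eq!(u32::MAX as i32, -1);
    assert_eq!(((i32::MAX as u32) + 1) as i32, i32::MIN);

    // The same holds for i64 <-> u64.
    assert_eq!(-1i64 as u64, u64::MAX);
    assert_eq!(u64::MAX as i64, -1);
    assert_eq!(i64::MIN as u64, (i64::MAX as u64) + 1);
}
```

Because the round trip is lossless, writing a u32/u64 column through the signed physical type and reinterpreting on read preserves every value.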
141 changes: 124 additions & 17 deletions parquet/src/arrow/arrow_writer.rs
@@ -211,19 +211,45 @@ fn write_leaf(
let indices = levels.filter_array_indices();
let written = match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
// If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
let array = if let ArrowDataType::Date64 = column.data_type() {
let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
arrow::compute::cast(&array, &ArrowDataType::Int32)?
} else {
arrow::compute::cast(column, &ArrowDataType::Int32)?
let values = match column.data_type() {
ArrowDataType::Date64 => {
// If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
let array = if let ArrowDataType::Date64 = column.data_type() {
let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
arrow::compute::cast(&array, &ArrowDataType::Int32)?
} else {
arrow::compute::cast(column, &ArrowDataType::Int32)?
};
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get int32 array");
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
}
ArrowDataType::UInt32 => {
// follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
// `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
let array = column
.as_any()
.downcast_ref::<arrow_array::UInt32Array>()
.expect("Unable to get u32 array");
let array = arrow::compute::unary::<_, _, arrow::datatypes::Int32Type>(
array,
|x| x as i32,
);
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
}
_ => {
let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get i32 array");
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
}
};
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get int32 array");
typed.write_batch(
get_numeric_array_slice::<Int32Type, _>(&array, &indices).as_slice(),
values.as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
)?
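On the write side the reinterpret is performed with `arrow::compute::unary`, which is also why `kernels::arity` is re-exported in `arrow/src/compute/mod.rs` above. A minimal sketch of that kernel in isolation, assuming the arrow 4.x API this PR builds against:

```rust
use arrow::array::{Int32Array, UInt32Array};
use arrow::compute::unary;
use arrow::datatypes::Int32Type;

fn main() {
    let input =
        UInt32Array::from(vec![0u32, 1, i32::MAX as u32, (i32::MAX as u32) + 1, u32::MAX]);

    // Reinterpret each u32 as an i32; values above i32::MAX wrap into the
    // negative range, matching the UInt32 arm of write_leaf above.
    let reinterpreted: Int32Array = unary::<_, _, Int32Type>(&input, |x| x as i32);

    assert_eq!(reinterpreted.value(3), i32::MIN);
    assert_eq!(reinterpreted.value(4), -1);
}
```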
@@ -248,6 +274,19 @@
.expect("Unable to get i64 array");
get_numeric_array_slice::<Int64Type, _>(&array, &indices)
}
ArrowDataType::UInt64 => {
// follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
// `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
let array = column
.as_any()
.downcast_ref::<arrow_array::UInt64Array>()
.expect("Unable to get u64 array");
let array = arrow::compute::unary::<_, _, arrow::datatypes::Int64Type>(
array,
|x| x as i64,
);
get_numeric_array_slice::<Int64Type, _>(&array, &indices)
}
_ => {
let array = arrow::compute::cast(column, &ArrowDataType::Int64)?;
let array = array
@@ -498,16 +537,20 @@ fn get_fsb_array_slice(
mod tests {
use super::*;

use std::io::Seek;
use std::sync::Arc;
use std::{fs::File, io::Seek};

use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
use arrow::record_batch::RecordBatch;
use arrow::{array::*, buffer::Buffer};

use crate::arrow::{ArrowReader, ParquetFileArrowReader};
use crate::file::{reader::SerializedFileReader, writer::InMemoryWriteableCursor};
use crate::file::{
reader::{FileReader, SerializedFileReader},
statistics::Statistics,
writer::InMemoryWriteableCursor,
};
use crate::util::test_common::get_temp_file;

#[test]
@@ -956,7 +999,7 @@ mod tests {

const SMALL_SIZE: usize = 4;

fn roundtrip(filename: &str, expected_batch: RecordBatch) {
fn roundtrip(filename: &str, expected_batch: RecordBatch) -> File {
let file = get_temp_file(filename, &[]);

let mut writer = ArrowWriter::try_new(
@@ -968,7 +1011,7 @@
writer.write(&expected_batch).unwrap();
writer.close().unwrap();

let reader = SerializedFileReader::new(file).unwrap();
let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();

@@ -986,9 +1029,11 @@

assert_eq!(expected_data, actual_data);
}

file
}

fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) {
fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) -> File {
let schema = Schema::new(vec![Field::new(
"col",
values.data_type().clone(),
@@ -997,7 +1042,7 @@
let expected_batch =
RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();

roundtrip(filename, expected_batch);
roundtrip(filename, expected_batch)
}

fn values_required<A, I>(iter: I, filename: &str)
@@ -1449,4 +1494,66 @@ mod tests {
expected_batch,
);
}

#[test]
fn u32_min_max() {
// check values roundtrip through parquet
let values = Arc::new(UInt32Array::from_iter_values(vec![
u32::MIN,
u32::MIN + 1,
(i32::MAX as u32) - 1,
i32::MAX as u32,
(i32::MAX as u32) + 1,
u32::MAX - 1,
u32::MAX,
]));
let file = one_column_roundtrip("u32_min_max_single_column", values, false);

// check statistics are valid
let reader = SerializedFileReader::new(file).unwrap();
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let row_group = metadata.row_group(0);
assert_eq!(row_group.num_columns(), 1);
let column = row_group.column(0);
let stats = column.statistics().unwrap();
assert!(stats.has_min_max_set());
if let Statistics::Int32(stats) = stats {
assert_eq!(*stats.min() as u32, u32::MIN);
assert_eq!(*stats.max() as u32, u32::MAX);
} else {
panic!("Statistics::Int32 missing")
}
}

#[test]
fn u64_min_max() {
// check values roundtrip through parquet
let values = Arc::new(UInt64Array::from_iter_values(vec![
u64::MIN,
u64::MIN + 1,
(i64::MAX as u64) - 1,
i64::MAX as u64,
(i64::MAX as u64) + 1,
u64::MAX - 1,
u64::MAX,
]));
let file = one_column_roundtrip("u64_min_max_single_column", values, false);

// check statistics are valid
let reader = SerializedFileReader::new(file).unwrap();
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let row_group = metadata.row_group(0);
assert_eq!(row_group.num_columns(), 1);
let column = row_group.column(0);
let stats = column.statistics().unwrap();
assert!(stats.has_min_max_set());
if let Statistics::Int64(stats) = stats {
assert_eq!(*stats.min() as u64, u64::MIN);
assert_eq!(*stats.max() as u64, u64::MAX);
} else {
panic!("Statistics::Int64 missing")
}
}
}
59 changes: 44 additions & 15 deletions parquet/src/column/writer.rs
@@ -18,9 +18,10 @@
//! Contains column writer API.
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc};

use crate::basic::{Compression, Encoding, PageType, Type};
use crate::basic::{Compression, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::compression::{create_codec, Codec};
use crate::data_type::private::ParquetValueType;
use crate::data_type::AsBytes;
use crate::data_type::*;
use crate::encodings::{
@@ -300,10 +301,18 @@ impl<T: DataType> ColumnWriterImpl<T> {
// Process pre-calculated statistics
match (min, max) {
(Some(min), Some(max)) => {
if self.min_column_value.as_ref().map_or(true, |v| v > min) {
if self
.min_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(v, min))
{
self.min_column_value = Some(min.clone());
}
if self.max_column_value.as_ref().map_or(true, |v| v < max) {
if self
.max_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(max, v))
{
self.max_column_value = Some(max.clone());
}
}
@@ -925,31 +934,51 @@ impl<T: DataType> ColumnWriterImpl<T> {
fn update_page_min_max(&mut self, val: &T::T) {
// simple "isNaN" check that works for all types
if val == val {
if self.min_page_value.as_ref().map_or(true, |min| min > val) {
if self
.min_page_value
.as_ref()
.map_or(true, |min| self.compare_greater(min, val))
{
self.min_page_value = Some(val.clone());
}
if self.max_page_value.as_ref().map_or(true, |max| max < val) {
if self
.max_page_value
.as_ref()
.map_or(true, |max| self.compare_greater(val, max))
{
self.max_page_value = Some(val.clone());
}
}
}
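The `if val == val` guard in update_page_min_max is a type-agnostic NaN filter: NaN is the only value that compares unequal to itself, so NaN never becomes a page min/max, while every other value (including all integers) passes. A brief illustration in plain Rust:

```rust
fn main() {
    // NaN is the only value for which `val == val` is false.
    assert!(f64::NAN != f64::NAN);
    assert!(1.5f64 == 1.5f64);
    assert!(42i64 == 42i64); // integers always pass the check
}
```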

fn update_column_min_max(&mut self) {
if self
.min_column_value
.as_ref()
.map_or(true, |min| min > self.min_page_value.as_ref().unwrap())
{
let update_min = self.min_column_value.as_ref().map_or(true, |min| {
let page_value = self.min_page_value.as_ref().unwrap();
self.compare_greater(min, page_value)
});
if update_min {
self.min_column_value = self.min_page_value.clone();
}
if self
.max_column_value
.as_ref()
.map_or(true, |max| max < self.max_page_value.as_ref().unwrap())
{

let update_max = self.max_column_value.as_ref().map_or(true, |max| {
let page_value = self.max_page_value.as_ref().unwrap();
self.compare_greater(page_value, max)
});
if update_max {
self.max_column_value = self.max_page_value.clone();
}
}

/// Evaluate `a > b` according to underlying logical type.
fn compare_greater(&self, a: &T::T, b: &T::T) -> bool {
if let Some(LogicalType::INTEGER(int_type)) = self.descr.logical_type() {
Contributor review comment:

Note: not all implementations might write LogicalType, even though ConvertedType is deprecated. I had to think a bit about this, but it's fine because it's on the write-side, where we will always write LogicalType going forward.

If this was on the read side, we'd have to also check ConvertedType as a fallback.

if !int_type.is_signed {
// need to compare unsigned
return a.as_u64().unwrap() > b.as_u64().unwrap();
}
}
a > b
}
}
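The unsigned comparison matters because, after the reinterpret cast, large unsigned values are stored with negative signed bit patterns; a plain signed `>` would then record, say, u32::MAX as the column minimum instead of the maximum. A small sketch in plain Rust (independent of the ParquetValueType trait used above):

```rust
fn main() {
    // u32::MAX written through the i32 physical type has the bit pattern of -1.
    let stored_max: i32 = u32::MAX as i32;
    let stored_zero: i32 = 0u32 as i32;

    // Signed comparison orders the two the wrong way around...
    assert!(stored_max < stored_zero);

    // ...while comparing the same bits as unsigned (what compare_greater does via
    // as_u64() when the logical type is an unsigned INTEGER) restores the order.
    assert!((stored_max as u32) > (stored_zero as u32));
}
```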

// ----------------------------------------------------------------------
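Following up on the reviewer note above: on the read side (not changed by this PR), a consumer that wanted the same behaviour for files written without a LogicalType would have to fall back to the deprecated ConvertedType annotation. A hypothetical sketch of such a fallback; the helper name and the exact fallback logic are illustrative only and assume the parquet crate's LogicalType, ConvertedType, and ColumnDescriptor types:

```rust
use parquet::basic::{ConvertedType, LogicalType};
use parquet::schema::types::ColumnDescriptor;

/// Hypothetical helper (not part of this PR): decide whether a column holds
/// unsigned integers, preferring LogicalType and falling back to ConvertedType.
fn is_unsigned_integer(descr: &ColumnDescriptor) -> bool {
    match descr.logical_type() {
        Some(LogicalType::INTEGER(t)) => !t.is_signed,
        _ => matches!(
            descr.converted_type(),
            ConvertedType::UINT_8
                | ConvertedType::UINT_16
                | ConvertedType::UINT_32
                | ConvertedType::UINT_64
        ),
    }
}
```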