Skip to content

Commit

Permalink
support full arrow u32 through parquet
Browse files Browse the repository at this point in the history
This is idential to the solution we now have for u64.
  • Loading branch information
crepererum committed May 10, 2021
1 parent d1a9e0b commit 2710f31
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 12 deletions.
11 changes: 10 additions & 1 deletion parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,16 @@ impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
let target_type = self.get_data_type().clone();
let arrow_data_type = match T::get_physical_type() {
PhysicalType::BOOLEAN => ArrowBooleanType::DATA_TYPE,
PhysicalType::INT32 => ArrowInt32Type::DATA_TYPE,
PhysicalType::INT32 => {
match target_type {
ArrowType::UInt32 => {
// follow C++ implementation and use overflow/reinterpret cast from i32 to u32 which will map
// `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
ArrowUInt32Type::DATA_TYPE
}
_ => ArrowInt32Type::DATA_TYPE,
}
}
PhysicalType::INT64 => {
match target_type {
ArrowType::UInt64 => {
Expand Down
79 changes: 68 additions & 11 deletions parquet/src/arrow/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,45 @@ fn write_leaf(
let indices = levels.filter_array_indices();
let written = match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
// If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
let array = if let ArrowDataType::Date64 = column.data_type() {
let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
arrow::compute::cast(&array, &ArrowDataType::Int32)?
} else {
arrow::compute::cast(column, &ArrowDataType::Int32)?
let values = match column.data_type() {
ArrowDataType::Date64 => {
// If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
let array = if let ArrowDataType::Date64 = column.data_type() {
let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
arrow::compute::cast(&array, &ArrowDataType::Int32)?
} else {
arrow::compute::cast(column, &ArrowDataType::Int32)?
};
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get int32 array");
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
}
ArrowDataType::UInt32 => {
// follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
// `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
let array = column
.as_any()
.downcast_ref::<arrow_array::UInt32Array>()
.expect("Unable to get u32 array");
let array = arrow::compute::unary::<_, _, arrow::datatypes::Int32Type>(
array,
|x| x as i32,
);
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
}
_ => {
let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get i32 array");
get_numeric_array_slice::<Int32Type, _>(&array, &indices)
}
};
let array = array
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.expect("Unable to get int32 array");
typed.write_batch(
get_numeric_array_slice::<Int32Type, _>(&array, &indices).as_slice(),
values.as_slice(),
Some(levels.definition.as_slice()),
levels.repetition.as_deref(),
)?
Expand Down Expand Up @@ -1469,6 +1495,37 @@ mod tests {
);
}

#[test]
fn u32_min_max() {
// check values roundtrip through parquet
let values = Arc::new(UInt32Array::from_iter_values(vec![
u32::MIN,
u32::MIN + 1,
(i32::MAX as u32) - 1,
i32::MAX as u32,
(i32::MAX as u32) + 1,
u32::MAX - 1,
u32::MAX,
]));
let file = one_column_roundtrip("u32_min_max_single_column", values, false);

// check statistics are valid
let reader = SerializedFileReader::new(file).unwrap();
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let row_group = metadata.row_group(0);
assert_eq!(row_group.num_columns(), 1);
let column = row_group.column(0);
let stats = column.statistics().unwrap();
assert!(stats.has_min_max_set());
if let Statistics::Int32(stats) = stats {
assert_eq!(*stats.min() as u32, u32::MIN);
assert_eq!(*stats.max() as u32, u32::MAX);
} else {
panic!("Statistics::Int32 missing")
}
}

#[test]
fn u64_min_max() {
// check values roundtrip through parquet
Expand Down

0 comments on commit 2710f31

Please sign in to comment.