Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safely Write IntervalMonthDayNanoArray to parquet or Throw error #6299

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ use crate::column::reader::decoder::ColumnValueDecoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::{
ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
IntervalDayTimeArray, IntervalYearMonthArray,
ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalMonthDayNanoArray, IntervalYearMonthArray
};
use arrow_buffer::{i256, Buffer, IntervalDayTime};
use arrow_buffer::{i256, Buffer, IntervalDayTime, IntervalMonthDayNano};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType as ArrowType, IntervalUnit};
use bytes::Bytes;
Expand Down Expand Up @@ -195,7 +194,14 @@ impl ArrayReader for FixedLenByteArrayReader {
Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef
}
IntervalUnit::MonthDayNano => {
return Err(nyi_err!("MonthDayNano intervals not supported"));
let f = |b: &[u8]| {
IntervalMonthDayNano::new(
i32::from_le_bytes(b[0..4].try_into().unwrap()),
i32::from_le_bytes(b[4..8].try_into().unwrap()),
i32::from_le_bytes(b[8..12].try_into().unwrap()) as i64,
)
};
Arc::new(IntervalMonthDayNanoArray::from_unary(&binary, f)) as ArrayRef
}
}
}
Expand Down
41 changes: 39 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -949,13 +949,15 @@ mod tests {
use arrow_array::cast::AsArray;
use arrow_array::types::{
Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
Time32MillisecondType, Time64MicrosecondType,
IntervalMonthDayNano, IntervalMonthDayNanoType, Time32MillisecondType,
Time64MicrosecondType,
};
use arrow_array::*;
use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{
ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
ArrowError, DataType as ArrowDataType, Field, Fields, IntervalUnit, Schema, SchemaRef,
TimeUnit,
};
use arrow_select::concat::concat_batches;

Expand Down Expand Up @@ -1288,6 +1290,41 @@ mod tests {
Ok(())
}

#[test]
fn test_interval_month_day_nano_roundtrip() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new(
"interval_nano",
ArrowDataType::Interval(IntervalUnit::MonthDayNano),
true,
)]));

let mut buf = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

let original = RecordBatch::try_new(
schema,
vec![Arc::new(IntervalMonthDayNanoArray::from(vec![
Some(IntervalMonthDayNano::new(0, 1, 5_000_000)),
Some(IntervalMonthDayNano::new(0, 3, i32::MAX as i64)),
Some(IntervalMonthDayNano::new(3, -2, 4_000)),
None,
]))],
)?;

writer.write(&original)?;
writer.close()?;

let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
let ret = reader.next().unwrap()?;

assert_eq!(ret, original);

// Ensure can be downcast to the correct type
ret.column(0).as_primitive::<IntervalMonthDayNanoType>();

Ok(())
}

struct RandFixedLenGen {}

impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
Expand Down
43 changes: 35 additions & 8 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -921,12 +921,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
.unwrap();
get_interval_dt_array_slice(array, indices)
}
_ => {
return Err(ParquetError::NYI(
format!(
"Attempting to write an Arrow interval type {interval_unit:?} to parquet that is not yet implemented"
)
));
IntervalUnit::MonthDayNano => {
let array = column
.as_any()
.downcast_ref::<arrow_array::IntervalMonthDayNanoArray>()
.unwrap();
get_interval_mdn_array_slice(array, indices)?
}
},
ArrowDataType::FixedSizeBinary(_) => {
Expand Down Expand Up @@ -1019,6 +1019,33 @@ fn get_interval_dt_array_slice(
values
}

/// Returns 12-byte values representing 3 values of months, days and milliseconds (4-bytes each).
/// An Arrow MonthDayNano interval stores months, days and nanoseconds. These are 16 bytes in total,
/// but we return error when the nanoseconds value is greater than i32::MAX. Otherwise, we can
/// safely truncate the nanoseconds value to 4 bytes.
fn get_interval_mdn_array_slice(
array: &arrow_array::IntervalMonthDayNanoArray,
indices: &[usize],
) -> Result<Vec<FixedLenByteArray>> {
let mut values = Vec::with_capacity(indices.len());
for i in indices {
let mut out = [0; 12];
let value = array.value(*i);

if value.nanoseconds > i32::MAX as i64 {
return Err(ParquetError::ArrowError(
"IntervalMonthDayNano value cannot be written to parquet since it will lose precision.".to_string(),
));
}

out[0..4].copy_from_slice(&value.months.to_le_bytes());
out[4..8].copy_from_slice(&value.days.to_le_bytes());
out[8..12].copy_from_slice(&(value.nanoseconds as i32).to_le_bytes());
values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
}
Ok(values)
}

fn get_decimal_128_array_slice(
array: &arrow_array::Decimal128Array,
indices: &[usize],
Expand Down Expand Up @@ -2145,11 +2172,11 @@ mod tests {

#[test]
#[should_panic(
expected = "Attempting to write an Arrow interval type MonthDayNano to parquet that is not yet implemented"
expected = "IntervalMonthDayNano value cannot be written to parquet since it will lose precision."
)]
fn interval_month_day_nano_single_column() {
required_and_optional::<IntervalMonthDayNanoArray, _>(vec![
IntervalMonthDayNano::new(0, 1, 5),
IntervalMonthDayNano::new(0, 1, i32::MAX as i64 + 1),
IntervalMonthDayNano::new(0, 3, 2),
IntervalMonthDayNano::new(3, -2, -5),
IntervalMonthDayNano::new(-200, 4, -1),
Expand Down
2 changes: 2 additions & 0 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1767,6 +1767,8 @@ mod tests {
false, // fails to roundtrip keys_sorted
false,
),
Field::new("42", DataType::Interval(IntervalUnit::MonthDayNano), false),
Field::new("43", DataType::Interval(IntervalUnit::MonthDayNano), true),
],
meta(&[("Key", "Value")]),
);
Expand Down
Loading