Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-10334: [Rust] [Parquet] NullArray roundtrip #8484

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion rust/arrow/src/array/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl From<ArrayDataRef> for NullArray {

impl fmt::Debug for NullArray {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NullArray")
write!(f, "NullArray({})", self.len())
}
}

Expand Down Expand Up @@ -146,4 +146,10 @@ mod tests {
assert_eq!(array2.null_count(), 16);
assert_eq!(array2.offset(), 8);
}

#[test]
fn test_debug_null_array() {
let array = NullArray::new(1024 * 1024);
assert_eq!(format!("{:?}", array), "NullArray(1048576)");
}
}
81 changes: 50 additions & 31 deletions rust/arrow/src/compute/kernels/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
(Timestamp(_, _), Date32(_)) => true,
(Timestamp(_, _), Date64(_)) => true,
// date64 to timestamp might not make sense,

// end temporal casts
(Null, Int32) => true,
(_, _) => false,
}
}
Expand Down Expand Up @@ -729,25 +728,31 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result<ArrayRef> {
// single integer operation, but need to avoid integer
// math rounding down to zero

if to_size > from_size {
let time_array = Date64Array::from(array.data());
Ok(Arc::new(multiply(
&time_array,
&Date64Array::from(vec![to_size / from_size; array.len()]),
)?) as ArrayRef)
} else if to_size < from_size {
let time_array = Date64Array::from(array.data());
Ok(Arc::new(divide(
&time_array,
&Date64Array::from(vec![from_size / to_size; array.len()]),
)?) as ArrayRef)
} else {
cast_array_data::<Date64Type>(array, to_type.clone())
match to_size.cmp(&from_size) {
std::cmp::Ordering::Less => {
let time_array = Date64Array::from(array.data());
Ok(Arc::new(divide(
&time_array,
&Date64Array::from(vec![from_size / to_size; array.len()]),
)?) as ArrayRef)
}
std::cmp::Ordering::Equal => {
cast_array_data::<Date64Type>(array, to_type.clone())
}
std::cmp::Ordering::Greater => {
let time_array = Date64Array::from(array.data());
Ok(Arc::new(multiply(
&time_array,
&Date64Array::from(vec![to_size / from_size; array.len()]),
)?) as ArrayRef)
}
}
}
// date64 to timestamp might not make sense,

// end temporal casts
// null to primitive/flat types
(Null, Int32) => Ok(Arc::new(Int32Array::from(vec![None; array.len()]))),

(_, _) => Err(ArrowError::ComputeError(format!(
"Casting from {:?} to {:?} not supported",
from_type, to_type,
Expand Down Expand Up @@ -2476,44 +2481,44 @@ mod tests {

// Test casting TO StringArray
let cast_type = Utf8;
let cast_array = cast(&array, &cast_type).expect("cast to UTF-8 succeeded");
let cast_array = cast(&array, &cast_type).expect("cast to UTF-8 failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

// Test casting TO Dictionary (with different index sizes)

let cast_type = Dictionary(Box::new(Int16), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

let cast_type = Dictionary(Box::new(Int32), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

let cast_type = Dictionary(Box::new(Int64), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

let cast_type = Dictionary(Box::new(UInt8), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

let cast_type = Dictionary(Box::new(UInt16), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

let cast_type = Dictionary(Box::new(UInt32), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

let cast_type = Dictionary(Box::new(UInt64), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);
}
Expand Down Expand Up @@ -2598,11 +2603,11 @@ mod tests {
let expected = vec!["1", "null", "3"];

// Test casting TO PrimitiveArray, different dictionary type
let cast_array = cast(&array, &Utf8).expect("cast to UTF-8 succeeded");
let cast_array = cast(&array, &Utf8).expect("cast to UTF-8 failed");
assert_eq!(array_to_strings(&cast_array), expected);
assert_eq!(cast_array.data_type(), &Utf8);

let cast_array = cast(&array, &Int64).expect("cast to int64 succeeded");
let cast_array = cast(&array, &Int64).expect("cast to int64 failed");
assert_eq!(array_to_strings(&cast_array), expected);
assert_eq!(cast_array.data_type(), &Int64);
}
Expand All @@ -2621,13 +2626,13 @@ mod tests {

// Cast to a dictionary (same value type, Int32)
let cast_type = Dictionary(Box::new(UInt8), Box::new(Int32));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);

// Cast to a dictionary (different value type, Int8)
let cast_type = Dictionary(Box::new(UInt8), Box::new(Int8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);
}
Expand All @@ -2646,11 +2651,25 @@ mod tests {

// Cast to a dictionary (same value type, Utf8)
let cast_type = Dictionary(Box::new(UInt8), Box::new(Utf8));
let cast_array = cast(&array, &cast_type).expect("cast succeeded");
let cast_array = cast(&array, &cast_type).expect("cast failed");
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(array_to_strings(&cast_array), expected);
}

#[test]
fn test_cast_null_array_to_int32() {
let array = Arc::new(NullArray::new(6)) as ArrayRef;

let expected = Int32Array::from(vec![None; 6]);

// Cast to a dictionary (same value type, Utf8)
let cast_type = DataType::Int32;
let cast_array = cast(&array, &cast_type).expect("cast failed");
let cast_array = as_primitive_array::<Int32Type>(&cast_array);
assert_eq!(cast_array.data_type(), &cast_type);
assert_eq!(cast_array, &expected);
}

/// Print the `DictionaryArray` `array` as a vector of strings
fn array_to_strings(array: &ArrayRef) -> Vec<String> {
(0..array.len())
Expand Down Expand Up @@ -2768,7 +2787,7 @@ mod tests {
)),
Arc::new(TimestampNanosecondArray::from_vec(
vec![1000, 2000],
Some(tz_name.clone()),
Some(tz_name),
)),
Arc::new(Date32Array::from(vec![1000, 2000])),
Arc::new(Date64Array::from(vec![1000, 2000])),
Expand Down
142 changes: 120 additions & 22 deletions rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,97 @@ pub trait ArrayReader {
fn get_rep_levels(&self) -> Option<&[i16]>;
}

/// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow
/// NullArray type.
pub struct NullArrayReader<T: DataType> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
rep_levels_buffer: Option<Buffer>,
column_desc: ColumnDescPtr,
record_reader: RecordReader<T>,
_type_marker: PhantomData<T>,
}

impl<T: DataType> NullArrayReader<T> {
/// Construct null array reader.
pub fn new(
mut pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
) -> Result<Self> {
let mut record_reader = RecordReader::<T>::new(column_desc.clone());
if let Some(page_reader) = pages.next() {
record_reader.set_page_reader(page_reader?)?;
}

Ok(Self {
data_type: ArrowType::Null,
pages,
def_levels_buffer: None,
rep_levels_buffer: None,
column_desc,
record_reader,
_type_marker: PhantomData,
})
}
}

/// Implementation of primitive array reader.
impl<T: DataType> ArrayReader for NullArrayReader<T> {
fn as_any(&self) -> &dyn Any {
self
}

/// Returns data type of primitive array.
fn get_data_type(&self) -> &ArrowType {
&self.data_type
}

/// Reads at most `batch_size` records into array.
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
let mut records_read = 0usize;
while records_read < batch_size {
let records_to_read = batch_size - records_read;

// NB can be 0 if at end of page
let records_read_once = self.record_reader.read_records(records_to_read)?;
records_read += records_read_once;

// Record reader exhausted
if records_read_once < records_to_read {
if let Some(page_reader) = self.pages.next() {
// Read from new page reader
self.record_reader.set_page_reader(page_reader?)?;
} else {
// Page reader also exhausted
break;
}
}
}

// convert to arrays
let array = arrow::array::NullArray::new(records_read);

// save definition and repetition buffers
self.def_levels_buffer = self.record_reader.consume_def_levels()?;
self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
self.record_reader.reset();
Ok(Arc::new(array))
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer
.as_ref()
.map(|buf| unsafe { buf.typed_data() })
}

fn get_rep_levels(&self) -> Option<&[i16]> {
self.rep_levels_buffer
.as_ref()
.map(|buf| unsafe { buf.typed_data() })
}
}

/// Primitive array readers are leaves of array reader tree. They accept page iterator
/// and read them into primitive arrays.
pub struct PrimitiveArrayReader<T: DataType> {
Expand Down Expand Up @@ -859,10 +950,19 @@ impl<'a> ArrayReaderBuilder {
page_iterator,
column_desc,
)?)),
PhysicalType::INT32 => Ok(Box::new(PrimitiveArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
)?)),
PhysicalType::INT32 => {
if let Some(ArrowType::Null) = arrow_type {
Ok(Box::new(NullArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
)?))
} else {
Ok(Box::new(PrimitiveArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
)?))
}
}
PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::<Int64Type>::new(
page_iterator,
column_desc,
Expand Down Expand Up @@ -903,25 +1003,23 @@ impl<'a> ArrayReaderBuilder {
page_iterator, column_desc, converter
)?))
}
} else if let Some(ArrowType::LargeBinary) = arrow_type {
let converter =
LargeBinaryConverter::new(LargeBinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
LargeBinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
} else {
if let Some(ArrowType::LargeBinary) = arrow_type {
let converter =
LargeBinaryConverter::new(LargeBinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
LargeBinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
} else {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
}
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
}
}
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
Expand Down
Loading