Skip to content

Commit

Permalink
ARROW-10168: [Rust] [Parquet] Schema roundtrip - use Arrow schema fro…
Browse files Browse the repository at this point in the history
…m Parquet metadata when available

@nevi-me This is one commit on top of #8330 that I'm opening to get some feedback from you on about whether this will help with ARROW-10168. I *think* this will bring the Rust implementation more in line with C++, but I'm not certain.

I tried removing the `#[ignore]` attributes from the `LargeArray` and `LargeUtf8` tests, but they're still failing because the schemas don't match yet-- it looks like [this code](https://github.com/apache/arrow/blob/b2842ab2eb0d7a7a633049a5591e1eaa254d4446/rust/parquet/src/arrow/array_reader.rs#L595-L638) will need to be changed as well.

That `build_array_reader` function's code looks very similar to the code I've changed here, is there a possibility for the code to be shared or is there a reason they're separate?

Closes #8354 from carols10cents/schema-roundtrip

Lead-authored-by: Carol (Nichols || Goulding) <carol.nichols@gmail.com>
Co-authored-by: Neville Dipale <nevilledips@gmail.com>
Signed-off-by: Neville Dipale <nevilledips@gmail.com>
  • Loading branch information
carols10cents and nevi-me committed Oct 12, 2020
1 parent 7bfff71 commit 12add42
Show file tree
Hide file tree
Showing 8 changed files with 338 additions and 73 deletions.
4 changes: 3 additions & 1 deletion rust/arrow/src/ipc/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,9 @@ pub(crate) fn build_field<'a: 'b, 'b>(

let mut field_builder = ipc::FieldBuilder::new(fbb);
field_builder.add_name(fb_field_name);
fb_dictionary.map(|dictionary| field_builder.add_dictionary(dictionary));
if let Some(dictionary) = fb_dictionary {
field_builder.add_dictionary(dictionary)
}
field_builder.add_type_type(field_type.type_type);
field_builder.add_nullable(field.is_nullable());
match field_type.children {
Expand Down
106 changes: 80 additions & 26 deletions rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,20 @@ use arrow::array::{
Int16BufferBuilder, StructArray,
};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::{DataType as ArrowType, DateUnit, Field, IntervalUnit, TimeUnit};
use arrow::datatypes::{
DataType as ArrowType, DateUnit, Field, IntervalUnit, Schema, TimeUnit,
};

use crate::arrow::converter::{
BinaryArrayConverter, BinaryConverter, BoolConverter, BooleanArrayConverter,
Converter, Date32Converter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Float32Converter, Float64Converter, Int16Converter, Int32Converter, Int64Converter,
Int8Converter, Int96ArrayConverter, Int96Converter, Time32MillisecondConverter,
Time32SecondConverter, Time64MicrosecondConverter, Time64NanosecondConverter,
TimestampMicrosecondConverter, TimestampMillisecondConverter, UInt16Converter,
UInt32Converter, UInt64Converter, UInt8Converter, Utf8ArrayConverter, Utf8Converter,
Int8Converter, Int96ArrayConverter, Int96Converter, LargeBinaryArrayConverter,
LargeBinaryConverter, LargeUtf8ArrayConverter, LargeUtf8Converter,
Time32MillisecondConverter, Time32SecondConverter, Time64MicrosecondConverter,
Time64NanosecondConverter, TimestampMicrosecondConverter,
TimestampMillisecondConverter, UInt16Converter, UInt32Converter, UInt64Converter,
UInt8Converter, Utf8ArrayConverter, Utf8Converter,
};
use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
Expand Down Expand Up @@ -612,6 +616,7 @@ impl ArrayReader for StructArrayReader {
/// Create array reader from parquet schema, column indices, and parquet file reader.
pub fn build_array_reader<T>(
parquet_schema: SchemaDescPtr,
arrow_schema: Schema,
column_indices: T,
file_reader: Rc<dyn FileReader>,
) -> Result<Box<dyn ArrayReader>>
Expand Down Expand Up @@ -650,13 +655,19 @@ where
fields: filtered_root_fields,
};

ArrayReaderBuilder::new(Rc::new(proj), Rc::new(leaves), file_reader)
.build_array_reader()
ArrayReaderBuilder::new(
Rc::new(proj),
Rc::new(arrow_schema),
Rc::new(leaves),
file_reader,
)
.build_array_reader()
}

/// Used to build array reader.
struct ArrayReaderBuilder {
root_schema: TypePtr,
arrow_schema: Rc<Schema>,
// Key: columns that need to be included in final array builder
// Value: column index in schema
columns_included: Rc<HashMap<*const Type, usize>>,
Expand Down Expand Up @@ -790,11 +801,13 @@ impl<'a> ArrayReaderBuilder {
/// Construct array reader builder.
fn new(
root_schema: TypePtr,
arrow_schema: Rc<Schema>,
columns_included: Rc<HashMap<*const Type, usize>>,
file_reader: Rc<dyn FileReader>,
) -> Self {
Self {
root_schema,
arrow_schema,
columns_included,
file_reader,
}
Expand Down Expand Up @@ -835,6 +848,12 @@ impl<'a> ArrayReaderBuilder {
self.file_reader.clone(),
)?);

let arrow_type = self
.arrow_schema
.field_with_name(cur_type.name())
.ok()
.map(|f| f.data_type());

match cur_type.get_physical_type() {
PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::<BoolType>::new(
page_iterator,
Expand Down Expand Up @@ -866,21 +885,43 @@ impl<'a> ArrayReaderBuilder {
)),
PhysicalType::BYTE_ARRAY => {
if cur_type.get_basic_info().logical_type() == LogicalType::UTF8 {
let converter = Utf8Converter::new(Utf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
Utf8Converter,
>::new(
page_iterator, column_desc, converter
)?))
if let Some(ArrowType::LargeUtf8) = arrow_type {
let converter =
LargeUtf8Converter::new(LargeUtf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
LargeUtf8Converter,
>::new(
page_iterator, column_desc, converter
)?))
} else {
let converter = Utf8Converter::new(Utf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
Utf8Converter,
>::new(
page_iterator, column_desc, converter
)?))
}
} else {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
if let Some(ArrowType::LargeBinary) = arrow_type {
let converter =
LargeBinaryConverter::new(LargeBinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
LargeBinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
} else {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
}
}
}
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
Expand Down Expand Up @@ -918,11 +959,15 @@ impl<'a> ArrayReaderBuilder {

for child in cur_type.get_fields() {
if let Some(child_reader) = self.dispatch(child.clone(), context)? {
fields.push(Field::new(
child.name(),
child_reader.get_data_type().clone(),
child.is_optional(),
));
let field = match self.arrow_schema.field_with_name(child.name()) {
Ok(f) => f.to_owned(),
_ => Field::new(
child.name(),
child_reader.get_data_type().clone(),
child.is_optional(),
),
};
fields.push(field);
children_reader.push(child_reader);
}
}
Expand All @@ -945,6 +990,7 @@ impl<'a> ArrayReaderBuilder {
mod tests {
use super::*;
use crate::arrow::converter::Utf8Converter;
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::{Page, PageReader};
use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
Expand Down Expand Up @@ -1591,8 +1637,16 @@ mod tests {
let file = get_test_file("nulls.snappy.parquet");
let file_reader = Rc::new(SerializedFileReader::new(file).unwrap());

let file_metadata = file_reader.metadata().file_metadata();
let arrow_schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)
.unwrap();

let array_reader = build_array_reader(
file_reader.metadata().file_metadata().schema_descr_ptr(),
arrow_schema,
vec![0usize].into_iter(),
file_reader,
)
Expand Down
36 changes: 28 additions & 8 deletions rust/parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
use crate::arrow::schema::{
parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
};
use crate::errors::{ParquetError, Result};
use crate::file::reader::FileReader;
use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
Expand All @@ -40,7 +42,12 @@ pub trait ArrowReader {

/// Read parquet schema and convert it into arrow schema.
/// This schema only includes columns identified by `column_indices`.
fn get_schema_by_columns<T>(&mut self, column_indices: T) -> Result<Schema>
/// To select leaf columns (i.e. `a.b.c` instead of `a`), set `leaf_columns = true`
fn get_schema_by_columns<T>(
&mut self,
column_indices: T,
leaf_columns: bool,
) -> Result<Schema>
where
T: IntoIterator<Item = usize>;

Expand Down Expand Up @@ -84,16 +91,28 @@ impl ArrowReader for ParquetFileArrowReader {
)
}

fn get_schema_by_columns<T>(&mut self, column_indices: T) -> Result<Schema>
fn get_schema_by_columns<T>(
&mut self,
column_indices: T,
leaf_columns: bool,
) -> Result<Schema>
where
T: IntoIterator<Item = usize>,
{
let file_metadata = self.file_reader.metadata().file_metadata();
parquet_to_arrow_schema_by_columns(
file_metadata.schema_descr(),
column_indices,
file_metadata.key_value_metadata(),
)
if leaf_columns {
parquet_to_arrow_schema_by_columns(
file_metadata.schema_descr(),
column_indices,
file_metadata.key_value_metadata(),
)
} else {
parquet_to_arrow_schema_by_root_columns(
file_metadata.schema_descr(),
column_indices,
file_metadata.key_value_metadata(),
)
}
}

fn get_record_reader(
Expand Down Expand Up @@ -123,6 +142,7 @@ impl ArrowReader for ParquetFileArrowReader {
.metadata()
.file_metadata()
.schema_descr_ptr(),
self.get_schema()?,
column_indices,
self.file_reader.clone(),
)?;
Expand Down
4 changes: 2 additions & 2 deletions rust/parquet/src/arrow/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ mod tests {
}

#[test]
#[ignore] // Large Binary support isn't correct yet
#[ignore] // Large binary support isn't correct yet - buffers don't match
fn large_binary_single_column() {
let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
Expand All @@ -1035,7 +1035,7 @@ mod tests {
}

#[test]
#[ignore] // Large String support isn't correct yet - null_bitmap and buffers don't match
#[ignore] // Large string support isn't correct yet - null_bitmap doesn't match
fn large_string_single_column() {
let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
let raw_strs = raw_values.iter().map(|s| s.as_str());
Expand Down
52 changes: 48 additions & 4 deletions rust/parquet/src/arrow/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use crate::data_type::{ByteArray, DataType, Int96};
use arrow::{
array::{
Array, ArrayRef, BinaryBuilder, BooleanArray, BooleanBufferBuilder,
BufferBuilderTrait, FixedSizeBinaryBuilder, StringBuilder,
TimestampNanosecondBuilder,
BufferBuilderTrait, FixedSizeBinaryBuilder, LargeBinaryBuilder,
LargeStringBuilder, StringBuilder, TimestampNanosecondBuilder,
},
datatypes::Time32MillisecondType,
};
Expand All @@ -38,8 +38,8 @@ use arrow::datatypes::{ArrowPrimitiveType, DataType as ArrowDataType};

use arrow::array::ArrayDataBuilder;
use arrow::array::{
BinaryArray, FixedSizeBinaryArray, PrimitiveArray, StringArray,
TimestampNanosecondArray,
BinaryArray, FixedSizeBinaryArray, LargeBinaryArray, LargeStringArray,
PrimitiveArray, StringArray, TimestampNanosecondArray,
};
use std::marker::PhantomData;

Expand Down Expand Up @@ -200,6 +200,27 @@ impl Converter<Vec<Option<ByteArray>>, StringArray> for Utf8ArrayConverter {
}
}

pub struct LargeUtf8ArrayConverter {}

impl Converter<Vec<Option<ByteArray>>, LargeStringArray> for LargeUtf8ArrayConverter {
fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<LargeStringArray> {
let data_size = source
.iter()
.map(|x| x.as_ref().map(|b| b.len()).unwrap_or(0))
.sum();

let mut builder = LargeStringBuilder::with_capacity(source.len(), data_size);
for v in source {
match v {
Some(array) => builder.append_value(array.as_utf8()?),
None => builder.append_null(),
}?
}

Ok(builder.finish())
}
}

pub struct BinaryArrayConverter {}

impl Converter<Vec<Option<ByteArray>>, BinaryArray> for BinaryArrayConverter {
Expand All @@ -216,6 +237,22 @@ impl Converter<Vec<Option<ByteArray>>, BinaryArray> for BinaryArrayConverter {
}
}

pub struct LargeBinaryArrayConverter {}

impl Converter<Vec<Option<ByteArray>>, LargeBinaryArray> for LargeBinaryArrayConverter {
fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<LargeBinaryArray> {
let mut builder = LargeBinaryBuilder::new(source.len());
for v in source {
match v {
Some(array) => builder.append_value(array.data()),
None => builder.append_null(),
}?
}

Ok(builder.finish())
}
}

pub type BoolConverter<'a> = ArrayRefConverter<
&'a mut RecordReader<BoolType>,
BooleanArray,
Expand Down Expand Up @@ -246,8 +283,15 @@ pub type Float32Converter = CastConverter<ParquetFloatType, Float32Type, Float32
pub type Float64Converter = CastConverter<ParquetDoubleType, Float64Type, Float64Type>;
pub type Utf8Converter =
ArrayRefConverter<Vec<Option<ByteArray>>, StringArray, Utf8ArrayConverter>;
pub type LargeUtf8Converter =
ArrayRefConverter<Vec<Option<ByteArray>>, LargeStringArray, LargeUtf8ArrayConverter>;
pub type BinaryConverter =
ArrayRefConverter<Vec<Option<ByteArray>>, BinaryArray, BinaryArrayConverter>;
pub type LargeBinaryConverter = ArrayRefConverter<
Vec<Option<ByteArray>>,
LargeBinaryArray,
LargeBinaryArrayConverter,
>;
pub type Int96Converter =
ArrayRefConverter<Vec<Option<Int96>>, TimestampNanosecondArray, Int96ArrayConverter>;
pub type FixedLenBinaryConverter = ArrayRefConverter<
Expand Down
3 changes: 2 additions & 1 deletion rust/parquet/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
//!
//! println!("Converted arrow schema is: {}", arrow_reader.get_schema().unwrap());
//! println!("Arrow schema after projection is: {}",
//! arrow_reader.get_schema_by_columns(vec![2, 4, 6]).unwrap());
//! arrow_reader.get_schema_by_columns(vec![2, 4, 6], true).unwrap());
//!
//! let mut record_batch_reader = arrow_reader.get_record_reader(2048).unwrap();
//!
Expand All @@ -61,6 +61,7 @@ pub use self::arrow_reader::ParquetFileArrowReader;
pub use self::arrow_writer::ArrowWriter;
pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
parquet_to_arrow_schema_by_root_columns,
};

/// Schema metadata key used to store serialized Arrow IPC schema
Expand Down
1 change: 1 addition & 0 deletions rust/parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl<'a, T> FatPtr<'a, T> {
self.ptr
}

#[allow(clippy::wrong_self_convention)]
fn to_slice_mut(&mut self) -> &mut [T] {
self.ptr
}
Expand Down
Loading

0 comments on commit 12add42

Please sign in to comment.