From 69f73e1e472440fae88a5cf3d5bc1f67ffa58178 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 7 Oct 2020 15:14:51 -0400 Subject: [PATCH 01/28] ARROW-8426: [Rust] [Parquet] - Add more support for converting Dicts This adds more support for: - When converting Arrow -> Parquet containing an Arrow Dictionary, materialize the Dictionary values and send to Parquet to be encoded with a dictionary or not according to the Parquet settings (not supported: converting an Arrow Dictionary directly to Parquet DictEncoding, also only supports Int32 index types in this commit, also removes NULLs) - When converting Parquet -> Arrow, noticing that the Arrow schema metadata in a Parquet file has a Dictionary type and converting the data to an Arrow dictionary (right now this only supports String dictionaries --- rust/parquet/src/arrow/array_reader.rs | 70 ++++++++++++++-- rust/parquet/src/arrow/arrow_writer.rs | 112 ++++++++++++++++++++++++- rust/parquet/src/arrow/converter.rs | 44 +++++++++- 3 files changed, 211 insertions(+), 15 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 9646e7481aef4..a9e757792ba9d 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -57,14 +57,14 @@ use arrow::util::bit_util; use crate::arrow::converter::{ BinaryArrayConverter, BinaryConverter, BoolConverter, BooleanArrayConverter, - Converter, Date32Converter, FixedLenBinaryConverter, FixedSizeArrayConverter, - Float32Converter, Float64Converter, Int16Converter, Int32Converter, Int64Converter, - Int8Converter, Int96ArrayConverter, Int96Converter, LargeBinaryArrayConverter, - LargeBinaryConverter, LargeUtf8ArrayConverter, LargeUtf8Converter, - Time32MillisecondConverter, Time32SecondConverter, Time64MicrosecondConverter, - Time64NanosecondConverter, TimestampMicrosecondConverter, - TimestampMillisecondConverter, UInt16Converter, UInt32Converter, UInt64Converter, - 
UInt8Converter, Utf8ArrayConverter, Utf8Converter, + Converter, Date32Converter, DictionaryArrayConverter, DictionaryConverter, + FixedLenBinaryConverter, FixedSizeArrayConverter, Float32Converter, Float64Converter, + Int16Converter, Int32Converter, Int64Converter, Int8Converter, Int96ArrayConverter, + Int96Converter, LargeBinaryArrayConverter, LargeBinaryConverter, + LargeUtf8ArrayConverter, LargeUtf8Converter, Time32MillisecondConverter, + Time32SecondConverter, Time64MicrosecondConverter, Time64NanosecondConverter, + TimestampMicrosecondConverter, TimestampMillisecondConverter, UInt16Converter, + UInt32Converter, UInt64Converter, UInt8Converter, Utf8ArrayConverter, Utf8Converter, }; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; @@ -1488,6 +1488,60 @@ impl<'a> ArrayReaderBuilder { >::new( page_iterator, column_desc, converter )?)) + } else if let Some(ArrowType::Dictionary(index_type, _)) = arrow_type + { + match **index_type { + ArrowType::Int8 => { + let converter = + DictionaryConverter::new(DictionaryArrayConverter {}); + + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + DictionaryConverter, + >::new( + page_iterator, column_desc, converter + )?)) + } + ArrowType::Int16 => { + let converter = + DictionaryConverter::new(DictionaryArrayConverter {}); + + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + DictionaryConverter, + >::new( + page_iterator, column_desc, converter + )?)) + } + ArrowType::Int32 => { + let converter = + DictionaryConverter::new(DictionaryArrayConverter {}); + + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + DictionaryConverter, + >::new( + page_iterator, column_desc, converter + )?)) + } + ArrowType::Int64 => { + let converter = + DictionaryConverter::new(DictionaryArrayConverter {}); + + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + DictionaryConverter, + >::new( + page_iterator, column_desc, converter + )?)) + } + ref other => { + return 
Err(general_err!( + "Invalid/Unsupported index type for dictionary: {:?}", + other + )) + } + } } else { let converter = Utf8Converter::new(Utf8ArrayConverter {}); Ok(Box::new(ComplexObjectArrayReader::< diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index d5e2db40fea27..1f4dfacb7c123 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -176,14 +176,60 @@ fn write_leaves( } Ok(()) } + ArrowDataType::Dictionary(k, v) => { + // Materialize the packed dictionary and let the writer repack it + let any_array = array.as_any(); + let (k2, v2) = match &**k { + ArrowDataType::Int32 => { + let typed_array = any_array + .downcast_ref::() + .expect("Unable to get dictionary array"); + + (typed_array.keys(), typed_array.values()) + } + o => unimplemented!("Unknown key type {:?}", o), + }; + + let k3 = k2; + let v3 = v2 + .as_any() + .downcast_ref::() + .unwrap(); + + // TODO: This removes NULL values; what _should_ be done? 
+ // FIXME: Don't use `as` + let materialized: Vec<_> = k3 + .flatten() + .map(|k| v3.value(k as usize)) + .map(ByteArray::from) + .collect(); + // + + let mut col_writer = get_col_writer(&mut row_group_writer)?; + let levels = levels.pop().unwrap(); + + use ColumnWriter::*; + match (&mut col_writer, &**v) { + (ByteArrayColumnWriter(typed), ArrowDataType::Utf8) => { + typed.write_batch( + &materialized, + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )?; + } + o => unimplemented!("ColumnWriter not supported for {:?}", o.1), + } + row_group_writer.close_column(col_writer)?; + + Ok(()) + } ArrowDataType::Float16 => Err(ParquetError::ArrowError( "Float16 arrays not supported".to_string(), )), ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Boolean | ArrowDataType::FixedSizeBinary(_) - | ArrowDataType::Union(_) - | ArrowDataType::Dictionary(_, _) => Err(ParquetError::NYI( + | ArrowDataType::Union(_) => Err(ParquetError::NYI( "Attempting to write an Arrow type that is not yet implemented".to_string(), )), } @@ -430,7 +476,15 @@ fn get_levels( struct_levels } ArrowDataType::Union(_) => unimplemented!(), - ArrowDataType::Dictionary(_, _) => unimplemented!(), + ArrowDataType::Dictionary(_, _) => { + // Need to check for these cases not implemented in C++: + // - "Writing DictionaryArray with nested dictionary type not yet supported" + // - "Writing DictionaryArray with null encoded in dictionary type not yet supported" + vec![Levels { + definition: get_primitive_def_levels(array, parent_def_levels), + repetition: None, + }] + } } } @@ -1118,4 +1172,56 @@ mod tests { let values = Arc::new(s); one_column_roundtrip("struct_single_column", values, false); } + + #[test] + #[ignore] // Dictionary support isn't correct yet - child_data buffers don't match + fn arrow_writer_dictionary() { + // define schema + let schema = Arc::new(Schema::new(vec![Field::new_dict( + "dictionary", + DataType::Dictionary(Box::new(DataType::Int32), 
Box::new(DataType::Utf8)), + true, + 42, + true, + )])); + + // create some data + use Int32DictionaryArray; + let d: Int32DictionaryArray = + ["alpha", "beta", "alpha"].iter().copied().collect(); + + // build a record batch + let expected_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(d)]).unwrap(); + + // write to parquet + let file = get_temp_file("test_arrow_writer_dictionary.parquet", &[]); + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), schema, None).unwrap(); + writer.write(&expected_batch).unwrap(); + writer.close().unwrap(); + + // read from parquet + let reader = SerializedFileReader::new(file).unwrap(); + let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader)); + let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); + + let actual_batch = record_batch_reader.next().unwrap().unwrap(); + + for i in 0..expected_batch.num_columns() { + let expected_data = expected_batch.column(i).data(); + let actual_data = actual_batch.column(i).data(); + + assert_eq!(expected_data.data_type(), actual_data.data_type()); + assert_eq!(expected_data.len(), actual_data.len()); + assert_eq!(expected_data.null_count(), actual_data.null_count()); + assert_eq!(expected_data.offset(), actual_data.offset()); + assert_eq!(expected_data.buffers(), actual_data.buffers()); + assert_eq!(expected_data.child_data(), actual_data.child_data()); + // Null counts should be the same, not necessarily bitmaps + // A null bitmap is optional if an array has no nulls + if expected_data.null_count() != 0 { + assert_eq!(expected_data.null_bitmap(), actual_data.null_bitmap()); + } + } + } } diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 1aceba2d08742..ba7e3a8c14dab 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -22,7 +22,8 @@ use arrow::{ array::{ Array, ArrayRef, BinaryBuilder, BooleanArray, BooleanBufferBuilder, BufferBuilderTrait, 
FixedSizeBinaryBuilder, LargeBinaryBuilder, - LargeStringBuilder, StringBuilder, TimestampNanosecondBuilder, + LargeStringBuilder, PrimitiveBuilder, StringBuilder, StringDictionaryBuilder, + TimestampNanosecondBuilder, }, datatypes::Time32MillisecondType, }; @@ -34,12 +35,14 @@ use std::convert::From; use std::sync::Arc; use crate::errors::Result; -use arrow::datatypes::{ArrowPrimitiveType, DataType as ArrowDataType}; +use arrow::datatypes::{ + ArrowDictionaryKeyType, ArrowPrimitiveType, DataType as ArrowDataType, +}; use arrow::array::ArrayDataBuilder; use arrow::array::{ - BinaryArray, FixedSizeBinaryArray, LargeBinaryArray, LargeStringArray, - PrimitiveArray, StringArray, TimestampNanosecondArray, + BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeBinaryArray, + LargeStringArray, PrimitiveArray, StringArray, TimestampNanosecondArray, }; use std::marker::PhantomData; @@ -253,6 +256,34 @@ impl Converter>, LargeBinaryArray> for LargeBinaryArrayCon } } +pub struct DictionaryArrayConverter {} + +impl Converter>, DictionaryArray> + for DictionaryArrayConverter +{ + fn convert(&self, source: Vec>) -> Result> { + let data_size = source + .iter() + .map(|x| x.as_ref().map(|b| b.len()).unwrap_or(0)) + .sum(); + + let keys_builder = PrimitiveBuilder::::new(source.len()); + let values_builder = StringBuilder::with_capacity(source.len(), data_size); + + let mut builder = StringDictionaryBuilder::new(keys_builder, values_builder); + for v in source { + match v { + Some(array) => { + builder.append(array.as_utf8()?)?; + } + None => builder.append_null()?, + } + } + + Ok(builder.finish()) + } +} + pub type BoolConverter<'a> = ArrayRefConverter< &'a mut RecordReader, BooleanArray, @@ -292,6 +323,11 @@ pub type LargeBinaryConverter = ArrayRefConverter< LargeBinaryArray, LargeBinaryArrayConverter, >; +pub type DictionaryConverter = ArrayRefConverter< + Vec>, + DictionaryArray, + DictionaryArrayConverter, +>; pub type Int96Converter = ArrayRefConverter>, 
TimestampNanosecondArray, Int96ArrayConverter>; pub type FixedLenBinaryConverter = ArrayRefConverter< From 12e1ddad99f3079881eb235297b6eccbfa87e7ee Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 14 Oct 2020 15:44:34 -0400 Subject: [PATCH 02/28] Change variable name from index_type to key_type --- rust/parquet/src/arrow/array_reader.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index a9e757792ba9d..df518ab20de42 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -1488,9 +1488,9 @@ impl<'a> ArrayReaderBuilder { >::new( page_iterator, column_desc, converter )?)) - } else if let Some(ArrowType::Dictionary(index_type, _)) = arrow_type + } else if let Some(ArrowType::Dictionary(key_type, _)) = arrow_type { - match **index_type { + match **key_type { ArrowType::Int8 => { let converter = DictionaryConverter::new(DictionaryArrayConverter {}); From d55c5e99bf23cc626566cf99dd15e9407466d4d9 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 14 Oct 2020 15:45:09 -0400 Subject: [PATCH 03/28] cargo fmt --- rust/parquet/src/arrow/array_reader.rs | 3 +-- rust/parquet/src/arrow/arrow_writer.rs | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index df518ab20de42..34682b532363e 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -1488,8 +1488,7 @@ impl<'a> ArrayReaderBuilder { >::new( page_iterator, column_desc, converter )?)) - } else if let Some(ArrowType::Dictionary(key_type, _)) = arrow_type - { + } else if let Some(ArrowType::Dictionary(key_type, _)) = arrow_type { match **key_type { ArrowType::Int8 => { let converter = diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 
1f4dfacb7c123..09be6374a890f 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -1191,7 +1191,8 @@ mod tests { ["alpha", "beta", "alpha"].iter().copied().collect(); // build a record batch - let expected_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(d)]).unwrap(); + let expected_batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(d)]).unwrap(); // write to parquet let file = get_temp_file("test_arrow_writer_dictionary.parquet", &[]); From bc13e3aef27365e83e1a39e2f054e31066004996 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 14 Oct 2020 15:46:35 -0400 Subject: [PATCH 04/28] Change an unwrap to an expect --- rust/parquet/src/arrow/arrow_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 09be6374a890f..e554abe6a2f47 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -206,7 +206,7 @@ fn write_leaves( // let mut col_writer = get_col_writer(&mut row_group_writer)?; - let levels = levels.pop().unwrap(); + let levels = levels.pop().expect("Levels exhausted"); use ColumnWriter::*; match (&mut col_writer, &**v) { From ae2811463434e8e09e8e916cd21f848992974f3a Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 14 Oct 2020 15:50:55 -0400 Subject: [PATCH 05/28] Add a let _ --- rust/parquet/src/arrow/converter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index ba7e3a8c14dab..21445e08ccfc1 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -274,7 +274,7 @@ impl Converter>, DictionaryArra for v in source { match v { Some(array) => { - builder.append(array.as_utf8()?)?; + let _ = builder.append(array.as_utf8()?)?; } None => builder.append_null()?, } From 
f2f44596523fba66fd5c2d1202556320b755172a Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 14 Oct 2020 16:02:09 -0400 Subject: [PATCH 06/28] Use roundtrip test helper function --- rust/parquet/src/arrow/arrow_writer.rs | 32 +------------------------- 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index e554abe6a2f47..23150b30ec341 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -1174,7 +1174,6 @@ mod tests { } #[test] - #[ignore] // Dictionary support isn't correct yet - child_data buffers don't match fn arrow_writer_dictionary() { // define schema let schema = Arc::new(Schema::new(vec![Field::new_dict( @@ -1194,35 +1193,6 @@ mod tests { let expected_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(d)]).unwrap(); - // write to parquet - let file = get_temp_file("test_arrow_writer_dictionary.parquet", &[]); - let mut writer = - ArrowWriter::try_new(file.try_clone().unwrap(), schema, None).unwrap(); - writer.write(&expected_batch).unwrap(); - writer.close().unwrap(); - - // read from parquet - let reader = SerializedFileReader::new(file).unwrap(); - let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader)); - let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap(); - - let actual_batch = record_batch_reader.next().unwrap().unwrap(); - - for i in 0..expected_batch.num_columns() { - let expected_data = expected_batch.column(i).data(); - let actual_data = actual_batch.column(i).data(); - - assert_eq!(expected_data.data_type(), actual_data.data_type()); - assert_eq!(expected_data.len(), actual_data.len()); - assert_eq!(expected_data.null_count(), actual_data.null_count()); - assert_eq!(expected_data.offset(), actual_data.offset()); - assert_eq!(expected_data.buffers(), actual_data.buffers()); - assert_eq!(expected_data.child_data(), actual_data.child_data()); - // 
Null counts should be the same, not necessarily bitmaps - // A null bitmap is optional if an array has no nulls - if expected_data.null_count() != 0 { - assert_eq!(expected_data.null_bitmap(), actual_data.null_bitmap()); - } - } + roundtrip("test_arrow_writer_dictionary.parquet", expected_batch); } } From 30d8843371aaf65ee9dcd05e79f962ae6e9b3690 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 10 Oct 2020 14:26:06 +0200 Subject: [PATCH 07/28] We need a custom comparison of ArrayData This allows us to compare padded buffers with unpadded ones. When reading buffers from IPC, they are padded. --- rust/arrow/src/array/data.rs | 57 +++++++++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/rust/arrow/src/array/data.rs b/rust/arrow/src/array/data.rs index 9589f73caf8ed..a1426a6fb8866 100644 --- a/rust/arrow/src/array/data.rs +++ b/rust/arrow/src/array/data.rs @@ -29,7 +29,7 @@ use crate::util::bit_util; /// An generic representation of Arrow array data which encapsulates common attributes and /// operations for Arrow array. Specific operations for different arrays types (e.g., /// primitive, list, struct) are implemented in `Array`.
-#[derive(PartialEq, Debug, Clone)] +#[derive(Debug, Clone)] pub struct ArrayData { /// The data type for this array data data_type: DataType, @@ -209,6 +209,61 @@ impl ArrayData { } } +impl PartialEq for ArrayData { + fn eq(&self, other: &Self) -> bool { + assert_eq!( + self.data_type(), + other.data_type(), + "Data types not the same" + ); + assert_eq!(self.len(), other.len(), "Lengths not the same"); + // TODO: when adding tests for this, test that we can compare with arrays that have offsets + assert_eq!(self.offset(), other.offset(), "Offsets not the same"); + assert_eq!(self.null_count(), other.null_count()); + // compare buffers excluding padding + let self_buffers = self.buffers(); + let other_buffers = other.buffers(); + assert_eq!(self_buffers.len(), other_buffers.len()); + self_buffers.iter().zip(other_buffers).for_each(|(s, o)| { + compare_buffer_regions( + s, + self.offset(), // TODO mul by data length + o, + other.offset(), // TODO mul by data len + ); + }); + // assert_eq!(self.buffers(), other.buffers()); + + assert_eq!(self.child_data(), other.child_data()); + // null arrays can skip the null bitmap, thus only compare if there are no nulls + if self.null_count() != 0 || other.null_count() != 0 { + compare_buffer_regions( + self.null_buffer().unwrap(), + self.offset(), + other.null_buffer().unwrap(), + other.offset(), + ) + } + true + } +} + +/// A helper to compare buffer regions of 2 buffers. +/// Compares the length of the shorter buffer. 
+fn compare_buffer_regions( + left: &Buffer, + left_offset: usize, + right: &Buffer, + right_offset: usize, +) { + // for convenience, we assume that the buffer lengths are only unequal if one has padding, + // so we take the shorter length so we can discard the padding from the longer length + let shorter_len = left.len().min(right.len()); + let s_sliced = left.bit_slice(left_offset, shorter_len); + let o_sliced = right.bit_slice(right_offset, shorter_len); + assert_eq!(s_sliced, o_sliced); +} + /// Builder for `ArrayData` type #[derive(Debug)] pub struct ArrayDataBuilder { From 141f0c6ff23ba8657a8000bf33677a7de343615c Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 16 Oct 2020 11:36:18 -0400 Subject: [PATCH 08/28] Improve some variable names --- rust/parquet/src/arrow/arrow_writer.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 23150b30ec341..4bb44d5edd84b 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -176,10 +176,10 @@ fn write_leaves( } Ok(()) } - ArrowDataType::Dictionary(k, v) => { + ArrowDataType::Dictionary(key_type, value_type) => { // Materialize the packed dictionary and let the writer repack it let any_array = array.as_any(); - let (k2, v2) = match &**k { + let (keys, any_actual_values) = match &**key_type { ArrowDataType::Int32 => { let typed_array = any_array .downcast_ref::() @@ -190,26 +190,24 @@ fn write_leaves( o => unimplemented!("Unknown key type {:?}", o), }; - let k3 = k2; - let v3 = v2 + let actual_values = any_actual_values .as_any() .downcast_ref::() .unwrap(); // TODO: This removes NULL values; what _should_ be done? 
// FIXME: Don't use `as` - let materialized: Vec<_> = k3 + let materialized: Vec<_> = keys .flatten() - .map(|k| v3.value(k as usize)) + .map(|key| actual_values.value(key as usize)) .map(ByteArray::from) .collect(); - // let mut col_writer = get_col_writer(&mut row_group_writer)?; let levels = levels.pop().expect("Levels exhausted"); use ColumnWriter::*; - match (&mut col_writer, &**v) { + match (&mut col_writer, &**value_type) { (ByteArrayColumnWriter(typed), ArrowDataType::Utf8) => { typed.write_batch( &materialized, From 009da8a8f6b73248876d3efbd87d792631291e90 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 16 Oct 2020 11:45:02 -0400 Subject: [PATCH 09/28] Add a test and update comment to explain why it's ok to drop nulls --- rust/parquet/src/arrow/arrow_writer.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 4bb44d5edd84b..e7fc0edea658b 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -195,7 +195,8 @@ fn write_leaves( .downcast_ref::() .unwrap(); - // TODO: This removes NULL values; what _should_ be done? + // This removes NULL values from the NullableIter, but they're encoded by the levels, + // so that's fine. 
// FIXME: Don't use `as` let materialized: Vec<_> = keys .flatten() @@ -1183,9 +1184,10 @@ mod tests { )])); // create some data - use Int32DictionaryArray; - let d: Int32DictionaryArray = - ["alpha", "beta", "alpha"].iter().copied().collect(); + let d: Int32DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")] + .iter() + .copied() + .collect(); // build a record batch let expected_batch = From af1fd177db485f1376471ab8aa79d135b77507d2 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Fri, 16 Oct 2020 15:23:59 -0400 Subject: [PATCH 10/28] Support all numeric dictionary key types This leaves a door open to also support dictionaries with non-string values, but that's not currently implemented. --- rust/parquet/src/arrow/arrow_writer.rs | 132 ++++++++++++++++++------- 1 file changed, 96 insertions(+), 36 deletions(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index e7fc0edea658b..0fee6c27d7869 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -25,7 +25,7 @@ use arrow::record_batch::RecordBatch; use arrow_array::Array; use super::schema::add_encoded_arrow_schema_to_metadata; -use crate::column::writer::ColumnWriter; +use crate::column::writer::{ColumnWriter, ColumnWriterImpl}; use crate::errors::{ParquetError, Result}; use crate::file::properties::WriterProperties; use crate::{ @@ -177,47 +177,38 @@ fn write_leaves( Ok(()) } ArrowDataType::Dictionary(key_type, value_type) => { - // Materialize the packed dictionary and let the writer repack it - let any_array = array.as_any(); - let (keys, any_actual_values) = match &**key_type { - ArrowDataType::Int32 => { - let typed_array = any_array - .downcast_ref::() - .expect("Unable to get dictionary array"); - - (typed_array.keys(), typed_array.values()) - } - o => unimplemented!("Unknown key type {:?}", o), + use arrow_array::{ + Int16DictionaryArray, Int32DictionaryArray, Int64DictionaryArray, + 
Int8DictionaryArray, StringArray, UInt16DictionaryArray, + UInt32DictionaryArray, UInt64DictionaryArray, UInt8DictionaryArray, }; + use ArrowDataType::*; + use ColumnWriter::*; - let actual_values = any_actual_values - .as_any() - .downcast_ref::() - .unwrap(); - - // This removes NULL values from the NullableIter, but they're encoded by the levels, - // so that's fine. - // FIXME: Don't use `as` - let materialized: Vec<_> = keys - .flatten() - .map(|key| actual_values.value(key as usize)) - .map(ByteArray::from) - .collect(); - + let array = &**array; let mut col_writer = get_col_writer(&mut row_group_writer)?; let levels = levels.pop().expect("Levels exhausted"); - use ColumnWriter::*; - match (&mut col_writer, &**value_type) { - (ByteArrayColumnWriter(typed), ArrowDataType::Utf8) => { - typed.write_batch( - &materialized, - Some(levels.definition.as_slice()), - levels.repetition.as_deref(), - )?; - } - o => unimplemented!("ColumnWriter not supported for {:?}", o.1), + macro_rules! dispatch_dictionary { + ($($kt: pat, $vt: pat, $w: ident => $kat: ty, $vat: ty,)*) => ( + match (&**key_type, &**value_type, &mut col_writer) { + $(($kt, $vt, $w(writer)) => write_dict::<$kat, $vat, _>(array, writer, levels),)* + (kt, vt, _) => panic!("Don't know how to write dictionary of <{:?}, {:?}>", kt, vt), + } + ); } + + dispatch_dictionary!( + Int8, Utf8, ByteArrayColumnWriter => Int8DictionaryArray, StringArray, + Int16, Utf8, ByteArrayColumnWriter => Int16DictionaryArray, StringArray, + Int32, Utf8, ByteArrayColumnWriter => Int32DictionaryArray, StringArray, + Int64, Utf8, ByteArrayColumnWriter => Int64DictionaryArray, StringArray, + UInt8, Utf8, ByteArrayColumnWriter => UInt8DictionaryArray, StringArray, + UInt16, Utf8, ByteArrayColumnWriter => UInt16DictionaryArray, StringArray, + UInt32, Utf8, ByteArrayColumnWriter => UInt32DictionaryArray, StringArray, + UInt64, Utf8, ByteArrayColumnWriter => UInt64DictionaryArray, StringArray, + )?; + 
row_group_writer.close_column(col_writer)?; Ok(()) @@ -234,6 +225,75 @@ fn write_leaves( } } +trait Materialize { + type Output; + + // Materialize the packed dictionary. The writer will later repack it. + fn materialize(&self) -> Vec; +} + +macro_rules! materialize_string { + ($($k:ty,)*) => { + $(impl Materialize<$k, arrow_array::StringArray> for dyn Array { + type Output = ByteArray; + + fn materialize(&self) -> Vec { + use std::convert::TryFrom; + + let typed_array = self.as_any() + .downcast_ref::<$k>() + .expect("Unable to get dictionary array"); + + let keys = typed_array.keys(); + + let value_buffer = typed_array.values(); + let values = value_buffer + .as_any() + .downcast_ref::() + .unwrap(); + + // This removes NULL values from the NullableIter, but + // they're encoded by the levels, so that's fine. + keys + .flatten() + .map(|key| usize::try_from(key).unwrap_or_else(|k| panic!("key {} does not fit in usize", k))) + .map(|key| values.value(key)) + .map(ByteArray::from) + .collect() + } + })* + }; +} + +materialize_string! 
{ + arrow_array::Int8DictionaryArray, + arrow_array::Int16DictionaryArray, + arrow_array::Int32DictionaryArray, + arrow_array::Int64DictionaryArray, + arrow_array::UInt8DictionaryArray, + arrow_array::UInt16DictionaryArray, + arrow_array::UInt32DictionaryArray, + arrow_array::UInt64DictionaryArray, +} + +fn write_dict( + array: &(dyn Array + 'static), + writer: &mut ColumnWriterImpl, + levels: Levels, +) -> Result<()> +where + T: DataType, + dyn Array: Materialize, +{ + writer.write_batch( + &array.materialize(), + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )?; + + Ok(()) +} + fn write_leaf( writer: &mut ColumnWriter, column: &arrow_array::ArrayRef, From 60a38529dfed24d5ac531cf495f29d2d81236faf Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 19 Oct 2020 15:30:08 -0400 Subject: [PATCH 11/28] Serialize unsigned int dictionary index types As the C++ implementation was updated to do in b1a7a73ff2, and as supported by the unsigned integer types that implement ArrowDictionaryKeyType. 
--- rust/arrow/src/ipc/convert.rs | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index a02b6c44dd999..63d55f043c6e9 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -641,17 +641,23 @@ pub(crate) fn get_fb_dictionary<'a: 'b, 'b>( fbb: &mut FlatBufferBuilder<'a>, ) -> WIPOffset> { // We assume that the dictionary index type (as an integer) has already been - // validated elsewhere, and can safely assume we are dealing with signed - // integers + // validated elsewhere, and can safely assume we are dealing with integers let mut index_builder = ipc::IntBuilder::new(fbb); - index_builder.add_is_signed(true); + match *index_type { - Int8 => index_builder.add_bitWidth(8), - Int16 => index_builder.add_bitWidth(16), - Int32 => index_builder.add_bitWidth(32), - Int64 => index_builder.add_bitWidth(64), + Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true), + UInt8 | UInt16 | UInt32 | UInt64 => index_builder.add_is_signed(false), _ => {} } + + match *index_type { + Int8 | UInt8 => index_builder.add_bitWidth(8), + Int16 | UInt16 => index_builder.add_bitWidth(16), + Int32 | UInt32 => index_builder.add_bitWidth(32), + Int64 | UInt64 => index_builder.add_bitWidth(64), + _ => {} + } + let index_builder = index_builder.finish(); let mut builder = ipc::DictionaryEncodingBuilder::new(fbb); @@ -773,6 +779,16 @@ mod tests { 123, true, ), + Field::new_dict( + "dictionary", + DataType::Dictionary( + Box::new(DataType::UInt8), + Box::new(DataType::UInt32), + ), + true, + 123, + true, + ), ], md, ); From 8f621d0649dec5a7f9f5451150776c5863c23817 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 19 Oct 2020 16:36:52 -0400 Subject: [PATCH 12/28] Add a failing test for string dictionary indexed by an unsigned int --- rust/parquet/src/arrow/arrow_writer.rs | 27 ++++++++++++++++++++++++++ 1 file changed, 27
insertions(+) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 0fee6c27d7869..d9f71dd4fc7b8 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -1255,4 +1255,31 @@ mod tests { roundtrip("test_arrow_writer_dictionary.parquet", expected_batch); } + + #[test] + fn arrow_writer_string_dictionary_unsigned_index() { + // define schema + let schema = Arc::new(Schema::new(vec![Field::new_dict( + "dictionary", + DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)), + true, + 42, + true, + )])); + + // create some data + let d: UInt8DictionaryArray = [Some("alpha"), None, Some("beta"), Some("alpha")] + .iter() + .copied() + .collect(); + + // build a record batch + let expected_batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(d)]).unwrap(); + + roundtrip( + "test_arrow_writer_string_dictionary_unsigned_index.parquet", + expected_batch, + ); + } } From be62e4a7061f208b619a541abbd32865654dc4e2 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 19 Oct 2020 16:37:09 -0400 Subject: [PATCH 13/28] Extract a method for converting dictionaries --- rust/parquet/src/arrow/array_reader.rs | 113 +++++++++++++------------ 1 file changed, 61 insertions(+), 52 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 34682b532363e..986c885a8384d 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -1489,58 +1489,11 @@ impl<'a> ArrayReaderBuilder { page_iterator, column_desc, converter )?)) } else if let Some(ArrowType::Dictionary(key_type, _)) = arrow_type { - match **key_type { - ArrowType::Int8 => { - let converter = - DictionaryConverter::new(DictionaryArrayConverter {}); - - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - DictionaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } - ArrowType::Int16 => { - 
let converter = - DictionaryConverter::new(DictionaryArrayConverter {}); - - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - DictionaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } - ArrowType::Int32 => { - let converter = - DictionaryConverter::new(DictionaryArrayConverter {}); - - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - DictionaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } - ArrowType::Int64 => { - let converter = - DictionaryConverter::new(DictionaryArrayConverter {}); - - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - DictionaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } - ref other => { - return Err(general_err!( - "Invalid/Unsupported index type for dictionary: {:?}", - other - )) - } - } + self.build_for_string_dictionary_type_inner( + &*key_type, + page_iterator, + column_desc, + ) } else { let converter = Utf8Converter::new(Utf8ArrayConverter {}); Ok(Box::new(ComplexObjectArrayReader::< @@ -1593,6 +1546,62 @@ impl<'a> ArrayReaderBuilder { } } + fn build_for_string_dictionary_type_inner( + &self, + key_type: &ArrowType, + page_iterator: Box, + column_desc: ColumnDescPtr, + ) -> Result> { + match key_type { + ArrowType::Int8 => { + let converter = DictionaryConverter::new(DictionaryArrayConverter {}); + + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + DictionaryConverter, + >::new( + page_iterator, column_desc, converter + )?)) + } + ArrowType::Int16 => { + let converter = DictionaryConverter::new(DictionaryArrayConverter {}); + + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + DictionaryConverter, + >::new( + page_iterator, column_desc, converter + )?)) + } + ArrowType::Int32 => { + let converter = DictionaryConverter::new(DictionaryArrayConverter {}); + + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + DictionaryConverter, + >::new( + page_iterator, column_desc, converter + )?)) + } + ArrowType::Int64 
=> { + let converter = DictionaryConverter::new(DictionaryArrayConverter {}); + + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + DictionaryConverter, + >::new( + page_iterator, column_desc, converter + )?)) + } + ref other => { + return Err(general_err!( + "Invalid/Unsupported index type for dictionary: {:?}", + other + )) + } + } + } + /// Constructs struct array reader without considering repetition. fn build_for_struct_type_inner( &mut self, From 5f330b2713fba9800eca1998200b6da8d929f408 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 19 Oct 2020 16:48:38 -0400 Subject: [PATCH 14/28] Extract a macro for string dictionary conversion --- rust/parquet/src/arrow/array_reader.rs | 72 ++++++++++---------------- 1 file changed, 27 insertions(+), 45 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 986c885a8384d..dee4641b7c05b 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -1552,54 +1552,36 @@ impl<'a> ArrayReaderBuilder { page_iterator: Box, column_desc: ColumnDescPtr, ) -> Result> { - match key_type { - ArrowType::Int8 => { - let converter = DictionaryConverter::new(DictionaryArrayConverter {}); + macro_rules! 
convert_string_dictionary { + ($(($kt: pat, $at: ident),)*) => ( + match key_type { + $($kt => { + let converter = DictionaryConverter::new(DictionaryArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - DictionaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } - ArrowType::Int16 => { - let converter = DictionaryConverter::new(DictionaryArrayConverter {}); - - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - DictionaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } - ArrowType::Int32 => { - let converter = DictionaryConverter::new(DictionaryArrayConverter {}); - - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - DictionaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } - ArrowType::Int64 => { - let converter = DictionaryConverter::new(DictionaryArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + DictionaryConverter<$at>, + >::new( + page_iterator, column_desc, converter + )?)) - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - DictionaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } - ref other => { - return Err(general_err!( - "Invalid/Unsupported index type for dictionary: {:?}", - other - )) - } + })* + ref other => { + return Err(general_err!( + "Invalid/Unsupported index type for dictionary: {:?}", + other + )) + } + } + ); } + + convert_string_dictionary!( + (ArrowType::Int8, ArrowInt8Type), + (ArrowType::Int16, ArrowInt16Type), + (ArrowType::Int32, ArrowInt32Type), + (ArrowType::Int64, ArrowInt64Type), + ) } /// Constructs struct array reader without considering repetition. 
From 45600e650155e52c7f266a20bd4ce9241be47c01 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 19 Oct 2020 16:54:11 -0400 Subject: [PATCH 15/28] Convert string dictionaries indexed by unsigned integers too --- rust/parquet/src/arrow/array_reader.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index dee4641b7c05b..b919badef70c7 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -1581,6 +1581,10 @@ impl<'a> ArrayReaderBuilder { (ArrowType::Int16, ArrowInt16Type), (ArrowType::Int32, ArrowInt32Type), (ArrowType::Int64, ArrowInt64Type), + (ArrowType::UInt8, ArrowUInt8Type), + (ArrowType::UInt16, ArrowUInt16Type), + (ArrowType::UInt32, ArrowUInt32Type), + (ArrowType::UInt64, ArrowUInt64Type), ) } From 4cde14eb1d88c680790cb9d9b83a1961107492ac Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 19 Oct 2020 11:30:53 -0400 Subject: [PATCH 16/28] Convert one kind of primitive dictionary --- rust/parquet/src/arrow/array_reader.rs | 261 ++++++++++++++++--------- rust/parquet/src/arrow/arrow_writer.rs | 93 ++++++++- rust/parquet/src/arrow/converter.rs | 84 +++++++- 3 files changed, 334 insertions(+), 104 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index b919badef70c7..e951ab79df14d 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -57,14 +57,15 @@ use arrow::util::bit_util; use crate::arrow::converter::{ BinaryArrayConverter, BinaryConverter, BoolConverter, BooleanArrayConverter, - Converter, Date32Converter, DictionaryArrayConverter, DictionaryConverter, - FixedLenBinaryConverter, FixedSizeArrayConverter, Float32Converter, Float64Converter, - Int16Converter, Int32Converter, Int64Converter, Int8Converter, Int96ArrayConverter, - Int96Converter, LargeBinaryArrayConverter, LargeBinaryConverter, 
- LargeUtf8ArrayConverter, LargeUtf8Converter, Time32MillisecondConverter, - Time32SecondConverter, Time64MicrosecondConverter, Time64NanosecondConverter, - TimestampMicrosecondConverter, TimestampMillisecondConverter, UInt16Converter, - UInt32Converter, UInt64Converter, UInt8Converter, Utf8ArrayConverter, Utf8Converter, + Converter, Date32Converter, DictionaryArrayConverter, FixedLenBinaryConverter, + FixedSizeArrayConverter, Float32Converter, Float64Converter, Int16Converter, + Int32Converter, Int64Converter, Int8Converter, Int96ArrayConverter, Int96Converter, + LargeBinaryArrayConverter, LargeBinaryConverter, LargeUtf8ArrayConverter, + LargeUtf8Converter, StringDictionaryArrayConverter, StringDictionaryConverter, + Time32MillisecondConverter, Time32SecondConverter, Time64MicrosecondConverter, + Time64NanosecondConverter, TimestampMicrosecondConverter, + TimestampMillisecondConverter, UInt16Converter, UInt32Converter, UInt64Converter, + UInt8Converter, Utf8ArrayConverter, Utf8Converter, PrimitiveDictionaryConverter, }; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; @@ -1439,110 +1440,184 @@ impl<'a> ArrayReaderBuilder { .ok() .map(|f| f.data_type()); - match cur_type.get_physical_type() { - PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - )?)), - PhysicalType::INT32 => { - if let Some(ArrowType::Null) = arrow_type { - Ok(Box::new(NullArrayReader::::new( - page_iterator, - column_desc, - )?)) - } else { - Ok(Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - )?)) + if let Some(ArrowType::Dictionary(key_type, value_type)) = arrow_type { + match cur_type.get_physical_type() { + PhysicalType::BYTE_ARRAY => { + let logical_type = cur_type.get_basic_info().logical_type(); + if logical_type == LogicalType::UTF8 { + self.build_for_string_dictionary_type_inner( + &*key_type, + page_iterator, + column_desc, + ) + } else { + panic!("logical type not 
handled yet: {:?}", logical_type); + } } + PhysicalType::INT32 => { + if let ArrowType::UInt8 = **key_type { + if let ArrowType::UInt32 = **value_type { + let converter = + PrimitiveDictionaryConverter::::new( + DictionaryArrayConverter::new(), + ); + return Ok(Box::new( + ComplexObjectArrayReader::::new( + page_iterator, + column_desc, + converter, + )?, + )); + } else if let ArrowType::Int32 = **value_type { + let converter = + PrimitiveDictionaryConverter::::new( + DictionaryArrayConverter::new(), + ); + return Ok(Box::new( + ComplexObjectArrayReader::::new( + page_iterator, + column_desc, + converter, + )?, + )); + } else { + panic!("byeagain"); + } + } else if let ArrowType::UInt16 = **key_type { + + if let ArrowType::UInt32 = **value_type { + let converter = + PrimitiveDictionaryConverter::::new( + DictionaryArrayConverter::new(), + ); + return Ok(Box::new( + ComplexObjectArrayReader::::new( + page_iterator, + column_desc, + converter, + )?, + )); + } else if let ArrowType::Int32 = **value_type { + let converter = + PrimitiveDictionaryConverter::::new( + DictionaryArrayConverter::new(), + ); + return Ok(Box::new( + ComplexObjectArrayReader::::new( + page_iterator, + column_desc, + converter, + )?, + )); + } else { + panic!("byeagain"); + } + } else { + panic!("bye"); + } + unimplemented!(); + } + other => panic!("physical type not handled yet: {:?}", other), } - PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - )?)), - PhysicalType::INT96 => { - let converter = Int96Converter::new(Int96ArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - Int96Type, - Int96Converter, - >::new( - page_iterator, column_desc, converter - )?)) - } - PhysicalType::FLOAT => Ok(Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - )?)), - PhysicalType::DOUBLE => Ok(Box::new( - PrimitiveArrayReader::::new(page_iterator, column_desc)?, - )), - PhysicalType::BYTE_ARRAY => { - if 
cur_type.get_basic_info().logical_type() == LogicalType::UTF8 { - if let Some(ArrowType::LargeUtf8) = arrow_type { + } else { + match cur_type.get_physical_type() { + PhysicalType::BOOLEAN => Ok(Box::new( + PrimitiveArrayReader::::new(page_iterator, column_desc)?, + )), + PhysicalType::INT32 => { + if let Some(ArrowType::Null) = arrow_type { + Ok(Box::new(NullArrayReader::::new( + page_iterator, + column_desc, + )?)) + } else { + Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + )?)) + } + } + PhysicalType::INT64 => Ok(Box::new( + PrimitiveArrayReader::::new(page_iterator, column_desc)?, + )), + PhysicalType::INT96 => { + let converter = Int96Converter::new(Int96ArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + Int96Type, + Int96Converter, + >::new( + page_iterator, column_desc, converter + )?)) + } + PhysicalType::FLOAT => Ok(Box::new( + PrimitiveArrayReader::::new(page_iterator, column_desc)?, + )), + PhysicalType::DOUBLE => Ok(Box::new( + PrimitiveArrayReader::::new(page_iterator, column_desc)?, + )), + PhysicalType::BYTE_ARRAY => { + if cur_type.get_basic_info().logical_type() == LogicalType::UTF8 { + if let Some(ArrowType::LargeUtf8) = arrow_type { + let converter = + LargeUtf8Converter::new(LargeUtf8ArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + LargeUtf8Converter, + >::new( + page_iterator, column_desc, converter + )?)) + } else if let Some(ArrowType::Dictionary(_, _)) = arrow_type { + unreachable!(); + } else { + let converter = Utf8Converter::new(Utf8ArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + Utf8Converter, + >::new( + page_iterator, column_desc, converter + )?)) + } + } else if let Some(ArrowType::LargeBinary) = arrow_type { let converter = - LargeUtf8Converter::new(LargeUtf8ArrayConverter {}); + LargeBinaryConverter::new(LargeBinaryArrayConverter {}); Ok(Box::new(ComplexObjectArrayReader::< ByteArrayType, - LargeUtf8Converter, + 
LargeBinaryConverter, >::new( page_iterator, column_desc, converter )?)) - } else if let Some(ArrowType::Dictionary(key_type, _)) = arrow_type { - self.build_for_string_dictionary_type_inner( - &*key_type, - page_iterator, - column_desc, - ) } else { - let converter = Utf8Converter::new(Utf8ArrayConverter {}); + let converter = BinaryConverter::new(BinaryArrayConverter {}); Ok(Box::new(ComplexObjectArrayReader::< ByteArrayType, - Utf8Converter, + BinaryConverter, >::new( page_iterator, column_desc, converter )?)) } - } else if let Some(ArrowType::LargeBinary) = arrow_type { - let converter = - LargeBinaryConverter::new(LargeBinaryArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - LargeBinaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } else { - let converter = BinaryConverter::new(BinaryArrayConverter {}); + } + PhysicalType::FIXED_LEN_BYTE_ARRAY => { + let byte_width = match *cur_type { + Type::PrimitiveType { + ref type_length, .. + } => *type_length, + _ => { + return Err(ArrowError( + "Expected a physical type, not a group type".to_string(), + )) + } + }; + let converter = FixedLenBinaryConverter::new( + FixedSizeArrayConverter::new(byte_width), + ); Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - BinaryConverter, + FixedLenByteArrayType, + FixedLenBinaryConverter, >::new( page_iterator, column_desc, converter )?)) } } - PhysicalType::FIXED_LEN_BYTE_ARRAY => { - let byte_width = match *cur_type { - Type::PrimitiveType { - ref type_length, .. 
- } => *type_length, - _ => { - return Err(ArrowError( - "Expected a physical type, not a group type".to_string(), - )) - } - }; - let converter = FixedLenBinaryConverter::new( - FixedSizeArrayConverter::new(byte_width), - ); - Ok(Box::new(ComplexObjectArrayReader::< - FixedLenByteArrayType, - FixedLenBinaryConverter, - >::new( - page_iterator, column_desc, converter - )?)) - } } } @@ -1556,11 +1631,11 @@ impl<'a> ArrayReaderBuilder { ($(($kt: pat, $at: ident),)*) => ( match key_type { $($kt => { - let converter = DictionaryConverter::new(DictionaryArrayConverter {}); + let converter = StringDictionaryConverter::new(StringDictionaryArrayConverter {}); Ok(Box::new(ComplexObjectArrayReader::< ByteArrayType, - DictionaryConverter<$at>, + StringDictionaryConverter<$at>, >::new( page_iterator, column_desc, converter )?)) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index d9f71dd4fc7b8..8f64b64558b98 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -179,7 +179,7 @@ fn write_leaves( ArrowDataType::Dictionary(key_type, value_type) => { use arrow_array::{ Int16DictionaryArray, Int32DictionaryArray, Int64DictionaryArray, - Int8DictionaryArray, StringArray, UInt16DictionaryArray, + Int8DictionaryArray, PrimitiveArray, StringArray, UInt16DictionaryArray, UInt32DictionaryArray, UInt64DictionaryArray, UInt8DictionaryArray, }; use ArrowDataType::*; @@ -198,6 +198,57 @@ fn write_leaves( ); } + match (&**key_type, &**value_type, &mut col_writer) { + (UInt8, UInt32, Int32ColumnWriter(writer)) => { + let typed_array = array + .as_any() + .downcast_ref::() + .expect("Unable to get dictionary array"); + + let keys = typed_array.keys(); + + let value_buffer = typed_array.values(); + let value_array = + arrow::compute::cast(&value_buffer, &ArrowDataType::Int32)?; + + let values = value_array + .as_any() + .downcast_ref::() + .unwrap(); + + use std::convert::TryFrom; + // This removes NULL 
values from the NullableIter, but + // they're encoded by the levels, so that's fine. + let materialized_values: Vec<_> = keys + .flatten() + .map(|key| { + usize::try_from(key).unwrap_or_else(|k| { + panic!("key {} does not fit in usize", k) + }) + }) + .map(|key| values.value(key)) + .collect(); + + let materialized_primitive_array = + PrimitiveArray::::from( + materialized_values, + ); + + writer.write_batch( + get_numeric_array_slice::( + &materialized_primitive_array, + ) + .as_slice(), + Some(levels.definition.as_slice()), + levels.repetition.as_deref(), + )?; + row_group_writer.close_column(col_writer)?; + + return Ok(()); + } + _ => {} + } + dispatch_dictionary!( Int8, Utf8, ByteArrayColumnWriter => Int8DictionaryArray, StringArray, Int16, Utf8, ByteArrayColumnWriter => Int16DictionaryArray, StringArray, @@ -614,7 +665,7 @@ mod tests { use arrow::array::*; use arrow::datatypes::ToByteSlice; - use arrow::datatypes::{DataType, Field, Schema}; + use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type}; use arrow::record_batch::RecordBatch; use crate::arrow::{ArrowReader, ParquetFileArrowReader}; @@ -1233,7 +1284,7 @@ mod tests { } #[test] - fn arrow_writer_dictionary() { + fn arrow_writer_string_dictionary() { // define schema let schema = Arc::new(Schema::new(vec![Field::new_dict( "dictionary", @@ -1253,7 +1304,41 @@ mod tests { let expected_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(d)]).unwrap(); - roundtrip("test_arrow_writer_dictionary.parquet", expected_batch); + roundtrip( + "test_arrow_writer_string_dictionary.parquet", + expected_batch, + ); + } + + #[test] + fn arrow_writer_primitive_dictionary() { + // define schema + let schema = Arc::new(Schema::new(vec![Field::new_dict( + "dictionary", + DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)), + true, + 42, + true, + )])); + + // create some data + let key_builder = PrimitiveBuilder::::new(3); + let value_builder = PrimitiveBuilder::::new(2); + 
let mut builder = PrimitiveDictionaryBuilder::new(key_builder, value_builder); + builder.append(12345678).unwrap(); + builder.append_null().unwrap(); + builder.append(22345678).unwrap(); + builder.append(12345678).unwrap(); + let d = builder.finish(); + + // build a record batch + let expected_batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(d)]).unwrap(); + + roundtrip( + "test_arrow_writer_primitive_dictionary.parquet", + expected_batch, + ); } #[test] diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 21445e08ccfc1..e17771aef3d79 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -22,8 +22,8 @@ use arrow::{ array::{ Array, ArrayRef, BinaryBuilder, BooleanArray, BooleanBufferBuilder, BufferBuilderTrait, FixedSizeBinaryBuilder, LargeBinaryBuilder, - LargeStringBuilder, PrimitiveBuilder, StringBuilder, StringDictionaryBuilder, - TimestampNanosecondBuilder, + LargeStringBuilder, PrimitiveBuilder, PrimitiveDictionaryBuilder, StringBuilder, + StringDictionaryBuilder, TimestampNanosecondBuilder, }, datatypes::Time32MillisecondType, }; @@ -42,7 +42,8 @@ use arrow::datatypes::{ use arrow::array::ArrayDataBuilder; use arrow::array::{ BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeBinaryArray, - LargeStringArray, PrimitiveArray, StringArray, TimestampNanosecondArray, + LargeStringArray, PrimitiveArray, PrimitiveArrayOps, StringArray, + TimestampNanosecondArray, }; use std::marker::PhantomData; @@ -256,10 +257,10 @@ impl Converter>, LargeBinaryArray> for LargeBinaryArrayCon } } -pub struct DictionaryArrayConverter {} +pub struct StringDictionaryArrayConverter {} impl Converter>, DictionaryArray> - for DictionaryArrayConverter + for StringDictionaryArrayConverter { fn convert(&self, source: Vec>) -> Result> { let data_size = source @@ -284,6 +285,64 @@ impl Converter>, DictionaryArra } } +pub struct DictionaryArrayConverter +{ + _dict_value_source_marker: PhantomData, + 
_dict_value_target_marker: PhantomData, + _parquet_marker: PhantomData, +} + +impl + DictionaryArrayConverter +{ + pub fn new() -> Self { + Self { + _dict_value_source_marker: PhantomData, + _dict_value_target_marker: PhantomData, + _parquet_marker: PhantomData, + } + } +} + +impl + Converter::T>>, DictionaryArray> + for DictionaryArrayConverter +where + K: ArrowPrimitiveType, + DictValueSourceType: ArrowPrimitiveType, + DictValueTargetType: ArrowPrimitiveType, + ParquetType: DataType, + PrimitiveArray: From::T>>>, +{ + fn convert( + &self, + source: Vec::T>>, + ) -> Result> { + let keys_builder = PrimitiveBuilder::::new(source.len()); + let values_builder = PrimitiveBuilder::::new(source.len()); + + let mut builder = PrimitiveDictionaryBuilder::new(keys_builder, values_builder); + + let source_array: Arc = + Arc::new(PrimitiveArray::::from(source)); + let target_array = cast(&source_array, &DictValueTargetType::get_data_type())?; + let target = target_array + .as_any() + .downcast_ref::>() + .unwrap(); + + for i in 0..target.len() { + if target.is_null(i) { + builder.append_null()?; + } else { + let _ = builder.append(target.value(i))?; + } + } + + Ok(builder.finish()) + } +} + pub type BoolConverter<'a> = ArrayRefConverter< &'a mut RecordReader, BooleanArray, @@ -323,11 +382,22 @@ pub type LargeBinaryConverter = ArrayRefConverter< LargeBinaryArray, LargeBinaryArrayConverter, >; -pub type DictionaryConverter = ArrayRefConverter< +pub type StringDictionaryConverter = ArrayRefConverter< Vec>, DictionaryArray, - DictionaryArrayConverter, + StringDictionaryArrayConverter, +>; +pub type DictionaryConverter = ArrayRefConverter< + Vec::T>>, + DictionaryArray, + DictionaryArrayConverter, >; +pub type PrimitiveDictionaryConverter = ArrayRefConverter< + Vec::T>>, + DictionaryArray, + DictionaryArrayConverter, +>; + pub type Int96Converter = ArrayRefConverter>, TimestampNanosecondArray, Int96ArrayConverter>; pub type FixedLenBinaryConverter = ArrayRefConverter< From 
e45265c8192f800e3fd5453641edc6cb351fbeb4 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Oct 2020 12:33:27 -0400 Subject: [PATCH 17/28] Update based on rebase --- rust/parquet/src/arrow/converter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index e17771aef3d79..076d71afaa250 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -42,7 +42,7 @@ use arrow::datatypes::{ use arrow::array::ArrayDataBuilder; use arrow::array::{ BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeBinaryArray, - LargeStringArray, PrimitiveArray, PrimitiveArrayOps, StringArray, + LargeStringArray, PrimitiveArray, StringArray, TimestampNanosecondArray, }; use std::marker::PhantomData; @@ -325,7 +325,7 @@ where let source_array: Arc = Arc::new(PrimitiveArray::::from(source)); - let target_array = cast(&source_array, &DictValueTargetType::get_data_type())?; + let target_array = cast(&source_array, &DictValueTargetType::DATA_TYPE)?; let target = target_array .as_any() .downcast_ref::>() From 3d27a0e1716f1edccc4bc07ebbeda4217329f7eb Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Oct 2020 12:33:42 -0400 Subject: [PATCH 18/28] cargo fmt --- rust/parquet/src/arrow/array_reader.rs | 49 +++++++++++++++----------- rust/parquet/src/arrow/converter.rs | 3 +- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index e951ab79df14d..13e5273e5a6c3 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -61,11 +61,11 @@ use crate::arrow::converter::{ FixedSizeArrayConverter, Float32Converter, Float64Converter, Int16Converter, Int32Converter, Int64Converter, Int8Converter, Int96ArrayConverter, Int96Converter, LargeBinaryArrayConverter, LargeBinaryConverter, 
LargeUtf8ArrayConverter, - LargeUtf8Converter, StringDictionaryArrayConverter, StringDictionaryConverter, - Time32MillisecondConverter, Time32SecondConverter, Time64MicrosecondConverter, - Time64NanosecondConverter, TimestampMicrosecondConverter, + LargeUtf8Converter, PrimitiveDictionaryConverter, StringDictionaryArrayConverter, + StringDictionaryConverter, Time32MillisecondConverter, Time32SecondConverter, + Time64MicrosecondConverter, Time64NanosecondConverter, TimestampMicrosecondConverter, TimestampMillisecondConverter, UInt16Converter, UInt32Converter, UInt64Converter, - UInt8Converter, Utf8ArrayConverter, Utf8Converter, PrimitiveDictionaryConverter, + UInt8Converter, Utf8ArrayConverter, Utf8Converter, }; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; @@ -1457,10 +1457,12 @@ impl<'a> ArrayReaderBuilder { PhysicalType::INT32 => { if let ArrowType::UInt8 = **key_type { if let ArrowType::UInt32 = **value_type { - let converter = - PrimitiveDictionaryConverter::::new( - DictionaryArrayConverter::new(), - ); + let converter = PrimitiveDictionaryConverter::< + ArrowUInt8Type, + ArrowUInt32Type, + >::new( + DictionaryArrayConverter::new() + ); return Ok(Box::new( ComplexObjectArrayReader::::new( page_iterator, @@ -1469,10 +1471,12 @@ impl<'a> ArrayReaderBuilder { )?, )); } else if let ArrowType::Int32 = **value_type { - let converter = - PrimitiveDictionaryConverter::::new( - DictionaryArrayConverter::new(), - ); + let converter = PrimitiveDictionaryConverter::< + ArrowUInt8Type, + ArrowInt32Type, + >::new( + DictionaryArrayConverter::new() + ); return Ok(Box::new( ComplexObjectArrayReader::::new( page_iterator, @@ -1484,12 +1488,13 @@ impl<'a> ArrayReaderBuilder { panic!("byeagain"); } } else if let ArrowType::UInt16 = **key_type { - if let ArrowType::UInt32 = **value_type { - let converter = - PrimitiveDictionaryConverter::::new( - DictionaryArrayConverter::new(), - ); + let converter = 
PrimitiveDictionaryConverter::< + ArrowUInt16Type, + ArrowUInt32Type, + >::new( + DictionaryArrayConverter::new() + ); return Ok(Box::new( ComplexObjectArrayReader::::new( page_iterator, @@ -1498,10 +1503,12 @@ impl<'a> ArrayReaderBuilder { )?, )); } else if let ArrowType::Int32 = **value_type { - let converter = - PrimitiveDictionaryConverter::::new( - DictionaryArrayConverter::new(), - ); + let converter = PrimitiveDictionaryConverter::< + ArrowUInt16Type, + ArrowInt32Type, + >::new( + DictionaryArrayConverter::new() + ); return Ok(Box::new( ComplexObjectArrayReader::::new( page_iterator, diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 076d71afaa250..4783616cd94de 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -42,8 +42,7 @@ use arrow::datatypes::{ use arrow::array::ArrayDataBuilder; use arrow::array::{ BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeBinaryArray, - LargeStringArray, PrimitiveArray, StringArray, - TimestampNanosecondArray, + LargeStringArray, PrimitiveArray, StringArray, TimestampNanosecondArray, }; use std::marker::PhantomData; From f2f94fd8088a254eabf3d059578d4a7afba6cff2 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sun, 25 Oct 2020 10:11:02 +0200 Subject: [PATCH 19/28] Complete dictionary support --- rust/parquet/src/arrow/array_reader.rs | 478 +++++++++---------------- rust/parquet/src/arrow/arrow_writer.rs | 9 +- rust/parquet/src/arrow/converter.rs | 8 +- 3 files changed, 181 insertions(+), 314 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 13e5273e5a6c3..3959d9668f832 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -34,19 +34,19 @@ use arrow::array::{ use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::{ BooleanType as ArrowBooleanType, DataType as ArrowType, - Date32Type as ArrowDate32Type, Date64Type as 
ArrowDate64Type, DateUnit, + Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type, DurationMicrosecondType as ArrowDurationMicrosecondType, DurationMillisecondType as ArrowDurationMillisecondType, DurationNanosecondType as ArrowDurationNanosecondType, DurationSecondType as ArrowDurationSecondType, Field, Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type, Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type, - Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema, + Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, Schema, Time32MillisecondType as ArrowTime32MillisecondType, Time32SecondType as ArrowTime32SecondType, Time64MicrosecondType as ArrowTime64MicrosecondType, - Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit, - TimeUnit as ArrowTimeUnit, TimestampMicrosecondType as ArrowTimestampMicrosecondType, + Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit, + TimestampMicrosecondType as ArrowTimestampMicrosecondType, TimestampMillisecondType as ArrowTimestampMillisecondType, TimestampNanosecondType as ArrowTimestampNanosecondType, TimestampSecondType as ArrowTimestampSecondType, ToByteSlice, @@ -57,15 +57,10 @@ use arrow::util::bit_util; use crate::arrow::converter::{ BinaryArrayConverter, BinaryConverter, BoolConverter, BooleanArrayConverter, - Converter, Date32Converter, DictionaryArrayConverter, FixedLenBinaryConverter, - FixedSizeArrayConverter, Float32Converter, Float64Converter, Int16Converter, - Int32Converter, Int64Converter, Int8Converter, Int96ArrayConverter, Int96Converter, - LargeBinaryArrayConverter, LargeBinaryConverter, LargeUtf8ArrayConverter, - LargeUtf8Converter, PrimitiveDictionaryConverter, StringDictionaryArrayConverter, - StringDictionaryConverter, Time32MillisecondConverter, Time32SecondConverter, - Time64MicrosecondConverter, Time64NanosecondConverter, TimestampMicrosecondConverter, - TimestampMillisecondConverter, UInt16Converter, UInt32Converter, 
UInt64Converter, - UInt8Converter, Utf8ArrayConverter, Utf8Converter, + Converter, FixedLenBinaryConverter, FixedSizeArrayConverter, Float32Converter, + Float64Converter, Int32Converter, Int64Converter, Int96ArrayConverter, + Int96Converter, LargeBinaryArrayConverter, LargeBinaryConverter, + LargeUtf8ArrayConverter, LargeUtf8Converter, Utf8ArrayConverter, Utf8Converter, }; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; @@ -213,10 +208,15 @@ impl PrimitiveArrayReader { pub fn new( mut pages: Box, column_desc: ColumnDescPtr, + arrow_type: Option, ) -> Result { - let data_type = parquet_to_arrow_field(column_desc.as_ref())? - .data_type() - .clone(); + // Check if Arrow type is specified, else create it from Parquet type + let data_type = match arrow_type { + Some(t) => t, + None => parquet_to_arrow_field(column_desc.as_ref())? + .data_type() + .clone(), + }; let mut record_reader = RecordReader::::new(column_desc.clone()); if let Some(page_reader) = pages.next() { @@ -268,91 +268,40 @@ impl ArrayReader for PrimitiveArrayReader { } } - // convert to arrays + // Convert to arrays by using the Parquet phyisical type. 
+ // The physical types are then cast to Arrow types if necessary let array = - match (&self.data_type, T::get_physical_type()) { - (ArrowType::Boolean, PhysicalType::BOOLEAN) => { - BoolConverter::new(BooleanArrayConverter {}) - .convert(self.record_reader.cast::()) - } - (ArrowType::Int8, PhysicalType::INT32) => { - Int8Converter::new().convert(self.record_reader.cast::()) - } - (ArrowType::Int16, PhysicalType::INT32) => { - Int16Converter::new().convert(self.record_reader.cast::()) - } - (ArrowType::Int32, PhysicalType::INT32) => { + match T::get_physical_type() { + PhysicalType::BOOLEAN => BoolConverter::new(BooleanArrayConverter {}) + .convert(self.record_reader.cast::()), + PhysicalType::INT32 => { + // TODO: the cast is a no-op, but we should remove it Int32Converter::new().convert(self.record_reader.cast::()) } - (ArrowType::UInt8, PhysicalType::INT32) => { - UInt8Converter::new().convert(self.record_reader.cast::()) - } - (ArrowType::UInt16, PhysicalType::INT32) => { - UInt16Converter::new().convert(self.record_reader.cast::()) - } - (ArrowType::UInt32, PhysicalType::INT32) => { - UInt32Converter::new().convert(self.record_reader.cast::()) - } - (ArrowType::Int64, PhysicalType::INT64) => { + PhysicalType::INT64 => { Int64Converter::new().convert(self.record_reader.cast::()) } - (ArrowType::UInt64, PhysicalType::INT64) => { - UInt64Converter::new().convert(self.record_reader.cast::()) - } - (ArrowType::Float32, PhysicalType::FLOAT) => Float32Converter::new() + PhysicalType::FLOAT => Float32Converter::new() .convert(self.record_reader.cast::()), - (ArrowType::Float64, PhysicalType::DOUBLE) => Float64Converter::new() + PhysicalType::DOUBLE => Float64Converter::new() .convert(self.record_reader.cast::()), - (ArrowType::Timestamp(unit, _), PhysicalType::INT64) => match unit { - TimeUnit::Millisecond => TimestampMillisecondConverter::new() - .convert(self.record_reader.cast::()), - TimeUnit::Microsecond => TimestampMicrosecondConverter::new() - 
.convert(self.record_reader.cast::()), - _ => Err(general_err!("No conversion from parquet type to arrow type for timestamp with unit {:?}", unit)), - }, - (ArrowType::Date32(unit), PhysicalType::INT32) => match unit { - DateUnit::Day => Date32Converter::new() - .convert(self.record_reader.cast::()), - _ => Err(general_err!("No conversion from parquet type to arrow type for date with unit {:?}", unit)), - } - (ArrowType::Time32(unit), PhysicalType::INT32) => { - match unit { - TimeUnit::Second => { - Time32SecondConverter::new().convert(self.record_reader.cast::()) - } - TimeUnit::Millisecond => { - Time32MillisecondConverter::new().convert(self.record_reader.cast::()) - } - _ => Err(general_err!("Invalid or unsupported arrow array with datatype {:?}", self.get_data_type())) - } - } - (ArrowType::Time64(unit), PhysicalType::INT64) => { - match unit { - TimeUnit::Microsecond => { - Time64MicrosecondConverter::new().convert(self.record_reader.cast::()) - } - TimeUnit::Nanosecond => { - Time64NanosecondConverter::new().convert(self.record_reader.cast::()) - } - _ => Err(general_err!("Invalid or unsupported arrow array with datatype {:?}", self.get_data_type())) - } - } - (ArrowType::Interval(IntervalUnit::YearMonth), PhysicalType::INT32) => { - UInt32Converter::new().convert(self.record_reader.cast::()) - } - (ArrowType::Interval(IntervalUnit::DayTime), PhysicalType::INT64) => { - UInt64Converter::new().convert(self.record_reader.cast::()) + PhysicalType::INT96 + | PhysicalType::BYTE_ARRAY + | PhysicalType::FIXED_LEN_BYTE_ARRAY => { + // TODO: we could use unreachable!() as this is a private fn + Err(general_err!( + "Cannot read primitive array with a complex physical type" + )) } - (ArrowType::Duration(_), PhysicalType::INT64) => { - UInt64Converter::new().convert(self.record_reader.cast::()) - } - (arrow_type, physical_type) => Err(general_err!( - "Reading {:?} type from parquet {:?} is not supported yet.", - arrow_type, - physical_type - )), }?; + // cast to Arrow 
type + // TODO: we need to check if it's fine for this to be fallible. + // My assumption is that we can't get to an illegal cast as we can only + // generate types that are supported, because we'd have gotten them from + // the metadata which was written to the Parquet sink + let array = arrow::compute::cast(&array, self.get_data_type())?; + // save definition and repetition buffers self.def_levels_buffer = self.record_reader.consume_def_levels()?; self.rep_levels_buffer = self.record_reader.consume_rep_levels()?; @@ -504,7 +453,13 @@ where data_buffer.into_iter().map(Some).collect() }; - self.converter.convert(data) + // TODO: I did this quickly without thinking through it, there might be edge cases to consider + let array = self.converter.convert(data)?; + + Ok(match self.data_type { + ArrowType::Dictionary(_, _) => arrow::compute::cast(&array, &self.data_type)?, + _ => array, + }) } fn get_def_levels(&self) -> Option<&[i16]> { @@ -525,10 +480,14 @@ where pages: Box, column_desc: ColumnDescPtr, converter: C, + arrow_type: Option, ) -> Result { - let data_type = parquet_to_arrow_field(column_desc.as_ref())? - .data_type() - .clone(); + let data_type = match arrow_type { + Some(t) => t, + None => parquet_to_arrow_field(column_desc.as_ref())? 
+ .data_type() + .clone(), + }; Ok(Self { data_type, @@ -1438,236 +1397,134 @@ impl<'a> ArrayReaderBuilder { .arrow_schema .field_with_name(cur_type.name()) .ok() - .map(|f| f.data_type()); - - if let Some(ArrowType::Dictionary(key_type, value_type)) = arrow_type { - match cur_type.get_physical_type() { - PhysicalType::BYTE_ARRAY => { - let logical_type = cur_type.get_basic_info().logical_type(); - if logical_type == LogicalType::UTF8 { - self.build_for_string_dictionary_type_inner( - &*key_type, - page_iterator, - column_desc, - ) - } else { - panic!("logical type not handled yet: {:?}", logical_type); - } - } - PhysicalType::INT32 => { - if let ArrowType::UInt8 = **key_type { - if let ArrowType::UInt32 = **value_type { - let converter = PrimitiveDictionaryConverter::< - ArrowUInt8Type, - ArrowUInt32Type, - >::new( - DictionaryArrayConverter::new() - ); - return Ok(Box::new( - ComplexObjectArrayReader::::new( - page_iterator, - column_desc, - converter, - )?, - )); - } else if let ArrowType::Int32 = **value_type { - let converter = PrimitiveDictionaryConverter::< - ArrowUInt8Type, - ArrowInt32Type, - >::new( - DictionaryArrayConverter::new() - ); - return Ok(Box::new( - ComplexObjectArrayReader::::new( - page_iterator, - column_desc, - converter, - )?, - )); - } else { - panic!("byeagain"); - } - } else if let ArrowType::UInt16 = **key_type { - if let ArrowType::UInt32 = **value_type { - let converter = PrimitiveDictionaryConverter::< - ArrowUInt16Type, - ArrowUInt32Type, - >::new( - DictionaryArrayConverter::new() - ); - return Ok(Box::new( - ComplexObjectArrayReader::::new( - page_iterator, - column_desc, - converter, - )?, - )); - } else if let ArrowType::Int32 = **value_type { - let converter = PrimitiveDictionaryConverter::< - ArrowUInt16Type, - ArrowInt32Type, - >::new( - DictionaryArrayConverter::new() - ); - return Ok(Box::new( - ComplexObjectArrayReader::::new( - page_iterator, - column_desc, - converter, - )?, - )); - } else { - panic!("byeagain"); - } - 
} else { - panic!("bye"); - } - unimplemented!(); - } - other => panic!("physical type not handled yet: {:?}", other), - } - } else { - match cur_type.get_physical_type() { - PhysicalType::BOOLEAN => Ok(Box::new( - PrimitiveArrayReader::::new(page_iterator, column_desc)?, - )), - PhysicalType::INT32 => { - if let Some(ArrowType::Null) = arrow_type { - Ok(Box::new(NullArrayReader::::new( - page_iterator, - column_desc, - )?)) - } else { - Ok(Box::new(PrimitiveArrayReader::::new( - page_iterator, - column_desc, - )?)) - } - } - PhysicalType::INT64 => Ok(Box::new( - PrimitiveArrayReader::::new(page_iterator, column_desc)?, - )), - PhysicalType::INT96 => { - let converter = Int96Converter::new(Int96ArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - Int96Type, - Int96Converter, - >::new( - page_iterator, column_desc, converter + .map(|f| f.data_type()) + .cloned(); + + match cur_type.get_physical_type() { + PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + arrow_type, + )?)), + PhysicalType::INT32 => { + if let Some(ArrowType::Null) = arrow_type { + Ok(Box::new(NullArrayReader::::new( + page_iterator, + column_desc, + )?)) + } else { + Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + arrow_type, )?)) } - PhysicalType::FLOAT => Ok(Box::new( - PrimitiveArrayReader::::new(page_iterator, column_desc)?, - )), - PhysicalType::DOUBLE => Ok(Box::new( - PrimitiveArrayReader::::new(page_iterator, column_desc)?, - )), - PhysicalType::BYTE_ARRAY => { - if cur_type.get_basic_info().logical_type() == LogicalType::UTF8 { - if let Some(ArrowType::LargeUtf8) = arrow_type { - let converter = - LargeUtf8Converter::new(LargeUtf8ArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - LargeUtf8Converter, - >::new( - page_iterator, column_desc, converter - )?)) - } else if let Some(ArrowType::Dictionary(_, _)) = arrow_type { - unreachable!(); - } else { - let converter = 
Utf8Converter::new(Utf8ArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - Utf8Converter, - >::new( - page_iterator, column_desc, converter - )?)) - } - } else if let Some(ArrowType::LargeBinary) = arrow_type { + } + PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + arrow_type, + )?)), + PhysicalType::INT96 => { + let converter = Int96Converter::new(Int96ArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + Int96Type, + Int96Converter, + >::new( + page_iterator, + column_desc, + converter, + arrow_type, + )?)) + } + PhysicalType::FLOAT => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + arrow_type, + )?)), + PhysicalType::DOUBLE => { + Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + arrow_type, + )?)) + } + PhysicalType::BYTE_ARRAY => { + if cur_type.get_basic_info().logical_type() == LogicalType::UTF8 { + if let Some(ArrowType::LargeUtf8) = arrow_type { let converter = - LargeBinaryConverter::new(LargeBinaryArrayConverter {}); + LargeUtf8Converter::new(LargeUtf8ArrayConverter {}); Ok(Box::new(ComplexObjectArrayReader::< ByteArrayType, - LargeBinaryConverter, + LargeUtf8Converter, >::new( - page_iterator, column_desc, converter + page_iterator, + column_desc, + converter, + arrow_type, )?)) } else { - let converter = BinaryConverter::new(BinaryArrayConverter {}); + let converter = Utf8Converter::new(Utf8ArrayConverter {}); Ok(Box::new(ComplexObjectArrayReader::< ByteArrayType, - BinaryConverter, + Utf8Converter, >::new( - page_iterator, column_desc, converter + page_iterator, + column_desc, + converter, + arrow_type, )?)) } - } - PhysicalType::FIXED_LEN_BYTE_ARRAY => { - let byte_width = match *cur_type { - Type::PrimitiveType { - ref type_length, .. 
- } => *type_length, - _ => { - return Err(ArrowError( - "Expected a physical type, not a group type".to_string(), - )) - } - }; - let converter = FixedLenBinaryConverter::new( - FixedSizeArrayConverter::new(byte_width), - ); + } else if let Some(ArrowType::LargeBinary) = arrow_type { + let converter = + LargeBinaryConverter::new(LargeBinaryArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + LargeBinaryConverter, + >::new( + page_iterator, + column_desc, + converter, + arrow_type, + )?)) + } else { + let converter = BinaryConverter::new(BinaryArrayConverter {}); Ok(Box::new(ComplexObjectArrayReader::< - FixedLenByteArrayType, - FixedLenBinaryConverter, + ByteArrayType, + BinaryConverter, >::new( - page_iterator, column_desc, converter + page_iterator, + column_desc, + converter, + arrow_type, )?)) } } - } - } - - fn build_for_string_dictionary_type_inner( - &self, - key_type: &ArrowType, - page_iterator: Box, - column_desc: ColumnDescPtr, - ) -> Result> { - macro_rules! convert_string_dictionary { - ($(($kt: pat, $at: ident),)*) => ( - match key_type { - $($kt => { - let converter = StringDictionaryConverter::new(StringDictionaryArrayConverter {}); - - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - StringDictionaryConverter<$at>, - >::new( - page_iterator, column_desc, converter - )?)) - - })* - ref other => { - return Err(general_err!( - "Invalid/Unsupported index type for dictionary: {:?}", - other + PhysicalType::FIXED_LEN_BYTE_ARRAY => { + let byte_width = match *cur_type { + Type::PrimitiveType { + ref type_length, .. 
+ } => *type_length, + _ => { + return Err(ArrowError( + "Expected a physical type, not a group type".to_string(), )) } - } - ); + }; + let converter = FixedLenBinaryConverter::new( + FixedSizeArrayConverter::new(byte_width), + ); + Ok(Box::new(ComplexObjectArrayReader::< + FixedLenByteArrayType, + FixedLenBinaryConverter, + >::new( + page_iterator, + column_desc, + converter, + arrow_type, + )?)) + } } - - convert_string_dictionary!( - (ArrowType::Int8, ArrowInt8Type), - (ArrowType::Int16, ArrowInt16Type), - (ArrowType::Int32, ArrowInt32Type), - (ArrowType::Int64, ArrowInt64Type), - (ArrowType::UInt8, ArrowUInt8Type), - (ArrowType::UInt16, ArrowUInt16Type), - (ArrowType::UInt32, ArrowUInt32Type), - (ArrowType::UInt64, ArrowUInt64Type), - ) } /// Constructs struct array reader without considering repetition. @@ -1801,9 +1658,12 @@ mod tests { let column_desc = schema.column(0); let page_iterator = EmptyPageIterator::new(schema); - let mut array_reader = - PrimitiveArrayReader::::new(Box::new(page_iterator), column_desc) - .unwrap(); + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + ) + .unwrap(); // expect no values to be read let array = array_reader.next_batch(50).unwrap(); @@ -1848,6 +1708,7 @@ mod tests { let mut array_reader = PrimitiveArrayReader::::new( Box::new(page_iterator), column_desc, + None, ) .unwrap(); @@ -1931,6 +1792,7 @@ mod tests { let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new( Box::new(page_iterator), column_desc.clone(), + None, ) .expect("Unable to get array reader"); @@ -2064,6 +1926,7 @@ mod tests { let mut array_reader = PrimitiveArrayReader::::new( Box::new(page_iterator), column_desc, + None, ) .unwrap(); @@ -2177,6 +2040,7 @@ mod tests { Box::new(page_iterator), column_desc, converter, + None, ) .unwrap(); diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index 8f64b64558b98..fceeae37dbb19 100644 --- 
a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -1301,8 +1301,7 @@ mod tests { .collect(); // build a record batch - let expected_batch = - RecordBatch::try_new(schema.clone(), vec![Arc::new(d)]).unwrap(); + let expected_batch = RecordBatch::try_new(schema, vec![Arc::new(d)]).unwrap(); roundtrip( "test_arrow_writer_string_dictionary.parquet", @@ -1332,8 +1331,7 @@ mod tests { let d = builder.finish(); // build a record batch - let expected_batch = - RecordBatch::try_new(schema.clone(), vec![Arc::new(d)]).unwrap(); + let expected_batch = RecordBatch::try_new(schema, vec![Arc::new(d)]).unwrap(); roundtrip( "test_arrow_writer_primitive_dictionary.parquet", @@ -1359,8 +1357,7 @@ mod tests { .collect(); // build a record batch - let expected_batch = - RecordBatch::try_new(schema.clone(), vec![Arc::new(d)]).unwrap(); + let expected_batch = RecordBatch::try_new(schema, vec![Arc::new(d)]).unwrap(); roundtrip( "test_arrow_writer_string_dictionary_unsigned_index.parquet", diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 4783616cd94de..4421268f2016b 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -111,7 +111,9 @@ where let primitive_array: ArrayRef = Arc::new(PrimitiveArray::::from(array_data.build())); - Ok(cast(&primitive_array, &ArrowTargetType::DATA_TYPE)?) + // TODO: We should make this cast redundant in favour of 1 cast to rule them all + // Ok(cast(&primitive_array, &ArrowTargetType::DATA_TYPE)?) 
+ Ok(primitive_array) } } @@ -347,6 +349,7 @@ pub type BoolConverter<'a> = ArrayRefConverter< BooleanArray, BooleanArrayConverter, >; +// TODO: intuition tells me that removing many of these converters could help us consolidate where we cast pub type Int8Converter = CastConverter; pub type UInt8Converter = CastConverter; pub type Int16Converter = CastConverter; @@ -515,7 +518,10 @@ mod tests { } #[test] + #[ignore = "We need to look at whether this is still relevant after we refactor out the casts"] fn test_converter_arrow_source_i16_target_i32() { + // TODO: this fails if we remove the cast here on converter. Is it still relevant? + // I'd favour removing these Parquet::PHYSICAL > Arrow::DataType, so we can do it in 1 pleace. let raw_data = vec![Some(1i16), None, Some(2i16), Some(3i16)]; converter_arrow_source_target!(raw_data, "INT32", Int16Type, Int16Converter) } From 9d692484aafee180023b16608012eaf917ed2b5d Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Oct 2020 12:53:31 -0400 Subject: [PATCH 20/28] Switch from general_err to unreachable --- rust/parquet/src/arrow/array_reader.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 3959d9668f832..d81433506c4b0 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -288,10 +288,9 @@ impl ArrayReader for PrimitiveArrayReader { PhysicalType::INT96 | PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => { - // TODO: we could use unreachable!() as this is a private fn - Err(general_err!( - "Cannot read primitive array with a complex physical type" - )) + unreachable!( + "PrimitiveArrayReaders don't support complex physical types" + ); } }?; From f3b287dfbb7fd41722c9659a61484e5cf948a3f1 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Oct 2020 13:00:54 -0400 Subject: [PATCH 21/28] Change match with one 
arm to an if let --- rust/parquet/src/arrow/array_reader.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index d81433506c4b0..7060dc1a4f129 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -453,12 +453,13 @@ where }; // TODO: I did this quickly without thinking through it, there might be edge cases to consider - let array = self.converter.convert(data)?; + let mut array = self.converter.convert(data)?; - Ok(match self.data_type { - ArrowType::Dictionary(_, _) => arrow::compute::cast(&array, &self.data_type)?, - _ => array, - }) + if let ArrowType::Dictionary(_, _) = self.data_type { + array = arrow::compute::cast(&array, &self.data_type)?; + } + + Ok(array) } fn get_def_levels(&self) -> Option<&[i16]> { From bb5d5d7ba9187e9ab71be5eab2f1aad1b7ef912e Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Oct 2020 14:17:55 -0400 Subject: [PATCH 22/28] Remove some type aliases and calls to cast --- rust/parquet/src/arrow/array_reader.rs | 55 ++++++++++++++------------ rust/parquet/src/arrow/converter.rs | 32 +++++---------- 2 files changed, 39 insertions(+), 48 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 7060dc1a4f129..2fe79eadae016 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -57,9 +57,8 @@ use arrow::util::bit_util; use crate::arrow::converter::{ BinaryArrayConverter, BinaryConverter, BoolConverter, BooleanArrayConverter, - Converter, FixedLenBinaryConverter, FixedSizeArrayConverter, Float32Converter, - Float64Converter, Int32Converter, Int64Converter, Int96ArrayConverter, - Int96Converter, LargeBinaryArrayConverter, LargeBinaryConverter, + CastConverter, Converter, FixedLenBinaryConverter, FixedSizeArrayConverter, + Int96ArrayConverter, Int96Converter, 
LargeBinaryArrayConverter, LargeBinaryConverter, LargeUtf8ArrayConverter, LargeUtf8Converter, Utf8ArrayConverter, Utf8Converter, }; use crate::arrow::record_reader::RecordReader; @@ -270,29 +269,33 @@ impl ArrayReader for PrimitiveArrayReader { // Convert to arrays by using the Parquet phyisical type. // The physical types are then cast to Arrow types if necessary - let array = - match T::get_physical_type() { - PhysicalType::BOOLEAN => BoolConverter::new(BooleanArrayConverter {}) - .convert(self.record_reader.cast::()), - PhysicalType::INT32 => { - // TODO: the cast is a no-op, but we should remove it - Int32Converter::new().convert(self.record_reader.cast::()) - } - PhysicalType::INT64 => { - Int64Converter::new().convert(self.record_reader.cast::()) - } - PhysicalType::FLOAT => Float32Converter::new() - .convert(self.record_reader.cast::()), - PhysicalType::DOUBLE => Float64Converter::new() - .convert(self.record_reader.cast::()), - PhysicalType::INT96 - | PhysicalType::BYTE_ARRAY - | PhysicalType::FIXED_LEN_BYTE_ARRAY => { - unreachable!( - "PrimitiveArrayReaders don't support complex physical types" - ); - } - }?; + let array = match T::get_physical_type() { + PhysicalType::BOOLEAN => BoolConverter::new(BooleanArrayConverter {}) + .convert(&mut self.record_reader), + PhysicalType::INT32 => { + CastConverter::<_, ArrowInt32Type, ArrowInt32Type>::new() + .convert(&mut self.record_reader) + } + PhysicalType::INT64 => { + CastConverter::<_, ArrowInt64Type, ArrowInt64Type>::new() + .convert(&mut self.record_reader) + } + PhysicalType::FLOAT => { + CastConverter::<_, ArrowFloat32Type, ArrowFloat32Type>::new() + .convert(&mut self.record_reader) + } + PhysicalType::DOUBLE => { + CastConverter::<_, ArrowFloat64Type, ArrowFloat64Type>::new() + .convert(&mut self.record_reader) + } + PhysicalType::INT96 + | PhysicalType::BYTE_ARRAY + | PhysicalType::FIXED_LEN_BYTE_ARRAY => { + unreachable!( + "PrimitiveArrayReaders don't support complex physical types" + ); + } + }?; // 
cast to Arrow type // TODO: we need to check if it's fine for this to be fallible. diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 4421268f2016b..67553de7d32f2 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -46,14 +46,10 @@ use arrow::array::{ }; use std::marker::PhantomData; -use crate::data_type::{ - BoolType, DoubleType as ParquetDoubleType, FloatType as ParquetFloatType, - Int32Type as ParquetInt32Type, Int64Type as ParquetInt64Type, -}; +use crate::data_type::{Int32Type as ParquetInt32Type, Int64Type as ParquetInt64Type}; use arrow::datatypes::{ - Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, - TimestampMicrosecondType, TimestampMillisecondType, UInt16Type, UInt32Type, - UInt64Type, UInt8Type, + Date32Type, Int16Type, Int32Type, Int8Type, TimestampMicrosecondType, + TimestampMillisecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; /// A converter is used to consume record reader's content and convert it to arrow @@ -119,11 +115,10 @@ where pub struct BooleanArrayConverter {} -impl Converter<&mut RecordReader, BooleanArray> for BooleanArrayConverter { - fn convert( - &self, - record_reader: &mut RecordReader, - ) -> Result { +impl Converter<&mut RecordReader, BooleanArray> + for BooleanArrayConverter +{ + fn convert(&self, record_reader: &mut RecordReader) -> Result { let record_data = record_reader.consume_record_data()?; let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len()); @@ -344,19 +339,14 @@ where } } -pub type BoolConverter<'a> = ArrayRefConverter< - &'a mut RecordReader, - BooleanArray, - BooleanArrayConverter, ->; +pub type BoolConverter<'a, T> = + ArrayRefConverter<&'a mut RecordReader, BooleanArray, BooleanArrayConverter>; // TODO: intuition tells me that removing many of these converters could help us consolidate where we cast pub type Int8Converter = CastConverter; pub type UInt8Converter = 
CastConverter; pub type Int16Converter = CastConverter; pub type UInt16Converter = CastConverter; -pub type Int32Converter = CastConverter; pub type UInt32Converter = CastConverter; -pub type Int64Converter = CastConverter; pub type Date32Converter = CastConverter; pub type TimestampMillisecondConverter = CastConverter; @@ -371,8 +361,6 @@ pub type Time64MicrosecondConverter = pub type Time64NanosecondConverter = CastConverter; pub type UInt64Converter = CastConverter; -pub type Float32Converter = CastConverter; -pub type Float64Converter = CastConverter; pub type Utf8Converter = ArrayRefConverter>, StringArray, Utf8ArrayConverter>; pub type LargeUtf8Converter = @@ -535,7 +523,7 @@ mod tests { #[test] fn test_converter_arrow_source_i32_target_i32() { let raw_data = vec![Some(1i32), None, Some(2i32), Some(3i32)]; - converter_arrow_source_target!(raw_data, "INT32", Int32Type, Int32Converter) + converter_arrow_source_target!(raw_data, "INT32", Int32Type, CastConverter) } fn build_record_reader( From a1c153f2ea097a8a732e1d0a35afca417a9d64d4 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Oct 2020 14:32:05 -0400 Subject: [PATCH 23/28] Remove RecordReader cast and the CastRecordReader trait --- rust/parquet/src/arrow/record_reader.rs | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index b30ab7760b275..519bd15fb0c2f 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -124,26 +124,6 @@ impl RecordReader { } } - pub(crate) fn cast(&mut self) -> &mut RecordReader { - trait CastRecordReader { - fn cast(&mut self) -> &mut RecordReader; - } - - impl CastRecordReader for RecordReader { - default fn cast(&mut self) -> &mut RecordReader { - panic!("Attempted to cast RecordReader to the wrong type") - } - } - - impl CastRecordReader for RecordReader { - fn cast(&mut self) -> &mut RecordReader { - self - 
} - } - - CastRecordReader::::cast(self) - } - /// Set the current page reader. pub fn set_page_reader(&mut self, page_reader: Box) -> Result<()> { self.column_reader = From bfe76698ea7fce9e9d4b673639d755e9cf00701e Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Oct 2020 14:35:42 -0400 Subject: [PATCH 24/28] Remove some more type aliases --- rust/parquet/src/arrow/converter.rs | 52 ++++++----------------------- 1 file changed, 11 insertions(+), 41 deletions(-) diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 67553de7d32f2..b91e4dad7a007 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -18,19 +18,13 @@ use crate::arrow::record_reader::RecordReader; use crate::data_type::{ByteArray, DataType, Int96}; // TODO: clean up imports (best done when there are few moving parts) -use arrow::{ - array::{ - Array, ArrayRef, BinaryBuilder, BooleanArray, BooleanBufferBuilder, - BufferBuilderTrait, FixedSizeBinaryBuilder, LargeBinaryBuilder, - LargeStringBuilder, PrimitiveBuilder, PrimitiveDictionaryBuilder, StringBuilder, - StringDictionaryBuilder, TimestampNanosecondBuilder, - }, - datatypes::Time32MillisecondType, -}; -use arrow::{ - compute::cast, datatypes::Time32SecondType, datatypes::Time64MicrosecondType, - datatypes::Time64NanosecondType, +use arrow::array::{ + Array, ArrayRef, BinaryBuilder, BooleanArray, BooleanBufferBuilder, + BufferBuilderTrait, FixedSizeBinaryBuilder, LargeBinaryBuilder, LargeStringBuilder, + PrimitiveBuilder, PrimitiveDictionaryBuilder, StringBuilder, StringDictionaryBuilder, + TimestampNanosecondBuilder, }; +use arrow::compute::cast; use std::convert::From; use std::sync::Arc; @@ -46,11 +40,8 @@ use arrow::array::{ }; use std::marker::PhantomData; -use crate::data_type::{Int32Type as ParquetInt32Type, Int64Type as ParquetInt64Type}; -use arrow::datatypes::{ - Date32Type, Int16Type, Int32Type, Int8Type, TimestampMicrosecondType, - 
TimestampMillisecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, -}; +use crate::data_type::Int32Type as ParquetInt32Type; +use arrow::datatypes::Int32Type; /// A converter is used to consume record reader's content and convert it to arrow /// primitive array. @@ -341,26 +332,6 @@ where pub type BoolConverter<'a, T> = ArrayRefConverter<&'a mut RecordReader, BooleanArray, BooleanArrayConverter>; -// TODO: intuition tells me that removing many of these converters could help us consolidate where we cast -pub type Int8Converter = CastConverter; -pub type UInt8Converter = CastConverter; -pub type Int16Converter = CastConverter; -pub type UInt16Converter = CastConverter; -pub type UInt32Converter = CastConverter; -pub type Date32Converter = CastConverter; -pub type TimestampMillisecondConverter = - CastConverter; -pub type TimestampMicrosecondConverter = - CastConverter; -pub type Time32SecondConverter = - CastConverter; -pub type Time32MillisecondConverter = - CastConverter; -pub type Time64MicrosecondConverter = - CastConverter; -pub type Time64NanosecondConverter = - CastConverter; -pub type UInt64Converter = CastConverter; pub type Utf8Converter = ArrayRefConverter>, StringArray, Utf8ArrayConverter>; pub type LargeUtf8Converter = @@ -457,7 +428,6 @@ where #[cfg(test)] mod tests { use super::*; - use crate::arrow::converter::Int16Converter; use crate::arrow::record_reader::RecordReader; use crate::basic::Encoding; use crate::schema::parser::parse_message_type; @@ -466,7 +436,7 @@ mod tests { use crate::util::test_common::page_util::{DataPageBuilder, DataPageBuilderImpl}; use arrow::array::ArrayEqual; use arrow::array::PrimitiveArray; - use arrow::datatypes::{Int16Type, Int32Type}; + use arrow::datatypes::{Date32Type, Int16Type, Int32Type}; use std::rc::Rc; macro_rules! converter_arrow_source_target { @@ -511,13 +481,13 @@ mod tests { // TODO: this fails if we remove the cast here on converter. Is it still relevant? 
// I'd favour removing these Parquet::PHYSICAL > Arrow::DataType, so we can do it in 1 pleace. let raw_data = vec![Some(1i16), None, Some(2i16), Some(3i16)]; - converter_arrow_source_target!(raw_data, "INT32", Int16Type, Int16Converter) + converter_arrow_source_target!(raw_data, "INT32", Int16Type, CastConverter) } #[test] fn test_converter_arrow_source_i32_target_date32() { let raw_data = vec![Some(1i32), None, Some(2i32), Some(3i32)]; - converter_arrow_source_target!(raw_data, "INT32", Date32Type, Date32Converter) + converter_arrow_source_target!(raw_data, "INT32", Date32Type, CastConverter) } #[test] From e15ecf79d1f56d3b72f6fb0396c4766e260adc4b Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Oct 2020 15:23:42 -0400 Subject: [PATCH 25/28] Move the CastConverter code into PrimitiveArrayReader --- rust/parquet/src/arrow/array_reader.rs | 69 ++++++++++++++++++++------ 1 file changed, 53 insertions(+), 16 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 2fe79eadae016..f2f7a20959a3d 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -33,7 +33,7 @@ use arrow::array::{ }; use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::{ - BooleanType as ArrowBooleanType, DataType as ArrowType, + ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType, Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type, DurationMicrosecondType as ArrowDurationMicrosecondType, DurationMillisecondType as ArrowDurationMillisecondType, @@ -56,10 +56,10 @@ use arrow::datatypes::{ use arrow::util::bit_util; use crate::arrow::converter::{ - BinaryArrayConverter, BinaryConverter, BoolConverter, BooleanArrayConverter, - CastConverter, Converter, FixedLenBinaryConverter, FixedSizeArrayConverter, - Int96ArrayConverter, Int96Converter, LargeBinaryArrayConverter, LargeBinaryConverter, - LargeUtf8ArrayConverter, LargeUtf8Converter, 
Utf8ArrayConverter, Utf8Converter, + BinaryArrayConverter, BinaryConverter, Converter, FixedLenBinaryConverter, + FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, + LargeBinaryArrayConverter, LargeBinaryConverter, LargeUtf8ArrayConverter, + LargeUtf8Converter, Utf8ArrayConverter, Utf8Converter, }; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; @@ -267,26 +267,63 @@ impl ArrayReader for PrimitiveArrayReader { } } + let arrow_data_type = match T::get_physical_type() { + PhysicalType::BOOLEAN => ArrowBooleanType::DATA_TYPE, + PhysicalType::INT32 => ArrowInt32Type::DATA_TYPE, + PhysicalType::INT64 => ArrowInt64Type::DATA_TYPE, + PhysicalType::FLOAT => ArrowFloat32Type::DATA_TYPE, + PhysicalType::DOUBLE => ArrowFloat64Type::DATA_TYPE, + PhysicalType::INT96 + | PhysicalType::BYTE_ARRAY + | PhysicalType::FIXED_LEN_BYTE_ARRAY => { + unreachable!( + "PrimitiveArrayReaders don't support complex physical types" + ); + } + }; + // Convert to arrays by using the Parquet phyisical type. // The physical types are then cast to Arrow types if necessary + + let mut record_data = self.record_reader.consume_record_data()?; + + if T::get_physical_type() == PhysicalType::BOOLEAN { + let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len()); + + for e in record_data.data() { + boolean_buffer.append(*e > 0)?; + } + record_data = boolean_buffer.finish(); + } + + let mut array_data = ArrayDataBuilder::new(arrow_data_type) + .len(self.record_reader.num_values()) + .add_buffer(record_data); + + if let Some(b) = self.record_reader.consume_bitmap_buffer()? 
{ + array_data = array_data.null_bit_buffer(b); + } + let array = match T::get_physical_type() { - PhysicalType::BOOLEAN => BoolConverter::new(BooleanArrayConverter {}) - .convert(&mut self.record_reader), + PhysicalType::BOOLEAN => { + Arc::new(PrimitiveArray::::from(array_data.build())) + as ArrayRef + } PhysicalType::INT32 => { - CastConverter::<_, ArrowInt32Type, ArrowInt32Type>::new() - .convert(&mut self.record_reader) + Arc::new(PrimitiveArray::::from(array_data.build())) + as ArrayRef } PhysicalType::INT64 => { - CastConverter::<_, ArrowInt64Type, ArrowInt64Type>::new() - .convert(&mut self.record_reader) + Arc::new(PrimitiveArray::::from(array_data.build())) + as ArrayRef } PhysicalType::FLOAT => { - CastConverter::<_, ArrowFloat32Type, ArrowFloat32Type>::new() - .convert(&mut self.record_reader) + Arc::new(PrimitiveArray::::from(array_data.build())) + as ArrayRef } PhysicalType::DOUBLE => { - CastConverter::<_, ArrowFloat64Type, ArrowFloat64Type>::new() - .convert(&mut self.record_reader) + Arc::new(PrimitiveArray::::from(array_data.build())) + as ArrayRef } PhysicalType::INT96 | PhysicalType::BYTE_ARRAY @@ -295,7 +332,7 @@ impl ArrayReader for PrimitiveArrayReader { "PrimitiveArrayReaders don't support complex physical types" ); } - }?; + }; // cast to Arrow type // TODO: we need to check if it's fine for this to be fallible. 
From 7e3d54a2a573af35693bb3b183fdc8f2c29864ba Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Oct 2020 15:36:17 -0400 Subject: [PATCH 26/28] Remove now unneeded CastConverter and BoolConverter --- rust/parquet/src/arrow/converter.rs | 212 +--------------------------- 1 file changed, 4 insertions(+), 208 deletions(-) diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index b91e4dad7a007..33b29c897e6b7 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -15,25 +15,20 @@ // specific language governing permissions and limitations // under the License. -use crate::arrow::record_reader::RecordReader; use crate::data_type::{ByteArray, DataType, Int96}; // TODO: clean up imports (best done when there are few moving parts) use arrow::array::{ - Array, ArrayRef, BinaryBuilder, BooleanArray, BooleanBufferBuilder, - BufferBuilderTrait, FixedSizeBinaryBuilder, LargeBinaryBuilder, LargeStringBuilder, - PrimitiveBuilder, PrimitiveDictionaryBuilder, StringBuilder, StringDictionaryBuilder, - TimestampNanosecondBuilder, + Array, ArrayRef, BinaryBuilder, FixedSizeBinaryBuilder, LargeBinaryBuilder, + LargeStringBuilder, PrimitiveBuilder, PrimitiveDictionaryBuilder, StringBuilder, + StringDictionaryBuilder, TimestampNanosecondBuilder, }; use arrow::compute::cast; use std::convert::From; use std::sync::Arc; use crate::errors::Result; -use arrow::datatypes::{ - ArrowDictionaryKeyType, ArrowPrimitiveType, DataType as ArrowDataType, -}; +use arrow::datatypes::{ArrowDictionaryKeyType, ArrowPrimitiveType}; -use arrow::array::ArrayDataBuilder; use arrow::array::{ BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeBinaryArray, LargeStringArray, PrimitiveArray, StringArray, TimestampNanosecondArray, @@ -52,84 +47,6 @@ pub trait Converter { fn convert(&self, source: S) -> Result; } -/// Cast converter first converts record reader's buffer to arrow's -/// `PrimitiveArray`, then casts it 
to `PrimitiveArray`. -pub struct CastConverter { - _parquet_marker: PhantomData, - _arrow_source_marker: PhantomData, - _arrow_target_marker: PhantomData, -} - -impl - CastConverter -where - ParquetType: DataType, - ArrowSourceType: ArrowPrimitiveType, - ArrowTargetType: ArrowPrimitiveType, -{ - pub fn new() -> Self { - Self { - _parquet_marker: PhantomData, - _arrow_source_marker: PhantomData, - _arrow_target_marker: PhantomData, - } - } -} - -impl - Converter<&mut RecordReader, ArrayRef> - for CastConverter -where - ParquetType: DataType, - ArrowSourceType: ArrowPrimitiveType, - ArrowTargetType: ArrowPrimitiveType, -{ - fn convert(&self, record_reader: &mut RecordReader) -> Result { - let record_data = record_reader.consume_record_data(); - - let mut array_data = ArrayDataBuilder::new(ArrowSourceType::DATA_TYPE) - .len(record_reader.num_values()) - .add_buffer(record_data?); - - if let Some(b) = record_reader.consume_bitmap_buffer()? { - array_data = array_data.null_bit_buffer(b); - } - - let primitive_array: ArrayRef = - Arc::new(PrimitiveArray::::from(array_data.build())); - - // TODO: We should make this cast redundant in favour of 1 cast to rule them all - // Ok(cast(&primitive_array, &ArrowTargetType::DATA_TYPE)?) - Ok(primitive_array) - } -} - -pub struct BooleanArrayConverter {} - -impl Converter<&mut RecordReader, BooleanArray> - for BooleanArrayConverter -{ - fn convert(&self, record_reader: &mut RecordReader) -> Result { - let record_data = record_reader.consume_record_data()?; - - let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len()); - - for e in record_data.data() { - boolean_buffer.append(*e > 0)?; - } - - let mut array_data = ArrayDataBuilder::new(ArrowDataType::Boolean) - .len(record_data.len()) - .add_buffer(boolean_buffer.finish()); - - if let Some(b) = record_reader.consume_bitmap_buffer()? 
{ - array_data = array_data.null_bit_buffer(b); - } - - Ok(BooleanArray::from(array_data.build())) - } -} - pub struct FixedSizeArrayConverter { byte_width: i32, } @@ -330,8 +247,6 @@ where } } -pub type BoolConverter<'a, T> = - ArrayRefConverter<&'a mut RecordReader, BooleanArray, BooleanArrayConverter>; pub type Utf8Converter = ArrayRefConverter>, StringArray, Utf8ArrayConverter>; pub type LargeUtf8Converter = @@ -424,122 +339,3 @@ where .map(|array| Arc::new(array) as ArrayRef) } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::arrow::record_reader::RecordReader; - use crate::basic::Encoding; - use crate::schema::parser::parse_message_type; - use crate::schema::types::SchemaDescriptor; - use crate::util::test_common::page_util::InMemoryPageReader; - use crate::util::test_common::page_util::{DataPageBuilder, DataPageBuilderImpl}; - use arrow::array::ArrayEqual; - use arrow::array::PrimitiveArray; - use arrow::datatypes::{Date32Type, Int16Type, Int32Type}; - use std::rc::Rc; - - macro_rules! converter_arrow_source_target { - ($raw_data:expr, $physical_type:expr, $result_arrow_type:ty, $converter:ty) => {{ - // Construct record reader - let mut record_reader = { - // Construct column schema - let message_type = &format!( - " - message test_schema {{ - OPTIONAL {} leaf; - }} - ", - $physical_type - ); - - let def_levels = [1i16, 0i16, 1i16, 1i16]; - build_record_reader( - message_type, - &[1, 2, 3], - 0i16, - None, - 1i16, - Some(&def_levels), - 10, - ) - }; - - let array = <$converter>::new().convert(&mut record_reader).unwrap(); - let array = array - .as_any() - .downcast_ref::>() - .unwrap(); - - assert!(array.equals(&PrimitiveArray::<$result_arrow_type>::from($raw_data))); - }}; - } - - #[test] - #[ignore = "We need to look at whether this is still relevant after we refactor out the casts"] - fn test_converter_arrow_source_i16_target_i32() { - // TODO: this fails if we remove the cast here on converter. Is it still relevant? 
- // I'd favour removing these Parquet::PHYSICAL > Arrow::DataType, so we can do it in 1 pleace. - let raw_data = vec![Some(1i16), None, Some(2i16), Some(3i16)]; - converter_arrow_source_target!(raw_data, "INT32", Int16Type, CastConverter) - } - - #[test] - fn test_converter_arrow_source_i32_target_date32() { - let raw_data = vec![Some(1i32), None, Some(2i32), Some(3i32)]; - converter_arrow_source_target!(raw_data, "INT32", Date32Type, CastConverter) - } - - #[test] - fn test_converter_arrow_source_i32_target_i32() { - let raw_data = vec![Some(1i32), None, Some(2i32), Some(3i32)]; - converter_arrow_source_target!(raw_data, "INT32", Int32Type, CastConverter) - } - - fn build_record_reader( - message_type: &str, - values: &[T::T], - max_rep_level: i16, - rep_levels: Option<&[i16]>, - max_def_level: i16, - def_levels: Option<&[i16]>, - num_records: usize, - ) -> RecordReader { - let desc = parse_message_type(message_type) - .map(|t| SchemaDescriptor::new(Rc::new(t))) - .map(|s| s.column(0)) - .unwrap(); - - let mut record_reader = RecordReader::::new(desc.clone()); - - // Prepare record reader - let mut pb = DataPageBuilderImpl::new(desc, 4, true); - if rep_levels.is_some() { - pb.add_rep_levels( - max_rep_level, - match rep_levels { - Some(a) => a, - _ => unreachable!(), - }, - ); - } - if def_levels.is_some() { - pb.add_def_levels( - max_def_level, - match def_levels { - Some(a) => a, - _ => unreachable!(), - }, - ); - } - pb.add_values::(Encoding::PLAIN, &values); - let page = pb.consume(); - - let page_reader = Box::new(InMemoryPageReader::new(vec![page])); - record_reader.set_page_reader(page_reader).unwrap(); - - record_reader.read_records(num_records).unwrap(); - - record_reader - } -} From c90485d7d7cce5a96109c58aed8e885eb58b8324 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Oct 2020 16:51:48 -0400 Subject: [PATCH 27/28] Remove a resolved TODO --- rust/parquet/src/arrow/array_reader.rs | 1 - 1 file changed, 1 deletion(-) diff --git 
a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index f2f7a20959a3d..47665a75f8aad 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -492,7 +492,6 @@ where data_buffer.into_iter().map(Some).collect() }; - // TODO: I did this quickly without thinking through it, there might be edge cases to consider let mut array = self.converter.convert(data)?; if let ArrowType::Dictionary(_, _) = self.data_type { From 1cc53e447b8b532b6e32a22dda792b59054f3f81 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Oct 2020 16:53:03 -0400 Subject: [PATCH 28/28] Change a panic to unreachable --- rust/parquet/src/arrow/arrow_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs index fceeae37dbb19..09f004107e37b 100644 --- a/rust/parquet/src/arrow/arrow_writer.rs +++ b/rust/parquet/src/arrow/arrow_writer.rs @@ -193,7 +193,7 @@ fn write_leaves( ($($kt: pat, $vt: pat, $w: ident => $kat: ty, $vat: ty,)*) => ( match (&**key_type, &**value_type, &mut col_writer) { $(($kt, $vt, $w(writer)) => write_dict::<$kat, $vat, _>(array, writer, levels),)* - (kt, vt, _) => panic!("Don't know how to write dictionary of <{:?}, {:?}>", kt, vt), + (kt, vt, _) => unreachable!("Shouldn't be attempting to write dictionary of <{:?}, {:?}>", kt, vt), } ); }