diff --git a/Cargo.toml b/Cargo.toml index ad96f42e54e..a15771600f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ futures = { version = "0.3", optional = true } # for faster hashing ahash = { version = "0.7", optional = true } -parquet2 = { version = "0.3", optional = true, default_features = false, features = ["stream"] } +parquet2 = { version = "0.4", optional = true, default_features = false, features = ["stream"] } # for division/remainder optimization at runtime strength_reduce = { version = "0.2", optional = true } diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs index 6c8c6ea1054..3a7e38c7ab3 100644 --- a/arrow-parquet-integration-testing/src/main.rs +++ b/arrow-parquet-integration-testing/src/main.rs @@ -183,12 +183,13 @@ fn main() -> Result<()> { let mut writer = File::create(write_path)?; - write_file( + let _ = write_file( &mut writer, row_groups, &schema, parquet_schema, options, None, - ) + )?; + Ok(()) } diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs index a5fb9658791..1e0183ba1e3 100644 --- a/examples/parquet_read.rs +++ b/examples/parquet_read.rs @@ -16,14 +16,13 @@ fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result Result>> { for column in 0..producer_metadata.schema().num_columns() { for row_group in 0..producer_metadata.row_groups.len() { let start = SystemTime::now(); + let column_metadata = producer_metadata.row_groups[row_group].column(column); println!("produce start: {} {}", column, row_group); - let pages = read::get_page_iterator( - &producer_metadata, - row_group, - column, - &mut file, - None, - vec![], - ) - .unwrap() - .collect::>(); + let pages = read::get_page_iterator(column_metadata, &mut file, None, vec![]) + .unwrap() + .collect::>(); println!( "produce end - {:?}: {} {}", start.elapsed().unwrap(), diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs index af00a6a25a4..d6df8d736c2 100644 --- a/examples/parquet_write.rs +++ b/examples/parquet_write.rs @@ -40,14 +40,15 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()> let mut file = File::create(path)?; // Write the file. Note that, at present, any error results in a corrupted file. - write_file( + let _ = write_file( &mut file, row_groups, &schema, parquet_schema, options, None, - ) + )?; + Ok(()) } fn main() -> Result<()> { diff --git a/examples/parquet_write_record.rs b/examples/parquet_write_record.rs index 7528f85a33d..4a3a6180e4e 100644 --- a/examples/parquet_write_record.rs +++ b/examples/parquet_write_record.rs @@ -30,14 +30,15 @@ fn write_batch(path: &str, batch: RecordBatch) -> Result<()> { // Write the file. Note that, at present, any error results in a corrupted file. let parquet_schema = row_groups.parquet_schema().clone(); - write_file( + let _ = write_file( &mut file, row_groups, &schema, parquet_schema, options, None, - ) + )?; + Ok(()) } fn main() -> Result<()> { diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs index c9d71f75ba2..5171cd87dd0 100644 --- a/src/io/parquet/read/binary/basic.rs +++ b/src/io/parquet/read/binary/basic.rs @@ -1,6 +1,6 @@ use futures::{pin_mut, Stream, StreamExt}; use parquet2::{ - encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding}, + encoding::{delta_length_byte_array, hybrid_rle, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::{BinaryPageDict, DataPage}, read::StreamingIterator, @@ -37,12 +37,7 @@ fn read_dict_buffer( let bit_width = indices_buffer[0]; let indices_buffer = &indices_buffer[1..]; - let (_, consumed) = uleb128::decode(indices_buffer); - let indices_buffer = &indices_buffer[consumed..]; - - let non_null_indices_len = indices_buffer.len() * 8 / bit_width as usize; - - let mut indices = bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len); + let mut indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, length); let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); @@ -214,7 +209,7 @@ fn extend_from_page( assert!(descriptor.max_def_level() <= 1); let is_optional = descriptor.max_def_level() == 1; - let (validity_buffer, values_buffer, version) = utils::split_buffer(page, is_optional); + let (_, validity_buffer, values_buffer, version) = utils::split_buffer(page, descriptor); match (&page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { diff --git a/src/io/parquet/read/binary/dictionary.rs b/src/io/parquet/read/binary/dictionary.rs index d55fc81364f..011a19dd642 100644 --- a/src/io/parquet/read/binary/dictionary.rs +++ b/src/io/parquet/read/binary/dictionary.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use parquet2::{ - encoding::{bitpacking, hybrid_rle, uleb128, Encoding}, + encoding::{hybrid_rle, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, page::{BinaryPageDict, DataPage}, read::StreamingIterator, @@ -40,12 +40,8 @@ fn read_dict_optional( let bit_width = indices_buffer[0]; let indices_buffer = &indices_buffer[1..]; - let (_, consumed) = uleb128::decode(indices_buffer); - let indices_buffer = &indices_buffer[consumed..]; - - let non_null_indices_len = indices_buffer.len() * 8 / bit_width as usize; - - let mut new_indices = bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len); + let mut new_indices = + hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); @@ -97,7 +93,7 @@ where assert_eq!(descriptor.max_rep_level(), 0); let is_optional = descriptor.max_def_level() == 1; - let (validity_buffer, values_buffer, version) = other_utils::split_buffer(page, is_optional); + let (_, validity_buffer, values_buffer, version) = other_utils::split_buffer(page, descriptor); match (&page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index e20af80426b..139e66ae156 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/binary/nested.rs @@ -1,13 +1,10 @@ use std::sync::Arc; use parquet2::{ - encoding::Encoding, + encoding::{hybrid_rle::HybridRleDecoder, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, - page::{DataPage, DataPageHeader, DataPageHeaderExt}, - read::{ - levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder}, - StreamingIterator, - }, + page::DataPage, + read::{levels::get_bit_width, StreamingIterator}, }; use super::super::nested_utils::*; @@ -65,16 +62,13 @@ fn read( match (rep_level_encoding.0, def_level_encoding.0) { (Encoding::Rle, Encoding::Rle) => { - let rep_levels = RLEDecoder::new( - rep_levels, - get_bit_width(rep_level_encoding.1), - additional as u32, - ); + let rep_levels = + HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), additional); if is_nullable { - let def_levels = RLEDecoder::new( + let def_levels = HybridRleDecoder::new( def_levels, get_bit_width(def_level_encoding.1), - additional as u32, + additional, ); let new_values = utils::BinaryIter::new(values_buffer); read_values( @@ -89,11 +83,8 @@ fn read( read_plain_required(values_buffer, additional, offsets, values) } - let def_levels = RLEDecoder::new( - def_levels, - get_bit_width(def_level_encoding.1), - additional as u32, - ); + let def_levels = + HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), additional); extend_offsets( rep_levels, @@ -119,83 +110,38 @@ fn extend_from_page( ) -> Result<()> { let additional = page.num_values(); - match page.header() { - DataPageHeader::V1(header) => { - assert_eq!(header.definition_level_encoding(), Encoding::Rle); - assert_eq!(header.repetition_level_encoding(), Encoding::Rle); - - match (&page.encoding(), page.dictionary_page()) { - (Encoding::Plain, None) => { - let (rep_levels, def_levels, values_buffer) = split_buffer_v1( - page.buffer(), - descriptor.max_rep_level() > 0, - descriptor.max_def_level() > 0, - ); - read( - rep_levels, - def_levels, - values_buffer, - additional, - ( - &header.repetition_level_encoding(), - descriptor.max_rep_level(), - ), - ( - &header.definition_level_encoding(), - descriptor.max_def_level(), - ), - is_nullable, - nested, - offsets, - values, - validity, - ) - } - _ => { - return Err(utils::not_implemented( - &page.encoding(), - is_nullable, - page.dictionary_page().is_some(), - "V1", - "primitive", - )) - } - } + let (rep_levels, def_levels, values_buffer, version) = utils::split_buffer(page, descriptor); + + match (&page.encoding(), page.dictionary_page()) { + (Encoding::Plain, None) => read( + rep_levels, + def_levels, + values_buffer, + additional, + ( + &page.repetition_level_encoding(), + descriptor.max_rep_level(), + ), + ( + &page.definition_level_encoding(), + descriptor.max_def_level(), + ), + is_nullable, + nested, + offsets, + values, + validity, + ), + _ => { + return Err(utils::not_implemented( + &page.encoding(), + is_nullable, + page.dictionary_page().is_some(), + version, + "primitive", + )) } - DataPageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) { - (Encoding::Plain, None) => { - let def_level_buffer_length = header.definition_levels_byte_length as usize; - let rep_level_buffer_length = header.repetition_levels_byte_length as usize; - let (rep_levels, def_levels, values_buffer) = split_buffer_v2( - page.buffer(), - rep_level_buffer_length, - def_level_buffer_length, - ); - read( - rep_levels, - def_levels, - values_buffer, - additional, - (&Encoding::Rle, descriptor.max_rep_level()), - (&Encoding::Rle, descriptor.max_def_level()), - is_nullable, - nested, - offsets, - values, - validity, - ) - } - _ => { - return Err(utils::not_implemented( - &page.encoding(), - is_nullable, - page.dictionary_page().is_some(), - "V2", - "primitive", - )) - } - }, - }; + } Ok(()) } diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs index b5e894e5443..f23f631b2ff 100644 --- a/src/io/parquet/read/boolean/basic.rs +++ b/src/io/parquet/read/boolean/basic.rs @@ -122,7 +122,7 @@ fn extend_from_page( assert!(descriptor.max_def_level() <= 1); let is_optional = descriptor.max_def_level() == 1; - let (validity_buffer, values_buffer, version) = utils::split_buffer(page, is_optional); + let (_, validity_buffer, values_buffer, version) = utils::split_buffer(page, descriptor); match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::Plain, None, true) => read_optional( diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index 2accb1e352b..1cb0ffc7c52 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -1,13 +1,10 @@ use std::sync::Arc; use parquet2::{ - encoding::Encoding, + encoding::{hybrid_rle::HybridRleDecoder, Encoding}, metadata::{ColumnChunkMetaData, ColumnDescriptor}, - page::{DataPage, DataPageHeader, DataPageHeaderExt}, - read::{ - levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder}, - StreamingIterator, - }, + page::DataPage, + read::{levels::get_bit_width, StreamingIterator}, }; use super::super::nested_utils::*; @@ -59,16 +56,13 @@ fn read( match (rep_level_encoding.0, def_level_encoding.0) { (Encoding::Rle, Encoding::Rle) => { - let rep_levels = RLEDecoder::new( - rep_levels, - get_bit_width(rep_level_encoding.1), - additional as u32, - ); + let rep_levels = + HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), additional); if is_nullable { - let def_levels = RLEDecoder::new( + let def_levels = HybridRleDecoder::new( def_levels, get_bit_width(def_level_encoding.1), - additional as u32, + additional, ); let new_values = BitmapIter::new(values_buffer, 0, additional); read_values(def_levels, max_def_level, new_values, values, validity) @@ -76,11 +70,8 @@ fn read( read_required(values_buffer, additional, values) } - let def_levels = RLEDecoder::new( - def_levels, - get_bit_width(def_level_encoding.1), - additional as u32, - ); + let def_levels = + HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), additional); extend_offsets( rep_levels, @@ -105,81 +96,37 @@ fn extend_from_page( ) -> Result<()> { let additional = page.num_values(); - match page.header() { - DataPageHeader::V1(header) => { - assert_eq!(header.definition_level_encoding(), Encoding::Rle); - assert_eq!(header.repetition_level_encoding(), Encoding::Rle); + let (rep_levels, def_levels, values_buffer, version) = utils::split_buffer(page, descriptor); - match (&page.encoding(), page.dictionary_page()) { - (Encoding::Plain, None) => { - let (rep_levels, def_levels, values_buffer) = split_buffer_v1( - page.buffer(), - descriptor.max_rep_level() > 0, - descriptor.max_def_level() > 0, - ); - read( - rep_levels, - def_levels, - values_buffer, - additional, - ( - &header.repetition_level_encoding(), - descriptor.max_rep_level(), - ), - ( - &header.definition_level_encoding(), - descriptor.max_def_level(), - ), - is_nullable, - nested, - values, - validity, - ) - } - _ => { - return Err(utils::not_implemented( - &page.encoding(), - is_nullable, - page.dictionary_page().is_some(), - "V1", - "primitive", - )) - } - } + match (&page.encoding(), page.dictionary_page()) { + (Encoding::Plain, None) => read( + rep_levels, + def_levels, + values_buffer, + additional, + ( + &page.repetition_level_encoding(), + descriptor.max_rep_level(), + ), + ( + &page.definition_level_encoding(), + descriptor.max_def_level(), + ), + is_nullable, + nested, + values, + validity, + ), + _ => { + return Err(utils::not_implemented( + &page.encoding(), + is_nullable, + page.dictionary_page().is_some(), + version, + "primitive", + )) } - DataPageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) { - (Encoding::Plain, None) => { - let def_level_buffer_length = header.definition_levels_byte_length as usize; - let rep_level_buffer_length = header.repetition_levels_byte_length as usize; - let (rep_levels, def_levels, values_buffer) = split_buffer_v2( - page.buffer(), - rep_level_buffer_length, - def_level_buffer_length, - ); - read( - rep_levels, - def_levels, - values_buffer, - additional, - (&Encoding::Rle, descriptor.max_rep_level()), - (&Encoding::Rle, descriptor.max_def_level()), - is_nullable, - nested, - values, - validity, - ) - } - _ => { - return Err(utils::not_implemented( - &page.encoding(), - is_nullable, - page.dictionary_page().is_some(), - "V2", - "primitive", - )) - } - }, - }; + } Ok(()) } diff --git a/src/io/parquet/read/fixed_size_binary.rs b/src/io/parquet/read/fixed_size_binary.rs index e0d56686adb..0957c651866 100644 --- a/src/io/parquet/read/fixed_size_binary.rs +++ b/src/io/parquet/read/fixed_size_binary.rs @@ -1,6 +1,6 @@ use futures::{pin_mut, Stream, StreamExt}; use parquet2::{ - encoding::{bitpacking, hybrid_rle, uleb128, Encoding}, + encoding::{hybrid_rle, Encoding}, page::{DataPage, FixedLenByteArrayPageDict}, read::StreamingIterator, }; @@ -36,13 +36,7 @@ pub(crate) fn read_dict_buffer( let bit_width = indices_buffer[0]; let indices_buffer = &indices_buffer[1..]; - let (_, consumed) = uleb128::decode(indices_buffer); - let indices_buffer = &indices_buffer[consumed..]; - - let non_null_indices_len = (indices_buffer.len() * 8 / bit_width as usize) as u32; - - let mut indices = - bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len as usize); + let mut indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, length); let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); @@ -204,7 +198,7 @@ pub(crate) fn extend_from_page( assert!(descriptor.max_def_level() <= 1); let is_optional = descriptor.max_def_level() == 1; - let (validity_buffer, values_buffer, version) = utils::split_buffer(page, is_optional); + let (_, validity_buffer, values_buffer, version) = utils::split_buffer(page, descriptor); match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary, Some(dict), true) => read_dict_buffer( diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index 6146787a440..3cee6b32666 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -41,17 +41,13 @@ pub use schema::{get_schema, is_type_nullable, FileMetaData}; /// Creates a new iterator of compressed pages. pub fn get_page_iterator<'b, RR: Read + Seek>( - metadata: &FileMetaData, - row_group: usize, - column: usize, + column_metadata: &ColumnChunkMetaData, reader: &'b mut RR, pages_filter: Option, buffer: Vec, ) -> Result> { Ok(_get_page_iterator( - metadata, - row_group, - column, + column_metadata, reader, pages_filter, buffer, @@ -60,15 +56,13 @@ pub fn get_page_iterator<'b, RR: Read + Seek>( /// Creates a new iterator of compressed pages. pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>( - metadata: &'a FileMetaData, - row_group: usize, - column: usize, + column_metadata: &'a ColumnChunkMetaData, reader: &'a mut RR, pages_filter: Option, buffer: Vec, ) -> Result> + 'a> { let pages_filter = pages_filter.unwrap_or_else(|| Arc::new(|_, _| true)); - Ok(_get_page_stream(metadata, row_group, column, reader, buffer, pages_filter).await?) + Ok(_get_page_stream(column_metadata, reader, buffer, pages_filter).await?) } /// Reads parquets' metadata syncronously. diff --git a/src/io/parquet/read/primitive/basic.rs b/src/io/parquet/read/primitive/basic.rs index f519cd5bc09..4d5be5332b5 100644 --- a/src/io/parquet/read/primitive/basic.rs +++ b/src/io/parquet/read/primitive/basic.rs @@ -1,5 +1,5 @@ use parquet2::{ - encoding::{bitpacking, hybrid_rle, uleb128, Encoding}, + encoding::{hybrid_rle, Encoding}, page::{DataPage, PrimitivePageDict}, types::NativeType, }; @@ -34,12 +34,8 @@ fn read_dict_buffer_optional( let bit_width = indices_buffer[0]; let indices_buffer = &indices_buffer[1..]; - let (_, consumed) = uleb128::decode(indices_buffer); - let indices_buffer = &indices_buffer[consumed..]; - - let non_null_indices_len = indices_buffer.len() * 8 / bit_width as usize; - - let mut indices = bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len); + let mut indices = + hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); @@ -158,7 +154,7 @@ where assert_eq!(descriptor.max_rep_level(), 0); let is_optional = descriptor.max_def_level() == 1; - let (validity_buffer, values_buffer, version) = other_utils::split_buffer(page, is_optional); + let (_, validity_buffer, values_buffer, version) = other_utils::split_buffer(page, descriptor); match (&page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { diff --git a/src/io/parquet/read/primitive/dictionary.rs b/src/io/parquet/read/primitive/dictionary.rs index 324471ed328..8d7c7de4bca 100644 --- a/src/io/parquet/read/primitive/dictionary.rs +++ b/src/io/parquet/read/primitive/dictionary.rs @@ -1,13 +1,13 @@ use std::sync::Arc; use parquet2::{ - encoding::{bitpacking, hybrid_rle, uleb128, Encoding}, + encoding::{hybrid_rle, Encoding}, page::{DataPage, PrimitivePageDict}, read::StreamingIterator, types::NativeType, }; -use super::super::utils as other_utils; +use super::super::utils; use super::{ColumnChunkMetaData, ColumnDescriptor}; use crate::{ array::{Array, DictionaryArray, DictionaryKey, PrimitiveArray}, @@ -41,12 +41,9 @@ fn read_dict_optional( let bit_width = indices_buffer[0]; let indices_buffer = &indices_buffer[1..]; - let (_, consumed) = uleb128::decode(indices_buffer); - let indices_buffer = &indices_buffer[consumed..]; - - let non_null_indices_len = indices_buffer.len() * 8 / bit_width as usize; - - let mut new_indices = bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len); + println!("indices_buffer: {:?}", indices_buffer); + let mut new_indices = + hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional); let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1); @@ -100,7 +97,7 @@ where assert_eq!(descriptor.max_rep_level(), 0); let is_optional = descriptor.max_def_level() == 1; - let (validity_buffer, values_buffer, version) = other_utils::split_buffer(page, is_optional); + let (_, validity_buffer, values_buffer, version) = utils::split_buffer(page, descriptor); match (&page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { @@ -116,7 +113,7 @@ where ) } _ => { - return Err(other_utils::not_implemented( + return Err(utils::not_implemented( &page.encoding(), is_optional, page.dictionary_page().is_some(), diff --git a/src/io/parquet/read/primitive/nested.rs b/src/io/parquet/read/primitive/nested.rs index da10354681d..fab3646bd8f 100644 --- a/src/io/parquet/read/primitive/nested.rs +++ b/src/io/parquet/read/primitive/nested.rs @@ -1,7 +1,7 @@ use parquet2::{ - encoding::Encoding, - page::{DataPage, DataPageHeader, DataPageHeaderExt}, - read::levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder}, + encoding::{hybrid_rle::HybridRleDecoder, Encoding}, + page::DataPage, + read::levels::get_bit_width, types::NativeType, }; @@ -74,27 +74,21 @@ fn read( match (rep_level_encoding.0, def_level_encoding.0) { (Encoding::Rle, Encoding::Rle) => { - let rep_levels = RLEDecoder::new( - rep_levels, - get_bit_width(rep_level_encoding.1), - additional as u32, - ); + let rep_levels = + HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), additional); if is_nullable { - let def_levels = RLEDecoder::new( + let def_levels = HybridRleDecoder::new( def_levels, get_bit_width(def_level_encoding.1), - additional as u32, + additional, ); read_values(def_levels, max_def_level, new_values, op, values, validity) } else { read_values_required(new_values, op, values) } - let def_levels = RLEDecoder::new( - def_levels, - get_bit_width(def_level_encoding.1), - additional as u32, - ); + let def_levels = + HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), additional); extend_offsets( rep_levels, @@ -125,82 +119,37 @@ where { let additional = page.num_values(); - match page.header() { - DataPageHeader::V1(header) => { - assert_eq!(header.definition_level_encoding(), Encoding::Rle); - assert_eq!(header.repetition_level_encoding(), Encoding::Rle); + let (rep_levels, def_levels, values_buffer, version) = utils::split_buffer(page, descriptor); - match (&page.encoding(), page.dictionary_page()) { - (Encoding::Plain, None) => { - let (rep_levels, def_levels, values_buffer) = split_buffer_v1( - page.buffer(), - descriptor.max_rep_level() > 0, - descriptor.max_def_level() > 0, - ); - read( - rep_levels, - def_levels, - values_buffer, - additional, - ( - &header.repetition_level_encoding(), - descriptor.max_rep_level(), - ), - ( - &header.definition_level_encoding(), - descriptor.max_def_level(), - ), - is_nullable, - nested, - values, - validity, - op, - ) - } - _ => { - return Err(utils::not_implemented( - &page.encoding(), - is_nullable, - page.dictionary_page().is_some(), - "V1", - "primitive", - )) - } - } + match (&page.encoding(), page.dictionary_page()) { + (Encoding::Plain, None) => read( + rep_levels, + def_levels, + values_buffer, + additional, + ( + &page.repetition_level_encoding(), + descriptor.max_rep_level(), + ), + ( + &page.definition_level_encoding(), + descriptor.max_def_level(), + ), + is_nullable, + nested, + values, + validity, + op, + ), + _ => { + return Err(utils::not_implemented( + &page.encoding(), + is_nullable, + page.dictionary_page().is_some(), + version, + "primitive", + )) } - DataPageHeader::V2(header) => match (&page.encoding(), page.dictionary_page()) { - (Encoding::Plain, None) => { - let def_level_buffer_length = header.definition_levels_byte_length as usize; - let rep_level_buffer_length = header.repetition_levels_byte_length as usize; - let (rep_levels, def_levels, values_buffer) = split_buffer_v2( - page.buffer(), - rep_level_buffer_length, - def_level_buffer_length, - ); - read( - rep_levels, - def_levels, - values_buffer, - additional, - (&Encoding::Rle, descriptor.max_rep_level()), - (&Encoding::Rle, descriptor.max_def_level()), - is_nullable, - nested, - values, - validity, - op, - ) - } - _ => { - return Err(utils::not_implemented( - &page.encoding(), - is_nullable, - page.dictionary_page().is_some(), - "V2", - "primitive", - )) - } - }, - }; + } Ok(()) } diff --git a/src/io/parquet/read/record_batch.rs b/src/io/parquet/read/record_batch.rs index b88f3703baa..4335dd62058 100644 --- a/src/io/parquet/read/record_batch.rs +++ b/src/io/parquet/read/record_batch.rs @@ -126,11 +126,9 @@ impl Iterator for RecordReader { |(b1, b2, mut columns), (column, field)| { // column according to the file's indexing let column = self.indices[column]; - let column_meta = &columns_meta[column]; + let column_metadata = &columns_meta[column]; let pages = get_page_iterator( - &metadata, - row_group, - column, + column_metadata, &mut self.reader, self.pages_filter.clone(), b1, @@ -138,7 +136,8 @@ impl Iterator for RecordReader { let mut pages = Decompressor::new(pages, b2); - let array = page_iter_to_array(&mut pages, column_meta, field.data_type().clone())?; + let array = + page_iter_to_array(&mut pages, column_metadata, field.data_type().clone())?; let array = if array.len() > remaining_rows { array.slice(0, remaining_rows) diff --git a/src/io/parquet/read/utils.rs b/src/io/parquet/read/utils.rs index 2370ed546f4..5659f2d6440 100644 --- a/src/io/parquet/read/utils.rs +++ b/src/io/parquet/read/utils.rs @@ -1,8 +1,6 @@ use parquet2::encoding::{get_length, Encoding}; -use parquet2::{ - page::{DataPage, DataPageHeader, DataPageHeaderExt}, - read::levels, -}; +use parquet2::metadata::ColumnDescriptor; +use parquet2::page::{split_buffer as _split_buffer, DataPage, DataPageHeader}; use crate::error::ArrowError; @@ -47,21 +45,15 @@ pub fn not_implemented( )) } -pub fn split_buffer(page: &DataPage, is_optional: bool) -> (&[u8], &[u8], &'static str) { - match page.header() { - DataPageHeader::V1(header) => { - assert_eq!(header.definition_level_encoding(), Encoding::Rle); - - let (_, validity_buffer, values_buffer) = - levels::split_buffer_v1(page.buffer(), false, is_optional); - (validity_buffer, values_buffer, "V1") - } - DataPageHeader::V2(header) => { - let def_level_buffer_length = header.definition_levels_byte_length as usize; - - let (_, validity_buffer, values_buffer) = - levels::split_buffer_v2(page.buffer(), 0, def_level_buffer_length); - (validity_buffer, values_buffer, "V2") - } - } +pub fn split_buffer<'a>( + page: &'a DataPage, + descriptor: &ColumnDescriptor, +) -> (&'a [u8], &'a [u8], &'a [u8], &'static str) { + let (rep_levels, validity_buffer, values_buffer) = _split_buffer(page, descriptor); + + let version = match page.header() { + DataPageHeader::V1(_) => "V1", + DataPageHeader::V2(_) => "V2", + }; + (rep_levels, validity_buffer, values_buffer, version) } diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index febc803efc6..81a17c45c15 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -11,6 +11,8 @@ mod utils; pub mod stream; +use std::sync::Arc; + use crate::array::*; use crate::bitmap::Bitmap; use crate::buffer::{Buffer, MutableBuffer}; @@ -62,7 +64,7 @@ pub fn write_file<'a, W, I>( parquet_schema: SchemaDescriptor, options: WriteOptions, key_value_metadata: Option>, -) -> Result<()> +) -> Result where W: std::io::Write + std::io::Seek, I: Iterator>>, @@ -103,7 +105,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { /// Returns an iterator of compressed pages, pub fn array_to_pages( - array: &dyn Array, + array: Arc, descriptor: ColumnDescriptor, options: WriteOptions, encoding: Encoding, @@ -119,7 +121,7 @@ pub fn array_to_pages( ) }) } - _ => array_to_page(array, descriptor, options, encoding) + _ => array_to_page(array.as_ref(), descriptor, options, encoding) .map(|page| DynIter::new(std::iter::once(Ok(page)))), } } diff --git a/src/io/parquet/write/record_batch.rs b/src/io/parquet/write/record_batch.rs index 1dffa13a03c..ca7295cc336 100644 --- a/src/io/parquet/write/record_batch.rs +++ b/src/io/parquet/write/record_batch.rs @@ -18,7 +18,7 @@ pub struct RowGroupIterator>> { encodings: Vec, } -impl>> RowGroupIterator { +impl<'a, I: Iterator>> RowGroupIterator { /// Creates a new [`RowGroupIterator`] from an iterator over [`RecordBatch`]. pub fn try_new( iter: I, @@ -59,7 +59,7 @@ impl>> Iterator for RowGroupIterator { .zip(self.parquet_schema.columns().to_vec().into_iter()) .zip(encodings.into_iter()) .map(move |((array, type_), encoding)| { - array_to_pages(array.as_ref(), type_, options, encoding) + array_to_pages(array, type_, options, encoding) }), )) }) diff --git a/src/io/parquet/write/stream.rs b/src/io/parquet/write/stream.rs index 8383ffb1edc..35972cab0b3 100644 --- a/src/io/parquet/write/stream.rs +++ b/src/io/parquet/write/stream.rs @@ -20,10 +20,10 @@ pub async fn write_stream<'a, W, I>( parquet_schema: SchemaDescriptor, options: WriteOptions, key_value_metadata: Option>, -) -> Result<()> +) -> Result where W: std::io::Write + std::io::Seek, - I: Stream>>, + I: Stream>>, { let key_value_metadata = key_value_metadata .map(|mut x| { diff --git a/tests/it/compute/cast.rs b/tests/it/compute/cast.rs index 36ebfde4f90..d9160559d89 100644 --- a/tests/it/compute/cast.rs +++ b/tests/it/compute/cast.rs @@ -34,8 +34,6 @@ fn u16_as_u8_overflow() { let c = b.as_any().downcast_ref::().unwrap(); let values = c.values().as_slice(); - println!("{}", 255u8.wrapping_add(10)); - assert_eq!(values, &[255, 0, 1, 2, 3]) } diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 3a520e006e1..2810aaf4673 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -400,16 +400,19 @@ fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result let descritors = parquet_schema.columns().to_vec().into_iter(); let row_groups = batches.iter().map(|batch| { - let iterator = DynIter::new(batch.columns().iter().zip(descritors.clone()).map( - |(array, type_)| { + let iterator = batch + .columns() + .iter() + .zip(descritors.clone()) + .map(|(array, type_)| { let encoding = if let DataType::Dictionary(_, _) = array.data_type() { Encoding::RleDictionary } else { Encoding::Plain }; - array_to_pages(array.as_ref(), type_, options, encoding) - }, - )); + array_to_pages(array.clone(), type_, options, encoding) + }); + let iterator = DynIter::new(iterator); Ok(iterator) });