From da77d13ed78aa167eeef3bd2b332044b9a252ed9 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Thu, 14 Oct 2021 19:53:56 +0000
Subject: [PATCH] Temp

---
 Cargo.toml                           |  6 +++---
 examples/parquet_read.rs             |  3 ++-
 examples/parquet_read_parallel.rs    |  8 +++----
 examples/parquet_write.rs            | 31 +++++++++++++++++-----------
 src/io/parquet/read/mod.rs           |  4 ++--
 src/io/parquet/write/mod.rs          |  7 +++----
 src/io/parquet/write/record_batch.rs | 16 +++++++-------
 tests/it/io/parquet/mod.rs           |  2 +-
 8 files changed, 42 insertions(+), 35 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 0ad596603ed..33435336312 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -61,8 +61,8 @@ futures = { version = "0.3", optional = true }
 ahash = { version = "0.7", optional = true }
 
 #parquet2 = { version = "0.5.2", optional = true, default_features = false, features = ["stream"] }
-#parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "reuse_compress", optional = true, default_features = false, features = ["stream"] }
-parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "reuse_compress", optional = true, default_features = false, features = ["stream"] }
+#parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] }
 
 avro-rs = { version = "0.13", optional = true, default_features = false }
 
@@ -100,7 +100,7 @@ full = [
     "io_avro",
     "regex",
     "merge_sort",
-    #"compute",
+    "compute",
     # parses timezones used in timestamp conversions
     "chrono-tz"
 ]
diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs
index 1e0183ba1e3..8ab3ccdac2c 100644
--- a/examples/parquet_read.rs
+++ b/examples/parquet_read.rs
@@ -1,11 +1,12 @@
 use std::fs::File;
+use std::io::BufReader;
 
 use arrow2::io::parquet::read;
 use arrow2::{array::Array, error::Result};
 
 fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result<Box<dyn Array>> {
     // Open a file, a common operation in Rust
-    let mut file = File::open(path)?;
+    let mut file = BufReader::new(File::open(path)?);
 
     // Read the file's metadata. This has a small IO cost because it requires seeking to the end
     // of the file to read its footer.
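Note on the `BufReader` change above: reading the footer and, later, the pages issues many small reads, so buffering the `File` avoids one syscall per read. Below is a minimal sketch of the resulting setup, assuming only the `read::read_metadata` and `read::get_schema` entry points of `arrow2::io::parquet::read`; the helper name and `path` argument are illustrative.

use std::fs::File;
use std::io::BufReader;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn print_parquet_schema(path: &str) -> Result<()> {
    // buffer the file: the footer and the pages are read through many small `read` calls
    let mut reader = BufReader::new(File::open(path)?);
    // seeks to the end of the file and parses the footer (a small IO cost)
    let metadata = read::read_metadata(&mut reader)?;
    // maps the parquet schema to an arrow schema
    let schema = read::get_schema(&metadata)?;
    println!("{:#?}", schema);
    Ok(())
}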
diff --git a/examples/parquet_read_parallel.rs b/examples/parquet_read_parallel.rs
index defaf6bb411..9dead6c43ac 100644
--- a/examples/parquet_read_parallel.rs
+++ b/examples/parquet_read_parallel.rs
@@ -47,16 +47,14 @@ fn parallel_read(path: &str) -> Result<Vec<Box<dyn Array>>> {
         let metadata_consumer = file_metadata.clone();
         let arrow_schema_consumer = arrow_schema.clone();
         let child = thread::spawn(move || {
-            let (column, row_group, iter) = rx_consumer.recv().unwrap();
+            let (column, row_group, pages) = rx_consumer.recv().unwrap();
             let start = SystemTime::now();
             println!("consumer start - {} {}", column, row_group);
             let metadata = metadata_consumer.row_groups[row_group].column(column);
             let data_type = arrow_schema_consumer.fields()[column].data_type().clone();
 
-            let pages = iter
-                .into_iter()
-                .map(|x| x.and_then(|x| read::decompress(x, &mut vec![])));
-            let mut pages = read::streaming_iterator::convert(pages);
+            let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]);
+
             let array = read::page_iter_to_array(&mut pages, metadata, data_type);
             println!(
                 "consumer end - {:?}: {} {}",
diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs
index d6df8d736c2..888fce79242 100644
--- a/examples/parquet_write.rs
+++ b/examples/parquet_write.rs
@@ -1,13 +1,15 @@
 use std::fs::File;
 use std::iter::once;
 
+use arrow2::error::ArrowError;
 use arrow2::io::parquet::write::to_parquet_schema;
 use arrow2::{
     array::{Array, Int32Array},
     datatypes::{Field, Schema},
     error::Result,
     io::parquet::write::{
-        array_to_page, write_file, Compression, DynIter, Encoding, Version, WriteOptions,
+        array_to_pages, write_file, Compression, Compressor, DynIter, DynStreamingIterator,
+        Encoding, FallibleStreamingIterator, Version, WriteOptions,
     },
 };
 
@@ -24,17 +26,22 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()>
     // map arrow fields to parquet fields
     let parquet_schema = to_parquet_schema(&schema)?;
 
-    // Declare the row group iterator. This must be an iterator of iterators of iterators:
-    // * first iterator of row groups
-    // * second iterator of column chunks
-    // * third iterator of pages
-    // an array can be divided in multiple pages via `.slice(offset, length)` (`O(1)`).
-    // All column chunks within a row group MUST have the same length.
-    let row_groups = once(Result::Ok(DynIter::new(once(Ok(DynIter::new(
-        once(array)
-            .zip(parquet_schema.columns().to_vec().into_iter())
-            .map(|(array, descriptor)| array_to_page(array, descriptor, options, encoding)),
-    ))))));
+    let descriptor = parquet_schema.columns()[0].clone();
+
+    // Declare the row group iterator. This must be an iterator of iterators of streaming iterators
+    // * first iterator over row groups
+    let row_groups = once(Result::Ok(DynIter::new(
+        // * second iterator over column chunks (we assume no struct arrays -> `once` column)
+        once(
+            // * third iterator over (compressed) pages; dictionary encoding may lead to multiple pages per array.
+            array_to_pages(array, descriptor, options, encoding).map(move |pages| {
+                let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
+                let compressed_pages = Compressor::new(encoded_pages, options.compression, vec![])
+                    .map_err(ArrowError::from);
+                DynStreamingIterator::new(compressed_pages)
+            }),
+        ),
+    )));
 
     // Create a new empty file
     let mut file = File::create(path)?;
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index d28b7f30a85..1f6198c1e69 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -13,8 +13,8 @@ pub use parquet2::{
     page::{CompressedDataPage, DataPage, DataPageHeader},
     read::{
         decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream,
-        read_metadata as _read_metadata, read_metadata_async as _read_metadata_async, Decompressor,
-        PageFilter, PageIterator,
+        read_metadata as _read_metadata, read_metadata_async as _read_metadata_async,
+        BasicDecompressor, Decompressor, PageFilter, PageIterator,
     },
     schema::types::{
         LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs
index fdc3d886652..e1a6aa6b5e9 100644
--- a/src/io/parquet/write/mod.rs
+++ b/src/io/parquet/write/mod.rs
@@ -12,8 +12,6 @@ mod utils;
 
 pub mod stream;
 
-use std::sync::Arc;
-
 use crate::array::*;
 use crate::bitmap::Bitmap;
 use crate::buffer::{Buffer, MutableBuffer};
@@ -35,6 +33,7 @@ pub use parquet2::{
         write_file as parquet_write_file, Compressor, DynIter, DynStreamingIterator, RowGroupIter,
         Version, WriteOptions,
     },
+    FallibleStreamingIterator,
 };
 pub use record_batch::RowGroupIterator;
 use schema::schema_to_metadata_key;
@@ -110,7 +109,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
 
 /// Returns an iterator of [`EncodedPage`].
 pub fn array_to_pages(
-    array: Arc<dyn Array>,
+    array: &dyn Array,
     descriptor: ColumnDescriptor,
     options: WriteOptions,
     encoding: Encoding,
@@ -126,7 +125,7 @@ pub fn array_to_pages(
                 )
             })
         }
-        _ => array_to_page(array.as_ref(), descriptor, options, encoding)
+        _ => array_to_page(array, descriptor, options, encoding)
             .map(|page| DynIter::new(std::iter::once(Ok(page)))),
     }
 }
diff --git a/src/io/parquet/write/record_batch.rs b/src/io/parquet/write/record_batch.rs
index bfa599bfd07..b6cb940799c 100644
--- a/src/io/parquet/write/record_batch.rs
+++ b/src/io/parquet/write/record_batch.rs
@@ -63,13 +63,15 @@ impl<I: Iterator<Item = Result<RecordBatch>>> Iterator for RowGroupIterator<I> {
                     .zip(self.parquet_schema.columns().to_vec().into_iter())
                     .zip(encodings.into_iter())
                     .map(move |((array, descriptor), encoding)| {
-                        array_to_pages(array, descriptor, options, encoding).map(move |pages| {
-                            let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
-                            let compressed_pages =
-                                Compressor::new(encoded_pages, options.compression, vec![])
-                                    .map_err(ArrowError::from);
-                            DynStreamingIterator::new(compressed_pages)
-                        })
+                        array_to_pages(array.as_ref(), descriptor, options, encoding).map(
+                            move |pages| {
+                                let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
+                                let compressed_pages =
+                                    Compressor::new(encoded_pages, options.compression, vec![])
+                                        .map_err(ArrowError::from);
+                                DynStreamingIterator::new(compressed_pages)
+                            },
+                        )
                     }),
             ))
         })
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index cac44296792..e82135fd812 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -499,7 +499,7 @@ fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result<Vec<u8>> {
             } else {
                 Encoding::Plain
             };
-            array_to_pages(array.clone(), descriptor, options, encoding).map(|pages| {
+            array_to_pages(array.as_ref(), descriptor, options, encoding).map(|pages| {
                 let encoded_pages = DynIter::new(pages.map(|x| Ok(x?)));
                 let compressed_pages =
                     Compressor::new(encoded_pages, options.compression, vec![])
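For reference, the record-batch path whose internals the `record_batch.rs` hunk above changes is driven from user code roughly as follows. This is a minimal sketch, assuming the `RowGroupIterator::try_new(iter, &schema, options, encodings)` constructor, its `parquet_schema()` accessor, and the six-argument `write_file` call used by the crate's record-batch write example at this point; the column name, values, and output path are illustrative.

use std::fs::File;
use std::iter::once;
use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::parquet::write::{
    write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
};
use arrow2::record_batch::RecordBatch;

fn write_batch(path: &str) -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int32Array::from_slice(&[1, 2, 3])) as Arc<dyn Array>],
    )?;

    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };

    // one encoding per column; internally this now calls `array_to_pages(array.as_ref(), ...)`
    // and wraps the encoded pages in a `Compressor`, as shown in the hunks above
    let row_groups =
        RowGroupIterator::try_new(once(Ok(batch)), &schema, options, vec![Encoding::Plain])?;

    let parquet_schema = row_groups.parquet_schema().clone();
    let mut file = File::create(path)?;
    let _ = write_file(&mut file, row_groups, &schema, parquet_schema, options, None)?;
    Ok(())
}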