From 7fbf8e4a940a1b2cc0c20e0a5d1bf0aaa08352a4 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Sun, 17 Oct 2021 06:29:54 +0200 Subject: [PATCH] Reuse compress buffer (#60) --- Cargo.toml | 2 +- integration-tests/src/read/binary.rs | 2 +- integration-tests/src/read/mod.rs | 51 ++--- integration-tests/src/read/primitive.rs | 2 +- .../src/read/primitive_nested.rs | 2 +- integration-tests/src/write/mod.rs | 44 +++-- integration-tests/src/write/primitive.rs | 22 +-- src/compression.rs | 26 +-- src/error.rs | 7 - src/lib.rs | 3 + src/page/mod.rs | 29 ++- src/page/page_dict/binary.rs | 4 +- src/page/page_dict/fixed_len_binary.rs | 4 +- src/page/page_dict/mod.rs | 26 ++- src/page/page_dict/primitive.rs | 4 +- src/read/compression.rs | 177 ++++++++++++++---- src/read/mod.rs | 39 ++-- src/read/page_iterator.rs | 17 +- src/write/column_chunk.rs | 61 +++--- src/write/compression.rs | 156 +++++++++++---- src/write/dyn_iter.rs | 35 ++++ src/write/file.rs | 7 +- src/write/mod.rs | 7 +- src/write/page.rs | 4 +- src/write/row_group.rs | 37 ++-- src/write/stream.rs | 9 +- src/write/stream_stream.rs | 7 +- 27 files changed, 509 insertions(+), 275 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2bdcace94..bc44be532 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ bench = false [dependencies] parquet-format-async-temp = "0.2.0" bitpacking = { version = "0.8.2", features = ["bitpacker1x"] } -streaming-iterator = "0.1.5" +streaming-decompression = "0.1" async-stream = { version = "0.3.2", optional = true } futures = { version = "0.3", optional = true } diff --git a/integration-tests/src/read/binary.rs b/integration-tests/src/read/binary.rs index 763835a4a..f0fe5cbec 100644 --- a/integration-tests/src/read/binary.rs +++ b/integration-tests/src/read/binary.rs @@ -1,5 +1,5 @@ use parquet::{ - encoding::{bitpacking, hybrid_rle::HybridRleDecoder, plain_byte_array, uleb128, Encoding}, + encoding::{hybrid_rle::HybridRleDecoder, plain_byte_array, Encoding}, error::Result, metadata::ColumnDescriptor, page::{split_buffer, BinaryPageDict, DataPage}, diff --git a/integration-tests/src/read/mod.rs b/integration-tests/src/read/mod.rs index 5419b2d32..3a1bcf975 100644 --- a/integration-tests/src/read/mod.rs +++ b/integration-tests/src/read/mod.rs @@ -21,41 +21,41 @@ pub fn page_to_array(page: &DataPage, descriptor: &ColumnDescriptor) -> Result match page.dictionary_page() { Some(_) => match physical_type { - PhysicalType::Int32 => Ok(Array::Int32(primitive::page_dict_to_vec( - &page, descriptor, - )?)), - PhysicalType::Int64 => Ok(Array::Int64(primitive::page_dict_to_vec( - &page, descriptor, - )?)), - PhysicalType::Int96 => Ok(Array::Int96(primitive::page_dict_to_vec( - &page, descriptor, - )?)), + PhysicalType::Int32 => { + Ok(Array::Int32(primitive::page_dict_to_vec(page, descriptor)?)) + } + PhysicalType::Int64 => { + Ok(Array::Int64(primitive::page_dict_to_vec(page, descriptor)?)) + } + PhysicalType::Int96 => { + Ok(Array::Int96(primitive::page_dict_to_vec(page, descriptor)?)) + } PhysicalType::Float => Ok(Array::Float32(primitive::page_dict_to_vec( - &page, descriptor, + page, descriptor, )?)), PhysicalType::Double => Ok(Array::Float64(primitive::page_dict_to_vec( - &page, descriptor, + page, descriptor, )?)), PhysicalType::ByteArray => { - Ok(Array::Binary(binary::page_dict_to_vec(&page, descriptor)?)) + Ok(Array::Binary(binary::page_dict_to_vec(page, descriptor)?)) } _ => todo!(), }, None => match physical_type { PhysicalType::Boolean => { - Ok(Array::Boolean(boolean::page_to_vec(&page, 
descriptor)?)) + Ok(Array::Boolean(boolean::page_to_vec(page, descriptor)?)) } - PhysicalType::Int32 => Ok(Array::Int32(primitive::page_to_vec(&page, descriptor)?)), - PhysicalType::Int64 => Ok(Array::Int64(primitive::page_to_vec(&page, descriptor)?)), - PhysicalType::Int96 => Ok(Array::Int96(primitive::page_to_vec(&page, descriptor)?)), + PhysicalType::Int32 => Ok(Array::Int32(primitive::page_to_vec(page, descriptor)?)), + PhysicalType::Int64 => Ok(Array::Int64(primitive::page_to_vec(page, descriptor)?)), + PhysicalType::Int96 => Ok(Array::Int96(primitive::page_to_vec(page, descriptor)?)), PhysicalType::Float => { - Ok(Array::Float32(primitive::page_to_vec(&page, descriptor)?)) + Ok(Array::Float32(primitive::page_to_vec(page, descriptor)?)) } PhysicalType::Double => { - Ok(Array::Float64(primitive::page_to_vec(&page, descriptor)?)) + Ok(Array::Float64(primitive::page_to_vec(page, descriptor)?)) } PhysicalType::ByteArray => { - Ok(Array::Binary(binary::page_to_vec(&page, descriptor)?)) + Ok(Array::Binary(binary::page_to_vec(page, descriptor)?)) } _ => todo!(), }, @@ -63,13 +63,13 @@ pub fn page_to_array(page: &DataPage, descriptor: &ColumnDescriptor) -> Result match page.dictionary_page() { None => match physical_type { PhysicalType::Int64 => { - Ok(primitive_nested::page_to_array::(&page, descriptor)?) + Ok(primitive_nested::page_to_array::(page, descriptor)?) } _ => todo!(), }, Some(_) => match physical_type { PhysicalType::Int64 => Ok(primitive_nested::page_dict_to_array::( - &page, descriptor, + page, descriptor, )?), _ => todo!(), }, @@ -83,9 +83,10 @@ pub(crate) mod tests { use std::fs::File; use parquet::error::Result; - use parquet::read::{get_page_iterator, read_metadata, Decompressor, StreamingIterator}; + use parquet::read::{get_page_iterator, read_metadata, Decompressor}; use parquet::statistics::{BinaryStatistics, PrimitiveStatistics, Statistics}; use parquet::types::int96_to_i64_ns; + use parquet::FallibleStreamingIterator; use super::*; use crate::tests::*; @@ -100,14 +101,14 @@ pub(crate) mod tests { let column_meta = metadata.row_groups[row_group].column(column); let descriptor = column_meta.descriptor().clone(); - let iterator = get_page_iterator(&column_meta, reader, None, vec![])?; + let iterator = get_page_iterator(column_meta, reader, None, vec![])?; let buffer = vec![]; let mut iterator = Decompressor::new(iterator, buffer); let statistics = column_meta.statistics().transpose()?; - let page = iterator.next().unwrap().as_ref().unwrap(); + let page = iterator.next()?.unwrap(); let array = page_to_array(page, &descriptor)?; @@ -201,7 +202,7 @@ pub(crate) mod tests { ]; let expected = expected.into_iter().map(Some).collect::>(); - let (array, _) = get_column(&path, 10)?; + let (array, _) = get_column(path, 10)?; if let Array::Int96(array) = array { let a = array .into_iter() diff --git a/integration-tests/src/read/primitive.rs b/integration-tests/src/read/primitive.rs index 31eb6b023..4eada5bb9 100644 --- a/integration-tests/src/read/primitive.rs +++ b/integration-tests/src/read/primitive.rs @@ -3,7 +3,7 @@ use std::convert::TryInto; use super::utils::ValuesDef; use parquet::{ - encoding::{bitpacking, hybrid_rle::HybridRleDecoder, uleb128, Encoding}, + encoding::{hybrid_rle::HybridRleDecoder, Encoding}, error::Result, metadata::ColumnDescriptor, page::{split_buffer, DataPage, PrimitivePageDict}, diff --git a/integration-tests/src/read/primitive_nested.rs b/integration-tests/src/read/primitive_nested.rs index 91d57ea14..45619e173 100644 --- 
a/integration-tests/src/read/primitive_nested.rs +++ b/integration-tests/src/read/primitive_nested.rs @@ -177,7 +177,7 @@ fn read_dict_array( let bit_width = values[0]; let values = &values[1..]; - let (_, consumed) = uleb128::decode(&values); + let (_, consumed) = uleb128::decode(values); let values = &values[consumed..]; let indices = bitpacking::Decoder::new(values, bit_width, length as usize); diff --git a/integration-tests/src/write/mod.rs b/integration-tests/src/write/mod.rs index d917fbe0b..d12986529 100644 --- a/integration-tests/src/write/mod.rs +++ b/integration-tests/src/write/mod.rs @@ -1,8 +1,6 @@ pub(crate) mod primitive; -use parquet::{ - error::Result, metadata::ColumnDescriptor, page::CompressedPage, write::WriteOptions, -}; +use parquet::{error::Result, metadata::ColumnDescriptor, page::EncodedPage, write::WriteOptions}; use super::Array; @@ -10,14 +8,14 @@ pub fn array_to_page( array: &Array, options: &WriteOptions, descriptor: &ColumnDescriptor, -) -> Result { +) -> Result { // using plain encoding format match array { - Array::Int32(array) => primitive::array_to_page_v1(&array, options, descriptor), - Array::Int64(array) => primitive::array_to_page_v1(&array, options, descriptor), - Array::Int96(array) => primitive::array_to_page_v1(&array, options, descriptor), - Array::Float32(array) => primitive::array_to_page_v1(&array, options, descriptor), - Array::Float64(array) => primitive::array_to_page_v1(&array, options, descriptor), + Array::Int32(array) => primitive::array_to_page_v1(array, options, descriptor), + Array::Int64(array) => primitive::array_to_page_v1(array, options, descriptor), + Array::Int96(array) => primitive::array_to_page_v1(array, options, descriptor), + Array::Float32(array) => primitive::array_to_page_v1(array, options, descriptor), + Array::Float64(array) => primitive::array_to_page_v1(array, options, descriptor), _ => todo!(), } } @@ -33,7 +31,7 @@ mod tests { use parquet::error::Result; use parquet::metadata::SchemaDescriptor; use parquet::statistics::Statistics; - use parquet::write::{write_file, DynIter, Version}; + use parquet::write::{write_file, Compressor, DynIter, DynStreamingIterator, Version}; use super::*; @@ -67,9 +65,13 @@ mod tests { let a = schema.columns(); - let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok(DynIter::new( - std::iter::once(array_to_page(&array, &options, &a[0])), - )))))); + let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok( + DynStreamingIterator::new(Compressor::new_from_vec( + DynIter::new(std::iter::once(array_to_page(&array, &options, &a[0]))), + options.compression, + vec![], + )), + ))))); let mut writer = Cursor::new(vec![]); write_file(&mut writer, row_groups, schema, options, None, None)?; @@ -140,7 +142,7 @@ mod tests2 { error::Result, metadata::SchemaDescriptor, read::read_metadata, - write::{write_file, DynIter, Version}, + write::{write_file, Compressor, DynIter, DynStreamingIterator, Version}, }; #[test] @@ -163,9 +165,17 @@ mod tests2 { let schema = SchemaDescriptor::try_from_message("message schema { OPTIONAL INT32 col; }")?; - let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok(DynIter::new( - std::iter::once(array_to_page_v1(&array, &options, &schema.columns()[0])), - )))))); + let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok( + DynStreamingIterator::new(Compressor::new_from_vec( + DynIter::new(std::iter::once(array_to_page_v1( + &array, + &options, + &schema.columns()[0], + ))), + options.compression, + vec![], + )), + ))))); 
let mut writer = Cursor::new(vec![]); write_file(&mut writer, row_groups, schema, options, None, None)?; diff --git a/integration-tests/src/write/primitive.rs b/integration-tests/src/write/primitive.rs index e9b092a31..e99cd4d28 100644 --- a/integration-tests/src/write/primitive.rs +++ b/integration-tests/src/write/primitive.rs @@ -1,11 +1,11 @@ use parquet::{ encoding::Encoding, metadata::ColumnDescriptor, - page::{CompressedDataPage, CompressedPage, DataPageHeader, DataPageHeaderV1}, + page::{DataPage, DataPageHeader, DataPageHeaderV1, EncodedPage}, statistics::{serialize_statistics, PrimitiveStatistics, Statistics}, types::NativeType, write::WriteOptions, - {compression::create_codec, encoding::hybrid_rle::encode_bool, error::Result}, + {encoding::hybrid_rle::encode_bool, error::Result}, }; fn unzip_option(array: &[Option]) -> Result<(Vec, Vec)> { @@ -44,22 +44,10 @@ pub fn array_to_page_v1( array: &[Option], options: &WriteOptions, descriptor: &ColumnDescriptor, -) -> Result { +) -> Result { let (values, mut buffer) = unzip_option(array)?; buffer.extend_from_slice(&values); - let uncompressed_page_size = buffer.len(); - - let codec = create_codec(&options.compression)?; - let buffer = if let Some(mut codec) = codec { - // todo: remove this allocation by extending `buffer` directly. - // needs refactoring `compress`'s API. - let mut tmp = vec![]; - codec.compress(&values, &mut tmp)?; - tmp - } else { - buffer - }; let statistics = if options.write_statistics { let statistics = &PrimitiveStatistics { @@ -82,11 +70,9 @@ pub fn array_to_page_v1( statistics, }; - Ok(CompressedPage::Data(CompressedDataPage::new( + Ok(EncodedPage::Data(DataPage::new( DataPageHeader::V1(header), buffer, - options.compression, - uncompressed_page_size, None, descriptor.clone(), ))) diff --git a/src/compression.rs b/src/compression.rs index 63fd606d9..c073ff0db 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -274,33 +274,25 @@ mod tests { fn test_roundtrip(c: Compression, data: &[u8]) { let mut c1 = create_codec(&c).unwrap().unwrap(); - let mut c2 = create_codec(&c).unwrap().unwrap(); - // Compress with c1 - let mut compressed = Vec::new(); + let offset = 2; + + // Compress to a buffer that already has data is possible + let mut compressed = vec![2; offset]; c1.compress(data, &mut compressed) .expect("Error when compressing"); - // Decompress with c2 - let mut decompressed = vec![0; data.len()]; - c2.decompress(compressed.as_slice(), &mut decompressed) - .expect("Error when decompressing"); - assert_eq!(data, decompressed.as_slice()); - - compressed.clear(); - - // Compress with c2 - c2.compress(data, &mut compressed) - .expect("Error when compressing"); + // data is compressed... + assert!(compressed.len() - 2 < data.len()); - // Decompress with c1 - c1.decompress(compressed.as_slice(), &mut decompressed) + let mut decompressed = vec![0; data.len()]; + c1.decompress(&compressed[offset..], &mut decompressed) .expect("Error when decompressing"); assert_eq!(data, decompressed.as_slice()); } fn test_codec(c: Compression) { - let sizes = vec![100, 10000, 100000]; + let sizes = vec![10000, 100000]; for size in sizes { let data = (0..size).map(|x| (x % 255) as u8).collect::>(); test_roundtrip(c, &data); diff --git a/src/error.rs b/src/error.rs index e416ff590..096f96f13 100644 --- a/src/error.rs +++ b/src/error.rs @@ -28,13 +28,6 @@ impl std::fmt::Display for ParquetError { } } -impl ParquetError { - /// Wraps an external error in an `ParquetError`. 
- pub fn from_external_error(error: impl std::error::Error + Send + Sync + 'static) -> Self { - Self::External("".to_string(), Arc::new(error)) - } -} - #[cfg(feature = "snappy")] impl From for ParquetError { fn from(e: snap::Error) -> ParquetError { diff --git a/src/lib.rs b/src/lib.rs index b57525c55..bddd62b8c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,9 @@ pub mod statistics; pub mod types; pub mod write; +pub use streaming_decompression::fallible_streaming_iterator; +pub use streaming_decompression::FallibleStreamingIterator; + const FOOTER_SIZE: u64 = 8; const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1']; diff --git a/src/page/mod.rs b/src/page/mod.rs index 77558df8a..e8a949929 100644 --- a/src/page/mod.rs +++ b/src/page/mod.rs @@ -105,10 +105,10 @@ impl DataPageHeader { /// and thus cloning it is expensive. #[derive(Debug, Clone)] pub struct DataPage { - header: DataPageHeader, + pub(super) header: DataPageHeader, pub(super) buffer: Vec, - dictionary_page: Option>, - descriptor: ColumnDescriptor, + pub(super) dictionary_page: Option>, + pub(super) descriptor: ColumnDescriptor, } impl DataPage { @@ -138,6 +138,12 @@ impl DataPage { &self.buffer } + /// Returns a mutable reference to the internal buffer. + /// Useful to recover the buffer after the page has been decoded. + pub fn buffer_mut(&mut self) -> &mut Vec { + &mut self.buffer + } + pub fn num_values(&self) -> usize { self.header.num_values() } @@ -190,6 +196,14 @@ pub enum Page { Dict(Arc), } +/// A [`EncodedPage`] is an uncompressed, encoded representation of a Parquet page. It may hold actual data +/// and thus cloning it may be expensive. +#[derive(Debug)] +pub enum EncodedPage { + Data(DataPage), + Dict(EncodedDictPage), +} + /// A [`CompressedPage`] is a compressed, encoded representation of a Parquet page. It holds actual data /// and thus cloning it is expensive. #[derive(Debug)] @@ -198,6 +212,15 @@ pub enum CompressedPage { Dict(CompressedDictPage), } +impl CompressedPage { + pub(crate) fn buffer(&mut self) -> &mut Vec { + match self { + CompressedPage::Data(page) => &mut page.buffer, + CompressedPage::Dict(page) => &mut page.buffer, + } + } +} + /// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v1 pages. 
 #[inline]
 pub fn split_buffer_v1(buffer: &[u8], has_rep: bool, has_def: bool) -> (&[u8], &[u8], &[u8]) {
diff --git a/src/page/page_dict/binary.rs b/src/page/page_dict/binary.rs
index 99a1feeb2..b321fc1d4 100644
--- a/src/page/page_dict/binary.rs
+++ b/src/page/page_dict/binary.rs
@@ -53,7 +53,7 @@ fn read_plain(bytes: &[u8], length: usize) -> (Vec, Vec) {
     (values, offsets)
 }
 
-pub fn read(buf: &[u8], num_values: u32) -> Result<Arc<dyn DictPage>> {
-    let (values, offsets) = read_plain(buf, num_values as usize);
+pub fn read(buf: &[u8], num_values: usize) -> Result<Arc<dyn DictPage>> {
+    let (values, offsets) = read_plain(buf, num_values);
     Ok(Arc::new(BinaryPageDict::new(values, offsets)))
 }
diff --git a/src/page/page_dict/fixed_len_binary.rs b/src/page/page_dict/fixed_len_binary.rs
index 32771f96c..4cd735f9b 100644
--- a/src/page/page_dict/fixed_len_binary.rs
+++ b/src/page/page_dict/fixed_len_binary.rs
@@ -44,8 +44,8 @@ fn read_plain(bytes: &[u8], size: usize, length: usize) -> Vec<u8> {
     bytes[..size * length].to_vec()
 }
 
-pub fn read(buf: &[u8], size: i32, num_values: u32) -> Result<Arc<dyn DictPage>> {
-    let values = read_plain(buf, size as usize, num_values as usize);
+pub fn read(buf: &[u8], size: i32, num_values: usize) -> Result<Arc<dyn DictPage>> {
+    let values = read_plain(buf, size as usize, num_values);
     Ok(Arc::new(FixedLenByteArrayPageDict::new(
         values,
         PhysicalType::FixedLenByteArray(size),
diff --git a/src/page/page_dict/mod.rs b/src/page/page_dict/mod.rs
index 5b8f57722..3d3df1166 100644
--- a/src/page/page_dict/mod.rs
+++ b/src/page/page_dict/mod.rs
@@ -19,7 +19,20 @@ pub trait DictPage: std::fmt::Debug + Send + Sync {
     fn physical_type(&self) -> &PhysicalType;
 }
 
-/// A compressed dictionary page.
+/// An encoded and uncompressed dictionary page.
+#[derive(Debug)]
+pub struct EncodedDictPage {
+    pub(crate) buffer: Vec<u8>,
+    pub(crate) num_values: usize,
+}
+
+impl EncodedDictPage {
+    pub fn new(buffer: Vec<u8>, num_values: usize) -> Self {
+        Self { buffer, num_values }
+    }
+}
+
+/// An encoded and compressed dictionary page.
#[derive(Debug)] pub struct CompressedDictPage { pub(crate) buffer: Vec, @@ -33,8 +46,7 @@ impl CompressedDictPage { } pub fn read_dict_page( - buf: &[u8], - num_values: u32, + page: &EncodedDictPage, compression: (Compression, usize), is_sorted: bool, physical_type: &PhysicalType, @@ -42,16 +54,16 @@ pub fn read_dict_page( let decompressor = create_codec(&compression.0)?; if let Some(mut decompressor) = decompressor { let mut decompressed = vec![0; compression.1]; - decompressor.decompress(buf, &mut decompressed)?; - deserialize(&decompressed, num_values, is_sorted, physical_type) + decompressor.decompress(&page.buffer, &mut decompressed)?; + deserialize(&decompressed, page.num_values, is_sorted, physical_type) } else { - deserialize(buf, num_values, is_sorted, physical_type) + deserialize(&page.buffer, page.num_values, is_sorted, physical_type) } } fn deserialize( buf: &[u8], - num_values: u32, + num_values: usize, is_sorted: bool, physical_type: &PhysicalType, ) -> Result> { diff --git a/src/page/page_dict/primitive.rs b/src/page/page_dict/primitive.rs index 067a76504..b966c31dc 100644 --- a/src/page/page_dict/primitive.rs +++ b/src/page/page_dict/primitive.rs @@ -39,10 +39,10 @@ fn read_plain(values: &[u8]) -> Vec { pub fn read( buf: &[u8], - num_values: u32, + num_values: usize, _is_sorted: bool, ) -> Result> { - let typed_size = num_values as usize * std::mem::size_of::(); + let typed_size = num_values * std::mem::size_of::(); let values = read_plain::(&buf[..typed_size]); Ok(Arc::new(PrimitivePageDict::new(values))) } diff --git a/src/read/compression.rs b/src/read/compression.rs index 541a1af33..c304e1d9d 100644 --- a/src/read/compression.rs +++ b/src/read/compression.rs @@ -1,10 +1,13 @@ use parquet_format_async_temp::DataPageHeaderV2; +use streaming_decompression; use crate::compression::{create_codec, Codec}; -use crate::error::Result; - -use super::{PageIterator, StreamingIterator}; +use crate::error::{ParquetError, Result}; use crate::page::{CompressedDataPage, DataPage, DataPageHeader}; +use crate::parquet_bridge::Compression; +use crate::FallibleStreamingIterator; + +use super::PageIterator; fn decompress_v1(compressed: &[u8], decompressor: &mut dyn Codec, buffer: &mut [u8]) -> Result<()> { decompressor.decompress(compressed, buffer) @@ -37,9 +40,9 @@ fn decompress_v2( Ok(()) } -/// decompresses a page. -/// If `page.buffer.len() == 0`, there was no decompression and the buffer was moved. -/// Else, decompression took place. +/// decompresses a [`CompressedDataPage`] into `buffer`. +/// If the page is un-compressed, `buffer` is swapped instead. +/// Returns whether the page was decompressed. 
pub fn decompress_buffer( compressed_page: &mut CompressedDataPage, buffer: &mut Vec, @@ -47,8 +50,10 @@ pub fn decompress_buffer( let codec = create_codec(&compressed_page.compression())?; if let Some(mut codec) = codec { - // the buffer must be decompressed; do it so now, writing the decompressed data into `buffer` let compressed_buffer = &compressed_page.buffer; + + // prepare the compression buffer + buffer.clear(); buffer.resize(compressed_page.uncompressed_size(), 0); match compressed_page.header() { DataPageHeader::V1(_) => decompress_v1(compressed_buffer, codec.as_mut(), buffer)?, @@ -85,8 +90,7 @@ fn decompress_reuse( mut compressed_page: CompressedDataPage, iterator: &mut PageIterator, buffer: &mut Vec, - decompressions: &mut usize, -) -> Result { +) -> Result<(DataPage, bool)> { let was_decompressed = decompress_buffer(&mut compressed_page, buffer)?; let new_page = DataPage::new( @@ -97,66 +101,157 @@ fn decompress_reuse( ); if was_decompressed { - *decompressions += 1; - } else { iterator.reuse_buffer(compressed_page.buffer) - } - Ok(new_page) + }; + Ok((new_page, was_decompressed)) } -/// Decompressor that allows re-using the page buffer of [`PageIterator`] +/// Decompressor that allows re-using the page buffer of [`PageIterator`]. +/// # Implementation +/// The implementation depends on whether a page is compressed or not. +/// > `PageIterator(a)`, `CompressedPage(b)`, `Decompressor(c)`, `DecompressedPage(d)` +/// ### un-compressed pages: +/// > page iter: `a` is swapped with `b` +/// > decompress iter: `b` is swapped with `d`, `b` is swapped with `a` +/// therefore: +/// * `PageIterator` has its buffer back +/// * `Decompressor`'s buffer is un-used +/// * `DecompressedPage` has the same data as `CompressedPage` had +/// ### compressed pages: +/// > page iter: `a` is swapped with `b` +/// > decompress iter: +/// > * `b` is decompressed into `c` +/// > * `b` is swapped with `a` +/// > * `c` is moved to `d` +/// > * (next iteration): `d` is moved to `c` +/// therefore, while the page is available: +/// * `PageIterator` has its buffer back +/// * `Decompressor`'s buffer empty +/// * `DecompressedPage` has the decompressed buffer +/// after the page is used: +/// * `PageIterator` has its buffer back +/// * `Decompressor` has its buffer back +/// * `DecompressedPage` has an empty buffer pub struct Decompressor<'a, R: std::io::Read> { iter: PageIterator<'a, R>, buffer: Vec, - current: Option>, - decompressions: usize, + current: Option, + was_decompressed: bool, } impl<'a, R: std::io::Read> Decompressor<'a, R> { + /// Creates a new [`Decompressor`]. pub fn new(iter: PageIterator<'a, R>, buffer: Vec) -> Self { Self { iter, buffer, current: None, - decompressions: 0, + was_decompressed: false, } } - pub fn into_buffers(mut self) -> (Vec, Vec) { - let mut a = self - .current - .map(|x| x.map(|x| x.buffer).unwrap_or_else(|_| Vec::new())) - .unwrap_or(self.iter.buffer); - - if self.decompressions % 2 == 0 { - std::mem::swap(&mut a, &mut self.buffer) - }; - (a, self.buffer) + /// Returns two buffers: the first buffer corresponds to the page buffer, + /// the second to the decompression buffer. 
+ pub fn into_buffers(self) -> (Vec, Vec) { + let page_buffer = self.iter.into_buffer(); + (page_buffer, self.buffer) } } -impl<'a, R: std::io::Read> StreamingIterator for Decompressor<'a, R> { - type Item = Result; +impl<'a, R: std::io::Read> FallibleStreamingIterator for Decompressor<'a, R> { + type Item = DataPage; + type Error = ParquetError; - fn advance(&mut self) { - if let Some(Ok(page)) = self.current.as_mut() { - self.buffer = std::mem::take(&mut page.buffer); + fn advance(&mut self) -> Result<()> { + if let Some(page) = self.current.as_mut() { + if self.was_decompressed { + self.buffer = std::mem::take(&mut page.buffer); + } else { + self.iter.reuse_buffer(std::mem::take(&mut page.buffer)); + } } - let next = self.iter.next().map(|x| { - x.and_then(|x| { - decompress_reuse( - x, - &mut self.iter, - &mut self.buffer, - &mut self.decompressions, - ) + let next = self + .iter + .next() + .map(|x| { + x.and_then(|x| { + let (page, was_decompressed) = + decompress_reuse(x, &mut self.iter, &mut self.buffer)?; + self.was_decompressed = was_decompressed; + Ok(page) + }) }) - }); + .transpose()?; self.current = next; + Ok(()) } fn get(&self) -> Option<&Self::Item> { self.current.as_ref() } } + +type _Decompressor = streaming_decompression::Decompressor< + CompressedDataPage, + DataPage, + fn(CompressedDataPage, &mut Vec) -> Result, + ParquetError, + I, +>; + +impl streaming_decompression::Compressed for CompressedDataPage { + #[inline] + fn is_compressed(&self) -> bool { + self.compression() != Compression::Uncompressed + } +} + +impl streaming_decompression::Decompressed for DataPage { + #[inline] + fn buffer_mut(&mut self) -> &mut Vec { + self.buffer_mut() + } +} + +/// A [`FallibleStreamingIterator`] that decompresses [`CompressedDataPage`] into [`DataPage`]. +/// # Implementation +/// This decompressor uses an internal [`Vec`] to perform decompressions which +/// is re-used across pages, so that a single allocation is required. +/// If the pages are not compressed, the internal buffer is not used. +pub struct BasicDecompressor>> { + iter: _Decompressor, +} + +impl BasicDecompressor +where + I: Iterator>, +{ + /// Returns a new [`BasicDecompressor`]. + pub fn new(iter: I, buffer: Vec) -> Self { + Self { + iter: _Decompressor::new(iter, buffer, decompress), + } + } + + /// Returns its internal buffer, consuming itself. 
+ pub fn into_inner(self) -> Vec { + self.iter.into_inner() + } +} + +impl FallibleStreamingIterator for BasicDecompressor +where + I: Iterator>, +{ + type Item = DataPage; + type Error = ParquetError; + + fn advance(&mut self) -> Result<()> { + self.iter.advance() + } + + fn get(&self) -> Option<&Self::Item> { + self.iter.get() + } +} diff --git a/src/read/mod.rs b/src/read/mod.rs index 56102bf9f..c85b7d43a 100644 --- a/src/read/mod.rs +++ b/src/read/mod.rs @@ -7,25 +7,20 @@ mod page_stream; #[cfg(feature = "stream")] mod stream; -pub use streaming_iterator; -pub use streaming_iterator::StreamingIterator; - -pub use compression::{decompress, Decompressor}; +use std::io::{Read, Seek, SeekFrom}; +use std::sync::Arc; +pub use compression::{decompress, BasicDecompressor, Decompressor}; pub use metadata::read_metadata; +pub use page_iterator::{PageFilter, PageIterator}; +#[cfg(feature = "stream")] +pub use page_stream::get_page_stream; #[cfg(feature = "stream")] pub use stream::read_metadata as read_metadata_async; -use std::io::{Read, Seek, SeekFrom}; -use std::sync::Arc; - use crate::metadata::{ColumnChunkMetaData, RowGroupMetaData}; use crate::{error::Result, metadata::FileMetaData}; -pub use page_iterator::{PageFilter, PageIterator}; -#[cfg(feature = "stream")] -pub use page_stream::get_page_stream; - /// Filters row group metadata to only those row groups, /// for which the predicate function returns true pub fn filter_row_groups( @@ -67,6 +62,8 @@ pub fn get_page_iterator<'a, RR: Read + Seek>( mod tests { use std::fs::File; + use crate::FallibleStreamingIterator; + use super::*; use crate::tests::get_path; @@ -93,7 +90,7 @@ mod tests { #[test] fn reuse_buffer() -> Result<()> { let mut testdata = get_path(); - testdata.push("alltypes_plain.parquet"); + testdata.push("alltypes_plain.snappy.parquet"); let mut file = File::open(testdata).unwrap(); let metadata = read_metadata(&mut file)?; @@ -102,13 +99,17 @@ mod tests { let column = 0; let column_metadata = metadata.row_groups[row_group].column(column); let buffer = vec![0]; - let mut iterator = get_page_iterator(column_metadata, &mut file, None, buffer)?; + let iterator = get_page_iterator(column_metadata, &mut file, None, buffer)?; - let page = iterator.next().unwrap().unwrap(); - iterator.reuse_buffer(page.buffer); + let buffer = vec![]; + let mut iterator = Decompressor::new(iterator, buffer); - assert!(iterator.next().is_none()); - assert!(!iterator.buffer.is_empty()); + let _ = iterator.next()?.unwrap(); + + assert!(iterator.next()?.is_none()); + let (a, b) = iterator.into_buffers(); + assert_eq!(a.len(), 11); // note: compressed is higher in this example. 
+ assert_eq!(b.len(), 9); Ok(()) } @@ -130,9 +131,9 @@ mod tests { let buffer = vec![]; let mut iterator = Decompressor::new(iterator, buffer); - iterator.next().unwrap().as_ref().unwrap(); + iterator.next()?.unwrap(); - assert!(iterator.next().is_none()); + assert!(iterator.next()?.is_none()); let (a, b) = iterator.into_buffers(); assert_eq!(a.len(), 11); diff --git a/src/read/page_iterator.rs b/src/read/page_iterator.rs index dc3bcf602..926c8b445 100644 --- a/src/read/page_iterator.rs +++ b/src/read/page_iterator.rs @@ -8,7 +8,8 @@ use crate::error::Result; use crate::metadata::ColumnDescriptor; use crate::page::{ - read_dict_page, CompressedDataPage, DataPageHeader, DictPage, PageType, ParquetPageHeader, + read_dict_page, CompressedDataPage, DataPageHeader, DictPage, EncodedDictPage, PageType, + ParquetPageHeader, }; /// Type declaration for a page filter @@ -70,6 +71,10 @@ impl<'a, R: Read> PageIterator<'a, R> { pub fn reuse_buffer(&mut self, buffer: Vec) { self.buffer = buffer; } + + pub fn into_buffer(self) -> Vec { + self.buffer + } } impl<'a, R: Read> Iterator for PageIterator<'a, R> { @@ -128,6 +133,7 @@ fn build_page( let read_size = page_header.compressed_page_size as usize; if read_size > 0 { + buffer.clear(); buffer.resize(read_size, 0); reader.reader.read_exact(buffer)?; } @@ -169,13 +175,18 @@ pub(super) fn finish_page( let dict_header = page_header.dictionary_page_header.as_ref().unwrap(); let is_sorted = dict_header.is_sorted.unwrap_or(false); + // move the buffer to `dict_page` + let mut dict_page = + EncodedDictPage::new(std::mem::take(buffer), dict_header.num_values as usize); + let page = read_dict_page( - buffer, - dict_header.num_values as u32, + &dict_page, (compression, page_header.uncompressed_page_size as usize), is_sorted, descriptor.physical_type(), )?; + // take the buffer out of the `dict_page` to re-use it + std::mem::swap(&mut dict_page.buffer, buffer); Ok(FinishedPage::Dict(page)) } diff --git a/src/write/column_chunk.rs b/src/write/column_chunk.rs index 4b3f946ce..662668181 100644 --- a/src/write/column_chunk.rs +++ b/src/write/column_chunk.rs @@ -1,6 +1,6 @@ +use std::collections::HashSet; use std::convert::TryInto; use std::io::Write; -use std::{collections::HashSet, error::Error}; use futures::AsyncWrite; use parquet_format_async_temp::thrift::protocol::{ @@ -9,6 +9,7 @@ use parquet_format_async_temp::thrift::protocol::{ use parquet_format_async_temp::{ColumnChunk, ColumnMetaData}; use crate::statistics::serialize_statistics; +use crate::FallibleStreamingIterator; use crate::{ compression::Compression, encoding::Encoding, @@ -20,32 +21,30 @@ use crate::{ use super::page::{write_page, write_page_async, PageWriteSpec}; use super::statistics::reduce; +use super::DynStreamingIterator; -pub fn write_column_chunk< - W: Write, - I: Iterator>, - E: Error + Send + Sync + 'static, ->( +pub fn write_column_chunk<'a, W, E>( writer: &mut W, mut offset: u64, descriptor: &ColumnDescriptor, compression: Compression, - compressed_pages: I, -) -> Result<(ColumnChunk, u64)> { + mut compressed_pages: DynStreamingIterator<'a, CompressedPage, E>, +) -> Result<(ColumnChunk, u64)> +where + W: Write, + ParquetError: From, + E: std::error::Error, +{ // write every page let initial = offset; - let specs = compressed_pages - .map(|compressed_page| { - let spec = write_page( - writer, - offset, - compressed_page.map_err(ParquetError::from_external_error)?, - )?; - offset += spec.bytes_written; - Ok(spec) - }) - .collect::>>()?; + + let mut specs = vec![]; + while let 
Some(compressed_page) = compressed_pages.next()? { + let spec = write_page(writer, offset, compressed_page)?; + offset += spec.bytes_written; + specs.push(spec); + } let mut bytes_written = offset - initial; let column_chunk = build_column_chunk(&specs, descriptor, compression)?; @@ -58,27 +57,23 @@ pub fn write_column_chunk< Ok((column_chunk, bytes_written)) } -pub async fn write_column_chunk_async< - W: AsyncWrite + Unpin + Send, - I: Iterator>, - E: Error + Send + Sync + 'static, ->( +pub async fn write_column_chunk_async( writer: &mut W, mut offset: u64, descriptor: &ColumnDescriptor, compression: Compression, - compressed_pages: I, -) -> Result<(ColumnChunk, usize)> { + mut compressed_pages: DynStreamingIterator<'_, CompressedPage, E>, +) -> Result<(ColumnChunk, usize)> +where + W: AsyncWrite + Unpin + Send, + ParquetError: From, + E: std::error::Error, +{ let initial = offset; // write every page let mut specs = vec![]; - for compressed_page in compressed_pages { - let spec = write_page_async( - writer, - offset, - compressed_page.map_err(ParquetError::from_external_error)?, - ) - .await?; + while let Some(compressed_page) = compressed_pages.next()? { + let spec = write_page_async(writer, offset, compressed_page).await?; offset += spec.bytes_written; specs.push(spec); } diff --git a/src/write/compression.rs b/src/write/compression.rs index 03ab5f104..bfa7b6e35 100644 --- a/src/write/compression.rs +++ b/src/write/compression.rs @@ -1,49 +1,131 @@ -use crate::error::Result; +use crate::error::{ParquetError, Result}; +use crate::page::{CompressedDictPage, CompressedPage, DataPageHeader, EncodedDictPage}; +use crate::parquet_bridge::Compression; +use crate::FallibleStreamingIterator; use crate::{ - compression::{create_codec, Codec}, - read::{CompressedDataPage, Page, PageHeader}, + compression::create_codec, + page::{CompressedDataPage, DataPage, EncodedPage}, }; -fn compress_(buffer: &[u8], decompressor: &mut dyn Codec) -> Result> { - let mut compressed_buffer = Vec::new(); - decompressor.compress(buffer, &mut compressed_buffer)?; - Ok(compressed_buffer) -} - -fn compress_v1(mut page: PageV1, codec: &mut dyn Codec) -> Result { - page.buffer = compress_(&page.buffer, codec)?; - Ok(page) +/// Compresses a [`DataPage`] into a [`CompressedDataPage`]. 
+fn compress_data( + page: DataPage, + mut compressed_buffer: Vec, + compression: Compression, +) -> Result { + let DataPage { + mut buffer, + header, + dictionary_page, + descriptor, + } = page; + let uncompressed_page_size = buffer.len(); + let codec = create_codec(&compression)?; + if let Some(mut codec) = codec { + match &header { + DataPageHeader::V1(_) => { + codec.compress(&buffer, &mut compressed_buffer)?; + } + DataPageHeader::V2(header) => { + let levels_byte_length = (header.repetition_levels_byte_length + + header.definition_levels_byte_length) + as usize; + compressed_buffer.extend_from_slice(&buffer[..levels_byte_length]); + codec.compress(&buffer[levels_byte_length..], &mut compressed_buffer)?; + } + }; + } else { + std::mem::swap(&mut buffer, &mut compressed_buffer); + }; + Ok(CompressedDataPage::new( + header, + compressed_buffer, + compression, + uncompressed_page_size, + dictionary_page, + descriptor, + )) } -fn compress_v2(mut page: PageV2, codec: &mut dyn Codec) -> Result { - // only values are compressed in v2: - // [ ] -> [ ] - let prefix = (page.header.repetition_levels_byte_length - + page.header.definition_levels_byte_length) as usize; - let compressed_values = compress_(&page.buffer[prefix..], codec)?; - page.buffer.truncate(prefix); - page.buffer.extend(compressed_values); - Ok(page) +fn compress_dict( + page: EncodedDictPage, + mut compressed_buffer: Vec, + compression: Compression, +) -> Result { + let EncodedDictPage { + mut buffer, + num_values, + } = page; + let codec = create_codec(&compression)?; + if let Some(mut codec) = codec { + codec.compress(&buffer, &mut compressed_buffer)?; + } else { + std::mem::swap(&mut buffer, &mut compressed_buffer); + } + Ok(CompressedDictPage::new(compressed_buffer, num_values)) } -/// decompresses a page in place. This only changes the pages' internal buffer. -pub fn compress(page: Page) -> Result { +pub fn compress( + page: EncodedPage, + compressed_buffer: Vec, + compression: Compression, +) -> Result { match page { - Page::V1(page) => { - let codec = create_codec(&page.compression)?; - if let Some(mut codec) = codec { - compress_v1(page, codec.as_mut()).map(CompressedDataPage::V1) - } else { - Ok(CompressedDataPage::V1(page)) - } + EncodedPage::Data(page) => { + compress_data(page, compressed_buffer, compression).map(CompressedPage::Data) } - Page::V2(page) => { - let codec = create_codec(&page.compression)?; - if let Some(mut codec) = codec { - compress_v2(page, codec.as_mut()).map(CompressedDataPage::V2) - } else { - Ok(CompressedDataPage::V2(page)) - } + EncodedPage::Dict(page) => { + compress_dict(page, compressed_buffer, compression).map(CompressedPage::Dict) + } + } +} + +/// A [`FallibleStreamingIterator`] that consumes [`EncodedPage`] and yields [`CompressedPage`] +/// holding a reusable buffer ([`Vec`]) for compression. 
+pub struct Compressor>> { + iter: I, + compression: Compression, + buffer: Vec, + current: Option, +} + +impl>> Compressor { + pub fn new_from_vec(iter: I, compression: Compression, buffer: Vec) -> Self { + Self::new(iter, compression, buffer) + } + + pub fn new(iter: I, compression: Compression, buffer: Vec) -> Self { + Self { + iter, + compression, + buffer, + current: None, } } } + +impl>> FallibleStreamingIterator for Compressor { + type Item = CompressedPage; + type Error = ParquetError; + + fn advance(&mut self) -> std::result::Result<(), Self::Error> { + let mut compressed_buffer = if let Some(page) = self.current.as_mut() { + std::mem::take(page.buffer()) + } else { + std::mem::take(&mut self.buffer) + }; + compressed_buffer.clear(); + + let next = self + .iter + .next() + .map(|x| x.and_then(|page| compress(page, compressed_buffer, self.compression))) + .transpose()?; + self.current = next; + Ok(()) + } + + fn get(&self) -> Option<&Self::Item> { + self.current.as_ref() + } +} diff --git a/src/write/dyn_iter.rs b/src/write/dyn_iter.rs index 8a6ae2646..c35934143 100644 --- a/src/write/dyn_iter.rs +++ b/src/write/dyn_iter.rs @@ -1,3 +1,5 @@ +use crate::FallibleStreamingIterator; + /// [`DynIter`] is an implementation of a single-threaded, dynamically-typed iterator. pub struct DynIter<'a, V> { iter: Box + 'a>, @@ -24,3 +26,36 @@ impl<'a, V> DynIter<'a, V> { } } } + +/// Dynamically-typed [`FallibleStreamingIterator`]. +pub struct DynStreamingIterator<'a, V, E> { + iter: Box + 'a>, +} + +impl<'a, V, E> FallibleStreamingIterator for DynStreamingIterator<'a, V, E> { + type Item = V; + type Error = E; + + fn advance(&mut self) -> Result<(), Self::Error> { + self.iter.advance() + } + + fn get(&self) -> Option<&Self::Item> { + self.iter.get() + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } +} + +impl<'a, V, E> DynStreamingIterator<'a, V, E> { + pub fn new(iter: I) -> Self + where + I: FallibleStreamingIterator + 'a, + { + Self { + iter: Box::new(iter), + } + } +} diff --git a/src/write/file.rs b/src/write/file.rs index 38d3ff949..291d00f6b 100644 --- a/src/write/file.rs +++ b/src/write/file.rs @@ -1,4 +1,4 @@ -use std::{error::Error, io::Write}; +use std::io::Write; use parquet_format_async_temp::FileMetaData; @@ -48,7 +48,8 @@ pub fn write_file<'a, W, I, E>( where W: Write, I: Iterator, E>>, - E: Error + Send + Sync + 'static, + ParquetError: From, + E: std::error::Error, { let mut offset = start_file(writer)? 
as u64; @@ -59,7 +60,7 @@ where offset, schema.columns(), options.compression, - row_group.map_err(ParquetError::from_external_error)?, + row_group?, )?; offset += size; Ok(group) diff --git a/src/write/mod.rs b/src/write/mod.rs index e82727e6c..9af7bcb9d 100644 --- a/src/write/mod.rs +++ b/src/write/mod.rs @@ -1,4 +1,5 @@ mod column_chunk; +mod compression; mod file; mod page; mod row_group; @@ -10,7 +11,9 @@ pub mod stream; mod stream_stream; mod dyn_iter; -pub use dyn_iter::DynIter; +pub use dyn_iter::{DynIter, DynStreamingIterator}; + +pub use compression::{compress, Compressor}; pub use file::write_file; @@ -18,7 +21,7 @@ use crate::compression::Compression; use crate::page::CompressedPage; pub type RowGroupIter<'a, E> = - DynIter<'a, std::result::Result>, E>>; + DynIter<'a, std::result::Result, E>>; #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct WriteOptions { diff --git a/src/write/page.rs b/src/write/page.rs index a33786bb2..80fb54f36 100644 --- a/src/write/page.rs +++ b/src/write/page.rs @@ -25,7 +25,7 @@ pub struct PageWriteSpec { pub fn write_page( writer: &mut W, offset: u64, - compressed_page: CompressedPage, + compressed_page: &CompressedPage, ) -> Result { let header = match &compressed_page { CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page), @@ -63,7 +63,7 @@ pub fn write_page( pub async fn write_page_async( writer: &mut W, offset: u64, - compressed_page: CompressedPage, + compressed_page: &CompressedPage, ) -> Result { let header = match &compressed_page { CompressedPage::Data(compressed_page) => assemble_data_page_header(compressed_page), diff --git a/src/write/row_group.rs b/src/write/row_group.rs index 79fc75f6b..c165ef171 100644 --- a/src/write/row_group.rs +++ b/src/write/row_group.rs @@ -1,4 +1,4 @@ -use std::{error::Error, io::Write}; +use std::io::Write; use futures::AsyncWrite; use parquet_format_async_temp::RowGroup; @@ -12,7 +12,7 @@ use crate::{ use super::{ column_chunk::{write_column_chunk, write_column_chunk_async}, - DynIter, + DynIter, DynStreamingIterator, }; fn same_elements(arr: &[T]) -> Option> { @@ -28,6 +28,7 @@ fn same_elements(arr: &[T]) -> Option> { } pub fn write_row_group< + 'a, W, E, // external error any of the iterators may emit >( @@ -35,24 +36,20 @@ pub fn write_row_group< mut offset: u64, descriptors: &[ColumnDescriptor], compression: Compression, - columns: DynIter>, E>>, + columns: DynIter<'a, std::result::Result, E>>, ) -> Result<(RowGroup, u64)> where W: Write, - E: Error + Send + Sync + 'static, + ParquetError: From, + E: std::error::Error, { let column_iter = descriptors.iter().zip(columns); let initial = offset; let columns = column_iter .map(|(descriptor, page_iter)| { - let (column, size) = write_column_chunk( - writer, - offset, - descriptor, - compression, - page_iter.map_err(ParquetError::from_external_error)?, - )?; + let (column, size) = + write_column_chunk(writer, offset, descriptor, compression, page_iter?)?; offset += size; Ok(column) }) @@ -98,28 +95,20 @@ pub async fn write_row_group_async< mut offset: u64, descriptors: &[ColumnDescriptor], compression: Compression, - columns: DynIter< - 'a, - std::result::Result>, E>, - >, + columns: DynIter<'a, std::result::Result, E>>, ) -> Result<(RowGroup, u64)> where W: AsyncWrite + Unpin + Send, - E: Error + Send + Sync + 'static, + ParquetError: From, + E: std::error::Error, { let column_iter = descriptors.iter().zip(columns); let initial = offset; let mut columns = vec![]; for (descriptor, page_iter) in column_iter { - let 
(spec, size) = write_column_chunk_async( - writer, - offset, - descriptor, - compression, - page_iter.map_err(ParquetError::from_external_error)?, - ) - .await?; + let (spec, size) = + write_column_chunk_async(writer, offset, descriptor, compression, page_iter?).await?; offset += size as u64; columns.push(spec); } diff --git a/src/write/stream.rs b/src/write/stream.rs index ea11b6e3a..3f951cd7c 100644 --- a/src/write/stream.rs +++ b/src/write/stream.rs @@ -1,9 +1,9 @@ +use std::io::Write; + use futures::stream::Stream; use futures::StreamExt; use futures::TryStreamExt; -use std::{error::Error, io::Write}; - use parquet_format_async_temp::FileMetaData; pub use crate::metadata::KeyValue; @@ -26,7 +26,8 @@ pub async fn write_stream<'a, W, S, E>( where W: Write, S: Stream, E>>, - E: Error + Send + Sync + 'static, + ParquetError: From, + E: std::error::Error, { let mut offset = start_file(writer)? as u64; @@ -37,7 +38,7 @@ where offset, schema.columns(), options.compression, - row_group.map_err(ParquetError::from_external_error)?, + row_group?, )?; offset += size; Result::Ok(group) diff --git a/src/write/stream_stream.rs b/src/write/stream_stream.rs index d8ee31d82..7be386d78 100644 --- a/src/write/stream_stream.rs +++ b/src/write/stream_stream.rs @@ -1,4 +1,4 @@ -use std::{error::Error, io::Write}; +use std::io::Write; use futures::{pin_mut, stream::Stream, AsyncWrite, AsyncWriteExt, StreamExt}; @@ -54,7 +54,8 @@ pub async fn write_stream_stream<'a, W, S, E>( where W: AsyncWrite + Unpin + Send, S: Stream, E>>, - E: Error + Send + Sync + 'static, + ParquetError: From, + E: std::error::Error, { let mut offset = start_file(writer).await?; @@ -67,7 +68,7 @@ where offset, schema.columns(), options.compression, - row_group.map_err(ParquetError::from_external_error)?, + row_group?, ) .await?; offset += size;
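
A note for reviewers on the enabling change: `Codec::compress` now appends to the destination `Vec<u8>` instead of requiring a fresh, pre-sized buffer, which is what lets `Compressor` and `Decompressor` recycle a single allocation across pages. A minimal round-trip sketch in the spirit of the updated `test_roundtrip` above; it assumes the `snappy` feature is enabled, and the 4-byte prefix is purely illustrative.

use parquet::compression::{create_codec, Compression};
use parquet::error::Result;

fn roundtrip(data: &[u8]) -> Result<()> {
    // assumes the `snappy` feature; any supported codec behaves the same way
    let mut codec = create_codec(&Compression::Snappy)?.expect("codec to be enabled");

    // `compress` appends, so the buffer may already hold a prefix or earlier pages
    let mut compressed = b"PAR1".to_vec();
    let offset = compressed.len();
    codec.compress(data, &mut compressed)?;

    // `decompress` fills a caller-sized buffer of the uncompressed length
    let mut decompressed = vec![0u8; data.len()];
    codec.decompress(&compressed[offset..], &mut decompressed)?;
    assert_eq!(data, decompressed.as_slice());
    Ok(())
}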
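
On the write side, pages are now handed to the writer as `EncodedPage`s and compressed lazily by `Compressor`, a `FallibleStreamingIterator` that owns the reusable compression buffer. The sketch below mirrors the integration test above; `encode_pages` is a hypothetical placeholder for whatever produces the encoded pages of the single column, and the crate paths follow the integration tests (`parquet::...`).

use std::io::Cursor;

use parquet::compression::Compression;
use parquet::error::Result;
use parquet::metadata::SchemaDescriptor;
use parquet::page::EncodedPage;
use parquet::write::{write_file, Compressor, DynIter, DynStreamingIterator, Version, WriteOptions};

// hypothetical: any iterator of already-encoded pages for one column
fn encode_pages() -> impl Iterator<Item = Result<EncodedPage>> {
    std::iter::empty()
}

fn write_example() -> Result<()> {
    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed, // e.g. Compression::Snappy with the feature enabled
        version: Version::V1,
    };
    let schema = SchemaDescriptor::try_from_message("message schema { OPTIONAL INT32 col; }")?;

    // one row group with one column; `vec![]` is the compression buffer reused across pages
    let row_groups = std::iter::once(Ok(DynIter::new(std::iter::once(Ok(
        DynStreamingIterator::new(Compressor::new_from_vec(
            DynIter::new(encode_pages()),
            options.compression,
            vec![],
        )),
    )))));

    let mut writer = Cursor::new(vec![]);
    write_file(&mut writer, row_groups, schema, options, None, None)?;
    Ok(())
}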
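
On the read side, `StreamingIterator` is replaced by the re-exported `FallibleStreamingIterator`, so pulling a page becomes `iterator.next()?`. A sketch of draining one column chunk and recovering both reusable buffers afterwards, following the tests in `src/read/mod.rs`; the file path is a placeholder.

use std::fs::File;

use parquet::error::Result;
use parquet::read::{get_page_iterator, read_metadata, Decompressor};
use parquet::FallibleStreamingIterator; // re-exported from streaming-decompression

fn read_example() -> Result<()> {
    let mut file = File::open("example.parquet").unwrap(); // placeholder path
    let metadata = read_metadata(&mut file)?;

    let column_metadata = metadata.row_groups[0].column(0);
    // the trailing `vec![]` is the page buffer the iterator re-uses between pages
    let pages = get_page_iterator(column_metadata, &mut file, None, vec![])?;

    // wrap the pages in a decompressor that re-uses a single decompression buffer
    let mut pages = Decompressor::new(pages, vec![]);
    while let Some(page) = pages.next()? {
        // `page` is a `&DataPage`; decode it here
        let _ = page.num_values();
    }

    // both buffers can be recovered and re-used for the next column chunk
    let (_page_buffer, _decompression_buffer) = pages.into_buffers();
    Ok(())
}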
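
For sources that yield owned compressed pages rather than borrowing a reader's buffer (for instance pages buffered from the stream API), the patch adds `BasicDecompressor`, which wraps any `Iterator<Item = Result<CompressedDataPage>>` and likewise keeps a single decompression buffer alive. A small sketch; where the `pages` argument comes from is left open.

use parquet::error::Result;
use parquet::page::CompressedDataPage;
use parquet::read::BasicDecompressor;
use parquet::FallibleStreamingIterator;

fn decode_all(pages: Vec<Result<CompressedDataPage>>) -> Result<Vec<u8>> {
    let mut decompressor = BasicDecompressor::new(pages.into_iter(), vec![]);
    while let Some(page) = decompressor.next()? {
        // decode the `&DataPage` here
        let _ = page.num_values();
    }
    // hand the internal buffer back so the next column chunk can re-use it
    Ok(decompressor.into_inner())
}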