From 5bed494782e2f1d4b4b05ec7a1dcd9f73b286ecb Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 27 Jun 2022 10:03:13 +0200 Subject: [PATCH] make sure that scratch is used in decompressing data --- examples/ipc_file_read.rs | 3 +- src/io/flight/mod.rs | 1 + src/io/ipc/read/array/binary.rs | 5 +++ src/io/ipc/read/array/dictionary.rs | 3 ++ src/io/ipc/read/array/fixed_size_binary.rs | 4 +++ src/io/ipc/read/array/fixed_size_list.rs | 3 ++ src/io/ipc/read/array/list.rs | 4 +++ src/io/ipc/read/array/map.rs | 4 +++ src/io/ipc/read/array/primitive.rs | 4 +++ src/io/ipc/read/array/struct_.rs | 3 ++ src/io/ipc/read/array/union.rs | 5 +++ src/io/ipc/read/array/utf8.rs | 5 +++ src/io/ipc/read/common.rs | 6 ++++ src/io/ipc/read/deserialize.rs | 15 ++++++++ src/io/ipc/read/file_async.rs | 8 +++++ src/io/ipc/read/read_basic.rs | 25 +++++++++----- src/io/ipc/read/reader.rs | 40 +++++++++++++++------- src/io/ipc/read/stream.rs | 7 ++++ src/io/ipc/read/stream_async.rs | 4 +++ 19 files changed, 127 insertions(+), 22 deletions(-) diff --git a/examples/ipc_file_read.rs b/examples/ipc_file_read.rs index 27373ece01f..6478b25d484 100644 --- a/examples/ipc_file_read.rs +++ b/examples/ipc_file_read.rs @@ -33,7 +33,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk>)> { let schema = metadata.schema.clone(); // advanced way: read the dictionary - let dictionaries = read::read_file_dictionaries(&mut file, &metadata, Default::default())?; + let dictionaries = read::read_file_dictionaries(&mut file, &metadata, &mut Default::default())?; let chunk_index = 0; @@ -44,6 +44,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk>)> { None, chunk_index, &mut Default::default(), + &mut Default::default(), )?; Ok((schema, chunk)) diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index 8aa4a1de0e1..9ed0835bb84 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -142,6 +142,7 @@ pub fn deserialize_batch( &mut reader, 0, length as u64, + &mut Default::default(), ), _ => Err(Error::nyi( "flight currently only supports reading RecordBatch messages", diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs index f68471f699d..d800408247e 100644 --- a/src/io/ipc/read/array/binary.rs +++ b/src/io/ipc/read/array/binary.rs @@ -5,10 +5,12 @@ use crate::array::{BinaryArray, Offset}; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::read_basic::*; use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +#[allow(clippy::too_many_arguments)] pub fn read_binary( field_nodes: &mut VecDeque, data_type: DataType, @@ -17,6 +19,7 @@ pub fn read_binary( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -46,6 +49,7 @@ pub fn read_binary( block_offset, is_little_endian, compression, + scratch, ) // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![O::default()])))?; @@ -58,6 +62,7 @@ pub fn read_binary( block_offset, is_little_endian, compression, + scratch, )?; BinaryArray::::try_new(data_type, offsets, values, validity) diff --git a/src/io/ipc/read/array/dictionary.rs b/src/io/ipc/read/array/dictionary.rs index 6589790c946..fee1d6a046f 100644 --- a/src/io/ipc/read/array/dictionary.rs +++ b/src/io/ipc/read/array/dictionary.rs @@ -4,6 +4,7 @@ use std::io::{Read, Seek}; use crate::array::{DictionaryArray, DictionaryKey}; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::Dictionaries; use super::super::{Compression, IpcBuffer, Node}; @@ -19,6 +20,7 @@ pub fn read_dictionary( block_offset: u64, compression: Option, is_little_endian: bool, + scratch: &mut ReadBuffer, ) -> Result> where Vec: TryInto, @@ -47,6 +49,7 @@ where block_offset, is_little_endian, compression, + scratch, )?; Ok(DictionaryArray::::from_data(keys, values)) diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs index 04029cd7c87..9b03127901a 100644 --- a/src/io/ipc/read/array/fixed_size_binary.rs +++ b/src/io/ipc/read/array/fixed_size_binary.rs @@ -4,10 +4,12 @@ use std::io::{Read, Seek}; use crate::array::FixedSizeBinaryArray; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::read_basic::*; use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +#[allow(clippy::too_many_arguments)] pub fn read_fixed_size_binary( field_nodes: &mut VecDeque, data_type: DataType, @@ -16,6 +18,7 @@ pub fn read_fixed_size_binary( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -46,6 +49,7 @@ pub fn read_fixed_size_binary( block_offset, is_little_endian, compression, + scratch, )?; FixedSizeBinaryArray::try_new(data_type, values, validity) diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs index f8e314fc080..7aa5c30c92e 100644 --- a/src/io/ipc/read/array/fixed_size_list.rs +++ b/src/io/ipc/read/array/fixed_size_list.rs @@ -4,6 +4,7 @@ use std::io::{Read, Seek}; use crate::array::FixedSizeListArray; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; @@ -23,6 +24,7 @@ pub fn read_fixed_size_list( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -53,6 +55,7 @@ pub fn read_fixed_size_list( is_little_endian, compression, version, + scratch, )?; FixedSizeListArray::try_new(data_type, values, validity) } diff --git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs index f5d2eb5465e..d913c856b64 100644 --- a/src/io/ipc/read/array/list.rs +++ b/src/io/ipc/read/array/list.rs @@ -6,6 +6,7 @@ use crate::array::{ListArray, Offset}; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; @@ -25,6 +26,7 @@ pub fn read_list( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result> where Vec: TryInto, @@ -57,6 +59,7 @@ where block_offset, is_little_endian, compression, + scratch, ) // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![O::default()])))?; @@ -74,6 +77,7 @@ where is_little_endian, compression, version, + scratch, )?; ListArray::try_new(data_type, offsets, values, validity) } diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs index 594f8a495b1..fd887d7efb5 100644 --- a/src/io/ipc/read/array/map.rs +++ b/src/io/ipc/read/array/map.rs @@ -5,6 +5,7 @@ use crate::array::MapArray; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; @@ -24,6 +25,7 @@ pub fn read_map( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -53,6 +55,7 @@ pub fn read_map( block_offset, is_little_endian, compression, + scratch, ) // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![0i32])))?; @@ -70,6 +73,7 @@ pub fn read_map( is_little_endian, compression, version, + scratch, )?; MapArray::try_new(data_type, offsets, field, validity) } diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs index c7c7b780d89..1ae4c94bc2a 100644 --- a/src/io/ipc/read/array/primitive.rs +++ b/src/io/ipc/read/array/primitive.rs @@ -3,11 +3,13 @@ use std::{collections::VecDeque, convert::TryInto}; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use crate::{array::PrimitiveArray, types::NativeType}; use super::super::read_basic::*; use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +#[allow(clippy::too_many_arguments)] pub fn read_primitive( field_nodes: &mut VecDeque, data_type: DataType, @@ -16,6 +18,7 @@ pub fn read_primitive( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result> where Vec: TryInto, @@ -48,6 +51,7 @@ where block_offset, is_little_endian, compression, + scratch, )?; PrimitiveArray::::try_new(data_type, values, validity) } diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs index c87440a9782..76894f03c14 100644 --- a/src/io/ipc/read/array/struct_.rs +++ b/src/io/ipc/read/array/struct_.rs @@ -4,6 +4,7 @@ use std::io::{Read, Seek}; use crate::array::StructArray; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; @@ -23,6 +24,7 @@ pub fn read_struct( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -57,6 +59,7 @@ pub fn read_struct( is_little_endian, compression, version, + scratch, ) }) .collect::>>()?; diff --git a/src/io/ipc/read/array/union.rs b/src/io/ipc/read/array/union.rs index 7c86f5e30bb..4c7ea723004 100644 --- a/src/io/ipc/read/array/union.rs +++ b/src/io/ipc/read/array/union.rs @@ -5,6 +5,7 @@ use crate::array::UnionArray; use crate::datatypes::DataType; use crate::datatypes::UnionMode::Dense; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; @@ -24,6 +25,7 @@ pub fn read_union( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -50,6 +52,7 @@ pub fn read_union( block_offset, is_little_endian, compression, + scratch, )?; let offsets = if let DataType::Union(_, _, mode) = data_type { @@ -61,6 +64,7 @@ pub fn read_union( block_offset, is_little_endian, compression, + scratch, )?) } else { None @@ -86,6 +90,7 @@ pub fn read_union( is_little_endian, compression, version, + scratch, ) }) .collect::>>()?; diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs index 8424fbb2e73..1e9b52eacad 100644 --- a/src/io/ipc/read/array/utf8.rs +++ b/src/io/ipc/read/array/utf8.rs @@ -5,10 +5,12 @@ use crate::array::{Offset, Utf8Array}; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::read_basic::*; use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +#[allow(clippy::too_many_arguments)] pub fn read_utf8( field_nodes: &mut VecDeque, data_type: DataType, @@ -17,6 +19,7 @@ pub fn read_utf8( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -46,6 +49,7 @@ pub fn read_utf8( block_offset, is_little_endian, compression, + scratch, ) // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![O::default()])))?; @@ -58,6 +62,7 @@ pub fn read_utf8( block_offset, is_little_endian, compression, + scratch, )?; Utf8Array::::try_new(data_type, offsets, values, validity) diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index dededdb817d..4e52d22ddde 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -85,6 +85,7 @@ pub fn read_record_batch( reader: &mut R, block_offset: u64, file_size: u64, + scratch: &mut ReadBuffer, ) -> Result>> { assert_eq!(fields.len(), ipc_schema.fields.len()); let buffers = batch @@ -136,6 +137,7 @@ pub fn read_record_batch( Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err)) })?, version, + scratch, )?)), ProjectionResult::NotSelected((field, _)) => { skip(&mut field_nodes, &field.data_type, &mut buffers)?; @@ -162,6 +164,7 @@ pub fn read_record_batch( Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err)) })?, version, + scratch, ) }) .collect::>>()? @@ -221,6 +224,7 @@ fn first_dict_field<'a>( /// Read the dictionary from the buffer and provided metadata, /// updating the `dictionaries` with the resulting dictionary +#[allow(clippy::too_many_arguments)] pub fn read_dictionary( batch: arrow_format::ipc::DictionaryBatchRef, fields: &[Field], @@ -229,6 +233,7 @@ pub fn read_dictionary( reader: &mut R, block_offset: u64, file_size: u64, + scratch: &mut ReadBuffer, ) -> Result<()> { if batch .is_delta() @@ -270,6 +275,7 @@ pub fn read_dictionary( reader, block_offset, file_size, + scratch, )?; let mut arrays = columns.into_arrays(); arrays.pop().unwrap() diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index f5c8b97c00f..337ee58bd9a 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -7,6 +7,7 @@ use arrow_format::ipc::MetadataVersion; use crate::array::*; use crate::datatypes::{DataType, Field, PhysicalType}; use crate::error::Result; +use crate::io::ipc::read::common::ReadBuffer; use crate::io::ipc::IpcField; use super::{array::*, Dictionaries}; @@ -24,6 +25,7 @@ pub fn read( is_little_endian: bool, compression: Option, version: MetadataVersion, + scratch: &mut ReadBuffer, ) -> Result> { use PhysicalType::*; let data_type = field.data_type.clone(); @@ -49,6 +51,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch ) .map(|x| x.boxed()) }), @@ -61,6 +64,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -73,6 +77,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -85,6 +90,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -97,6 +103,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -109,6 +116,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -123,6 +131,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), LargeList => read_list::( @@ -136,6 +145,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), FixedSizeList => read_fixed_size_list( @@ -149,6 +159,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), Struct => read_struct( @@ -162,6 +173,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), Dictionary(key_type) => { @@ -175,6 +187,7 @@ pub fn read( block_offset, compression, is_little_endian, + scratch ) .map(|x| x.boxed()) }) @@ -190,6 +203,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), Map => read_map( @@ -203,6 +217,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), } diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 194a41a12e9..59ddd51c6f0 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -81,6 +81,7 @@ impl<'a> FileStream<'a> { let mut meta_buffer = Default::default(); let mut block_buffer = Default::default(); + let mut scratch = Default::default(); for block in 0..metadata.blocks.len() { let chunk = read_batch( &mut reader, @@ -90,6 +91,7 @@ impl<'a> FileStream<'a> { block, &mut meta_buffer, &mut block_buffer, + &mut scratch ).await?; let chunk = if let Some((_, map)) = &projection { @@ -148,6 +150,7 @@ where deserialize_footer(&footer, u64::MAX) } +#[allow(clippy::too_many_arguments)] async fn read_batch( mut reader: R, dictionaries: &mut Dictionaries, @@ -156,6 +159,7 @@ async fn read_batch( block: usize, meta_buffer: &mut ReadBuffer, block_buffer: &mut ReadBuffer, + scratch: &mut ReadBuffer, ) -> Result>> where R: AsyncRead + AsyncSeek + Unpin, @@ -208,6 +212,7 @@ where &mut cursor, 0, metadata.size, + scratch, ) } @@ -216,6 +221,7 @@ async fn read_dictionaries( fields: &[Field], ipc_schema: &IpcSchema, blocks: &[Block], + scratch: &mut ReadBuffer, ) -> Result where R: AsyncRead + AsyncSeek + Unpin, @@ -258,6 +264,7 @@ where &mut cursor, 0, u64::MAX, + scratch, )?; } _ => return Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), @@ -300,6 +307,7 @@ async fn cached_read_dictionaries( &metadata.schema.fields, &metadata.ipc_schema, blocks, + &mut Default::default(), ) .await?; *dictionaries = Some(new_dictionaries); diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs index 378b2976e4f..585e0e33eff 100644 --- a/src/io/ipc/read/read_basic.rs +++ b/src/io/ipc/read/read_basic.rs @@ -3,6 +3,7 @@ use std::{collections::VecDeque, convert::TryInto}; use crate::buffer::Buffer; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use crate::{bitmap::Bitmap, types::NativeType}; use super::super::compression; @@ -93,6 +94,7 @@ fn read_compressed_buffer( length: usize, is_little_endian: bool, compression: Compression, + scratch: &mut ReadBuffer, ) -> Result> { if is_little_endian != is_native_little_endian() { return Err(Error::NotYetImplemented( @@ -105,9 +107,8 @@ fn read_compressed_buffer( let mut buffer = vec![T::default(); length]; // decompress first - // todo: move this allocation to an external buffer for re-use - let mut slice = vec![0u8; buffer_length]; - reader.read_exact(&mut slice)?; + scratch.set_len(buffer_length); + reader.read_exact(scratch.as_mut())?; let out_slice = bytemuck::cast_slice_mut(&mut buffer); @@ -117,10 +118,10 @@ fn read_compressed_buffer( match compression { arrow_format::ipc::CompressionType::Lz4Frame => { - compression::decompress_lz4(&slice[8..], out_slice)?; + compression::decompress_lz4(&scratch.as_ref()[8..], out_slice)?; } arrow_format::ipc::CompressionType::Zstd => { - compression::decompress_zstd(&slice[8..], out_slice)?; + compression::decompress_zstd(&scratch.as_ref()[8..], out_slice)?; } } Ok(buffer) @@ -133,6 +134,7 @@ pub fn read_buffer( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result> { let buf = buf .pop_front() @@ -151,10 +153,15 @@ pub fn read_buffer( reader.seek(SeekFrom::Start(block_offset + offset))?; if let Some(compression) = compression { - Ok( - read_compressed_buffer(reader, buffer_length, length, is_little_endian, compression)? - .into(), - ) + Ok(read_compressed_buffer( + reader, + buffer_length, + length, + is_little_endian, + compression, + scratch, + )? + .into()) } else { Ok(read_uncompressed_buffer(reader, buffer_length, length, is_little_endian)?.into()) } diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 9f7ab43346f..430b4560bed 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -44,7 +44,8 @@ pub struct FileReader { dictionaries: Option, current_block: usize, projection: Option<(Vec, HashMap, Schema)>, - buffer: ReadBuffer, + data_scratch: ReadBuffer, + message_scratch: ReadBuffer, } fn read_dictionary_message( @@ -74,7 +75,8 @@ fn read_dictionary_block( metadata: &FileMetadata, block: &arrow_format::ipc::Block, dictionaries: &mut Dictionaries, - scratch: &mut ReadBuffer, + message_scratch: &mut ReadBuffer, + dictionary_scratch: &mut ReadBuffer, ) -> Result<()> { let offset: u64 = block .offset @@ -84,9 +86,9 @@ fn read_dictionary_block( .meta_data_length .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - read_dictionary_message(reader, offset, scratch)?; + read_dictionary_message(reader, offset, message_scratch)?; - let message = arrow_format::ipc::MessageRef::read_as_root(scratch.as_ref()) + let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -105,6 +107,7 @@ fn read_dictionary_block( reader, block_offset, metadata.size, + dictionary_scratch, ) } _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), @@ -125,9 +128,18 @@ pub fn read_file_dictionaries( } else { return Ok(HashMap::new()); }; + // use a temporary smaller scratch for the messages + let mut message_scratch = Default::default(); for block in blocks { - read_dictionary_block(reader, metadata, block, &mut dictionaries, scratch)?; + read_dictionary_block( + reader, + metadata, + block, + &mut dictionaries, + &mut message_scratch, + scratch, + )?; } Ok(dictionaries) } @@ -246,7 +258,8 @@ pub fn read_batch( metadata: &FileMetadata, projection: Option<&[usize]>, index: usize, - scratch: &mut ReadBuffer, + message_scratch: &mut ReadBuffer, + data_scratch: &mut ReadBuffer, ) -> Result>> { let block = metadata.blocks[index]; @@ -267,10 +280,10 @@ pub fn read_batch( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - scratch.set_len(meta_len); - reader.read_exact(scratch.as_mut())?; + message_scratch.set_len(meta_len); + reader.read_exact(message_scratch.as_mut())?; - let message = arrow_format::ipc::MessageRef::read_as_root(scratch.as_ref()) + let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let batch = get_serialized_batch(&message)?; @@ -297,6 +310,7 @@ pub fn read_batch( reader, offset + length, metadata.size, + data_scratch, ) } @@ -319,7 +333,8 @@ impl FileReader { dictionaries: Default::default(), projection, current_block: 0, - buffer: Default::default(), + data_scratch: Default::default(), + message_scratch: Default::default(), } } @@ -346,7 +361,7 @@ impl FileReader { self.dictionaries = Some(read_file_dictionaries( &mut self.reader, &self.metadata, - &mut self.buffer, + &mut self.data_scratch, )?); }; Ok(()) @@ -376,7 +391,8 @@ impl Iterator for FileReader { &self.metadata, self.projection.as_ref().map(|x| x.0.as_ref()), block, - &mut self.buffer, + &mut self.message_scratch, + &mut self.data_scratch, ); let chunk = if let Some((_, map, _)) = &self.projection { diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index a5b1610ebda..95bb148ff12 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -93,6 +93,7 @@ fn read_next( message_buffer: &mut ReadBuffer, data_buffer: &mut ReadBuffer, projection: &Option<(Vec, HashMap, Schema)>, + scratch: &mut ReadBuffer, ) -> Result> { // determine metadata length let mut meta_length: [u8; 4] = [0; 4]; @@ -165,6 +166,7 @@ fn read_next( &mut reader, 0, file_size, + scratch, ); if let Some((_, map, _)) = projection { @@ -190,6 +192,7 @@ fn read_next( &mut dict_reader, 0, buf.len() as u64, + scratch, )?; // read the next message until we encounter a RecordBatch message @@ -200,6 +203,7 @@ fn read_next( message_buffer, data_buffer, projection, + scratch, ) } _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), @@ -220,6 +224,7 @@ pub struct StreamReader { data_buffer: ReadBuffer, message_buffer: ReadBuffer, projection: Option<(Vec, HashMap, Schema)>, + scratch: ReadBuffer, } impl StreamReader { @@ -246,6 +251,7 @@ impl StreamReader { data_buffer: Default::default(), message_buffer: Default::default(), projection, + scratch: Default::default(), } } @@ -278,6 +284,7 @@ impl StreamReader { &mut self.message_buffer, &mut self.data_buffer, &self.projection, + &mut self.scratch, )?; if batch.is_none() { self.finished = true; diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index 6730999c5e4..6ce5593f75b 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -68,6 +68,7 @@ pub async fn read_stream_metadata_async( async fn maybe_next( mut state: ReadState, ) -> Result>> { + let mut scratch = Default::default(); // determine metadata length let mut meta_length: [u8; 4] = [0; 4]; @@ -124,6 +125,7 @@ async fn maybe_next( .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; state.data_buffer.set_len(block_length); + match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { state.reader.read_exact(state.data_buffer.as_mut()).await?; @@ -138,6 +140,7 @@ async fn maybe_next( &mut std::io::Cursor::new(&state.data_buffer), 0, state.data_buffer.as_ref().len() as u64, + &mut scratch, ) .map(|chunk| Some(StreamState::Some((state, chunk)))) } @@ -157,6 +160,7 @@ async fn maybe_next( &mut dict_reader, 0, file_size, + &mut scratch, )?; // read the next message until we encounter a Chunk> message