From 09817a4e8a7d18a9b9d5e58f072662631a644624 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 27 Jun 2022 13:47:06 +0200 Subject: [PATCH] Reduced reallocations when reading from IPC (`~12%`) (#1105) --- examples/ipc_file_read.rs | 5 +- .../integration_test.rs | 2 +- .../integration_test.rs | 3 +- src/io/flight/mod.rs | 1 + src/io/ipc/append/mod.rs | 3 +- src/io/ipc/read/array/binary.rs | 6 +- src/io/ipc/read/array/dictionary.rs | 4 +- src/io/ipc/read/array/fixed_size_binary.rs | 5 +- src/io/ipc/read/array/fixed_size_list.rs | 5 +- src/io/ipc/read/array/list.rs | 8 ++- src/io/ipc/read/array/map.rs | 8 ++- src/io/ipc/read/array/primitive.rs | 5 +- src/io/ipc/read/array/struct_.rs | 5 +- src/io/ipc/read/array/union.rs | 9 ++- src/io/ipc/read/array/utf8.rs | 6 +- src/io/ipc/read/common.rs | 7 +++ src/io/ipc/read/deserialize.rs | 16 +++++- src/io/ipc/read/file_async.rs | 51 +++++++++-------- src/io/ipc/read/mod.rs | 2 + src/io/ipc/read/read_basic.rs | 26 +++++---- src/io/ipc/read/reader.rs | 57 ++++++++++++------- src/io/ipc/read/stream.rs | 34 ++++++----- src/io/ipc/read/stream_async.rs | 26 +++++---- src/io/mod.rs | 6 ++ src/io/readbuf.rs | 50 ++++++++++++++++ 25 files changed, 252 insertions(+), 98 deletions(-) create mode 100644 src/io/readbuf.rs diff --git a/examples/ipc_file_read.rs b/examples/ipc_file_read.rs index 23d179adb47..6478b25d484 100644 --- a/examples/ipc_file_read.rs +++ b/examples/ipc_file_read.rs @@ -33,7 +33,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk>)> { let schema = metadata.schema.clone(); // advanced way: read the dictionary - let dictionaries = read::read_file_dictionaries(&mut file, &metadata)?; + let dictionaries = read::read_file_dictionaries(&mut file, &metadata, &mut Default::default())?; let chunk_index = 0; @@ -43,7 +43,8 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk>)> { &metadata, None, chunk_index, - &mut vec![], + &mut Default::default(), + &mut Default::default(), )?; Ok((schema, chunk)) diff --git 
a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index 19a4b1a4ef1..a731a06eab1 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -276,7 +276,7 @@ async fn receive_batch_flight_data( { let length = data.data_body.len(); let mut reader = std::io::Cursor::new(&data.data_body); - read::read_dictionary(batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64) + read::read_dictionary(batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64, &mut Default::default()) .expect("Error reading dictionary"); data = resp.next().await?.ok()?; diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 07453f5c724..10424759e87 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -296,6 +296,7 @@ async fn record_batch_from_message( &mut reader, 0, length as u64, + &mut Default::default() ); arrow_batch_result.map_err(|e| Status::internal(format!("Could not convert to Chunk: {:?}", e))) @@ -312,7 +313,7 @@ async fn dictionary_from_message( let mut reader = std::io::Cursor::new(data_body); let dictionary_batch_result = - ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64); + ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64, &mut Default::default()); dictionary_batch_result .map_err(|e| Status::internal(format!("Could not convert to Dictionary: {:?}", e))) } diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index 8aa4a1de0e1..9ed0835bb84 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -142,6 +142,7 @@ pub fn deserialize_batch( &mut 
reader, 0, length as u64, + &mut Default::default(), ), _ => Err(Error::nyi( "flight currently only supports reading RecordBatch messages", diff --git a/src/io/ipc/append/mod.rs b/src/io/ipc/append/mod.rs index 4a225c4db61..9e084a45896 100644 --- a/src/io/ipc/append/mod.rs +++ b/src/io/ipc/append/mod.rs @@ -32,7 +32,8 @@ impl FileWriter { )); } - let dictionaries = read::read_file_dictionaries(&mut writer, &metadata)?; + let dictionaries = + read::read_file_dictionaries(&mut writer, &metadata, &mut Default::default())?; let last_block = metadata.blocks.last().ok_or_else(|| { Error::oos("An Arrow IPC file must have at least 1 message (the schema message)") diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs index f68471f699d..51d3d7d26b8 100644 --- a/src/io/ipc/read/array/binary.rs +++ b/src/io/ipc/read/array/binary.rs @@ -7,8 +7,9 @@ use crate::datatypes::DataType; use crate::error::{Error, Result}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +#[allow(clippy::too_many_arguments)] pub fn read_binary( field_nodes: &mut VecDeque, data_type: DataType, @@ -17,6 +18,7 @@ pub fn read_binary( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -46,6 +48,7 @@ pub fn read_binary( block_offset, is_little_endian, compression, + scratch, ) // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![O::default()])))?; @@ -58,6 +61,7 @@ pub fn read_binary( block_offset, is_little_endian, compression, + scratch, )?; BinaryArray::::try_new(data_type, offsets, values, validity) diff --git a/src/io/ipc/read/array/dictionary.rs b/src/io/ipc/read/array/dictionary.rs index 6589790c946..0b212c45012 100644 --- 
a/src/io/ipc/read/array/dictionary.rs +++ b/src/io/ipc/read/array/dictionary.rs @@ -6,7 +6,7 @@ use crate::array::{DictionaryArray, DictionaryKey}; use crate::error::{Error, Result}; use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node}; +use super::super::{Compression, IpcBuffer, Node, ReadBuffer}; use super::{read_primitive, skip_primitive}; #[allow(clippy::too_many_arguments)] @@ -19,6 +19,7 @@ pub fn read_dictionary( block_offset: u64, compression: Option, is_little_endian: bool, + scratch: &mut ReadBuffer, ) -> Result> where Vec: TryInto, @@ -47,6 +48,7 @@ where block_offset, is_little_endian, compression, + scratch, )?; Ok(DictionaryArray::::from_data(keys, values)) diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs index 04029cd7c87..5dc1b2ab0bf 100644 --- a/src/io/ipc/read/array/fixed_size_binary.rs +++ b/src/io/ipc/read/array/fixed_size_binary.rs @@ -6,8 +6,9 @@ use crate::datatypes::DataType; use crate::error::{Error, Result}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +#[allow(clippy::too_many_arguments)] pub fn read_fixed_size_binary( field_nodes: &mut VecDeque, data_type: DataType, @@ -16,6 +17,7 @@ pub fn read_fixed_size_binary( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -46,6 +48,7 @@ pub fn read_fixed_size_binary( block_offset, is_little_endian, compression, + scratch, )?; FixedSizeBinaryArray::try_new(data_type, values, validity) diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs index f8e314fc080..ac169ed3f67 100644 --- a/src/io/ipc/read/array/fixed_size_list.rs +++ b/src/io/ipc/read/array/fixed_size_list.rs @@ -8,8 +8,7 @@ use crate::error::{Error, 
Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node, Version}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version}; #[allow(clippy::too_many_arguments)] pub fn read_fixed_size_list( @@ -23,6 +22,7 @@ pub fn read_fixed_size_list( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -53,6 +53,7 @@ pub fn read_fixed_size_list( is_little_endian, compression, version, + scratch, )?; FixedSizeListArray::try_new(data_type, values, validity) } diff --git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs index f5d2eb5465e..bb85f233cf4 100644 --- a/src/io/ipc/read/array/list.rs +++ b/src/io/ipc/read/array/list.rs @@ -10,8 +10,9 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version}; +use super::super::{ + Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version, +}; #[allow(clippy::too_many_arguments)] pub fn read_list( @@ -25,6 +26,7 @@ pub fn read_list( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result> where Vec: TryInto, @@ -57,6 +59,7 @@ where block_offset, is_little_endian, compression, + scratch, ) // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![O::default()])))?; @@ -74,6 +77,7 @@ where is_little_endian, compression, version, + scratch, )?; ListArray::try_new(data_type, offsets, values, validity) } diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs index 594f8a495b1..40b4f5c0389 
100644 --- a/src/io/ipc/read/array/map.rs +++ b/src/io/ipc/read/array/map.rs @@ -9,8 +9,9 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version}; +use super::super::{ + Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version, +}; #[allow(clippy::too_many_arguments)] pub fn read_map( @@ -24,6 +25,7 @@ pub fn read_map( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -53,6 +55,7 @@ pub fn read_map( block_offset, is_little_endian, compression, + scratch, ) // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![0i32])))?; @@ -70,6 +73,7 @@ pub fn read_map( is_little_endian, compression, version, + scratch, )?; MapArray::try_new(data_type, offsets, field, validity) } diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs index c7c7b780d89..1e4000123b6 100644 --- a/src/io/ipc/read/array/primitive.rs +++ b/src/io/ipc/read/array/primitive.rs @@ -6,8 +6,9 @@ use crate::error::{Error, Result}; use crate::{array::PrimitiveArray, types::NativeType}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +#[allow(clippy::too_many_arguments)] pub fn read_primitive( field_nodes: &mut VecDeque, data_type: DataType, @@ -16,6 +17,7 @@ pub fn read_primitive( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result> where Vec: TryInto, @@ -48,6 +50,7 @@ where block_offset, is_little_endian, compression, + scratch, )?; PrimitiveArray::::try_new(data_type, values, validity) } 
diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs index c87440a9782..ae23c0f8829 100644 --- a/src/io/ipc/read/array/struct_.rs +++ b/src/io/ipc/read/array/struct_.rs @@ -8,8 +8,7 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node, Version}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version}; #[allow(clippy::too_many_arguments)] pub fn read_struct( @@ -23,6 +22,7 @@ pub fn read_struct( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -57,6 +57,7 @@ pub fn read_struct( is_little_endian, compression, version, + scratch, ) }) .collect::>>()?; diff --git a/src/io/ipc/read/array/union.rs b/src/io/ipc/read/array/union.rs index 7c86f5e30bb..fc907a32395 100644 --- a/src/io/ipc/read/array/union.rs +++ b/src/io/ipc/read/array/union.rs @@ -9,8 +9,9 @@ use crate::error::{Error, Result}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version}; +use super::super::{ + Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version, +}; #[allow(clippy::too_many_arguments)] pub fn read_union( @@ -24,6 +25,7 @@ pub fn read_union( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -50,6 +52,7 @@ pub fn read_union( block_offset, is_little_endian, compression, + scratch, )?; let offsets = if let DataType::Union(_, _, mode) = data_type { @@ -61,6 +64,7 @@ pub fn read_union( block_offset, is_little_endian, 
compression, + scratch, )?) } else { None @@ -86,6 +90,7 @@ pub fn read_union( is_little_endian, compression, version, + scratch, ) }) .collect::>>()?; diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs index 8424fbb2e73..10f1260f5ac 100644 --- a/src/io/ipc/read/array/utf8.rs +++ b/src/io/ipc/read/array/utf8.rs @@ -7,8 +7,9 @@ use crate::datatypes::DataType; use crate::error::{Error, Result}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; +#[allow(clippy::too_many_arguments)] pub fn read_utf8( field_nodes: &mut VecDeque, data_type: DataType, @@ -17,6 +18,7 @@ pub fn read_utf8( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -46,6 +48,7 @@ pub fn read_utf8( block_offset, is_little_endian, compression, + scratch, ) // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![O::default()])))?; @@ -58,6 +61,7 @@ pub fn read_utf8( block_offset, is_little_endian, compression, + scratch, )?; Utf8Array::::try_new(data_type, offsets, values, validity) diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 7e32cde5d0c..18ddd8cbb8b 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -9,6 +9,7 @@ use crate::datatypes::{DataType, Field}; use crate::error::{Error, Result}; use crate::io::ipc::read::OutOfSpecKind; use crate::io::ipc::{IpcField, IpcSchema}; +use crate::io::ReadBuffer; use super::deserialize::{read, skip}; use super::Dictionaries; @@ -85,6 +86,7 @@ pub fn read_record_batch( reader: &mut R, block_offset: u64, file_size: u64, + scratch: &mut ReadBuffer, ) -> Result>> { assert_eq!(fields.len(), ipc_schema.fields.len()); let buffers = batch @@ -136,6 +138,7 @@ pub fn read_record_batch( 
Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err)) })?, version, + scratch, )?)), ProjectionResult::NotSelected((field, _)) => { skip(&mut field_nodes, &field.data_type, &mut buffers)?; @@ -162,6 +165,7 @@ pub fn read_record_batch( Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err)) })?, version, + scratch, ) }) .collect::>>()? @@ -221,6 +225,7 @@ fn first_dict_field<'a>( /// Read the dictionary from the buffer and provided metadata, /// updating the `dictionaries` with the resulting dictionary +#[allow(clippy::too_many_arguments)] pub fn read_dictionary( batch: arrow_format::ipc::DictionaryBatchRef, fields: &[Field], @@ -229,6 +234,7 @@ pub fn read_dictionary( reader: &mut R, block_offset: u64, file_size: u64, + scratch: &mut ReadBuffer, ) -> Result<()> { if batch .is_delta() @@ -270,6 +276,7 @@ pub fn read_dictionary( reader, block_offset, file_size, + scratch, )?; let mut arrays = columns.into_arrays(); arrays.pop().unwrap() diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index f5c8b97c00f..3378f99a894 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -10,7 +10,7 @@ use crate::error::Result; use crate::io::ipc::IpcField; use super::{array::*, Dictionaries}; -use super::{IpcBuffer, Node}; +use super::{IpcBuffer, Node, ReadBuffer}; #[allow(clippy::too_many_arguments)] pub fn read( @@ -24,6 +24,7 @@ pub fn read( is_little_endian: bool, compression: Option, version: MetadataVersion, + scratch: &mut ReadBuffer, ) -> Result> { use PhysicalType::*; let data_type = field.data_type.clone(); @@ -49,6 +50,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch ) .map(|x| x.boxed()) }), @@ -61,6 +63,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -73,6 +76,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -85,6 +89,7 @@ pub fn read( block_offset, is_little_endian, 
compression, + scratch, )?; Ok(Box::new(array)) } @@ -97,6 +102,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -109,6 +115,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -123,6 +130,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), LargeList => read_list::( @@ -136,6 +144,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), FixedSizeList => read_fixed_size_list( @@ -149,6 +158,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), Struct => read_struct( @@ -162,6 +172,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), Dictionary(key_type) => { @@ -175,6 +186,7 @@ pub fn read( block_offset, compression, is_little_endian, + scratch ) .map(|x| x.boxed()) }) @@ -190,6 +202,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), Map => read_map( @@ -203,6 +216,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), } diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 8397eb4a016..cabfdd43b70 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -18,6 +18,7 @@ use super::reader::{deserialize_footer, get_serialized_batch}; use super::Dictionaries; use super::FileMetadata; use super::OutOfSpecKind; +use super::ReadBuffer; /// Async reader for Arrow IPC files pub struct FileStream<'a> { @@ -77,8 +78,9 @@ impl<'a> FileStream<'a> { // read dictionaries cached_read_dictionaries(&mut reader, &metadata, &mut dictionaries).await?; - let mut meta_buffer = vec![]; - let mut block_buffer = vec![]; + let mut meta_buffer = Default::default(); + let mut block_buffer = Default::default(); + let mut scratch = Default::default(); for block in 0..metadata.blocks.len() { let chunk = read_batch( 
&mut reader, @@ -88,6 +90,7 @@ impl<'a> FileStream<'a> { block, &mut meta_buffer, &mut block_buffer, + &mut scratch ).await?; let chunk = if let Some((_, map)) = &projection { @@ -146,14 +149,16 @@ where deserialize_footer(&footer, u64::MAX) } +#[allow(clippy::too_many_arguments)] async fn read_batch( mut reader: R, dictionaries: &mut Dictionaries, metadata: &FileMetadata, projection: Option<&[usize]>, block: usize, - meta_buffer: &mut Vec, - block_buffer: &mut Vec, + meta_buffer: &mut ReadBuffer, + block_buffer: &mut ReadBuffer, + scratch: &mut ReadBuffer, ) -> Result>> where R: AsyncRead + AsyncSeek + Unpin, @@ -176,11 +181,10 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - meta_buffer.clear(); - meta_buffer.resize(meta_len, 0); - reader.read_exact(meta_buffer).await?; + meta_buffer.set_len(meta_len); + reader.read_exact(meta_buffer.as_mut()).await?; - let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer) + let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let batch = get_serialized_batch(&message)?; @@ -191,10 +195,9 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - block_buffer.clear(); - block_buffer.resize(block_length, 0); - reader.read_exact(block_buffer).await?; - let mut cursor = std::io::Cursor::new(block_buffer); + block_buffer.set_len(block_length); + reader.read_exact(block_buffer.as_mut()).await?; + let mut cursor = std::io::Cursor::new(block_buffer.as_ref()); read_record_batch( batch, @@ -208,6 +211,7 @@ where &mut cursor, 0, metadata.size, + scratch, ) } @@ -216,13 +220,14 @@ async fn read_dictionaries( fields: &[Field], ipc_schema: &IpcSchema, blocks: &[Block], + scratch: &mut ReadBuffer, ) -> Result where R: AsyncRead + AsyncSeek + Unpin, { let mut dictionaries = Default::default(); - let mut data = vec![]; - let mut buffer = vec![]; + let 
mut data: ReadBuffer = vec![].into(); + let mut buffer: ReadBuffer = vec![].into(); for block in blocks { let offset: u64 = block @@ -237,7 +242,7 @@ where read_dictionary_message(&mut reader, offset, &mut data).await?; - let message = arrow_format::ipc::MessageRef::read_as_root(&data) + let message = arrow_format::ipc::MessageRef::read_as_root(data.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -245,12 +250,11 @@ where .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))? .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?; + buffer.set_len(length); match header { MessageHeaderRef::DictionaryBatch(batch) => { - buffer.clear(); - buffer.resize(length, 0); - reader.read_exact(&mut buffer).await?; - let mut cursor = std::io::Cursor::new(&mut buffer); + reader.read_exact(buffer.as_mut()).await?; + let mut cursor = std::io::Cursor::new(buffer.as_ref()); read_dictionary( batch, fields, @@ -259,6 +263,7 @@ where &mut cursor, 0, u64::MAX, + scratch, )?; } _ => return Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), @@ -267,7 +272,7 @@ where Ok(dictionaries) } -async fn read_dictionary_message(mut reader: R, offset: u64, data: &mut Vec) -> Result<()> +async fn read_dictionary_message(mut reader: R, offset: u64, data: &mut ReadBuffer) -> Result<()> where R: AsyncRead + AsyncSeek + Unpin, { @@ -283,9 +288,8 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - data.clear(); - data.resize(footer_size, 0); - reader.read_exact(data).await?; + data.set_len(footer_size); + reader.read_exact(data.as_mut()).await?; Ok(()) } @@ -302,6 +306,7 @@ async fn cached_read_dictionaries( &metadata.schema.fields, &metadata.ipc_schema, blocks, + &mut Default::default(), ) .await?; *dictionaries = Some(new_dictionaries); diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index 5ffe6426e20..f25da7a4a1c 100644 --- a/src/io/ipc/read/mod.rs +++ 
b/src/io/ipc/read/mod.rs @@ -27,6 +27,8 @@ pub mod stream_async; #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))] pub mod file_async; +use super::super::ReadBuffer; + pub use common::{read_dictionary, read_record_batch}; pub use reader::{ read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, FileReader, diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs index 378b2976e4f..cc04035eec7 100644 --- a/src/io/ipc/read/read_basic.rs +++ b/src/io/ipc/read/read_basic.rs @@ -7,7 +7,7 @@ use crate::{bitmap::Bitmap, types::NativeType}; use super::super::compression; use super::super::endianess::is_native_little_endian; -use super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; fn read_swapped( reader: &mut R, @@ -93,6 +93,7 @@ fn read_compressed_buffer( length: usize, is_little_endian: bool, compression: Compression, + scratch: &mut ReadBuffer, ) -> Result> { if is_little_endian != is_native_little_endian() { return Err(Error::NotYetImplemented( @@ -105,9 +106,8 @@ fn read_compressed_buffer( let mut buffer = vec![T::default(); length]; // decompress first - // todo: move this allocation to an external buffer for re-use - let mut slice = vec![0u8; buffer_length]; - reader.read_exact(&mut slice)?; + scratch.set_len(buffer_length); + reader.read_exact(scratch.as_mut())?; let out_slice = bytemuck::cast_slice_mut(&mut buffer); @@ -117,10 +117,10 @@ fn read_compressed_buffer( match compression { arrow_format::ipc::CompressionType::Lz4Frame => { - compression::decompress_lz4(&slice[8..], out_slice)?; + compression::decompress_lz4(&scratch.as_ref()[8..], out_slice)?; } arrow_format::ipc::CompressionType::Zstd => { - compression::decompress_zstd(&slice[8..], out_slice)?; + compression::decompress_zstd(&scratch.as_ref()[8..], out_slice)?; } } Ok(buffer) @@ -133,6 +133,7 @@ pub fn read_buffer( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: 
&mut ReadBuffer, ) -> Result> { let buf = buf .pop_front() @@ -151,10 +152,15 @@ pub fn read_buffer( reader.seek(SeekFrom::Start(block_offset + offset))?; if let Some(compression) = compression { - Ok( - read_compressed_buffer(reader, buffer_length, length, is_little_endian, compression)? - .into(), - ) + Ok(read_compressed_buffer( + reader, + buffer_length, + length, + is_little_endian, + compression, + scratch, + )? + .into()) } else { Ok(read_uncompressed_buffer(reader, buffer_length, length, is_little_endian)?.into()) } diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 9807c198496..b01d85cf499 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -13,6 +13,7 @@ use super::common::*; use super::schema::fb_to_schema; use super::Dictionaries; use super::OutOfSpecKind; +use super::ReadBuffer; use arrow_format::ipc::planus::ReadAsRoot; /// Metadata of an Arrow IPC file, written in the footer of the file. @@ -44,13 +45,14 @@ pub struct FileReader { dictionaries: Option, current_block: usize, projection: Option<(Vec, HashMap, Schema)>, - buffer: Vec, + data_scratch: ReadBuffer, + message_scratch: ReadBuffer, } fn read_dictionary_message( reader: &mut R, offset: u64, - data: &mut Vec, + data: &mut ReadBuffer, ) -> Result<()> { let mut message_size: [u8; 4] = [0; 4]; reader.seek(SeekFrom::Start(offset))?; @@ -64,11 +66,8 @@ fn read_dictionary_message( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - // prepare `data` to read the message - data.clear(); - data.resize(message_length, 0); - - reader.read_exact(data)?; + data.set_len(message_length); + reader.read_exact(data.as_mut())?; Ok(()) } @@ -77,7 +76,8 @@ fn read_dictionary_block( metadata: &FileMetadata, block: &arrow_format::ipc::Block, dictionaries: &mut Dictionaries, - scratch: &mut Vec, + message_scratch: &mut ReadBuffer, + dictionary_scratch: &mut ReadBuffer, ) -> Result<()> { let offset: u64 = block .offset @@ -87,9 +87,9 @@ fn 
read_dictionary_block( .meta_data_length .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - read_dictionary_message(reader, offset, scratch)?; + read_dictionary_message(reader, offset, message_scratch)?; - let message = arrow_format::ipc::MessageRef::read_as_root(scratch) + let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -108,6 +108,7 @@ fn read_dictionary_block( reader, block_offset, metadata.size, + dictionary_scratch, ) } _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), @@ -119,18 +120,27 @@ fn read_dictionary_block( pub fn read_file_dictionaries( reader: &mut R, metadata: &FileMetadata, + scratch: &mut ReadBuffer, ) -> Result { let mut dictionaries = Default::default(); - let mut data = vec![]; let blocks = if let Some(blocks) = metadata.dictionaries.as_deref() { blocks } else { return Ok(HashMap::new()); }; + // use a temporary smaller scratch for the messages + let mut message_scratch = Default::default(); for block in blocks { - read_dictionary_block(reader, metadata, block, &mut dictionaries, &mut data)?; + read_dictionary_block( + reader, + metadata, + block, + &mut dictionaries, + &mut message_scratch, + scratch, + )?; } Ok(dictionaries) } @@ -249,7 +259,8 @@ pub fn read_batch( metadata: &FileMetadata, projection: Option<&[usize]>, index: usize, - stratch: &mut Vec, + message_scratch: &mut ReadBuffer, + data_scratch: &mut ReadBuffer, ) -> Result>> { let block = metadata.blocks[index]; @@ -270,11 +281,10 @@ pub fn read_batch( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - stratch.clear(); - stratch.resize(meta_len, 0); - reader.read_exact(stratch)?; + message_scratch.set_len(meta_len); + reader.read_exact(message_scratch.as_mut())?; - let message = arrow_format::ipc::MessageRef::read_as_root(stratch) + let message = 
arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let batch = get_serialized_batch(&message)?; @@ -301,6 +311,7 @@ pub fn read_batch( reader, offset + length, metadata.size, + data_scratch, ) } @@ -323,7 +334,8 @@ impl FileReader { dictionaries: Default::default(), projection, current_block: 0, - buffer: vec![], + data_scratch: Default::default(), + message_scratch: Default::default(), } } @@ -347,7 +359,11 @@ impl FileReader { fn read_dictionaries(&mut self) -> Result<()> { if self.dictionaries.is_none() { - self.dictionaries = Some(read_file_dictionaries(&mut self.reader, &self.metadata)?); + self.dictionaries = Some(read_file_dictionaries( + &mut self.reader, + &self.metadata, + &mut self.data_scratch, + )?); }; Ok(()) } @@ -376,7 +392,8 @@ impl Iterator for FileReader { &self.metadata, self.projection.as_ref().map(|x| x.0.as_ref()), block, - &mut self.buffer, + &mut self.message_scratch, + &mut self.data_scratch, ); let chunk = if let Some((_, map, _)) = &self.projection { diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index 3149e5e5b1f..5dd7b5cddda 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -15,6 +15,7 @@ use super::common::*; use super::schema::deserialize_stream_metadata; use super::Dictionaries; use super::OutOfSpecKind; +use super::ReadBuffer; /// Metadata of an Arrow IPC stream, written at the start of the stream #[derive(Debug, Clone)] @@ -90,9 +91,10 @@ fn read_next( reader: &mut R, metadata: &StreamMetadata, dictionaries: &mut Dictionaries, - message_buffer: &mut Vec, - data_buffer: &mut Vec, + message_buffer: &mut ReadBuffer, + data_buffer: &mut ReadBuffer, projection: &Option<(Vec, HashMap, Schema)>, + scratch: &mut ReadBuffer, ) -> Result> { // determine metadata length let mut meta_length: [u8; 4] = [0; 4]; @@ -129,11 +131,10 @@ fn read_next( return Ok(None); } - message_buffer.clear(); - 
message_buffer.resize(meta_length, 0); - reader.read_exact(message_buffer)?; + message_buffer.set_len(meta_length); + reader.read_exact(message_buffer.as_mut())?; - let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer) + let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -147,13 +148,12 @@ fn read_next( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; + data_buffer.set_len(block_length); match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - data_buffer.clear(); - data_buffer.resize(block_length, 0); - reader.read_exact(data_buffer)?; + reader.read_exact(data_buffer.as_mut())?; - let file_size = data_buffer.len() as u64; + let file_size = data_buffer.as_ref().len() as u64; let mut reader = std::io::Cursor::new(data_buffer); @@ -167,6 +167,7 @@ fn read_next( &mut reader, 0, file_size, + scratch, ); if let Some((_, map, _)) = projection { @@ -192,6 +193,7 @@ fn read_next( &mut dict_reader, 0, buf.len() as u64, + scratch, )?; // read the next message until we encounter a RecordBatch message @@ -202,6 +204,7 @@ fn read_next( message_buffer, data_buffer, projection, + scratch, ) } _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), @@ -219,9 +222,10 @@ pub struct StreamReader { metadata: StreamMetadata, dictionaries: Dictionaries, finished: bool, - data_buffer: Vec, - message_buffer: Vec, + data_buffer: ReadBuffer, + message_buffer: ReadBuffer, projection: Option<(Vec, HashMap, Schema)>, + scratch: ReadBuffer, } impl StreamReader { @@ -245,9 +249,10 @@ impl StreamReader { metadata, dictionaries: Default::default(), finished: false, - data_buffer: vec![], - message_buffer: vec![], + data_buffer: Default::default(), + message_buffer: Default::default(), projection, + scratch: Default::default(), } } @@ -280,6 +285,7 @@ impl StreamReader { &mut 
self.message_buffer, &mut self.data_buffer, &self.projection, + &mut self.scratch, )?; if batch.is_none() { self.finished = true; diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index 1e130b48496..cfbfe5ad3e0 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -16,6 +16,7 @@ use super::common::{read_dictionary, read_record_batch}; use super::schema::deserialize_stream_metadata; use super::Dictionaries; use super::OutOfSpecKind; +use super::ReadBuffer; use super::StreamMetadata; /// A (private) state of stream messages @@ -24,9 +25,9 @@ struct ReadState { pub metadata: StreamMetadata, pub dictionaries: Dictionaries, /// The internal buffer to read data inside the messages (records and dictionaries) to - pub data_buffer: Vec, + pub data_buffer: ReadBuffer, /// The internal buffer to read messages to - pub message_buffer: Vec, + pub message_buffer: ReadBuffer, } /// The state of an Arrow stream @@ -68,6 +69,7 @@ pub async fn read_stream_metadata_async( async fn maybe_next( mut state: ReadState, ) -> Result>> { + let mut scratch = Default::default(); // determine metadata length let mut meta_length: [u8; 4] = [0; 4]; @@ -103,11 +105,13 @@ async fn maybe_next( return Ok(None); } - state.message_buffer.clear(); - state.message_buffer.resize(meta_length, 0); - state.reader.read_exact(&mut state.message_buffer).await?; + state.message_buffer.set_len(meta_length); + state + .reader + .read_exact(state.message_buffer.as_mut()) + .await?; - let message = arrow_format::ipc::MessageRef::read_as_root(&state.message_buffer) + let message = arrow_format::ipc::MessageRef::read_as_root(state.message_buffer.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -121,11 +125,11 @@ async fn maybe_next( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; + state.data_buffer.set_len(block_length); + match header { 
arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - state.data_buffer.clear(); - state.data_buffer.resize(block_length, 0); - state.reader.read_exact(&mut state.data_buffer).await?; + state.reader.read_exact(state.data_buffer.as_mut()).await?; read_record_batch( batch, @@ -136,7 +140,8 @@ async fn maybe_next( state.metadata.version, &mut std::io::Cursor::new(&state.data_buffer), 0, - state.data_buffer.len() as u64, + state.data_buffer.as_ref().len() as u64, + &mut scratch, ) .map(|chunk| Some(StreamState::Some((state, chunk)))) } @@ -156,6 +161,7 @@ async fn maybe_next( &mut dict_reader, 0, file_size, + &mut scratch, )?; // read the next message until we encounter a Chunk> message diff --git a/src/io/mod.rs b/src/io/mod.rs index 9343d4281ce..95a17ee8bf3 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -53,3 +53,9 @@ pub mod print; #[cfg(any(feature = "io_csv_write", feature = "io_avro", feature = "io_json"))] mod iterator; + +#[cfg(feature = "io_ipc")] +mod readbuf; + +#[cfg(feature = "io_ipc")] +pub use readbuf::ReadBuffer; diff --git a/src/io/readbuf.rs b/src/io/readbuf.rs new file mode 100644 index 00000000000..d30396a8289 --- /dev/null +++ b/src/io/readbuf.rs @@ -0,0 +1,50 @@ +/// A small wrapper around [`Vec`] that allows us to reuse memory once it is initialized. +/// This may improve performance of the [`Read`] trait. +#[derive(Clone, Default)] +pub struct ReadBuffer { + data: Vec, + // length to be read or is read + length: usize, +} + +impl ReadBuffer { + /// Set the minimal length of the [`ReadBuf`]. Contrary to the + /// method on `Vec` this is `safe` because this function guarantees that + /// the underlying data always is initialized. 
/// A small wrapper around [`Vec<u8>`] that allows us to reuse memory once it is initialized.
/// This may improve performance of repeated [`std::io::Read::read_exact`] calls,
/// since the buffer is grown once and then reused instead of reallocated per read.
#[derive(Clone, Default)]
pub struct ReadBuffer {
    // backing storage; may be longer than `length` (over-allocated scratch)
    data: Vec<u8>,
    // length (in bytes) to be read or that has been read
    length: usize,
}

impl ReadBuffer {
    /// Set the minimal length of the [`ReadBuffer`]. Contrary to the
    /// homonymous method on `Vec`, this is `safe` because this function
    /// guarantees that the underlying data is always initialized.
    pub fn set_len(&mut self, length: usize) {
        if length > self.data.capacity() {
            // Exponential growing strategy: a benchmark showed it was ~5%
            // faster when reading the lz4 yellow-trip dataset.
            // `saturating_mul` avoids the debug-panic / release-wrap that the
            // unchecked `length * 2` had on `usize` overflow.
            self.data = vec![0; length.saturating_mul(2)];
        } else if length > self.data.len() {
            // capacity suffices; zero-extend the initialized region only
            self.data.resize(length, 0);
        }
        self.length = length;
    }
}

impl AsRef<[u8]> for ReadBuffer {
    /// Returns exactly the logical `length` bytes, never the scratch tail.
    fn as_ref(&self) -> &[u8] {
        &self.data[..self.length]
    }
}

impl AsMut<[u8]> for ReadBuffer {
    /// Returns exactly the logical `length` bytes, mutably (e.g. as the
    /// destination of `Read::read_exact`).
    fn as_mut(&mut self) -> &mut [u8] {
        &mut self.data[..self.length]
    }
}

impl From<Vec<u8>> for ReadBuffer {
    fn from(data: Vec<u8>) -> Self {
        // the whole vector is initialized, so the logical length is its length
        let length = data.len();
        Self { data, length }
    }
}

impl From<ReadBuffer> for Vec<u8> {
    /// Returns the backing storage.
    ///
    /// NOTE(review): this can be LONGER than the logical `length` once
    /// `set_len` has over-allocated — callers that need exactly the bytes
    /// read should go through `as_ref` instead; confirm call sites.
    fn from(buf: ReadBuffer) -> Self {
        buf.data
    }
}