From 653956e2a14a5ab9b95af8f727cc2b347b4b94d1 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sat, 25 Jun 2022 14:51:09 +0200 Subject: [PATCH 01/10] IPC: don't reassign all bytes before overwriting them --- src/io/ipc/read/file_async.rs | 25 +++++++++++++------------ src/io/ipc/read/reader.rs | 24 ++++++++++++++---------- src/io/ipc/read/stream.rs | 9 +++------ src/io/ipc/read/stream_async.rs | 15 +++++++++------ 4 files changed, 39 insertions(+), 34 deletions(-) diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 8397eb4a016..3ec50482c74 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -11,6 +11,7 @@ use crate::array::*; use crate::chunk::Chunk; use crate::datatypes::{Field, Schema}; use crate::error::{Error, Result}; +use crate::io::ipc::read::reader::prepare_scratch; use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER}; use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch}; @@ -176,9 +177,9 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - meta_buffer.clear(); - meta_buffer.resize(meta_len, 0); - reader.read_exact(meta_buffer).await?; + reader + .read_exact(prepare_scratch(meta_buffer, meta_len)) + .await?; let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; @@ -191,9 +192,9 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - block_buffer.clear(); - block_buffer.resize(block_length, 0); - reader.read_exact(block_buffer).await?; + reader + .read_exact(prepare_scratch(block_buffer, block_length)) + .await?; let mut cursor = std::io::Cursor::new(block_buffer); read_record_batch( @@ -247,9 +248,9 @@ where match header { MessageHeaderRef::DictionaryBatch(batch) => { - buffer.clear(); - buffer.resize(length, 0); - reader.read_exact(&mut buffer).await?; + reader + 
.read_exact(prepare_scratch(&mut buffer, length)) + .await?; let mut cursor = std::io::Cursor::new(&mut buffer); read_dictionary( batch, @@ -283,9 +284,9 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - data.clear(); - data.resize(footer_size, 0); - reader.read_exact(data).await?; + reader + .read_exact(prepare_scratch(data, footer_size)) + .await?; Ok(()) } diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 9807c198496..ff6bf752ab3 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -36,6 +36,16 @@ pub struct FileMetadata { pub(crate) size: u64, } +/// prepare `scratch` to read the message +pub(super) fn prepare_scratch(scratch: &mut Vec, message_length: usize) -> &mut [u8] { + // ensure that we have enough scratch space + if message_length > scratch.len() { + scratch.resize(message_length, 0); + } + // return the buffer that will be overwritten by read + &mut scratch[..message_length] +} + /// Arrow File reader pub struct FileReader { reader: R, @@ -64,11 +74,7 @@ fn read_dictionary_message( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - // prepare `data` to read the message - data.clear(); - data.resize(message_length, 0); - - reader.read_exact(data)?; + reader.read_exact(prepare_scratch(data, message_length))?; Ok(()) } @@ -249,7 +255,7 @@ pub fn read_batch( metadata: &FileMetadata, projection: Option<&[usize]>, index: usize, - stratch: &mut Vec, + scratch: &mut Vec, ) -> Result>> { let block = metadata.blocks[index]; @@ -270,11 +276,9 @@ pub fn read_batch( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - stratch.clear(); - stratch.resize(meta_len, 0); - reader.read_exact(stratch)?; + reader.read_exact(prepare_scratch(scratch, meta_len))?; - let message = arrow_format::ipc::MessageRef::read_as_root(stratch) + let message = arrow_format::ipc::MessageRef::read_as_root(scratch) .map_err(|err| 
Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let batch = get_serialized_batch(&message)?; diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index 3149e5e5b1f..f2730f20f3a 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -8,6 +8,7 @@ use crate::array::Array; use crate::chunk::Chunk; use crate::datatypes::Schema; use crate::error::{Error, Result}; +use crate::io::ipc::read::reader::prepare_scratch; use crate::io::ipc::IpcSchema; use super::super::CONTINUATION_MARKER; @@ -129,9 +130,7 @@ fn read_next( return Ok(None); } - message_buffer.clear(); - message_buffer.resize(meta_length, 0); - reader.read_exact(message_buffer)?; + reader.read_exact(prepare_scratch(message_buffer, meta_length))?; let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; @@ -149,9 +148,7 @@ fn read_next( match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - data_buffer.clear(); - data_buffer.resize(block_length, 0); - reader.read_exact(data_buffer)?; + reader.read_exact(prepare_scratch(data_buffer, block_length))?; let file_size = data_buffer.len() as u64; diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index 1e130b48496..2e51860d052 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -10,6 +10,7 @@ use futures::Stream; use crate::array::*; use crate::chunk::Chunk; use crate::error::{Error, Result}; +use crate::io::ipc::read::reader::prepare_scratch; use super::super::CONTINUATION_MARKER; use super::common::{read_dictionary, read_record_batch}; @@ -103,9 +104,10 @@ async fn maybe_next( return Ok(None); } - state.message_buffer.clear(); - state.message_buffer.resize(meta_length, 0); - state.reader.read_exact(&mut state.message_buffer).await?; + state + .reader + .read_exact(prepare_scratch(&mut state.message_buffer, meta_length)) + .await?; let message = 
arrow_format::ipc::MessageRef::read_as_root(&state.message_buffer) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; @@ -123,9 +125,10 @@ async fn maybe_next( match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - state.data_buffer.clear(); - state.data_buffer.resize(block_length, 0); - state.reader.read_exact(&mut state.data_buffer).await?; + state + .reader + .read_exact(prepare_scratch(&mut state.data_buffer, block_length)) + .await?; read_record_batch( batch, From ede4df32cf2efbc498ac6d59ba6e5afd13901e61 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sun, 26 Jun 2022 15:20:21 +0200 Subject: [PATCH 02/10] use ReadBuffer abstraction --- examples/ipc_file_read.rs | 2 +- src/io/ipc/read/common.rs | 54 +++++++++++++++++++++++++++++++++ src/io/ipc/read/file_async.rs | 47 ++++++++++++++-------------- src/io/ipc/read/reader.rs | 32 ++++++++----------- src/io/ipc/read/stream.rs | 23 +++++++------- src/io/ipc/read/stream_async.rs | 20 ++++++------ 6 files changed, 110 insertions(+), 68 deletions(-) diff --git a/examples/ipc_file_read.rs b/examples/ipc_file_read.rs index 23d179adb47..d4ea15a3595 100644 --- a/examples/ipc_file_read.rs +++ b/examples/ipc_file_read.rs @@ -43,7 +43,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk>)> { &metadata, None, chunk_index, - &mut vec![], + &mut Default::default(), )?; Ok((schema, chunk)) diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 7e32cde5d0c..04504c0f6d7 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -286,6 +286,60 @@ pub fn read_dictionary( Ok(()) } +/// A small wrapper around `[Vec]` that allows us to reuse memory once it is initialized. +/// This may improve performance of the `[Read]` trait. 
+#[derive(Clone, Default)] +pub struct ReadBuffer { data: Vec, // length to be read or is read length: usize, } +impl ReadBuffer { /// Create a new [`ReadBuffer`] initialized to `length` pub fn new(length: usize) -> Self { let data = vec![0; length]; Self { data, length } } + /// Set the length of the [`ReadBuffer`]. Contrary to the + /// method on `Vec` this is `safe` because this function guarantees that + /// the underlying data always is initialized. + pub fn set_len(&mut self, length: usize) { + if length > self.data.capacity() { + self.data = vec![0; length]; + } else if length > self.data.len() { + self.data.resize(length, 0); + } + self.length = length; + } +} + +impl AsRef<[u8]> for ReadBuffer { + fn as_ref(&self) -> &[u8] { + &self.data[..self.length] + } +} + +impl AsMut<[u8]> for ReadBuffer { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.data[..self.length] + } +} + +impl From> for ReadBuffer { + fn from(data: Vec) -> Self { + let length = data.len(); + Self { data, length } + } +} + +impl From for Vec { + fn from(buf: ReadBuffer) -> Self { + buf.data + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 3ec50482c74..194a41a12e9 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -11,10 +11,11 @@ use crate::array::*; use crate::chunk::Chunk; use crate::datatypes::{Field, Schema}; use crate::error::{Error, Result}; -use crate::io::ipc::read::reader::prepare_scratch; use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER}; -use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch}; +use super::common::{ + apply_projection, prepare_projection, read_dictionary, read_record_batch, ReadBuffer, +}; use super::reader::{deserialize_footer, get_serialized_batch}; use super::Dictionaries; use super::FileMetadata; @@ -78,8 +79,8 @@ impl<'a> FileStream<'a> { // read dictionaries
cached_read_dictionaries(&mut reader, &metadata, &mut dictionaries).await?; - let mut meta_buffer = vec![]; - let mut block_buffer = vec![]; + let mut meta_buffer = Default::default(); + let mut block_buffer = Default::default(); for block in 0..metadata.blocks.len() { let chunk = read_batch( &mut reader, @@ -153,8 +154,8 @@ async fn read_batch( metadata: &FileMetadata, projection: Option<&[usize]>, block: usize, - meta_buffer: &mut Vec, - block_buffer: &mut Vec, + meta_buffer: &mut ReadBuffer, + block_buffer: &mut ReadBuffer, ) -> Result>> where R: AsyncRead + AsyncSeek + Unpin, @@ -177,11 +178,10 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - reader - .read_exact(prepare_scratch(meta_buffer, meta_len)) - .await?; + meta_buffer.set_len(meta_len); + reader.read_exact(meta_buffer.as_mut()).await?; - let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer) + let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let batch = get_serialized_batch(&message)?; @@ -192,10 +192,9 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - reader - .read_exact(prepare_scratch(block_buffer, block_length)) - .await?; - let mut cursor = std::io::Cursor::new(block_buffer); + block_buffer.set_len(block_length); + reader.read_exact(block_buffer.as_mut()).await?; + let mut cursor = std::io::Cursor::new(block_buffer.as_ref()); read_record_batch( batch, @@ -222,8 +221,8 @@ where R: AsyncRead + AsyncSeek + Unpin, { let mut dictionaries = Default::default(); - let mut data = vec![]; - let mut buffer = vec![]; + let mut data = ReadBuffer::new(0); + let mut buffer = ReadBuffer::new(0); for block in blocks { let offset: u64 = block @@ -238,7 +237,7 @@ where read_dictionary_message(&mut reader, offset, &mut data).await?; - let message = arrow_format::ipc::MessageRef::read_as_root(&data) + let 
message = arrow_format::ipc::MessageRef::read_as_root(data.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -246,12 +245,11 @@ where .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))? .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?; + buffer.set_len(length); match header { MessageHeaderRef::DictionaryBatch(batch) => { - reader - .read_exact(prepare_scratch(&mut buffer, length)) - .await?; - let mut cursor = std::io::Cursor::new(&mut buffer); + reader.read_exact(buffer.as_mut()).await?; + let mut cursor = std::io::Cursor::new(buffer.as_ref()); read_dictionary( batch, fields, @@ -268,7 +266,7 @@ where Ok(dictionaries) } -async fn read_dictionary_message(mut reader: R, offset: u64, data: &mut Vec) -> Result<()> +async fn read_dictionary_message(mut reader: R, offset: u64, data: &mut ReadBuffer) -> Result<()> where R: AsyncRead + AsyncSeek + Unpin, { @@ -284,9 +282,8 @@ where .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - reader - .read_exact(prepare_scratch(data, footer_size)) - .await?; + data.set_len(footer_size); + reader.read_exact(data.as_mut()).await?; Ok(()) } diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index ff6bf752ab3..c40adbdd694 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -36,16 +36,6 @@ pub struct FileMetadata { pub(crate) size: u64, } -/// prepare `scratch` to read the message -pub(super) fn prepare_scratch(scratch: &mut Vec, message_length: usize) -> &mut [u8] { - // ensure that we have enough scratch space - if message_length > scratch.len() { - scratch.resize(message_length, 0); - } - // return the buffer that will be overwritten by read - &mut scratch[..message_length] -} - /// Arrow File reader pub struct FileReader { reader: R, @@ -54,13 +44,13 @@ pub struct FileReader { dictionaries: Option, current_block: usize, projection: Option<(Vec, HashMap, Schema)>, - 
buffer: Vec, + buffer: ReadBuffer, } fn read_dictionary_message( reader: &mut R, offset: u64, - data: &mut Vec, + data: &mut ReadBuffer, ) -> Result<()> { let mut message_size: [u8; 4] = [0; 4]; reader.seek(SeekFrom::Start(offset))?; @@ -74,7 +64,8 @@ fn read_dictionary_message( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - reader.read_exact(prepare_scratch(data, message_length))?; + data.set_len(message_length); + reader.read_exact(data.as_mut())?; Ok(()) } @@ -83,7 +74,7 @@ fn read_dictionary_block( metadata: &FileMetadata, block: &arrow_format::ipc::Block, dictionaries: &mut Dictionaries, - scratch: &mut Vec, + scratch: &mut ReadBuffer, ) -> Result<()> { let offset: u64 = block .offset @@ -95,7 +86,7 @@ fn read_dictionary_block( .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; read_dictionary_message(reader, offset, scratch)?; - let message = arrow_format::ipc::MessageRef::read_as_root(scratch) + let message = arrow_format::ipc::MessageRef::read_as_root(scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -127,7 +118,7 @@ pub fn read_file_dictionaries( metadata: &FileMetadata, ) -> Result { let mut dictionaries = Default::default(); - let mut data = vec![]; + let mut data = vec![].into(); let blocks = if let Some(blocks) = metadata.dictionaries.as_deref() { blocks @@ -255,7 +246,7 @@ pub fn read_batch( metadata: &FileMetadata, projection: Option<&[usize]>, index: usize, - scratch: &mut Vec, + scratch: &mut ReadBuffer, ) -> Result>> { let block = metadata.blocks[index]; @@ -276,9 +267,10 @@ pub fn read_batch( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - reader.read_exact(prepare_scratch(scratch, meta_len))?; + scratch.set_len(meta_len); + reader.read_exact(scratch.as_mut())?; - let message = arrow_format::ipc::MessageRef::read_as_root(scratch) + let message = 
arrow_format::ipc::MessageRef::read_as_root(scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let batch = get_serialized_batch(&message)?; @@ -327,7 +319,7 @@ impl FileReader { dictionaries: Default::default(), projection, current_block: 0, - buffer: vec![], + buffer: Default::default(), } } diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index f2730f20f3a..a5b1610ebda 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -8,7 +8,6 @@ use crate::array::Array; use crate::chunk::Chunk; use crate::datatypes::Schema; use crate::error::{Error, Result}; -use crate::io::ipc::read::reader::prepare_scratch; use crate::io::ipc::IpcSchema; use super::super::CONTINUATION_MARKER; @@ -91,8 +90,8 @@ fn read_next( reader: &mut R, metadata: &StreamMetadata, dictionaries: &mut Dictionaries, - message_buffer: &mut Vec, - data_buffer: &mut Vec, + message_buffer: &mut ReadBuffer, + data_buffer: &mut ReadBuffer, projection: &Option<(Vec, HashMap, Schema)>, ) -> Result> { // determine metadata length @@ -130,9 +129,10 @@ fn read_next( return Ok(None); } - reader.read_exact(prepare_scratch(message_buffer, meta_length))?; + message_buffer.set_len(meta_length); + reader.read_exact(message_buffer.as_mut())?; - let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer) + let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -146,11 +146,12 @@ fn read_next( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; + data_buffer.set_len(block_length); match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - reader.read_exact(prepare_scratch(data_buffer, block_length))?; + reader.read_exact(data_buffer.as_mut())?; - let file_size = data_buffer.len() as u64; + let file_size = data_buffer.as_ref().len() as u64; let mut reader = 
std::io::Cursor::new(data_buffer); @@ -216,8 +217,8 @@ pub struct StreamReader { metadata: StreamMetadata, dictionaries: Dictionaries, finished: bool, - data_buffer: Vec, - message_buffer: Vec, + data_buffer: ReadBuffer, + message_buffer: ReadBuffer, projection: Option<(Vec, HashMap, Schema)>, } @@ -242,8 +243,8 @@ impl StreamReader { metadata, dictionaries: Default::default(), finished: false, - data_buffer: vec![], - message_buffer: vec![], + data_buffer: Default::default(), + message_buffer: Default::default(), projection, } } diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index 2e51860d052..6730999c5e4 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -10,10 +10,9 @@ use futures::Stream; use crate::array::*; use crate::chunk::Chunk; use crate::error::{Error, Result}; -use crate::io::ipc::read::reader::prepare_scratch; use super::super::CONTINUATION_MARKER; -use super::common::{read_dictionary, read_record_batch}; +use super::common::{read_dictionary, read_record_batch, ReadBuffer}; use super::schema::deserialize_stream_metadata; use super::Dictionaries; use super::OutOfSpecKind; @@ -25,9 +24,9 @@ struct ReadState { pub metadata: StreamMetadata, pub dictionaries: Dictionaries, /// The internal buffer to read data inside the messages (records and dictionaries) to - pub data_buffer: Vec, + pub data_buffer: ReadBuffer, /// The internal buffer to read messages to - pub message_buffer: Vec, + pub message_buffer: ReadBuffer, } /// The state of an Arrow stream @@ -104,12 +103,13 @@ async fn maybe_next( return Ok(None); } + state.message_buffer.set_len(meta_length); state .reader - .read_exact(prepare_scratch(&mut state.message_buffer, meta_length)) + .read_exact(state.message_buffer.as_mut()) .await?; - let message = arrow_format::ipc::MessageRef::read_as_root(&state.message_buffer) + let message = arrow_format::ipc::MessageRef::read_as_root(state.message_buffer.as_ref()) .map_err(|err| 
Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -123,12 +123,10 @@ async fn maybe_next( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; + state.data_buffer.set_len(block_length); match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - state - .reader - .read_exact(prepare_scratch(&mut state.data_buffer, block_length)) - .await?; + state.reader.read_exact(state.data_buffer.as_mut()).await?; read_record_batch( batch, @@ -139,7 +137,7 @@ async fn maybe_next( state.metadata.version, &mut std::io::Cursor::new(&state.data_buffer), 0, - state.data_buffer.len() as u64, + state.data_buffer.as_ref().len() as u64, ) .map(|chunk| Some(StreamState::Some((state, chunk)))) } From 53d1b545e7fafee202b91778d2dae7a80010171c Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sun, 26 Jun 2022 15:43:38 +0200 Subject: [PATCH 03/10] reuse scratch when reading dictionaries --- examples/ipc_file_read.rs | 2 +- src/io/ipc/append/mod.rs | 3 ++- src/io/ipc/read/common.rs | 1 + src/io/ipc/read/reader.rs | 10 +++++++--- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/examples/ipc_file_read.rs b/examples/ipc_file_read.rs index d4ea15a3595..27373ece01f 100644 --- a/examples/ipc_file_read.rs +++ b/examples/ipc_file_read.rs @@ -33,7 +33,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk>)> { let schema = metadata.schema.clone(); // advanced way: read the dictionary - let dictionaries = read::read_file_dictionaries(&mut file, &metadata)?; + let dictionaries = read::read_file_dictionaries(&mut file, &metadata, Default::default())?; let chunk_index = 0; diff --git a/src/io/ipc/append/mod.rs b/src/io/ipc/append/mod.rs index 4a225c4db61..9e084a45896 100644 --- a/src/io/ipc/append/mod.rs +++ b/src/io/ipc/append/mod.rs @@ -32,7 +32,8 @@ impl FileWriter { )); } - let dictionaries = read::read_file_dictionaries(&mut writer, &metadata)?; + let dictionaries = + read::read_file_dictionaries(&mut 
writer, &metadata, &mut Default::default())?; let last_block = metadata.blocks.last().ok_or_else(|| { Error::oos("An Arrow IPC file must have at least 1 message (the schema message)") diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 04504c0f6d7..dededdb817d 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -306,6 +306,7 @@ impl ReadBuffer { /// method on `Vec` this is `safe` because this function guarantees that /// the underlying data always is initialized. pub fn set_len(&mut self, length: usize) { + dbg!(length, self.data.len(), self.data.capacity()); if length > self.data.capacity() { self.data = vec![0; length]; } else if length > self.data.len() { diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index c40adbdd694..9f7ab43346f 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -116,9 +116,9 @@ fn read_dictionary_block( pub fn read_file_dictionaries( reader: &mut R, metadata: &FileMetadata, + scratch: &mut ReadBuffer, ) -> Result { let mut dictionaries = Default::default(); - let mut data = vec![].into(); let blocks = if let Some(blocks) = metadata.dictionaries.as_deref() { blocks @@ -127,7 +127,7 @@ pub fn read_file_dictionaries( }; for block in blocks { - read_dictionary_block(reader, metadata, block, &mut dictionaries, &mut data)?; + read_dictionary_block(reader, metadata, block, &mut dictionaries, scratch)?; } Ok(dictionaries) } @@ -343,7 +343,11 @@ impl FileReader { fn read_dictionaries(&mut self) -> Result<()> { if self.dictionaries.is_none() { - self.dictionaries = Some(read_file_dictionaries(&mut self.reader, &self.metadata)?); + self.dictionaries = Some(read_file_dictionaries( + &mut self.reader, + &self.metadata, + &mut self.buffer, + )?); }; Ok(()) } From 5bed494782e2f1d4b4b05ec7a1dcd9f73b286ecb Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 27 Jun 2022 10:03:13 +0200 Subject: [PATCH 04/10] make sure that scratch is used in decompressing data --- 
examples/ipc_file_read.rs | 3 +- src/io/flight/mod.rs | 1 + src/io/ipc/read/array/binary.rs | 5 +++ src/io/ipc/read/array/dictionary.rs | 3 ++ src/io/ipc/read/array/fixed_size_binary.rs | 4 +++ src/io/ipc/read/array/fixed_size_list.rs | 3 ++ src/io/ipc/read/array/list.rs | 4 +++ src/io/ipc/read/array/map.rs | 4 +++ src/io/ipc/read/array/primitive.rs | 4 +++ src/io/ipc/read/array/struct_.rs | 3 ++ src/io/ipc/read/array/union.rs | 5 +++ src/io/ipc/read/array/utf8.rs | 5 +++ src/io/ipc/read/common.rs | 6 ++++ src/io/ipc/read/deserialize.rs | 15 ++++++++ src/io/ipc/read/file_async.rs | 8 +++++ src/io/ipc/read/read_basic.rs | 25 +++++++++----- src/io/ipc/read/reader.rs | 40 +++++++++++++++------- src/io/ipc/read/stream.rs | 7 ++++ src/io/ipc/read/stream_async.rs | 4 +++ 19 files changed, 127 insertions(+), 22 deletions(-) diff --git a/examples/ipc_file_read.rs b/examples/ipc_file_read.rs index 27373ece01f..6478b25d484 100644 --- a/examples/ipc_file_read.rs +++ b/examples/ipc_file_read.rs @@ -33,7 +33,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk>)> { let schema = metadata.schema.clone(); // advanced way: read the dictionary - let dictionaries = read::read_file_dictionaries(&mut file, &metadata, Default::default())?; + let dictionaries = read::read_file_dictionaries(&mut file, &metadata, &mut Default::default())?; let chunk_index = 0; @@ -44,6 +44,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk>)> { None, chunk_index, &mut Default::default(), + &mut Default::default(), )?; Ok((schema, chunk)) diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index 8aa4a1de0e1..9ed0835bb84 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -142,6 +142,7 @@ pub fn deserialize_batch( &mut reader, 0, length as u64, + &mut Default::default(), ), _ => Err(Error::nyi( "flight currently only supports reading RecordBatch messages", diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs index f68471f699d..d800408247e 100644 --- 
a/src/io/ipc/read/array/binary.rs +++ b/src/io/ipc/read/array/binary.rs @@ -5,10 +5,12 @@ use crate::array::{BinaryArray, Offset}; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::read_basic::*; use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +#[allow(clippy::too_many_arguments)] pub fn read_binary( field_nodes: &mut VecDeque, data_type: DataType, @@ -17,6 +19,7 @@ pub fn read_binary( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -46,6 +49,7 @@ pub fn read_binary( block_offset, is_little_endian, compression, + scratch, ) // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![O::default()])))?; @@ -58,6 +62,7 @@ pub fn read_binary( block_offset, is_little_endian, compression, + scratch, )?; BinaryArray::::try_new(data_type, offsets, values, validity) diff --git a/src/io/ipc/read/array/dictionary.rs b/src/io/ipc/read/array/dictionary.rs index 6589790c946..fee1d6a046f 100644 --- a/src/io/ipc/read/array/dictionary.rs +++ b/src/io/ipc/read/array/dictionary.rs @@ -4,6 +4,7 @@ use std::io::{Read, Seek}; use crate::array::{DictionaryArray, DictionaryKey}; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::Dictionaries; use super::super::{Compression, IpcBuffer, Node}; @@ -19,6 +20,7 @@ pub fn read_dictionary( block_offset: u64, compression: Option, is_little_endian: bool, + scratch: &mut ReadBuffer, ) -> Result> where Vec: TryInto, @@ -47,6 +49,7 @@ where block_offset, is_little_endian, compression, + scratch, )?; Ok(DictionaryArray::::from_data(keys, values)) diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs index 04029cd7c87..9b03127901a 100644 --- 
a/src/io/ipc/read/array/fixed_size_binary.rs +++ b/src/io/ipc/read/array/fixed_size_binary.rs @@ -4,10 +4,12 @@ use std::io::{Read, Seek}; use crate::array::FixedSizeBinaryArray; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::read_basic::*; use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +#[allow(clippy::too_many_arguments)] pub fn read_fixed_size_binary( field_nodes: &mut VecDeque, data_type: DataType, @@ -16,6 +18,7 @@ pub fn read_fixed_size_binary( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -46,6 +49,7 @@ pub fn read_fixed_size_binary( block_offset, is_little_endian, compression, + scratch, )?; FixedSizeBinaryArray::try_new(data_type, values, validity) diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs index f8e314fc080..7aa5c30c92e 100644 --- a/src/io/ipc/read/array/fixed_size_list.rs +++ b/src/io/ipc/read/array/fixed_size_list.rs @@ -4,6 +4,7 @@ use std::io::{Read, Seek}; use crate::array::FixedSizeListArray; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; @@ -23,6 +24,7 @@ pub fn read_fixed_size_list( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -53,6 +55,7 @@ pub fn read_fixed_size_list( is_little_endian, compression, version, + scratch, )?; FixedSizeListArray::try_new(data_type, values, validity) } diff --git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs index f5d2eb5465e..d913c856b64 100644 --- a/src/io/ipc/read/array/list.rs +++ b/src/io/ipc/read/array/list.rs @@ -6,6 
+6,7 @@ use crate::array::{ListArray, Offset}; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; @@ -25,6 +26,7 @@ pub fn read_list( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result> where Vec: TryInto, @@ -57,6 +59,7 @@ where block_offset, is_little_endian, compression, + scratch, ) // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![O::default()])))?; @@ -74,6 +77,7 @@ where is_little_endian, compression, version, + scratch, )?; ListArray::try_new(data_type, offsets, values, validity) } diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs index 594f8a495b1..fd887d7efb5 100644 --- a/src/io/ipc/read/array/map.rs +++ b/src/io/ipc/read/array/map.rs @@ -5,6 +5,7 @@ use crate::array::MapArray; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; @@ -24,6 +25,7 @@ pub fn read_map( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -53,6 +55,7 @@ pub fn read_map( block_offset, is_little_endian, compression, + scratch, ) // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![0i32])))?; @@ -70,6 +73,7 @@ pub fn read_map( is_little_endian, compression, version, + scratch, )?; MapArray::try_new(data_type, offsets, field, validity) } diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs index c7c7b780d89..1ae4c94bc2a 100644 --- a/src/io/ipc/read/array/primitive.rs +++ 
b/src/io/ipc/read/array/primitive.rs @@ -3,11 +3,13 @@ use std::{collections::VecDeque, convert::TryInto}; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use crate::{array::PrimitiveArray, types::NativeType}; use super::super::read_basic::*; use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +#[allow(clippy::too_many_arguments)] pub fn read_primitive( field_nodes: &mut VecDeque, data_type: DataType, @@ -16,6 +18,7 @@ pub fn read_primitive( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result> where Vec: TryInto, @@ -48,6 +51,7 @@ where block_offset, is_little_endian, compression, + scratch, )?; PrimitiveArray::::try_new(data_type, values, validity) } diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs index c87440a9782..76894f03c14 100644 --- a/src/io/ipc/read/array/struct_.rs +++ b/src/io/ipc/read/array/struct_.rs @@ -4,6 +4,7 @@ use std::io::{Read, Seek}; use crate::array::StructArray; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; @@ -23,6 +24,7 @@ pub fn read_struct( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -57,6 +59,7 @@ pub fn read_struct( is_little_endian, compression, version, + scratch, ) }) .collect::>>()?; diff --git a/src/io/ipc/read/array/union.rs b/src/io/ipc/read/array/union.rs index 7c86f5e30bb..4c7ea723004 100644 --- a/src/io/ipc/read/array/union.rs +++ b/src/io/ipc/read/array/union.rs @@ -5,6 +5,7 @@ use crate::array::UnionArray; use crate::datatypes::DataType; use crate::datatypes::UnionMode::Dense; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use 
super::super::super::IpcField; use super::super::deserialize::{read, skip}; @@ -24,6 +25,7 @@ pub fn read_union( is_little_endian: bool, compression: Option, version: Version, + scratch: &mut ReadBuffer, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -50,6 +52,7 @@ pub fn read_union( block_offset, is_little_endian, compression, + scratch, )?; let offsets = if let DataType::Union(_, _, mode) = data_type { @@ -61,6 +64,7 @@ pub fn read_union( block_offset, is_little_endian, compression, + scratch, )?) } else { None @@ -86,6 +90,7 @@ pub fn read_union( is_little_endian, compression, version, + scratch, ) }) .collect::>>()?; diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs index 8424fbb2e73..1e9b52eacad 100644 --- a/src/io/ipc/read/array/utf8.rs +++ b/src/io/ipc/read/array/utf8.rs @@ -5,10 +5,12 @@ use crate::array::{Offset, Utf8Array}; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use super::super::read_basic::*; use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +#[allow(clippy::too_many_arguments)] pub fn read_utf8( field_nodes: &mut VecDeque, data_type: DataType, @@ -17,6 +19,7 @@ pub fn read_utf8( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result> { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -46,6 +49,7 @@ pub fn read_utf8( block_offset, is_little_endian, compression, + scratch, ) // Older versions of the IPC format sometimes do not report an offset .or_else(|_| Result::Ok(Buffer::::from(vec![O::default()])))?; @@ -58,6 +62,7 @@ pub fn read_utf8( block_offset, is_little_endian, compression, + scratch, )?; Utf8Array::::try_new(data_type, offsets, values, validity) diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index dededdb817d..4e52d22ddde 100644 --- 
a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -85,6 +85,7 @@ pub fn read_record_batch( reader: &mut R, block_offset: u64, file_size: u64, + scratch: &mut ReadBuffer, ) -> Result>> { assert_eq!(fields.len(), ipc_schema.fields.len()); let buffers = batch @@ -136,6 +137,7 @@ pub fn read_record_batch( Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err)) })?, version, + scratch, )?)), ProjectionResult::NotSelected((field, _)) => { skip(&mut field_nodes, &field.data_type, &mut buffers)?; @@ -162,6 +164,7 @@ pub fn read_record_batch( Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err)) })?, version, + scratch, ) }) .collect::>>()? @@ -221,6 +224,7 @@ fn first_dict_field<'a>( /// Read the dictionary from the buffer and provided metadata, /// updating the `dictionaries` with the resulting dictionary +#[allow(clippy::too_many_arguments)] pub fn read_dictionary( batch: arrow_format::ipc::DictionaryBatchRef, fields: &[Field], @@ -229,6 +233,7 @@ pub fn read_dictionary( reader: &mut R, block_offset: u64, file_size: u64, + scratch: &mut ReadBuffer, ) -> Result<()> { if batch .is_delta() @@ -270,6 +275,7 @@ pub fn read_dictionary( reader, block_offset, file_size, + scratch, )?; let mut arrays = columns.into_arrays(); arrays.pop().unwrap() diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index f5c8b97c00f..337ee58bd9a 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -7,6 +7,7 @@ use arrow_format::ipc::MetadataVersion; use crate::array::*; use crate::datatypes::{DataType, Field, PhysicalType}; use crate::error::Result; +use crate::io::ipc::read::common::ReadBuffer; use crate::io::ipc::IpcField; use super::{array::*, Dictionaries}; @@ -24,6 +25,7 @@ pub fn read( is_little_endian: bool, compression: Option, version: MetadataVersion, + scratch: &mut ReadBuffer, ) -> Result> { use PhysicalType::*; let data_type = field.data_type.clone(); @@ -49,6 +51,7 @@ pub fn read( block_offset, 
is_little_endian, compression, + scratch ) .map(|x| x.boxed()) }), @@ -61,6 +64,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -73,6 +77,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -85,6 +90,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -97,6 +103,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -109,6 +116,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, )?; Ok(Box::new(array)) } @@ -123,6 +131,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), LargeList => read_list::( @@ -136,6 +145,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), FixedSizeList => read_fixed_size_list( @@ -149,6 +159,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), Struct => read_struct( @@ -162,6 +173,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), Dictionary(key_type) => { @@ -175,6 +187,7 @@ pub fn read( block_offset, compression, is_little_endian, + scratch ) .map(|x| x.boxed()) }) @@ -190,6 +203,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), Map => read_map( @@ -203,6 +217,7 @@ pub fn read( is_little_endian, compression, version, + scratch, ) .map(|x| x.boxed()), } diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 194a41a12e9..59ddd51c6f0 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -81,6 +81,7 @@ impl<'a> FileStream<'a> { let mut meta_buffer = Default::default(); let mut block_buffer = Default::default(); + let mut scratch = Default::default(); for block in 0..metadata.blocks.len() { let chunk = read_batch( &mut reader, @@ -90,6 +91,7 @@ impl<'a> FileStream<'a> { 
block, &mut meta_buffer, &mut block_buffer, + &mut scratch ).await?; let chunk = if let Some((_, map)) = &projection { @@ -148,6 +150,7 @@ where deserialize_footer(&footer, u64::MAX) } +#[allow(clippy::too_many_arguments)] async fn read_batch( mut reader: R, dictionaries: &mut Dictionaries, @@ -156,6 +159,7 @@ async fn read_batch( block: usize, meta_buffer: &mut ReadBuffer, block_buffer: &mut ReadBuffer, + scratch: &mut ReadBuffer, ) -> Result>> where R: AsyncRead + AsyncSeek + Unpin, @@ -208,6 +212,7 @@ where &mut cursor, 0, metadata.size, + scratch, ) } @@ -216,6 +221,7 @@ async fn read_dictionaries( fields: &[Field], ipc_schema: &IpcSchema, blocks: &[Block], + scratch: &mut ReadBuffer, ) -> Result where R: AsyncRead + AsyncSeek + Unpin, @@ -258,6 +264,7 @@ where &mut cursor, 0, u64::MAX, + scratch, )?; } _ => return Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), @@ -300,6 +307,7 @@ async fn cached_read_dictionaries( &metadata.schema.fields, &metadata.ipc_schema, blocks, + &mut Default::default(), ) .await?; *dictionaries = Some(new_dictionaries); diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs index 378b2976e4f..585e0e33eff 100644 --- a/src/io/ipc/read/read_basic.rs +++ b/src/io/ipc/read/read_basic.rs @@ -3,6 +3,7 @@ use std::{collections::VecDeque, convert::TryInto}; use crate::buffer::Buffer; use crate::error::{Error, Result}; +use crate::io::ipc::read::common::ReadBuffer; use crate::{bitmap::Bitmap, types::NativeType}; use super::super::compression; @@ -93,6 +94,7 @@ fn read_compressed_buffer( length: usize, is_little_endian: bool, compression: Compression, + scratch: &mut ReadBuffer, ) -> Result> { if is_little_endian != is_native_little_endian() { return Err(Error::NotYetImplemented( @@ -105,9 +107,8 @@ fn read_compressed_buffer( let mut buffer = vec![T::default(); length]; // decompress first - // todo: move this allocation to an external buffer for re-use - let mut slice = vec![0u8; buffer_length]; - 
reader.read_exact(&mut slice)?; + scratch.set_len(buffer_length); + reader.read_exact(scratch.as_mut())?; let out_slice = bytemuck::cast_slice_mut(&mut buffer); @@ -117,10 +118,10 @@ fn read_compressed_buffer( match compression { arrow_format::ipc::CompressionType::Lz4Frame => { - compression::decompress_lz4(&slice[8..], out_slice)?; + compression::decompress_lz4(&scratch.as_ref()[8..], out_slice)?; } arrow_format::ipc::CompressionType::Zstd => { - compression::decompress_zstd(&slice[8..], out_slice)?; + compression::decompress_zstd(&scratch.as_ref()[8..], out_slice)?; } } Ok(buffer) @@ -133,6 +134,7 @@ pub fn read_buffer( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut ReadBuffer, ) -> Result> { let buf = buf .pop_front() @@ -151,10 +153,15 @@ pub fn read_buffer( reader.seek(SeekFrom::Start(block_offset + offset))?; if let Some(compression) = compression { - Ok( - read_compressed_buffer(reader, buffer_length, length, is_little_endian, compression)? - .into(), - ) + Ok(read_compressed_buffer( + reader, + buffer_length, + length, + is_little_endian, + compression, + scratch, + )? 
+ .into()) } else { Ok(read_uncompressed_buffer(reader, buffer_length, length, is_little_endian)?.into()) } diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 9f7ab43346f..430b4560bed 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -44,7 +44,8 @@ pub struct FileReader { dictionaries: Option, current_block: usize, projection: Option<(Vec, HashMap, Schema)>, - buffer: ReadBuffer, + data_scratch: ReadBuffer, + message_scratch: ReadBuffer, } fn read_dictionary_message( @@ -74,7 +75,8 @@ fn read_dictionary_block( metadata: &FileMetadata, block: &arrow_format::ipc::Block, dictionaries: &mut Dictionaries, - scratch: &mut ReadBuffer, + message_scratch: &mut ReadBuffer, + dictionary_scratch: &mut ReadBuffer, ) -> Result<()> { let offset: u64 = block .offset @@ -84,9 +86,9 @@ fn read_dictionary_block( .meta_data_length .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - read_dictionary_message(reader, offset, scratch)?; + read_dictionary_message(reader, offset, message_scratch)?; - let message = arrow_format::ipc::MessageRef::read_as_root(scratch.as_ref()) + let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let header = message @@ -105,6 +107,7 @@ fn read_dictionary_block( reader, block_offset, metadata.size, + dictionary_scratch, ) } _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), @@ -125,9 +128,18 @@ pub fn read_file_dictionaries( } else { return Ok(HashMap::new()); }; + // use a temporary smaller scratch for the messages + let mut message_scratch = Default::default(); for block in blocks { - read_dictionary_block(reader, metadata, block, &mut dictionaries, scratch)?; + read_dictionary_block( + reader, + metadata, + block, + &mut dictionaries, + &mut message_scratch, + scratch, + )?; } Ok(dictionaries) } @@ -246,7 +258,8 @@ pub fn read_batch( metadata: &FileMetadata, 
projection: Option<&[usize]>, index: usize, - scratch: &mut ReadBuffer, + message_scratch: &mut ReadBuffer, + data_scratch: &mut ReadBuffer, ) -> Result>> { let block = metadata.blocks[index]; @@ -267,10 +280,10 @@ pub fn read_batch( .try_into() .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; - scratch.set_len(meta_len); - reader.read_exact(scratch.as_mut())?; + message_scratch.set_len(meta_len); + reader.read_exact(message_scratch.as_mut())?; - let message = arrow_format::ipc::MessageRef::read_as_root(scratch.as_ref()) + let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; let batch = get_serialized_batch(&message)?; @@ -297,6 +310,7 @@ pub fn read_batch( reader, offset + length, metadata.size, + data_scratch, ) } @@ -319,7 +333,8 @@ impl FileReader { dictionaries: Default::default(), projection, current_block: 0, - buffer: Default::default(), + data_scratch: Default::default(), + message_scratch: Default::default(), } } @@ -346,7 +361,7 @@ impl FileReader { self.dictionaries = Some(read_file_dictionaries( &mut self.reader, &self.metadata, - &mut self.buffer, + &mut self.data_scratch, )?); }; Ok(()) @@ -376,7 +391,8 @@ impl Iterator for FileReader { &self.metadata, self.projection.as_ref().map(|x| x.0.as_ref()), block, - &mut self.buffer, + &mut self.message_scratch, + &mut self.data_scratch, ); let chunk = if let Some((_, map, _)) = &self.projection { diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index a5b1610ebda..95bb148ff12 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -93,6 +93,7 @@ fn read_next( message_buffer: &mut ReadBuffer, data_buffer: &mut ReadBuffer, projection: &Option<(Vec, HashMap, Schema)>, + scratch: &mut ReadBuffer, ) -> Result> { // determine metadata length let mut meta_length: [u8; 4] = [0; 4]; @@ -165,6 +166,7 @@ fn read_next( &mut reader, 0, file_size, + scratch, ); 
if let Some((_, map, _)) = projection { @@ -190,6 +192,7 @@ fn read_next( &mut dict_reader, 0, buf.len() as u64, + scratch, )?; // read the next message until we encounter a RecordBatch message @@ -200,6 +203,7 @@ fn read_next( message_buffer, data_buffer, projection, + scratch, ) } _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), @@ -220,6 +224,7 @@ pub struct StreamReader { data_buffer: ReadBuffer, message_buffer: ReadBuffer, projection: Option<(Vec, HashMap, Schema)>, + scratch: ReadBuffer, } impl StreamReader { @@ -246,6 +251,7 @@ impl StreamReader { data_buffer: Default::default(), message_buffer: Default::default(), projection, + scratch: Default::default(), } } @@ -278,6 +284,7 @@ impl StreamReader { &mut self.message_buffer, &mut self.data_buffer, &self.projection, + &mut self.scratch, )?; if batch.is_none() { self.finished = true; diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index 6730999c5e4..6ce5593f75b 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -68,6 +68,7 @@ pub async fn read_stream_metadata_async( async fn maybe_next( mut state: ReadState, ) -> Result>> { + let mut scratch = Default::default(); // determine metadata length let mut meta_length: [u8; 4] = [0; 4]; @@ -124,6 +125,7 @@ async fn maybe_next( .map_err(|_| Error::from(OutOfSpecKind::UnexpectedNegativeInteger))?; state.data_buffer.set_len(block_length); + match header { arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { state.reader.read_exact(state.data_buffer.as_mut()).await?; @@ -138,6 +140,7 @@ async fn maybe_next( &mut std::io::Cursor::new(&state.data_buffer), 0, state.data_buffer.as_ref().len() as u64, + &mut scratch, ) .map(|chunk| Some(StreamState::Some((state, chunk)))) } @@ -157,6 +160,7 @@ async fn maybe_next( &mut dict_reader, 0, file_size, + &mut scratch, )?; // read the next message until we encounter a Chunk> message From 311eee836aed8aa1efd80e9743b05e330c63423f Mon Sep 17 00:00:00 
2001 From: Ritchie Vink Date: Mon, 27 Jun 2022 10:27:32 +0200 Subject: [PATCH 05/10] Commit suggestion src/io/ipc/read/file_async.rs Co-authored-by: Jorge Leitao --- src/io/ipc/read/file_async.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 59ddd51c6f0..a33df9dafbe 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -227,8 +227,8 @@ where R: AsyncRead + AsyncSeek + Unpin, { let mut dictionaries = Default::default(); - let mut data = ReadBuffer::new(0); - let mut buffer = ReadBuffer::new(0); + let mut data: ReadBuffer = vec![].into(); + let mut buffer: ReadBuffer = vec![].into(); for block in blocks { let offset: u64 = block From 0c5972a7330dacfa6dcb6547325ddcd151dcdbac Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 27 Jun 2022 10:27:54 +0200 Subject: [PATCH 06/10] Commit suggestion src/io/ipc/read/common.rs Co-authored-by: Jorge Leitao --- src/io/ipc/read/common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 4e52d22ddde..34ac199cab0 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -293,7 +293,7 @@ pub fn read_dictionary( } /// A small wrapper around `[Vec]` that allows us to reuse memory once it is initialized. -/// This may improve performance of the `[Read]` trait. +/// This may improve performance of the [`Read`] trait. 
#[derive(Clone, Default)] pub struct ReadBuffer { data: Vec, From 33e91e77d4ce492a5c5fd545413cb63c590477eb Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 27 Jun 2022 10:28:06 +0200 Subject: [PATCH 07/10] Commit suggestion src/io/ipc/read/common.rs Co-authored-by: Jorge Leitao --- src/io/ipc/read/common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 34ac199cab0..68f92ab2d53 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -292,7 +292,7 @@ pub fn read_dictionary( Ok(()) } -/// A small wrapper around `[Vec]` that allows us to reuse memory once it is initialized. +/// A small wrapper around [`Vec`] that allows us to reuse memory once it is initialized. /// This may improve performance of the [`Read`] trait. #[derive(Clone, Default)] pub struct ReadBuffer { From 023273ebaa0d39e1f6ad34cd4eba5ea16fbe523d Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 27 Jun 2022 10:42:29 +0200 Subject: [PATCH 08/10] use exponential growing strategy --- src/io/ipc/read/common.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 4e52d22ddde..2999072184d 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -308,13 +308,15 @@ impl ReadBuffer { Self { data, length } } - /// Set the length of the [`ReadBuf`]. Contrary to the + /// Set the minimal length of the [`ReadBuf`]. Contrary to the /// method on `Vec` this is `safe` because this function guarantees that /// the underlying data always is initialized. 
pub fn set_len(&mut self, length: usize) { - dbg!(length, self.data.len(), self.data.capacity()); if length > self.data.capacity() { - self.data = vec![0; length]; + // exponential growing strategy + // benchmark showed it was ~5% faster + // in reading lz4 yellow-trip dataset + self.data = vec![0; length * 2]; } else if length > self.data.len() { self.data.resize(length, 0); } From 62a3d80fc145eab601d22fd45aa48036a87f6ab2 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 27 Jun 2022 11:29:33 +0200 Subject: [PATCH 09/10] move ReadBuffer to io, as it is more general than IPC --- src/io/ipc/read/array/binary.rs | 3 +- src/io/ipc/read/array/dictionary.rs | 3 +- src/io/ipc/read/array/fixed_size_binary.rs | 3 +- src/io/ipc/read/array/fixed_size_list.rs | 4 +- src/io/ipc/read/array/list.rs | 6 +-- src/io/ipc/read/array/map.rs | 6 +-- src/io/ipc/read/array/primitive.rs | 3 +- src/io/ipc/read/array/struct_.rs | 4 +- src/io/ipc/read/array/union.rs | 6 +-- src/io/ipc/read/array/utf8.rs | 3 +- src/io/ipc/read/common.rs | 58 +--------------------- src/io/ipc/read/deserialize.rs | 3 +- src/io/ipc/read/file_async.rs | 5 +- src/io/ipc/read/mod.rs | 2 + src/io/ipc/read/read_basic.rs | 3 +- src/io/ipc/read/reader.rs | 1 + src/io/ipc/read/stream.rs | 1 + src/io/ipc/read/stream_async.rs | 3 +- src/io/mod.rs | 6 +++ src/io/readbuf.rs | 50 +++++++++++++++++++ 20 files changed, 83 insertions(+), 90 deletions(-) create mode 100644 src/io/readbuf.rs diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs index d800408247e..51d3d7d26b8 100644 --- a/src/io/ipc/read/array/binary.rs +++ b/src/io/ipc/read/array/binary.rs @@ -5,10 +5,9 @@ use crate::array::{BinaryArray, Offset}; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{Error, Result}; -use crate::io::ipc::read::common::ReadBuffer; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::super::{Compression, IpcBuffer, Node, 
OutOfSpecKind, ReadBuffer}; #[allow(clippy::too_many_arguments)] pub fn read_binary( diff --git a/src/io/ipc/read/array/dictionary.rs b/src/io/ipc/read/array/dictionary.rs index fee1d6a046f..0b212c45012 100644 --- a/src/io/ipc/read/array/dictionary.rs +++ b/src/io/ipc/read/array/dictionary.rs @@ -4,10 +4,9 @@ use std::io::{Read, Seek}; use crate::array::{DictionaryArray, DictionaryKey}; use crate::error::{Error, Result}; -use crate::io::ipc::read::common::ReadBuffer; use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node}; +use super::super::{Compression, IpcBuffer, Node, ReadBuffer}; use super::{read_primitive, skip_primitive}; #[allow(clippy::too_many_arguments)] diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs index 9b03127901a..5dc1b2ab0bf 100644 --- a/src/io/ipc/read/array/fixed_size_binary.rs +++ b/src/io/ipc/read/array/fixed_size_binary.rs @@ -4,10 +4,9 @@ use std::io::{Read, Seek}; use crate::array::FixedSizeBinaryArray; use crate::datatypes::DataType; use crate::error::{Error, Result}; -use crate::io::ipc::read::common::ReadBuffer; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; #[allow(clippy::too_many_arguments)] pub fn read_fixed_size_binary( diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs index 7aa5c30c92e..ac169ed3f67 100644 --- a/src/io/ipc/read/array/fixed_size_list.rs +++ b/src/io/ipc/read/array/fixed_size_list.rs @@ -4,13 +4,11 @@ use std::io::{Read, Seek}; use crate::array::FixedSizeListArray; use crate::datatypes::DataType; use crate::error::{Error, Result}; -use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, 
Node, Version}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version}; #[allow(clippy::too_many_arguments)] pub fn read_fixed_size_list( diff --git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs index d913c856b64..bb85f233cf4 100644 --- a/src/io/ipc/read/array/list.rs +++ b/src/io/ipc/read/array/list.rs @@ -6,13 +6,13 @@ use crate::array::{ListArray, Offset}; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{Error, Result}; -use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version}; +use super::super::{ + Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version, +}; #[allow(clippy::too_many_arguments)] pub fn read_list( diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs index fd887d7efb5..40b4f5c0389 100644 --- a/src/io/ipc/read/array/map.rs +++ b/src/io/ipc/read/array/map.rs @@ -5,13 +5,13 @@ use crate::array::MapArray; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{Error, Result}; -use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version}; +use super::super::{ + Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version, +}; #[allow(clippy::too_many_arguments)] pub fn read_map( diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs index 1ae4c94bc2a..1e4000123b6 100644 --- a/src/io/ipc/read/array/primitive.rs +++ b/src/io/ipc/read/array/primitive.rs @@ -3,11 +3,10 @@ use std::{collections::VecDeque, convert::TryInto}; use crate::datatypes::DataType; use 
crate::error::{Error, Result}; -use crate::io::ipc::read::common::ReadBuffer; use crate::{array::PrimitiveArray, types::NativeType}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; #[allow(clippy::too_many_arguments)] pub fn read_primitive( diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs index 76894f03c14..ae23c0f8829 100644 --- a/src/io/ipc/read/array/struct_.rs +++ b/src/io/ipc/read/array/struct_.rs @@ -4,13 +4,11 @@ use std::io::{Read, Seek}; use crate::array::StructArray; use crate::datatypes::DataType; use crate::error::{Error, Result}; -use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node, Version}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version}; #[allow(clippy::too_many_arguments)] pub fn read_struct( diff --git a/src/io/ipc/read/array/union.rs b/src/io/ipc/read/array/union.rs index 4c7ea723004..fc907a32395 100644 --- a/src/io/ipc/read/array/union.rs +++ b/src/io/ipc/read/array/union.rs @@ -5,13 +5,13 @@ use crate::array::UnionArray; use crate::datatypes::DataType; use crate::datatypes::UnionMode::Dense; use crate::error::{Error, Result}; -use crate::io::ipc::read::common::ReadBuffer; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::Dictionaries; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version}; +use super::super::{ + Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version, +}; #[allow(clippy::too_many_arguments)] pub fn read_union( diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs index 1e9b52eacad..10f1260f5ac 100644 --- 
a/src/io/ipc/read/array/utf8.rs +++ b/src/io/ipc/read/array/utf8.rs @@ -5,10 +5,9 @@ use crate::array::{Offset, Utf8Array}; use crate::buffer::Buffer; use crate::datatypes::DataType; use crate::error::{Error, Result}; -use crate::io::ipc::read::common::ReadBuffer; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; #[allow(clippy::too_many_arguments)] pub fn read_utf8( diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index eb0f6e0bf97..18ddd8cbb8b 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -9,6 +9,7 @@ use crate::datatypes::{DataType, Field}; use crate::error::{Error, Result}; use crate::io::ipc::read::OutOfSpecKind; use crate::io::ipc::{IpcField, IpcSchema}; +use crate::io::ReadBuffer; use super::deserialize::{read, skip}; use super::Dictionaries; @@ -292,63 +293,6 @@ pub fn read_dictionary( Ok(()) } -/// A small wrapper around [`Vec`] that allows us to reuse memory once it is initialized. -/// This may improve performance of the [`Read`] trait. -#[derive(Clone, Default)] -pub struct ReadBuffer { - data: Vec, - // length to be read or is read - length: usize, -} - -impl ReadBuffer { - /// Create a new [`ReadBuf`] initialized to `length` - pub fn new(length: usize) -> Self { - let data = vec![0; length]; - Self { data, length } - } - - /// Set the minimal length of the [`ReadBuf`]. Contrary to the - /// method on `Vec` this is `safe` because this function guarantees that - /// the underlying data always is initialized. 
- pub fn set_len(&mut self, length: usize) { - if length > self.data.capacity() { - // exponential growing strategy - // benchmark showed it was ~5% faster - // in reading lz4 yellow-trip dataset - self.data = vec![0; length * 2]; - } else if length > self.data.len() { - self.data.resize(length, 0); - } - self.length = length; - } -} - -impl AsRef<[u8]> for ReadBuffer { - fn as_ref(&self) -> &[u8] { - &self.data[..self.length] - } -} - -impl AsMut<[u8]> for ReadBuffer { - fn as_mut(&mut self) -> &mut [u8] { - &mut self.data[..self.length] - } -} - -impl From> for ReadBuffer { - fn from(data: Vec) -> Self { - let length = data.len(); - Self { data, length } - } -} - -impl From for Vec { - fn from(buf: ReadBuffer) -> Self { - buf.data - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index 337ee58bd9a..3378f99a894 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -7,11 +7,10 @@ use arrow_format::ipc::MetadataVersion; use crate::array::*; use crate::datatypes::{DataType, Field, PhysicalType}; use crate::error::Result; -use crate::io::ipc::read::common::ReadBuffer; use crate::io::ipc::IpcField; use super::{array::*, Dictionaries}; -use super::{IpcBuffer, Node}; +use super::{IpcBuffer, Node, ReadBuffer}; #[allow(clippy::too_many_arguments)] pub fn read( diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index a33df9dafbe..cabfdd43b70 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -13,13 +13,12 @@ use crate::datatypes::{Field, Schema}; use crate::error::{Error, Result}; use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER}; -use super::common::{ - apply_projection, prepare_projection, read_dictionary, read_record_batch, ReadBuffer, -}; +use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch}; use super::reader::{deserialize_footer, get_serialized_batch}; 
use super::Dictionaries; use super::FileMetadata; use super::OutOfSpecKind; +use super::ReadBuffer; /// Async reader for Arrow IPC files pub struct FileStream<'a> { diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index 5ffe6426e20..f25da7a4a1c 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -27,6 +27,8 @@ pub mod stream_async; #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))] pub mod file_async; +use super::super::ReadBuffer; + pub use common::{read_dictionary, read_record_batch}; pub use reader::{ read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, FileReader, diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs index 585e0e33eff..cc04035eec7 100644 --- a/src/io/ipc/read/read_basic.rs +++ b/src/io/ipc/read/read_basic.rs @@ -3,12 +3,11 @@ use std::{collections::VecDeque, convert::TryInto}; use crate::buffer::Buffer; use crate::error::{Error, Result}; -use crate::io::ipc::read::common::ReadBuffer; use crate::{bitmap::Bitmap, types::NativeType}; use super::super::compression; use super::super::endianess::is_native_little_endian; -use super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer}; fn read_swapped( reader: &mut R, diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 430b4560bed..b01d85cf499 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -13,6 +13,7 @@ use super::common::*; use super::schema::fb_to_schema; use super::Dictionaries; use super::OutOfSpecKind; +use super::ReadBuffer; use arrow_format::ipc::planus::ReadAsRoot; /// Metadata of an Arrow IPC file, written in the footer of the file. 
diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index 95bb148ff12..5dd7b5cddda 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -15,6 +15,7 @@ use super::common::*; use super::schema::deserialize_stream_metadata; use super::Dictionaries; use super::OutOfSpecKind; +use super::ReadBuffer; /// Metadata of an Arrow IPC stream, written at the start of the stream #[derive(Debug, Clone)] diff --git a/src/io/ipc/read/stream_async.rs b/src/io/ipc/read/stream_async.rs index 6ce5593f75b..cfbfe5ad3e0 100644 --- a/src/io/ipc/read/stream_async.rs +++ b/src/io/ipc/read/stream_async.rs @@ -12,10 +12,11 @@ use crate::chunk::Chunk; use crate::error::{Error, Result}; use super::super::CONTINUATION_MARKER; -use super::common::{read_dictionary, read_record_batch, ReadBuffer}; +use super::common::{read_dictionary, read_record_batch}; use super::schema::deserialize_stream_metadata; use super::Dictionaries; use super::OutOfSpecKind; +use super::ReadBuffer; use super::StreamMetadata; /// A (private) state of stream messages diff --git a/src/io/mod.rs b/src/io/mod.rs index 9343d4281ce..95a17ee8bf3 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -53,3 +53,9 @@ pub mod print; #[cfg(any(feature = "io_csv_write", feature = "io_avro", feature = "io_json"))] mod iterator; + +#[cfg(feature = "io_ipc")] +mod readbuf; + +#[cfg(feature = "io_ipc")] +pub use readbuf::ReadBuffer; diff --git a/src/io/readbuf.rs b/src/io/readbuf.rs new file mode 100644 index 00000000000..d30396a8289 --- /dev/null +++ b/src/io/readbuf.rs @@ -0,0 +1,50 @@ +/// A small wrapper around [`Vec`] that allows us to reuse memory once it is initialized. +/// This may improve performance of the [`Read`] trait. +#[derive(Clone, Default)] +pub struct ReadBuffer { + data: Vec, + // length to be read or is read + length: usize, +} + +impl ReadBuffer { + /// Set the minimal length of the [`ReadBuf`]. 
Contrary to the + /// method on `Vec` this is `safe` because this function guarantees that + /// the underlying data always is initialized. + pub fn set_len(&mut self, length: usize) { + if length > self.data.capacity() { + // exponential growing strategy + // benchmark showed it was ~5% faster + // in reading lz4 yellow-trip dataset + self.data = vec![0; length * 2]; + } else if length > self.data.len() { + self.data.resize(length, 0); + } + self.length = length; + } +} + +impl AsRef<[u8]> for ReadBuffer { + fn as_ref(&self) -> &[u8] { + &self.data[..self.length] + } +} + +impl AsMut<[u8]> for ReadBuffer { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.data[..self.length] + } +} + +impl From> for ReadBuffer { + fn from(data: Vec) -> Self { + let length = data.len(); + Self { data, length } + } +} + +impl From for Vec { + fn from(buf: ReadBuffer) -> Self { + buf.data + } +} From 3fe8248e6473f15f889d6e9890b17481788119d9 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 27 Jun 2022 11:46:48 +0200 Subject: [PATCH 10/10] fix integration tests --- .../src/flight_client_scenarios/integration_test.rs | 2 +- .../src/flight_server_scenarios/integration_test.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index 19a4b1a4ef1..a731a06eab1 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -276,7 +276,7 @@ async fn receive_batch_flight_data( { let length = data.data_body.len(); let mut reader = std::io::Cursor::new(&data.data_body); - read::read_dictionary(batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64) + read::read_dictionary(batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64, &mut Default::default()) .expect("Error reading dictionary"); data = 
resp.next().await?.ok()?; diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 07453f5c724..10424759e87 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -296,6 +296,7 @@ async fn record_batch_from_message( &mut reader, 0, length as u64, + &mut Default::default() ); arrow_batch_result.map_err(|e| Status::internal(format!("Could not convert to Chunk: {:?}", e))) @@ -312,7 +313,7 @@ async fn dictionary_from_message( let mut reader = std::io::Cursor::new(data_body); let dictionary_batch_result = - ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64); + ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64, &mut Default::default()); dictionary_batch_result .map_err(|e| Status::internal(format!("Could not convert to Dictionary: {:?}", e))) }