From 5b3e897be986189308d04683daf04c88a6fa9473 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 29 Jun 2022 14:24:57 +0000 Subject: [PATCH] Improved reading of bitmaps from IPC --- src/io/ipc/read/array/binary.rs | 1 + src/io/ipc/read/array/boolean.rs | 4 ++++ src/io/ipc/read/array/fixed_size_binary.rs | 1 + src/io/ipc/read/array/fixed_size_list.rs | 1 + src/io/ipc/read/array/list.rs | 1 + src/io/ipc/read/array/map.rs | 1 + src/io/ipc/read/array/primitive.rs | 1 + src/io/ipc/read/array/struct_.rs | 1 + src/io/ipc/read/array/utf8.rs | 1 + src/io/ipc/read/deserialize.rs | 3 ++- src/io/ipc/read/read_basic.rs | 28 +++++++++++++--------- src/io/ipc/read/reader.rs | 9 +++++-- 12 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/io/ipc/read/array/binary.rs b/src/io/ipc/read/array/binary.rs index 917f46b4017..f32ca3e8713 100644 --- a/src/io/ipc/read/array/binary.rs +++ b/src/io/ipc/read/array/binary.rs @@ -34,6 +34,7 @@ pub fn read_binary( block_offset, is_little_endian, compression, + scratch, )?; let length: usize = field_node diff --git a/src/io/ipc/read/array/boolean.rs b/src/io/ipc/read/array/boolean.rs index 4bfe9063a57..4440062a299 100644 --- a/src/io/ipc/read/array/boolean.rs +++ b/src/io/ipc/read/array/boolean.rs @@ -8,6 +8,7 @@ use crate::error::{Error, Result}; use super::super::read_basic::*; use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +#[allow(clippy::too_many_arguments)] pub fn read_boolean( field_nodes: &mut VecDeque, data_type: DataType, @@ -16,6 +17,7 @@ pub fn read_boolean( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut Vec, ) -> Result { let field_node = field_nodes.pop_front().ok_or_else(|| { Error::oos(format!( @@ -36,6 +38,7 @@ pub fn read_boolean( block_offset, is_little_endian, compression, + scratch, )?; let values = read_bitmap( @@ -45,6 +48,7 @@ pub fn read_boolean( block_offset, is_little_endian, compression, + scratch, )?; BooleanArray::try_new(data_type, values, validity) } diff --git a/src/io/ipc/read/array/fixed_size_binary.rs b/src/io/ipc/read/array/fixed_size_binary.rs index e4cb0480def..ffceb00bc94 100644 --- a/src/io/ipc/read/array/fixed_size_binary.rs +++ b/src/io/ipc/read/array/fixed_size_binary.rs @@ -33,6 +33,7 @@ pub fn read_fixed_size_binary( block_offset, is_little_endian, compression, + scratch, )?; let length: usize = field_node diff --git a/src/io/ipc/read/array/fixed_size_list.rs b/src/io/ipc/read/array/fixed_size_list.rs index d8eb9725ed8..3ea4ef42b30 100644 --- a/src/io/ipc/read/array/fixed_size_list.rs +++ b/src/io/ipc/read/array/fixed_size_list.rs @@ -38,6 +38,7 @@ pub fn read_fixed_size_list( block_offset, is_little_endian, compression, + scratch, )?; let (field, _) = FixedSizeListArray::get_child_and_size(&data_type); diff --git a/src/io/ipc/read/array/list.rs b/src/io/ipc/read/array/list.rs index 44fb17bd2a6..e12dc5e02ec 100644 --- a/src/io/ipc/read/array/list.rs +++ b/src/io/ipc/read/array/list.rs @@ -43,6 +43,7 @@ where block_offset, is_little_endian, compression, + scratch, )?; let length: usize = field_node diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs index 5020a157a0a..8a2a5e4a124 100644 --- a/src/io/ipc/read/array/map.rs +++ b/src/io/ipc/read/array/map.rs @@ -39,6 +39,7 @@ pub fn read_map( block_offset, is_little_endian, compression, + scratch, )?; let length: usize = field_node diff --git a/src/io/ipc/read/array/primitive.rs b/src/io/ipc/read/array/primitive.rs index 75a62e5a835..434aab0bd9c 100644 --- a/src/io/ipc/read/array/primitive.rs +++ b/src/io/ipc/read/array/primitive.rs @@ -36,6 +36,7 @@ where block_offset, is_little_endian, compression, + scratch, )?; let length: usize = field_node diff --git a/src/io/ipc/read/array/struct_.rs b/src/io/ipc/read/array/struct_.rs index 469689d247b..3bbd777a019 100644 --- a/src/io/ipc/read/array/struct_.rs +++ b/src/io/ipc/read/array/struct_.rs @@ -38,6 +38,7 @@ pub fn read_struct( block_offset, is_little_endian, compression, + scratch, )?; let fields = StructArray::get_fields(&data_type); diff --git a/src/io/ipc/read/array/utf8.rs b/src/io/ipc/read/array/utf8.rs index af25eaed896..342da0c4de8 100644 --- a/src/io/ipc/read/array/utf8.rs +++ b/src/io/ipc/read/array/utf8.rs @@ -34,6 +34,7 @@ pub fn read_utf8( block_offset, is_little_endian, compression, + scratch, )?; let length: usize = field_node diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index 3ed5c9349c7..780f33e0d59 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -39,6 +39,7 @@ pub fn read( block_offset, is_little_endian, compression, + scratch, ) .map(|x| x.boxed()), Primitive(primitive) => with_match_primitive_type!(primitive, |$T| { @@ -50,7 +51,7 @@ pub fn read( block_offset, is_little_endian, compression, - scratch + scratch, ) .map(|x| x.boxed()) }), diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs index 7a5ae859a6a..e58b01b58ed 100644 --- a/src/io/ipc/read/read_basic.rs +++ b/src/io/ipc/read/read_basic.rs @@ -181,10 +181,13 @@ fn read_uncompressed_bitmap( number_of_bits: bytes * 8, })); } - // it is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read - // see also https://github.com/MaikKlein/ash/issues/354#issue-781730580 - let mut buffer = vec![0; bytes]; - reader.read_exact(buffer.as_mut_slice())?; + + let mut buffer = vec![]; + buffer.try_reserve(bytes)?; + reader + .by_ref() + .take(bytes as u64) + .read_to_end(&mut buffer)?; Ok(buffer) } @@ -194,13 +197,13 @@ fn read_compressed_bitmap( bytes: usize, compression: Compression, reader: &mut R, + scratch: &mut Vec, ) -> Result> { let mut buffer = vec![0; (length + 7) / 8]; - // read all first - // todo: move this allocation to an external buffer for re-use - let mut slice = vec![0u8; bytes]; - reader.read_exact(&mut slice)?; + scratch.clear(); + scratch.try_reserve(bytes)?; + reader.by_ref().take(bytes as u64).read_to_end(scratch)?; let compression = compression .codec() @@ -208,10 +211,10 @@ fn read_compressed_bitmap( match compression { arrow_format::ipc::CompressionType::Lz4Frame => { - compression::decompress_lz4(&slice[8..], &mut buffer)?; + compression::decompress_lz4(&scratch[8..], &mut buffer)?; } arrow_format::ipc::CompressionType::Zstd => { - compression::decompress_zstd(&slice[8..], &mut buffer)?; + compression::decompress_zstd(&scratch[8..], &mut buffer)?; } } Ok(buffer) @@ -224,6 +227,7 @@ pub fn read_bitmap( block_offset: u64, _: bool, compression: Option, + scratch: &mut Vec, ) -> Result { let buf = buf .pop_front() @@ -242,7 +246,7 @@ pub fn read_bitmap( reader.seek(SeekFrom::Start(block_offset + offset))?; let buffer = if let Some(compression) = compression { - read_compressed_bitmap(length, bytes, compression, reader) + read_compressed_bitmap(length, bytes, compression, reader, scratch) } else { read_uncompressed_bitmap(length, bytes, reader) }?; @@ -257,6 +261,7 @@ pub fn read_validity( block_offset: u64, is_little_endian: bool, compression: Option, + scratch: &mut Vec, ) -> Result> { let length: usize = field_node .length() @@ -271,6 +276,7 @@ pub fn read_validity( block_offset, is_little_endian, compression, + scratch, )?) } else { let _ = buffers diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 776ee684e82..801f5041aed 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -230,9 +230,14 @@ pub fn read_file_metadata(reader: &mut R) -> Result