From 15ce4cf11e6c67807b991f7437c55525bd3b3a4a Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Mon, 13 Dec 2021 17:11:31 +0100 Subject: [PATCH] Made IPC reader less restrictive (#678) * Allow file to not contian dictionaries. * Allow different versions between message and file --- src/io/ipc/read/reader.rs | 92 +++++++++++++++++++++------------------ 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index be8668de929..d8f09c0e9bf 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use arrow_format::ipc; use arrow_format::ipc::flatbuffers::VerifierOptions; +use arrow_format::ipc::File::Block; use crate::array::*; use crate::datatypes::Schema; @@ -87,6 +88,48 @@ fn read_dictionary_message( Ok(()) } +fn read_dictionaries( + reader: &mut R, + schema: &Schema, + is_little_endian: bool, + blocks: &[Block], +) -> Result>> { + let mut dictionaries = Default::default(); + let mut data = vec![]; + + for block in blocks { + let offset = block.offset() as u64; + let length = block.metaDataLength() as u64; + read_dictionary_message(reader, offset, &mut data)?; + + let message = ipc::Message::root_as_message(&data).map_err(|err| { + ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err)) + })?; + + match message.header_type() { + ipc::Message::MessageHeader::DictionaryBatch => { + let block_offset = offset + length; + let batch = message.header_as_dictionary_batch().unwrap(); + read_dictionary( + batch, + schema, + is_little_endian, + &mut dictionaries, + reader, + block_offset, + )?; + } + t => { + return Err(ArrowError::OutOfSpec(format!( + "Expecting DictionaryBatch in dictionary blocks, found {:?}.", + t + ))); + } + }; + } + Ok(dictionaries) +} + /// Read the IPC file's metadata pub fn read_file_metadata(reader: &mut R) -> Result { // check if header and footer contain correct magic bytes @@ -138,43 +181,14 @@ pub fn read_file_metadata(reader: &mut R) -> Result { - let block_offset = offset + length; - let batch = message.header_as_dictionary_batch().unwrap(); - read_dictionary( - batch, - &schema, - is_little_endian, - &mut dictionaries, - reader, - block_offset, - )?; - } - t => { - return Err(ArrowError::OutOfSpec(format!( - "Expecting DictionaryBatch in dictionary blocks, found {:?}.", - t - ))); - } - }; - } Ok(FileMetadata { schema, is_little_endian, @@ -230,14 +244,6 @@ pub fn read_batch( let message = ipc::Message::root_as_message(&block_data[..]) .map_err(|err| ArrowError::OutOfSpec(format!("Unable to get root as footer: {:?}", err)))?; - // some old test data's footer metadata is not set, so we account for that - if metadata.version != ipc::Schema::MetadataVersion::V1 && message.version() != metadata.version - { - return Err(ArrowError::OutOfSpec( - "Could not read IPC message as metadata versions mismatch".to_string(), - )); - } - let batch = get_serialized_batch(&message)?; read_record_batch(