Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Made IPC reader less restrictive (#678)
Browse files Browse the repository at this point in the history
* Allow file to not contain dictionaries.

* Allow different versions between message and file
  • Loading branch information
jorgecarleitao committed Dec 13, 2021
1 parent 89921d3 commit 15ce4cf
Showing 1 changed file with 49 additions and 43 deletions.
92 changes: 49 additions & 43 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;

use arrow_format::ipc;
use arrow_format::ipc::flatbuffers::VerifierOptions;
use arrow_format::ipc::File::Block;

use crate::array::*;
use crate::datatypes::Schema;
Expand Down Expand Up @@ -87,6 +88,48 @@ fn read_dictionary_message<R: Read + Seek>(
Ok(())
}

/// Reads every dictionary referenced by `blocks` and returns them in a map.
///
/// For each block, the message located at the block's offset is loaded via
/// `read_dictionary_message`, parsed as an IPC message, and handed to
/// `read_dictionary`, which populates the returned map.
///
/// # Errors
///
/// Returns [`ArrowError::OutOfSpec`] when a message cannot be parsed or when a
/// block's message header is anything other than a `DictionaryBatch`. Errors
/// from `read_dictionary_message` and `read_dictionary` are propagated as-is.
fn read_dictionaries<R: Read + Seek>(
    reader: &mut R,
    schema: &Schema,
    is_little_endian: bool,
    blocks: &[Block],
) -> Result<HashMap<usize, Arc<dyn Array>>> {
    let mut dictionaries: HashMap<usize, Arc<dyn Array>> = Default::default();
    // Declared outside the loop so the same buffer is handed to
    // `read_dictionary_message` for every block.
    let mut scratch = vec![];

    for block in blocks {
        let message_start = block.offset() as u64;
        let metadata_len = block.metaDataLength() as u64;
        read_dictionary_message(reader, message_start, &mut scratch)?;

        let message = match ipc::Message::root_as_message(&scratch) {
            Ok(message) => message,
            Err(err) => {
                return Err(ArrowError::OutOfSpec(format!(
                    "Unable to get root as message: {:?}",
                    err
                )));
            }
        };

        // Dictionary blocks may only carry dictionary batches; bail out on anything else.
        let header_type = message.header_type();
        if !matches!(
            header_type,
            ipc::Message::MessageHeader::DictionaryBatch
        ) {
            return Err(ArrowError::OutOfSpec(format!(
                "Expecting DictionaryBatch in dictionary blocks, found {:?}.",
                header_type
            )));
        }

        // The header type was checked just above, so the accessor is expected to be `Some`.
        let batch = message.header_as_dictionary_batch().unwrap();
        // NOTE(review): presumably the position right after the metadata, i.e. where the
        // dictionary's body starts — confirm against `read_dictionary`.
        let body_start = message_start + metadata_len;
        read_dictionary(
            batch,
            schema,
            is_little_endian,
            &mut dictionaries,
            reader,
            body_start,
        )?;
    }

    Ok(dictionaries)
}

/// Read the IPC file's metadata
pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata> {
// check if header and footer contain correct magic bytes
Expand Down Expand Up @@ -138,43 +181,14 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
let (schema, is_little_endian) = convert::fb_to_schema(ipc_schema);
let schema = Arc::new(schema);

let mut dictionaries = Default::default();
let dictionary_blocks = footer.dictionaries();

let dictionary_blocks = footer.dictionaries().ok_or_else(|| {
ArrowError::OutOfSpec("Unable to get dictionaries from footer".to_string())
})?;

let mut data = vec![];
for block in dictionary_blocks {
let offset = block.offset() as u64;
let length = block.metaDataLength() as u64;
read_dictionary_message(reader, offset, &mut data)?;

let message = ipc::Message::root_as_message(&data).map_err(|err| {
ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(reader, &schema, is_little_endian, blocks)?
} else {
Default::default()
};

match message.header_type() {
ipc::Message::MessageHeader::DictionaryBatch => {
let block_offset = offset + length;
let batch = message.header_as_dictionary_batch().unwrap();
read_dictionary(
batch,
&schema,
is_little_endian,
&mut dictionaries,
reader,
block_offset,
)?;
}
t => {
return Err(ArrowError::OutOfSpec(format!(
"Expecting DictionaryBatch in dictionary blocks, found {:?}.",
t
)));
}
};
}
Ok(FileMetadata {
schema,
is_little_endian,
Expand Down Expand Up @@ -230,14 +244,6 @@ pub fn read_batch<R: Read + Seek>(
let message = ipc::Message::root_as_message(&block_data[..])
.map_err(|err| ArrowError::OutOfSpec(format!("Unable to get root as footer: {:?}", err)))?;

// some old test data's footer metadata is not set, so we account for that
if metadata.version != ipc::Schema::MetadataVersion::V1 && message.version() != metadata.version
{
return Err(ArrowError::OutOfSpec(
"Could not read IPC message as metadata versions mismatch".to_string(),
));
}

let batch = get_serialized_batch(&message)?;

read_record_batch(
Expand Down

0 comments on commit 15ce4cf

Please sign in to comment.