Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
make sure that scratch is used in decompressing data
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 27, 2022
1 parent 53d1b54 commit 5bed494
Show file tree
Hide file tree
Showing 19 changed files with 127 additions and 22 deletions.
3 changes: 2 additions & 1 deletion examples/ipc_file_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk<Box<dyn Array>>)> {
let schema = metadata.schema.clone();

// advanced way: read the dictionary
let dictionaries = read::read_file_dictionaries(&mut file, &metadata, Default::default())?;
let dictionaries = read::read_file_dictionaries(&mut file, &metadata, &mut Default::default())?;

let chunk_index = 0;

Expand All @@ -44,6 +44,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk<Box<dyn Array>>)> {
None,
chunk_index,
&mut Default::default(),
&mut Default::default(),
)?;

Ok((schema, chunk))
Expand Down
1 change: 1 addition & 0 deletions src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ pub fn deserialize_batch(
&mut reader,
0,
length as u64,
&mut Default::default(),
),
_ => Err(Error::nyi(
"flight currently only supports reading RecordBatch messages",
Expand Down
5 changes: 5 additions & 0 deletions src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ use crate::array::{BinaryArray, Offset};
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::{Error, Result};
use crate::io::ipc::read::common::ReadBuffer;

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_binary<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
data_type: DataType,
Expand All @@ -17,6 +19,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
) -> Result<BinaryArray<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down Expand Up @@ -46,6 +49,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<O>::from(vec![O::default()])))?;
Expand All @@ -58,6 +62,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

BinaryArray::<O>::try_new(data_type, offsets, values, validity)
Expand Down
3 changes: 3 additions & 0 deletions src/io/ipc/read/array/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::io::{Read, Seek};

use crate::array::{DictionaryArray, DictionaryKey};
use crate::error::{Error, Result};
use crate::io::ipc::read::common::ReadBuffer;

use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node};
Expand All @@ -19,6 +20,7 @@ pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
block_offset: u64,
compression: Option<Compression>,
is_little_endian: bool,
scratch: &mut ReadBuffer,
) -> Result<DictionaryArray<T>>
where
Vec<u8>: TryInto<T::Bytes>,
Expand Down Expand Up @@ -47,6 +49,7 @@ where
block_offset,
is_little_endian,
compression,
scratch,
)?;

Ok(DictionaryArray::<T>::from_data(keys, values))
Expand Down
4 changes: 4 additions & 0 deletions src/io/ipc/read/array/fixed_size_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use std::io::{Read, Seek};
use crate::array::FixedSizeBinaryArray;
use crate::datatypes::DataType;
use crate::error::{Error, Result};
use crate::io::ipc::read::common::ReadBuffer;

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_binary<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
data_type: DataType,
Expand All @@ -16,6 +18,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
) -> Result<FixedSizeBinaryArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down Expand Up @@ -46,6 +49,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

FixedSizeBinaryArray::try_new(data_type, values, validity)
Expand Down
3 changes: 3 additions & 0 deletions src/io/ipc/read/array/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::io::{Read, Seek};
use crate::array::FixedSizeListArray;
use crate::datatypes::DataType;
use crate::error::{Error, Result};
use crate::io::ipc::read::common::ReadBuffer;

use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
Expand All @@ -23,6 +24,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
) -> Result<FixedSizeListArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down Expand Up @@ -53,6 +55,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
is_little_endian,
compression,
version,
scratch,
)?;
FixedSizeListArray::try_new(data_type, values, validity)
}
Expand Down
4 changes: 4 additions & 0 deletions src/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::array::{ListArray, Offset};
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::{Error, Result};
use crate::io::ipc::read::common::ReadBuffer;

use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
Expand All @@ -25,6 +26,7 @@ pub fn read_list<O: Offset, R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
) -> Result<ListArray<O>>
where
Vec<u8>: TryInto<O::Bytes>,
Expand Down Expand Up @@ -57,6 +59,7 @@ where
block_offset,
is_little_endian,
compression,
scratch,
)
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<O>::from(vec![O::default()])))?;
Expand All @@ -74,6 +77,7 @@ where
is_little_endian,
compression,
version,
scratch,
)?;
ListArray::try_new(data_type, offsets, values, validity)
}
Expand Down
4 changes: 4 additions & 0 deletions src/io/ipc/read/array/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::array::MapArray;
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::{Error, Result};
use crate::io::ipc::read::common::ReadBuffer;

use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
Expand All @@ -24,6 +25,7 @@ pub fn read_map<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
) -> Result<MapArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down Expand Up @@ -53,6 +55,7 @@ pub fn read_map<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<i32>::from(vec![0i32])))?;
Expand All @@ -70,6 +73,7 @@ pub fn read_map<R: Read + Seek>(
is_little_endian,
compression,
version,
scratch,
)?;
MapArray::try_new(data_type, offsets, field, validity)
}
Expand Down
4 changes: 4 additions & 0 deletions src/io/ipc/read/array/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use std::{collections::VecDeque, convert::TryInto};

use crate::datatypes::DataType;
use crate::error::{Error, Result};
use crate::io::ipc::read::common::ReadBuffer;
use crate::{array::PrimitiveArray, types::NativeType};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_primitive<T: NativeType, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
data_type: DataType,
Expand All @@ -16,6 +18,7 @@ pub fn read_primitive<T: NativeType, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
) -> Result<PrimitiveArray<T>>
where
Vec<u8>: TryInto<T::Bytes>,
Expand Down Expand Up @@ -48,6 +51,7 @@ where
block_offset,
is_little_endian,
compression,
scratch,
)?;
PrimitiveArray::<T>::try_new(data_type, values, validity)
}
Expand Down
3 changes: 3 additions & 0 deletions src/io/ipc/read/array/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::io::{Read, Seek};
use crate::array::StructArray;
use crate::datatypes::DataType;
use crate::error::{Error, Result};
use crate::io::ipc::read::common::ReadBuffer;

use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
Expand All @@ -23,6 +24,7 @@ pub fn read_struct<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
) -> Result<StructArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down Expand Up @@ -57,6 +59,7 @@ pub fn read_struct<R: Read + Seek>(
is_little_endian,
compression,
version,
scratch,
)
})
.collect::<Result<Vec<_>>>()?;
Expand Down
5 changes: 5 additions & 0 deletions src/io/ipc/read/array/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::array::UnionArray;
use crate::datatypes::DataType;
use crate::datatypes::UnionMode::Dense;
use crate::error::{Error, Result};
use crate::io::ipc::read::common::ReadBuffer;

use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
Expand All @@ -24,6 +25,7 @@ pub fn read_union<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
) -> Result<UnionArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand All @@ -50,6 +52,7 @@ pub fn read_union<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let offsets = if let DataType::Union(_, _, mode) = data_type {
Expand All @@ -61,6 +64,7 @@ pub fn read_union<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?)
} else {
None
Expand All @@ -86,6 +90,7 @@ pub fn read_union<R: Read + Seek>(
is_little_endian,
compression,
version,
scratch,
)
})
.collect::<Result<Vec<_>>>()?;
Expand Down
5 changes: 5 additions & 0 deletions src/io/ipc/read/array/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ use crate::array::{Offset, Utf8Array};
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::{Error, Result};
use crate::io::ipc::read::common::ReadBuffer;

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_utf8<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
data_type: DataType,
Expand All @@ -17,6 +19,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
) -> Result<Utf8Array<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down Expand Up @@ -46,6 +49,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<O>::from(vec![O::default()])))?;
Expand All @@ -58,6 +62,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

Utf8Array::<O>::try_new(data_type, offsets, values, validity)
Expand Down
6 changes: 6 additions & 0 deletions src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub fn read_record_batch<R: Read + Seek>(
reader: &mut R,
block_offset: u64,
file_size: u64,
scratch: &mut ReadBuffer,
) -> Result<Chunk<Box<dyn Array>>> {
assert_eq!(fields.len(), ipc_schema.fields.len());
let buffers = batch
Expand Down Expand Up @@ -136,6 +137,7 @@ pub fn read_record_batch<R: Read + Seek>(
Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err))
})?,
version,
scratch,
)?)),
ProjectionResult::NotSelected((field, _)) => {
skip(&mut field_nodes, &field.data_type, &mut buffers)?;
Expand All @@ -162,6 +164,7 @@ pub fn read_record_batch<R: Read + Seek>(
Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err))
})?,
version,
scratch,
)
})
.collect::<Result<Vec<_>>>()?
Expand Down Expand Up @@ -221,6 +224,7 @@ fn first_dict_field<'a>(

/// Read the dictionary from the buffer and provided metadata,
/// updating the `dictionaries` with the resulting dictionary
#[allow(clippy::too_many_arguments)]
pub fn read_dictionary<R: Read + Seek>(
batch: arrow_format::ipc::DictionaryBatchRef,
fields: &[Field],
Expand All @@ -229,6 +233,7 @@ pub fn read_dictionary<R: Read + Seek>(
reader: &mut R,
block_offset: u64,
file_size: u64,
scratch: &mut ReadBuffer,
) -> Result<()> {
if batch
.is_delta()
Expand Down Expand Up @@ -270,6 +275,7 @@ pub fn read_dictionary<R: Read + Seek>(
reader,
block_offset,
file_size,
scratch,
)?;
let mut arrays = columns.into_arrays();
arrays.pop().unwrap()
Expand Down
Loading

0 comments on commit 5bed494

Please sign in to comment.