Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Reduced reallocations when reading from IPC (~12%) (#1105)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jun 27, 2022
1 parent a11e258 commit 09817a4
Show file tree
Hide file tree
Showing 25 changed files with 252 additions and 98 deletions.
5 changes: 3 additions & 2 deletions examples/ipc_file_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk<Box<dyn Array>>)> {
let schema = metadata.schema.clone();

// advanced way: read the dictionary
let dictionaries = read::read_file_dictionaries(&mut file, &metadata)?;
let dictionaries = read::read_file_dictionaries(&mut file, &metadata, &mut Default::default())?;

let chunk_index = 0;

Expand All @@ -43,7 +43,8 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk<Box<dyn Array>>)> {
&metadata,
None,
chunk_index,
&mut vec![],
&mut Default::default(),
&mut Default::default(),
)?;

Ok((schema, chunk))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ async fn receive_batch_flight_data(
{
let length = data.data_body.len();
let mut reader = std::io::Cursor::new(&data.data_body);
read::read_dictionary(batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64)
read::read_dictionary(batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64, &mut Default::default())
.expect("Error reading dictionary");

data = resp.next().await?.ok()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ async fn record_batch_from_message(
&mut reader,
0,
length as u64,
&mut Default::default()
);

arrow_batch_result.map_err(|e| Status::internal(format!("Could not convert to Chunk: {:?}", e)))
Expand All @@ -312,7 +313,7 @@ async fn dictionary_from_message(
let mut reader = std::io::Cursor::new(data_body);

let dictionary_batch_result =
ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64);
ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64, &mut Default::default());
dictionary_batch_result
.map_err(|e| Status::internal(format!("Could not convert to Dictionary: {:?}", e)))
}
Expand Down
1 change: 1 addition & 0 deletions src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ pub fn deserialize_batch(
&mut reader,
0,
length as u64,
&mut Default::default(),
),
_ => Err(Error::nyi(
"flight currently only supports reading RecordBatch messages",
Expand Down
3 changes: 2 additions & 1 deletion src/io/ipc/append/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ impl<R: Read + Seek + Write> FileWriter<R> {
));
}

let dictionaries = read::read_file_dictionaries(&mut writer, &metadata)?;
let dictionaries =
read::read_file_dictionaries(&mut writer, &metadata, &mut Default::default())?;

let last_block = metadata.blocks.last().ok_or_else(|| {
Error::oos("An Arrow IPC file must have at least 1 message (the schema message)")
Expand Down
6 changes: 5 additions & 1 deletion src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};

#[allow(clippy::too_many_arguments)]
pub fn read_binary<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
data_type: DataType,
Expand All @@ -17,6 +18,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
) -> Result<BinaryArray<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down Expand Up @@ -46,6 +48,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<O>::from(vec![O::default()])))?;
Expand All @@ -58,6 +61,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

BinaryArray::<O>::try_new(data_type, offsets, values, validity)
Expand Down
4 changes: 3 additions & 1 deletion src/io/ipc/read/array/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::array::{DictionaryArray, DictionaryKey};
use crate::error::{Error, Result};

use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node};
use super::super::{Compression, IpcBuffer, Node, ReadBuffer};
use super::{read_primitive, skip_primitive};

#[allow(clippy::too_many_arguments)]
Expand All @@ -19,6 +19,7 @@ pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
block_offset: u64,
compression: Option<Compression>,
is_little_endian: bool,
scratch: &mut ReadBuffer,
) -> Result<DictionaryArray<T>>
where
Vec<u8>: TryInto<T::Bytes>,
Expand Down Expand Up @@ -47,6 +48,7 @@ where
block_offset,
is_little_endian,
compression,
scratch,
)?;

Ok(DictionaryArray::<T>::from_data(keys, values))
Expand Down
5 changes: 4 additions & 1 deletion src/io/ipc/read/array/fixed_size_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_binary<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
data_type: DataType,
Expand All @@ -16,6 +17,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
) -> Result<FixedSizeBinaryArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down Expand Up @@ -46,6 +48,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

FixedSizeBinaryArray::try_new(data_type, values, validity)
Expand Down
5 changes: 3 additions & 2 deletions src/io/ipc/read/array/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, Version};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_list<R: Read + Seek>(
Expand All @@ -23,6 +22,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
) -> Result<FixedSizeListArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down Expand Up @@ -53,6 +53,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
is_little_endian,
compression,
version,
scratch,
)?;
FixedSizeListArray::try_new(data_type, values, validity)
}
Expand Down
8 changes: 6 additions & 2 deletions src/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version};
use super::super::{
Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version,
};

#[allow(clippy::too_many_arguments)]
pub fn read_list<O: Offset, R: Read + Seek>(
Expand All @@ -25,6 +26,7 @@ pub fn read_list<O: Offset, R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
) -> Result<ListArray<O>>
where
Vec<u8>: TryInto<O::Bytes>,
Expand Down Expand Up @@ -57,6 +59,7 @@ where
block_offset,
is_little_endian,
compression,
scratch,
)
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<O>::from(vec![O::default()])))?;
Expand All @@ -74,6 +77,7 @@ where
is_little_endian,
compression,
version,
scratch,
)?;
ListArray::try_new(data_type, offsets, values, validity)
}
Expand Down
8 changes: 6 additions & 2 deletions src/io/ipc/read/array/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version};
use super::super::{
Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version,
};

#[allow(clippy::too_many_arguments)]
pub fn read_map<R: Read + Seek>(
Expand All @@ -24,6 +25,7 @@ pub fn read_map<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
) -> Result<MapArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down Expand Up @@ -53,6 +55,7 @@ pub fn read_map<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<i32>::from(vec![0i32])))?;
Expand All @@ -70,6 +73,7 @@ pub fn read_map<R: Read + Seek>(
is_little_endian,
compression,
version,
scratch,
)?;
MapArray::try_new(data_type, offsets, field, validity)
}
Expand Down
5 changes: 4 additions & 1 deletion src/io/ipc/read/array/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::error::{Error, Result};
use crate::{array::PrimitiveArray, types::NativeType};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};

#[allow(clippy::too_many_arguments)]
pub fn read_primitive<T: NativeType, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
data_type: DataType,
Expand All @@ -16,6 +17,7 @@ pub fn read_primitive<T: NativeType, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
) -> Result<PrimitiveArray<T>>
where
Vec<u8>: TryInto<T::Bytes>,
Expand Down Expand Up @@ -48,6 +50,7 @@ where
block_offset,
is_little_endian,
compression,
scratch,
)?;
PrimitiveArray::<T>::try_new(data_type, values, validity)
}
Expand Down
5 changes: 3 additions & 2 deletions src/io/ipc/read/array/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, Version};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, ReadBuffer, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_struct<R: Read + Seek>(
Expand All @@ -23,6 +22,7 @@ pub fn read_struct<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
) -> Result<StructArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down Expand Up @@ -57,6 +57,7 @@ pub fn read_struct<R: Read + Seek>(
is_little_endian,
compression,
version,
scratch,
)
})
.collect::<Result<Vec<_>>>()?;
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use crate::error::{Error, Result};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version};
use super::super::{
Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, ReadBuffer, Version,
};

#[allow(clippy::too_many_arguments)]
pub fn read_union<R: Read + Seek>(
Expand All @@ -24,6 +25,7 @@ pub fn read_union<R: Read + Seek>(
is_little_endian: bool,
compression: Option<Compression>,
version: Version,
scratch: &mut ReadBuffer,
) -> Result<UnionArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand All @@ -50,6 +52,7 @@ pub fn read_union<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let offsets = if let DataType::Union(_, _, mode) = data_type {
Expand All @@ -61,6 +64,7 @@ pub fn read_union<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?)
} else {
None
Expand All @@ -86,6 +90,7 @@ pub fn read_union<R: Read + Seek>(
is_little_endian,
compression,
version,
scratch,
)
})
.collect::<Result<Vec<_>>>()?;
Expand Down
6 changes: 5 additions & 1 deletion src/io/ipc/read/array/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, ReadBuffer};

#[allow(clippy::too_many_arguments)]
pub fn read_utf8<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
data_type: DataType,
Expand All @@ -17,6 +18,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut ReadBuffer,
) -> Result<Utf8Array<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand Down Expand Up @@ -46,6 +48,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<O>::from(vec![O::default()])))?;
Expand All @@ -58,6 +61,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

Utf8Array::<O>::try_new(data_type, offsets, values, validity)
Expand Down
Loading

0 comments on commit 09817a4

Please sign in to comment.