Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
amortize encoded_message
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 16, 2023
1 parent 3fc6241 commit 11e5369
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/io/ipc/append/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl<R: Read + Seek + Write> FileWriter<R> {
dictionaries,
cannot_replace: true,
},
encoded_message: Default::default(),
})
}
}
42 changes: 32 additions & 10 deletions src/io/ipc/write/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,25 @@ pub fn encode_chunk(
dictionary_tracker: &mut DictionaryTracker,
options: &WriteOptions,
) -> Result<(Vec<EncodedData>, EncodedData)> {
let mut encoded_message = EncodedData::default();
let encoded_dictionaries = encode_chunk_amortized(
chunk,
fields,
dictionary_tracker,
options,
&mut encoded_message,
)?;
Ok((encoded_dictionaries, encoded_message))
}

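// Same as `encode_chunk`, but writes the encoded record batch into the
// caller-provided `encoded_message` instead of returning a new `EncodedData`,
// so that its `arrow_data` allocation can be reused across calls.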
pub fn encode_chunk_amortized(
chunk: &Chunk<Box<dyn Array>>,
fields: &[IpcField],
dictionary_tracker: &mut DictionaryTracker,
options: &WriteOptions,
encoded_message: &mut EncodedData,
) -> Result<Vec<EncodedData>> {
let mut encoded_dictionaries = vec![];

for (field, array) in fields.iter().zip(chunk.as_ref()) {
Expand All @@ -189,9 +208,9 @@ pub fn encode_chunk(
)?;
}

let encoded_message = chunk_to_bytes(chunk, options);
chunk_to_bytes_amortized(chunk, options, encoded_message);

Ok((encoded_dictionaries, encoded_message))
Ok(encoded_dictionaries)
}

fn serialize_compression(
Expand All @@ -213,10 +232,16 @@ fn serialize_compression(

/// Write [`Chunk`] into two sets of bytes, one for the header (ipc::Schema::Message) and the
/// other for the batch's data
fn chunk_to_bytes(chunk: &Chunk<Box<dyn Array>>, options: &WriteOptions) -> EncodedData {
fn chunk_to_bytes_amortized(
chunk: &Chunk<Box<dyn Array>>,
options: &WriteOptions,
encoded_message: &mut EncodedData,
) {
let mut nodes: Vec<arrow_format::ipc::FieldNode> = vec![];
let mut buffers: Vec<arrow_format::ipc::Buffer> = vec![];
let mut arrow_data: Vec<u8> = vec![];
let mut arrow_data = std::mem::take(&mut encoded_message.arrow_data);
arrow_data.clear();

let mut offset = 0;
for array in chunk.arrays() {
write(
Expand Down Expand Up @@ -248,11 +273,8 @@ fn chunk_to_bytes(chunk: &Chunk<Box<dyn Array>>, options: &WriteOptions) -> Enco

let mut builder = Builder::new();
let ipc_message = builder.finish(&message, None);

EncodedData {
ipc_message: ipc_message.to_vec(),
arrow_data,
}
encoded_message.ipc_message = ipc_message.to_vec();
encoded_message.arrow_data = arrow_data
}

/// Write dictionary values into two sets of bytes, one for the header (ipc::Schema::Message) and the
Expand Down Expand Up @@ -360,7 +382,7 @@ impl DictionaryTracker {
}

/// Stores the encoded data, which is an ipc::Schema::Message, and optional Arrow data
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct EncodedData {
/// An encoded ipc::Schema::Message
pub ipc_message: Vec<u8>,
Expand Down
14 changes: 10 additions & 4 deletions src/io/ipc/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow_format::ipc::planus::Builder;
use super::{
super::IpcField,
super::ARROW_MAGIC,
common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions},
common::{DictionaryTracker, EncodedData, WriteOptions},
common_sync::{write_continuation, write_message},
default_ipc_fields, schema, schema_to_bytes,
};
Expand All @@ -14,6 +14,7 @@ use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::*;
use crate::error::{Error, Result};
use crate::io::ipc::write::common::encode_chunk_amortized;

#[derive(Clone, Copy, PartialEq, Eq)]
pub(crate) enum State {
Expand Down Expand Up @@ -41,6 +42,10 @@ pub struct FileWriter<W: Write> {
pub(crate) state: State,
/// Keeps track of dictionaries that have been written
pub(crate) dictionary_tracker: DictionaryTracker,
/// Scratch buffer that is reused between writes.
/// This is public so that a user can swap it out between
/// writers and thereby amortize allocations.
pub encoded_message: EncodedData,
}

impl<W: Write> FileWriter<W> {
Expand Down Expand Up @@ -83,6 +88,7 @@ impl<W: Write> FileWriter<W> {
dictionaries: Default::default(),
cannot_replace: true,
},
encoded_message: Default::default(),
}
}

Expand Down Expand Up @@ -132,12 +138,12 @@ impl<W: Write> FileWriter<W> {
} else {
self.ipc_fields.as_ref()
};

let (encoded_dictionaries, encoded_message) = encode_chunk(
let encoded_dictionaries = encode_chunk_amortized(
chunk,
ipc_fields,
&mut self.dictionary_tracker,
&self.options,
&mut self.encoded_message,
)?;

// add all dictionaries
Expand All @@ -153,7 +159,7 @@ impl<W: Write> FileWriter<W> {
self.block_offsets += meta + data;
}

let (meta, data) = write_message(&mut self.writer, &encoded_message)?;
let (meta, data) = write_message(&mut self.writer, &self.encoded_message)?;
// add a record block for the footer
let block = arrow_format::ipc::Block {
offset: self.block_offsets as i64,
Expand Down

0 comments on commit 11e5369

Please sign in to comment.