Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added IPC compressed writing (#566)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Nov 4, 2021
1 parent 3c10b16 commit ec11db5
Show file tree
Hide file tree
Showing 10 changed files with 617 additions and 194 deletions.
45 changes: 45 additions & 0 deletions src/io/ipc/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,48 @@ pub fn decompress_zstd(_input_buf: &[u8], _output_buf: &mut [u8]) -> Result<()>
use crate::error::ArrowError;
Err(ArrowError::Ipc("The crate was compiled without IPC compression. Use `io_ipc_compression` to read compressed IPC.".to_string()))
}

#[cfg(feature = "io_ipc_compression")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
use std::io::Write;
let mut encoder = lz4::EncoderBuilder::new().build(output_buf).unwrap();
encoder.write_all(input_buf).map_err(|e| e.into())
}

#[cfg(feature = "io_ipc_compression")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
pub fn compress_zstd(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
use std::io::Write;
let mut encoder = zstd::Encoder::new(output_buf, 17)?.auto_finish();
encoder.write_all(input_buf).map_err(|e| e.into())
}

#[cfg(not(feature = "io_ipc_compression"))]
pub fn compress_lz4(_input_buf: &[u8], _output_buf: &mut Vec<u8>) -> Result<()> {
use crate::error::ArrowError;
Err(ArrowError::Ipc("The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC.".to_string()))
}

#[cfg(not(feature = "io_ipc_compression"))]
pub fn compress_zstd(_input_buf: &[u8], _output_buf: &mut Vec<u8>) -> Result<()> {
use crate::error::ArrowError;
Err(ArrowError::Ipc("The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC.".to_string()))
}

#[cfg(test)]
mod tests {
use super::*;

#[cfg(feature = "io_ipc_compression")]
#[test]
fn round_trip() {
let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();
let mut buffer = vec![];
compress_zstd(&data, &mut buffer).unwrap();

let mut result = vec![0; 200];
decompress_zstd(&buffer, &mut result).unwrap();
assert_eq!(data, result);
}
}
5 changes: 3 additions & 2 deletions src/io/ipc/read/array/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub fn read_boolean<R: Read + Seek>(
reader: &mut R,
block_offset: u64,
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
) -> Result<BooleanArray> {
let field_node = field_nodes.pop_front().unwrap().0;

Expand All @@ -27,7 +28,7 @@ pub fn read_boolean<R: Read + Seek>(
reader,
block_offset,
is_little_endian,
None,
compression,
)?;

let values = read_bitmap(
Expand All @@ -36,7 +37,7 @@ pub fn read_boolean<R: Read + Seek>(
reader,
block_offset,
is_little_endian,
None,
compression,
)?;
Ok(BooleanArray::from_data(data_type, values, validity))
}
Expand Down
3 changes: 2 additions & 1 deletion src/io/ipc/read/array/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
reader: &mut R,
block_offset: u64,
compression: Option<ipc::Message::BodyCompression>,
is_little_endian: bool,
) -> Result<DictionaryArray<T>>
where
Expand All @@ -29,7 +30,7 @@ where
reader,
block_offset,
is_little_endian,
None,
compression,
)?;

Ok(DictionaryArray::<T>::from_data(keys, values.clone()))
Expand Down
2 changes: 2 additions & 0 deletions src/io/ipc/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub fn read<R: Read + Seek>(
reader,
block_offset,
is_little_endian,
compression,
)
.map(|x| Arc::new(x) as Arc<dyn Array>),
Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
Expand Down Expand Up @@ -169,6 +170,7 @@ pub fn read<R: Read + Seek>(
buffers,
reader,
block_offset,
compression,
is_little_endian,
)
.map(|x| Arc::new(x) as Arc<dyn Array>)
Expand Down
49 changes: 18 additions & 31 deletions src/io/ipc/read/read_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,33 +107,26 @@ fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
let mut slice = vec![0u8; buffer_length];
reader.read_exact(&mut slice)?;

// Safety:
// This is safe because T is NativeType, which by definition can be transmuted to u8
let out_slice = unsafe {
std::slice::from_raw_parts_mut(
buffer.as_mut_ptr() as *mut u8,
length * std::mem::size_of::<T>(),
)
};

match compression.codec() {
CompressionType::LZ4_FRAME => {
// fast case where we can just copy the contents as is
unsafe {
// transmute T to bytes.
let out_slice = std::slice::from_raw_parts_mut(
buffer.as_mut_ptr() as *mut u8,
length * std::mem::size_of::<T>(),
);
compression::decompress_lz4(&slice[8..], out_slice)?
}
compression::decompress_lz4(&slice[8..], out_slice)?;
Ok(buffer)
}
CompressionType::ZSTD => {
// fast case where we can just copy the contents as is
unsafe {
// transmute T to bytes.
let out_slice = std::slice::from_raw_parts_mut(
buffer.as_mut_ptr() as *mut u8,
length * std::mem::size_of::<T>(),
);
compression::decompress_zstd(&slice[8..], out_slice)?
}
compression::decompress_zstd(&slice[8..], out_slice)?;
Ok(buffer)
}
_ => Err(ArrowError::NotYetImplemented(
"Non LZ4 compressed IPC".to_string(),
"Compression format".to_string(),
)),
}
}
Expand Down Expand Up @@ -184,25 +177,19 @@ fn read_compressed_bitmap<R: Read + Seek>(
reader: &mut R,
) -> Result<MutableBuffer<u8>> {
let mut buffer = MutableBuffer::<u8>::from_len_zeroed((length + 7) / 8);

// read all first
// todo: move this allocation to an external buffer for re-use
let mut slice = vec![0u8; bytes];
reader.read_exact(&mut slice)?;

match compression.codec() {
CompressionType::LZ4_FRAME => {
// decompress first
// todo: move this allocation to an external buffer for re-use
let mut slice = vec![0u8; bytes];
reader.read_exact(&mut slice)?;

compression::decompress_lz4(&slice[8..], &mut buffer)?;

Ok(buffer)
}
CompressionType::ZSTD => {
// decompress first
// todo: move this allocation to an external buffer for re-use
let mut slice = vec![0u8; bytes];
reader.read_exact(&mut slice)?;

compression::decompress_zstd(&slice[8..], &mut buffer)?;

Ok(buffer)
}
_ => Err(ArrowError::NotYetImplemented(
Expand Down
60 changes: 54 additions & 6 deletions src/io/ipc/write/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{collections::HashMap, sync::Arc};

use arrow_format::ipc;
use arrow_format::ipc::flatbuffers::FlatBufferBuilder;
use arrow_format::ipc::Message::CompressionType;

use crate::array::Array;
use crate::error::{ArrowError, Result};
Expand All @@ -31,8 +32,17 @@ use crate::{array::DictionaryArray, datatypes::*};
use super::super::CONTINUATION_MARKER;
use super::{write, write_dictionary};

/// Compression codec
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Compression {
/// LZ4 (framed)
LZ4,
/// ZSTD
ZSTD,
}

/// IPC write options used to control the behaviour of the writer
#[derive(Debug)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IpcWriteOptions {
/// Write padding after memory buffers to this multiple of bytes.
/// Generally 8 or 64, defaults to 8
Expand All @@ -48,6 +58,8 @@ pub struct IpcWriteOptions {
/// version 2.0.0: V4, with legacy format enabled
/// version 4.0.0: V5
metadata_version: ipc::Schema::MetadataVersion,
/// Whether the buffers should be compressed
compression: Option<Compression>,
}

impl IpcWriteOptions {
Expand All @@ -56,6 +68,7 @@ impl IpcWriteOptions {
alignment: usize,
write_legacy_ipc_format: bool,
metadata_version: ipc::Schema::MetadataVersion,
compression: Option<Compression>,
) -> Result<Self> {
if alignment == 0 || alignment % 8 != 0 {
return Err(ArrowError::InvalidArgumentError(
Expand All @@ -72,6 +85,7 @@ impl IpcWriteOptions {
alignment,
write_legacy_ipc_format,
metadata_version,
compression,
}),
ipc::Schema::MetadataVersion::V5 => {
if write_legacy_ipc_format {
Expand All @@ -83,6 +97,7 @@ impl IpcWriteOptions {
alignment,
write_legacy_ipc_format,
metadata_version,
compression,
})
}
}
Expand All @@ -101,6 +116,7 @@ impl Default for IpcWriteOptions {
alignment: 8,
write_legacy_ipc_format: false,
metadata_version: ipc::Schema::MetadataVersion::V5,
compression: None,
}
}
}
Expand Down Expand Up @@ -157,18 +173,34 @@ fn record_batch_to_bytes(batch: &RecordBatch, write_options: &IpcWriteOptions) -
&mut nodes,
&mut offset,
is_native_little_endian(),
write_options.compression,
)
}

// write data
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);

let compression = if let Some(compression) = write_options.compression {
let compression = match compression {
Compression::LZ4 => CompressionType::LZ4_FRAME,
Compression::ZSTD => CompressionType::ZSTD,
};
let mut compression_builder = ipc::Message::BodyCompressionBuilder::new(&mut fbb);
compression_builder.add_codec(compression);
Some(compression_builder.finish())
} else {
None
};

let root = {
let mut batch_builder = ipc::Message::RecordBatchBuilder::new(&mut fbb);
batch_builder.add_length(batch.num_rows() as i64);
batch_builder.add_nodes(nodes);
batch_builder.add_buffers(buffers);
if let Some(compression) = compression {
batch_builder.add_compression(compression)
}
let b = batch_builder.finish();
b.as_union_value()
};
Expand Down Expand Up @@ -209,18 +241,34 @@ fn dictionary_batch_to_bytes(
&mut nodes,
&mut 0,
is_little_endian,
write_options.compression,
false,
);

// write data
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);

let compression = if let Some(compression) = write_options.compression {
let compression = match compression {
Compression::LZ4 => CompressionType::LZ4_FRAME,
Compression::ZSTD => CompressionType::ZSTD,
};
let mut compression_builder = ipc::Message::BodyCompressionBuilder::new(&mut fbb);
compression_builder.add_codec(compression);
Some(compression_builder.finish())
} else {
None
};

let root = {
let mut batch_builder = ipc::Message::RecordBatchBuilder::new(&mut fbb);
batch_builder.add_length(length as i64);
batch_builder.add_nodes(nodes);
batch_builder.add_buffers(buffers);
if let Some(compression) = compression {
batch_builder.add_compression(compression)
}
batch_builder.finish()
};

Expand Down Expand Up @@ -358,18 +406,18 @@ pub fn write_message<W: Write>(
}

fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
let len = data.len() as u32;
let pad_len = pad_to_8(len) as u32;
let len = data.len();
let pad_len = pad_to_8(data.len());
let total_len = len + pad_len;

// write body buffer
writer.write_all(data)?;
if pad_len > 0 {
writer.write_all(&vec![0u8; pad_len as usize][..])?;
writer.write_all(&vec![0u8; pad_len][..])?;
}

writer.flush()?;
Ok(total_len as usize)
Ok(total_len)
}

/// Write a record batch to the writer, writing the message size before the message
Expand Down Expand Up @@ -411,6 +459,6 @@ pub fn write_continuation<W: Write>(

/// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
#[inline]
pub(crate) fn pad_to_8(len: u32) -> usize {
pub(crate) fn pad_to_8(len: usize) -> usize {
(((len + 7) & !7) - len) as usize
}
2 changes: 1 addition & 1 deletion src/io/ipc/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod stream;
mod writer;

pub use arrow_format::ipc::Schema::MetadataVersion;
pub use common::IpcWriteOptions;
pub use common::{Compression, IpcWriteOptions};
pub use schema::schema_to_bytes;
pub use serialize::{write, write_dictionary};
pub use stream::StreamWriter;
Expand Down
Loading

0 comments on commit ec11db5

Please sign in to comment.