diff --git a/src/io/ipc/compression.rs b/src/io/ipc/compression.rs
index 3598d869c16..a26a95b450c 100644
--- a/src/io/ipc/compression.rs
+++ b/src/io/ipc/compression.rs
@@ -27,3 +27,48 @@ pub fn decompress_zstd(_input_buf: &[u8], _output_buf: &mut [u8]) -> Result<()>
     use crate::error::ArrowError;
     Err(ArrowError::Ipc("The crate was compiled without IPC compression. Use `io_ipc_compression` to read compressed IPC.".to_string()))
 }
+
+#[cfg(feature = "io_ipc_compression")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
+pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
+    use std::io::Write;
+    let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
+    encoder.write_all(input_buf)?;
+    // `finish` writes the frame's end marker; dropping the encoder without it
+    // would leave a truncated frame behind.
+    encoder.finish().1.map_err(|e| e.into())
+}
+
+#[cfg(feature = "io_ipc_compression")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
+pub fn compress_zstd(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
+    use std::io::Write;
+    // `auto_finish` finishes the zstd stream when the encoder is dropped.
+    let mut encoder = zstd::Encoder::new(output_buf, 17)?.auto_finish();
+    encoder.write_all(input_buf).map_err(|e| e.into())
+}
+
+#[cfg(not(feature = "io_ipc_compression"))]
+pub fn compress_lz4(_input_buf: &[u8], _output_buf: &mut Vec<u8>) -> Result<()> {
+    use crate::error::ArrowError;
+    Err(ArrowError::Ipc("The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC.".to_string()))
+}
+
+#[cfg(not(feature = "io_ipc_compression"))]
+pub fn compress_zstd(_input_buf: &[u8], _output_buf: &mut Vec<u8>) -> Result<()> {
+    use crate::error::ArrowError;
+    Err(ArrowError::Ipc("The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC.".to_string()))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[cfg(feature = "io_ipc_compression")]
+    #[test]
+    fn round_trip() {
+        let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();
+        let mut buffer = vec![];
+        compress_zstd(&data, &mut buffer).unwrap();
+
+        let mut result = vec![0; 200];
+        decompress_zstd(&buffer, &mut result).unwrap();
+        assert_eq!(data, result);
+    }
+}
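Note: the ZSTD `round_trip` test above has a natural LZ4 counterpart. A minimal sketch, assuming it sits in the same `tests` module (the `decompress_lz4` it relies on already exists in this module and is used by the readers below):

#[cfg(feature = "io_ipc_compression")]
#[test]
fn round_trip_lz4() {
    let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();
    let mut buffer = vec![];
    compress_lz4(&data, &mut buffer).unwrap();

    // decompression needs the exact uncompressed length up front
    let mut result = vec![0; 200];
    decompress_lz4(&buffer, &mut result).unwrap();
    assert_eq!(data, result);
}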
diff --git a/src/io/ipc/read/array/boolean.rs b/src/io/ipc/read/array/boolean.rs
index ff40527b4fe..d9ea780d170 100644
--- a/src/io/ipc/read/array/boolean.rs
+++ b/src/io/ipc/read/array/boolean.rs
@@ -17,6 +17,7 @@ pub fn read_boolean<R: Read + Seek>(
     reader: &mut R,
     block_offset: u64,
     is_little_endian: bool,
+    compression: Option<ipc::Message::BodyCompression>,
 ) -> Result<BooleanArray> {
     let field_node = field_nodes.pop_front().unwrap().0;
 
@@ -27,7 +28,7 @@ pub fn read_boolean<R: Read + Seek>(
         reader,
         block_offset,
         is_little_endian,
-        None,
+        compression,
     )?;
 
     let values = read_bitmap(
@@ -36,7 +37,7 @@ pub fn read_boolean<R: Read + Seek>(
         reader,
         block_offset,
         is_little_endian,
-        None,
+        compression,
     )?;
     Ok(BooleanArray::from_data(data_type, values, validity))
 }
diff --git a/src/io/ipc/read/array/dictionary.rs b/src/io/ipc/read/array/dictionary.rs
index b89b9abdb55..0357457d014 100644
--- a/src/io/ipc/read/array/dictionary.rs
+++ b/src/io/ipc/read/array/dictionary.rs
@@ -15,6 +15,7 @@ pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
     buffers: &mut VecDeque<&ipc::Schema::Buffer>,
     reader: &mut R,
     block_offset: u64,
+    compression: Option<ipc::Message::BodyCompression>,
     is_little_endian: bool,
 ) -> Result<DictionaryArray<T>>
 where
@@ -29,7 +30,7 @@ where
         reader,
         block_offset,
         is_little_endian,
-        None,
+        compression,
     )?;
 
     Ok(DictionaryArray::<T>::from_data(keys, values.clone()))
diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs
index e3f8e7aabe5..e0fe6f7b755 100644
--- a/src/io/ipc/read/deserialize.rs
+++ b/src/io/ipc/read/deserialize.rs
@@ -44,6 +44,7 @@ pub fn read<R: Read + Seek>(
             reader,
             block_offset,
             is_little_endian,
+            compression,
         )
         .map(|x| Arc::new(x) as Arc<dyn Array>),
         Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
@@ -169,6 +170,7 @@ pub fn read<R: Read + Seek>(
             buffers,
             reader,
             block_offset,
+            compression,
             is_little_endian,
         )
         .map(|x| Arc::new(x) as Arc<dyn Array>)
diff --git a/src/io/ipc/read/read_basic.rs b/src/io/ipc/read/read_basic.rs
index 33c01a67d1a..be9562ff8b4 100644
--- a/src/io/ipc/read/read_basic.rs
+++ b/src/io/ipc/read/read_basic.rs
@@ -107,33 +107,26 @@ fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
     let mut slice = vec![0u8; buffer_length];
     reader.read_exact(&mut slice)?;
 
+    // Safety:
+    // This is safe because T is NativeType, which by definition can be transmuted to u8
+    let out_slice = unsafe {
+        std::slice::from_raw_parts_mut(
+            buffer.as_mut_ptr() as *mut u8,
+            length * std::mem::size_of::<T>(),
+        )
+    };
+
     match compression.codec() {
         CompressionType::LZ4_FRAME => {
-            // fast case where we can just copy the contents as is
-            unsafe {
-                // transmute T to bytes.
-                let out_slice = std::slice::from_raw_parts_mut(
-                    buffer.as_mut_ptr() as *mut u8,
-                    length * std::mem::size_of::<T>(),
-                );
-                compression::decompress_lz4(&slice[8..], out_slice)?
-            }
+            compression::decompress_lz4(&slice[8..], out_slice)?;
             Ok(buffer)
         }
         CompressionType::ZSTD => {
-            // fast case where we can just copy the contents as is
-            unsafe {
-                // transmute T to bytes.
-                let out_slice = std::slice::from_raw_parts_mut(
-                    buffer.as_mut_ptr() as *mut u8,
-                    length * std::mem::size_of::<T>(),
-                );
-                compression::decompress_zstd(&slice[8..], out_slice)?
-            }
+            compression::decompress_zstd(&slice[8..], out_slice)?;
             Ok(buffer)
         }
         _ => Err(ArrowError::NotYetImplemented(
-            "Non LZ4 compressed IPC".to_string(),
+            "Decompressing compression formats other than LZ4 and ZSTD".to_string(),
         )),
     }
 }
@@ -184,25 +177,19 @@ fn read_compressed_bitmap<R: Read + Seek>(
     reader: &mut R,
 ) -> Result<MutableBuffer<u8>> {
     let mut buffer = MutableBuffer::<u8>::from_len_zeroed((length + 7) / 8);
+
+    // read all first
+    // todo: move this allocation to an external buffer for re-use
+    let mut slice = vec![0u8; bytes];
+    reader.read_exact(&mut slice)?;
+
     match compression.codec() {
         CompressionType::LZ4_FRAME => {
-            // decompress first
-            // todo: move this allocation to an external buffer for re-use
-            let mut slice = vec![0u8; bytes];
-            reader.read_exact(&mut slice)?;
-
             compression::decompress_lz4(&slice[8..], &mut buffer)?;
-
             Ok(buffer)
         }
         CompressionType::ZSTD => {
-            // decompress first
-            // todo: move this allocation to an external buffer for re-use
-            let mut slice = vec![0u8; bytes];
-            reader.read_exact(&mut slice)?;
-
             compression::decompress_zstd(&slice[8..], &mut buffer)?;
-
             Ok(buffer)
         }
         _ => Err(ArrowError::NotYetImplemented(
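Note on the wire layout: per the Arrow IPC specification, each compressed buffer is prefixed with its uncompressed length as a little-endian i64 (the spec also allows -1 here to mark an uncompressed buffer, which this patch neither emits nor handles). That prefix is why the readers above skip the first 8 bytes (`&slice[8..]`) and why the writers below emit `(len as i64).to_le_bytes()` before the compressed body. A sketch of the framing, using a hypothetical helper that is not part of the patch:

// Hypothetical helper illustrating the layout of one compressed IPC buffer:
// [uncompressed length: i64, little-endian][compressed bytes].
fn frame(uncompressed_len: usize, compressed: &[u8]) -> Vec<u8> {
    let mut framed = Vec::with_capacity(8 + compressed.len());
    framed.extend_from_slice(&(uncompressed_len as i64).to_le_bytes());
    framed.extend_from_slice(compressed);
    framed
}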
diff --git a/src/io/ipc/write/common.rs b/src/io/ipc/write/common.rs
index ae5151e20cc..699a85ae2ad 100644
--- a/src/io/ipc/write/common.rs
+++ b/src/io/ipc/write/common.rs
@@ -21,6 +21,7 @@ use std::{collections::HashMap, sync::Arc};
 
 use arrow_format::ipc;
 use arrow_format::ipc::flatbuffers::FlatBufferBuilder;
+use arrow_format::ipc::Message::CompressionType;
 
 use crate::array::Array;
 use crate::error::{ArrowError, Result};
@@ -31,8 +32,17 @@ use crate::{array::DictionaryArray, datatypes::*};
 use super::super::CONTINUATION_MARKER;
 use super::{write, write_dictionary};
 
+/// Compression codec
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum Compression {
+    /// LZ4 (framed)
+    LZ4,
+    /// ZSTD
+    ZSTD,
+}
+
 /// IPC write options used to control the behaviour of the writer
-#[derive(Debug)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct IpcWriteOptions {
     /// Write padding after memory buffers to this multiple of bytes.
     /// Generally 8 or 64, defaults to 8
@@ -48,6 +58,8 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::Schema::MetadataVersion,
+    /// Whether the buffers should be compressed
+    compression: Option<Compression>,
 }
 
 impl IpcWriteOptions {
@@ -56,6 +68,7 @@ impl IpcWriteOptions {
         alignment: usize,
         write_legacy_ipc_format: bool,
         metadata_version: ipc::Schema::MetadataVersion,
+        compression: Option<Compression>,
     ) -> Result<Self> {
         if alignment == 0 || alignment % 8 != 0 {
             return Err(ArrowError::InvalidArgumentError(
@@ -72,6 +85,7 @@ impl IpcWriteOptions {
                 alignment,
                 write_legacy_ipc_format,
                 metadata_version,
+                compression,
             }),
             ipc::Schema::MetadataVersion::V5 => {
                 if write_legacy_ipc_format {
@@ -83,6 +97,7 @@ impl IpcWriteOptions {
                         alignment,
                         write_legacy_ipc_format,
                         metadata_version,
+                        compression,
                     })
                 }
             }
@@ -101,6 +116,7 @@ impl Default for IpcWriteOptions {
             alignment: 8,
             write_legacy_ipc_format: false,
             metadata_version: ipc::Schema::MetadataVersion::V5,
+            compression: None,
         }
     }
 }
@@ -157,6 +173,7 @@ fn record_batch_to_bytes(batch: &RecordBatch, write_options: &IpcWriteOptions) -> EncodedData {
             &mut nodes,
             &mut offset,
             is_native_little_endian(),
+            write_options.compression,
         )
     }
 
@@ -164,11 +181,26 @@ fn record_batch_to_bytes(batch: &RecordBatch, write_options: &IpcWriteOptions) -> EncodedData {
     let buffers = fbb.create_vector(&buffers);
     let nodes = fbb.create_vector(&nodes);
 
+    let compression = if let Some(compression) = write_options.compression {
+        let compression = match compression {
+            Compression::LZ4 => CompressionType::LZ4_FRAME,
+            Compression::ZSTD => CompressionType::ZSTD,
+        };
+        let mut compression_builder = ipc::Message::BodyCompressionBuilder::new(&mut fbb);
+        compression_builder.add_codec(compression);
+        Some(compression_builder.finish())
+    } else {
+        None
+    };
+
     let root = {
         let mut batch_builder = ipc::Message::RecordBatchBuilder::new(&mut fbb);
         batch_builder.add_length(batch.num_rows() as i64);
         batch_builder.add_nodes(nodes);
         batch_builder.add_buffers(buffers);
+        if let Some(compression) = compression {
+            batch_builder.add_compression(compression)
+        }
         let b = batch_builder.finish();
         b.as_union_value()
     };
@@ -209,6 +241,7 @@ fn dictionary_batch_to_bytes(
         &mut nodes,
         &mut 0,
         is_little_endian,
+        write_options.compression,
         false,
     );
 
@@ -216,11 +249,26 @@ fn dictionary_batch_to_bytes(
     let buffers = fbb.create_vector(&buffers);
     let nodes = fbb.create_vector(&nodes);
 
+    let compression = if let Some(compression) = write_options.compression {
+        let compression = match compression {
+            Compression::LZ4 => CompressionType::LZ4_FRAME,
+            Compression::ZSTD => CompressionType::ZSTD,
+        };
+        let mut compression_builder = ipc::Message::BodyCompressionBuilder::new(&mut fbb);
+        compression_builder.add_codec(compression);
+        Some(compression_builder.finish())
+    } else {
+        None
+    };
+
     let root = {
         let mut batch_builder = ipc::Message::RecordBatchBuilder::new(&mut fbb);
         batch_builder.add_length(length as i64);
         batch_builder.add_nodes(nodes);
         batch_builder.add_buffers(buffers);
+        if let Some(compression) = compression {
+            batch_builder.add_compression(compression)
+        }
         batch_builder.finish()
     };
@@ -358,18 +406,18 @@ pub fn write_message<W: Write>(
 }
 
 fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
-    let len = data.len() as u32;
-    let pad_len = pad_to_8(len) as u32;
+    let len = data.len();
+    let pad_len = pad_to_8(data.len());
     let total_len = len + pad_len;
 
     // write body buffer
     writer.write_all(data)?;
     if pad_len > 0 {
-        writer.write_all(&vec![0u8; pad_len as usize][..])?;
+        writer.write_all(&vec![0u8; pad_len][..])?;
     }
 
     writer.flush()?;
-    Ok(total_len as usize)
+    Ok(total_len)
 }
 
 /// Write a record batch to the writer, writing the message size before the message
@@ -411,6 +459,6 @@ pub fn write_continuation(
 /// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
 #[inline]
-pub(crate) fn pad_to_8(len: u32) -> usize {
+pub(crate) fn pad_to_8(len: usize) -> usize {
     (((len + 7) & !7) - len) as usize
 }
diff --git a/src/io/ipc/write/mod.rs b/src/io/ipc/write/mod.rs
index 20b50a23178..4d5627959cd 100644
--- a/src/io/ipc/write/mod.rs
+++ b/src/io/ipc/write/mod.rs
@@ -6,7 +6,7 @@ mod stream;
 mod writer;
 
 pub use arrow_format::ipc::Schema::MetadataVersion;
-pub use common::IpcWriteOptions;
+pub use common::{Compression, IpcWriteOptions};
 pub use schema::schema_to_bytes;
 pub use serialize::{write, write_dictionary};
 pub use stream::StreamWriter;
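With the options type extended, enabling compression is a one-argument change at the call site. A minimal sketch under stated assumptions (`FileWriter` and the module re-exports are taken from the crate's public API as exercised by the tests at the end of this patch):

use arrow2::datatypes::Schema;
use arrow2::error::Result;
use arrow2::io::ipc::write::{Compression, FileWriter, IpcWriteOptions, MetadataVersion};

// Build an IPC file writer whose buffers are LZ4-compressed.
// `W` can be any `std::io::Write`, e.g. a `File` or a `Vec<u8>`.
fn compressed_writer<W: std::io::Write>(writer: W, schema: &Schema) -> Result<FileWriter<W>> {
    let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5, Some(Compression::LZ4))?;
    FileWriter::try_new_with_options(writer, schema, options)
}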
diff --git a/src/io/ipc/write/serialize.rs b/src/io/ipc/write/serialize.rs
index 773971db5c8..6b1078e2224 100644
--- a/src/io/ipc/write/serialize.rs
+++ b/src/io/ipc/write/serialize.rs
@@ -1,20 +1,3 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
 use arrow_format::ipc::{Message, Schema};
 
 use crate::{
@@ -25,8 +8,9 @@ use crate::{
     types::NativeType,
 };
 
+use super::super::compression;
 use super::super::endianess::is_native_little_endian;
-use super::common::pad_to_8;
+use super::common::{pad_to_8, Compression};
 
 fn _write_primitive<T: NativeType>(
     array: &PrimitiveArray<T>,
@@ -34,8 +18,16 @@ fn _write_primitive<T: NativeType>(
     arrow_data: &mut Vec<u8>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
-    write_bitmap(array.validity(), array.len(), buffers, arrow_data, offset);
+    write_bitmap(
+        array.validity(),
+        array.len(),
+        buffers,
+        arrow_data,
+        offset,
+        compression,
+    );
 
     write_buffer(
         array.values(),
@@ -43,6 +35,7 @@ fn _write_primitive<T: NativeType>(
         arrow_data,
         offset,
         is_little_endian,
+        compression,
     )
 }
 
@@ -52,9 +45,17 @@ fn write_primitive<T: NativeType>(
     arrow_data: &mut Vec<u8>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
     let array = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
-    _write_primitive(array, buffers, arrow_data, offset, is_little_endian);
+    _write_primitive(
+        array,
+        buffers,
+        arrow_data,
+        offset,
+        is_little_endian,
+        compression,
+    );
 }
 
 fn write_boolean(
@@ -63,19 +64,29 @@ fn write_boolean(
     arrow_data: &mut Vec<u8>,
     offset: &mut i64,
     _: bool,
+    compression: Option<Compression>,
 ) {
     let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
 
-    write_bitmap(array.validity(), array.len(), buffers, arrow_data, offset);
+    write_bitmap(
+        array.validity(),
+        array.len(),
+        buffers,
+        arrow_data,
+        offset,
+        compression,
+    );
     write_bitmap(
         Some(&array.values().clone()),
         array.len(),
         buffers,
         arrow_data,
         offset,
+        compression,
     );
 }
 
+#[allow(clippy::too_many_arguments)]
 fn write_generic_binary<O: Offset>(
     validity: Option<&Bitmap>,
     offsets: &[O],
@@ -84,13 +95,28 @@ fn write_generic_binary<O: Offset>(
     arrow_data: &mut Vec<u8>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
-    write_bitmap(validity, offsets.len() - 1, buffers, arrow_data, offset);
+    write_bitmap(
+        validity,
+        offsets.len() - 1,
+        buffers,
+        arrow_data,
+        offset,
+        compression,
+    );
 
     let first = *offsets.first().unwrap();
     let last = *offsets.last().unwrap();
     if first == O::default() {
-        write_buffer(offsets, buffers, arrow_data, offset, is_little_endian);
+        write_buffer(
+            offsets,
+            buffers,
+            arrow_data,
+            offset,
+            is_little_endian,
+            compression,
+        );
     } else {
         write_buffer_from_iter(
             offsets.iter().map(|x| *x - first),
@@ -98,15 +124,16 @@ fn write_generic_binary<O: Offset>(
             arrow_data,
             offset,
             is_little_endian,
+            compression,
         );
     }
 
-    write_buffer(
+    write_bytes(
         &values[first.to_usize()..last.to_usize()],
         buffers,
         arrow_data,
         offset,
-        is_little_endian,
+        compression,
     );
 }
 
@@ -116,6 +143,7 @@ fn write_binary<O: Offset>(
     arrow_data: &mut Vec<u8>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
     let array = array.as_any().downcast_ref::<BinaryArray<O>>().unwrap();
     write_generic_binary(
@@ -126,6 +154,7 @@ fn write_binary<O: Offset>(
         arrow_data,
         offset,
         is_little_endian,
+        compression,
     );
 }
 
@@ -135,6 +164,7 @@ fn write_utf8<O: Offset>(
     arrow_data: &mut Vec<u8>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
     let array = array.as_any().downcast_ref::<Utf8Array<O>>().unwrap();
     write_generic_binary(
@@ -145,6 +175,7 @@ fn write_utf8<O: Offset>(
         arrow_data,
         offset,
         is_little_endian,
+        compression,
     );
 }
 
@@ -153,20 +184,22 @@ fn write_fixed_size_binary(
     buffers: &mut Vec<Schema::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: &mut i64,
-    is_little_endian: bool,
+    _is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
     let array = array
         .as_any()
         .downcast_ref::<FixedSizeBinaryArray>()
         .unwrap();
-    write_bitmap(array.validity(), array.len(), buffers, arrow_data, offset);
-    write_buffer(
-        array.values(),
+    write_bitmap(
+        array.validity(),
+        array.len(),
         buffers,
         arrow_data,
         offset,
-        is_little_endian,
+        compression,
     );
+    write_bytes(array.values(), buffers, arrow_data, offset, compression);
 }
 
 fn write_list<O: Offset>(
@@ -176,17 +209,32 @@ fn write_list<O: Offset>(
     nodes: &mut Vec<Message::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
     let array = array.as_any().downcast_ref::<ListArray<O>>().unwrap();
     let offsets = array.offsets();
     let validity = array.validity();
 
-    write_bitmap(validity, offsets.len() - 1, buffers, arrow_data, offset);
+    write_bitmap(
+        validity,
+        offsets.len() - 1,
+        buffers,
+        arrow_data,
+        offset,
+        compression,
+    );
 
     let first = *offsets.first().unwrap();
     let last = *offsets.last().unwrap();
     if first == O::default() {
-        write_buffer(offsets, buffers, arrow_data, offset, is_little_endian);
+        write_buffer(
+            offsets,
+            buffers,
+            arrow_data,
+            offset,
+            is_little_endian,
+            compression,
+        );
     } else {
         write_buffer_from_iter(
             offsets.iter().map(|x| *x - first),
@@ -194,6 +242,7 @@ fn write_list<O: Offset>(
             arrow_data,
             offset,
             is_little_endian,
+            compression,
         );
     }
 
@@ -207,6 +256,7 @@ fn write_list<O: Offset>(
         nodes,
         offset,
         is_little_endian,
+        compression,
     );
 }
 
@@ -217,9 +267,17 @@ pub fn write_struct(
     nodes: &mut Vec<Message::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
     let array = array.as_any().downcast_ref::<StructArray>().unwrap();
-    write_bitmap(array.validity(), array.len(), buffers, arrow_data, offset);
+    write_bitmap(
+        array.validity(),
+        array.len(),
+        buffers,
+        arrow_data,
+        offset,
+        compression,
+    );
     array.values().iter().for_each(|array| {
         write(
             array.as_ref(),
@@ -228,6 +286,7 @@ pub fn write_struct(
             nodes,
             offset,
             is_little_endian,
+            compression,
         );
     });
 }
 
@@ -239,13 +298,28 @@ pub fn write_union(
     nodes: &mut Vec<Message::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
     let array = array.as_any().downcast_ref::<UnionArray>().unwrap();
 
-    write_buffer(array.types(), buffers, arrow_data, offset, is_little_endian);
+    write_buffer(
+        array.types(),
+        buffers,
+        arrow_data,
+        offset,
+        is_little_endian,
+        compression,
+    );
 
     if let Some(offsets) = array.offsets() {
-        write_buffer(offsets, buffers, arrow_data, offset, is_little_endian);
+        write_buffer(
+            offsets,
+            buffers,
+            arrow_data,
+            offset,
+            is_little_endian,
+            compression,
+        );
     }
     array.fields().iter().for_each(|array| {
         write(
@@ -255,6 +329,7 @@ pub fn write_union(
             nodes,
             offset,
             is_little_endian,
+            compression,
         )
     });
 }
 
@@ -266,17 +341,32 @@ fn write_map(
     nodes: &mut Vec<Message::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
     let array = array.as_any().downcast_ref::<MapArray>().unwrap();
     let offsets = array.offsets();
     let validity = array.validity();
 
-    write_bitmap(validity, offsets.len() - 1, buffers, arrow_data, offset);
+    write_bitmap(
+        validity,
+        offsets.len() - 1,
+        buffers,
+        arrow_data,
+        offset,
+        compression,
+    );
 
     let first = *offsets.first().unwrap();
     let last = *offsets.last().unwrap();
     if first == 0 {
-        write_buffer(offsets, buffers, arrow_data, offset, is_little_endian);
+        write_buffer(
+            offsets,
+            buffers,
+            arrow_data,
+            offset,
+            is_little_endian,
+            compression,
+        );
     } else {
         write_buffer_from_iter(
             offsets.iter().map(|x| *x - first),
@@ -284,6 +374,7 @@ fn write_map(
             arrow_data,
             offset,
             is_little_endian,
+            compression,
         );
     }
 
@@ -297,6 +388,7 @@ fn write_map(
         nodes,
         offset,
         is_little_endian,
+        compression,
     );
 }
 
@@ -307,9 +399,17 @@ fn write_fixed_size_list(
     nodes: &mut Vec<Message::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
     let array = array.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
-    write_bitmap(array.validity(), array.len(), buffers, arrow_data, offset);
+    write_bitmap(
+        array.validity(),
+        array.len(),
+        buffers,
+        arrow_data,
+        offset,
+        compression,
+    );
     write(
         array.values().as_ref(),
         buffers,
@@ -317,10 +417,12 @@ fn write_fixed_size_list(
         nodes,
         offset,
         is_little_endian,
+        compression,
     );
 }
 
 // use `write_keys` to either write keys or values
+#[allow(clippy::too_many_arguments)]
 pub fn _write_dictionary<K: DictionaryKey>(
     array: &dyn Array,
     buffers: &mut Vec<Schema::Buffer>,
@@ -328,11 +430,19 @@ pub fn _write_dictionary<K: DictionaryKey>(
     nodes: &mut Vec<Message::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
     write_keys: bool,
 ) -> usize {
     let array = array.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
     if write_keys {
-        _write_primitive(array.keys(), buffers, arrow_data, offset, is_little_endian);
+        _write_primitive(
+            array.keys(),
+            buffers,
+            arrow_data,
+            offset,
+            is_little_endian,
+            compression,
+        );
         array.keys().len()
     } else {
         write(
@@ -342,11 +452,13 @@ pub fn _write_dictionary<K: DictionaryKey>(
             nodes,
             offset,
             is_little_endian,
+            compression,
         );
         array.values().len()
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 pub fn write_dictionary(
     array: &dyn Array,
     buffers: &mut Vec<Schema::Buffer>,
@@ -354,6 +466,7 @@ pub fn write_dictionary(
     nodes: &mut Vec<Message::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
     write_keys: bool,
 ) -> usize {
     match array.data_type() {
@@ -366,6 +479,7 @@ pub fn write_dictionary(
                 nodes,
                 offset,
                 is_little_endian,
+                compression,
                 write_keys,
             )
         })
@@ -381,6 +495,7 @@ pub fn write(
     nodes: &mut Vec<Message::FieldNode>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
     nodes.push(Message::FieldNode::new(
         array.len() as i64,
@@ -389,23 +504,93 @@ pub fn write(
     use PhysicalType::*;
     match array.data_type().to_physical_type() {
         Null => (),
-        Boolean => write_boolean(array, buffers, arrow_data, offset, is_little_endian),
+        Boolean => write_boolean(
+            array,
+            buffers,
+            arrow_data,
+            offset,
+            is_little_endian,
+            compression,
+        ),
         Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
-            write_primitive::<$T>(array, buffers, arrow_data, offset, is_little_endian)
+            write_primitive::<$T>(array, buffers, arrow_data, offset, is_little_endian, compression)
         }),
-        Binary => write_binary::<i32>(array, buffers, arrow_data, offset, is_little_endian),
-        LargeBinary => write_binary::<i64>(array, buffers, arrow_data, offset, is_little_endian),
-        FixedSizeBinary => {
-            write_fixed_size_binary(array, buffers, arrow_data, offset, is_little_endian)
-        }
-        Utf8 => write_utf8::<i32>(array, buffers, arrow_data, offset, is_little_endian),
-        LargeUtf8 => write_utf8::<i64>(array, buffers, arrow_data, offset, is_little_endian),
-        List => write_list::<i32>(array, buffers, arrow_data, nodes, offset, is_little_endian),
-        LargeList => write_list::<i64>(array, buffers, arrow_data, nodes, offset, is_little_endian),
-        FixedSizeList => {
-            write_fixed_size_list(array, buffers, arrow_data, nodes, offset, is_little_endian)
-        }
-        Struct => write_struct(array, buffers, arrow_data, nodes, offset, is_little_endian),
+        Binary => write_binary::<i32>(
+            array,
+            buffers,
+            arrow_data,
+            offset,
+            is_little_endian,
+            compression,
+        ),
+        LargeBinary => write_binary::<i64>(
+            array,
+            buffers,
+            arrow_data,
+            offset,
+            is_little_endian,
+            compression,
+        ),
+        FixedSizeBinary => write_fixed_size_binary(
+            array,
+            buffers,
+            arrow_data,
+            offset,
+            is_little_endian,
+            compression,
+        ),
+        Utf8 => write_utf8::<i32>(
+            array,
+            buffers,
+            arrow_data,
+            offset,
+            is_little_endian,
+            compression,
+        ),
+        LargeUtf8 => write_utf8::<i64>(
+            array,
+            buffers,
+            arrow_data,
+            offset,
+            is_little_endian,
+            compression,
+        ),
+        List => write_list::<i32>(
+            array,
+            buffers,
+            arrow_data,
+            nodes,
+            offset,
+            is_little_endian,
+            compression,
+        ),
+        LargeList => write_list::<i64>(
+            array,
+            buffers,
+            arrow_data,
+            nodes,
+            offset,
+            is_little_endian,
+            compression,
+        ),
+        FixedSizeList => write_fixed_size_list(
+            array,
+            buffers,
+            arrow_data,
+            nodes,
+            offset,
+            is_little_endian,
+            compression,
+        ),
+        Struct => write_struct(
+            array,
+            buffers,
+            arrow_data,
+            nodes,
+            offset,
+            is_little_endian,
+            compression,
+        ),
         Dictionary(_) => {
             write_dictionary(
                 array,
@@ -414,50 +599,68 @@ pub fn write(
                 nodes,
                 offset,
                 is_little_endian,
+                compression,
                 true,
             );
         }
         Union => {
-            write_union(array, buffers, arrow_data, nodes, offset, is_little_endian);
+            write_union(
+                array,
+                buffers,
+                arrow_data,
+                nodes,
+                offset,
+                is_little_endian,
+                compression,
+            );
         }
         Map => {
-            write_map(array, buffers, arrow_data, nodes, offset, is_little_endian);
+            write_map(
+                array,
+                buffers,
+                arrow_data,
+                nodes,
+                offset,
+                is_little_endian,
+                compression,
+            );
         }
     }
 }
 
-/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing an 8 byte boundary.
 #[inline]
-fn write_bytes(
-    bytes: &[u8],
-    buffers: &mut Vec<Schema::Buffer>,
-    arrow_data: &mut Vec<u8>,
-    offset: &mut i64,
-) {
-    let len = bytes.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(Schema::Buffer::new(*offset, total_len));
-    arrow_data.extend_from_slice(bytes);
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
-    *offset += total_len;
+fn pad_buffer_to_8(buffer: &mut Vec<u8>, length: usize) {
+    let pad_len = pad_to_8(length);
+    buffer.extend_from_slice(&vec![0u8; pad_len]);
 }
 
 /// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing an 8 byte boundary.
-fn write_bytes_from_iter<I: TrustedLen<Item = u8>>(
-    bytes: I,
+fn write_bytes(
+    bytes: &[u8],
     buffers: &mut Vec<Schema::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: &mut i64,
+    compression: Option<Compression>,
 ) {
-    let len = bytes.size_hint().0;
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
+    let start = arrow_data.len();
+    if let Some(compression) = compression {
+        // compressed buffers are prefixed by their uncompressed length (i64, little-endian)
+        arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());
+        match compression {
+            Compression::LZ4 => {
+                compression::compress_lz4(bytes, arrow_data).unwrap();
+            }
+            Compression::ZSTD => {
+                compression::compress_zstd(bytes, arrow_data).unwrap();
+            }
+        }
+    } else {
+        arrow_data.extend_from_slice(bytes);
+    };
+
+    pad_buffer_to_8(arrow_data, arrow_data.len() - start);
+
+    let total_len = (arrow_data.len() - start) as i64;
     buffers.push(Schema::Buffer::new(*offset, total_len));
-    arrow_data.extend(bytes);
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
     *offset += total_len;
 }
 
@@ -467,6 +670,7 @@ fn write_bitmap(
     buffers: &mut Vec<Schema::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: &mut i64,
+    compression: Option<Compression>,
 ) {
     match bitmap {
         Some(bitmap) => {
@@ -476,23 +680,42 @@ fn write_bitmap(
                 // case where we can't slice the bitmap as the offsets are not multiple of 8
                 let bytes = Bitmap::from_trusted_len_iter(bitmap.iter());
                 let (slice, _, _) = bytes.as_slice();
-                write_bytes(slice, buffers, arrow_data, offset)
+                write_bytes(slice, buffers, arrow_data, offset, compression)
             } else {
-                write_bytes(slice, buffers, arrow_data, offset)
+                write_bytes(slice, buffers, arrow_data, offset, compression)
             }
         }
         None => {
             // in IPC, the null bitmap must always be present
-            write_bytes_from_iter(
-                std::iter::repeat(1).take(length.saturating_add(7) / 8),
-                buffers,
-                arrow_data,
-                offset,
-            )
+            let slice = vec![0u8; length.saturating_add(7) / 8];
+            write_bytes(&slice, buffers, arrow_data, offset, compression)
         }
     }
 }
 
+/// writes `buffer` to `arrow_data` updating `buffers` and `offset` and guaranteeing an 8 byte boundary.
+fn write_buffer<T: NativeType>(
+    buffer: &[T],
+    buffers: &mut Vec<Schema::Buffer>,
+    arrow_data: &mut Vec<u8>,
+    offset: &mut i64,
+    is_little_endian: bool,
+    compression: Option<Compression>,
+) {
+    let start = arrow_data.len();
+    if let Some(compression) = compression {
+        _write_compressed_buffer(buffer, arrow_data, is_little_endian, compression);
+    } else {
+        _write_buffer(buffer, arrow_data, is_little_endian);
+    };
+
+    pad_buffer_to_8(arrow_data, arrow_data.len() - start);
+
+    let total_len = (arrow_data.len() - start) as i64;
+    buffers.push(Schema::Buffer::new(*offset, total_len));
+    *offset += total_len;
+}
+
 #[inline]
 fn _write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
     buffer: I,
@@ -512,8 +735,38 @@ fn _write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
     }
 }
 
+#[inline]
+fn _write_compressed_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
+    buffer: I,
+    arrow_data: &mut Vec<u8>,
+    is_little_endian: bool,
+    compression: Compression,
+) {
+    let len = buffer.size_hint().0;
+    let mut swapped = Vec::with_capacity(len * std::mem::size_of::<T>());
+    if is_little_endian {
+        buffer
+            .map(|x| T::to_le_bytes(&x))
+            .for_each(|x| swapped.extend_from_slice(x.as_ref()));
+    } else {
+        buffer
+            .map(|x| T::to_be_bytes(&x))
+            .for_each(|x| swapped.extend_from_slice(x.as_ref()))
+    };
+    arrow_data.extend_from_slice(&(swapped.len() as i64).to_le_bytes());
+    match compression {
+        Compression::LZ4 => {
+            compression::compress_lz4(&swapped, arrow_data).unwrap();
+        }
+        Compression::ZSTD => {
+            compression::compress_zstd(&swapped, arrow_data).unwrap();
+        }
+    }
+}
+
 fn _write_buffer<T: NativeType>(buffer: &[T], arrow_data: &mut Vec<u8>, is_little_endian: bool) {
     if is_little_endian == is_native_little_endian() {
+        // in native endianness we can use the bytes directly.
         let buffer = unsafe {
             std::slice::from_raw_parts(
                 buffer.as_ptr() as *const u8,
@@ -526,42 +779,54 @@ fn _write_buffer<T: NativeType>(buffer: &[T], arrow_data: &mut Vec<u8>, is_little_endian: bool) {
     }
 }
 
-/// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing an 8 byte boundary.
-fn write_buffer<T: NativeType>(
+fn _write_compressed_buffer<T: NativeType>(
     buffer: &[T],
-    buffers: &mut Vec<Schema::Buffer>,
     arrow_data: &mut Vec<u8>,
-    offset: &mut i64,
     is_little_endian: bool,
+    compression: Compression,
 ) {
-    let len = buffer.len() * std::mem::size_of::<T>();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(Schema::Buffer::new(*offset, total_len));
-
-    _write_buffer(buffer, arrow_data, is_little_endian);
-
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
-    *offset += total_len;
+    if is_little_endian == is_native_little_endian() {
+        let bytes = unsafe {
+            std::slice::from_raw_parts(
+                buffer.as_ptr() as *const u8,
+                buffer.len() * std::mem::size_of::<T>(),
+            )
+        };
+        arrow_data.extend_from_slice(&(bytes.len() as i64).to_le_bytes());
+        match compression {
+            Compression::LZ4 => {
+                compression::compress_lz4(bytes, arrow_data).unwrap();
+            }
+            Compression::ZSTD => {
+                compression::compress_zstd(bytes, arrow_data).unwrap();
+            }
+        }
+    } else {
+        // big-endian targets would need byte-swapping before compressing
+        todo!()
+    }
 }
 
 /// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing an 8 byte boundary.
+#[inline]
 fn write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
     buffer: I,
     buffers: &mut Vec<Schema::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: &mut i64,
     is_little_endian: bool,
+    compression: Option<Compression>,
 ) {
-    let len = buffer.size_hint().0 * std::mem::size_of::<T>();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(Schema::Buffer::new(*offset, total_len));
+    let start = arrow_data.len();
 
-    _write_buffer_from_iter(buffer, arrow_data, is_little_endian);
+    if let Some(compression) = compression {
+        _write_compressed_buffer_from_iter(buffer, arrow_data, is_little_endian, compression);
+    } else {
+        _write_buffer_from_iter(buffer, arrow_data, is_little_endian);
+    }
+
+    pad_buffer_to_8(arrow_data, arrow_data.len() - start);
 
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
+    let total_len = (arrow_data.len() - start) as i64;
+    buffers.push(Schema::Buffer::new(*offset, total_len));
     *offset += total_len;
 }
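Both `write_bytes` and `write_buffer` now pad the (possibly compressed) buffer to an 8-byte boundary via `pad_buffer_to_8`, so the lengths recorded in `Schema::Buffer` always stay aligned. For intuition, the arithmetic in `pad_to_8` behaves as follows (a sketch; since `pad_to_8` is `pub(crate)`, such a test would have to live inside the crate):

#[test]
fn pad_to_8_examples() {
    assert_eq!(pad_to_8(0), 0); // already aligned
    assert_eq!(pad_to_8(5), 3); // 5 + 3 = 8
    assert_eq!(pad_to_8(8), 0); // multiples of 8 need no padding
    assert_eq!(pad_to_8(9), 7); // 9 + 7 = 16
}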
diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs
index b9061361ce4..132768b1f7c 100644
--- a/tests/it/io/ipc/write/file.rs
+++ b/tests/it/io/ipc/write/file.rs
@@ -13,7 +13,8 @@ fn round_trip(batch: RecordBatch) -> Result<()> {
 
     // write IPC version 5
     let written_result = {
-        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
+        let options =
+            IpcWriteOptions::try_new(8, false, MetadataVersion::V5, Some(Compression::ZSTD))?;
         let mut writer = FileWriter::try_new_with_options(result, batch.schema(), options)?;
         writer.write(&batch)?;
         writer.finish()?;
@@ -36,14 +37,20 @@ fn round_trip(batch: RecordBatch) -> Result<()> {
     Ok(())
 }
 
-fn test_file(version: &str, file_name: &str) -> Result<()> {
+fn test_file(version: &str, file_name: &str, compressed: bool) -> Result<()> {
     let (schema, batches) = read_gzip_json(version, file_name)?;
 
     let result = Vec::<u8>::new();
 
+    let compression = if compressed {
+        Some(Compression::ZSTD)
+    } else {
+        None
+    };
+
     // write IPC version 5
     let written_result = {
-        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
+        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5, compression)?;
         let mut writer = FileWriter::try_new_with_options(result, &schema, options)?;
         for batch in batches {
             writer.write(&batch)?;
@@ -64,140 +71,207 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
 
     let batches = reader.collect::<Result<Vec<_>>>()?;
 
-    assert_eq!(batches, expected_batches);
+    // compare batch-by-batch and column-by-column so that a failure points at
+    // the offending column
+    assert_eq!(batches.len(), expected_batches.len());
+    for (a, b) in batches.iter().zip(expected_batches.iter()) {
+        for (c1, c2) in a.columns().iter().zip(b.columns().iter()) {
+            assert_eq!(c1, c2)
+        }
+    }
 
     Ok(())
 }
 
 #[test]
 fn write_100_primitive() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_primitive")?;
-    test_file("1.0.0-bigendian", "generated_primitive")
+    test_file("1.0.0-littleendian", "generated_primitive", false)?;
+    test_file("1.0.0-bigendian", "generated_primitive", false)?;
+    test_file("1.0.0-littleendian", "generated_primitive", true)?;
+    test_file("1.0.0-bigendian", "generated_primitive", true)
 }
 
 #[test]
 fn write_100_datetime() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_datetime")?;
-    test_file("1.0.0-bigendian", "generated_datetime")
+    test_file("1.0.0-littleendian", "generated_datetime", false)?;
+    test_file("1.0.0-bigendian", "generated_datetime", false)?;
+    test_file("1.0.0-littleendian", "generated_datetime", true)?;
+    test_file("1.0.0-bigendian", "generated_datetime", true)
 }
 
 #[test]
 fn write_100_dictionary_unsigned() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_dictionary_unsigned")?;
-    test_file("1.0.0-bigendian", "generated_dictionary_unsigned")
+    test_file("1.0.0-littleendian", "generated_dictionary_unsigned", false)?;
+    test_file("1.0.0-bigendian", "generated_dictionary_unsigned", false)?;
+    test_file("1.0.0-littleendian", "generated_dictionary_unsigned", true)?;
+    test_file("1.0.0-bigendian", "generated_dictionary_unsigned", true)
 }
 
 #[test]
 fn write_100_dictionary() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_dictionary")?;
-    test_file("1.0.0-bigendian", "generated_dictionary")
+    test_file("1.0.0-littleendian", "generated_dictionary", false)?;
+    test_file("1.0.0-bigendian", "generated_dictionary", false)?;
+    test_file("1.0.0-littleendian", "generated_dictionary", true)?;
+    test_file("1.0.0-bigendian", "generated_dictionary", true)
 }
 
 #[test]
 fn write_100_interval() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_interval")?;
-    test_file("1.0.0-bigendian", "generated_interval")
+    test_file("1.0.0-littleendian", "generated_interval", false)?;
+    test_file("1.0.0-bigendian", "generated_interval", false)?;
+    test_file("1.0.0-littleendian", "generated_interval", true)?;
+    test_file("1.0.0-bigendian", "generated_interval", true)
 }
 
 #[test]
 fn write_100_large_batch() -> Result<()> {
     // this takes too long for unit-tests. It has been passing...
-    //test_file("1.0.0-littleendian", "generated_large_batch");
+    //test_file("1.0.0-littleendian", "generated_large_batch", false);
     Ok(())
 }
 
 #[test]
 fn write_100_nested() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_nested")?;
-    test_file("1.0.0-bigendian", "generated_nested")
+    test_file("1.0.0-littleendian", "generated_nested", false)?;
+    test_file("1.0.0-bigendian", "generated_nested", false)?;
+    test_file("1.0.0-littleendian", "generated_nested", true)?;
+    test_file("1.0.0-bigendian", "generated_nested", true)
 }
 
 #[test]
 fn write_100_nested_large_offsets() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_nested_large_offsets")?;
-    test_file("1.0.0-bigendian", "generated_nested_large_offsets")
+    test_file(
+        "1.0.0-littleendian",
+        "generated_nested_large_offsets",
+        false,
+    )?;
+    test_file("1.0.0-bigendian", "generated_nested_large_offsets", false)?;
+    test_file("1.0.0-littleendian", "generated_nested_large_offsets", true)?;
+    test_file("1.0.0-bigendian", "generated_nested_large_offsets", true)
 }
 
 #[test]
 fn write_100_null_trivial() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_null_trivial")?;
-    test_file("1.0.0-bigendian", "generated_null_trivial")
+    test_file("1.0.0-littleendian", "generated_null_trivial", false)?;
+    test_file("1.0.0-bigendian", "generated_null_trivial", false)?;
+    test_file("1.0.0-littleendian", "generated_null_trivial", true)?;
+    test_file("1.0.0-bigendian", "generated_null_trivial", true)
 }
 
 #[test]
 fn write_100_null() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_null")?;
-    test_file("1.0.0-bigendian", "generated_null")
+    test_file("1.0.0-littleendian", "generated_null", false)?;
+    test_file("1.0.0-bigendian", "generated_null", false)?;
+    test_file("1.0.0-littleendian", "generated_null", true)?;
+    test_file("1.0.0-bigendian", "generated_null", true)
 }
 
 #[test]
 fn write_100_primitive_large_offsets() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_primitive_large_offsets")?;
-    test_file("1.0.0-bigendian", "generated_primitive_large_offsets")
+    test_file(
+        "1.0.0-littleendian",
+        "generated_primitive_large_offsets",
+        false,
+    )?;
+    test_file(
+        "1.0.0-bigendian",
+        "generated_primitive_large_offsets",
+        false,
+    )?;
+    test_file(
+        "1.0.0-littleendian",
+        "generated_primitive_large_offsets",
+        true,
+    )?;
+    test_file("1.0.0-bigendian", "generated_primitive_large_offsets", true)
 }
 
 #[test]
 fn write_100_primitive_no_batches() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_primitive_no_batches")?;
-    test_file("1.0.0-bigendian", "generated_primitive_no_batches")
+    test_file(
+        "1.0.0-littleendian",
+        "generated_primitive_no_batches",
+        false,
+    )?;
+    test_file("1.0.0-bigendian", "generated_primitive_no_batches", false)?;
+    test_file("1.0.0-littleendian", "generated_primitive_no_batches", true)?;
+    test_file("1.0.0-bigendian", "generated_primitive_no_batches", true)
 }
 
 #[test]
 fn write_100_primitive_zerolength() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_primitive_zerolength")?;
-    test_file("1.0.0-bigendian", "generated_primitive_zerolength")
+    test_file(
+        "1.0.0-littleendian",
+        "generated_primitive_zerolength",
+        false,
+    )?;
+    test_file("1.0.0-bigendian", "generated_primitive_zerolength", false)?;
+    test_file("1.0.0-littleendian", "generated_primitive_zerolength", true)?;
+    test_file("1.0.0-bigendian", "generated_primitive_zerolength", true)
 }
 
 #[test]
 fn write_0141_primitive_zerolength() -> Result<()> {
-    test_file("0.14.1", "generated_primitive_zerolength")
+    test_file("0.14.1", "generated_primitive_zerolength", false)
 }
 
 #[test]
 fn write_100_custom_metadata() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_custom_metadata")?;
-    test_file("1.0.0-bigendian", "generated_custom_metadata")
+    test_file("1.0.0-littleendian", "generated_custom_metadata", false)?;
+    test_file("1.0.0-bigendian", "generated_custom_metadata", false)
 }
 
 #[test]
 fn write_100_decimal() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_decimal")?;
-    test_file("1.0.0-bigendian", "generated_decimal")
+    test_file("1.0.0-littleendian", "generated_decimal", false)?;
+    test_file("1.0.0-bigendian", "generated_decimal", false)
 }
 
 #[test]
 fn write_100_extension() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_extension")?;
-    test_file("1.0.0-bigendian", "generated_extension")
+    test_file("1.0.0-littleendian", "generated_extension", false)?;
+    test_file("1.0.0-bigendian", "generated_extension", false)
 }
 
 #[test]
 fn write_100_union() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_union")?;
-    test_file("1.0.0-bigendian", "generated_union")
+    test_file("1.0.0-littleendian", "generated_union", false)?;
+    test_file("1.0.0-bigendian", "generated_union", false)
 }
 
 #[test]
 fn write_100_map() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_map")?;
-    test_file("1.0.0-bigendian", "generated_map")
+    test_file("1.0.0-littleendian", "generated_map", false)?;
+    test_file("1.0.0-bigendian", "generated_map", false)
 }
 
 #[test]
 fn write_100_map_non_canonical() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_map_non_canonical")?;
-    test_file("1.0.0-bigendian", "generated_map_non_canonical")
+    test_file("1.0.0-littleendian", "generated_map_non_canonical", false)?;
+    test_file("1.0.0-bigendian", "generated_map_non_canonical", false)
 }
 
 #[test]
 fn write_generated_017_union() -> Result<()> {
-    test_file("0.17.1", "generated_union")
+    test_file("0.17.1", "generated_union", false)
+}
+
+#[test]
+fn write_boolean() -> Result<()> {
+    use std::sync::Arc;
+    let array = Arc::new(BooleanArray::from([
+        Some(true),
+        Some(false),
+        None,
+        Some(true),
+    ])) as Arc<dyn Array>;
+    let batch = RecordBatch::try_from_iter(vec![("a", array)])?;
+    round_trip(batch)
 }
 
 #[test]
 fn write_sliced_utf8() -> Result<()> {
     use std::sync::Arc;
     let array =
         Arc::new(Utf8Array::<i32>::from_slice(["aa", "bb"]).slice(1, 1)) as Arc<dyn Array>;
-    let batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
+    let batch = RecordBatch::try_from_iter(vec![("a", array)])?;
     round_trip(batch)
 }
diff --git a/tests/it/io/ipc/write/stream.rs b/tests/it/io/ipc/write/stream.rs
index 16a41530713..9da5587e692 100644
--- a/tests/it/io/ipc/write/stream.rs
+++ b/tests/it/io/ipc/write/stream.rs
@@ -15,7 +15,7 @@ fn test_file(version: &str, file_name: &str) {
 
     // write IPC version 5
     {
-        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
+        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5, None).unwrap();
         let mut writer = StreamWriter::try_new_with_options(&mut result, &schema, options).unwrap();
         for batch in batches {
             writer.write(&batch).unwrap();
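Taken together, the changes make compressed files transparent to readers, since the deserializer now receives the batch's `BodyCompression`. A hedged end-to-end round-trip sketch; the module paths and the reader API (`read_file_metadata`, `FileReader`) are assumed from the crate's public API at the time of this patch, and the `io_ipc_compression` feature must be enabled:

use std::io::Cursor;
use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::ipc::read::{read_file_metadata, FileReader};
use arrow2::io::ipc::write::{Compression, FileWriter, IpcWriteOptions, MetadataVersion};
use arrow2::record_batch::RecordBatch;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let array = Arc::new(Int32Array::from_slice(&[1, 2, 3])) as Arc<dyn Array>;
    let batch = RecordBatch::try_new(schema.clone(), vec![array])?;

    // write a ZSTD-compressed IPC file into memory
    let mut buffer = vec![];
    {
        let options =
            IpcWriteOptions::try_new(8, false, MetadataVersion::V5, Some(Compression::ZSTD))?;
        let mut writer = FileWriter::try_new_with_options(&mut buffer, &schema, options)?;
        writer.write(&batch)?;
        writer.finish()?;
    }

    // read it back; decompression happens transparently
    let mut reader = Cursor::new(buffer);
    let metadata = read_file_metadata(&mut reader)?;
    let reader = FileReader::new(reader, metadata, None);
    for read_batch in reader {
        assert_eq!(read_batch?, batch);
    }
    Ok(())
}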