diff --git a/Cargo.toml b/Cargo.toml index f85f01a0afd..8014b6f4279 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,7 @@ avro-schema = { version = "0.2", optional = true } # compression of avro libflate = { version = "1.1.1", optional = true } snap = { version = "1", optional = true } +crc = { version = "1", optional = true } # async avro async-stream = { version = "0.3.2", optional = true } @@ -142,6 +143,7 @@ io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", " io_avro_compression = [ "libflate", "snap", + "crc", ] io_avro_async = ["io_avro", "futures", "async-stream"] # io_json: its dependencies + error handling diff --git a/examples/avro_write.rs b/examples/avro_write.rs index 6d970bd9e01..c2a57581fed 100644 --- a/examples/avro_write.rs +++ b/examples/avro_write.rs @@ -22,16 +22,11 @@ fn write_avro( .collect::>(); let mut block = write::Block::new(arrays[0].len(), vec![]); - write::serialize(&mut serializers, &mut block)?; + write::serialize(&mut serializers, &mut block); let mut compressed_block = write::CompressedBlock::default(); - if let Some(compression) = compression { - write::compress(&block, &mut compressed_block, compression)?; - } else { - compressed_block.number_of_rows = block.number_of_rows; - std::mem::swap(&mut compressed_block.data, &mut block.data); - } + let _was_compressed = write::compress(&mut block, &mut compressed_block, compression)?; write::write_metadata(file, avro_fields.clone(), compression)?; diff --git a/src/io/avro/read/decompress.rs b/src/io/avro/read/decompress.rs index 243f815221f..03fb6261bd6 100644 --- a/src/io/avro/read/decompress.rs +++ b/src/io/avro/read/decompress.rs @@ -34,13 +34,24 @@ pub fn decompress_block( } #[cfg(feature = "io_avro_compression")] Some(Compression::Snappy) => { - let len = snap::raw::decompress_len(&block[..block.len() - 4]) - .map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?; + let crc = &block[block.len() - 4..]; + let block = &block[..block.len() - 4]; + + let len = snap::raw::decompress_len(block) + .map_err(|e| ArrowError::ExternalFormat(e.to_string()))?; decompressed.clear(); decompressed.resize(len, 0); snap::raw::Decoder::new() - .decompress(&block[..block.len() - 4], decompressed) - .map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?; + .decompress(block, decompressed) + .map_err(|e| ArrowError::ExternalFormat(e.to_string()))?; + + let expected_crc = u32::from_be_bytes([crc[0], crc[1], crc[2], crc[3]]); + let actual_crc = crc::crc32::checksum_ieee(decompressed); + if expected_crc != actual_crc { + return Err(ArrowError::ExternalFormat( + "The crc of snap-compressed block does not match".to_string(), + )); + } Ok(false) } #[cfg(not(feature = "io_avro_compression"))] diff --git a/src/io/avro/write/block.rs b/src/io/avro/write/block.rs index 5729097d7b3..0758c95c4ac 100644 --- a/src/io/avro/write/block.rs +++ b/src/io/avro/write/block.rs @@ -1,8 +1,8 @@ use std::io::Write; -use crate::{error::Result, io::avro::Compression}; +use crate::error::Result; -use super::super::{Block, CompressedBlock}; +use super::super::CompressedBlock; use super::{util::zigzag_encode, SYNC_NUMBER}; /// Writes a [`CompressedBlock`] to `writer` @@ -17,16 +17,3 @@ pub fn write_block(writer: &mut W, compressed_block: &CompressedBlock) Ok(()) } - -/// Compresses an [`Block`] to a [`CompressedBlock`]. -pub fn compress( - block: &Block, - compressed_block: &mut CompressedBlock, - compression: Compression, -) -> Result<()> { - compressed_block.number_of_rows = block.number_of_rows; - match compression { - Compression::Deflate => todo!(), - Compression::Snappy => todo!(), - } -} diff --git a/src/io/avro/write/compress.rs b/src/io/avro/write/compress.rs new file mode 100644 index 00000000000..1e35311ef81 --- /dev/null +++ b/src/io/avro/write/compress.rs @@ -0,0 +1,60 @@ +//! APIs to read from Avro format to arrow. + +use crate::error::Result; + +use super::Compression; +use super::{Block, CompressedBlock}; + +/// Compresses a [`Block`] to a [`CompressedBlock`]. +pub fn compress( + block: &mut Block, + compressed: &mut CompressedBlock, + compression: Option, +) -> Result { + compressed.number_of_rows = block.number_of_rows; + let block = &mut block.data; + let compressed = &mut compressed.data; + + match compression { + None => { + std::mem::swap(block, compressed); + Ok(true) + } + #[cfg(feature = "io_avro_compression")] + Some(Compression::Deflate) => { + use std::io::Write; + compressed.clear(); + let mut encoder = libflate::deflate::Encoder::new(compressed); + encoder.write_all(block)?; + encoder.finish(); + Ok(false) + } + #[cfg(feature = "io_avro_compression")] + Some(Compression::Snappy) => { + use snap::raw::{max_compress_len, Encoder}; + + compressed.clear(); + + let required_len = max_compress_len(block.len()); + compressed.resize(required_len, 0); + let compressed_bytes = Encoder::new() + .compress(block, compressed) + .map_err(|e| crate::error::ArrowError::ExternalFormat(e.to_string()))?; + compressed.truncate(compressed_bytes); + + let crc = crc::crc32::checksum_ieee(block); + compressed.extend(crc.to_be_bytes()); + Ok(false) + } + #[cfg(not(feature = "io_avro_compression"))] + Some(Compression::Deflate) => Err(crate::error::ArrowError::InvalidArgumentError( + "Trying to compress Avro with deflate but feature 'io_avro_compression' is not active." + .to_string(), + )), + #[cfg(not(feature = "io_avro_compression"))] + Some(Compression::Snappy) => Err(crate::error::ArrowError::InvalidArgumentError( + "Trying to compress Avro with snappy but feature 'io_avro_compression' is not active." + .to_string(), + )), + } +} diff --git a/src/io/avro/write/mod.rs b/src/io/avro/write/mod.rs index c080498cfea..cfb227d449e 100644 --- a/src/io/avro/write/mod.rs +++ b/src/io/avro/write/mod.rs @@ -1,6 +1,4 @@ //! APIs to write to Avro format. -use std::io::Write; - use avro_schema::{Field as AvroField, Record, Schema as AvroSchema}; use crate::error::Result; @@ -15,7 +13,9 @@ mod serialize; pub use serialize::{can_serialize, new_serializer, BoxSerializer}; mod block; pub use block::*; +mod compress; mod util; +pub use compress::compress; pub use super::{Block, CompressedBlock}; @@ -52,7 +52,7 @@ pub fn write_metadata( /// # Panics /// Panics iff the number of items in any of the serializers is not equal to the number of rows /// declared in the `block`. -pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) -> Result<()> { +pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) { let Block { data, number_of_rows, @@ -64,8 +64,7 @@ pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) - for _ in 0..*number_of_rows { for serializer in &mut *serializers { let item_data = serializer.next().unwrap(); - data.write_all(item_data)?; + data.extend(item_data); } } - Ok(()) } diff --git a/tests/it/io/avro/write.rs b/tests/it/io/avro/write.rs index b4e026c9c90..6e605828dab 100644 --- a/tests/it/io/avro/write.rs +++ b/tests/it/io/avro/write.rs @@ -51,16 +51,11 @@ fn write_avro>( .collect::>(); let mut block = write::Block::new(arrays[0].as_ref().len(), vec![]); - write::serialize(&mut serializers, &mut block)?; + write::serialize(&mut serializers, &mut block); let mut compressed_block = write::CompressedBlock::default(); - if let Some(compression) = compression { - write::compress(&block, &mut compressed_block, compression)?; - } else { - compressed_block.number_of_rows = block.number_of_rows; - std::mem::swap(&mut compressed_block.data, &mut block.data); - } + write::compress(&mut block, &mut compressed_block, compression)?; let mut file = vec![]; @@ -93,3 +88,13 @@ fn roundtrip(compression: Option) -> Result<()> { fn no_compression() -> Result<()> { roundtrip(None) } + +#[test] +fn snappy() -> Result<()> { + roundtrip(Some(write::Compression::Snappy)) +} + +#[test] +fn deflate() -> Result<()> { + roundtrip(Some(write::Compression::Deflate)) +}