Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added compression to Avro write (#699)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Dec 23, 2021
1 parent 349b276 commit 830bf5f
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 38 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ avro-schema = { version = "0.2", optional = true }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
crc = { version = "1", optional = true }
# async avro
async-stream = { version = "0.3.2", optional = true }

Expand Down Expand Up @@ -142,6 +143,7 @@ io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "
io_avro_compression = [
"libflate",
"snap",
"crc",
]
io_avro_async = ["io_avro", "futures", "async-stream"]
# io_json: its dependencies + error handling
Expand Down
9 changes: 2 additions & 7 deletions examples/avro_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@ fn write_avro<W: std::io::Write>(
.collect::<Vec<_>>();
let mut block = write::Block::new(arrays[0].len(), vec![]);

write::serialize(&mut serializers, &mut block)?;
write::serialize(&mut serializers, &mut block);

let mut compressed_block = write::CompressedBlock::default();

if let Some(compression) = compression {
write::compress(&block, &mut compressed_block, compression)?;
} else {
compressed_block.number_of_rows = block.number_of_rows;
std::mem::swap(&mut compressed_block.data, &mut block.data);
}
let _was_compressed = write::compress(&mut block, &mut compressed_block, compression)?;

write::write_metadata(file, avro_fields.clone(), compression)?;

Expand Down
19 changes: 15 additions & 4 deletions src/io/avro/read/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,24 @@ pub fn decompress_block(
}
#[cfg(feature = "io_avro_compression")]
Some(Compression::Snappy) => {
let len = snap::raw::decompress_len(&block[..block.len() - 4])
.map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?;
let crc = &block[block.len() - 4..];
let block = &block[..block.len() - 4];

let len = snap::raw::decompress_len(block)
.map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;
decompressed.clear();
decompressed.resize(len, 0);
snap::raw::Decoder::new()
.decompress(&block[..block.len() - 4], decompressed)
.map_err(|_| ArrowError::ExternalFormat("Failed to decompress snap".to_string()))?;
.decompress(block, decompressed)
.map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;

let expected_crc = u32::from_be_bytes([crc[0], crc[1], crc[2], crc[3]]);
let actual_crc = crc::crc32::checksum_ieee(decompressed);
if expected_crc != actual_crc {
return Err(ArrowError::ExternalFormat(
"The crc of snap-compressed block does not match".to_string(),
));
}
Ok(false)
}
#[cfg(not(feature = "io_avro_compression"))]
Expand Down
17 changes: 2 additions & 15 deletions src/io/avro/write/block.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::io::Write;

use crate::{error::Result, io::avro::Compression};
use crate::error::Result;

use super::super::{Block, CompressedBlock};
use super::super::CompressedBlock;
use super::{util::zigzag_encode, SYNC_NUMBER};

/// Writes a [`CompressedBlock`] to `writer`
Expand All @@ -17,16 +17,3 @@ pub fn write_block<W: Write>(writer: &mut W, compressed_block: &CompressedBlock)

Ok(())
}

/// Compresses an [`Block`] to a [`CompressedBlock`].
pub fn compress(
block: &Block,
compressed_block: &mut CompressedBlock,
compression: Compression,
) -> Result<()> {
compressed_block.number_of_rows = block.number_of_rows;
match compression {
Compression::Deflate => todo!(),
Compression::Snappy => todo!(),
}
}
60 changes: 60 additions & 0 deletions src/io/avro/write/compress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! APIs to compress Avro blocks before they are written.
use crate::error::Result;

use super::Compression;
use super::{Block, CompressedBlock};

/// Compresses a [`Block`] to a [`CompressedBlock`].
///
/// `compressed.number_of_rows` is always set to `block.number_of_rows`.
///
/// # Returns
/// * `Ok(true)` when `compression` is `None`: nothing is compressed; the data buffers of
///   `block` and `compressed` are swapped instead, so `block.data` ends up holding whatever
///   `compressed.data` contained before the call.
/// * `Ok(false)` when the data was compressed into `compressed.data`; `block.data` is left
///   untouched.
///
/// # Errors
/// * An I/O error from the deflate encoder, either while writing or while finishing the stream.
/// * `ArrowError::ExternalFormat` if the snappy encoder fails.
/// * `ArrowError::InvalidArgumentError` if a compression is requested but the feature
///   `io_avro_compression` is not active.
pub fn compress(
    block: &mut Block,
    compressed: &mut CompressedBlock,
    compression: Option<Compression>,
) -> Result<bool> {
    compressed.number_of_rows = block.number_of_rows;
    let block = &mut block.data;
    let compressed = &mut compressed.data;

    match compression {
        None => {
            // No codec: hand the serialized bytes over without copying them.
            std::mem::swap(block, compressed);
            Ok(true)
        }
        #[cfg(feature = "io_avro_compression")]
        Some(Compression::Deflate) => {
            use std::io::Write;
            compressed.clear();
            let mut encoder = libflate::deflate::Encoder::new(compressed);
            encoder.write_all(block)?;
            // `finish` flushes the final deflate block and reports failures through its return
            // value; dropping it would silently produce a truncated stream.
            encoder.finish().into_result()?;
            Ok(false)
        }
        #[cfg(feature = "io_avro_compression")]
        Some(Compression::Snappy) => {
            use snap::raw::{max_compress_len, Encoder};

            compressed.clear();

            // The raw encoder writes into a pre-sized buffer: reserve the worst case and
            // shrink to the number of bytes actually produced.
            let required_len = max_compress_len(block.len());
            compressed.resize(required_len, 0);
            let compressed_bytes = Encoder::new()
                .compress(block, compressed)
                .map_err(|e| crate::error::ArrowError::ExternalFormat(e.to_string()))?;
            compressed.truncate(compressed_bytes);

            // The compressed bytes are followed by the 4-byte big-endian CRC32 of the
            // *uncompressed* data.
            let crc = crc::crc32::checksum_ieee(block);
            compressed.extend_from_slice(&crc.to_be_bytes());
            Ok(false)
        }
        #[cfg(not(feature = "io_avro_compression"))]
        Some(Compression::Deflate) => Err(crate::error::ArrowError::InvalidArgumentError(
            "Trying to compress Avro with deflate but feature 'io_avro_compression' is not active."
                .to_string(),
        )),
        #[cfg(not(feature = "io_avro_compression"))]
        Some(Compression::Snappy) => Err(crate::error::ArrowError::InvalidArgumentError(
            "Trying to compress Avro with snappy but feature 'io_avro_compression' is not active."
                .to_string(),
        )),
    }
}
9 changes: 4 additions & 5 deletions src/io/avro/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
//! APIs to write to Avro format.
use std::io::Write;

use avro_schema::{Field as AvroField, Record, Schema as AvroSchema};

use crate::error::Result;
Expand All @@ -15,7 +13,9 @@ mod serialize;
pub use serialize::{can_serialize, new_serializer, BoxSerializer};
mod block;
pub use block::*;
mod compress;
mod util;
pub use compress::compress;

pub use super::{Block, CompressedBlock};

Expand Down Expand Up @@ -52,7 +52,7 @@ pub fn write_metadata<W: std::io::Write>(
/// # Panics
/// Panics iff the number of items in any of the serializers is not equal to the number of rows
/// declared in the `block`.
pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) -> Result<()> {
pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) {
let Block {
data,
number_of_rows,
Expand All @@ -64,8 +64,7 @@ pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) -
for _ in 0..*number_of_rows {
for serializer in &mut *serializers {
let item_data = serializer.next().unwrap();
data.write_all(item_data)?;
data.extend(item_data);
}
}
Ok(())
}
19 changes: 12 additions & 7 deletions tests/it/io/avro/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,11 @@ fn write_avro<R: AsRef<dyn Array>>(
.collect::<Vec<_>>();
let mut block = write::Block::new(arrays[0].as_ref().len(), vec![]);

write::serialize(&mut serializers, &mut block)?;
write::serialize(&mut serializers, &mut block);

let mut compressed_block = write::CompressedBlock::default();

if let Some(compression) = compression {
write::compress(&block, &mut compressed_block, compression)?;
} else {
compressed_block.number_of_rows = block.number_of_rows;
std::mem::swap(&mut compressed_block.data, &mut block.data);
}
write::compress(&mut block, &mut compressed_block, compression)?;

let mut file = vec![];

Expand Down Expand Up @@ -93,3 +88,13 @@ fn roundtrip(compression: Option<write::Compression>) -> Result<()> {
fn no_compression() -> Result<()> {
roundtrip(None)
}

// Runs the shared `roundtrip` helper (defined outside this hunk) with Snappy compression
// selected, propagating any error it returns.
#[test]
fn snappy() -> Result<()> {
roundtrip(Some(write::Compression::Snappy))
}

// Runs the shared `roundtrip` helper (defined outside this hunk) with Deflate compression
// selected, propagating any error it returns.
#[test]
fn deflate() -> Result<()> {
roundtrip(Some(write::Compression::Deflate))
}

0 comments on commit 830bf5f

Please sign in to comment.