diff --git a/Cargo.toml b/Cargo.toml
index 986c30a0f9c..1e9c8d3cefa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -71,6 +71,7 @@ parquet2 = { version = "0.6", optional = true, default_features = false, feature
 avro-rs = { version = "0.13", optional = true, default_features = false }
 libflate = { version = "1.1.1", optional = true }
+snap = { version = "1", optional = true }
 
 # for division/remainder optimization at runtime
 strength_reduce = { version = "0.2", optional = true }
@@ -88,6 +89,7 @@ tokio = { version = "1", features = ["macros", "rt", "fs"] }
 tokio-util = { version = "0.6", features = ["compat"] }
 # used to run formal property testing
 proptest = { version = "1", default_features = false, features = ["std"] }
+avro-rs = { version = "0.13", features = ["snappy"] }
 
 [package.metadata.docs.rs]
 features = ["full"]
@@ -108,6 +110,7 @@ full = [
     "io_parquet",
     "io_parquet_compression",
     "io_avro",
+    "io_avro_compression",
     "regex",
     "merge_sort",
     "compute",
@@ -132,7 +135,11 @@ io_parquet_compression = [
     "parquet2/lz4",
     "parquet2/brotli",
 ]
-io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json", "libflate"]
+io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json"]
+io_avro_compression = [
+    "libflate",
+    "snap",
+]
 # io_json: its dependencies + error handling
 # serde_derive: there is some derive around
 io_json_integration = ["io_json", "serde_derive", "hex"]
@@ -162,6 +169,7 @@ skip_feature_sets = [
     ["io_csv_async"],
     ["io_csv_read_async"],
     ["io_avro"],
+    ["io_avro_compression"],
     ["io_json"],
     ["io_flight"],
     ["io_ipc"],
diff --git a/src/io/avro/mod.rs b/src/io/avro/mod.rs
index ffc5fb2b3bd..75ce3db6984 100644
--- a/src/io/avro/mod.rs
+++ b/src/io/avro/mod.rs
@@ -5,8 +5,8 @@ pub mod read;
 
 use crate::error::ArrowError;
 
-impl From<avro_rs::SerError> for ArrowError {
-    fn from(error: avro_rs::SerError) -> Self {
+impl From<avro_rs::Error> for ArrowError {
+    fn from(error: avro_rs::Error) -> Self {
         ArrowError::External("".to_string(), Box::new(error))
     }
 }
diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs
index a0669507e13..de3ec6c36a0 100644
--- a/src/io/avro/read/mod.rs
+++ b/src/io/avro/read/mod.rs
@@ -3,9 +3,8 @@
 use std::io::Read;
 use std::sync::Arc;
 
-use avro_rs::{Codec, Schema as AvroSchema};
+use avro_rs::Schema as AvroSchema;
 use fallible_streaming_iterator::FallibleStreamingIterator;
-use libflate::deflate::Decoder;
 
 mod deserialize;
 mod nested;
@@ -16,10 +15,20 @@
 use crate::datatypes::Schema;
 use crate::error::{ArrowError, Result};
 use crate::record_batch::RecordBatch;
 
-/// Reads the avro metadata from `reader` into a [`Schema`], [`Codec`] and magic marker.
+/// Valid compressions
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub enum Compression {
+    /// Deflate
+    Deflate,
+    /// Snappy
+    Snappy,
+}
+
+/// Reads the avro metadata from `reader` into a [`Schema`], [`Compression`] and magic marker.
+#[allow(clippy::type_complexity)]
 pub fn read_metadata<R: std::io::Read>(
     reader: &mut R,
-) -> Result<(Vec<AvroSchema>, Schema, Codec, [u8; 16])> {
+) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
     let (avro_schema, codec, marker) = util::read_schema(reader)?;
     let schema = schema::convert_schema(&avro_schema)?;
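After this hunk, `read_metadata` reports the file's codec as an arrow2-native `Option<Compression>` rather than avro-rs' `Codec`, so callers can branch on it without depending on avro-rs. A minimal sketch of the call site (the `inspect` helper and its printing are illustrative assumptions, not part of this patch):

    use arrow2::error::Result;
    use arrow2::io::avro::read;

    // Hypothetical helper: report which codec an avro file declares.
    fn inspect<R: std::io::Read>(file: &mut R) -> Result<()> {
        let (_avro_schemas, _schema, codec, _marker) = read::read_metadata(file)?;
        match codec {
            None => println!("blocks are not compressed"),
            Some(read::Compression::Deflate) => println!("deflate-compressed blocks"),
            Some(read::Compression::Snappy) => println!("snappy-compressed blocks"),
        }
        Ok(())
    }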
@@ -70,18 +79,44 @@ fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16])
 
 /// Decompresses an avro block.
 /// Returns whether the buffers were swapped.
-fn decompress_block(block: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
+fn decompress_block(
+    block: &mut Vec<u8>,
+    decompress: &mut Vec<u8>,
+    codec: Option<Compression>,
+) -> Result<bool> {
     match codec {
-        Codec::Null => {
+        None => {
             std::mem::swap(block, decompress);
             Ok(true)
         }
-        Codec::Deflate => {
+        #[cfg(feature = "io_avro_compression")]
+        Some(Compression::Deflate) => {
             decompress.clear();
-            let mut decoder = Decoder::new(&block[..]);
+            let mut decoder = libflate::deflate::Decoder::new(&block[..]);
             decoder.read_to_end(decompress)?;
             Ok(false)
         }
+        #[cfg(feature = "io_avro_compression")]
+        Some(Compression::Snappy) => {
+            let len = snap::raw::decompress_len(&block[..block.len() - 4])
+                .map_err(|_| ArrowError::Other("Failed to decompress snap".to_string()))?;
+            decompress.clear();
+            decompress.resize(len, 0);
+            snap::raw::Decoder::new()
+                .decompress(&block[..block.len() - 4], decompress)
+                .map_err(|_| ArrowError::Other("Failed to decompress snap".to_string()))?;
+            Ok(false)
+        }
+        #[cfg(not(feature = "io_avro_compression"))]
+        Some(Compression::Deflate) => Err(ArrowError::Other(
+            "The avro file is deflate-encoded but feature 'io_avro_compression' is not active."
+                .to_string(),
+        )),
+        #[cfg(not(feature = "io_avro_compression"))]
+        Some(Compression::Snappy) => Err(ArrowError::Other(
+            "The avro file is snappy-encoded but feature 'io_avro_compression' is not active."
+                .to_string(),
+        )),
     }
 }
@@ -130,14 +165,14 @@ impl<'a, R: Read> FallibleStreamingIterator for BlockStreamIterator<'a, R> {
 
 /// [`StreamingIterator`] of blocks of decompressed avro data
 pub struct Decompressor<'a, R: Read> {
     blocks: BlockStreamIterator<'a, R>,
-    codec: Codec,
+    codec: Option<Compression>,
     buf: (Vec<u8>, usize),
     was_swapped: bool,
 }
 
 impl<'a, R: Read> Decompressor<'a, R> {
     /// Creates a new [`Decompressor`].
-    pub fn new(blocks: BlockStreamIterator<'a, R>, codec: Codec) -> Self {
+    pub fn new(blocks: BlockStreamIterator<'a, R>, codec: Option<Compression>) -> Self {
         Self {
             blocks,
             codec,
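A note on the snappy arm above: the avro spec stores each snappy-compressed block followed by a 4-byte big-endian CRC32 of the uncompressed data, which is why both `decompress_len` and `decompress` receive `&block[..block.len() - 4]`. The patch ignores the checksum; a hypothetical verification step (sketched with the `crc32fast` crate, which this patch does not depend on) could look like:

    use std::convert::TryInto;

    // Sketch: check the trailing big-endian CRC32 of a snappy-coded avro
    // block against the freshly decompressed bytes.
    fn crc_matches(block: &[u8], decompressed: &[u8]) -> bool {
        let stored = u32::from_be_bytes(block[block.len() - 4..].try_into().unwrap());
        crc32fast::hash(decompressed) == stored
    }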
diff --git a/src/io/avro/read/util.rs b/src/io/avro/read/util.rs
index 19934d4f3b2..a47cb6617bc 100644
--- a/src/io/avro/read/util.rs
+++ b/src/io/avro/read/util.rs
@@ -1,10 +1,11 @@
 use std::io::Read;
-use std::str::FromStr;
+
+use avro_rs::{from_avro_datum, types::Value, AvroResult, Error, Schema};
+use serde_json::from_slice;
 
 use crate::error::Result;
 
-use avro_rs::{from_avro_datum, types::Value, AvroResult, Codec, Error, Schema};
-use serde_json::from_slice;
+use super::Compression;
 
 pub fn zigzag_i64<R: Read>(reader: &mut R) -> Result<i64> {
     let z = decode_variable(reader)?;
@@ -43,10 +44,10 @@ fn read_file_marker<R: Read>(reader: &mut R) -> AvroResult<[u8; 16]> {
     Ok(marker)
 }
 
-/// Reads the schema from `reader`, returning the file's [`Schema`] and [`Codec`].
+/// Reads the schema from `reader`, returning the file's [`Schema`] and [`Compression`].
 /// # Error
 /// This function errors iff the header is not a valid avro file header.
-pub fn read_schema<R: Read>(reader: &mut R) -> AvroResult<(Schema, Codec, [u8; 16])> {
+pub fn read_schema<R: Read>(reader: &mut R) -> AvroResult<(Schema, Option<Compression>, [u8; 16])> {
     let meta_schema = Schema::Map(Box::new(Schema::Bytes));
 
     let mut buf = [0u8; 4];
@@ -70,21 +71,19 @@ pub fn read_schema<R: Read>(reader: &mut R) -> AvroResult<(Schema, Codec, [u8; 1
         .ok_or(Error::GetAvroSchemaFromMap)?;
     let schema = Schema::parse(&json)?;
 
-    let codec = if let Some(codec) = meta
-        .get("avro.codec")
-        .and_then(|codec| {
-            if let Value::Bytes(ref bytes) = *codec {
-                simdutf8::basic::from_utf8(bytes.as_ref()).ok()
-            } else {
-                None
+    let codec = meta.get("avro.codec").and_then(|codec| {
+        if let Value::Bytes(bytes) = codec {
+            let bytes: &[u8] = bytes.as_ref();
+            match bytes {
+                b"snappy" => Some(Compression::Snappy),
+                b"deflate" => Some(Compression::Deflate),
+                _ => None,
             }
-        })
-        .and_then(|codec| Codec::from_str(codec).ok())
-    {
-        codec
-    } else {
-        Codec::Null
-    };
+        } else {
+            None
+        }
+    });
+
     let marker = read_file_marker(reader)?;
 
     Ok((schema, codec, marker))
diff --git a/tests/it/io/avro/read/mod.rs b/tests/it/io/avro/read/mod.rs
index a17fb06387d..a6387b27be6 100644
--- a/tests/it/io/avro/read/mod.rs
+++ b/tests/it/io/avro/read/mod.rs
@@ -82,15 +82,10 @@ fn schema() -> (AvroSchema, Schema) {
     (AvroSchema::parse_str(raw_schema).unwrap(), schema)
 }
 
-fn write(has_codec: bool) -> Result<(Vec<u8>, RecordBatch)> {
+fn write(codec: Codec) -> Result<(Vec<u8>, RecordBatch)> {
     let (avro, schema) = schema();
     // a writer needs a schema and something to write to
-    let mut writer: Writer<Vec<u8>>;
-    if has_codec {
-        writer = Writer::with_codec(&avro, Vec::new(), Codec::Deflate);
-    } else {
-        writer = Writer::new(&avro, Vec::new());
-    }
+    let mut writer = Writer::with_codec(&avro, Vec::new(), codec);
 
     // the Record type models our Record schema
     let mut record = Record::new(writer.schema()).unwrap();
@@ -174,9 +169,8 @@ fn write(has_codec: bool) -> Result<(Vec<u8>, RecordBatch)> {
     Ok((writer.into_inner().unwrap(), expected))
 }
 
-#[test]
-fn read_without_codec() -> Result<()> {
-    let (data, expected) = write(false).unwrap();
+fn test(codec: Codec) -> Result<()> {
+    let (data, expected) = write(codec).unwrap();
 
     let file = &mut &data[..];
 
@@ -193,19 +187,16 @@
 }
 
 #[test]
-fn read_with_codec() -> Result<()> {
-    let (data, expected) = write(true).unwrap();
-
-    let file = &mut &data[..];
-
-    let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;
+fn read_without_codec() -> Result<()> {
+    test(Codec::Null)
+}
 
-    let mut reader = read::Reader::new(
-        read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
-        avro_schema,
-        Arc::new(schema),
-    );
+#[test]
+fn read_deflate() -> Result<()> {
+    test(Codec::Deflate)
+}
 
-    assert_eq!(reader.next().unwrap().unwrap(), expected);
-    Ok(())
+#[test]
+fn read_snappy() -> Result<()> {
+    test(Codec::Snappy)
 }
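For reference, end-to-end reading with the new API keeps the shape of the deleted `read_with_codec` test; a sketch, assuming `data` holds the bytes of an avro file (the `read_all` name and the row-count printing are illustrative only):

    use std::sync::Arc;

    use arrow2::error::Result;
    use arrow2::io::avro::read;

    fn read_all(data: &[u8]) -> Result<()> {
        let file = &mut &data[..];
        // metadata now carries an Option<Compression> instead of avro-rs' Codec
        let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;
        let reader = read::Reader::new(
            read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
            avro_schema,
            Arc::new(schema),
        );
        for batch in reader {
            println!("{} rows", batch?.num_rows());
        }
        Ok(())
    }

Reading deflate- or snappy-encoded files additionally requires building arrow2 with the new `io_avro_compression` feature; without it, `decompress_block` surfaces the explicit errors added above.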