This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to read snappy-compressed Avro #612

Merged 1 commit on Nov 17, 2021
10 changes: 9 additions & 1 deletion Cargo.toml
@@ -71,6 +71,7 @@ parquet2 = { version = "0.6", optional = true, default_features = false, feature
 avro-rs = { version = "0.13", optional = true, default_features = false }

 libflate = { version = "1.1.1", optional = true }
+snap = { version = "1", optional = true }

 # for division/remainder optimization at runtime
 strength_reduce = { version = "0.2", optional = true }
@@ -88,6 +89,7 @@ tokio = { version = "1", features = ["macros", "rt", "fs"] }
 tokio-util = { version = "0.6", features = ["compat"] }
 # used to run formal property testing
 proptest = { version = "1", default_features = false, features = ["std"] }
+avro-rs = { version = "0.13", features = ["snappy"] }

 [package.metadata.docs.rs]
 features = ["full"]
@@ -108,6 +110,7 @@ full = [
     "io_parquet",
     "io_parquet_compression",
     "io_avro",
+    "io_avro_compression",
     "regex",
     "merge_sort",
     "compute",
@@ -132,7 +135,11 @@ io_parquet_compression = [
     "parquet2/lz4",
     "parquet2/brotli",
 ]
-io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json", "libflate"]
+io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json"]
+io_avro_compression = [
+    "libflate",
+    "snap",
+]
 # io_json: its dependencies + error handling
 # serde_derive: there is some derive around
 io_json_integration = ["io_json", "serde_derive", "hex"]
@@ -162,6 +169,7 @@ skip_feature_sets = [
    ["io_csv_async"],
    ["io_csv_read_async"],
    ["io_avro"],
+   ["io_avro_compression"],
    ["io_json"],
    ["io_flight"],
    ["io_ipc"],
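The compression codecs now live behind a dedicated `io_avro_compression` feature, so `io_avro` alone no longer pulls in `libflate` or `snap`. As a sketch (the arrow2 version shown is illustrative, not taken from this diff), a downstream crate that wants to read compressed Avro would opt in like this:

```toml
[dependencies]
# Hypothetical downstream manifest: `io_avro` enables the reader,
# `io_avro_compression` enables the deflate and snappy codecs.
arrow2 = { version = "0.8", default-features = false, features = [
    "io_avro",
    "io_avro_compression",
] }
```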
4 changes: 2 additions & 2 deletions src/io/avro/mod.rs
@@ -5,8 +5,8 @@ pub mod read;

 use crate::error::ArrowError;

-impl From<avro_rs::SerError> for ArrowError {
-    fn from(error: avro_rs::SerError) -> Self {
+impl From<avro_rs::Error> for ArrowError {
+    fn from(error: avro_rs::Error) -> Self {
         ArrowError::External("".to_string(), Box::new(error))
     }
 }
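Widening the conversion from `avro_rs::SerError` to the crate-wide `avro_rs::Error` means any avro-rs failure can now be propagated with `?`. A minimal sketch (the helper function is hypothetical, not part of the diff):

```rust
use arrow2::error::ArrowError;

// With `From<avro_rs::Error> for ArrowError`, `?` converts any
// avro-rs error, not only serialization errors.
fn parse_avro_schema(raw: &str) -> Result<avro_rs::Schema, ArrowError> {
    Ok(avro_rs::Schema::parse_str(raw)?)
}
```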
55 changes: 45 additions & 10 deletions src/io/avro/read/mod.rs
@@ -3,9 +3,8 @@
 use std::io::Read;
 use std::sync::Arc;

-use avro_rs::{Codec, Schema as AvroSchema};
+use avro_rs::Schema as AvroSchema;
 use fallible_streaming_iterator::FallibleStreamingIterator;
-use libflate::deflate::Decoder;

 mod deserialize;
 mod nested;
@@ -16,10 +15,20 @@ use crate::datatypes::Schema;
 use crate::error::{ArrowError, Result};
 use crate::record_batch::RecordBatch;

-/// Reads the avro metadata from `reader` into a [`Schema`], [`Codec`] and magic marker.
+/// Valid compressions
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub enum Compression {
+    /// Deflate
+    Deflate,
+    /// Snappy
+    Snappy,
+}
+
+/// Reads the avro metadata from `reader` into a [`Schema`], [`Compression`] and magic marker.
 #[allow(clippy::type_complexity)]
 pub fn read_metadata<R: std::io::Read>(
     reader: &mut R,
-) -> Result<(Vec<AvroSchema>, Schema, Codec, [u8; 16])> {
+) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
     let (avro_schema, codec, marker) = util::read_schema(reader)?;
     let schema = schema::convert_schema(&avro_schema)?;

@@ -70,18 +79,44 @@ fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16])

 /// Decompresses an avro block.
 /// Returns whether the buffers were swapped.
-fn decompress_block(block: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
+fn decompress_block(
+    block: &mut Vec<u8>,
+    decompress: &mut Vec<u8>,
+    codec: Option<Compression>,
+) -> Result<bool> {
     match codec {
-        Codec::Null => {
+        None => {
             std::mem::swap(block, decompress);
             Ok(true)
         }
-        Codec::Deflate => {
+        #[cfg(feature = "io_avro_compression")]
+        Some(Compression::Deflate) => {
             decompress.clear();
-            let mut decoder = Decoder::new(&block[..]);
+            let mut decoder = libflate::deflate::Decoder::new(&block[..]);
             decoder.read_to_end(decompress)?;
             Ok(false)
         }
+        #[cfg(feature = "io_avro_compression")]
+        Some(Compression::Snappy) => {
+            let len = snap::raw::decompress_len(&block[..block.len() - 4])
+                .map_err(|_| ArrowError::Other("Failed to decompress snap".to_string()))?;
+            decompress.clear();
+            decompress.resize(len, 0);
+            snap::raw::Decoder::new()
+                .decompress(&block[..block.len() - 4], decompress)
+                .map_err(|_| ArrowError::Other("Failed to decompress snap".to_string()))?;
+            Ok(false)
+        }
+        #[cfg(not(feature = "io_avro_compression"))]
+        Some(Compression::Deflate) => Err(ArrowError::Other(
+            "The avro file is deflate-encoded but feature 'io_avro_compression' is not active."
+                .to_string(),
+        )),
+        #[cfg(not(feature = "io_avro_compression"))]
+        Some(Compression::Snappy) => Err(ArrowError::Other(
+            "The avro file is snappy-encoded but feature 'io_avro_compression' is not active."
+                .to_string(),
+        )),
     }
 }

@@ -130,14 +165,14 @@ impl<'a, R: Read> FallibleStreamingIterator for BlockStreamIterator<'a, R> {
 /// [`StreamingIterator`] of blocks of decompressed avro data
 pub struct Decompressor<'a, R: Read> {
     blocks: BlockStreamIterator<'a, R>,
-    codec: Codec,
+    codec: Option<Compression>,
     buf: (Vec<u8>, usize),
     was_swapped: bool,
 }

 impl<'a, R: Read> Decompressor<'a, R> {
     /// Creates a new [`Decompressor`].
-    pub fn new(blocks: BlockStreamIterator<'a, R>, codec: Codec) -> Self {
+    pub fn new(blocks: BlockStreamIterator<'a, R>, codec: Option<Compression>) -> Self {
         Self {
             blocks,
             codec,
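One framing detail worth calling out: Avro's snappy codec appends a 4-byte, big-endian CRC-32 of the uncompressed data to every block, which is why `decompress_block` slices off the last four bytes before handing the buffer to `snap` (the checksum itself is not verified here). A minimal standalone sketch of the same decode, using only the `snap` raw format:

```rust
// A sketch, not the arrow2 API: decode one snappy-coded Avro block.
// The block's last 4 bytes are a big-endian CRC-32 of the uncompressed
// data, so only `block[..len - 4]` is snappy-compressed input.
fn decode_snappy_block(block: &[u8]) -> Result<Vec<u8>, snap::Error> {
    let compressed = &block[..block.len() - 4];
    // `decompress_len` reads snappy's length header without decompressing.
    let len = snap::raw::decompress_len(compressed)?;
    let mut out = vec![0u8; len];
    snap::raw::Decoder::new().decompress(compressed, &mut out)?;
    Ok(out)
}
```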
37 changes: 18 additions & 19 deletions src/io/avro/read/util.rs
@@ -1,10 +1,11 @@
 use std::io::Read;
-use std::str::FromStr;
+
+use avro_rs::{from_avro_datum, types::Value, AvroResult, Error, Schema};
+use serde_json::from_slice;

 use crate::error::Result;

-use avro_rs::{from_avro_datum, types::Value, AvroResult, Codec, Error, Schema};
-use serde_json::from_slice;
+use super::Compression;

 pub fn zigzag_i64<R: Read>(reader: &mut R) -> Result<i64> {
     let z = decode_variable(reader)?;
@@ -43,10 +44,10 @@ fn read_file_marker<R: std::io::Read>(reader: &mut R) -> AvroResult<[u8; 16]> {
     Ok(marker)
 }

-/// Reads the schema from `reader`, returning the file's [`Schema`] and [`Codec`].
+/// Reads the schema from `reader`, returning the file's [`Schema`] and [`Compression`].
 /// # Error
 /// This function errors iff the header is not a valid avro file header.
-pub fn read_schema<R: Read>(reader: &mut R) -> AvroResult<(Schema, Codec, [u8; 16])> {
+pub fn read_schema<R: Read>(reader: &mut R) -> AvroResult<(Schema, Option<Compression>, [u8; 16])> {
     let meta_schema = Schema::Map(Box::new(Schema::Bytes));

     let mut buf = [0u8; 4];
@@ -70,21 +71,19 @@ pub fn read_schema<R: Read>(reader: &mut R) -> AvroResult<(Schema, Codec, [u8; 1
         .ok_or(Error::GetAvroSchemaFromMap)?;
     let schema = Schema::parse(&json)?;

-    let codec = if let Some(codec) = meta
-        .get("avro.codec")
-        .and_then(|codec| {
-            if let Value::Bytes(ref bytes) = *codec {
-                simdutf8::basic::from_utf8(bytes.as_ref()).ok()
-            } else {
-                None
-            }
-        })
-        .and_then(|codec| Codec::from_str(codec).ok())
-    {
-        codec
-    } else {
-        Codec::Null
-    };
+    let codec = meta.get("avro.codec").and_then(|codec| {
+        if let Value::Bytes(bytes) = codec {
+            let bytes: &[u8] = bytes.as_ref();
+            match bytes {
+                b"snappy" => Some(Compression::Snappy),
+                b"deflate" => Some(Compression::Deflate),
+                _ => None,
+            }
+        } else {
+            None
+        }
+    });

     let marker = read_file_marker(reader)?;

     Ok((schema, codec, marker))
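Note the return type change: instead of avro-rs's `Codec` (whose snappy variant is itself feature-gated in avro-rs), the header's `avro.codec` bytes are mapped to arrow2's own `Option<Compression>`, with `b"null"`, unknown names, and a missing key all becoming `None`. A usage sketch, assuming `Compression` and `read_metadata` are reachable under `arrow2::io::avro::read` as in this diff:

```rust
use arrow2::io::avro::read::{read_metadata, Compression};

// Hypothetical helper: report which codec an Avro file declares.
fn describe_codec<R: std::io::Read>(reader: &mut R) -> arrow2::error::Result<()> {
    let (_, _, codec, _) = read_metadata(reader)?;
    match codec {
        None => println!("blocks are uncompressed"),
        Some(Compression::Deflate) => println!("blocks are deflate-compressed"),
        Some(Compression::Snappy) => println!("blocks are snappy-compressed"),
    }
    Ok(())
}
```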
37 changes: 14 additions & 23 deletions tests/it/io/avro/read/mod.rs
@@ -82,15 +82,10 @@ fn schema() -> (AvroSchema, Schema) {
     (AvroSchema::parse_str(raw_schema).unwrap(), schema)
 }

-fn write(has_codec: bool) -> Result<(Vec<u8>, RecordBatch)> {
+fn write(codec: Codec) -> Result<(Vec<u8>, RecordBatch)> {
     let (avro, schema) = schema();
     // a writer needs a schema and something to write to
-    let mut writer: Writer<Vec<u8>>;
-    if has_codec {
-        writer = Writer::with_codec(&avro, Vec::new(), Codec::Deflate);
-    } else {
-        writer = Writer::new(&avro, Vec::new());
-    }
+    let mut writer = Writer::with_codec(&avro, Vec::new(), codec);

     // the Record type models our Record schema
     let mut record = Record::new(writer.schema()).unwrap();
@@ -174,9 +169,8 @@ fn write(has_codec: bool) -> Result<(Vec<u8>, RecordBatch)> {
     Ok((writer.into_inner().unwrap(), expected))
 }

-#[test]
-fn read_without_codec() -> Result<()> {
-    let (data, expected) = write(false).unwrap();
+fn test(codec: Codec) -> Result<()> {
+    let (data, expected) = write(codec).unwrap();

     let file = &mut &data[..];

@@ -193,19 +187,16 @@ fn read_without_codec() -> Result<()> {
 }

 #[test]
-fn read_with_codec() -> Result<()> {
-    let (data, expected) = write(true).unwrap();
-
-    let file = &mut &data[..];
-
-    let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;
-
-    let mut reader = read::Reader::new(
-        read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
-        avro_schema,
-        Arc::new(schema),
-    );
-
-    assert_eq!(reader.next().unwrap().unwrap(), expected);
-    Ok(())
+fn read_without_codec() -> Result<()> {
+    test(Codec::Null)
+}
+
+#[test]
+fn read_deflate() -> Result<()> {
+    test(Codec::Deflate)
+}
+
+#[test]
+fn read_snappy() -> Result<()> {
+    test(Codec::Snappy)
 }
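For reference, the read pipeline these tests exercise, reconstructed as a sketch from the inlined test code this PR removes (the printing and error handling are illustrative):

```rust
use std::sync::Arc;

use arrow2::io::avro::read;
use fallible_streaming_iterator::FallibleStreamingIterator;

// Sketch of the full read path: metadata -> raw blocks -> decompressor
// -> deserializing reader, as in the refactored tests above.
fn read_all(mut file: &[u8]) -> arrow2::error::Result<()> {
    let (avro_schema, schema, codec, file_marker) = read::read_metadata(&mut file)?;
    let blocks = read::BlockStreamIterator::new(&mut file, file_marker);
    let mut reader = read::Reader::new(
        read::Decompressor::new(blocks, codec),
        avro_schema,
        Arc::new(schema),
    );
    while let Some(batch) = reader.next()? {
        println!("read a RecordBatch with {} rows", batch.num_rows());
    }
    Ok(())
}
```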