Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Add support to read compressed avro (#512)
Browse files · Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Oct 15, 2021
1 parent 0b37568 commit 47a632c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 7 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ parquet2 = { version = "0.5.2", optional = true, default_features = false, featu

avro-rs = { version = "0.13", optional = true, default_features = false }

libflate = { version = "1.1.1", optional = true }

# for division/remainder optimization at runtime
strength_reduce = { version = "0.2", optional = true }

Expand Down Expand Up @@ -114,7 +116,7 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]
io_avro = ["avro-rs", "streaming-iterator", "serde_json"]
io_avro = ["avro-rs", "streaming-iterator", "serde_json", "libflate"]
# io_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["io_json", "serde_derive", "hex"]
Expand Down
6 changes: 5 additions & 1 deletion src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::io::Read;
use std::sync::Arc;

use avro_rs::{Codec, Schema as AvroSchema};
use libflate::deflate::Decoder;
use streaming_iterator::StreamingIterator;

mod deserialize;
Expand Down Expand Up @@ -74,7 +75,10 @@ fn decompress_block(buf: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -
Ok(false)
}
Codec::Deflate => {
todo!()
decompress.clear();
let mut decoder = Decoder::new(&buf[..]);
decoder.read_to_end(decompress)?;
Ok(true)
}
}
}
Expand Down
33 changes: 28 additions & 5 deletions tests/it/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use arrow2::types::months_days_ns;
use avro_rs::types::{Record, Value};
use avro_rs::Writer;
use avro_rs::{Codec, Writer};
use avro_rs::{Days, Duration, Millis, Months, Schema as AvroSchema};

use arrow2::array::*;
Expand Down Expand Up @@ -82,10 +82,15 @@ fn schema() -> (AvroSchema, Schema) {
(AvroSchema::parse_str(raw_schema).unwrap(), schema)
}

fn write() -> Result<(Vec<u8>, RecordBatch)> {
fn write(has_codec: bool) -> Result<(Vec<u8>, RecordBatch)> {
let (avro, schema) = schema();
// a writer needs a schema and something to write to
let mut writer = Writer::new(&avro, Vec::new());
let mut writer: Writer<Vec<u8>>;
if has_codec {
writer = Writer::with_codec(&avro, Vec::new(), Codec::Deflate);
} else {
writer = Writer::new(&avro, Vec::new());
}

// the Record type models our Record schema
let mut record = Record::new(writer.schema()).unwrap();
Expand Down Expand Up @@ -170,8 +175,26 @@ fn write() -> Result<(Vec<u8>, RecordBatch)> {
}

#[test]
fn read() -> Result<()> {
let (data, expected) = write().unwrap();
/// Round-trips an uncompressed avro file: writes it, reads it back,
/// and checks the decoded batch equals the one that was written.
fn read_without_codec() -> Result<()> {
    // Serialize a record batch with no compression codec applied.
    let (data, expected) = write(false).unwrap();

    let mut cursor = &data[..];
    let file = &mut cursor;

    // Metadata yields the avro schema, arrow schema, codec and sync marker.
    let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;

    // Pipeline: raw blocks -> decompressor -> record-batch reader.
    let blocks = read::BlockStreamIterator::new(file, file_marker);
    let decompressor = read::Decompressor::new(blocks, codec);
    let mut reader = read::Reader::new(decompressor, avro_schema, Arc::new(schema));

    let batch = reader.next().unwrap().unwrap();
    assert_eq!(batch, expected);
    Ok(())
}

#[test]
fn read_with_codec() -> Result<()> {
let (data, expected) = write(true).unwrap();

let file = &mut &data[..];

Expand Down

0 comments on commit 47a632c

Please sign in to comment.