Skip to content

Commit

Permalink
feat: support block gzip for streams (#9175)
Browse files Browse the repository at this point in the history
* feat: support block gzip for streams

* test: add bgzip test

* style: remove unused imports
  • Loading branch information
tshauck authored Feb 14, 2024
1 parent 61e9605 commit 0c46d7f
Showing 1 changed file with 51 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,14 @@ impl FileCompressionType {
) -> Result<BoxStream<'static, Result<Bytes>>> {
Ok(match self.variant {
#[cfg(feature = "compression")]
GZIP => ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
.map_err(DataFusionError::from)
.boxed(),
GZIP => {
let mut decoder = AsyncGzDecoder::new(StreamReader::new(s));
decoder.multiple_members(true);

ReaderStream::new(decoder)
.map_err(DataFusionError::from)
.boxed()
}
#[cfg(feature = "compression")]
BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
.map_err(DataFusionError::from)
Expand Down Expand Up @@ -260,7 +265,9 @@ mod tests {
FileCompressionType, FileTypeExt,
};
use crate::error::DataFusionError;
use bytes::Bytes;
use datafusion_common::file_options::file_type::FileType;
use futures::StreamExt;
use std::str::FromStr;

#[test]
Expand Down Expand Up @@ -340,4 +347,45 @@ mod tests {
Err(DataFusionError::NotImplemented(_))
));
}

#[tokio::test]
async fn test_bgzip_stream_decoding() -> Result<(), DataFusionError> {
// As described in https://samtools.github.io/hts-specs/SAMv1.pdf ("The BGZF compression format")

// Ignore rust formatting so the byte array is easier to read
#[rustfmt::skip]
let data = [
// Block header
0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
0x02, 0x00,
// Block 0, literal: 42
0x1e, 0x00, 0x33, 0x31, 0xe2, 0x02, 0x00, 0x31, 0x29, 0x86, 0xd1, 0x03, 0x00, 0x00, 0x00,
// Block header
0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
0x02, 0x00,
// Block 1, literal: 42
0x1e, 0x00, 0x33, 0x31, 0xe2, 0x02, 0x00, 0x31, 0x29, 0x86, 0xd1, 0x03, 0x00, 0x00, 0x00,
// EOF
0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
0x02, 0x00, 0x1b, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
];

// Create a byte stream
let stream = futures::stream::iter(vec![Ok::<Bytes, DataFusionError>(
Bytes::from(data.to_vec()),
)]);
let converted_stream =
FileCompressionType::GZIP.convert_stream(stream.boxed())?;

let vec = converted_stream
.map(|r| r.unwrap())
.collect::<Vec<Bytes>>()
.await;

let string_value = String::from_utf8_lossy(&vec[0]);

assert_eq!(string_value, "42\n42\n");

Ok(())
}
}

0 comments on commit 0c46d7f

Please sign in to comment.