Skip to content

Commit

Permalink
Merge pull request #285 from quantatic/main
Browse files Browse the repository at this point in the history
Enable customizing Zstd decoding parameters.
  • Loading branch information
NobodyXu authored Jul 21, 2024
2 parents 7c2d530 + c0c6559 commit 9967f2e
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 1 deletion.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ bzip2 = { version = "0.4.4", optional = true }
flate2 = { version = "1.0.13", optional = true }
futures-core = { version = "0.3", default-features = false }
futures-io = { version = "0.3", default-features = false, features = ["std"], optional = true }
libzstd = { package = "zstd", version = "0.13", optional = true, default-features = false }
libzstd = { package = "zstd", version = "0.13.1", optional = true, default-features = false }
memchr = "2"
pin-project-lite = "0.2"
tokio = { version = "1.24.2", optional = true, default-features = false }
Expand Down Expand Up @@ -92,6 +92,10 @@ required-features = ["zstd"]
name = "zstd-dict"
required-features = ["zstd", "tokio"]

[[test]]
name = "zstd-window-size"
required-features = ["zstd", "tokio"]

[[example]]
name = "zlib_tokio_write"
required-features = ["zlib", "tokio"]
Expand Down
10 changes: 10 additions & 0 deletions src/codec/zstd/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ impl ZstdDecoder {
}
}

pub(crate) fn new_with_params(params: &[crate::zstd::DParameter]) -> Self {
let mut decoder = Decoder::new().unwrap();
for param in params {
decoder.set_parameter(param.as_zstd()).unwrap();
}
Self {
decoder: Unshared::new(decoder),
}
}

pub(crate) fn new_with_dict(dictionary: &[u8]) -> io::Result<Self> {
let mut decoder = Decoder::with_dictionary(dictionary)?;
Ok(Self {
Expand Down
11 changes: 11 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,17 @@ macro_rules! algos {
}
}
{ @dec
/// Creates a new decoder, using the specified parameters, which will read compressed
/// data from the given stream and emit a decompressed stream.
pub fn with_params(inner: $inner, params: &[crate::zstd::DParameter]) -> Self {
Self {
inner: crate::$($mod::)+generic::Decoder::new(
inner,
crate::codec::ZstdDecoder::new_with_params(params),
),
}
}

/// Creates a new decoder, using the specified compression level and pre-trained
/// dictionary, which will read compressed data from the given stream and emit an
/// uncompressed stream.
Expand Down
20 changes: 20 additions & 0 deletions src/zstd.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This module contains zstd-specific types for async-compression.
use libzstd::stream::raw::CParameter::*;
use libzstd::stream::raw::DParameter::*;

/// A compression parameter for zstd. This is a stable wrapper around zstd's own `CParameter`
/// type, to abstract over different versions of the zstd library.
Expand Down Expand Up @@ -110,3 +111,22 @@ impl CParameter {
self.0
}
}

/// A decompression parameter for zstd. This is a stable wrapper around zstd's own `DParameter`
/// type, to abstract over different versions of the zstd library.
///
/// See the [zstd documentation](https://facebook.github.io/zstd/zstd_manual.html) for more
/// information on these parameters.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct DParameter(libzstd::stream::raw::DParameter);

impl DParameter {
/// Maximum window size in bytes (as a power of two)
pub fn window_log_max(value: u32) -> Self {
Self(WindowLogMax(value))
}

pub(crate) fn as_zstd(&self) -> libzstd::stream::raw::DParameter {
self.0
}
}
Binary file added tests/artifacts/long-window-size-lib.rs.zst
Binary file not shown.
45 changes: 45 additions & 0 deletions tests/zstd-window-size.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#![cfg(not(windows))]

use async_compression::zstd::DParameter;
use tokio::io::AsyncWriteExt as _;

#[tokio::test]
async fn zstd_decode_large_window_size_default() {
let compressed = include_bytes!("./artifacts/long-window-size-lib.rs.zst");

// Default decoder should throw with an error, window size maximum is too low.
let mut decoder = async_compression::tokio::write::ZstdDecoder::new(Vec::new());
decoder.write_all(compressed).await.unwrap_err();
}

#[tokio::test]
async fn zstd_decode_large_window_size_explicit_small_window_size() {
let compressed = include_bytes!("./artifacts/long-window-size-lib.rs.zst");

// Short window decoder should throw with an error, window size maximum is too low.
let mut decoder = async_compression::tokio::write::ZstdDecoder::with_params(
Vec::new(),
&[DParameter::window_log_max(16)],
);
decoder.write_all(compressed).await.unwrap_err();
}

#[tokio::test]
async fn zstd_decode_large_window_size_explicit_large_window_size() {
let compressed = include_bytes!("./artifacts/long-window-size-lib.rs.zst");
let source = include_bytes!("./artifacts/lib.rs");

// Long window decoder should succeed as the window size is large enough to decompress the given input.
let mut long_window_size_decoder = async_compression::tokio::write::ZstdDecoder::with_params(
Vec::new(),
&[DParameter::window_log_max(31)],
);
// Long window size decoder should successfully decode the given input data.
long_window_size_decoder
.write_all(compressed)
.await
.unwrap();
long_window_size_decoder.shutdown().await.unwrap();

assert_eq!(long_window_size_decoder.into_inner(), source);
}

0 comments on commit 9967f2e

Please sign in to comment.