Skip to content

Commit

Permalink
Refactor block module, rename to bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
gyscos committed Jan 14, 2022
1 parent d682288 commit 7e05cc1
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 130 deletions.
70 changes: 0 additions & 70 deletions src/block/compressor.rs

This file was deleted.

161 changes: 161 additions & 0 deletions src/bulk/compressor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use crate::map_error_code;

use std::io;
use zstd_safe;

/// Compresses multiple independent chunks of data, reusing one zstd context.
///
/// Each job is processed entirely in memory, without streaming, so this type
/// is best suited to many small jobs. For larger volumes that don't easily
/// fit in memory, a streaming compressor may be more appropriate.
///
/// It is more efficient than a streaming compressor for two reasons:
/// * It re-uses the zstd context between jobs to avoid re-allocations.
/// * It avoids copying data from a `Read` into a temporary buffer before compression.
#[derive(Default)]
pub struct Compressor<'a> {
    // Compression context; `'a` covers any prepared dictionary referenced
    // via `set_prepared_dictionary`.
    context: zstd_safe::CCtx<'a>,
}

impl Compressor<'static> {
    /// Creates a new zstd compressor with the given compression level.
    pub fn new(level: i32) -> io::Result<Self> {
        // Same as a dictionary-based construction with an empty dictionary.
        Self::with_dictionary(level, &[])
    }

    /// Creates a new zstd compressor configured with the given dictionary.
    ///
    /// Note that using a dictionary means that decompression will need to use
    /// the same dictionary.
    pub fn with_dictionary(level: i32, dictionary: &[u8]) -> io::Result<Self> {
        let mut this = Self::default();
        this.set_dictionary(level, dictionary)?;
        Ok(this)
    }
}

impl<'a> Compressor<'a> {
    /// Creates a new compressor using an existing `EncoderDictionary`.
    ///
    /// The compression level will be the one specified when creating the dictionary.
    ///
    /// Note that using a dictionary means that decompression will need to use
    /// the same dictionary.
    pub fn with_prepared_dictionary<'b>(
        dictionary: &crate::dict::EncoderDictionary<'b>,
    ) -> io::Result<Self>
    where
        'b: 'a,
    {
        let mut this = Self::default();
        this.set_prepared_dictionary(dictionary)?;
        Ok(this)
    }

    /// Changes the compression level used by this compressor.
    ///
    /// *This will clear any dictionary previously registered.*
    ///
    /// If you want to keep the existing dictionary, you will need to pass it again to
    /// `Self::set_dictionary` instead of using this method.
    pub fn set_compression_level(&mut self, level: i32) -> io::Result<()> {
        // Re-loading an empty dictionary clears any previous one.
        self.set_dictionary(level, &[])
    }

    /// Changes the dictionary and compression level used by this compressor.
    ///
    /// Will affect future compression jobs.
    ///
    /// Note that using a dictionary means that decompression will need to use
    /// the same dictionary.
    pub fn set_dictionary(
        &mut self,
        level: i32,
        dictionary: &[u8],
    ) -> io::Result<()> {
        let ctx = &mut self.context;

        ctx.set_parameter(zstd_safe::CParameter::CompressionLevel(level))
            .map_err(map_error_code)?;
        ctx.load_dictionary(dictionary).map_err(map_error_code)?;

        Ok(())
    }

    /// Changes the dictionary used by this compressor.
    ///
    /// The compression level used when preparing the dictionary will be used.
    ///
    /// Note that using a dictionary means that decompression will need to use
    /// the same dictionary.
    pub fn set_prepared_dictionary(
        &mut self,
        dictionary: &crate::dict::EncoderDictionary<'a>,
    ) -> io::Result<()> {
        self.context
            .ref_cdict(dictionary.as_cdict())
            .map_err(map_error_code)
            .map(|_| ())
    }

    /// Compress a single block of data to the given destination buffer.
    ///
    /// Uses the compression level and dictionary configured on this compressor.
    ///
    /// Returns the number of bytes written, or an error if something happened
    /// (for instance if the destination buffer was too small).
    pub fn compress_to_buffer<C: zstd_safe::WriteBuf + ?Sized>(
        &mut self,
        source: &[u8],
        destination: &mut C,
    ) -> io::Result<usize> {
        self.context
            .compress2(destination, source)
            .map_err(map_error_code)
    }

    /// Compresses a block of data and returns the compressed result.
    ///
    /// Uses the compression level and dictionary configured on this compressor.
    pub fn compress(&mut self, data: &[u8]) -> io::Result<Vec<u8>> {
        // `compress_bound` gives a worst-case output size, so a single
        // allocation is always enough.
        let mut buffer = Vec::with_capacity(zstd_safe::compress_bound(data.len()));

        self.compress_to_buffer(data, &mut buffer)?;

        // Not shrinking the vec; the caller can do that if desired.
        Ok(buffer)
    }

    /// Gives mutable access to the internal context.
    pub fn context_mut(&mut self) -> &mut zstd_safe::CCtx<'a> {
        &mut self.context
    }

    /// Sets a compression parameter for this compressor.
    pub fn set_parameter(
        &mut self,
        parameter: zstd_safe::CParameter,
    ) -> io::Result<()> {
        self.context
            .set_parameter(parameter)
            .map_err(map_error_code)
            .map(|_| ())
    }

    crate::encoder_parameters!();
}

// Compile-time check that `Compressor` implements `Send`.
fn _assert_traits() {
    fn require_send<T: Send>(_: T) {}

    require_send(Compressor::new(0));
}
84 changes: 72 additions & 12 deletions src/block/decompressor.rs → src/bulk/decompressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,68 @@ use zstd_safe;
///
/// This reduces memory usage compared to calling `decompress` multiple times.
#[derive(Default)]
pub struct Decompressor {
context: zstd_safe::DCtx<'static>,
dict: Vec<u8>,
pub struct Decompressor<'a> {
context: zstd_safe::DCtx<'a>,
}

impl Decompressor {
impl Decompressor<'static> {
/// Creates a new zstd decompressor.
pub fn new() -> Self {
Decompressor::with_dict(Vec::new())
pub fn new() -> io::Result<Self> {
Self::with_dictionary(&[])
}

/// Creates a new zstd decompressor, using the given dictionary.
pub fn with_dict(dict: Vec<u8>) -> Self {
Decompressor {
context: zstd_safe::create_dctx(),
dict,
}
pub fn with_dictionary(dictionary: &[u8]) -> io::Result<Self> {
let mut decompressor = Self::default();

decompressor.set_dictionary(dictionary)?;

Ok(decompressor)
}
}

impl<'a> Decompressor<'a> {
/// Creates a new decompressor using an existing `DecoderDictionary`.
///
/// Note that using a dictionary means that compression will need to use
/// the same dictionary.
pub fn with_prepared_dictionary(
dictionary: &crate::dict::DecoderDictionary<'a>,
) -> io::Result<Self> {
let mut decompressor = Self::default();

decompressor.set_prepared_dictionary(dictionary)?;

Ok(decompressor)
}

/// Changes the dictionary used by this decompressor.
///
/// Will affect future decompression jobs.
///
/// Note that using a dictionary means that compression will need to use
/// the same dictionary.
pub fn set_dictionary(&mut self, dictionary: &[u8]) -> io::Result<()> {
self.context
.load_dictionary(dictionary)
.map_err(map_error_code)?;

Ok(())
}

/// Changes the dictionary used by this decompressor.
///
/// Note that using a dictionary means that compression will need to use
/// the same dictionary.
pub fn set_prepared_dictionary(
&mut self,
dictionary: &crate::dict::DecoderDictionary<'a>,
) -> io::Result<()> {
self.context
.ref_ddict(dictionary.as_ddict())
.map_err(map_error_code)?;

Ok(())
}

/// Decompress a single block of data to the given destination buffer.
Expand All @@ -38,7 +83,7 @@ impl Decompressor {
destination: &mut C,
) -> io::Result<usize> {
self.context
.decompress_using_dict(destination, source, &self.dict)
.decompress(destination, source)
.map_err(map_error_code)
}

Expand All @@ -58,13 +103,28 @@ impl Decompressor {
Ok(buffer)
}

/// Sets a decompression parameter for this decompressor.
pub fn set_parameter(
&mut self,
parameter: zstd_safe::DParameter,
) -> io::Result<()> {
self.context
.set_parameter(parameter)
.map_err(map_error_code)?;
Ok(())
}

crate::decoder_parameters!();

/// Get an upper bound on the decompressed size of data, if available
///
/// This can be used to pre-allocate enough capacity for `decompress_to_buffer`
/// and is used by `decompress` to ensure that it does not over-allocate if
/// you supply a large `capacity`.
///
/// Will return `None` if the upper bound cannot be determined or is larger than `usize::MAX`
///
/// Note that unless the `experimental` feature is enabled, this will always return `None`.
pub fn upper_bound(_data: &[u8]) -> Option<usize> {
#[cfg(feature = "experimental")]
{
Expand Down
10 changes: 5 additions & 5 deletions src/block/mod.rs → src/bulk/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Compress and decompress individual blocks.
//! Compress and decompress data in bulk.
//!
//! These methods process all the input data at once.
//! It is therefore best used with relatively small blocks
Expand All @@ -23,14 +23,14 @@ pub fn compress_to_buffer(
destination: &mut [u8],
level: i32,
) -> io::Result<usize> {
Compressor::new().compress_to_buffer(source, destination, level)
Compressor::new(level)?.compress_to_buffer(source, destination)
}

/// Compresses a block of data and returns the compressed result.
///
/// A level of `0` uses zstd's default (currently `3`).
pub fn compress(data: &[u8], level: i32) -> io::Result<Vec<u8>> {
Compressor::new().compress(data, level)
Compressor::new(level)?.compress(data)
}

/// Decompress a single block of data to the given destination buffer.
Expand All @@ -41,15 +41,15 @@ pub fn decompress_to_buffer(
source: &[u8],
destination: &mut [u8],
) -> io::Result<usize> {
Decompressor::new().decompress_to_buffer(source, destination)
Decompressor::new()?.decompress_to_buffer(source, destination)
}

/// Decompresses a block of data and returns the decompressed result.
///
/// The decompressed data should be less than `capacity` bytes,
/// or an error will be returned.
pub fn decompress(data: &[u8], capacity: usize) -> io::Result<Vec<u8>> {
Decompressor::new().decompress(data, capacity)
Decompressor::new()?.decompress(data, capacity)
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 7e05cc1

Please sign in to comment.