✨ Generalize compression algorithm (#99)
The container configuration now accepts a variable for the compression algorithm to use.
Currently, the supported values are zlib, with levels from 1 to 9, but this can be expanded in the future.
giovannipizzi authored Oct 4, 2020
1 parent 1b84d6b commit d786296
Showing 4 changed files with 245 additions and 54 deletions.
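
As a quick orientation before the diff: after this change, a container can be initialised with an explicit algorithm string. A minimal usage sketch (the folder path and the `clear=True` flag are illustrative, not part of this commit):

    from disk_objectstore import Container

    container = Container('/tmp/test-container')  # illustrative path
    # 'zlib+9' selects zlib at level 9 (slower, stronger compression);
    # the default set by this commit is 'zlib+1'
    container.init_container(clear=True, compression_algorithm='zlib+9')
    print(container.compression_algorithm)  # -> 'zlib+9'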
47 changes: 34 additions & 13 deletions disk_objectstore/container.py
@@ -8,7 +8,6 @@
import shutil
import uuid
import warnings
import zlib

from collections import defaultdict, namedtuple
from contextlib import contextmanager
@@ -20,9 +19,9 @@

from .models import Base, Obj
from .utils import (
ObjectWriter, PackedObjectReader, StreamDecompresser, CallbackStreamWrapper, Location, chunk_iterator,
is_known_hash, nullcontext, rename_callback, safe_flush_to_disk, get_hash, compute_hash_and_size, merge_sorted,
detect_where_sorted, yield_first_element
ObjectWriter, PackedObjectReader, CallbackStreamWrapper, Location, chunk_iterator, is_known_hash, nullcontext,
rename_callback, safe_flush_to_disk, get_hash, get_compressobj_instance, get_stream_decompresser,
compute_hash_and_size, merge_sorted, detect_where_sorted, yield_first_element
)
from .exceptions import NotExistent, NotInitialised, InconsistentContent

@@ -51,9 +50,6 @@ class Container: # pylint: disable=too-many-public-methods
"""A class representing a container of objects (which is stored on a disk folder)"""

_PACK_INDEX_SUFFIX = '.idx'
# Default compression level (when compression is requested)
# This is the lowest one, to get some reasonable compression without too much CPU time required
_COMPRESSLEVEL = 1
# Size in bytes of each of the chunks used when (internally) reading or writing in chunks, e.g.
# when packing.
_CHUNKSIZE = 65536
@@ -289,7 +285,8 @@ def is_initialised(self):
return True

def init_container( # pylint: disable=bad-continuation
self, clear=False, pack_size_target=4 * 1024 * 1024 * 1024, loose_prefix_len=2, hash_type='sha256'
self, clear=False, pack_size_target=4 * 1024 * 1024 * 1024, loose_prefix_len=2, hash_type='sha256',
compression_algorithm='zlib+1'
):
"""Initialise the container folder, if not already done.
@@ -304,6 +301,7 @@ def init_container( # pylint: disable=bad-continuation
The longer the length, the more folders will be used to store loose
objects. Suggested values: 0 (for not using subfolders) or 2.
:param hash_type: a string defining the hash type to use.
:param compression_algorithm: a string defining the compression algorithm to use for compressed objects.
"""
if loose_prefix_len < 0:
raise ValueError('The loose prefix length can only be zero or a positive integer')
@@ -345,6 +343,11 @@ def init_container( # pylint: disable=bad-continuation
'There is already some file or folder in the Container folder, I cannot initialise it!'
)

# validate the compression algorithm: check if I'm able to load the classes to compress and decompress
# with the specified string
get_compressobj_instance(compression_algorithm)
get_stream_decompresser(compression_algorithm)

# Create config file
with open(self._get_config_file(), 'w') as fhandle:
json.dump(
@@ -353,7 +356,8 @@ def init_container( # pylint: disable=bad-continuation
'loose_prefix_len': loose_prefix_len,
'pack_size_target': pack_size_target,
'hash_type': hash_type,
'container_id': container_id
'container_id': container_id,
'compression_algorithm': compression_algorithm
},
fhandle
)
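
The two helper calls a few lines up act as validation: an unsupported algorithm string makes them raise before any configuration is written. A sketch of the expected behaviour (using the helpers from `disk_objectstore.utils`):

    from disk_objectstore.utils import get_compressobj_instance, get_stream_decompresser

    get_compressobj_instance('zlib+5')   # OK: valid algorithm and level
    get_stream_decompresser('zlib+5')    # OK: returns the ZlibStreamDecompresser class
    get_compressobj_instance('zlib+0')   # ValueError: invalid variant '0' (levels run 1 to 9)
    get_stream_decompresser('lzma+1')    # ValueError: unknown or unsupported algorithm 'lzma'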
@@ -413,6 +417,23 @@ def container_id(self):
"""
return self._get_repository_config()['container_id']

@property
def compression_algorithm(self):
"""Return the compression algorithm defined for this container.
This is read from the repository configuration."""
return self._get_repository_config()['compression_algorithm']

def _get_compressobj_instance(self):
"""Return the correct `compressobj` class for the compression algorithm defined for this container."""
return get_compressobj_instance(self.compression_algorithm)

def _get_stream_decompresser(self):
"""Return a new instance of the correct StreamDecompresser class for the compression algorithm
defined for this container.
"""
return get_stream_decompresser(self.compression_algorithm)

def get_object_content(self, hashkey):
"""Get the content of an object with a given hash key.
@@ -553,7 +574,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too
fhandle=last_open_file, offset=metadata.offset, length=metadata.length
)
if metadata.compressed:
obj_reader = StreamDecompresser(obj_reader)
obj_reader = self._get_stream_decompresser()(obj_reader)
yield metadata.hashkey, obj_reader, meta
else:
yield metadata.hashkey, meta
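
This one-line change is the point of the whole commit: the decompresser wrapping the raw `PackedObjectReader` is now looked up from the container configuration instead of being hard-coded to zlib. End-to-end, reads stay transparent; a sketch continuing the container from the example at the top (`add_object` and `pack_all_loose(compress=...)` are existing container APIs not shown in this diff, so treat their exact signatures as an assumption):

    hashkey = container.add_object(b'some content ' * 1000)
    container.pack_all_loose(compress=True)  # store the packed object compressed
    # reading back goes through PackedObjectReader wrapped by the configured decompresser
    assert container.get_object_content(hashkey) == b'some content ' * 1000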
@@ -671,7 +692,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too
fhandle=last_open_file, offset=metadata.offset, length=metadata.length
)
if metadata.compressed:
obj_reader = StreamDecompresser(obj_reader)
obj_reader = self._get_stream_decompresser()(obj_reader)
yield metadata.hashkey, obj_reader, meta
else:
yield metadata.hashkey, meta
@@ -1107,7 +1128,7 @@ def _write_data_to_packfile(self, pack_handle, read_handle, compress, hash_type=
hasher = get_hash(hash_type=hash_type)()

if compress:
compressobj = zlib.compressobj(level=self._COMPRESSLEVEL)
compressobj = self._get_compressobj_instance()

count_read_bytes = 0
while True:
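
The loop body is truncated above; here is a self-contained sketch of the chunked hash-and-compress pattern it implements (simplified, not the verbatim method body — the real code writes each piece to the pack file handle instead of accumulating bytes in memory):

    import hashlib
    import io

    from disk_objectstore.utils import get_compressobj_instance

    read_handle = io.BytesIO(b'x' * 200000)
    hasher = hashlib.sha256()
    compressobj = get_compressobj_instance('zlib+1')
    compressed = b''
    while True:
        chunk = read_handle.read(65536)  # Container._CHUNKSIZE
        if not chunk:
            break
        hasher.update(chunk)
        compressed += compressobj.compress(chunk)
    compressed += compressobj.flush()  # flush the final compressed block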
@@ -2057,7 +2078,7 @@ def callback(self, action, value):
for hashkey, size, offset, length, compressed in query:
obj_reader = PackedObjectReader(fhandle=pack_handle, offset=offset, length=length)
if compressed:
obj_reader = StreamDecompresser(obj_reader)
obj_reader = self._get_stream_decompresser()(obj_reader)

computed_hash, computed_size = compute_hash_and_size(obj_reader, self.hash_type)

91 changes: 86 additions & 5 deletions disk_objectstore/utils.py
@@ -4,6 +4,7 @@
like the ``LazyOpener``.
"""
# pylint: disable= too-many-lines
import abc
import hashlib
import itertools
import os
@@ -546,21 +547,40 @@ def wrapper_callback(action, value):
return wrapper_callback


class StreamDecompresser:
"""A class that gets a stream of compressed zlib bytes, and returns the corresponding
class ZlibLikeBaseStreamDecompresser(abc.ABC):
"""A class that gets a stream of compressed bytes, and returns the corresponding
uncompressed bytes when being read via the .read() method.
This is the base class. Define the `decompressobj_class` and `decompress_error` properties to implement concrete
decompresser classes using specific algorithms that follow the zlib API.
"""

_CHUNKSIZE = 524288

@property
@abc.abstractmethod
def decompressobj_class(self):
"""Return here the `decompressobj` class of the given compression type.
Needs to be implemented by subclasses.
"""

@property
@abc.abstractmethod
def decompress_error(self):
"""Return here the Exception (or tuple of exceptions) that need to be caught if there is a compression error.
Needs to be implemented by subclasses.
"""

def __init__(self, compressed_stream):
"""Create the class from a given compressed bytestream.
:param compressed_stream: an open bytes stream that supports the .read() method,
returning a valid compressed stream.
"""
self._compressed_stream = compressed_stream
self._decompressor = zlib.decompressobj()
self._decompressor = self.decompressobj_class()
self._internal_buffer = b''
self._pos = 0

@@ -602,7 +622,7 @@ def read(self, size=-1):
# .unconsumed_tail and reused at the next loop
try:
decompressed_chunk = self._decompressor.decompress(compressed_chunk, size)
except zlib.error as exc:
except self.decompress_error as exc:
raise ValueError('Error while uncompressing data: {}'.format(exc))
self._internal_buffer += decompressed_chunk

@@ -653,7 +673,7 @@ def seek(self, target, whence=0):
if target == 0:
# Going back to zero is efficient. I need to reset all internal variables, as in the init.
self._compressed_stream.seek(0)
self._decompressor = zlib.decompressobj()
self._decompressor = self.decompressobj_class()
self._internal_buffer = b''
self._pos = 0
return 0
@@ -673,6 +693,67 @@ def seek(self, target, whence=0):
return self._pos


class ZlibStreamDecompresser(ZlibLikeBaseStreamDecompresser):
"""A class that gets a stream of compressed bytes using ZLIB, and returns the corresponding
uncompressed bytes when being read via the .read() method."""

@property
def decompressobj_class(self):
"""Return the `decompressobj` class of zlib."""
return zlib.decompressobj

@property
def decompress_error(self):
"""Return the zlib error raised when there is an error."""
return zlib.error
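
Should a second algorithm be added later (as the commit message anticipates), only a small subclass plus a registry entry would be needed. A purely hypothetical skeleton — `somecodec` is a placeholder, and any real codec would have to follow the zlib decompressor API (`decompress(data, max_length)`, `.unconsumed_tail`) that the base class relies on:

    class SomecodecStreamDecompresser(ZlibLikeBaseStreamDecompresser):
        """Hypothetical decompresser for a zlib-API-compatible codec."""

        @property
        def decompressobj_class(self):
            return somecodec.decompressobj  # placeholder, not a real module

        @property
        def decompress_error(self):
            return somecodec.error  # placeholder, not a real exception

This would be completed by an entry for 'somecodec' in the `known_algorithms` dictionary of `_get_compression_algorithm_info`, shown next.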


def _get_compression_algorithm_info(algorithm):
"""Return a compresser and a decompresser for the given algorithm."""
known_algorithms = {
'zlib': {
'compressobj': zlib.compressobj,
'variant_name': 'level',
'variant_mapper': {str(i): i for i in range(1, 10)}, # from 1 to 9
'decompresser': ZlibStreamDecompresser
}
}

algorithm_name, _, variant = algorithm.partition('+')
try:
algorithm_info = known_algorithms[algorithm_name]
except KeyError:
raise ValueError("Unknown or unsupported compression algorithm '{}'".format(algorithm_name))
try:
kwargs = {algorithm_info['variant_name']: algorithm_info['variant_mapper'][variant]}
compresser = algorithm_info['compressobj'](**kwargs)
except KeyError:
raise ValueError("Invalid variant '{}' for compression algorithm '{}'".format(variant, algorithm_name))

decompresser = algorithm_info['decompresser']

return compresser, decompresser
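
In other words, the algorithm string is parsed with `str.partition`; a quick illustration of the lookup for 'zlib+9' (names local to this sketch):

    algorithm_name, _, variant = 'zlib+9'.partition('+')
    # algorithm_name == 'zlib', variant == '9'  ->  zlib.compressobj(level=9)
    # a string without '+' leaves variant == '', which fails the variant lookup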


def get_compressobj_instance(algorithm):
"""Return a compressobj class with a given algorithm.
:param algorithm: A string defining the algorithm and its variant.
The algorithm is split by a + sign from the variant.
E.g. 'zlib+1' means zlib compression with level 1, while 'zlib+9' indicates zlib compression with level 9
(slower but compressing more).
"""
return _get_compression_algorithm_info(algorithm)[0]


def get_stream_decompresser(algorithm):
"""Return a StreamDecompresser class with a given algorithm.
:param algorithm: a compression algorithm (see `get_compressionobj_instance` for a description).
"""
return _get_compression_algorithm_info(algorithm)[1]
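
Putting the two public helpers together, a round trip through compression and the streaming decompresser might look like this (a sketch; `io.BytesIO` stands in for a pack-file stream):

    import io

    from disk_objectstore.utils import get_compressobj_instance, get_stream_decompresser

    compresser = get_compressobj_instance('zlib+5')
    data = compresser.compress(b'repeated ' * 1000) + compresser.flush()

    stream = get_stream_decompresser('zlib+5')(io.BytesIO(data))
    assert stream.read(8) == b'repeated'
    stream.seek(0)  # seeking back to zero resets the decompressor
    assert stream.read() == b'repeated ' * 1000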


class ZeroStream:
"""A class to return an (unseekable) stream returning only zeros, with length length."""
