✨ Generalize compression algorithm #99

Merged
47 changes: 34 additions & 13 deletions disk_objectstore/container.py
@@ -8,7 +8,6 @@
import shutil
import uuid
import warnings
import zlib

from collections import defaultdict, namedtuple
from contextlib import contextmanager
@@ -20,9 +19,9 @@

from .models import Base, Obj
from .utils import (
ObjectWriter, PackedObjectReader, StreamDecompresser, CallbackStreamWrapper, Location, chunk_iterator,
is_known_hash, nullcontext, rename_callback, safe_flush_to_disk, get_hash, compute_hash_and_size, merge_sorted,
detect_where_sorted, yield_first_element
ObjectWriter, PackedObjectReader, CallbackStreamWrapper, Location, chunk_iterator, is_known_hash, nullcontext,
rename_callback, safe_flush_to_disk, get_hash, get_compressobj_instance, get_stream_decompresser,
compute_hash_and_size, merge_sorted, detect_where_sorted, yield_first_element
)
from .exceptions import NotExistent, NotInitialised, InconsistentContent

@@ -51,9 +50,6 @@ class Container: # pylint: disable=too-many-public-methods
"""A class representing a container of objects (which is stored on a disk folder)"""

_PACK_INDEX_SUFFIX = '.idx'
# Default compression level (when compression is requested)
# This is the lowest one, to get some reasonable compression without too much CPU time required
_COMPRESSLEVEL = 1
# Size in bytes of each of the chunks used when (internally) reading or writing in chunks, e.g.
# when packing.
_CHUNKSIZE = 65536
@@ -289,7 +285,8 @@ def is_initialised(self):
return True

def init_container( # pylint: disable=bad-continuation
self, clear=False, pack_size_target=4 * 1024 * 1024 * 1024, loose_prefix_len=2, hash_type='sha256'
self, clear=False, pack_size_target=4 * 1024 * 1024 * 1024, loose_prefix_len=2, hash_type='sha256',
compression_algorithm='zlib+1'
):
"""Initialise the container folder, if not already done.

@@ -304,6 +301,7 @@ def init_container( # pylint: disable=bad-continuation
The longer the length, the more folders will be used to store loose
objects. Suggested values: 0 (for not using subfolders) or 2.
:param hash_type: a string defining the hash type to use.
:param compression_algorithm: a string defining the compression algorithm to use for compressed objects.
"""
if loose_prefix_len < 0:
raise ValueError('The loose prefix length can only be zero or a positive integer')
@@ -345,6 +343,11 @@ def init_container( # pylint: disable=bad-continuation
'There is already some file or folder in the Container folder, I cannot initialise it!'
)

# validate the compression algorithm: check if I'm able to load the classes to compress and decompress
# with the specified string
get_compressobj_instance(compression_algorithm)
get_stream_decompresser(compression_algorithm)

# Create config file
with open(self._get_config_file(), 'w') as fhandle:
json.dump(
@@ -353,7 +356,8 @@ def init_container( # pylint: disable=bad-continuation
'loose_prefix_len': loose_prefix_len,
'pack_size_target': pack_size_target,
'hash_type': hash_type,
'container_id': container_id
'container_id': container_id,
'compression_algorithm': compression_algorithm
},
fhandle
)
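A minimal usage sketch of the new option (the container path here is hypothetical; `Container` is the package's public class):

```python
from disk_objectstore import Container

# Hedged sketch: initialise a container with an explicit compression algorithm.
# 'zlib+9' is illustrative; 'zlib+1' is the default introduced by this PR.
container = Container('/tmp/my-container')  # hypothetical path
container.init_container(clear=True, compression_algorithm='zlib+9')
print(container.compression_algorithm)  # 'zlib+9', read back from the config file
```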
@@ -413,6 +417,23 @@ def container_id(self):
"""
return self._get_repository_config()['container_id']

@property
def compression_algorithm(self):
"""Return the compression algorithm defined for this container.

This is read from the repository configuration."""
return self._get_repository_config()['compression_algorithm']

def _get_compressobj_instance(self):
"""Return the correct `compressobj` class for the compression algorithm defined for this container."""
return get_compressobj_instance(self.compression_algorithm)

def _get_stream_decompresser(self):
"""Return a new instance of the correct StreamDecompresser class for the compression algorithm
defined for this container.
"""
return get_stream_decompresser(self.compression_algorithm)

def get_object_content(self, hashkey):
"""Get the content of an object with a given hash key.

@@ -553,7 +574,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too
fhandle=last_open_file, offset=metadata.offset, length=metadata.length
)
if metadata.compressed:
obj_reader = StreamDecompresser(obj_reader)
obj_reader = self._get_stream_decompresser()(obj_reader)
yield metadata.hashkey, obj_reader, meta
else:
yield metadata.hashkey, meta
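Note the double call in `self._get_stream_decompresser()(obj_reader)`: the method returns a decompresser *class*, which is then instantiated on the raw packed-object reader. A sketch of the pattern, with hypothetical variable names:

```python
# _get_stream_decompresser() returns a class such as ZlibStreamDecompresser;
# instantiating it wraps the raw reader in a file-like decompressing stream.
decompresser_class = container._get_stream_decompresser()
decompressing_reader = decompresser_class(raw_packed_reader)  # raw_packed_reader is hypothetical
content = decompressing_reader.read()
```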
@@ -671,7 +692,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too
fhandle=last_open_file, offset=metadata.offset, length=metadata.length
)
if metadata.compressed:
obj_reader = StreamDecompresser(obj_reader)
obj_reader = self._get_stream_decompresser()(obj_reader)
yield metadata.hashkey, obj_reader, meta
else:
yield metadata.hashkey, meta
@@ -1107,7 +1128,7 @@ def _write_data_to_packfile(self, pack_handle, read_handle, compress, hash_type=
hasher = get_hash(hash_type=hash_type)()

if compress:
compressobj = zlib.compressobj(level=self._COMPRESSLEVEL)
compressobj = self._get_compressobj_instance()

count_read_bytes = 0
while True:
@@ -2057,7 +2078,7 @@ def callback(self, action, value):
for hashkey, size, offset, length, compressed in query:
obj_reader = PackedObjectReader(fhandle=pack_handle, offset=offset, length=length)
if compressed:
obj_reader = StreamDecompresser(obj_reader)
obj_reader = self._get_stream_decompresser()(obj_reader)

computed_hash, computed_size = compute_hash_and_size(obj_reader, self.hash_type)

91 changes: 86 additions & 5 deletions disk_objectstore/utils.py
@@ -4,6 +4,7 @@
like the ``LazyOpener``.
"""
# pylint: disable= too-many-lines
import abc
import hashlib
import itertools
import os
@@ -546,21 +547,40 @@ def wrapper_callback(action, value):
return wrapper_callback


class StreamDecompresser:
"""A class that gets a stream of compressed zlib bytes, and returns the corresponding
class ZlibLikeBaseStreamDecompresser(abc.ABC):
"""A class that gets a stream of compressed bytes, and returns the corresponding
uncompressed bytes when being read via the .read() method.

This is the base class. Define the `decompressobj_class` and `decompress_error` properties to implement concrete
decompresser classes using specific algorithms that follow the zlib API.
"""

_CHUNKSIZE = 524288

@property
@abc.abstractmethod
def decompressobj_class(self):
"""Return here the `decompressobj` class of the given compression type.

Needs to be implemented by subclasses.
"""

@property
@abc.abstractmethod
def decompress_error(self):
"""Return here the Exception (or tuple of exceptions) that need to be caught if there is a compression error.

Needs to be implemented by subclasses.
"""

def __init__(self, compressed_stream):
"""Create the class from a given compressed bytestream.

:param compressed_stream: an open bytes stream that supports the .read() method,
returning a valid compressed stream.
"""
self._compressed_stream = compressed_stream
self._decompressor = zlib.decompressobj()
self._decompressor = self.decompressobj_class()
self._internal_buffer = b''
self._pos = 0

@@ -602,7 +622,7 @@ def read(self, size=-1):
# .unconsumed_tail and reused at the next loop
try:
decompressed_chunk = self._decompressor.decompress(compressed_chunk, size)
except zlib.error as exc:
except self.decompress_error as exc:
raise ValueError('Error while uncompressing data: {}'.format(exc))
self._internal_buffer += decompressed_chunk

@@ -653,7 +673,7 @@ def seek(self, target, whence=0):
if target == 0:
# Going back to zero it's efficient. I need to reset all internal variables, as in the init.
self._compressed_stream.seek(0)
self._decompressor = zlib.decompressobj()
self._decompressor = self.decompressobj_class()
self._internal_buffer = b''
self._pos = 0
return 0
@@ -673,6 +693,67 @@
return self._pos


class ZlibStreamDecompresser(ZlibLikeBaseStreamDecompresser):
"""A class that gets a stream of compressed bytes using ZLIB, and returns the corresponding
uncompressed bytes when being read via the .read() method."""

@property
def decompressobj_class(self):
"""Return the `decompressobj` class of zlib."""
return zlib.decompressobj

@property
def decompress_error(self):
"""Return the zlib error raised when there is an error."""
return zlib.error
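To illustrate the extension point, a hypothetical subclass for another algorithm. `lzma`'s decompressor exposes a similar `decompress(data, max_length)` call and a module-level error type, but it is not guaranteed to satisfy every zlib-specific assumption of the base class, so treat this purely as a sketch:

```python
import lzma

class LzmaStreamDecompresser(ZlibLikeBaseStreamDecompresser):
    """Hypothetical decompresser for LZMA streams (illustrative only)."""

    @property
    def decompressobj_class(self):
        # LZMADecompressor.decompress(data, max_length) mirrors the zlib call above
        return lzma.LZMADecompressor

    @property
    def decompress_error(self):
        return lzma.LZMAError
```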


def _get_compression_algorithm_info(algorithm):
"""Return a compresser and a decompresser for the given algorithm."""
known_algorithms = {
'zlib': {
'compressobj': zlib.compressobj,
'variant_name': 'level',
'variant_mapper': {str(i): i for i in range(1, 10)}, # from 1 to 9
'decompresser': ZlibStreamDecompresser
}
}

algorithm_name, _, variant = algorithm.partition('+')
try:
algorithm_info = known_algorithms[algorithm_name]
except KeyError:
raise ValueError("Unknown or unsupported compression algorithm '{}'".format(algorithm_name))
try:
kwargs = {algorithm_info['variant_name']: algorithm_info['variant_mapper'][variant]}
compresser = algorithm_info['compressobj'](**kwargs)
except KeyError:
raise ValueError("Invalid variant '{}' for compression algorithm '{}'".format(variant, algorithm_name))

decompresser = algorithm_info['decompresser']

return compresser, decompresser
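Step by step, for the spec string 'zlib+9' (values shown as comments):

```python
# str.partition('+') splits the spec into algorithm name and variant:
algorithm_name, _, variant = 'zlib+9'.partition('+')
# algorithm_name == 'zlib', variant == '9'
# variant_mapper then maps '9' to the integer 9, which is passed as
# zlib.compressobj(level=9); unknown names or variants raise ValueError.
```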


def get_compressobj_instance(algorithm):
"""Return a compressobj class with a given algorithm.

:param algorithm: A string defining the algorithm and its variant.
The algorithm is split by a + sign from the variant.
E.g. 'zlib+1' means using a level 1, while 'zlib+9' indicates a zlib compression with level 9
(slower but compressing more).
"""
return _get_compression_algorithm_info(algorithm)[0]


def get_stream_decompresser(algorithm):
"""Return a StreamDecompresser class with a given algorithm.

:param algorithm: a compression algorithm (see `get_compressobj_instance` for a description).
"""
return _get_compression_algorithm_info(algorithm)[1]
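A round-trip sketch tying the two helpers together, assuming they are importable from `disk_objectstore.utils`:

```python
import io

from disk_objectstore.utils import get_compressobj_instance, get_stream_decompresser

# Compress with the compressobj returned for 'zlib+1' ...
compresser = get_compressobj_instance('zlib+1')
compressed = compresser.compress(b'hello world' * 1000) + compresser.flush()

# ... and read it back through the matching StreamDecompresser class.
decompresser_class = get_stream_decompresser('zlib+1')  # ZlibStreamDecompresser
stream = decompresser_class(io.BytesIO(compressed))
assert stream.read() == b'hello world' * 1000
```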


class ZeroStream:
"""A class to return an (unseekable) stream returning only zeros, with length length."""
