Skip to content

Commit

Permalink
Implement CompressMode other than KEEP for repacking
Browse files Browse the repository at this point in the history
Allow CompressMode.YES and CompressMode.NO to be used when repacking.
Previously, there was no way to change compression once an object
is stored in a packed file.
  • Loading branch information
zhubonan committed Jun 2, 2022
1 parent 47d4400 commit c7af5af
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 11 deletions.
45 changes: 34 additions & 11 deletions disk_objectstore/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -2578,21 +2578,17 @@ def repack(self, compress_mode: CompressMode = CompressMode.KEEP) -> None:
self.repack_pack(pack_id, compress_mode=compress_mode)
self._vacuum()

def repack_pack(
def repack_pack( # pylint: disable=too-many-branches, too-many-statements
self, pack_id: str, compress_mode: CompressMode = CompressMode.KEEP
) -> None:
"""Perform a repack of a given pack object.
This is a maintenance operation.
:param compress_mode: must be a valid CompressMode enum type.
Currently, the only implemented mode is KEEP, meaning that it
preserves the same compression (this means that repacking is *much* faster
as it can simply transfer the bytes without decompressing everything first,
and recompressing it back again).
"""
if compress_mode != CompressMode.KEEP:
raise NotImplementedError("Only keep method currently implemented")
# if compress_mode != CompressMode.KEEP:
# raise NotImplementedError("Only keep method currently implemented")

assert (
pack_id != self._REPACK_PACK_ID
Expand Down Expand Up @@ -2649,26 +2645,53 @@ def repack_pack(
# Since I am assuming above that the method is `KEEP`, I will just transfer
# the bytes. Otherwise I have to properly take into account compression in the
# source and in the destination.
read_handle: Union[PackedObjectReader, ZlibStreamDecompresser]
read_handle = PackedObjectReader(read_pack, offset, length)

obj_dict = {}
obj_dict["id"] = rowid
obj_dict["hashkey"] = hashkey
obj_dict["pack_id"] = self._REPACK_PACK_ID
obj_dict["compressed"] = compressed
obj_dict["size"] = size
obj_dict["offset"] = write_pack_handle.tell()
if compress_mode == CompressMode.KEEP:
obj_dict["compressed"] = compressed
elif compress_mode == CompressMode.YES:
obj_dict["compressed"] = True
elif compress_mode == CompressMode.NO:
obj_dict["compressed"] = False

# Transfer data in chunks.
# No need to rehash - it's the same container so the same hash.
# Not checking the compression on source or destination - we are assuming
# for now that the mode is KEEP.
# Compression cases
# Original Mode Action
# - KEEP Copy in chunks
# yes KEEP Copy in chunks
# no KEEP Copy in chunks
# yes NO Copy in chunks, decompress the stream
# no NO Copy in chunks
# yes YES Copy in chunks
# no YES Copy in chunks, compress when writing
if compress_mode == CompressMode.YES and not compressed:
compressobj = self._get_compressobj_instance()
else:
compressobj = None

# Read compressed, but write uncompressed
if compressed is True and compress_mode == CompressMode.NO:
read_handle = self._get_stream_decompresser()(read_handle)

while True:
chunk = read_handle.read(self._CHUNKSIZE)
if chunk == b"":
# Returns an empty bytes object on EOF.
break
write_pack_handle.write(chunk)
if compressobj is not None:
write_pack_handle.write(compressobj.compress(chunk))
else:
write_pack_handle.write(chunk)
if compressobj is not None:
write_pack_handle.write(compressobj.flush())
obj_dict["length"] = write_pack_handle.tell() - obj_dict["offset"]

# Appending for later bulk commit
Expand Down
79 changes: 79 additions & 0 deletions tests/test_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -3270,6 +3270,85 @@ def test_repack(temp_dir):
temp_container.close()


def test_repack_compress_modes(temp_dir):
    """
    Test the repacking functionality and handling of CompressMode.

    Repacks pack 0 with CompressMode.NO, pack 1 with CompressMode.YES and
    pack 2 with CompressMode.KEEP, checking after each repack that the
    objects stay in their pack and carry the expected compression flag.
    """
    temp_container = Container(temp_dir)
    # Small pack_size_target so that nine 10-byte objects span three packs.
    temp_container.init_container(clear=True, pack_size_target=39)

    # data of 10 bytes each. Will fill three packs.
    data = [
        b"-123456789",
        b"a123456789",
        b"b123456789",
        b"c123456789",
        b"d123456789",
        b"e123456789",
        b"f123456789",
        b"g123456789",
        b"h123456789",
    ]
    compress_flags = [False, True, True, False, False, False, True, True, False]

    hashkeys = []
    # Add them one by one, so I am sure in which pack they go
    for datum, compress in zip(data, compress_flags):
        hashkeys.append(
            temp_container.add_objects_to_pack([datum], compress=compress)[0]
        )

    # Objects 0-2 land in pack 0, objects 3-6 in pack 1, objects 7-8 in pack 2.
    expected_pack_ids = [0, 0, 0, 1, 1, 1, 1, 2, 2]
    for hashkey, expected_pack_id in zip(hashkeys, expected_pack_ids):
        assert temp_container.get_object_meta(hashkey)["pack_id"] == expected_pack_id

    # I check which packs exist
    assert sorted(temp_container._list_packs()) == [
        "0",
        "1",
        "2",
    ]

    # Repack pack 0 with CompressMode.NO: all its objects must end up uncompressed.
    temp_container.repack_pack(0, compress_mode=CompressMode.NO)
    for hashkey in hashkeys[:3]:
        meta = temp_container.get_object_meta(hashkey)
        assert meta["pack_id"] == 0
        assert meta["pack_compressed"] is False

    # Repack pack 1 with CompressMode.YES: all its objects must end up compressed.
    temp_container.repack_pack(1, compress_mode=CompressMode.YES)
    for hashkey in hashkeys[3:7]:
        meta = temp_container.get_object_meta(hashkey)
        assert meta["pack_id"] == 1
        assert meta["pack_compressed"] is True

    # Repack pack 2 with CompressMode.KEEP: each object must retain its
    # original compression flag (object 7 was stored compressed, object 8
    # was not). FIX: this previously repacked pack 1, so the KEEP mode was
    # never exercised on the pack whose objects are asserted below.
    temp_container.repack_pack(2, compress_mode=CompressMode.KEEP)
    assert temp_container.get_object_meta(hashkeys[7])["pack_id"] == 2
    assert temp_container.get_object_meta(hashkeys[7])["pack_compressed"] is True
    assert temp_container.get_object_meta(hashkeys[8])["pack_id"] == 2
    assert temp_container.get_object_meta(hashkeys[8])["pack_compressed"] is False

    # Check that the content is still correct
    # Should not raise
    errors = temp_container.validate()
    assert not any(errors.values())

    # Important before exiting from the tests
    temp_container.close()


def test_not_implemented_repacks(temp_container):
"""Check the error for not implemented repack methods."""
# We need to have at least one pack
Expand Down

0 comments on commit c7af5af

Please sign in to comment.