Skip to content

Commit

Permalink
Add whence=2 to be used
Browse files Browse the repository at this point in the history
For reading packed file as it is, there is no need to restrict
whence to be only 0 or 1.
Compressed files are more tricky as it is not possible to freely
seek to the end. Instead, the entire files will be decompressed back
into a loose file, which will then be opened for reading.
If such file exists already it will be used, so we don't decompress
twice. Such "cache" files are deleted during the routine maintainance
operations (e.g. pack_all_loose).
  • Loading branch information
zhubonan authored and giovannipizzi committed Jul 4, 2023
1 parent cbb3fd8 commit 5515ab6
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 22 deletions.
30 changes: 28 additions & 2 deletions disk_objectstore/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,9 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too
length=metadata.length,
)
if metadata.compressed:
obj_reader = self._get_stream_decompresser()(obj_reader)
obj_reader = self._get_stream_decompresser()(
obj_reader, container=self, hashkey=metadata.hashkey
)
yield metadata.hashkey, obj_reader, meta
else:
yield metadata.hashkey, meta
Expand Down Expand Up @@ -794,7 +796,9 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too
length=metadata.length,
)
if metadata.compressed:
obj_reader = self._get_stream_decompresser()(obj_reader)
obj_reader = self._get_stream_decompresser()(
obj_reader, container=self, hashkey=metadata.hashkey
)
yield metadata.hashkey, obj_reader, meta
else:
yield metadata.hashkey, meta
Expand Down Expand Up @@ -1840,6 +1844,28 @@ def add_objects_to_pack( # pylint: disable=too-many-arguments
do_commit=do_commit,
)

def _lossen_object(self, hashkey):
"""
Write a specific object to the loose directory, return the path to the loose file
"""
_read_chunk_size = 524288

# Here I just use a new object writer that writes the stream as loose file
writer = self._new_object_writer()

with self.get_object_stream(hashkey) as stream:
with writer as fhandle:
while True:
chunk = stream.read(_read_chunk_size)
if not chunk:
break
fhandle.write(chunk)
written_hashkey = writer.get_hashkey()
assert (
written_hashkey == hashkey
), "Mismatch in the hashkey - something is seriously wrong"
return self._get_loose_path_from_hashkey(hashkey)

def _vacuum(self) -> None:
"""Perform a `VACUUM` operation on the SQLite operation.
Expand Down
91 changes: 74 additions & 17 deletions disk_objectstore/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,23 +420,28 @@ def seek(self, target: int, whence: int = 0) -> int:
:raises NotImplementedError: if ``whence`` is not 0 or 1.
"""
if whence not in [0, 1]:
if whence not in [0, 1, 2]:
raise NotImplementedError(
"Invalid value for `whence`: only 0 and 1 are currently implemented."
"Invalid value for `whence`: only 0, 1 and 2 are currently implemented."
)

if whence == 1:
target = self.tell() + target
if whence in [0, 1]:
if whence == 1:
target = self.tell() + target

if target < 0:
raise ValueError(
"specified target would exceed the lower boundary of bytes that are accessible."
)
if target > self._length:
raise ValueError(
"specified target would exceed the upper boundary of bytes that are accessible."
)
new_pos = self._offset + target
else:
# Seek relative to the end
new_pos = self._offset + self._length + target

if target < 0:
raise ValueError(
"specified target would exceed the lower boundary of bytes that are accessible."
)
if target > self._length:
raise ValueError(
"specified target would exceed the upper boundary of bytes that are accessible."
)
new_pos = self._offset + target
self._fhandle.seek(new_pos)
# Next function MUST be called every time we move into the _fhandle file, to update the position
self._update_pos()
Expand Down Expand Up @@ -645,16 +650,22 @@ class ZlibLikeBaseStreamDecompresser(abc.ABC):

_CHUNKSIZE = 524288

def __init__(self, compressed_stream: StreamSeekBytesType) -> None:
def __init__(
self, compressed_stream: StreamSeekBytesType, container=None, hashkey=None
) -> None:
"""Create the class from a given compressed bytestream.
:param compressed_stream: an open bytes stream that supports the .read() method,
returning a valid compressed stream.
:param whence_callback: A callback that should return a new handler of the uncompressed stream.
"""
self._compressed_stream = compressed_stream
self._decompressor = self.decompressobj_class()
self._internal_buffer = b""
self._pos = 0
self._container = container
self._hashkey = hashkey
self._uncompressed_stream: Union[None, BinaryIO] = None

@property
def mode(self) -> str:
Expand All @@ -663,6 +674,14 @@ def mode(self) -> str:
def read(self, size: int = -1) -> bytes:
"""
Read and return up to n bytes.
"""
if self._uncompressed_stream is not None:
return self._uncompressed_stream.read(size)
return self._read(size)

def _read(self, size: int = -1) -> bytes:
"""
Read and return up to n bytes.
If the argument is omitted, None, or negative, reads and
returns all data until EOF (that corresponds to the length specified
Expand Down Expand Up @@ -732,6 +751,8 @@ def __enter__(self) -> "ZlibLikeBaseStreamDecompresser":

def __exit__(self, exc_type, value, traceback) -> None:
"""Close context manager."""
if self._uncompressed_stream is not None:
self._uncompressed_stream.close()

@property
@abc.abstractmethod
Expand All @@ -756,9 +777,17 @@ def seekable() -> bool:

def tell(self) -> int:
"""Return current position in file."""
if self._uncompressed_stream is not None:
return self._uncompressed_stream.tell()
return self._pos

def seek(self, target: int, whence: int = 0) -> int:
"""Seek the stream"""
if self._uncompressed_stream is not None:
return self._uncompressed_stream.seek(target, whence)
return self._seek_compressed(target, whence)

def _seek_compressed(self, target: int, whence: int = 0) -> int:
"""Change stream position.
..note:: This is particularly inefficient if `target > 0` since it will have
Expand All @@ -769,9 +798,17 @@ def seek(self, target: int, whence: int = 0) -> int:
read_chunk_size = 256 * 1024

if whence not in [0, 1]:
raise NotImplementedError(
"Invalid value for `whence`: only 0 and 1 are currently implemented."
)
if self._container is None:
raise NotImplementedError(
"Invalid value for `whence`: only 1 and 0 are currently implemented without container support."
)
# I have a container backing me up - so I can try if I can get the uncompressed stream
# instead so whence = 2 can be used with it
stream = self._get_uncompressed_stream()
# Seek the new stream to the current location
stream.seek(self._pos, 0)
# Seek to the desired location as requested
return self.seek(target, whence)

if whence == 1:
target = self.tell() + target
Expand Down Expand Up @@ -800,6 +837,26 @@ def seek(self, target: int, whence: int = 0) -> int:
# Differently than files, I return here the actual position
return self._pos

def _get_uncompressed_stream(self):
"""Obtain a uncompressed stream with extended support of `seek`"""
# Obtain the path to the loose file
object_path = self._container._get_loose_path_from_hashkey( # pylint: disable=protected-access
hashkey=self._hashkey
)
try:
self._uncompressed_stream = open(object_path, "rb")
except FileNotFoundError:
# The loose file is not found, hence we loosen the object here
object_path = (
self._container._lossen_object( # pylint: disable=protected-access
self._hashkey
)
)
self._uncompressed_stream = open( # pylint: disable=consider-using-with
object_path, "rb"
)
return self._uncompressed_stream


class ZlibStreamDecompresser(ZlibLikeBaseStreamDecompresser):
"""A class that gets a stream of compressed bytes using ZLIB, and returns the corresponding
Expand Down
8 changes: 8 additions & 0 deletions tests/test_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,14 @@ def test_stream_seek(temp_container, use_compression, use_packing):
stream.seek(offset, 1)
assert stream.read() == content[(10 + 3) :]

# Test whence=2, by first seeking to the tenth byte, seeking 3 from the end
stream.seek(10)
offset = -3
stream.seek(offset, 2)
assert stream.read() == content[offset:]
# Check that we have lossen the file
assert os.path.isfile(temp_container._get_loose_path_from_hashkey(hashkey))


def test_num_packs_with_target_size(temp_dir, generate_random_data):
"""Add a number of objects directly to packs, with a small pack_size_target.
Expand Down
6 changes: 3 additions & 3 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,9 +979,9 @@ def test_packed_object_reader_seek(tmp_path):
# Check that it is properly marked as seekable
assert packed_reader.seekable()

# Check that whence==2 is not implemented
with pytest.raises(NotImplementedError):
packed_reader.seek(0, 2)
# Check that whence==2 is implemented
packed_reader.seek(-2, 2)
assert packed_reader.read() == bytestream[offset + length - 2 : offset + length]

# Check that negative values and values > length are not valid
with pytest.raises(ValueError):
Expand Down

0 comments on commit 5515ab6

Please sign in to comment.