diff --git a/disk_objectstore/__init__.py b/disk_objectstore/__init__.py index d819f8f..eaadf61 100644 --- a/disk_objectstore/__init__.py +++ b/disk_objectstore/__init__.py @@ -2,8 +2,8 @@ It does not require a server running. """ -from .container import Container, ObjectType +from .container import Container, ObjectType, CompressMode -__all__ = ('Container', 'ObjectType') +__all__ = ('Container', 'ObjectType', 'CompressMode') __version__ = '0.4.0' diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 481711f..6d168da 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -6,6 +6,7 @@ import json import os import shutil +import warnings import zlib from collections import defaultdict, namedtuple @@ -18,8 +19,9 @@ from .models import Base, Obj from .utils import ( - ObjectWriter, PackedObjectReader, StreamDecompresser, chunk_iterator, is_known_hash, nullcontext, - safe_flush_to_disk, get_hash, compute_hash_and_size + ObjectWriter, PackedObjectReader, StreamDecompresser, CallbackStreamWrapper, Location, chunk_iterator, + is_known_hash, nullcontext, rename_callback, safe_flush_to_disk, get_hash, compute_hash_and_size, merge_sorted, + detect_where_sorted, yield_first_element ) from .exceptions import NotExistent, NotInitialised, InconsistentContent @@ -33,6 +35,17 @@ class ObjectType(Enum): MISSING = 'missing' +class CompressMode(Enum): + """Various possible behaviors when compressing. + + For now used only in the `repack` function, should probably be applied to all functions + that have a `compress` kwarg. + """ + NO = 'no' # pylint: disable=invalid-name + YES = 'yes' + KEEP = 'keep' # Keep the current compression when repacking. + + class Container: # pylint: disable=too-many-public-methods """A class representing a container of objects (which is stored on a disk folder)""" @@ -44,6 +57,10 @@ class Container: # pylint: disable=too-many-public-methods # when packing. _CHUNKSIZE = 65536 + # The pack ID that is used for repacking as a temporary location. + # NOTE: It MUST be an integer and it MUST be < 0 to avoid collisions with 'actual' packs + _REPACK_PACK_ID = -1 + # When performing an `in_` query in SQLite, this is converted to something like # 'SELECT * FROM db_object WHERE db_object.hashkey IN (?, ?)' with parameters = ('hash1', 'hash2') # Now, the maximum number of parameters is limited in SQLite, see variable SQLITE_MAX_VARIABLE_NUMBER @@ -56,6 +73,13 @@ class Container: # pylint: disable=too-many-public-methods # See also e.g. this comment https://bugzilla.redhat.com/show_bug.cgi?id=1798134 _IN_SQL_MAX_LENGTH = 950 + # If the length of required elements is larger than this, instead of iterating an IN statement over chunks of size + # _IN_SQL_MAX_LENGTH, it just quickly lists all elements (ordered by hashkey, requires a VACUUMed DB for + # performance) and returns only the intersection. + # This length might need some benchmarking, but seems OK on very large DBs of 6M nodes + # (after VACUUMing, as mentioned above). + _MAX_CHUNK_ITERATE_LENGTH = 9500 + def __init__(self, folder): """Create the class that represents the container. @@ -199,13 +223,15 @@ def _get_loose_path_from_hashkey(self, hashkey): # if loose_prefix_len is zero, there is no subfolder return os.path.join(self._get_loose_folder(), hashkey) - def _get_pack_path_from_pack_id(self, pack_id): + def _get_pack_path_from_pack_id(self, pack_id, allow_repack_pack=False): """Return the path of the pack file on disk for the given pack ID. :param pack_id: the pack ID. 
+        :param allow_repack_pack: whether to allow the repack pack ID
         """
         pack_id = str(pack_id)
-        assert self._is_valid_pack_id(pack_id), 'Invalid pack ID {}'.format(pack_id)
+        assert self._is_valid_pack_id(pack_id,
+                                      allow_repack_pack=allow_repack_pack), 'Invalid pack ID {}'.format(pack_id)
         return os.path.join(self._get_pack_folder(), pack_id)

     def _get_pack_index_path(self):
@@ -441,7 +467,7 @@ def _get_objects_stream_meta_generator(  # pylint: disable=too-many-branches,too
             (i.e., neither packed nor loose). If False, return ``None`` instead of the stream.
         :param with_streams: if True, yield triplets (hashkey, stream, meta).
-        :param with_streams: if False, yield pairs (hashkey, meta) and avoid to open any file.
+            If False, yield pairs (hashkey, meta) and avoid opening any file.
         """
         # pylint: disable=too-many-nested-blocks
@@ -462,17 +488,29 @@ def _get_objects_stream_meta_generator(  # pylint: disable=too-many-branches,too
                 # to order in python instead
                 session = self._get_cached_session()

-                # Operate in chunks, due to the SQLite limits
-                # (see comment above the definition of self._IN_SQL_MAX_LENGTH)
-                for chunk in chunk_iterator(hashkeys_set, size=self._IN_SQL_MAX_LENGTH):
-                    query = session.query(Obj).filter(
-                        Obj.hashkey.in_(chunk)
-                    ).with_entities(Obj.pack_id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed,
-                                    Obj.size).order_by(Obj.offset)
-                    for res in query:
-                        packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5]))
+                if len(hashkeys_set) <= self._MAX_CHUNK_ITERATE_LENGTH:
+                    # Operate in chunks, due to the SQLite limits
+                    # (see comment above the definition of self._IN_SQL_MAX_LENGTH)
+                    for chunk in chunk_iterator(hashkeys_set, size=self._IN_SQL_MAX_LENGTH):
+                        query = session.query(Obj).filter(
+                            Obj.hashkey.in_(chunk)
+                        ).with_entities(Obj.pack_id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed, Obj.size)
+                        for res in query:
+                            packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5]))
+                else:
+                    sorted_hashkeys = sorted(hashkeys_set)
+                    pack_iterator = session.execute(
+                        'SELECT pack_id, hashkey, offset, length, compressed, size FROM db_object ORDER BY hashkey'
+                    )
+                    # The left_key returns the second element of the tuple, i.e. the hashkey (that is the value to compare
+                    # with the right iterator)
+                    for res, where in detect_where_sorted(pack_iterator, sorted_hashkeys, left_key=lambda x: x[1]):
+                        if where == Location.BOTH:
+                            # If it's in both, it returns the left one, i.e.
the full data from the DB + packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5])) for pack_int_id, pack_metadata in packs.items(): + pack_metadata.sort(key=lambda metadata: metadata.offset) hashkeys_in_packs.update(obj.hashkey for obj in pack_metadata) pack_path = self._get_pack_path_from_pack_id(str(pack_int_id)) try: @@ -565,19 +603,31 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too packs = defaultdict(list) session = self._get_cached_session() - for chunk in chunk_iterator(loose_not_found, size=self._IN_SQL_MAX_LENGTH): - query = session.query(Obj).filter( - Obj.hashkey.in_(chunk) - ).with_entities(Obj.pack_id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed, - Obj.size).order_by(Obj.offset) - for res in query: - packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5])) + if len(loose_not_found) <= self._MAX_CHUNK_ITERATE_LENGTH: + for chunk in chunk_iterator(loose_not_found, size=self._IN_SQL_MAX_LENGTH): + query = session.query(Obj).filter( + Obj.hashkey.in_(chunk) + ).with_entities(Obj.pack_id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed, Obj.size) + for res in query: + packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5])) + else: + sorted_hashkeys = sorted(loose_not_found) + pack_iterator = session.execute( + 'SELECT pack_id, hashkey, offset, length, compressed, size FROM db_object ORDER BY hashkey' + ) + # The left_key returns the second element of the tuple, i.e. the hashkey (that is the value to compare + # with the right iterator) + for res, where in detect_where_sorted(pack_iterator, sorted_hashkeys, left_key=lambda x: x[1]): + if where == Location.BOTH: + # If it's in both, it returns the left one, i.e. the full data from the DB + packs[res[0]].append(ObjQueryResults(res[1], res[2], res[3], res[4], res[5])) # I will construct here the really missing objects. # I make a copy of the set. really_not_found = loose_not_found.copy() for pack_int_id, pack_metadata in packs.items(): + pack_metadata.sort(key=lambda metadata: metadata.offset) # I remove those that I found really_not_found.difference_update(obj.hashkey for obj in pack_metadata) @@ -835,15 +885,20 @@ def count_objects(self): return retval - @staticmethod - def _is_valid_pack_id(pack_id): - """Return True if the name is a valid pack ID.""" + @classmethod + def _is_valid_pack_id(cls, pack_id, allow_repack_pack=False): + """Return True if the name is a valid pack ID. + + If allow_repack_pack is True, also the pack id used for repacking is considered as valid. + """ if not pack_id: # Must be a non-empty string return False if pack_id != '0' and pack_id[0] == '0': # The ID must be a valid integer: either zero, or it should not start by zero return False + if allow_repack_pack and pack_id == str(cls._REPACK_PACK_ID): + return True if not all(char in '0123456789' for char in pack_id): return False return True @@ -904,7 +959,7 @@ def get_total_size(self): return retval @contextmanager - def lock_pack(self, pack_id): + def lock_pack(self, pack_id, allow_repack_pack=False): """Lock the given pack id. Use as a context manager. Raise if the pack is already locked. If you enter the context manager, @@ -912,14 +967,16 @@ def lock_pack(self, pack_id): Important to use for avoiding concurrent access/append to the same file. :param pack_id: a string with a valid pack name. 
+        :param allow_repack_pack: if True, allow opening the pack file used for repacking
         """
-        assert self._is_valid_pack_id(pack_id)
+        assert self._is_valid_pack_id(pack_id, allow_repack_pack=allow_repack_pack)

         # Open file in exclusive mode
         lock_file = os.path.join(self._get_pack_folder(), '{}.lock'.format(pack_id))
+        pack_file = self._get_pack_path_from_pack_id(pack_id, allow_repack_pack=allow_repack_pack)
         try:
             with open(lock_file, 'x'):
-                with open(self._get_pack_path_from_pack_id(pack_id), 'ab') as pack_handle:
+                with open(pack_file, 'ab') as pack_handle:
                     yield pack_handle
         finally:
             # Release resource (I check if it exists in case there was an exception)
@@ -992,11 +1049,9 @@ def list_all_objects(self):
                 loose_objects.difference_update((hashkey,))
                 yield hashkey

-            if results_chunk:
-                last_pk = results_chunk[-1][0]
-            else:
-                # No more packed objects
+            if not results_chunk:
                 break
+            last_pk = results_chunk[-1][0]

         # What is left are the loose objects that are not in the packs
         for hashkey in loose_objects:
@@ -1056,12 +1111,18 @@ def _write_data_to_packfile(self, pack_handle, read_handle, compress, hash_type=

         return (count_read_bytes, hasher.hexdigest() if hash_type else None)

-    def pack_all_loose(self, compress=False, validate_objects=True):
+    def pack_all_loose(  # pylint: disable=too-many-locals,too-many-branches
+        self, compress=False, validate_objects=True, do_fsync=True
+    ):
         """Pack all loose objects.

         This is a maintenance operation, needs to be done only by one process.
         :param compress: if True, compress objects before storing them.
         :param validate_objects: if True, recompute the hash while packing, and raises if there is a problem.
+        :param do_fsync: if True, calls a flush to disk of each pack file before closing it.
+            Needed to guarantee that data will be there even in the case of a power loss.
+            Set to False if you don't need such a guarantee (anyway the loose version will be kept,
+            so often this guarantee is not strictly needed).
         """
         hash_type = self.hash_type if validate_objects else None
@@ -1076,10 +1137,20 @@ def pack_all_loose(self, compress=False, validate_objects=True):

         existing_packed_hashkeys = []

-        for chunk in chunk_iterator(loose_objects, size=self._IN_SQL_MAX_LENGTH):
-            # I check the hash keys that are already in the pack
-            for res in session.query(Obj).filter(Obj.hashkey.in_(chunk)).with_entities(Obj.hashkey).all():
-                existing_packed_hashkeys.append(res[0])
+        if len(loose_objects) <= self._MAX_CHUNK_ITERATE_LENGTH:
+            for chunk in chunk_iterator(loose_objects, size=self._IN_SQL_MAX_LENGTH):
+                # I check the hash keys that are already in the pack
+                for res in session.query(Obj).filter(Obj.hashkey.in_(chunk)).with_entities(Obj.hashkey).all():
+                    existing_packed_hashkeys.append(res[0])
+        else:
+            sorted_hashkeys = sorted(loose_objects)
+            pack_iterator = session.execute('SELECT hashkey FROM db_object ORDER BY hashkey')

+            # The query returns a tuple of length 1, so I still need a left_key
+            for res, where in detect_where_sorted(pack_iterator, sorted_hashkeys, left_key=lambda x: x[0]):
+                if where == Location.BOTH:
+                    existing_packed_hashkeys.append(res[0])
+
         # I remove them from the loose_objects list
         loose_objects.difference_update(existing_packed_hashkeys)
         # Now, I should be left only with objects with hash keys that are not yet known.
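# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the diff): the branch above intersects the
# sorted loose hash keys with the hashkey-ordered SQL query by streaming both
# through `detect_where_sorted`. A minimal, self-contained example of that
# pattern, assuming only the helpers added in this diff (`detect_where_sorted`
# and `Location` in `disk_objectstore.utils`); both inputs must be sorted and unique.
from disk_objectstore.utils import Location, detect_where_sorted

db_rows = [('a1',), ('c3',), ('f6',)]  # stand-in for the sorted `SELECT hashkey ...` result (tuples)
loose = ['b2', 'c3', 'f6', 'z9']       # sorted, unique loose hash keys

already_packed = [
    row[0]
    for row, where in detect_where_sorted(db_rows, loose, left_key=lambda row: row[0])
    if where == Location.BOTH  # present both in the DB and among the loose objects
]
assert already_packed == ['c3', 'f6']
# ---------------------------------------------------------------------------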
@@ -1100,6 +1171,12 @@ def pack_all_loose(self, compress=False, validate_objects=True): with self.lock_pack(str(pack_int_id)) as pack_handle: # Inner loop: continue until when there is a file, or # if we need to change pack (in this case `break` is called) + + # We will store here the dictionaries with the data to be pushed in the DB + # By collecting the data and committing as a bulk operation at the end, + # we highly improve performance + obj_dicts = [] + while loose_objects: # Check in which pack I need to write to the next object pack_int_id = self._get_pack_id_to_write_to() @@ -1116,15 +1193,16 @@ def pack_all_loose(self, compress=False, validate_objects=True): # Get next hash key to process loose_hashkey = loose_objects.pop() - obj = Obj(hashkey=loose_hashkey) - obj.pack_id = pack_int_id - obj.compressed = compress - obj.offset = pack_handle.tell() + obj_dict = {} + obj_dict['hashkey'] = loose_hashkey + obj_dict['pack_id'] = pack_int_id + obj_dict['compressed'] = compress + obj_dict['offset'] = pack_handle.tell() try: with open(self._get_loose_path_from_hashkey(loose_hashkey), 'rb') as loose_handle: # The second parameter is `None` since we are not computing the hash # We can instead pass the hash algorithm and assert that it is correct - obj.size, new_hashkey = self._write_data_to_packfile( + obj_dict['size'], new_hashkey = self._write_data_to_packfile( pack_handle=pack_handle, read_handle=loose_handle, compress=compress, @@ -1140,11 +1218,25 @@ def pack_all_loose(self, compress=False, validate_objects=True): loose_hashkey, new_hashkey ) ) - obj.length = pack_handle.tell() - obj.offset - session.add(obj) + obj_dict['length'] = pack_handle.tell() - obj_dict['offset'] + + # Appending for later bulk commit - see comments in add_streamed_objects_to_pack + obj_dicts.append(obj_dict) + + # It's now time to write to the DB, in a single bulk operation (per pack) + if obj_dicts: + # Here I shouldn't need to do `OR IGNORE` as in `add_streamed_objects_to_pack` + # Because I'm already checking the hash keys and avoiding to add twice the same + session.execute(Obj.__table__.insert(), obj_dicts) + # Clean up the list - this will be cleaned up also later, + # but it's better to make sure that we do it here, to avoid trying to rewrite + # the same objects again + obj_dicts = [] + # I don't commit here; I commit after making sure the file is flushed and closed # flush and sync to disk before closing - safe_flush_to_disk(pack_handle, os.path.realpath(pack_handle.name), use_fullsync=True) + if do_fsync: + safe_flush_to_disk(pack_handle, os.path.realpath(pack_handle.name), use_fullsync=True) # OK, if we are here, file was flushed, synced to disk and closed. # Let's commit then the information to the DB, so it's officially a @@ -1161,8 +1253,50 @@ def pack_all_loose(self, compress=False, validate_objects=True): # on Mac and on Windows (see issues #37 and #43). Therefore, I do NOT delete them, # and deletion is deferred to a manual clean-up operation. - def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-branches, too-many-statements - self, stream_list, compress=False, open_streams=False, no_holes=False, no_holes_read_twice=True): + def add_streamed_object_to_pack( # pylint: disable=too-many-arguments + self, + stream, + compress=False, + open_streams=False, + no_holes=False, + no_holes_read_twice=True, + callback=None, + callback_size_hint=0, + do_fsync=True, + do_commit=True + ): + """Add a single object in streamed form to a pack. 
+
+        For the description of the parameters, see the docstring of ``add_streamed_objects_to_pack``.
+
+        The only difference is that here the callback will provide feedback on the progress of this specific object.
+        :param callback_size_hint: the expected size of the stream - if provided, it is used to send back the total
+            length in the callbacks
+        :return: a single object hash key
+        """
+        streams = [CallbackStreamWrapper(stream, callback=callback, total_length=callback_size_hint)]
+
+        # I specifically set the callback to None
+        retval = self.add_streamed_objects_to_pack(
+            streams,
+            compress=compress,
+            open_streams=open_streams,
+            no_holes=no_holes,
+            no_holes_read_twice=no_holes_read_twice,
+            callback=None,
+            do_fsync=do_fsync,
+            do_commit=do_commit,
+        )
+
+        # Close the callback so the bar doesn't remain open
+        streams[0].close_callback()
+
+        return retval[0]
+
+
+    def add_streamed_objects_to_pack(  # pylint: disable=too-many-locals, too-many-branches, too-many-statements, too-many-arguments
+        self, stream_list, compress=False, open_streams=False, no_holes=False, no_holes_read_twice=True,
+        callback=None, do_fsync=True, do_commit=True):
         """Add objects directly to a pack, reading from a list of streams.

         This is a maintenance operation, available mostly for efficiency reasons
@@ -1185,6 +1319,15 @@ def add_streamed_objects_to_pack(  # pylint: disable=too-many-locals, too-many-b
             This of course gives a performance hit as data has to be read twice, and rehashed twice; but avoids
             risking to damage the hard drive if e.g. re-importing the exact same data). This variable is ignored
             if `no_holes` is False.
+        :param do_fsync: if True (default), call an fsync for every pack file, to ensure flushing to
+            disk. Important to guarantee that data is not lost even in the case of a power loss.
+            For performance (especially if you don't need such a guarantee, e.g. if you are creating
+            from scratch a new repository with a copy of the objects), set it to False.
+        :param do_commit: if True (default), commit data to the DB after every pack is written.
+            In this way, even if there is an issue, partial objects end up in the repository.
+            Set to False for efficiency if you need to call this function multiple times. In this case,
+            however, remember to call `commit()` on the `session` manually at the end of the
+            operations! (See e.g. the `import_objects()` method).
         :return: a list of object hash keys
         """
         yield_per_size = 1000
@@ -1197,6 +1340,17 @@ def add_streamed_objects_to_pack(  # pylint: disable=too-many-locals, too-many-b
         session = self._get_cached_session()

         if no_holes:
+            if callback:
+                total = session.query(Obj).count()
+                if total:
+                    # If we have a callback, compute the total count of objects in this pack
+                    callback(action='init', value={'total': total, 'description': 'List existing'})
+                    # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object.
+                    update_every = max(int(total / 400), 1)
+                    # Counter of how many objects have been processed since the last update.
+                    # A new callback will be performed when this value is > update_every.
+                    since_last_update = 0
+
             known_packed_hashkeys = set()
             # I need to get the full list of PKs to know if the object exists
             # As this is expensive, I will do it only if it is needed, i.e.
when no_holes is True @@ -1206,23 +1360,52 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b Obj.id ).limit(yield_per_size).with_entities(Obj.id, Obj.hashkey).all() + if not results_chunk: + # No more packed objects + break + for _, hashkey in results_chunk: known_packed_hashkeys.add(hashkey) - if results_chunk: - last_pk = results_chunk[-1][0] - else: - # No more packed objects - break + last_pk = results_chunk[-1][0] + if callback: + since_last_update += len(results_chunk) + if since_last_update >= update_every: + callback(action='update', value=since_last_update) + since_last_update = 0 + + if callback and total: + # Final call to complete the bar + if since_last_update: + callback(action='update', value=since_last_update) + # Perform any wrap-up, if needed + callback(action='close', value=None) + + if callback: + total = len(working_stream_list) + # If we have a callback, compute the total count of objects in this pack + callback(action='init', value={'total': total, 'description': 'Bulk storing'}) + # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object. + update_every = max(int(total / 400), 1) + # Counter of how many objects have been since since the last update. + # A new callback will be performed when this value is > update_every. + since_last_update = 0 # Outer loop: this is used to continue when a new pack file needs to be created while working_stream_list: # Store the last pack integer ID, needed to know later if I need to open a new pack last_pack_int_id = pack_int_id + # Avoid concurrent writes on the pack file with self.lock_pack(str(pack_int_id)) as pack_handle: # Inner loop: continue until when there is a file, or # if we need to change pack (in this case `break` is called) + + # We will store here the dictionaries with the data to be pushed in the DB + # By collecting the data and committing as a bulk operation at the end, + # we highly improve performance + obj_dicts = [] + while working_stream_list: # Check in which pack I need to write to the next object pack_int_id = self._get_pack_id_to_write_to() @@ -1243,6 +1426,12 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b else: stream_context_manager = nullcontext(next_stream) + if callback: + since_last_update += 1 + if since_last_update >= update_every: + callback(action='update', value=since_last_update) + since_last_update = 0 + # Get the position before writing the object - I need it if `no_holes` is True and the object # is already there position_before = pack_handle.tell() @@ -1294,8 +1483,12 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b # Either no_holes is False: then I don't know if the object exists, so I just try to insert # it and in case do nothing; the space on disk might remain allocated (but unreferenced). # Or `no_holes` is True and I don't have the object: this will insert the entry - insert_command = Obj.__table__.insert().prefix_with('OR IGNORE').values(obj_dict) - session.execute(insert_command) + obj_dicts.append(obj_dict) + + # In the future, if there are memory issues with millions of objects, + # We can flush here to DB if there are too many objects in the cache. + # Also, there are other optimisations that can be done, like deleting + # the pack_metadata when not needed anymore etc. 
# I also add the hash key in the known_packed_hashkeys (if no_holes, when this is defined) if no_holes: @@ -1328,19 +1521,42 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b # I have written some bytes and then I have seeked back. I truncate then the file at the current # position. pack_handle.truncate() - # flush and sync to disk before closing - safe_flush_to_disk(pack_handle, os.path.realpath(pack_handle.name), use_fullsync=True) + + # It's now time to write to the DB, in a single bulk operation (per pack) + if obj_dicts: + session.execute(Obj.__table__.insert().prefix_with('OR IGNORE'), obj_dicts) + # Clean up the list - this will be cleaned up also later, + # but it's better to make sure that we do it here, to avoid trying to rewrite + # the same objects again + obj_dicts = [] + # I don't commit here; I commit after making sure the file is flushed and closed + + if do_fsync: + safe_flush_to_disk(pack_handle, os.path.realpath(pack_handle.name), use_fullsync=True) # OK, if we are here, file was flushed, synced to disk and closed. # Let's commit then the information to the DB, so it's officially a # packed object. Note: committing as soon as we are done with one pack, # so if there's a problem with one pack we don't start operating on the next one # Note: because of the logic above, in theory this should not raise an IntegrityError! - session.commit() + # For efficiency, you might want to set do_commit = False in the call, and then + # call a `session.commit()` in the caller, as it is done for instance in `import_files()`. + if do_commit: + session.commit() + + if callback: + # Final call to complete the bar + if since_last_update: + callback(action='update', value=since_last_update) + # Perform any wrap-up, if needed + callback(action='close', value=None) return hashkeys - def add_objects_to_pack(self, content_list, compress=False, no_holes=False, no_holes_read_twice=True): + def add_objects_to_pack( # pylint: disable=too-many-arguments + self, content_list, compress=False, no_holes=False, no_holes_read_twice=True, + callback=None, do_fsync=True, do_commit=True + ): """Add objects directly to a pack, reading from a list of content byte arrays. This is a maintenance operation, available mostly for efficiency reasons @@ -1358,22 +1574,53 @@ def add_objects_to_pack(self, content_list, compress=False, no_holes=False, no_h to write on disk and then overwrite with another object). See comments in the docstring of ``add_streamed_objects_to_pack``. This variable is ignored if `no_holes` is False. + :param callback: a callback to monitor the progress, see docstring of `_validate_hashkeys_pack()` + :param do_fsync: if True (default), call an fsync for every pack file, to ensure flushing to + disk. See docstring of `add_streamed_objects_to_pack()` for further comments on the use of this flag. + :param do_commit: if True (default), commit data to the DB after every pack is written. + See docstring of `add_streamed_objects_to_pack()` for further comments on the use of this flag. 
+ :return: a list of object hash keys """ stream_list = [io.BytesIO(content) for content in content_list] return self.add_streamed_objects_to_pack( - stream_list=stream_list, compress=compress, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice + stream_list=stream_list, + compress=compress, + no_holes=no_holes, + no_holes_read_twice=no_holes_read_twice, + callback=callback, + do_fsync=do_fsync, + do_commit=do_commit ) - def clean_storage(self): # pylint: disable=too-many-branches + def _vacuum(self): + """Perform a `VACUUM` operation on the SQLite operation. + + This is critical for two aspects: + + 1. reclaiming unused space after many deletions + 2. reordering data on disk to make data access *much* more efficient + + (See also description in issue #94). + """ + engine = self._get_cached_session().get_bind() + engine.execute('VACUUM') + + def clean_storage(self, vacuum=False): # pylint: disable=too-many-branches """Perform some maintenance clean-up of the container. .. note:: this is a maintenance operation, must be performed when nobody is using the container! In particular: + - if `vacuum` is True, it first VACUUMs the DB, reclaiming unused space and + making access much faster - it removes duplicates if any, with some validation - it cleans up loose objects that are already in packs """ + # I start by VACUUMing the DB - this is something useful to do + if vacuum: + self._vacuum() + all_duplicates = os.listdir(self._get_duplicates_folder()) duplicates_mapping = defaultdict(list) @@ -1441,10 +1688,19 @@ def clean_storage(self): # pylint: disable=too-many-branches session = self._get_cached_session() # I search now for all loose hash keys that exist also in the packs existing_packed_hashkeys = [] - for chunk in chunk_iterator(loose_objects, size=self._IN_SQL_MAX_LENGTH): - # I check the hash keys that are already in the pack - for res in session.query(Obj).filter(Obj.hashkey.in_(chunk)).with_entities(Obj.hashkey).all(): - existing_packed_hashkeys.append(res[0]) + if len(loose_objects) <= self._MAX_CHUNK_ITERATE_LENGTH: + for chunk in chunk_iterator(loose_objects, size=self._IN_SQL_MAX_LENGTH): + # I check the hash keys that are already in the pack + for res in session.query(Obj).filter(Obj.hashkey.in_(chunk)).with_entities(Obj.hashkey).all(): + existing_packed_hashkeys.append(res[0]) + else: + sorted_hashkeys = sorted(loose_objects) + pack_iterator = session.execute('SELECT hashkey FROM db_object ORDER BY hashkey') + + # The query returns a tuple of length 1, so I still need a left_key + for res, where in detect_where_sorted(pack_iterator, sorted_hashkeys, left_key=lambda x: x[0]): + if where == Location.BOTH: + existing_packed_hashkeys.append(res[0]) # I now clean up loose objects that are already in the packs. # Here, we assume that if it's already packed, it's safe to assume it's uncorrupted. @@ -1457,19 +1713,31 @@ def clean_storage(self): # pylint: disable=too-many-branches # I just ignore, I will remove it in a future call of this method. pass - def export(self, hashkeys, other_container, compress=False, target_memory_bytes=104857600): - """Export the specified hashkeys to a new container (must be already initialised). + def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-many-branches,too-many-arguments + self, + hashkeys, + source_container, + compress=False, + target_memory_bytes=104857600, + callback=None, + do_fsync=True + ): + """Imports the objects with the specified hashkeys into the container. :param hashkeys: an iterable of hash keys. 
-        :param new_container: another Container class into which you want to export the specified hash keys of this
-            container.
+        :param source_container: another Container class containing the objects with the given hash keys.
         :param compress: specifies if content should be stored in compressed form.
-        :param target_memory_bytes: how much data to store in RAM before dumping to the new container. Larger values
-            allow to read and write in bulk that is more efficient, but of course require more memory.
+        :param target_memory_bytes: how much data to store in RAM before actually storing in the container.
+            Larger values allow reading and writing in bulk, which is more efficient, but of course require more memory.
             Note that actual memory usage will be larger (SQLite DB, storage of the hashkeys are not included - this
             only counts the RAM needed for the object content). Default: 100MB.
+        :param callback: a callback to monitor the importing process. See docstring of `_validate_hashkeys_pack()`.
+        :param do_fsync: whether to do an fsync on every pack object when it's written. True by default; set it
+            to False for efficiency if this guarantee is not needed, e.g. if you are creating a new
+            Container from scratch as a part of a larger import/export operation.

-        :return: a mapping from the old hash keys (in this container) to the new hash keys (in `other_container`).
+        :return: a mapping from the old hash keys (in the ``source_container``) to the new hash keys
+            (in this container).
         """
         old_obj_hashkeys = []
         new_obj_hashkeys = []
@@ -1478,7 +1746,80 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes=
         # We then flush in 'bulk' to the `other_container`, thus speeding up the process
         content_cache = {}
         cache_size = 0
-        with self.get_objects_stream_and_meta(hashkeys) as triplets:
+
+        if source_container.hash_type == self.hash_type:
+            # In this case, I can use some optimisation, because I can just work on the intersection
+            # of the hash keys, since I can know in advance which objects are already present.
+            sorted_hashkeys = sorted(set(hashkeys))
+
+            if callback:
+                # If we have a callback, compute the total count of objects in this pack
+                total = len(sorted_hashkeys)
+                callback(action='init', value={'total': total, 'description': 'Listing objects'})
+                # Update at most 1000 times, avoiding to increase CPU usage; if the list is small: every object.
+                update_every = max(int(total / 1000), 1)
+                # Counter of how many objects have been processed since the last update.
+                # A new callback will be performed when this value is > update_every.
+                since_last_update = 0
+
+            sorted_loose = sorted(self._list_loose())
+            # This is a very efficient way to get a sorted iterator without preloading everything in memory
+            # NOTE: this might be slow in the combination of these two cases:
+            # 1. the pack index (SQLite DB) of this repository is not VACUUMed
+            # AND
+            # 2. the pack index (SQLite DB) is not in the OS disk cache
+            # In this case, also the index on the hash key is scattered on disk and reading will be very slow,
+            # see issue #94.
+ # NOTE: I need to wrap in the `yield_first_element` iterator since it returns a list of lists + sorted_packed = yield_first_element( + self._get_cached_session().execute('SELECT hashkey FROM db_object ORDER BY hashkey') + ) + sorted_existing = merge_sorted(sorted_loose, sorted_packed) + + # Hashkeys will be replaced with only those that are not yet in this repository (i.e., LEFTONLY) + hashkeys = [] + for item, where in detect_where_sorted(sorted_hashkeys, sorted_existing): + if callback and where in [Location.BOTH, Location.LEFTONLY]: + # It is in the sorted hash keys. Since this is the one for which I know the length efficiently, + # I use it for the progress bar. This will be relatively accurate for large lists of hash keys, + # but will not show a continuous bar if the list of hash keys to import is much shorter than + # the list of hash keys in this (destination) container. This is probably OK, though. + since_last_update += 1 + if since_last_update >= update_every: + callback(action='update', value=since_last_update) + since_last_update = 0 + + if where == Location.LEFTONLY: + hashkeys.append(item) + + if callback: + # Final call to complete the bar + if since_last_update: + callback(action='update', value=since_last_update) + # Perform any wrap-up, if needed + callback(action='close', value=None) + + # I just insert the new objects without first checking that I am not leaving holes in the pack files, + # as I already checked here. + no_holes = False + no_holes_read_twice = False + else: + # hash types are different: I have to add all objects that were provided as I have no way to check + # if they already exist + no_holes = True + no_holes_read_twice = True + + if callback: + # If we have a callback, compute the total count of objects in this pack + total = len(hashkeys) + callback(action='init', value={'total': total, 'description': 'Copy objects'}) + # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object. + update_every = max(int(total / 1000), 1) + # Counter of how many objects have been since since the last update. + # A new callback will be performed when this value is > update_every. 
+ since_last_update = 0 + + with source_container.get_objects_stream_and_meta(hashkeys) as triplets: for old_obj_hashkey, stream, meta in triplets: if meta['size'] > target_memory_bytes: # If the object itself is too big, just write it directly @@ -1490,10 +1831,14 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= # but I avoid to write a huge object to disk when it's not needed because already available # on the destination new_obj_hashkeys.append( - other_container.add_streamed_objects_to_pack([stream], - compress=compress, - no_holes=True, - no_holes_read_twice=True)[0] + self.add_streamed_object_to_pack( + stream, + compress=compress, + no_holes=no_holes, + no_holes_read_twice=no_holes_read_twice, + do_fsync=do_fsync, + do_commit=False # I will do a final commit + ) ) elif cache_size + meta['size'] > target_memory_bytes: # I were to read the content, I would be filling too much memory - I flush the cache first, @@ -1506,9 +1851,15 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= temp_old_hashkeys, data = zip(*content_cache.items()) # I put all of them in bulk - # I accept the performance hit of reading twice (especially since it's already on memory) - temp_new_hashkeys = other_container.add_objects_to_pack( - data, compress=compress, no_holes=True, no_holes_read_twice=True + # I accept the performance hit of reading twice if the hash type is different + # (especially since it's already on memory) + temp_new_hashkeys = self.add_objects_to_pack( + data, + compress=compress, + no_holes=no_holes, + no_holes_read_twice=no_holes_read_twice, + do_fsync=do_fsync, + do_commit=False ) # I update the list of known old (this container) and new (other_container) hash keys @@ -1532,6 +1883,19 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= # I update the cache size cache_size += meta['size'] + if callback: + since_last_update += 1 + if since_last_update >= update_every: + callback(action='update', value=since_last_update) + since_last_update = 0 + + if callback: + # Final call to complete the bar + if since_last_update: + callback(action='update', value=since_last_update) + # Perform any wrap-up, if needed + callback(action='close', value=None) + # The for loop is finished. I can also go out of the `with` context manager because whatever is in the # cache is in memory. Most probably I still have content in the cache, just flush it, # with the same logic as above. 
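# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the diff): every `callback=` hook added in
# this changeset follows the same protocol exercised above and in
# tests/conftest.py: callback('init', {'total': ..., 'description': ...}),
# then callback('update', increment) a bounded number of times, and finally
# callback('close', None). A possible adapter to a progress bar; the use of
# `tqdm` here is an assumption and not a dependency of this package.
from tqdm import tqdm

class TqdmCallback:
    """Adapt the (action, value) callback convention to a tqdm progress bar."""

    def __init__(self):
        self._bar = None

    def __call__(self, action, value):
        if action == 'init':
            # A new phase starts: open a fresh bar with the advertised total
            self._bar = tqdm(total=value['total'], desc=value['description'])
        elif action == 'update':
            # `value` is the number of objects (or bytes) processed since the last update
            self._bar.update(value)
        elif action == 'close':
            self._bar.close()
            self._bar = None

# Hypothetical usage:
#     mapping = destination.import_objects(hashkeys, source_container=source, callback=TqdmCallback())
# ---------------------------------------------------------------------------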
@@ -1541,8 +1905,16 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= # I create a list of hash keys and the corresponding content temp_old_hashkeys, data = zip(*content_cache.items()) # I put all of them in bulk - temp_new_hashkeys = other_container.add_objects_to_pack( - data, compress=compress, no_holes=True, no_holes_read_twice=True + + temp_new_hashkeys = self.add_objects_to_pack( + data, + compress=compress, + no_holes=no_holes, + no_holes_read_twice=no_holes_read_twice, + callback=rename_callback(callback, new_description='Final flush'), + do_fsync=do_fsync, + # I will commit at the end + do_commit=False ) # I update the list of known old (this container) and new (other_container) hash keys @@ -1552,8 +1924,38 @@ def export(self, hashkeys, other_container, compress=False, target_memory_bytes= # Create a mapping from the old to the new hash keys: old_new_obj_hashkey_mapping[old_hashkey] = new_hashkey old_new_obj_hashkey_mapping = dict(zip(old_obj_hashkeys, new_obj_hashkeys)) + # Since I called the `add_objects_to_pack` without committing (gives a boost for performance), + # I need now to commit to save what I've been doing. + self._get_cached_session().commit() + return old_new_obj_hashkey_mapping + def export(self, hashkeys, other_container, compress=False, target_memory_bytes=104857600, callback=None): + """Export the specified hashkeys to a new container (must be already initialised). + + ..deprecated:: 0.6 + Deprecated: use the ``import_objects`` method of ``other_container`` instead. + + :param hashkeys: an iterable of hash keys. + :param other_container: another Container class into which you want to export the specified hash keys of this + container. + :param compress: specifies if content should be stored in compressed form. + :param target_memory_bytes: how much data to store in RAM before dumping to the new container. Larger values + allow to read and write in bulk that is more efficient, but of course require more memory. + Note that actual memory usage will be larger (SQLite DB, storage of the hashkeys are not included - this + only counts the RAM needed for the object content). Default: 100MB. + + :return: a mapping from the old hash keys (in this container) to the new hash keys (in `other_container`). + """ + warnings.warn('function is deprecated, use `import_objects` instead', DeprecationWarning) + return other_container.import_objects( + hashkeys=hashkeys, + source_container=self, + compress=compress, + target_memory_bytes=target_memory_bytes, + callback=callback + ) + # Let us also compute the hash def _validate_hashkeys_pack(self, pack_id, callback=None): # pylint: disable=too-many-locals """Validate all hashkeys and returns a dictionary of problematic entries. @@ -1778,3 +2180,131 @@ def delete_objects(self, hashkeys): # an error while deleting the packed version of an object (even if the loose version of the same object # was deleted) should be considered as if the object has *not* been deleted return list(deleted_loose.union(deleted_packed)) + + def repack(self, compress_mode=CompressMode.KEEP): + """Perform a repack of all packed objects. + + At the end, it also VACUUMs the DB to reclaim unused space and make + access more efficient. + + This is a maintenance operation. + + :param compress_mode: see docstring of ``repack_pack``. 
+ """ + for pack_id in self._list_packs(): + self.repack_pack(pack_id, compress_mode=compress_mode) + self._vacuum() + + def repack_pack(self, pack_id, compress_mode=CompressMode.KEEP): + """Perform a repack of a given pack object. + + This is a maintenance operation. + + :param compress_mode: must be a valid CompressMode enum type. + Currently, the only implemented mode is KEEP, meaning that it + preserves the same compression (this means that repacking is *much* faster + as it can simply transfer the bytes without decompressing everything first, + and recompressing it back again). + """ + if compress_mode != CompressMode.KEEP: + raise NotImplementedError('Only keep method currently implemented') + + assert pack_id != self._REPACK_PACK_ID, ( + "The specified pack_id '{}' is invalid, it is the one used for repacking".format(pack_id) + ) + + # Check that it does not exist + assert not os.path.exists( + self._get_pack_path_from_pack_id(self._REPACK_PACK_ID, allow_repack_pack=True) + ), ("The repack pack '{}' already exists, probably a previous repacking aborted?".format(self._REPACK_PACK_ID)) + + session = self._get_cached_session() + one_object_in_pack = session.query(Obj.id).filter(Obj.pack_id == pack_id).limit(1).all() + if not one_object_in_pack: + # No objects. Clean up the pack file, if it exists. + if os.path.exists(self._get_pack_path_from_pack_id(pack_id)): + os.remove(self._get_pack_path_from_pack_id(pack_id)) + return + + obj_dicts = [] + # At least one object. Let's repack. We have checked before that the + # REPACK_PACK_ID did not exist. + with self.lock_pack(str(self._REPACK_PACK_ID), allow_repack_pack=True) as write_pack_handle: + with open(self._get_pack_path_from_pack_id(pack_id), 'rb') as read_pack: + query = session.query(Obj.id, Obj.hashkey, Obj.size, Obj.offset, Obj.length, + Obj.compressed).filter(Obj.pack_id == pack_id).order_by(Obj.offset) + for rowid, hashkey, size, offset, length, compressed in query: + # Since I am assuming above that the method is `KEEP`, I will just transfer + # the bytes. Otherwise I have to properly take into account compression in the + # source and in the destination. + read_handle = PackedObjectReader(read_pack, offset, length) + + obj_dict = {} + obj_dict['id'] = rowid + obj_dict['hashkey'] = hashkey + obj_dict['pack_id'] = self._REPACK_PACK_ID + obj_dict['compressed'] = compressed + obj_dict['size'] = size + obj_dict['offset'] = write_pack_handle.tell() + + # Transfer data in chunks. + # No need to rehash - it's the same container so the same hash. + # Not checking the compression on source or destination - we are assuming + # for now that the mode is KEEP. + while True: + chunk = read_handle.read(self._CHUNKSIZE) + if chunk == b'': + # Returns an empty bytes object on EOF. + break + write_pack_handle.write(chunk) + obj_dict['length'] = write_pack_handle.tell() - obj_dict['offset'] + + # Appending for later bulk commit + # I will assume that all objects of a single pack fit in memory + obj_dicts.append(obj_dict) + safe_flush_to_disk( + write_pack_handle, self._get_pack_path_from_pack_id(self._REPACK_PACK_ID, allow_repack_pack=True) + ) + + # We are done with data transfer. + # At this stage we just have a new pack -1 (_REPACK_PACK_ID) but it is never referenced. + # Let us store the information in the DB. + # We had already checked earlier that this at least one exists. + session.bulk_update_mappings(Obj, obj_dicts) + # I also commit. + session.commit() + # Clean up the cache + obj_dicts = [] + + # Now we can safely delete the old object. 
I just check that there is no object still
+        # referencing the old pack, to be sure.
+        one_object_in_pack = session.query(Obj.id).filter(Obj.pack_id == pack_id).limit(1).all()
+        assert not one_object_in_pack, (
+            "I moved the objects of pack '{pack_id}' to pack '{repack_id}' "
+            "but there are still references to pack '{pack_id}'!".format(
+                pack_id=pack_id, repack_id=self._REPACK_PACK_ID
+            )
+        )
+        os.remove(self._get_pack_path_from_pack_id(pack_id))
+
+        # I need now to move the file back. I need to be careful, to avoid conditions in which
+        # I remain with inconsistent data.
+        # Since hard links seem to be supported on all three platforms, I do a hard link
+        # of -1 back to the correct pack ID.
+        os.link(
+            self._get_pack_path_from_pack_id(self._REPACK_PACK_ID, allow_repack_pack=True),
+            self._get_pack_path_from_pack_id(pack_id)
+        )
+
+        # Before deleting the source (pack -1) I need now to update again all
+        # entries to point to the correct pack id
+        session.query(Obj).filter(Obj.pack_id == self._REPACK_PACK_ID).update({Obj.pack_id: pack_id})
+        session.commit()
+
+        # Technically, to be crash safe, before deleting I should also fsync the folder
+        # I am not doing this for now
+        # I now can unlink/delete the original source
+        os.unlink(self._get_pack_path_from_pack_id(self._REPACK_PACK_ID, allow_repack_pack=True))
+
+        # We are now done. The temporary pack is gone, and the old `pack_id`
+        # has now been replaced with an updated, repacked pack.
diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py
index 428ab81..f9d1846 100644
--- a/disk_objectstore/utils.py
+++ b/disk_objectstore/utils.py
@@ -3,12 +3,15 @@
 Some might be useful also for end users, like the wrappers to get streams,
 like the ``LazyOpener``.
 """
+# pylint: disable= too-many-lines
 import hashlib
 import itertools
 import os
 import uuid
 import zlib

+from enum import Enum
+
 try:
     import fcntl
 except ImportError:
@@ -25,6 +28,13 @@
 _MACOS_ALWAYS_USE_FULLSYNC = False


+class Location(Enum):
+    """Enum that describes if an element is only on the left or right iterator, or on both."""
+    LEFTONLY = -1
+    BOTH = 0
+    RIGHTONLY = 1
+
+
 class LazyOpener:
     """A class to return a stream to a given file, that however is opened lazily.
@@ -413,6 +423,129 @@ def read(self, size=-1):
         return stream

+class CallbackStreamWrapper:
+    """A class to just wrap a read stream, but perform a callback every few bytes.
+
+    Should be used only for streams open in read mode.
+    """
+
+    @property
+    def mode(self):
+        return self._stream.mode
+
+    @property
+    def seekable(self):
+        """Return whether object supports random access."""
+        return self._stream.seekable
+
+    def seek(self, target, whence=0):
+        """Change stream position."""
+        if target > self.tell():
+            if self._callback:
+                self._since_last_update += target - self.tell()
+                if self._since_last_update >= self._update_every:
+                    self._callback(action='update', value=self._since_last_update)
+                    self._since_last_update = 0
+        else:
+            self.close_callback()
+            if self._callback:
+                # If we have a callback, reset the progress to the full stream length
+                self._callback(
+                    action='init',
+                    value={
+                        'total': self._total_length,
+                        'description': '{} [rewind]'.format(self._description)
+                    }
+                )
+                # Counter of how many bytes have been processed since the last update.
+                # A new callback will be performed when this value is > update_every.
+ self._since_last_update = target + self._callback(action='update', value=self._since_last_update) + + return self._stream.seek(target, whence) + + def tell(self): + """Return current stream position.""" + return self._stream.tell() + + def __init__(self, stream, callback, total_length=0, description='Streamed object'): + """ + Initialises the reader to a given stream. + + :param stream: an open stream + :param callback: a callback to call to update the status (or None if not needed) + :param total_length: the expected length + """ + self._stream = stream + self._callback = callback + self._total_length = total_length + self._description = description + + if self._callback: + # If we have a callback, compute the total count of objects in this pack + self._callback(action='init', value={'total': total_length, 'description': description}) + # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object. + self._update_every = max(int(total_length / 400), 1) if total_length else 1 + # Counter of how many objects have been since since the last update. + # A new callback will be performed when this value is > update_every. + self._since_last_update = 0 + + def read(self, size=-1): + """ + Read and return up to n bytes. + + If the argument is omitted, None, or negative, reads and + returns all data until EOF (that corresponds to the length specified + in the __init__ method). + + Returns an empty bytes object on EOF. + """ + data = self._stream.read(size) + + if self._callback: + self._since_last_update += len(data) + if self._since_last_update >= self._update_every: + self._callback(action='update', value=self._since_last_update) + self._since_last_update = 0 + + return data + + def close_callback(self): + """ + Call the wrap up closing calls for the callback. + + .. note:: it DOES NOT close the stream. + """ + if self._callback: + # Final call to complete the bar + if self._since_last_update: + self._callback(action='update', value=self._since_last_update) + # Perform any wrap-up, if needed + self._callback(action='close', value=None) + + +def rename_callback(callback, new_description): + """Given a callback, return a new one where the description will be changed to `new_name`. + + Works even if `callback` is None (in this case, it returns None). + :param callback: a callback function. + :param new_description: a string with a modified description for the callback. + This will be replaced during the `init` call to the callback. + """ + if callback is None: + return None + + def wrapper_callback(action, value): + """A wrapper callback with changed description.""" + if action == 'init': + new_value = value.copy() + new_value['description'] = new_description + return callback(action, new_value) + return callback(action, value) + + return wrapper_callback + + class StreamDecompresser: """A class that gets a stream of compressed zlib bytes, and returns the corresponding uncompressed bytes when being read via the .read() method. 
@@ -590,9 +723,7 @@ def is_known_hash(hash_type): def get_hash(hash_type): """Return a hash class with an update method and a hexdigest method.""" - known_hashes = { - 'sha256': hashlib.sha256, - } + known_hashes = {'sha1': hashlib.sha1, 'sha256': hashlib.sha256} try: return known_hashes[hash_type] @@ -786,3 +917,145 @@ def compute_hash_and_size(stream, hash_type): size += len(next_chunk) return hasher.hexdigest(), size + + +def detect_where_sorted(left_iterator, right_iterator, left_key=None): # pylint: disable=too-many-branches, too-many-statements + """Generator that loops in alternation (but only once each) the two iterators and yields an element, specifying if + it's only on the left, only on the right, or in both. + + .. note:: IMPORTANT! The two iterators MUST return unique and sorted results. + + .. note:: if it's on both, the one on the left is returned. + + This function will check and raise a ValueError if it detects non-unique or non-sorted elements. + HOWEVER, this exception is raised only at the first occurrence of the issue, that can be very late in the execution, + so if you process results in a streamed way, please ensure that you pass sorted iterators. + + :param left_iterator: a left iterator + :param right_iterator: a right iterator + :param left_key: if specified, it's a lambda that determines how to process each element + of the left iterator when comparing with the right iterator. For instance, the left + iterator might be a tuple, whose first element is a hash key, while the right iterator + just a list of hash keys. In this case, left_key could be defined as a lambda returning + the first element of the tuple. + Note that when the element is in both iterators, the left one is returned (i.e. the + full tuple, in this example). 
+ """ + left_exhausted = False + right_exhausted = False + + if left_key is None: + left_key = lambda x: x + + # Convert first in iterators (in case they are, e.g., lists) + left_iterator = iter(left_iterator) + right_iterator = iter(right_iterator) + + try: + last_left = next(left_iterator) + except StopIteration: + left_exhausted = True + + try: + last_right = next(right_iterator) + except StopIteration: + right_exhausted = True + + if left_exhausted and right_exhausted: + # Nothing to be done, both iterators are empty + return + + now_left = True + if left_exhausted or (not right_exhausted and left_key(last_left) > last_right): + now_left = False # I want the 'current' (now) to be behind or at the same position of the other at any time + + while not (left_exhausted and right_exhausted): + advance_both = False + if now_left: + if right_exhausted: + yield last_left, Location.LEFTONLY + else: + if left_key(last_left) == last_right: + # They are equal: add to intersection and continue + yield last_left, Location.BOTH + # I need to consume and advance on both iterators at the next iteration + advance_both = True + elif left_key(last_left) < last_right: + # the new entry (last_left) is still smaller: it's on the left only + yield last_left, Location.LEFTONLY + else: + # the new entry (last_left) is now larger: then, last_right is only on the right + # and I switch to now_right + yield last_right, Location.RIGHTONLY + now_left = False + else: + if left_exhausted: + yield last_right, Location.RIGHTONLY + else: + if left_key(last_left) == last_right: + # They are equal: add to intersection and continue + yield last_left, Location.BOTH + # I need to consume and advance on both iterators at the next iteration + advance_both = True + elif left_key(last_left) > last_right: + # the new entry (last_right) is still smaller: it's on the right only + yield last_right, Location.RIGHTONLY + else: + # the new entry (last_right) is now larger: then, last_left is only on the left + # and I switch to now_left + yield last_left, Location.LEFTONLY + now_left = True + + # When we are here: if now_left, then last_left has been inserted in one of the lists; + # if not now_left, then last_right has been insterted in one of the lists. + # If advance both, they both can be discarded. So if I exhausted an iterator, I am not losing + # any entry. + + # I will need to cache the old value, see comments below in the `except StopIteration` block + new_now_left = now_left + if now_left or advance_both: + try: + new = next(left_iterator) + if left_key(new) <= left_key(last_left): + raise ValueError( + "The left iterator does not return sorted unique entries, I got '{}' after '{}'".format( + left_key(new), left_key(last_left) + ) + ) + last_left = new + except StopIteration: + left_exhausted = True + # I need to store in a different variable, otherwise in this case + # I would also enter the next iteration even if advance_both is False! 
+ new_now_left = False + + if not now_left or advance_both: + try: + new = next(right_iterator) + if new <= last_right: + raise ValueError( + "The right iterator does not return sorted unique entries, I got '{}' after '{}'".format( + new, last_right + ) + ) + last_right = new + except StopIteration: + right_exhausted = True + # For consistency, also here I set new_now_left + new_now_left = True + + # Set the new now_left value + now_left = new_now_left + + +def yield_first_element(iterator): + """Given an iterator that returns a tuple, return an iterator that yields only the first element of the tuple.""" + for elem in iterator: + yield elem[0] + + +def merge_sorted(iterator1, iterator2): + """Given two sorted iterators, return another sorted iterator being the union of the two.""" + for item, _ in detect_where_sorted(iterator1, iterator2): + # Whereever it is (only left, only right, on both) I return the object. + yield item diff --git a/tests/conftest.py b/tests/conftest.py index f96bb6e..a031314 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,6 +25,39 @@ def pytest_generate_tests(metafunc): metafunc.parametrize('concurrency_repetition_index', range(metafunc.config.option.concurrency_repetitions)) +@pytest.fixture(scope='function') +def callback_instance(): + """Return the CallbackClass for the tests.""" + + class CallbackClass: + """Class that manages the callback and checks that it is correctly called.""" + + def __init__(self): + """Initialise the class.""" + self.current_action = None + self.performed_actions = [] + + def callback(self, action, value): + """Check how the callback is called.""" + + if action == 'init': + assert self.current_action is None, "Starting a new action '{}' without closing the old one {}".format( + action, self.current_action + ) + self.current_action = {'start_value': value, 'value': 0} + elif action == 'update': + # Track the current position + self.current_action['value'] += value + elif action == 'close': + # Add to list of performed actions + self.performed_actions.append(self.current_action) + self.current_action = None + else: + raise AssertionError("Unknown action '{}'".format(action)) + + yield CallbackClass() + + @pytest.fixture(scope='function') def temp_container(temp_dir): # pylint: disable=redefined-outer-name """Return an object-store container in a given temporary directory. diff --git a/tests/test_container.py b/tests/test_container.py index b6fb07c..e0f52a3 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -14,7 +14,7 @@ import psutil import pytest -from disk_objectstore import Container, ObjectType +from disk_objectstore import Container, ObjectType, CompressMode from disk_objectstore import utils, models import disk_objectstore.exceptions as exc @@ -628,7 +628,7 @@ def test_initialisation(temp_dir): assert 'already some file or folder' in str(excinfo.value) -@pytest.mark.parametrize('hash_type', ['sha256']) +@pytest.mark.parametrize('hash_type', ['sha256', 'sha1']) @pytest.mark.parametrize('compress', [True, False]) def test_check_hash_computation(temp_container, hash_type, compress): """Check that the hashes are correctly computed, when storing loose, @@ -636,6 +636,8 @@ def test_check_hash_computation(temp_container, hash_type, compress): Check both compressed and uncompressed packed objects. 
""" + # Re-init the container with the correct hash type + temp_container.init_container(hash_type=hash_type, clear=True) content1 = b'1' content2 = b'222' content3 = b'n2fwd' @@ -1504,8 +1506,9 @@ def test_simulate_concurrent_packing(temp_container, compress): # pylint: disab assert not os.path.exists(fname) +@pytest.mark.parametrize('do_vacuum', [True, False]) @pytest.mark.parametrize('compress', [True, False]) -def test_simulate_concurrent_packing_multiple(temp_container, compress): # pylint: disable=invalid-name +def test_simulate_concurrent_packing_multiple(temp_container, compress, do_vacuum): # pylint: disable=invalid-name """Simulate race conditions while reading and packing.""" content1 = b'abc' content2 = b'def' @@ -1527,13 +1530,13 @@ def test_simulate_concurrent_packing_multiple(temp_container, compress): # pyli assert stream.read(1) == b'a' temp_container.pack_all_loose(compress=compress) # Remove loose files - temp_container.clean_storage() + temp_container.clean_storage(vacuum=do_vacuum) assert stream.read() == b'bc' elif obj_hashkey == hashkey2: assert stream.read(1) == b'd' temp_container.pack_all_loose(compress=compress) # Remove loose files - temp_container.clean_storage() + temp_container.clean_storage(vacuum=do_vacuum) assert stream.read() == b'ef' else: # Should not happen! @@ -1555,10 +1558,45 @@ def test_simulate_concurrent_packing_multiple(temp_container, compress): # pyli assert data == temp_container.get_objects_content([hashkey1, hashkey2]) # After a second cleaning of the storage, the loose file *must* have been removed - temp_container.clean_storage() + temp_container.clean_storage(vacuum=do_vacuum) assert not os.path.exists(fname) +def test_simulate_concurrent_packing_multiple_many(temp_container): # pylint: disable=invalid-name + """Simulate race conditions while reading and packing, with more than objects _MAX_CHUNK_ITERATE_LENGTH changing.""" + expected = {} + + # I put at least one object already packed + preliminary_content = b'AAA' + preliminary_hashkey = temp_container.add_object(preliminary_content) + temp_container.pack_all_loose() + temp_container.clean_storage() + expected[preliminary_hashkey] = preliminary_content + + for idx in range(temp_container._MAX_CHUNK_ITERATE_LENGTH + 10): # pylint: disable=protected-access + content = '{}'.format(idx).encode('ascii') + expected[temp_container.add_object(content)] = content + + retrieved = {} + first = True + with temp_container.get_objects_stream_and_meta(expected.keys()) as triplets: + for obj_hashkey, stream, meta in triplets: + retrieved[obj_hashkey] = stream.read() + if first: + # I should have found only one packed object (preliminary_content). + assert obj_hashkey == preliminary_hashkey + assert meta['type'] == ObjectType.PACKED + # I will not look for the loose until I exhaust the packed objects. + # In the meantime, therefore, I pack all the rest, and I clean the storage: + # this will trigger the fallback logic to check again if there are + # objects that have been packed in the meantime. 
@@ -1504,8 +1506,9 @@ def test_simulate_concurrent_packing(temp_container, compress):  # pylint: disab
     assert not os.path.exists(fname)
 
 
+@pytest.mark.parametrize('do_vacuum', [True, False])
 @pytest.mark.parametrize('compress', [True, False])
-def test_simulate_concurrent_packing_multiple(temp_container, compress):  # pylint: disable=invalid-name
+def test_simulate_concurrent_packing_multiple(temp_container, compress, do_vacuum):  # pylint: disable=invalid-name
     """Simulate race conditions while reading and packing."""
     content1 = b'abc'
     content2 = b'def'
@@ -1527,13 +1530,13 @@
                 assert stream.read(1) == b'a'
                 temp_container.pack_all_loose(compress=compress)
                 # Remove loose files
-                temp_container.clean_storage()
+                temp_container.clean_storage(vacuum=do_vacuum)
                 assert stream.read() == b'bc'
             elif obj_hashkey == hashkey2:
                 assert stream.read(1) == b'd'
                 temp_container.pack_all_loose(compress=compress)
                 # Remove loose files
-                temp_container.clean_storage()
+                temp_container.clean_storage(vacuum=do_vacuum)
                 assert stream.read() == b'ef'
             else:
                 # Should not happen!
@@ -1555,10 +1558,45 @@
     assert data == temp_container.get_objects_content([hashkey1, hashkey2])
 
     # After a second cleaning of the storage, the loose file *must* have been removed
-    temp_container.clean_storage()
+    temp_container.clean_storage(vacuum=do_vacuum)
     assert not os.path.exists(fname)
 
 
+def test_simulate_concurrent_packing_multiple_many(temp_container):  # pylint: disable=invalid-name
+    """Simulate race conditions while reading and packing, with more than _MAX_CHUNK_ITERATE_LENGTH objects changing."""
+    expected = {}
+
+    # I put at least one object already packed
+    preliminary_content = b'AAA'
+    preliminary_hashkey = temp_container.add_object(preliminary_content)
+    temp_container.pack_all_loose()
+    temp_container.clean_storage()
+    expected[preliminary_hashkey] = preliminary_content
+
+    for idx in range(temp_container._MAX_CHUNK_ITERATE_LENGTH + 10):  # pylint: disable=protected-access
+        content = '{}'.format(idx).encode('ascii')
+        expected[temp_container.add_object(content)] = content
+
+    retrieved = {}
+    first = True
+    with temp_container.get_objects_stream_and_meta(expected.keys()) as triplets:
+        for obj_hashkey, stream, meta in triplets:
+            retrieved[obj_hashkey] = stream.read()
+            if first:
+                # I should have found only one packed object (preliminary_content).
+                assert obj_hashkey == preliminary_hashkey
+                assert meta['type'] == ObjectType.PACKED
+                # I will not look at the loose objects until I exhaust the packed ones.
+                # In the meantime, therefore, I pack all the rest, and I clean the storage:
+                # this will trigger the fallback logic to check again if there are
+                # objects that have been packed in the meantime.
+                temp_container.pack_all_loose()
+                temp_container.clean_storage()
+                first = False
+
+    assert expected == retrieved
+
+
 @pytest.mark.parametrize('compress', [True, False])
 def test_simulate_concurrent_packing_multiple_meta_only(temp_container, compress):  # pylint: disable=invalid-name
     """Simulate race conditions while reading and packing (as earlier function, but no streams)."""
@@ -1841,8 +1879,10 @@ def test_list_all_objects_extraneous(temp_dir, loose_prefix_len):  # pylint: dis
 @pytest.mark.parametrize('target_memory_bytes', [1, 9, 100 * 1024 * 1024])
 @pytest.mark.parametrize('compress_dest', [True, False])
 @pytest.mark.parametrize('compress_source', [True, False])
-def test_export_to_pack(temp_container, compress_source, compress_dest, target_memory_bytes):
-    """Test the functionality to export to a new container."""
+# Test both the same hash and another one
+@pytest.mark.parametrize('other_container_hash_type', ['sha256', 'sha1'])
+def test_import_to_pack(temp_container, compress_source, compress_dest, target_memory_bytes, other_container_hash_type):
+    """Test the functionality to import from another container."""
     obj1 = b'111111'
     obj2 = b'222222'
     obj3 = b'333332'
@@ -1850,7 +1890,7 @@
     with tempfile.TemporaryDirectory() as tmpdir:
         other_container = Container(tmpdir)
         # Use the same hash type
-        other_container.init_container(clear=True, hash_type=temp_container.hash_type)
+        other_container.init_container(clear=True, hash_type=other_container_hash_type)
 
         hashkey1 = temp_container.add_object(obj1)
         hashkey2, hashkey3 = temp_container.add_objects_to_pack([obj2, obj3], compress=compress_source)
@@ -1862,10 +1902,10 @@
         assert other_container.count_objects()['packed'] == 0
 
         # Put only two objects
-        old_new_mapping = temp_container.export([hashkey1, hashkey2],
-                                                other_container,
-                                                compress=compress_dest,
-                                                target_memory_bytes=target_memory_bytes)
+        old_new_mapping = other_container.import_objects([hashkey1, hashkey2],
+                                                         temp_container,
+                                                         compress=compress_dest,
+                                                         target_memory_bytes=target_memory_bytes)
         # Two objects should appear
         assert other_container.count_objects()['loose'] == 0
         assert other_container.count_objects()['packed'] == 2
@@ -1877,10 +1917,10 @@
 
         # Add two more, one of which is already in the destination
         old_new_mapping.update(
-            temp_container.export([hashkey2, hashkey3],
-                                  other_container,
-                                  compress=compress_dest,
-                                  target_memory_bytes=target_memory_bytes)
+            other_container.import_objects([hashkey2, hashkey3],
+                                           temp_container,
+                                           compress=compress_dest,
+                                           target_memory_bytes=target_memory_bytes)
         )
         # All three objects should be there, no duplicates
         assert other_container.count_objects()['loose'] == 0
@@ -1893,13 +1933,35 @@
         assert other_container.get_object_content(old_new_mapping[hashkey3]) == obj3
 
         old_hashkeys, new_hashkeys = zip(*old_new_mapping.items())
-        # Since we are using the same hash algorithm, the hashes should be the same!
-        assert old_hashkeys == new_hashkeys
+        if other_container_hash_type == temp_container.hash_type:
+            # Since we are using the same hash algorithm, the hashes should be the same!
+            assert old_hashkeys == new_hashkeys
 
         # close before exiting the context manager, so files are closed.
         other_container.close()
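The next test checks the backwards-compatible export method. A plausible shape for such a deprecation shim, forwarding to the new import_objects API, is sketched below (hypothetical code, not the actual implementation in this patch; it assumes the warnings module is available in container.py):

def export(self, hashkeys, other_container, **kwargs):
    """Deprecated alias: use other_container.import_objects(hashkeys, self, **kwargs) instead."""
    warnings.warn('export() is deprecated, use import_objects() on the destination container instead', DeprecationWarning)
    return other_container.import_objects(hashkeys, self, **kwargs)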
+
+def test_export_deprecated(temp_container):
+    """Test that the export function still exists but is deprecated."""
+    obj1 = b'111111'
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        other_container = Container(tmpdir)
+        # Use the same hash type
+        other_container.init_container(clear=True, hash_type=temp_container.hash_type)
+
+        hashkey1 = temp_container.add_object(obj1)
+
+        # Export only one object via the deprecated API
+        with pytest.warns(DeprecationWarning):
+            temp_container.export([hashkey1], other_container)
+
+        assert other_container.get_object_content(hashkey1) == obj1
+
+        # Close before going out, or the test will fail on Windows not being able to delete the folder
+        other_container.close()
+
+
 @pytest.mark.parametrize('compress', [True, False])
 def test_validate(temp_container, compress):
     """Test the validation function."""
@@ -2045,37 +2107,10 @@ def test_validate_corrupt_packed_size(temp_container):  # pylint: disable=invali
     assert not any(errors.values())
 
 
-def test_validate_callback(temp_container):
+def test_validate_callback(temp_container, callback_instance):
     """Test the correctness of the callbacks.
 
     Stores the calls to check at the end that everything was called correctly."""
-
-    class CallbackClass:
-        """Class that manages the callback."""
-
-        def __init__(self):
-            """Initialise the class."""
-            self.current_action = None
-            self.performed_actions = []
-
-        def callback(self, action, value):
-            """Check how the callback is called."""
-
-            if action == 'init':
-                assert self.current_action is None, "Starting a new action '{}' without closing the old one {}".format(
-                    action, self.current_action
-                )
-                self.current_action = {'start_value': value, 'value': 0}
-            elif action == 'update':
-                # Track the current position
-                self.current_action['value'] += value
-            elif action == 'close':
-                # Add to list of performed actions
-                self.performed_actions.append(self.current_action)
-                self.current_action = None
-            else:
-                raise AssertionError("Unknown action '{}'".format(action))
-
     # Add packed objects (2001, 10 chars each), *not* a multiple of 400 (that is the internal value
     # of how many events should be triggered as a maximum)
     len_packed = 2001
@@ -2088,7 +2123,6 @@ def callback(self, action, value):
     for content in data:
         temp_container.add_object(content)
 
-    callback_instance = CallbackClass()
     temp_container.validate(callback=callback_instance.callback)
 
     assert callback_instance.current_action is None, (
@@ -2118,6 +2152,126 @@ def callback(self, action, value):
     }
 
 
+@pytest.mark.parametrize('use_size_hint', [True, False])
+def test_add_streamed_object_to_pack_callback(  # pylint: disable=invalid-name
+    temp_container, use_size_hint, callback_instance
+):
+    """Test the correctness of the callback of add_streamed_object_to_pack."""
+    length = 1000000
+    content = b'0' * length
+    stream = io.BytesIO(content)
+
+    if use_size_hint:
+        hashkey = temp_container.add_streamed_object_to_pack(
+            stream, callback_size_hint=length, callback=callback_instance.callback
+        )
+    else:
+        hashkey = temp_container.add_streamed_object_to_pack(stream, callback=callback_instance.callback)
+
+    assert temp_container.get_object_content(hashkey) == content
+
+    assert callback_instance.current_action is None, (
+        "The 'validate' call did not perform a final callback with a 'close' event"
+    )
+
+    assert callback_instance.performed_actions == [{
+        'start_value': {
+            'total': length if use_size_hint else 0,
+            'description': 'Streamed object'
+        },
+        'value': length
+    }]
+
+
+@pytest.mark.parametrize('no_holes,no_holes_read_twice', [[True, True], [True, False], [False, False]])
+def test_add_streamed_objects_to_pack_callback(  # pylint: disable=invalid-name
+    temp_container, callback_instance, no_holes, no_holes_read_twice
+):
+    """Test the correctness of the callback of add_streamed_objects_to_pack."""
+    # Add packed objects (2001, 10 chars each)
+    len_packed = 2001
+    stream_list = [io.BytesIO('p{:09d}'.format(i).encode('ascii')) for i in range(len_packed)]
+
+    temp_container.add_streamed_objects_to_pack(
+        stream_list, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice, callback=callback_instance.callback
+    )
+
+    # Add another 4001 packed objects, on top of the 2001 already-existing ones
+    len_packed2 = 4001
+    stream_list = [io.BytesIO('2p{:09d}'.format(i).encode('ascii')) for i in range(len_packed2)]
+
+    temp_container.add_streamed_objects_to_pack(
+        stream_list, no_holes=no_holes, no_holes_read_twice=no_holes_read_twice, callback=callback_instance.callback
+    )
+
+    assert callback_instance.current_action is None, (
+        "The 'add_streamed_objects_to_pack' call did not perform a final callback with a 'close' event"
+    )
+
+    expected_actions = []
+    # First call
+    expected_actions.append({'start_value': {'total': len_packed, 'description': 'Bulk storing'}, 'value': len_packed})
+    # Second call
+    if no_holes:
+        # If no_holes is True, i.e. we do not want holes, we compute an initial list of the existing ones
+        expected_actions.append({
+            'start_value': {
+                'total': len_packed,
+                'description': 'List existing'
+            },
+            'value': len_packed
+        })
+    expected_actions.append({
+        'start_value': {
+            'total': len_packed2,
+            'description': 'Bulk storing'
+        },
+        'value': len_packed2
+    })
+
+    assert callback_instance.performed_actions == expected_actions
+
+
+# Check both with the same hash type and with a different one
+@pytest.mark.parametrize('other_container_hash_type', ['sha256', 'sha1'])
+def test_import_objects_callback(temp_container, callback_instance, other_container_hash_type):
+    """Test the correctness of the callback of import_objects."""
+    # Add packed objects (2001, 10 chars each)
+    len_packed = 2001
+    stream_list = [io.BytesIO('p{:09d}'.format(i).encode('ascii')) for i in range(len_packed)]
+    hashkeys = temp_container.add_streamed_objects_to_pack(stream_list)
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        other_container = Container(tmpdir)
+        # Use either the same hash type or a different one, depending on the parametrization
+        other_container.init_container(clear=True, hash_type=other_container_hash_type)
+
+        # Import objects
+        other_container.import_objects(hashkeys, temp_container, callback=callback_instance.callback)
+
+        assert other_container.count_objects()['loose'] == temp_container.count_objects()['loose']
+        assert other_container.count_objects()['packed'] == temp_container.count_objects()['packed']
+
+        # close before exiting the context manager, so files are closed.
+        other_container.close()
+
+    expected_actions = []
+    if other_container_hash_type == temp_container.hash_type:
+        expected_actions.append({
+            'start_value': {
+                'description': 'Listing objects',
+                'total': len_packed
+            },
+            'value': len_packed
+        })
+    expected_actions.append({'start_value': {'description': 'Copy objects', 'total': len_packed}, 'value': len_packed})
+    # len_packed is small (and the objects are small)
+    # so they all end up in the final flush
+    expected_actions.append({'start_value': {'description': 'Final flush', 'total': len_packed}, 'value': len_packed})
+
+    assert callback_instance.performed_actions == expected_actions
+
+
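All of the callback tests above rely on the same simple protocol: the container calls callback('init', {'total': ..., 'description': ...}) when an operation starts, callback('update', increment) while it progresses, and callback('close', ...) when it finishes. A minimal function-based callback compatible with this protocol could look like the sketch below (illustrative only; the printed output format is my own choice):

def print_progress(action, value):
    """A minimal progress callback printing coarse information to stdout."""
    if action == 'init':
        # value is a dict with at least 'total' and 'description'
        print('{} (total={})'.format(value['description'], value['total']))
    elif action == 'update':
        # value is the increment since the previous update
        print('  advanced by {}'.format(value))
    elif action == 'close':
        # the operation is finished
        print('  done')

# It can then be passed to the container methods accepting a callback, e.g.:
#     temp_container.validate(callback=print_progress)
#     other_container.import_objects(hashkeys, temp_container, callback=print_progress)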
 @pytest.mark.parametrize('ask_deleting_unknown', [True, False])
 @pytest.mark.parametrize('compress', [True, False])
 def test_delete(temp_container, compress, ask_deleting_unknown):  # pylint: disable=too-many-statements
@@ -2701,3 +2855,181 @@ def new_write_data_to_packfile(self, call_counter, *args, **kwargs):
     assert new_sizes['total_size_packfiles_on_disk'] == 2 * sizes['total_size_packfiles_on_disk']
     # (but shouldn't have increased the size of packed objects on disk)
     assert new_sizes['total_size_packed_on_disk'] == sizes['total_size_packed_on_disk']
+
+
+def test_packs_read_in_order(temp_dir):
+    """Test that when reading the objects from packs, they are read grouped by packs, and in offset order.
+
+    This is very important for performance.
+
+    .. note:: IMPORTANT: This is not running with concurrent packing, so only the first internal loop of
+        get_objects_stream_and_meta is triggered and the order is the one described above.
+        The application should NOT make any assumption on this because, during concurrent packing of loose objects,
+        the recently packed and clean_storage'd objects might be returned later.
+
+    .. note:: We are not checking the order in which packs are considered
+    """
+    num_objects = 10000  # Number of objects
+    obj_size = 999
+    # This will generate N objects of size obj_size each
+    # They are all different because at least the first characters (up to the first dash) differ
+
+    temp_container = Container(temp_dir)
+    # A pack should accommodate ~100 objects, and there should be > 90 packs
+    temp_container.init_container(clear=True, pack_size_target=100000)
+
+    data = [('{}-'.format(i).encode('ascii') * obj_size)[:obj_size] for i in range(num_objects)]
+    hashkeys = temp_container.add_objects_to_pack(data, compress=False)
+
+    # Check that I indeed created num_objects (different) objects
+    assert len(set(hashkeys)) == num_objects
+
+    # Shuffle the array. When retrieving data, I should still fetch them per pack, and then in offset order
+    # (so the pack file is read sequentially rather than randomly)
+    random.shuffle(hashkeys)
+
+    last_offset = None
+    last_pack = None
+    seen_packs = set()
+
+    with temp_container.get_objects_stream_and_meta(hashkeys, skip_if_missing=False) as triplets:
+        for _, _, meta in triplets:
+            assert meta['type'] == ObjectType.PACKED
+            if last_pack is None:
+                last_pack = meta['pack_id']
+                seen_packs.add(meta['pack_id'])
+                last_offset = 0
+            elif meta['pack_id'] != last_pack:
+                assert meta['pack_id'] not in seen_packs, (
+                    'Objects were already retrieved from pack {}, the last pack was {} '
+                    'and we are trying to retrieve again from pack {}'.format(
+                        meta['pack_id'], last_pack, meta['pack_id']
+                    )
+                )
+                last_pack = meta['pack_id']
+                seen_packs.add(meta['pack_id'])
+                last_offset = 0
+            # Within a pack, offsets must be non-decreasing (right after switching pack, last_offset is zero)
+            assert last_offset <= meta['pack_offset'], (
+                'in pack {} we are reading offset {}, but before we were reading a later offset {}'.format(
+                    meta['pack_id'], meta['pack_offset'], last_offset
+                )
+            )
+            last_offset = meta['pack_offset']
+
+    # Make sure enough packs were generated, so that this function actually tests the intended behavior
+    # This should generate ~90 packs
+    # NOTE: if you use compress = True, you get far fewer packs since the data is very compressible! (only 2)
+    # So we only test with compress=False
+    largest_pack = max(seen_packs)
+    assert largest_pack > 80
+
+    # Check that all packs were scanned through
+    assert sorted(seen_packs) == list(range(largest_pack + 1))
+
+    # Important before exiting from the tests
+    temp_container.close()
+
+
+def test_repack(temp_dir):
+    """Test the repacking functionality."""
+    temp_container = Container(temp_dir)
+    temp_container.init_container(clear=True, pack_size_target=39)
+
+    # Data of 10 bytes each. This will fill two packs completely and start a third one.
+    data = [
+        b'-123456789', b'a123456789', b'b123456789', b'c123456789', b'd123456789', b'e123456789', b'f123456789',
+        b'g123456789', b'h123456789'
+    ]
+
+    hashkeys = []
+    # Add them one by one, so I am sure in which pack they go
+    for datum in data:
+        hashkeys.append(temp_container.add_objects_to_pack([datum])[0])
+
+    assert temp_container.get_object_meta(hashkeys[0])['pack_id'] == 0
+    assert temp_container.get_object_meta(hashkeys[1])['pack_id'] == 0
+    assert temp_container.get_object_meta(hashkeys[2])['pack_id'] == 0
+    assert temp_container.get_object_meta(hashkeys[3])['pack_id'] == 0
+    assert temp_container.get_object_meta(hashkeys[4])['pack_id'] == 1
+    assert temp_container.get_object_meta(hashkeys[5])['pack_id'] == 1
+    assert temp_container.get_object_meta(hashkeys[6])['pack_id'] == 1
+    assert temp_container.get_object_meta(hashkeys[7])['pack_id'] == 1
+    assert temp_container.get_object_meta(hashkeys[8])['pack_id'] == 2
+
+    # I check which packs exist
+    assert sorted(temp_container._list_packs()) == ['0', '1', '2']  # pylint: disable=protected-access
+
+    counts = temp_container.count_objects()
+    assert counts['packed'] == len(data)
+    size = temp_container.get_total_size()
+    assert size['total_size_packed'] == 10 * len(data)
+    assert size['total_size_packfiles_on_disk'] == 10 * len(data)
+
+    # I delete an object in the middle, an object at the end of a pack, and an object at the beginning.
+    # I also delete the only object of the last pack
+    to_delete = [hashkeys[1], hashkeys[3], hashkeys[4], hashkeys[8]]
+    temp_container.delete_objects(to_delete)
+
+    # I check that all packs are still there
+    assert sorted(temp_container._list_packs()) == ['0', '1', '2']  # pylint: disable=protected-access
+
+    counts = temp_container.count_objects()
+    assert counts['packed'] == len(data) - len(to_delete)
+    size = temp_container.get_total_size()
+    # I deleted 4 objects
+    assert size['total_size_packed'] == 10 * (len(data) - len(to_delete))
+    # Still full size on disk
+    assert size['total_size_packfiles_on_disk'] == 10 * len(data)
+
+    # I now repack
+    temp_container.repack(compress_mode=CompressMode.KEEP)
+
+    # I check which packs are left: pack 2 (now empty) should have been deleted
+    assert sorted(temp_container._list_packs()) == ['0', '1']  # pylint: disable=protected-access
+
+    counts = temp_container.count_objects()
+    assert counts['packed'] == len(data) - len(to_delete)
+    size = temp_container.get_total_size()
+    assert size['total_size_packed'] == 10 * (len(data) - len(to_delete))
+    # This time also the size on disk should be adapted (it's the main goal of repacking)
+    assert size['total_size_packfiles_on_disk'] == 10 * (len(data) - len(to_delete))
+
+    # Check that the content is still correct
+    # Should not raise
+    errors = temp_container.validate()
+    assert not any(errors.values())
+
+    # Important before exiting from the tests
+    temp_container.close()
+
+
+def test_not_implemented_repacks(temp_container):
+    """Check the error for not implemented repack methods."""
+    # We need to have at least one pack
+    temp_container.add_objects_to_pack([b'23r2'])
+    for compress_mode in CompressMode:
+        if compress_mode == CompressMode.KEEP:
+            continue
+        with pytest.raises(NotImplementedError):
+            temp_container.repack(compress_mode=compress_mode)
+
+
+def test_pack_all_loose_many(temp_container):
+    """Check pack_all_loose when there are more objects to pack than _MAX_CHUNK_ITERATE_LENGTH."""
+    expected = {}
+    for idx in range(temp_container._MAX_CHUNK_ITERATE_LENGTH + 10):  # pylint: disable=protected-access
+        content = '{}'.format(idx).encode('utf8')
+        expected[temp_container.add_object(content)] = content
+
+    # Pack all loose objects
+    temp_container.pack_all_loose()
+
+    retrieved = temp_container.get_objects_content(expected.keys())
+    assert retrieved == expected
+
+    # Pack again, nothing should happen, but it should trigger the logic in pack_all_loose at the beginning,
+    # with `if where == Location.BOTH`
+    temp_container.pack_all_loose()
+    retrieved = temp_container.get_objects_content(expected.keys())
+    assert retrieved == expected
diff --git a/tests/test_utils.py b/tests/test_utils.py
index c3ea935..980fdd2 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1140,7 +1140,8 @@ def test_zero_stream_multi_read():
 # Set as the second parameter the hash of the 'content' string written below
 # inside the test function
 @pytest.mark.parametrize(
-    'hash_type,expected_hash', [['sha256', '9975d00a6e715d830aeaa035347b3e601a0c0bb457a7f87816300e7c01c0c39b']]
+    'hash_type,expected_hash', [['sha256', '9975d00a6e715d830aeaa035347b3e601a0c0bb457a7f87816300e7c01c0c39b'],
+                                ['sha1', '2a0439b5b34b74808b6cc7a2bf04dd02604c20b0']]
 )
 def test_hash_writer_wrapper(temp_dir, hash_type, expected_hash):
     """Test some functionality of the HashWriterWrapper class."""
@@ -1193,11 +1194,13 @@ def test_is_known_hash():
     """Check the functionality of the is_known_hash function."""
     # At least sha256 should be supported
     assert utils.is_known_hash('sha256')
+    # sha1 should also be supported
+    assert utils.is_known_hash('sha1')
     # A weird string should not be a valid known hash
     assert not utils.is_known_hash('SOME_UNKNOWN_HASH_TYPE')
 
 
-@pytest.mark.parametrize('hash_type', ['sha256'])
+@pytest.mark.parametrize('hash_type', ['sha256', 'sha1'])
 def test_compute_hash_and_size(hash_type):
     """Check the funtion to compute the hash and size."""
 
@@ -1211,3 +1214,278 @@ def test_compute_hash_and_size(hash_type):
     hashkey, size = utils.compute_hash_and_size(stream, hash_type=hash_type)
     assert hashkey == expected_hash
     assert size == expected_size
+
+
+LEFT_RIGHT_PAIRS = [
+    # Both empty
+    [[], []],
+    # Left empty
+    [
+        [],
+        [1, 2, 3],
+    ],
+    # Right empty
+    [
+        [1, 2, 4],
+        [],
+    ],
+    # Some lists with some overlap, right exhausted first
+    [
+        [1, 3, 5, 7, 9, 10, 11, 20],
+        [0, 2, 4, 5, 8, 10, 12],
+    ],
+    # Some lists with some overlap, left exhausted first
+    [
+        [0, 2, 4, 5, 8, 10, 12],
+        [1, 3, 5, 7, 9, 10, 11, 20],
+    ],
+    # Start with both, continue left
+    [
+        [0, 1, 2],
+        [0, 3, 5, 7],
+    ],
+    # Start with both, continue right
+    [
+        [0, 3, 5, 7],
+        [0, 1, 2],
+    ],
+    # End with both, coming from left
+    [
+        [0, 3, 5, 6, 8],
+        [1, 2, 8],
+    ],
+    # End with both, coming from right
+    [
+        [1, 2, 8],
+        [0, 3, 5, 6, 8],
+    ],
+]
+
+
+@pytest.mark.parametrize('left,right', LEFT_RIGHT_PAIRS)
+def test_detect_where(left, right):
+    """Test the detect_where_sorted function."""
+    # Compute the expected result
+    merged = sorted(set(left + right))
+
+    idx = -1  # Needed when detect_where_sorted is an empty iterator
+    for idx, (item, where) in enumerate(utils.detect_where_sorted(left, right)):
+        assert item == merged[idx]
+        if merged[idx] in left:
+            if merged[idx] in right:
+                expected_where = utils.Location.BOTH
+            else:
+                expected_where = utils.Location.LEFTONLY
+        else:
+            expected_where = utils.Location.RIGHTONLY
+        assert where == expected_where
+
+    assert idx + 1 == len(merged)
+
+
+LEFT_RIGHT_PAIRS_UNSORTED = [
+    # Unsorted at end, left
+    [
+        [1, 4, 5, 1],
+        [1, 2, 3],
+    ],
+    # Unsorted at end, right
+    [
+        [1, 4, 5],
+        [1, 2, 3, 1],
+    ],
+    # Unsorted at beginning, left
+    [
+        [1, 0, 4, 5],
+        [1, 2, 3],
+    ],
+    # Unsorted at beginning, right
+    [
+        [1, 4, 5],
+        [1, 0, 2, 3],
+    ],
+    # Not unique at end, left
+    [
+        [1, 4, 5, 5],
+        [1, 2, 3],
+    ],
+    # Not unique at end, right
+    [
+        [1, 4, 5],
+        [1, 2, 3, 3],
+    ],
+    # Not unique at beginning, left
+    [
+        [1, 1, 4, 5],
+        [1, 2, 3],
+    ],
+    # Not unique at beginning, right
+    [
+        [1, 4, 5],
+        [1, 1, 2, 3],
+    ]
+]
+
+
+@pytest.mark.parametrize('left,right', LEFT_RIGHT_PAIRS_UNSORTED)
+def test_detect_where_unsorted(left, right):
+    """Test the detect_where_sorted function when the lists are not sorted."""
+    with pytest.raises(ValueError) as excinfo:
+        list(utils.detect_where_sorted(left, right))
+    assert 'does not return sorted' in str(excinfo.value)
+
+
+def test_yield_first():
+    """Test the yield_first_element function."""
+
+    first = [1, 3, 5, 7, 9]
+    second = [0, 2, 4, 6, 8]
+
+    # [(1, 0), (3, 2), ...]
+    inner_iter = zip(first, second)
+
+    result = list(utils.yield_first_element(inner_iter))
+
+    assert result == first
+
+
+def test_merge_sorted():
+    """Test the merge_sorted function."""
+    # I also put some repetitions
+    first = sorted([1, 3, 5, 7, 9, 10, 11, 20])
+    second = sorted([0, 2, 4, 5, 8, 10, 12])
+
+    result1 = list(utils.merge_sorted(first, second))
+    result2 = list(utils.merge_sorted(second, first))
+
+    assert result1 == sorted(set(first + second))
+    assert result2 == sorted(set(first + second))
+
+
+def test_callback_stream_wrapper_none():  # pylint: disable=invalid-name
+    """Test the callback stream wrapper with no actual callback."""
+    with tempfile.TemporaryFile(mode='rb+') as fhandle:
+        fhandle.write(b'abc')
+        fhandle.seek(0)
+
+        wrapped = utils.CallbackStreamWrapper(fhandle, callback=None)
+
+        assert wrapped.mode == 'rb+'
+        assert wrapped.seekable
+        # Seek forward; read from byte 1
+        wrapped.seek(1)
+        assert wrapped.tell() == 1
+        assert wrapped.read() == b'bc'
+        assert wrapped.tell() == 3
+        # Seek backwards; read from byte 0
+        wrapped.seek(0)
+        assert wrapped.read() == b'abc'
+
+        wrapped.close_callback()
+
+
+@pytest.mark.parametrize('with_total_length', [True, False])
+def test_callback_stream_wrapper(callback_instance, with_total_length):
+    """Test the callback stream wrapper."""
+    description = 'SOME CALLBACK DESCRIPTION'
+    # Long string so we trigger the update_every logic
+    content = b'abc' * 4000
+
+    with tempfile.TemporaryFile(mode='rb+') as fhandle:
+        fhandle.write(content)
+        fhandle.seek(0)
+
+        if with_total_length:
+            wrapped = utils.CallbackStreamWrapper(
+                fhandle, callback=callback_instance.callback, total_length=len(content), description=description
+            )
+        else:
+            wrapped = utils.CallbackStreamWrapper(fhandle, callback=callback_instance.callback, description=description)
+
+        assert wrapped.mode == 'rb+'
+        assert wrapped.seekable
+        # Seek forward; read from byte 10
+        wrapped.seek(10)
+        assert wrapped.tell() == 10
+        assert wrapped.read() == content[10:]
+        assert wrapped.tell() == len(content)
+        # Seek backwards; read from byte 0, all
+        wrapped.seek(0)
+        assert wrapped.read() == content
+
+        # Seek backwards; read from byte 0, only 2 bytes
+        wrapped.seek(0)
+        assert wrapped.read(2) == content[0:2]
+        # Close the callback. It should be long enough so that
It should be long enough so that + # the close_callback has to "flush" the internal buffer + # (when we provide the total_length) + wrapped.close_callback() + + assert callback_instance.performed_actions == [{ + 'start_value': { + 'total': len(content) if with_total_length else 0, + 'description': description + }, + 'value': len(content) + }, { + 'start_value': { + 'total': len(content) if with_total_length else 0, + 'description': '{} [rewind]'.format(description) + }, + 'value': len(content) + }, { + 'start_value': { + 'total': len(content) if with_total_length else 0, + 'description': '{} [rewind]'.format(description) + }, + 'value': 2 + }] + + +def test_rename_callback(callback_instance): + """Check the rename_callback function.""" + old_description = 'original description' + new_description = 'SOME NEW DESC' + content = b'some content' + + assert utils.rename_callback(None, new_description=new_description) is None + + # First call with the original one + wrapped = utils.CallbackStreamWrapper( + io.BytesIO(content), + callback=callback_instance.callback, + total_length=len(content), + description=old_description + ) + # Call read so the callback is called + wrapped.read() + # We need to close the callback before reusing it + wrapped.close_callback() + + # Now call with the modified one + wrapped = utils.CallbackStreamWrapper( + io.BytesIO(content), + callback=utils.rename_callback(callback_instance.callback, new_description=new_description), + total_length=len(content), + description=old_description + ) + # Call read so the callback is called + wrapped.read() + # Close the callback to flush out + wrapped.close_callback() + + assert callback_instance.performed_actions == [ + { + 'start_value': { + 'total': len(content), + 'description': old_description + }, + 'value': len(content) + }, + { + 'start_value': { + 'total': len(content), + # Here there should be the new description + 'description': new_description + }, + 'value': len(content) + } + ]