diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index bf055eb..f783c7f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -67,8 +67,9 @@ repos: - psutil==5.8.0 - pytest==6.2.4 - # - repo: https://github.com/pre-commit/mirrors-mypy - # rev: v0.910 - # hooks: - # - id: mypy - # additional_dependencies: [] + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v0.910 + hooks: + - id: mypy + additional_dependencies: [typing-extensions] + files: ^(disk_objectstore/.*py)$ diff --git a/MANIFEST.in b/MANIFEST.in index ff33864..e874f50 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -11,3 +11,4 @@ include LICENSE.txt include CHANGELOG.md include README.md include requirements.lock +include disk_objectstore/py.typed diff --git a/codecov.yml b/codecov.yml index 033877f..ad9713f 100644 --- a/codecov.yml +++ b/codecov.yml @@ -11,3 +11,6 @@ coverage: target: auto threshold: 1% base: auto + patch: + default: + threshold: 1% diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 2307bd6..b4ee8e0 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -11,18 +11,43 @@ from collections import defaultdict, namedtuple from contextlib import contextmanager from enum import Enum +from typing import ( + Any, + Callable, + Dict, + Iterator, + List, + Optional, + Set, + Tuple, + Type, + Union, + overload, +) + +try: + from typing import Literal +except ImportError: + # Python <3.8 backport + from typing_extensions import Literal # type: ignore from sqlalchemy import create_engine, event from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm.session import Session from sqlalchemy.sql import func from .exceptions import InconsistentContent, NotExistent, NotInitialised from .models import Base, Obj from .utils import ( CallbackStreamWrapper, + LazyOpener, Location, ObjectWriter, PackedObjectReader, + StreamReadBytesType, + StreamSeekBytesType, + StreamWriteBytesType, + ZlibStreamDecompresser, chunk_iterator, compute_hash_and_size, detect_where_sorted, @@ -93,51 +118,52 @@ class Container: # pylint: disable=too-many-public-methods # (after VACUUMing, as mentioned above). _MAX_CHUNK_ITERATE_LENGTH = 9500 - def __init__(self, folder): + def __init__(self, folder: str) -> None: """Create the class that represents the container. :param folder: the path to a folder that will host this object-store container. """ self._folder = os.path.realpath(folder) - self._session = None # Will be populated by the _get_session function + # Will be populated by the _get_session function + self._session: Optional[Session] = None # These act as caches and will be populated by the corresponding properties # IMPORANT! IF YOU ADD MORE, REMEMBER TO CLEAR THEM IN `init_container()`! - self._current_pack_id = None - self._config = None + self._current_pack_id: Optional[int] = None + self._config: Optional[dict] = None - def get_folder(self): + def get_folder(self) -> str: """Return the path to the folder that will host the object-store container.""" return self._folder - def close(self): + def close(self) -> None: """Close open files (in particular, the connection to the SQLite DB).""" if self._session is not None: self._session.close() self._session = None - def _get_sandbox_folder(self): + def _get_sandbox_folder(self) -> str: """Return the path to the sandbox folder that is used during a new object creation. It is a subfolder of the container folder. 
""" return os.path.join(self._folder, "sandbox") - def _get_loose_folder(self): + def _get_loose_folder(self) -> str: """Return the path to the folder that will host the loose objects. It is a subfolder of the container folder. """ return os.path.join(self._folder, "loose") - def _get_pack_folder(self): + def _get_pack_folder(self) -> str: """Return the path to the folder that will host the packed objects. It is a subfolder of the container folder. """ return os.path.join(self._folder, "packs") - def _get_duplicates_folder(self): + def _get_duplicates_folder(self) -> str: """Return the path to the folder that will host the duplicate loose objects that couldn't be written. This should happen only in race conditions on Windows. See `utils.ObjectWriter.__exit__` for its usage, and @@ -147,11 +173,13 @@ def _get_duplicates_folder(self): """ return os.path.join(self._folder, "duplicates") - def _get_config_file(self): + def _get_config_file(self) -> str: """Return the path to the container config file.""" return os.path.join(self._folder, "config.json") - def _get_session(self, create=False, raise_if_missing=False): + def _get_session( + self, create: bool = False, raise_if_missing: bool = False + ) -> Optional[Session]: """Return a new session to connect to the pack-index SQLite DB. :param create: if True, creates the sqlite file and schema. @@ -214,7 +242,7 @@ def do_begin(conn): # pylint: disable=unused-variable return session - def _get_cached_session(self): + def _get_cached_session(self) -> Session: """Return the SQLAlchemy session to access the SQLite file, reusing the same one.""" # We want to catch both if it's missing, and if it's None @@ -224,7 +252,7 @@ def _get_cached_session(self): self._session = self._get_session(create=False, raise_if_missing=True) return self._session - def _get_loose_path_from_hashkey(self, hashkey): + def _get_loose_path_from_hashkey(self, hashkey: str) -> str: """Return the path of a loose object on disk containing the data of a given hash key. :param hashkey: the hashkey of the object to get. @@ -238,7 +266,9 @@ def _get_loose_path_from_hashkey(self, hashkey): # if loose_prefix_len is zero, there is no subfolder return os.path.join(self._get_loose_folder(), hashkey) - def _get_pack_path_from_pack_id(self, pack_id, allow_repack_pack=False): + def _get_pack_path_from_pack_id( + self, pack_id: Union[str, int], allow_repack_pack: bool = False + ) -> str: """Return the path of the pack file on disk for the given pack ID. :param pack_id: the pack ID. @@ -250,11 +280,11 @@ def _get_pack_path_from_pack_id(self, pack_id, allow_repack_pack=False): ), f"Invalid pack ID {pack_id}" return os.path.join(self._get_pack_folder(), pack_id) - def _get_pack_index_path(self): + def _get_pack_index_path(self) -> str: """Return the path to the SQLite file containing the index of packed objects.""" return os.path.join(self._folder, f"packs{self._PACK_INDEX_SUFFIX}") - def _get_pack_id_to_write_to(self): + def _get_pack_id_to_write_to(self) -> int: """Return the pack ID to write the next object. This function checks that there is a pack file with the current pack ID. 
@@ -282,7 +312,7 @@ def _get_pack_id_to_write_to(self): return pack_id @property - def is_initialised(self): + def is_initialised(self) -> bool: """Return True if the container is already initialised.""" # If the config file does not exist, the container is not initialised try: @@ -304,12 +334,12 @@ def is_initialised(self): def init_container( self, - clear=False, - pack_size_target=4 * 1024 * 1024 * 1024, - loose_prefix_len=2, - hash_type="sha256", - compression_algorithm="zlib+1", - ): + clear: bool = False, + pack_size_target: int = 4 * 1024 * 1024 * 1024, + loose_prefix_len: int = 2, + hash_type: str = "sha256", + compression_algorithm: str = "zlib+1", + ) -> None: """Initialise the container folder, if not already done. If this is called multiple times, it does not corrupt the data, @@ -398,7 +428,7 @@ def init_container( self._get_session(create=True) - def _get_repository_config(self): + def _get_repository_config(self) -> Dict[str, Union[int, str]]: """Return the repository config.""" if self._config is None: if not self.is_initialised: @@ -410,31 +440,31 @@ def _get_repository_config(self): return self._config @property - def loose_prefix_len(self): + def loose_prefix_len(self) -> int: """Return the length of the prefix of loose objects, when sharding. This is read from the (cached) repository configuration. """ - return self._get_repository_config()["loose_prefix_len"] + return self._get_repository_config()["loose_prefix_len"] # type: ignore[return-value] @property - def pack_size_target(self): + def pack_size_target(self) -> int: """Return the length of the pack name, when sharding. This is read from the (cached) repository configuration. """ - return self._get_repository_config()["pack_size_target"] + return self._get_repository_config()["pack_size_target"] # type: ignore[return-value] @property - def hash_type(self): + def hash_type(self) -> str: """Return the length of the prefix of loose objects, when sharding. This is read from the (cached) repository configuration. """ - return self._get_repository_config()["hash_type"] + return self._get_repository_config()["hash_type"] # type: ignore[return-value] @property - def container_id(self): + def container_id(self) -> str: """Return the repository unique ID. This is read from the (cached) repository configuration, and is a UUID uniquely identifying @@ -443,26 +473,26 @@ def container_id(self): Clones of the container should have a different ID even if they have the same content. """ - return self._get_repository_config()["container_id"] + return self._get_repository_config()["container_id"] # type: ignore[return-value] @property - def compression_algorithm(self): + def compression_algorithm(self) -> str: """Return the compression algorithm defined for this container. This is read from the repository configuration.""" - return self._get_repository_config()["compression_algorithm"] + return self._get_repository_config()["compression_algorithm"] # type: ignore[return-value] def _get_compressobj_instance(self): """Return the correct `compressobj` class for the compression algorithm defined for this container.""" return get_compressobj_instance(self.compression_algorithm) - def _get_stream_decompresser(self): + def _get_stream_decompresser(self) -> Type[ZlibStreamDecompresser]: """Return a new instance of the correct StreamDecompresser class for the compression algorithm defined for this container. 
""" return get_stream_decompresser(self.compression_algorithm) - def get_object_content(self, hashkey): + def get_object_content(self, hashkey: str) -> bytes: """Get the content of an object with a given hash key. :param hashkey: The hash key of the object to retrieve. @@ -472,7 +502,7 @@ def get_object_content(self, hashkey): return handle.read() @contextmanager - def get_object_stream(self, hashkey): + def get_object_stream(self, hashkey: str) -> Iterator[StreamReadBytesType]: """Return a context manager yielding a stream to get the content of an object. To be used as a context manager:: @@ -489,7 +519,9 @@ def get_object_stream(self, hashkey): yield fhandle @contextmanager - def get_object_stream_and_meta(self, hashkey): + def get_object_stream_and_meta( + self, hashkey: str + ) -> Iterator[Tuple[StreamReadBytesType, Dict[str, Any]],]: """Return a context manager yielding a stream to get the content of an object, and a metadata dictionary. To be used as a context manager:: @@ -510,7 +542,11 @@ def get_object_stream_and_meta(self, hashkey): hashkeys=[hashkey], skip_if_missing=False ) as triplets: counter = 0 - for obj_hashkey, stream, meta in triplets: + for ( + obj_hashkey, + stream, + meta, + ) in triplets: # pylint: disable=not-an-iterable counter += 1 assert ( counter == 1 @@ -522,9 +558,35 @@ def get_object_stream_and_meta(self, hashkey): yield stream, meta + @overload + def _get_objects_stream_meta_generator( + self, + hashkeys: Union[List[str], Tuple[str, ...]], + skip_if_missing: bool, + with_streams: Literal[False], + ) -> Iterator[Tuple[str, Dict[str, Any]]]: + ... + + @overload + def _get_objects_stream_meta_generator( + self, + hashkeys: Union[List[str], Tuple[str, ...]], + skip_if_missing: bool, + with_streams: Literal[True], + ) -> Iterator[Tuple[str, Optional[StreamSeekBytesType], Dict[str, Any]]]: + ... + def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too-many-statements,too-many-locals - self, hashkeys, skip_if_missing, with_streams - ): + self, + hashkeys: Union[List[str], Tuple[str, ...]], + skip_if_missing: bool, + with_streams: bool, + ) -> Iterator[ + Union[ + Tuple[str, Dict[str, Any]], + Tuple[str, Optional[StreamSeekBytesType], Dict[str, Any]], + ] + ]: """Return a generator yielding triplets of (hashkey, open stream, size). 
:note: The stream is already open and at the right position, and can @@ -554,7 +616,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too # Operate on a set - only return once per required hashkey, even if required more than once hashkeys_set = set(hashkeys) - hashkeys_in_packs = set() + hashkeys_in_packs: Set[str] = set() packs = defaultdict(list) # Currently ordering in the DB (it's ordered across all packs, but this should not be @@ -562,6 +624,8 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too # to order in python instead session = self._get_cached_session() + obj_reader: StreamReadBytesType + if len(hashkeys_set) <= self._MAX_CHUNK_ITERATE_LENGTH: # Operate in chunks, due to the SQLite limits # (see comment above the definition of self._IN_SQL_MAX_LENGTH) @@ -619,6 +683,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too } if with_streams: + assert last_open_file is not None obj_reader = PackedObjectReader( fhandle=last_open_file, offset=metadata.offset, @@ -760,6 +825,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too "pack_length": metadata.length, } if with_streams: + assert last_open_file is not None obj_reader = PackedObjectReader( fhandle=last_open_file, offset=metadata.offset, @@ -792,7 +858,9 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too yield missing_hashkey, meta @contextmanager - def get_objects_stream_and_meta(self, hashkeys, skip_if_missing=True): + def get_objects_stream_and_meta( + self, hashkeys: List[str], skip_if_missing: bool = True + ) -> Iterator[Iterator[Tuple[str, Optional[StreamSeekBytesType], Dict[str, Any]]]]: """A context manager returning a generator yielding triplets of (hashkey, open stream, metadata). :note: the hash keys yielded are often in a *different* order than the original @@ -834,7 +902,9 @@ def get_objects_stream_and_meta(self, hashkeys, skip_if_missing=True): hashkeys=hashkeys, skip_if_missing=skip_if_missing, with_streams=True ) - def get_objects_meta(self, hashkeys, skip_if_missing=True): + def get_objects_meta( + self, hashkeys: Union[List[str], Tuple[str, ...]], skip_if_missing: bool = True + ) -> Iterator[Tuple[str, Dict[str, Any]]]: """A generator yielding pairs of (hashkey, metadata). :note: the hash keys yielded are often in a *different* order than the original @@ -867,7 +937,7 @@ def get_objects_meta(self, hashkeys, skip_if_missing=True): hashkeys=hashkeys, skip_if_missing=skip_if_missing, with_streams=False ) - def get_object_meta(self, hashkey): + def get_object_meta(self, hashkey: str) -> Dict[str, Any]: """Return the metadata dictionary for the given hash key. To be used as follows: @@ -881,7 +951,10 @@ def get_object_meta(self, hashkey): :param hashkey: the hashkey of the object to stream. """ counter = 0 - for obj_hashkey, meta in self.get_objects_meta( + for ( + obj_hashkey, + meta, + ) in self.get_objects_meta( # pylint: disable=not-an-iterable hashkeys=[hashkey], skip_if_missing=False ): counter += 1 @@ -895,7 +968,9 @@ def get_object_meta(self, hashkey): return meta - def has_objects(self, hashkeys): + raise NotExistent(f"No object with hash key {hashkey}") + + def has_objects(self, hashkeys: Union[List[str], Tuple[str, ...]]) -> List[bool]: """Return whether the container contains objects with the given hash keys. :param hashkeys: a list of hash keys to check. 
@@ -905,7 +980,7 @@ def has_objects(self, hashkeys): existing_hashkeys = set() # Note: This iterates in a 'random' order, different than the `hashkeys` list - for obj_hashkey, _ in self.get_objects_meta( + for obj_hashkey, _ in self.get_objects_meta( # pylint: disable=not-an-iterable hashkeys=hashkeys, skip_if_missing=True ): # Since I use skip_if_missing=True, I should only iterate on those that exist @@ -914,7 +989,7 @@ def has_objects(self, hashkeys): # Return a list of booleans return [hashkey in existing_hashkeys for hashkey in hashkeys] - def has_object(self, hashkey): + def has_object(self, hashkey: str) -> bool: """Return whether the container contains an object with the given hashkey. :param hashkey: the hashkey of the object. @@ -922,7 +997,9 @@ def has_object(self, hashkey): """ return self.has_objects([hashkey])[0] - def get_objects_content(self, hashkeys, skip_if_missing=True): + def get_objects_content( + self, hashkeys: List[str], skip_if_missing: bool = True + ) -> Dict[str, Optional[bytes]]: """Get the content of a number of objects with given hash keys. :note: use this method only if you know objects fit in memory. @@ -933,11 +1010,11 @@ def get_objects_content(self, hashkeys, skip_if_missing=True): :return: a dictionary of byte streams where the keys are the hash keys and the values are the object contents. """ - retrieved = {} + retrieved: Dict[str, Optional[bytes]] = {} with self.get_objects_stream_and_meta( hashkeys=hashkeys, skip_if_missing=skip_if_missing ) as triplets: - for obj_hashkey, stream, _ in triplets: + for obj_hashkey, stream, _ in triplets: # pylint: disable=not-an-iterable if stream is None: # This should happen only if skip_if_missing is False retrieved[obj_hashkey] = None @@ -945,7 +1022,7 @@ def get_objects_content(self, hashkeys, skip_if_missing=True): retrieved[obj_hashkey] = stream.read() return retrieved - def _new_object_writer(self): + def _new_object_writer(self) -> ObjectWriter: """Return a context manager that can be used to create a new object. To use it, do the following:: @@ -963,7 +1040,7 @@ def _new_object_writer(self): hash_type=self.hash_type, ) - def add_object(self, content): + def add_object(self, content: bytes) -> str: """Add a loose object from its content. :param content: a binary stream with the file content. @@ -972,7 +1049,7 @@ def add_object(self, content): stream = io.BytesIO(content) return self.add_streamed_object(stream) - def add_streamed_object(self, stream): + def add_streamed_object(self, stream: StreamReadBytesType) -> str: """Add a loose object getting the content from a stream and limiting memory usage even for large objects. :param stream: an (open) stream. The stream will be read from the current position, so make sure that @@ -990,9 +1067,11 @@ def add_streamed_object(self, stream): break fhandle.write(chunk) - return writer.get_hashkey() + hashkey = writer.get_hashkey() + assert hashkey is not None + return hashkey - def count_objects(self): + def count_objects(self) -> Dict[str, int]: """Return a dictionary with the count of objects, keys are 'loose' and 'packed'. 
Also return a number of packs under 'pack_files'.""" @@ -1001,16 +1080,13 @@ def count_objects(self): number_packed = self._get_cached_session().query(Obj).count() retval["packed"] = number_packed - retval["loose"] = 0 - for loose_hashkey in self._list_loose(): # pylint: disable=unused-variable - retval["loose"] += 1 - - retval["pack_files"] = len(list(self._list_packs())) + retval["loose"] = sum(1 for _ in self._list_loose()) + retval["pack_files"] = sum(1 for _ in self._list_packs()) return retval @classmethod - def _is_valid_pack_id(cls, pack_id, allow_repack_pack=False): + def _is_valid_pack_id(cls, pack_id: str, allow_repack_pack: bool = False) -> bool: """Return True if the name is a valid pack ID. If allow_repack_pack is True, also the pack id used for repacking is considered as valid. @@ -1027,7 +1103,7 @@ def _is_valid_pack_id(cls, pack_id, allow_repack_pack=False): return False return True - def _is_valid_loose_prefix(self, prefix): + def _is_valid_loose_prefix(self, prefix: str) -> bool: """Return True if the name is a valid prefix.""" if len(prefix) != self.loose_prefix_len: return False @@ -1036,7 +1112,7 @@ def _is_valid_loose_prefix(self, prefix): return True @staticmethod - def _is_valid_hashkey(hashkey): + def _is_valid_hashkey(hashkey: str) -> bool: """Return True is the name is a valid hashkey. Note that it currently does not check the length but only that the key is composed only @@ -1046,7 +1122,7 @@ def _is_valid_hashkey(hashkey): return False return True - def get_total_size(self): + def get_total_size(self) -> Dict[str, int]: """Return a dictionary with the total size of objects in the container. - total_size_packed: size of the objects (before compressing) in the packs. @@ -1088,7 +1164,9 @@ def get_total_size(self): return retval @contextmanager - def lock_pack(self, pack_id, allow_repack_pack=False): + def lock_pack( + self, pack_id: str, allow_repack_pack: bool = False + ) -> Iterator[StreamWriteBytesType]: """Lock the given pack id. Use as a context manager. Raise if the pack is already locked. If you enter the context manager, @@ -1114,7 +1192,7 @@ def lock_pack(self, pack_id, allow_repack_pack=False): if os.path.exists(lock_file): os.remove(lock_file) - def _list_loose(self): + def _list_loose(self) -> Iterator[str]: """Iterate over loose objects. This returns all loose objects, even if a packed version of the same object exists. @@ -1138,7 +1216,7 @@ def _list_loose(self): continue yield first_level - def _list_packs(self): + def _list_packs(self) -> Iterator[str]: """Iterate over packs. .. note:: this returns a generator of the pack IDs. @@ -1151,7 +1229,7 @@ def _list_packs(self): if self._is_valid_pack_id(fname): yield fname - def list_all_objects(self): + def list_all_objects(self) -> Iterator[str]: """Iterate of all object hashkeys. This function might be slow if there are many loose objects! @@ -1196,8 +1274,12 @@ def list_all_objects(self): yield hashkey def _write_data_to_packfile( - self, pack_handle, read_handle, compress, hash_type=None - ): + self, + pack_handle: StreamWriteBytesType, + read_handle: StreamReadBytesType, + compress: bool, + hash_type: Optional[str] = None, + ) -> Union[Tuple[int, None], Tuple[int, str]]: """Append data, read from read_handle until it ends, to the correct packfile. 
Return the number of bytes READ (note that this will be different @@ -1252,8 +1334,11 @@ def _write_data_to_packfile( return (count_read_bytes, hasher.hexdigest() if hash_type else None) def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches - self, compress=False, validate_objects=True, do_fsync=True - ): + self, + compress: bool = False, + validate_objects: bool = True, + do_fsync: bool = True, + ) -> None: """Pack all loose objects. This is a maintenance operation, needs to be done only by one process. @@ -1342,7 +1427,7 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches # Get next hash key to process loose_hashkey = loose_objects.pop() - obj_dict = {} + obj_dict: Dict[str, Any] = {} obj_dict["hashkey"] = loose_hashkey obj_dict["pack_id"] = pack_int_id obj_dict["compressed"] = compress @@ -1415,16 +1500,16 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches def add_streamed_object_to_pack( # pylint: disable=too-many-arguments self, - stream, - compress=False, - open_streams=False, - no_holes=False, - no_holes_read_twice=True, - callback=None, - callback_size_hint=0, - do_fsync=True, - do_commit=True, - ): + stream: StreamSeekBytesType, + compress: bool = False, + open_streams: bool = False, + no_holes: bool = False, + no_holes_read_twice: bool = True, + callback: Optional[Callable] = None, + callback_size_hint: int = 0, + do_fsync: bool = True, + do_commit: bool = True, + ) -> str: """Add a single object in streamed form to a pack. For the description of the parameters, see the docstring of ``add_streamed_objects_to_pack``. @@ -1434,7 +1519,7 @@ def add_streamed_object_to_pack( # pylint: disable=too-many-arguments length in the callbacks :return: a single object hash key """ - streams = [ + streams: List[StreamSeekBytesType] = [ CallbackStreamWrapper( stream, callback=callback, total_length=callback_size_hint ) @@ -1452,22 +1537,22 @@ def add_streamed_object_to_pack( # pylint: disable=too-many-arguments do_commit=do_commit, ) - # Close the callback so the bar doesn't remaing open - streams[0].close_callback() + # Close the callback so the bar doesn't remain open + streams[0].close_callback() # type: ignore[union-attr] return retval[0] def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-branches, too-many-statements, too-many-arguments self, - stream_list, - compress=False, - open_streams=False, - no_holes=False, - no_holes_read_twice=True, - callback=None, - do_fsync=True, - do_commit=True, - ): + stream_list: Union[List[StreamSeekBytesType], List[LazyOpener]], + compress: bool = False, + open_streams: bool = False, + no_holes: bool = False, + no_holes_read_twice: bool = True, + callback: Optional[Callable] = None, + do_fsync: bool = True, + do_commit: bool = True, + ) -> List[str]: """Add objects directly to a pack, reading from a list of streams. 
This is a maintenance operation, available mostly for efficiency reasons @@ -1502,7 +1587,7 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b :return: a list of object hash keys """ yield_per_size = 1000 - hashkeys = [] + hashkeys: List[str] = [] # Make a copy of the list and revert its order, so we can pop from the list # without affecting the original list, and it's from the end so it's fast @@ -1605,7 +1690,7 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b if open_streams: stream_context_manager = next_stream else: - stream_context_manager = nullcontext(next_stream) + stream_context_manager = nullcontext(next_stream) # type: ignore[assignment] if callback: since_last_update += 1 @@ -1617,7 +1702,7 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b # is already there position_before = pack_handle.tell() - obj_dict = {} + obj_dict: Dict[str, Any] = {} obj_dict["pack_id"] = pack_int_id obj_dict["compressed"] = compress obj_dict["offset"] = pack_handle.tell() @@ -1752,14 +1837,14 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b def add_objects_to_pack( # pylint: disable=too-many-arguments self, - content_list, - compress=False, - no_holes=False, - no_holes_read_twice=True, - callback=None, - do_fsync=True, - do_commit=True, - ): + content_list: Union[List[bytes], Tuple[bytes, ...]], + compress: bool = False, + no_holes: bool = False, + no_holes_read_twice: bool = True, + callback: Optional[Callable] = None, + do_fsync: bool = True, + do_commit: bool = True, + ) -> List[str]: """Add objects directly to a pack, reading from a list of content byte arrays. This is a maintenance operation, available mostly for efficiency reasons @@ -1785,7 +1870,9 @@ def add_objects_to_pack( # pylint: disable=too-many-arguments :return: a list of object hash keys """ - stream_list = [io.BytesIO(content) for content in content_list] + stream_list: List[StreamSeekBytesType] = [ + io.BytesIO(content) for content in content_list + ] return self.add_streamed_objects_to_pack( stream_list=stream_list, compress=compress, @@ -1796,7 +1883,7 @@ def add_objects_to_pack( # pylint: disable=too-many-arguments do_commit=do_commit, ) - def _vacuum(self): + def _vacuum(self) -> None: """Perform a `VACUUM` operation on the SQLite operation. This is critical for two aspects: @@ -1809,7 +1896,9 @@ def _vacuum(self): engine = self._get_cached_session().get_bind() engine.execute("VACUUM") - def clean_storage(self, vacuum=False): # pylint: disable=too-many-branches + def clean_storage( # pylint: disable=too-many-branches + self, vacuum: bool = False + ) -> None: """Perform some maintenance clean-up of the container. .. note:: this is a maintenance operation, must be performed when nobody is using the container! @@ -1935,13 +2024,13 @@ def clean_storage(self, vacuum=False): # pylint: disable=too-many-branches def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-many-branches,too-many-arguments self, - hashkeys, - source_container, - compress=False, - target_memory_bytes=104857600, - callback=None, - do_fsync=True, - ): + hashkeys: List[str], + source_container: "Container", + compress: bool = False, + target_memory_bytes: int = 104857600, + callback: Optional[Callable] = None, + do_fsync: bool = True, + ) -> Dict[str, str]: """Imports the objects with the specified hashkeys into the container. :param hashkeys: an iterable of hash keys. 
@@ -1964,7 +2053,7 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m # We load data in this cache as long as the memory usage is < target_memory_bytes # We then flush in 'bulk' to the `other_container`, thus speeding up the process - content_cache = {} + content_cache: Dict[str, bytes] = {} cache_size = 0 if source_container.hash_type == self.hash_type: @@ -2048,6 +2137,7 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m with source_container.get_objects_stream_and_meta(hashkeys) as triplets: for old_obj_hashkey, stream, meta in triplets: + assert stream is not None if meta["size"] > target_memory_bytes: # If the object itself is too big, just write it directly # via streams, bypassing completely the cache. I don't touch the cache in this case, @@ -2159,12 +2249,12 @@ def import_objects( # pylint: disable=too-many-locals,too-many-statements,too-m def export( self, - hashkeys, - other_container, - compress=False, - target_memory_bytes=104857600, - callback=None, - ): + hashkeys: List[str], + other_container: "Container", + compress: bool = False, + target_memory_bytes: int = 104857600, + callback: Optional[Callable] = None, + ) -> Dict[str, str]: """Export the specified hashkeys to a new container (must be already initialised). ..deprecated:: 0.5 @@ -2194,8 +2284,10 @@ def export( # Let us also compute the hash def _validate_hashkeys_pack( - self, pack_id, callback=None - ): # pylint: disable=too-many-locals + self, pack_id: int, callback: Optional[Callable] = None + ) -> Dict[ + str, Union[List[Union[str, Any]], List[Any]] + ]: # pylint: disable=too-many-locals """Validate all hashkeys and returns a dictionary of problematic entries. The keys are the problem type, the values are a list of hashkeys of problematic objects. @@ -2280,7 +2372,7 @@ def callback(self, action, value): .order_by(Obj.offset) ) for hashkey, size, offset, length, compressed in query: - obj_reader = PackedObjectReader( + obj_reader: StreamSeekBytesType = PackedObjectReader( fhandle=pack_handle, offset=offset, length=length ) if compressed: @@ -2320,7 +2412,9 @@ def callback(self, action, value): "overlapping_packed": overlapping, } - def validate(self, callback=None): + def validate( + self, callback: Optional[Callable] = None + ) -> Dict[str, Union[List[Union[str, Any]], List[Any], List[str]]]: """Perform a number of validations on the container content, to make sure it is not corrupt.""" all_errors = defaultdict(list) @@ -2358,7 +2452,7 @@ def validate(self, callback=None): return dict(all_errors) - def delete_objects(self, hashkeys): + def delete_objects(self, hashkeys: List[str]) -> List[Union[str, Any]]: """Delete the selected objects. .. note:: In the current version, this has to be considered a maintenance operation, and as such it should @@ -2439,7 +2533,7 @@ def delete_objects(self, hashkeys): # was deleted) should be considered as if the object has *not* been deleted return list(deleted_loose.union(deleted_packed)) - def repack(self, compress_mode=CompressMode.KEEP): + def repack(self, compress_mode: CompressMode = CompressMode.KEEP) -> None: """Perform a repack of all packed objects. 
At the end, it also VACUUMs the DB to reclaim unused space and make @@ -2453,7 +2547,9 @@ def repack(self, compress_mode=CompressMode.KEEP): self.repack_pack(pack_id, compress_mode=compress_mode) self._vacuum() - def repack_pack(self, pack_id, compress_mode=CompressMode.KEEP): + def repack_pack( + self, pack_id: str, compress_mode: CompressMode = CompressMode.KEEP + ) -> None: """Perform a repack of a given pack object. This is a maintenance operation. diff --git a/disk_objectstore/examples/example_objectstore.py b/disk_objectstore/examples/example_objectstore.py index 4fdca3b..24d2d42 100755 --- a/disk_objectstore/examples/example_objectstore.py +++ b/disk_objectstore/examples/example_objectstore.py @@ -143,9 +143,7 @@ def main( for filename, item in retrieved.items(): assert ( item == files[filename] - ), "Mismatch (content) for {}, {} vs {}".format( - filename, item, files[filename] - ) + ), f"Mismatch (content) for {filename}, {item!r} vs {files[filename]!r}" # Check that num_files new loose files are present now counts = container.count_objects() @@ -299,7 +297,7 @@ def bulk_read_data(container, hashkey_list): for filename, content in retrieved.items(): assert ( content == files[filename] - ), f"Mismatch (content) for {filename}, {content} vs {files[filename]}" + ), f"Mismatch (content) for {filename}, {content!r} vs {files[filename]!r}" print("All tests passed") diff --git a/disk_objectstore/models.py b/disk_objectstore/models.py index ad912e2..f724b93 100644 --- a/disk_objectstore/models.py +++ b/disk_objectstore/models.py @@ -5,7 +5,7 @@ Base = declarative_base() # pylint: disable=invalid-name,useless-suppression -class Obj(Base): # pylint: disable=too-few-public-methods +class Obj(Base): # type: ignore # pylint: disable=too-few-public-methods """The main (and only) table to store object metadata (hashkey, offset, length, ...).""" __tablename__ = "db_object" diff --git a/disk_objectstore/py.typed b/disk_objectstore/py.typed new file mode 100644 index 0000000..7632ecf --- /dev/null +++ b/disk_objectstore/py.typed @@ -0,0 +1 @@ +# Marker file for PEP 561 diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py index adb73f9..0cfbede 100644 --- a/disk_objectstore/utils.py +++ b/disk_objectstore/utils.py @@ -10,17 +10,53 @@ import os import uuid import zlib +from contextlib import contextmanager from enum import Enum +from typing import ( + Any, + BinaryIO, + Callable, + Iterable, + Iterator, + Optional, + Tuple, + Type, + Union, +) +from zlib import error + +from sqlalchemy.engine.result import ResultProxy + +from .exceptions import ClosingNotAllowed, ModificationNotAllowed + +try: + from typing import Literal # pylint: disable=ungrouped-imports +except ImportError: + # Python <3.8 backport + from typing_extensions import Literal # type: ignore try: import fcntl except ImportError: # Not available on Windows - fcntl = None - -from contextlib import contextmanager - -from .exceptions import ClosingNotAllowed, ModificationNotAllowed + fcntl = None # type: ignore[assignment] + +# requires read method only +StreamReadBytesType = Union[ + BinaryIO, + "PackedObjectReader", + "CallbackStreamWrapper", + "ZlibLikeBaseStreamDecompresser", + "ZeroStream", +] +# requires read and seek capability +StreamSeekBytesType = Union[ + BinaryIO, + "PackedObjectReader", + "CallbackStreamWrapper", + "ZlibLikeBaseStreamDecompresser", +] +StreamWriteBytesType = BinaryIO # For now I I don't always activate it as I need to think at the right balance between # safety and performance/disk wearing @@ 
-43,26 +79,25 @@ class LazyOpener: when opening the stream. """ - def __init__(self, path, mode="rb"): + def __init__(self, path: str) -> None: """Lazily store file path and mode, but do not open now. File will be opened only when entering the context manager. """ self._path = path - self._mode = mode - self._fhandle = None + self._fhandle: Optional[BinaryIO] = None @property - def path(self): + def path(self) -> str: """The file path.""" return self._path @property - def mode(self): + def mode(self) -> Literal["rb"]: """The file open mode.""" - return self._mode + return "rb" - def tell(self): + def tell(self) -> int: """Return the position in the underlying file object. :return: an integer with the position. @@ -72,7 +107,7 @@ def tell(self): raise ValueError("I/O operation on closed file.") return self._fhandle.tell() - def __enter__(self): + def __enter__(self) -> BinaryIO: """Open the file when entering the with context manager. Note: you cannot open it twice with two with statements. @@ -82,7 +117,7 @@ def __enter__(self): self._fhandle = open(self.path, mode=self.mode) return self._fhandle - def __exit__(self, exc_type, value, traceback): + def __exit__(self, exc_type, value, traceback) -> None: """Close the file when exiting the with context manager.""" if self._fhandle is not None: if not self._fhandle.closed: @@ -91,7 +126,7 @@ def __exit__(self, exc_type, value, traceback): @contextmanager -def nullcontext(enter_result): +def nullcontext(enter_result: Any) -> Iterator[Any]: """Return a context manager that returns enter_result from __enter__, but otherwise does nothing. This can be replaced by ``contextlib.nullcontext`` if we want to support only py>=3.7. @@ -104,13 +139,13 @@ class ObjectWriter: # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments self, - sandbox_folder, - loose_folder, - loose_prefix_len, - duplicates_folder, - hash_type, - trust_existing=False, - ): + sandbox_folder: str, + loose_folder: str, + loose_prefix_len: int, + duplicates_folder: str, + hash_type: str, + trust_existing: bool = False, + ) -> None: """Initialise an object to store a new loose object. :param sandbox_folder: the folder to store objects while still giving @@ -130,23 +165,23 @@ def __init__( # pylint: disable=too-many-arguments self._loose_folder = loose_folder self._duplicates_folder = duplicates_folder self._hash_type = hash_type - self._hashkey = None + self._hashkey: Optional[str] = None self._loose_prefix_len = loose_prefix_len self._stored = False - self._obj_path = None - self._filehandle = None + self._obj_path: Optional[str] = None + self._filehandle: Optional[HashWriterWrapper] = None self._trust_existing = trust_existing @property - def hash_type(self): + def hash_type(self) -> str: """Return the currently used hash type.""" return self._hash_type - def get_hashkey(self): + def get_hashkey(self) -> Optional[str]: """Return the object hash key. Return None if the stream wasn't opened yet.""" return self._hashkey - def __enter__(self): + def __enter__(self) -> "HashWriterWrapper": """Start creating a new object in a context manager. You will get access access to a file-like object. 
@@ -157,9 +192,7 @@ def __enter__(self): raise OSError("You have already opened this ObjectWriter instance") if self._stored: raise ModificationNotAllowed( - "You have already tried to store this object '{}'".format( - self.get_hashkey() - ) + f"You have already tried to store this object '{self.get_hashkey()}'" ) # Create a new uniquely-named file in the sandbox. # It seems faster than using a NamedTemporaryFile, see benchmarks. @@ -169,15 +202,16 @@ def __enter__(self): ) return self._filehandle - def __exit__( - self, exc_type, value, traceback - ): # pylint: disable=too-many-branches, too-many-statements + def __exit__( # pylint: disable=too-many-branches, too-many-statements + self, exc_type: Any, value: Any, traceback: Any + ) -> None: """ Close the file object, and move it from the sandbox to the loose object folder, possibly using sharding if loose_prexix_len is not 0. """ try: if exc_type is None: + assert self._filehandle is not None and self._obj_path is not None if self._filehandle.closed: raise ClosingNotAllowed( "You cannot close the file handle yourself!" @@ -325,10 +359,10 @@ def __exit__( if self._filehandle is not None and not self._filehandle.closed: self._filehandle.close() - if os.path.exists(self._obj_path): + if self._obj_path and os.path.exists(self._obj_path): os.remove(self._obj_path) - def _store_duplicate_copy(self, source_file, hashkey): + def _store_duplicate_copy(self, source_file: str, hashkey: str) -> None: """This function is called (on Windows) when trying to store a file that already exists. In the `clean_storage` I will clean up old copies if the hash matches. @@ -345,18 +379,39 @@ class PackedObjectReader: """A class to read from a pack file. This ensures that the .read() method works and does not go beyond the - length of the given object.""" + length of the given object. + """ + + def __init__(self, fhandle: StreamSeekBytesType, offset: int, length: int) -> None: + """ + Initialises the reader to a pack file. + + :param fhandle: an open handle to the pack file, must be opened in read and binary mode. + :param offset: the integer offset where in the fhandle where the object starts. + :param length: the integer length of the byte stream. + The read() method will ensure that you never go beyond the given length. + """ + assert "b" in fhandle.mode + assert "r" in fhandle.mode + + self._fhandle = fhandle + self._offset = offset + self._length = length + + # Move to the offset position, and keep track of the internal position + self._fhandle.seek(self._offset) + self._update_pos() @property - def mode(self): + def mode(self) -> str: return self._fhandle.mode - @property - def seekable(self): + @staticmethod + def seekable() -> bool: """Return whether object supports random access.""" return True - def seek(self, target, whence=0): + def seek(self, target: int, whence: int = 0) -> int: """Change stream position. Note that contrary to a standard file, also seeking beyond the borders will raise a ValueError. @@ -387,31 +442,11 @@ def seek(self, target, whence=0): # seek returns the new absolute position return target - def tell(self): + def tell(self) -> int: """Return current stream position, relative to the internal offset.""" return self._fhandle.tell() - self._offset - def __init__(self, fhandle, offset, length): - """ - Initialises the reader to a pack file. - - :param fhandle: an open handle to the pack file, must be opened in read and binary mode. - :param offset: the integer offset where in the fhandle where the object starts. 
- :param length: the integer length of the byte stream. - The read() method will ensure that you never go beyond the given length. - """ - assert "b" in fhandle.mode - assert "r" in fhandle.mode - - self._fhandle = fhandle - self._offset = offset - self._length = length - - # Move to the offset position, and keep track of the internal position - self._fhandle.seek(self._offset) - self._update_pos() - - def _update_pos(self): + def _update_pos(self) -> None: """Update the internal position variable with the correct value. This function must be called (internally) after any operation that @@ -426,7 +461,7 @@ def _update_pos(self): "PackedObjectReader didn't manage to prevent to go beyond the length (in the negative direction)!" ) - def read(self, size=-1): + def read(self, size: int = -1) -> bytes: """ Read and return up to n bytes. @@ -465,20 +500,49 @@ class CallbackStreamWrapper: Should be used only for streams open in read mode. """ + def __init__( + self, + stream: StreamSeekBytesType, + callback: Optional[Callable], + total_length: int = 0, + description: str = "Streamed object", + ) -> None: + """ + Initialises the reader to a given stream. + + :param stream: an open stream + :param callback: a callback to call to update the status (or None if not needed) + :param total_length: the expected length + """ + self._stream = stream + self._callback = callback + self._total_length = total_length + self._description = description + + # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object. + self._update_every: int = max(int(total_length / 400), 1) if total_length else 1 + # Counter of how many objects have been since since the last update. + # A new callback will be performed when this value is > update_every. + self._since_last_update: int = 0 + if self._callback: + # If we have a callback, compute the total count of objects in this pack + self._callback( + action="init", value={"total": total_length, "description": description} + ) + @property - def mode(self): + def mode(self) -> str: return self._stream.mode - @property - def seekable(self): + def seekable(self) -> bool: """Return whether object supports random access.""" - return self._stream.seekable + return self._stream.seekable() - def seek(self, target, whence=0): + def seek(self, target: int, whence: int = 0) -> int: """Change stream position.""" if target > self.tell(): if self._callback: - self._since_last_update += target - self.tell() + self._since_last_update = self._since_last_update + target - self.tell() if self._since_last_update >= self._update_every: self._callback(action="update", value=self._since_last_update) self._since_last_update = 0 @@ -500,35 +564,11 @@ def seek(self, target, whence=0): return self._stream.seek(target, whence) - def tell(self): + def tell(self) -> int: """Return current stream position.""" return self._stream.tell() - def __init__(self, stream, callback, total_length=0, description="Streamed object"): - """ - Initialises the reader to a given stream. 
- - :param stream: an open stream - :param callback: a callback to call to update the status (or None if not needed) - :param total_length: the expected length - """ - self._stream = stream - self._callback = callback - self._total_length = total_length - self._description = description - - if self._callback: - # If we have a callback, compute the total count of objects in this pack - self._callback( - action="init", value={"total": total_length, "description": description} - ) - # Update at most 400 times, avoiding to increase CPU usage; if the list is small: every object. - self._update_every = max(int(total_length / 400), 1) if total_length else 1 - # Counter of how many objects have been since since the last update. - # A new callback will be performed when this value is > update_every. - self._since_last_update = 0 - - def read(self, size=-1): + def read(self, size: int = -1) -> bytes: """ Read and return up to n bytes. @@ -554,8 +594,8 @@ def __enter__(self) -> "CallbackStreamWrapper": def __exit__(self, exc_type, value, traceback) -> None: """Close context manager.""" - - def close_callback(self): + + def close_callback(self) -> None: """ Call the wrap up closing calls for the callback. @@ -569,7 +609,9 @@ def close_callback(self): self._callback(action="close", value=None) -def rename_callback(callback, new_description): +def rename_callback( + callback: Optional[Callable], new_description: str +) -> Optional[Callable]: """Given a callback, return a new one where the description will be changed to `new_name`. Works even if `callback` is None (in this case, it returns None). @@ -585,8 +627,8 @@ def wrapper_callback(action, value): if action == "init": new_value = value.copy() new_value["description"] = new_description - return callback(action, new_value) - return callback(action, value) + return callback(action, new_value) # type: ignore + return callback(action, value) # type: ignore return wrapper_callback @@ -601,23 +643,7 @@ class ZlibLikeBaseStreamDecompresser(abc.ABC): _CHUNKSIZE = 524288 - @property - @abc.abstractmethod - def decompressobj_class(self): - """Return here the `decompressobj` class of the given compression type. - - Needs to be implemented by subclasses. - """ - - @property - @abc.abstractmethod - def decompress_error(self): - """Return here the Exception (or tuple of exceptions) that need to be caught if there is a compression error. - - Needs to be implemented by subclasses. - """ - - def __init__(self, compressed_stream): + def __init__(self, compressed_stream: StreamSeekBytesType) -> None: """Create the class from a given compressed bytestream. :param compressed_stream: an open bytes stream that supports the .read() method, @@ -628,7 +654,11 @@ def __init__(self, compressed_stream): self._internal_buffer = b"" self._pos = 0 - def read(self, size=-1): + @property + def mode(self) -> str: + return getattr(self._compressed_stream, "mode", "rb") + + def read(self, size: int = -1) -> bytes: """ Read and return up to n bytes. @@ -702,15 +732,31 @@ def __exit__(self, exc_type, value, traceback) -> None: """Close context manager.""" @property - def seekable(self): + @abc.abstractmethod + def decompressobj_class(self): + """Return here the `decompressobj` class of the given compression type. + + Needs to be implemented by subclasses. + """ + + @property + @abc.abstractmethod + def decompress_error(self): + """Return here the Exception (or tuple of exceptions) that need to be caught if there is a compression error. + + Needs to be implemented by subclasses. 
+ """ + + @staticmethod + def seekable() -> bool: """Return whether object supports random access.""" return True - def tell(self): + def tell(self) -> int: """Return current position in file.""" return self._pos - def seek(self, target, whence=0): + def seek(self, target: int, whence: int = 0) -> int: """Change stream position. ..note:: This is particularly inefficient if `target > 0` since it will have @@ -758,17 +804,17 @@ class ZlibStreamDecompresser(ZlibLikeBaseStreamDecompresser): uncompressed bytes when being read via the .read() method.""" @property - def decompressobj_class(self): + def decompressobj_class(self) -> Callable: """Return the `decompressobj` class of zlib.""" return zlib.decompressobj @property - def decompress_error(self): + def decompress_error(self) -> Type[error]: """Return the zlib error raised when there is an error.""" return zlib.error -def _get_compression_algorithm_info(algorithm): +def _get_compression_algorithm_info(algorithm: str): """Return a compresser and a decompresser for the given algorithm.""" known_algorithms = { "zlib": { @@ -789,9 +835,9 @@ def _get_compression_algorithm_info(algorithm): ) try: kwargs = { - algorithm_info["variant_name"]: algorithm_info["variant_mapper"][variant] + algorithm_info["variant_name"]: algorithm_info["variant_mapper"][variant] # type: ignore } - compresser = algorithm_info["compressobj"](**kwargs) + compresser = algorithm_info["compressobj"](**kwargs) # type: ignore except KeyError: # pylint: disable=raise-missing-from raise ValueError( @@ -803,7 +849,7 @@ def _get_compression_algorithm_info(algorithm): return compresser, decompresser -def get_compressobj_instance(algorithm): +def get_compressobj_instance(algorithm: str): """Return a compressobj class with a given algorithm. :param algorithm: A string defining the algorithm and its variant. @@ -814,7 +860,7 @@ def get_compressobj_instance(algorithm): return _get_compression_algorithm_info(algorithm)[0] -def get_stream_decompresser(algorithm): +def get_stream_decompresser(algorithm: str) -> Type[ZlibStreamDecompresser]: """Return a StreamDecompresser class with a given algorithm. :param algorithm: a compression algorithm (see `get_compressionobj_instance` for a description). @@ -825,7 +871,7 @@ def get_stream_decompresser(algorithm): class ZeroStream: """A class to return an (unseekable) stream returning only zeros, with length length.""" - def __init__(self, length): + def __init__(self, length: int) -> None: """ Initialises the object and specifies the expected length. @@ -835,7 +881,11 @@ def __init__(self, length): self._length = length self._pos = 0 - def read(self, size=-1): + @property + def mode(self) -> str: + return "rb" + + def read(self, size: int = -1) -> bytes: """ Read and return up to n bytes (composed only of zeros). 
@@ -861,7 +911,7 @@ def read(self, size=-1): return stream -def is_known_hash(hash_type): +def is_known_hash(hash_type: str) -> bool: """Return True if the hash_type is known, False otherwise.""" try: get_hash(hash_type) @@ -870,7 +920,7 @@ def is_known_hash(hash_type): return False -def get_hash(hash_type): +def get_hash(hash_type: str) -> Callable: """Return a hash class with an update method and a hexdigest method.""" known_hashes = {"sha1": hashlib.sha1, "sha256": hashlib.sha256} @@ -881,7 +931,7 @@ def get_hash(hash_type): raise ValueError(f"Unknown or unsupported hash type '{hash_type}'") -def _compute_hash_for_filename(filename, hash_type): +def _compute_hash_for_filename(filename: str, hash_type: str) -> Optional[str]: """Return the hash for the given file. Will read the file in chunks. @@ -911,7 +961,7 @@ def _compute_hash_for_filename(filename, hash_type): class HashWriterWrapper: """A class that gets a stream open in write mode and wraps it in a new class that computes a hash while writing.""" - def __init__(self, write_stream, hash_type): + def __init__(self, write_stream: BinaryIO, hash_type: str) -> None: """Create the class from a given compressed bytestream. :param write_stream: an open bytes stream that supports the .write() method. @@ -927,15 +977,15 @@ def __init__(self, write_stream, hash_type): self._position = self._write_stream.tell() @property - def hash_type(self): + def hash_type(self) -> str: """Return the currently used hash type.""" return self._hash_type - def flush(self): + def flush(self) -> None: """Flush the internal I/O buffer.""" self._write_stream.flush() - def write(self, data): + def write(self, data: bytes) -> int: """ Write binary data to the underlying write_stream object, and compute a hash at the same time. """ @@ -959,26 +1009,27 @@ def write(self, data): # Update the hash information self._hash.update(data) + return self._position - def hexdigest(self): + def hexdigest(self) -> str: """Return the hexdigest of the hash computed until now.""" return self._hash.hexdigest() @property - def closed(self): + def closed(self) -> bool: """Return True if the underlying file is closed.""" return self._write_stream.closed - def close(self): + def close(self) -> None: """Close the underlying file.""" self._write_stream.close() @property - def mode(self): + def mode(self) -> str: """Return a string with the mode the file was open with.""" return self._write_stream.mode - def fileno(self): + def fileno(self) -> int: """Return the integer file descriptor of the underlying file object.""" return self._write_stream.fileno() @@ -998,7 +1049,11 @@ def chunk_iterator(iterator, size): return iter(lambda: tuple(itertools.islice(iterator, size)), ()) -def safe_flush_to_disk(fhandle, real_path, use_fullsync=False): +def safe_flush_to_disk( + fhandle: Union[StreamWriteBytesType, HashWriterWrapper], + real_path: str, + use_fullsync: bool = False, +) -> None: """Tries to to its best to safely commit to disk. Note that calling this is needed to reduce the risk of data loss due to, e.g., a power failure. 
@@ -1016,7 +1071,9 @@ def safe_flush_to_disk(fhandle, real_path, use_fullsync=False): fhandle.flush() # Default fsync function, replaced on Mac OS X - _fsync_function = lambda fileno: os.fsync( # pylint: disable=unnecessary-lambda + _fsync_function: Callable[ + [Any], Any + ] = lambda fileno: os.fsync( # pylint: disable=unnecessary-lambda fileno ) @@ -1049,7 +1106,10 @@ def safe_flush_to_disk(fhandle, real_path, use_fullsync=False): os.close(dirfd) -def compute_hash_and_size(stream, hash_type): +def compute_hash_and_size( + stream: StreamReadBytesType, + hash_type: str, +) -> Tuple[str, int]: """Given a stream and a hash type, return the hash key (hexdigest) and the total size. :param stream: an open stream @@ -1072,9 +1132,11 @@ def compute_hash_and_size(stream, hash_type): return hasher.hexdigest(), size -def detect_where_sorted( - left_iterator, right_iterator, left_key=None -): # pylint: disable=too-many-branches, too-many-statements +def detect_where_sorted( # pylint: disable=too-many-branches, too-many-statements + left_iterator: Iterable[Any], + right_iterator: Iterable[Any], + left_key: Optional[Callable] = None, +) -> Iterator[Tuple[Any, Location]]: """Generator that loops in alternation (but only once each) the two iterators and yields an element, specifying if it's only on the left, only on the right, or in both. @@ -1203,13 +1265,13 @@ def detect_where_sorted( now_left = new_now_left -def yield_first_element(iterator): +def yield_first_element(iterator: Union[ResultProxy, zip]) -> Iterator[Union[str, int]]: """Given an iterator that returns a tuple, return an iterator that yields only the first element of the tuple.""" for elem in iterator: yield elem[0] -def merge_sorted(iterator1, iterator2): +def merge_sorted(iterator1: Iterable[Any], iterator2: Iterable[Any]) -> Iterator[Any]: """Given two sorted iterators, return another sorted iterator being the union of the two.""" for item, _ in detect_where_sorted(iterator1, iterator2): # Whereever it is (only left, only right, on both) I return the object. 
diff --git a/pyproject.toml b/pyproject.toml index 4d7302e..a1b0cf3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,3 +25,10 @@ docstring-min-length = 5 [tool.pylint.design] max-locals = 20 + +[tool.mypy] +show_error_codes = true +check_untyped_defs = true +scripts_are_modules = true +warn_unused_ignores = true +warn_redundant_casts = true diff --git a/requirements.lock b/requirements.lock index 56c8149..16bc686 100644 --- a/requirements.lock +++ b/requirements.lock @@ -75,8 +75,10 @@ toml==0.10.2 # pre-commit # pytest # pytest-cov -typing-extensions==3.10.0.0 - # via importlib-metadata +typing-extensions==3.10.0.0 ; python_version < "3.8" + # via + # disk-objectstore (setup.py) + # importlib-metadata virtualenv==20.7.2 # via pre-commit zipp==3.5.0 diff --git a/setup.cfg b/setup.cfg index dc836bb..3fc4a54 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,6 +27,7 @@ keywords = object store, repository, file store packages = find: install_requires = sqlalchemy<1.4 + typing-extensions;python_version < '3.8' python_requires = ~=3.7 include_package_data = True diff --git a/tests/concurrent_tests/periodic_worker.py b/tests/concurrent_tests/periodic_worker.py index 2bea791..962826b 100755 --- a/tests/concurrent_tests/periodic_worker.py +++ b/tests/concurrent_tests/periodic_worker.py @@ -189,7 +189,11 @@ def main( with container.get_objects_stream_and_meta( all_hashkeys, skip_if_missing=False ) as triplets: - for obj_hashkey, stream, meta in triplets: + for ( + obj_hashkey, + stream, + meta, + ) in triplets: # pylint: disable=not-an-iterable if stream is None: retrieved_content[obj_hashkey] = None else: diff --git a/tests/test_container.py b/tests/test_container.py index a4b4bee..0a167a6 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -413,8 +413,8 @@ def test_directly_to_pack_streamed( file_path = os.path.join(temp_dir2, key) with open(file_path, "bw") as fhandle: fhandle.write(content) - streams.append(utils.LazyOpener(file_path, mode="rb")) - streams_copy.append(utils.LazyOpener(file_path, mode="rb")) + streams.append(utils.LazyOpener(file_path)) + streams_copy.append(utils.LazyOpener(file_path)) obj_hashkeys = temp_container.add_streamed_objects_to_pack( streams, compress=use_compression, open_streams=True ) @@ -3165,7 +3165,7 @@ def test_packs_read_in_order(temp_dir): with temp_container.get_objects_stream_and_meta( hashkeys, skip_if_missing=False ) as triplets: - for _, _, meta in triplets: + for _, _, meta in triplets: # pylint: disable=not-an-iterable assert meta["type"] == ObjectType.PACKED if last_pack is None: last_pack = meta["pack_id"] diff --git a/tests/test_utils.py b/tests/test_utils.py index 9d0ded2..db20038 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -36,7 +36,7 @@ def test_lazy_opener_read(): try: current_process = psutil.Process() start_open_files = len(current_process.open_files()) - lazy = utils.LazyOpener(fhandle.name, mode="rb") + lazy = utils.LazyOpener(fhandle.name) assert lazy.path == fhandle.name assert lazy.mode == "rb" @@ -77,7 +77,7 @@ def test_lazy_opener_not_twice(): fhandle.write(content) try: - lazy = utils.LazyOpener(fhandle.name, mode="rb") + lazy = utils.LazyOpener(fhandle.name) # The first open should go through with lazy as fhandle: @@ -980,7 +980,7 @@ def test_packed_object_reader_seek(tmp_path): packed_reader = utils.PackedObjectReader(fhandle, offset=offset, length=length) # Check that it is properly marked as seekable - assert packed_reader.seekable + assert packed_reader.seekable() # Check that whence==2 is 
not implemented with pytest.raises(NotImplementedError): @@ -1045,6 +1045,20 @@ def test_packed_object_reader_mode(): assert reader.mode == handle.mode +@pytest.mark.parametrize("open_streams", [True, False]) +def test_packed_object_reader_add(temp_container, open_streams): + """Test using `PackedObjectReader` with add_streamed_object_to_pack.""" + content = b"content" + with tempfile.TemporaryFile(mode="rb+") as fhandle: + fhandle.write(content) + fhandle.seek(0) + wrapped = utils.PackedObjectReader(fhandle, 0, len(content)) + hashkey = temp_container.add_streamed_object_to_pack( + wrapped, open_streams=open_streams + ) + assert temp_container.get_object_content(hashkey) == content + + @pytest.mark.parametrize("compression_algorithm", COMPRESSION_ALGORITHMS_TO_TEST) def test_stream_decompresser(compression_algorithm): """Test the stream decompresser.""" @@ -1071,10 +1085,11 @@ def test_stream_decompresser(compression_algorithm): compressed_streams.append(io.BytesIO(compressed)) for original, compressed_stream in zip(original_data, compressed_streams): decompresser = StreamDecompresser(compressed_stream) + assert decompresser.mode == "rb" # Read in one chunk - assert ( - original == decompresser.read() - ), "Uncompressed data is wrong (single read)" + with decompresser as handle: + chunk = handle.read() + assert original == chunk # Redo the same, but do a read of zero bytes first, checking that # it returns a zero-length bytes, and that it does not move the offset @@ -1130,7 +1145,7 @@ def test_stream_decompresser_seek(compression_algorithm): decompresser = StreamDecompresser(compressed_stream) # Check the functionality is disabled - assert decompresser.seekable + assert decompresser.seekable() # Check that whence==2 is not implemented with pytest.raises(NotImplementedError): @@ -1491,7 +1506,7 @@ def test_callback_stream_wrapper_none(): # pylint: disable=invalid-name wrapped = utils.CallbackStreamWrapper(fhandle, callback=None) assert wrapped.mode == "rb+" - assert wrapped.seekable + assert wrapped.seekable() # Seek forward; read from byte 1 wrapped.seek(1) assert wrapped.tell() == 1 @@ -1528,7 +1543,7 @@ def test_callback_stream_wrapper(callback_instance, with_total_length): ) assert wrapped.mode == "rb+" - assert wrapped.seekable + assert wrapped.seekable() # Seek forward; read from byte 10 wrapped.seek(10) assert wrapped.tell() == 10 @@ -1571,6 +1586,20 @@ def test_callback_stream_wrapper(callback_instance, with_total_length): ] +@pytest.mark.parametrize("open_streams", [True, False]) +def test_callback_stream_wrapper_add(temp_container, open_streams): + """Test using callback stream wrapper with add_streamed_object_to_pack.""" + content = b"content" + with tempfile.TemporaryFile(mode="rb+") as fhandle: + fhandle.write(content) + fhandle.seek(0) + wrapped = utils.CallbackStreamWrapper(fhandle, None) + hashkey = temp_container.add_streamed_object_to_pack( + wrapped, open_streams=open_streams + ) + assert temp_container.get_object_content(hashkey) == content + + def test_rename_callback(callback_instance): """Check the rename_callback function.""" old_description = "original description"
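
Below is a minimal usage sketch, not part of the patch above, showing how the newly annotated public API of Container is expected to look to callers once mypy runs via the pre-commit hook and the [tool.mypy] settings added in this diff. The temporary-directory setup and the file name sketch_typed_api.py are illustrative assumptions; the method names and signatures are taken directly from the annotations introduced above (add_object(content: bytes) -> str, get_object_content(hashkey: str) -> bytes, get_objects_meta(...) -> Iterator[Tuple[str, Dict[str, Any]]]).

# sketch_typed_api.py -- illustrative only; assumes disk_objectstore is installed
import tempfile

from disk_objectstore import Container

# Create a throwaway container in a temporary directory (hypothetical location).
folder = tempfile.mkdtemp()
container = Container(folder)        # __init__(self, folder: str) -> None
if not container.is_initialised:     # is_initialised -> bool
    container.init_container()       # init_container(...) -> None

# add_object is annotated as (content: bytes) -> str, so mypy checks both sides.
hashkey: str = container.add_object(b"some content")

# get_object_content is annotated as (hashkey: str) -> bytes.
content: bytes = container.get_object_content(hashkey)
assert content == b"some content"

# get_objects_meta yields pairs typed as Tuple[str, Dict[str, Any]];
# the metadata dictionary carries keys such as "size".
for key, meta in container.get_objects_meta([hashkey]):
    print(key, meta["size"])

container.close()                    # close() -> None

With the hook enabled in .pre-commit-config.yaml, running `pre-commit run mypy --all-files` would type-check disk_objectstore/*.py against these annotations; downstream projects pick them up through the disk_objectstore/py.typed marker added above.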