diff --git a/src/hyfi/utils/iolibs.py b/src/hyfi/utils/iolibs.py index a1d0ff75..6508bba2 100644 --- a/src/hyfi/utils/iolibs.py +++ b/src/hyfi/utils/iolibs.py @@ -6,17 +6,12 @@ import shutil import stat import sys -import tempfile import time -import warnings from glob import glob from pathlib import Path, PosixPath, WindowsPath from types import TracebackType from typing import Callable, Iterator, List, Optional, Tuple, Union -import gdown - -from hyfi.cached_path import _cached_path from hyfi.utils.logging import LOGGING from hyfi.utils.types import PathLikeType @@ -400,171 +395,13 @@ def filelist(f): return dest, filelist(f) - @staticmethod - def cached_path( - url_or_filename: str, - extract_archive: bool = False, - force_extract: bool = False, - return_parent_dir: bool = False, - cache_dir: str = "", - verbose: bool = False, - ): - """ - Attempts to cache a file or URL and return the path to the cached file. - If required libraries 'cached_path' and 'gdown' are not installed, raises an ImportError. - - Args: - url_or_filename (str): The URL or filename to be cached. - extract_archive (bool, optional): Whether to extract the file if it's an archive. Defaults to False. - force_extract (bool, optional): Whether to force extraction even if the destination already exists. Defaults to False. - return_parent_dir (bool, optional): If True, returns the parent directory of the cached file. Defaults to False. - cache_dir (str, optional): Directory to store cached files. Defaults to None. - verbose (bool, optional): Whether to print informative messages during the process. Defaults to False. - - Raises: - ImportError: If the required libraries 'cached_path' and 'gdown' are not imported. - - Returns: - str: Path to the cached file or its parent directory, depending on the 'return_parent_dir' parameter. - """ - if not url_or_filename: - logger.warning("url_or_filename not provided") - return None - if verbose: - logger.info( - "caching path: %s, extract_archive: %s, force_extract: %s, cache_dir: %s", - url_or_filename, - extract_archive, - force_extract, - cache_dir, - ) - - try: - if url_or_filename.startswith("gd://"): - _path = IOLIBs.cached_gdown( - url_or_filename, - verbose=verbose, - extract_archive=extract_archive, - force_extract=force_extract, - cache_dir=cache_dir, - ) - _path = Path(_path) if isinstance(_path, str) else None - else: - if _cached_path is None: - raise ImportError( - "Error importing required libraries 'cached-path'. " - "Please install them using 'pip install cached-path' and try again." - ) - - if cache_dir: - cache_dir = str(Path(cache_dir) / "cached_path") - else: - cache_dir = str(Path.home() / ".hyfi" / ".cache" / "cached_path") - - _path = _cached_path.cached_path( - url_or_filename, - extract_archive=extract_archive, - force_extract=force_extract, - cache_dir=cache_dir, - ) - - logger.debug("cached path: %s", _path) - - if _path and _path.is_file(): - _parent_dir = Path(_path).parent - elif _path and _path.is_dir(): - _parent_dir = Path(_path) - else: - logger.warning("Unknown path: %s", _path) - return None - - return _parent_dir.as_posix() if return_parent_dir else _path - except Exception as e: - logger.error(e) - return None - - @staticmethod - def cached_gdown( - url: str, - verbose: bool = False, - extract_archive: bool = False, - force_extract: bool = False, - cache_dir: str = "", - ): - """ - :type url: str - ex) gd://id:path - :type verbose: bool - :type extract_archive: bool - :type force_extract: bool - :type cache_dir: str - :returns: str - """ - if gdown is None: - raise ImportError( - "Error importing required libraries 'gdown'. " - "Please install them using 'pip install gdown' and try again." - ) - - if verbose: - logger.info("Downloading %s...", url) - if cache_dir: - cache_dir_ = Path(cache_dir) / "gdown" - else: - cache_dir_ = Path.home() / ".hyfi" / ".cache" / "gdown" - cache_dir_.mkdir(parents=True, exist_ok=True) - - gd_prefix = "gd://" - if url.startswith(gd_prefix): - url = url[len(gd_prefix) :] - _url = url.split(":") - if len(_url) == 2: - id_, path = _url - else: - id_ = _url[0] - path = id_ - - # If we're using the path!c/d/file.txt syntax, handle it here. - fname = None - extraction_path = path - exclamation_index = path.find("!") - if extract_archive and exclamation_index >= 0: - extraction_path = path[:exclamation_index] - fname = path[exclamation_index + 1 :] - - cache_path = cache_dir_ / f".{id_}" / extraction_path - cache_path.parent.mkdir(parents=True, exist_ok=True) - - cache_path = gdown.cached_download( - id=id_, - path=cache_path.as_posix(), - quiet=not verbose, - ) - - if extract_archive: - extraction_path, files = IOLIBs.extractall( - cache_path, force_extract=force_extract - ) - - if not fname or not files: - return extraction_path - for f in files: - if f.endswith(fname): - return f - return cache_path - - else: - logger.warning("Unknown url: %s", url) - return None - @staticmethod def save_wordlist( words: List[str], filepath: Union[str, PosixPath, WindowsPath, Path], sort: bool = True, verbose: bool = True, - encoding="utf-8", - **kwargs, + encoding: str = "utf-8", ): """Save the word list to the file.""" if sort: @@ -588,20 +425,60 @@ def load_wordlist( ngram_delimiter: str = ";", remove_delimiter: bool = False, verbose: bool = True, - encoding="utf-8", - **kwargs, + encoding: str = "utf-8", ) -> List[str]: """Load the word list from the file.""" filepath = Path(filepath) - if filepath.is_file(): - with open(filepath, encoding=encoding) as fo_: - words = [ - word.strip().split()[0] for word in fo_ if len(word.strip()) > 0 - ] - else: + if not filepath.is_file(): logger.warning("File not found: %s", filepath) return [] + with open(filepath, encoding=encoding) as fo_: + words = [word.strip().split()[0] for word in fo_ if len(word.strip()) > 0] + + if verbose: + logger.info("Loaded the file: %s, No. of words: %s", filepath, len(words)) + + return IOLIBs.process_wordlist( + words, + sort=sort, + lowercase=lowercase, + unique=unique, + remove_tag=remove_tag, + max_ngram_to_include=max_ngram_to_include, + ngram_delimiter=ngram_delimiter, + remove_delimiter=remove_delimiter, + verbose=verbose, + ) + + @staticmethod + def process_wordlist( + words: List[str], + sort: bool = True, + lowercase: bool = False, + unique: bool = True, + remove_tag: bool = False, + max_ngram_to_include: Optional[int] = None, + ngram_delimiter: str = ";", + remove_delimiter: bool = False, + verbose: bool = True, + ) -> List[str]: + """Preprocess the word list. + + Args: + words (List[str]): List of words. + sort (bool, optional): Sort the words. Defaults to True. + lowercase (bool, optional): Convert the words to lowercase. Defaults to False. + unique (bool, optional): Remove duplicate words. Defaults to True. + remove_tag (bool, optional): Remove the tag from the words. Defaults to False. + max_ngram_to_include (Optional[int], optional): Maximum ngram to include. Defaults to None. + ngram_delimiter (str, optional): Delimiter for ngram. Defaults to ";". + remove_delimiter (bool, optional): Remove the delimiter. Defaults to False. + verbose (bool, optional): Show the progress. Defaults to True. + + Returns: + List[str]: List of words. + """ if remove_delimiter: words = [word.replace(ngram_delimiter, "") for word in words] if max_ngram_to_include: @@ -610,8 +487,6 @@ def load_wordlist( for word in words if len(word.split(ngram_delimiter)) <= max_ngram_to_include ] - if verbose: - logger.info("Loaded the file: %s, No. of words: %s", filepath, len(words)) if remove_tag: words = [word.split("/")[0] for word in words] @@ -677,29 +552,3 @@ def remove_duplicates_from_list_of_dicts( new_data.append(d) seen.add(d[key]) return new_data - - -# See https://github.com/copier-org/copier/issues/345 -class TemporaryDirectory(tempfile.TemporaryDirectory): - """A custom version of `tempfile.TemporaryDirectory` that handles read-only files better. - - On Windows, before Python 3.8, `shutil.rmtree` does not handle read-only files very well. - This custom class makes use of a [special error handler][copier.tools.handle_remove_readonly] - to make sure that a temporary directory containing read-only files (typically created - when git-cloning a repository) is properly cleaned-up (i.e. removed) after using it - in a context manager. - """ - - @classmethod - def _cleanup(cls, name, warn_message): - cls._robust_cleanup(name) - warnings.warn(warn_message, ResourceWarning) - - def cleanup(self): - """Remove directory safely.""" - if self._finalizer.detach(): # type: ignore - self._robust_cleanup(self.name) - - @staticmethod - def _robust_cleanup(name): - shutil.rmtree(name, ignore_errors=False, onerror=IOLIBs.handle_remove_readonly)