Skip to content

Commit

Permalink
refactor(iolibs): remove unnecessary methods and improve word processing functions
Browse files Browse the repository at this point in the history
  • Loading branch information
entelecheia committed Aug 12, 2023
1 parent a1f47b2 commit 0c80f92
Showing 1 changed file with 49 additions and 200 deletions.
249 changes: 49 additions & 200 deletions src/hyfi/utils/iolibs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,12 @@
import shutil
import stat
import sys
import tempfile
import time
import warnings
from glob import glob
from pathlib import Path, PosixPath, WindowsPath
from types import TracebackType
from typing import Callable, Iterator, List, Optional, Tuple, Union

import gdown

from hyfi.cached_path import _cached_path
from hyfi.utils.logging import LOGGING
from hyfi.utils.types import PathLikeType

Expand Down Expand Up @@ -400,171 +395,13 @@ def filelist(f):

return dest, filelist(f)

@staticmethod
def cached_path(
    url_or_filename: str,
    extract_archive: bool = False,
    force_extract: bool = False,
    return_parent_dir: bool = False,
    cache_dir: str = "",
    verbose: bool = False,
):
    """
    Attempt to cache a file or URL and return the path to the cached file.

    URLs prefixed with ``gd://`` are routed to Google Drive via
    ``IOLIBs.cached_gdown``; everything else goes through the optional
    'cached-path' library (imported elsewhere as ``_cached_path``, which is
    ``None`` when the library is unavailable).

    Args:
        url_or_filename (str): The URL or filename to be cached.
        extract_archive (bool, optional): Whether to extract the file if it's an archive. Defaults to False.
        force_extract (bool, optional): Whether to force extraction even if the destination already exists. Defaults to False.
        return_parent_dir (bool, optional): If True, returns the parent directory of the cached file (as a POSIX string). Defaults to False.
        cache_dir (str, optional): Directory to store cached files. Defaults to "" (a per-user default under ``~/.hyfi/.cache`` is used).
        verbose (bool, optional): Whether to print informative messages during the process. Defaults to False.

    Returns:
        The cached path, the parent directory as a POSIX string when
        ``return_parent_dir`` is True, or ``None`` on any failure.
        NOTE(review): despite the original docstring's ``str`` claim, the
        non-parent-dir branch returns whatever path object the backend
        produced, not necessarily a str — confirm against callers.
    """
    if not url_or_filename:
        # Nothing to cache; treat as a soft failure rather than raising.
        logger.warning("url_or_filename not provided")
        return None
    if verbose:
        logger.info(
            "caching path: %s, extract_archive: %s, force_extract: %s, cache_dir: %s",
            url_or_filename,
            extract_archive,
            force_extract,
            cache_dir,
        )

    try:
        if url_or_filename.startswith("gd://"):
            # Google Drive spec: delegate to the gdown-based helper.
            _path = IOLIBs.cached_gdown(
                url_or_filename,
                verbose=verbose,
                extract_archive=extract_archive,
                force_extract=force_extract,
                cache_dir=cache_dir,
            )
            # cached_gdown returns a str on success; any non-str result
            # (including None) is normalized to None here.
            _path = Path(_path) if isinstance(_path, str) else None
        else:
            if _cached_path is None:
                # Optional dependency missing. Note this ImportError is raised
                # inside the try block below, so it is logged and swallowed —
                # the function returns None instead of propagating it.
                raise ImportError(
                    "Error importing required libraries 'cached-path'. "
                    "Please install them using 'pip install cached-path' and try again."
                )

            # Namespace the on-disk cache under a 'cached_path' subdirectory.
            if cache_dir:
                cache_dir = str(Path(cache_dir) / "cached_path")
            else:
                cache_dir = str(Path.home() / ".hyfi" / ".cache" / "cached_path")

            _path = _cached_path.cached_path(
                url_or_filename,
                extract_archive=extract_archive,
                force_extract=force_extract,
                cache_dir=cache_dir,
            )

        logger.debug("cached path: %s", _path)

        # Resolve the parent directory so return_parent_dir can be honored:
        # for a file, its containing directory; for a directory, itself.
        if _path and _path.is_file():
            _parent_dir = Path(_path).parent
        elif _path and _path.is_dir():
            _parent_dir = Path(_path)
        else:
            logger.warning("Unknown path: %s", _path)
            return None

        return _parent_dir.as_posix() if return_parent_dir else _path
    except Exception as e:
        # Best-effort caching: log the error and return None instead of raising.
        logger.error(e)
        return None

@staticmethod
def cached_gdown(
    url: str,
    verbose: bool = False,
    extract_archive: bool = False,
    force_extract: bool = False,
    cache_dir: str = "",
):
    """
    Download (and optionally extract) a Google Drive file via gdown, with caching.

    Args:
        url (str): Google Drive spec of the form ``gd://<id>:<path>`` (the
            ``:<path>`` part is optional; the file id is reused as the path
            when omitted). With ``extract_archive``, a ``path!inner/file``
            suffix selects a single file inside the extracted archive.
        verbose (bool, optional): Log progress and show gdown's progress bar. Defaults to False.
        extract_archive (bool, optional): Extract the download via ``IOLIBs.extractall``. Defaults to False.
        force_extract (bool, optional): Re-extract even if the destination exists. Defaults to False.
        cache_dir (str, optional): Base cache directory; defaults to ``~/.hyfi/.cache`` when empty.

    Raises:
        ImportError: If the optional 'gdown' dependency is not available
            (imported elsewhere as ``gdown``, which is None when missing).

    Returns:
        str or None: Path to the cached (or extracted) file/directory, or
        None when the url does not start with ``gd://``.
    """
    if gdown is None:
        raise ImportError(
            "Error importing required libraries 'gdown'. "
            "Please install them using 'pip install gdown' and try again."
        )

    if verbose:
        logger.info("Downloading %s...", url)
    # Namespace the on-disk cache under a 'gdown' subdirectory.
    if cache_dir:
        cache_dir_ = Path(cache_dir) / "gdown"
    else:
        cache_dir_ = Path.home() / ".hyfi" / ".cache" / "gdown"
    cache_dir_.mkdir(parents=True, exist_ok=True)

    gd_prefix = "gd://"
    if url.startswith(gd_prefix):
        # Split "gd://id:path" into the Drive file id and the local path.
        url = url[len(gd_prefix) :]
        _url = url.split(":")
        if len(_url) == 2:
            id_, path = _url
        else:
            id_ = _url[0]
            path = id_

        # If we're using the path!c/d/file.txt syntax, handle it here:
        # everything before '!' is the archive path, everything after is the
        # single file to pull out of the extracted contents.
        fname = None
        extraction_path = path
        exclamation_index = path.find("!")
        if extract_archive and exclamation_index >= 0:
            extraction_path = path[:exclamation_index]
            fname = path[exclamation_index + 1 :]

        # Cache under a hidden per-id directory so distinct files never collide.
        cache_path = cache_dir_ / f".{id_}" / extraction_path
        cache_path.parent.mkdir(parents=True, exist_ok=True)

        # gdown skips the download when the cached file already exists.
        cache_path = gdown.cached_download(
            id=id_,
            path=cache_path.as_posix(),
            quiet=not verbose,
        )

        if extract_archive:
            extraction_path, files = IOLIBs.extractall(
                cache_path, force_extract=force_extract
            )

            # No inner file requested (or nothing listed): return the
            # extraction directory; otherwise return the first match.
            if not fname or not files:
                return extraction_path
            for f in files:
                if f.endswith(fname):
                    return f
        # Fall through: not extracting, or no inner file matched.
        return cache_path

    else:
        logger.warning("Unknown url: %s", url)
        return None

@staticmethod
def save_wordlist(
words: List[str],
filepath: Union[str, PosixPath, WindowsPath, Path],
sort: bool = True,
verbose: bool = True,
encoding="utf-8",
**kwargs,
encoding: str = "utf-8",
):
"""Save the word list to the file."""
if sort:
Expand All @@ -588,20 +425,60 @@ def load_wordlist(
ngram_delimiter: str = ";",
remove_delimiter: bool = False,
verbose: bool = True,
encoding="utf-8",
**kwargs,
encoding: str = "utf-8",
) -> List[str]:
"""Load the word list from the file."""
filepath = Path(filepath)
if filepath.is_file():
with open(filepath, encoding=encoding) as fo_:
words = [
word.strip().split()[0] for word in fo_ if len(word.strip()) > 0
]
else:
if not filepath.is_file():
logger.warning("File not found: %s", filepath)
return []

with open(filepath, encoding=encoding) as fo_:
words = [word.strip().split()[0] for word in fo_ if len(word.strip()) > 0]

if verbose:
logger.info("Loaded the file: %s, No. of words: %s", filepath, len(words))

return IOLIBs.process_wordlist(
words,
sort=sort,
lowercase=lowercase,
unique=unique,
remove_tag=remove_tag,
max_ngram_to_include=max_ngram_to_include,
ngram_delimiter=ngram_delimiter,
remove_delimiter=remove_delimiter,
verbose=verbose,
)

@staticmethod
def process_wordlist(
words: List[str],
sort: bool = True,
lowercase: bool = False,
unique: bool = True,
remove_tag: bool = False,
max_ngram_to_include: Optional[int] = None,
ngram_delimiter: str = ";",
remove_delimiter: bool = False,
verbose: bool = True,
) -> List[str]:
"""Preprocess the word list.
Args:
words (List[str]): List of words.
sort (bool, optional): Sort the words. Defaults to True.
lowercase (bool, optional): Convert the words to lowercase. Defaults to False.
unique (bool, optional): Remove duplicate words. Defaults to True.
remove_tag (bool, optional): Remove the tag from the words. Defaults to False.
max_ngram_to_include (Optional[int], optional): Maximum ngram to include. Defaults to None.
ngram_delimiter (str, optional): Delimiter for ngram. Defaults to ";".
remove_delimiter (bool, optional): Remove the delimiter. Defaults to False.
verbose (bool, optional): Show the progress. Defaults to True.
Returns:
List[str]: List of words.
"""
if remove_delimiter:
words = [word.replace(ngram_delimiter, "") for word in words]
if max_ngram_to_include:
Expand All @@ -610,8 +487,6 @@ def load_wordlist(
for word in words
if len(word.split(ngram_delimiter)) <= max_ngram_to_include
]
if verbose:
logger.info("Loaded the file: %s, No. of words: %s", filepath, len(words))

if remove_tag:
words = [word.split("/")[0] for word in words]
Expand Down Expand Up @@ -677,29 +552,3 @@ def remove_duplicates_from_list_of_dicts(
new_data.append(d)
seen.add(d[key])
return new_data


# See https://github.com/copier-org/copier/issues/345
class TemporaryDirectory(tempfile.TemporaryDirectory):
    """A custom version of `tempfile.TemporaryDirectory` that handles read-only files better.

    On Windows, before Python 3.8, `shutil.rmtree` does not handle read-only files very well.
    This custom class makes use of a [special error handler][copier.tools.handle_remove_readonly]
    to make sure that a temporary directory containing read-only files (typically created
    when git-cloning a repository) is properly cleaned-up (i.e. removed) after using it
    in a context manager.
    """

    @classmethod
    def _cleanup(cls, name, warn_message):
        # Finalizer hook: remove the directory with the read-only-aware
        # rmtree, then emit the ResourceWarning the stdlib would emit.
        # NOTE(review): this overrides a private stdlib method whose
        # signature has changed across Python versions — confirm it matches
        # the tempfile implementation of the targeted Python version.
        cls._robust_cleanup(name)
        warnings.warn(warn_message, ResourceWarning)

    def cleanup(self):
        """Remove directory safely."""
        # detach() returns a truthy value only if the finalizer was still
        # alive, so the directory is removed at most once.
        if self._finalizer.detach():  # type: ignore
            self._robust_cleanup(self.name)

    @staticmethod
    def _robust_cleanup(name):
        # handle_remove_readonly clears the read-only bit and retries the
        # failed operation, so read-only files (e.g. in .git) don't abort rmtree.
        shutil.rmtree(name, ignore_errors=False, onerror=IOLIBs.handle_remove_readonly)

0 comments on commit 0c80f92

Please sign in to comment.