Skip to content

Commit

Permalink
working on types
Browse files Browse the repository at this point in the history
  • Loading branch information
realSAH committed Aug 28, 2023
1 parent 92bffe0 commit 11d64fe
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 43 deletions.
28 changes: 14 additions & 14 deletions myresources/crocodile/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@


# ============================== Accessories ============================================
def validate_name(astring: str, replace='_') -> str: return __import__("re").sub(r'[^-a-zA-Z0-9_.()]+', replace, str(astring))
def timestamp(fmt=None, name=None) -> datetime.datetime: return ((name + '_') if name is not None else '') + __import__("datetime").datetime.now().strftime(fmt or '%Y-%m-%d-%I-%M-%S-%p-%f') # isoformat is not compatible with file naming convention, fmt here is.
def validate_name(astring: str, replace: str = '_') -> str: return __import__("re").sub(r'[^-a-zA-Z0-9_.()]+', replace, str(astring))
def timestamp(fmt: Optional[str] = None, name: Optional[str] = None) -> datetime.datetime: return ((name + '_') if name is not None else '') + __import__("datetime").datetime.now().strftime(fmt or '%Y-%m-%d-%I-%M-%S-%p-%f') # isoformat is not compatible with file naming convention, fmt here is.
def str2timedelta(shift: str) -> datetime.timedelta: # Converts a human readable string like '1m' or '1d' to a timedate object. In essence, its gives a `2m` short for `pd.timedelta(minutes=2)`"""
key, val = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days", "w": "weeks", "M": "months", "y": "years"}[shift[-1]], eval(shift[:-1])
key, val = ("days", val * 30) if key == "months" else (("weeks", val * 52) if key == "years" else (key, val)); return __import__("datetime").timedelta(**{key: val})
def install_n_import(library, package=None, **kwargs): # sometimes package name is different from import, e.g. skimage.
def install_n_import(library: str, package: Optional[str] = None, **kwargs): # sometimes package name is different from import, e.g. skimage.
try: return __import__(library, **kwargs)
except ImportError: __import__("subprocess").check_call([__import__("sys").executable, "-m", "pip", "install", package or library]); return __import__(library, **kwargs)
def randstr(length=10, lower=True, upper=True, digits=True, punctuation=False, safe=False, noun=False) -> str:
def randstr(length: int = 10, lower: bool = True, upper: bool = True, digits: bool = True, punctuation: bool = False, safe: bool = False, noun: bool = False) -> str:
if safe: return __import__("secrets").token_urlsafe(length) # interannly, it uses: random.SystemRandom or os.urandom which is hardware-based, not pseudo
if noun: return install_n_import("randomname").get_name()
string = __import__("string"); return ''.join(__import__("random").choices((string.ascii_lowercase if lower else "") + (string.ascii_uppercase if upper else "") + (string.digits if digits else "") + (string.punctuation if punctuation else ""), k=length))


def save_decorator(ext=""): # apply default paths, add extension to path, print the saved file path
def decorator(func):
def wrapper(obj, path: Union[str, Path, None] = None, verbose=True, add_suffix=False, desc="", class_name="", **kwargs):
def wrapper(obj: Any, path: Union[str, Path, None] = None, verbose: bool = True, add_suffix: bool = False, desc: str = "", class_name: str = "", **kwargs):
if path is None:
path = Path.home().joinpath("tmp_results/tmp_files").joinpath(randstr(noun=True))
_ = print(f"tb.core: Warning: Path not passed to {func}. A default path has been chosen: {Path(path).absolute().as_uri()}") if verbose else None
Expand All @@ -44,7 +44,7 @@ def wrapper(obj, path: Union[str, Path, None] = None, verbose=True, add_suffix=F


@save_decorator(".json")
def json(obj, path: Optional[str] = None, indent: Optional[str] = None, encoding='utf-8', **kwargs):
def json(obj: Any, path: Optional[str] = None, indent: Optional[str] = None, encoding='utf-8', **kwargs):
_ = encoding
return Path(path).write_text(__import__("json").dumps(obj, indent=indent, default=lambda x: x.__dict__, **kwargs), encoding="utf-8")
@save_decorator(".yml")
Expand Down Expand Up @@ -94,7 +94,7 @@ def get_attributes(self, remove_base_attrs=True, return_objects=False, fields=Tr
return List([getattr(self, x) for x in attrs]) if return_objects else List(attrs)
def print(self, dtype=False, attrs=False, **kwargs): return Struct(self.__dict__).update(attrs=self.get_attributes() if attrs else None).print(dtype=dtype, **kwargs)
@staticmethod
def get_state(obj, repr_func=lambda x: x, exclude=None) -> dict: return repr_func(obj) if not any([hasattr(obj, "__getstate__"), hasattr(obj, "__dict__")]) else (tmp if type(tmp := obj.__getstate__() if hasattr(obj, "__getstate__") else obj.__dict__) is not dict else Struct(tmp).filter(lambda k, v: k not in (exclude or [])).apply2values(lambda k, v: Base.get_state(v, exclude=exclude, repr_func=repr_func)).__dict__)
def get_state(obj, repr_func=lambda x: x, exclude: Optional[list[str]] = None) -> dict[str, Any]: return repr_func(obj) if not any([hasattr(obj, "__getstate__"), hasattr(obj, "__dict__")]) else (tmp if type(tmp := obj.__getstate__() if hasattr(obj, "__getstate__") else obj.__dict__) is not dict else Struct(tmp).filter(lambda k, v: k not in (exclude or [])).apply2values(lambda k, v: Base.get_state(v, exclude=exclude, repr_func=repr_func)).__dict__)
def viz_composition_heirarchy(self, depth=3, obj=None, filt=None):
install_n_import("objgraph").show_refs([self] if obj is None else [obj], max_depth=depth, filename=str(filename := Path(__import__("tempfile").gettempdir()).joinpath("graph_viz_" + randstr(noun=True) + ".png")), filter=filt)
_ = __import__("os").startfile(str(filename.absolute())) if __import__("sys").platform == "win32" else None; return filename
Expand Down Expand Up @@ -149,7 +149,7 @@ def __getitem__(self, key: Union[int, list[int], slice]) -> Union[T, 'List[T]']:
if isinstance(key, list): return List(self[item] for item in key) # to allow fancy indexing like List[1, 5, 6]
# elif isinstance(key, str): return List(item[key] for item in self.list) # access keys like dictionaries.
elif isinstance(key, int): return self.list[key]
return List(self.list[key])
return List([self.list[key]])
def apply(self, func: Callable[[T], T2], *args: list[Any], other: Optional['List[T]'] = None, filt: Optional[Callable[[T], bool]] = lambda x: True, jobs: Optional[int] = None, prefer: Optional[str] = [None, 'processes', 'threads'][0], depth: int = 1, verbose: bool = False, desc: Optional[str] = None, **kwargs: dict[str, Any]) -> 'List[T2]':
if depth > 1: self.apply(lambda x: x.apply(func, *args, other=other, jobs=jobs, depth=depth - 1, **kwargs))
iterator = (self.list if not verbose else install_n_import("tqdm").tqdm(self.list, desc=desc)) if other is None else (zip(self.list, other) if not verbose else install_n_import("tqdm").tqdm(zip(self.list, other), desc=desc))
Expand All @@ -166,7 +166,7 @@ def to_dataframe(self, names: Optional[list[str]] = None, minimal: bool = False,

class Struct(Base): # inheriting from dict gives `get` method, should give `__contains__` but not working. # Inheriting from Base gives `save` method.
"""Use this class to keep bits and sundry items. Combines the power of dot notation in classes with strings in dictionaries to provide Pandas-like experience"""
def __init__(self, dictionary=None, **kwargs):
def __init__(self, dictionary: Optional[dict[Any, Any]] = None, **kwargs):
if dictionary is None or isinstance(dictionary, dict): final_dict = dict() if dictionary is None else dictionary
else: final_dict = (dict(dictionary) if dictionary.__class__.__name__ == "mappingproxy" else dictionary.__dict__)
final_dict.update(kwargs); super(Struct, self).__init__(); self.__dict__ = final_dict
Expand All @@ -188,7 +188,7 @@ def __getattr__(self, item) -> 'Struct':
try: return self.__dict__[item]
except KeyError as ke: raise AttributeError(f'{type(self).__name__!r} object has no attribute {item!r}') from ke # this works better with the linter. replacing Key error with Attribute error makes class work nicely with hasattr() by returning False.
clean_view = property(lambda self: type("TempClass", (object,), self.__dict__))
def __repr__(self, limit=150): return "Struct: " + Display.get_repr(self.keys().list.__repr__(), limit=limit, justify=0)
def __repr__(self, limit: int = 150): return "Struct: " + Display.get_repr(self.keys().list.__repr__(), limit=limit, justify=0)
def __getitem__(self, item): return self.__dict__[item] # thus, gives both dot notation and string access to elements.
def __setitem__(self, key, value): self.__dict__[key] = value
def __bool__(self): return bool(self.__dict__)
Expand Down Expand Up @@ -225,17 +225,17 @@ def plot(self, use_plt: bool = True, title: str = '', xlabel: str = '', ylabel:

def set_pandas_display(rows: int = 1000, columns: int = 1000, width: int = 5000, colwidth: int = 40) -> None: import pandas as pd; pd.set_option('display.max_colwidth', colwidth); pd.set_option('display.max_columns', columns); pd.set_option('display.width', width); pd.set_option('display.max_rows', rows)
def set_pandas_auto_width(): __import__("pandas").set_option('width', 0) # this way, pandas is told to detect window length and act appropriately. For fixed width host windows, this is recommended to avoid chaos due to line-wrapping.
def set_numpy_display(precision: int = 3, linewidth: int = 250, suppress=True, floatmode: str = 'fixed', **kwargs) -> None: __import__("numpy").set_printoptions(precision=precision, suppress=suppress, linewidth=linewidth, floatmode=floatmode, **kwargs)
def config(mydict, sep="\n", justify: int = 15, quotes: bool = False): return sep.join([f"{key:>{justify}} = {repr(val) if quotes else val}" for key, val in mydict.items()])
def set_numpy_display(precision: int = 3, linewidth: int = 250, suppress: bool = True, floatmode: str = 'fixed', **kwargs) -> None: __import__("numpy").set_printoptions(precision=precision, suppress=suppress, linewidth=linewidth, floatmode=floatmode, **kwargs)
def config(mydict, sep: str = "\n", justify: int = 15, quotes: bool = False): return sep.join([f"{key:>{justify}} = {repr(val) if quotes else val}" for key, val in mydict.items()])
def f(str_, limit=float('inf'), justify: int = 50, direc="<") -> str: return f"{(str_[:limit - 4] + '... ' if len(str_) > limit else str_):{direc}{justify}}"
def eng(): __import__("pandas").set_eng_float_format(accuracy=3, use_eng_prefix=True); __import__("pandas").options.float_format = '{:, .5f}'.format; __import__("pandas").set_option('precision', 7) # __import__("pandas").set_printoptions(formatter={'float': '{: 0.3f}'.format})
def outline(array, name: str = "Array", printit: bool = True): str_ = f"{name}. Shape={array.shape}. Dtype={array.dtype}"; print(str_) if printit else None; return str_
def get_repr(data, justify: int = 15, limit=float('inf'), direc="<") -> str:
def get_repr(data: Any, justify: int = 15, limit: Union[int, float] = float('inf'), direc: str = "<") -> str:
if (dtype := data.__class__.__name__) in {'list', 'str'}: str_ = data if dtype == 'str' else f"list. length = {len(data)}. " + ("1st item type: " + str(type(data[0])).split("'")[1]) if len(data) > 0 else " "
elif dtype in {"DataFrame", "Series"}: str_ = f"Pandas DF: shape = {data.shape}, dtype = {data.dtypes}." if dtype == 'DataFrame' else f"Pandas Series: Length = {len(data)}, Keys = {get_repr(data.keys().to_list())}."
else: str_ = f"shape = {data.shape}, dtype = {data.dtype}." if dtype == 'ndarray' else repr(data)
return f(str_.replace("\n", ", "), justify=justify, limit=limit, direc=direc)
def print_string_list(mylist, char_per_row: int = 125, sep=" ", style=str, _counter: int = 0):
def print_string_list(mylist: list[Any], char_per_row: int = 125, sep: str = " ", style: Callable[[Any], str] = str, _counter: int = 0):
for item in mylist: print("") if (_counter + len(style(item))) // char_per_row > 0 else print(style(item), end=sep); _counter = len(style(item)) if (_counter + len(style(item))) // char_per_row > 0 else _counter + len(style(item))
class Display: set_pandas_display = set_pandas_display; set_pandas_auto_width = set_pandas_auto_width; set_numpy_display = set_numpy_display; config = config; f = f; eng = eng; outline = outline; get_repr = get_repr; print_string_list = print_string_list # or D = type('D', (object, ), dict(set_pandas_display

Expand Down
12 changes: 6 additions & 6 deletions myresources/crocodile/file_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from crocodile.core import Struct, List, timestamp, randstr, validate_name, str2timedelta, Save, Path, install_n_import
from datetime import datetime
from typing import Any, Optional, Union, Any, Callable, TypeVar
from typing import Any, Optional, Union, Callable, TypeVar


# %% =============================== Security ================================================
Expand All @@ -15,7 +15,7 @@ def pwd2key(password: str, salt=None, iterations: int = 10) -> bytes: # Derive
if salt is None: m = __import__("hashlib").sha256(); m.update(password.encode("utf-8")); return __import__("base64").urlsafe_b64encode(m.digest()) # make url-safe bytes required by Ferent.
from cryptography.hazmat.primitives import hashes; from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
return __import__("base64").urlsafe_b64encode(PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=iterations, backend=None).derive(password.encode()))
def encrypt(msg: bytes, key: Optional[bytes] = None, pwd: Optional[str] = None, salted: bool =True, iteration: Optional[int] = None, gen_key=False) -> bytes:
def encrypt(msg: bytes, key: Optional[bytes] = None, pwd: Optional[str] = None, salted: bool = True, iteration: Optional[int] = None, gen_key=False) -> bytes:
salt = None # silence the linter.
if pwd is not None: # generate it from password
assert (key is None) and (type(pwd) is str), f"You can either pass key or pwd, or none of them, but not both."
Expand All @@ -42,7 +42,7 @@ def decrypt(token: bytes, key: Optional[bytes] = None, pwd: Optional[str] = None
elif isinstance(key, (str, P, Path)): key = P(key).read_bytes() # passed a path to a file containing kwy
else: raise TypeError(f"Key must be either str, P, Path, bytes or None. Recieved: {type(key)}")
return __import__("cryptography.fernet").__dict__["fernet"].Fernet(key).decrypt(token)
def unlock(drive="D:", pwd=None, auto_unlock=False):
def unlock(drive: str = "D:", pwd: Optional[str] = None, auto_unlock: bool = False):
return __import__("crocodile").meta.Terminal().run(f"""$SecureString = ConvertTo-SecureString "{pwd or P.home().joinpath("dotfiles/creds/data/bitlocker_pwd").read_text()}" -AsPlainText -Force; Unlock-BitLocker -MountPoint "{drive}" -Password $SecureString; """ + (f'Enable-BitLockerAutoUnlock -MountPoint "{drive}"' if auto_unlock else ''), shell="pwsh")


Expand Down Expand Up @@ -75,7 +75,7 @@ def txt(path: Union[Path, str], encoding: str = 'utf-8') -> str: return P(path).
class Read: read = read; mat = mat; json = json; yaml = yaml; ini = ini; npy = npy; csv = csv; vanilla_pickle = vanilla_pickle; py = py; pkl = vanilla_pickle; pickle = pickle; toml = toml; txt = txt


def modify_text(txt_raw, txt_search, txt_alt, replace_line=True, notfound_append=False, prepend=False, strict=False):
def modify_text(txt_raw: str, txt_search: str, txt_alt: str, replace_line: bool = True, notfound_append: bool = False, prepend: bool = False, strict: bool = False):
lines, bingo = txt_raw.split("\n"), False
if not replace_line: # no need for line splitting
if txt_search in txt_raw: return txt_raw.replace(txt_search, txt_alt)
Expand Down Expand Up @@ -387,10 +387,10 @@ def sync_to_cloud(self, cloud, sync_up=False, sync_down=False, os_specific=False
return self


def compress_folder(root_dir, op_path, base_dir, fmt='zip', **kwargs): # shutil works with folders nicely (recursion is done interally) # directory to be archived: root_dir\base_dir, unless base_dir is passed as absolute path. # when archive opened; base_dir will be found."""
def compress_folder(root_dir: str, op_path: str, base_dir: str, fmt: str = 'zip', **kwargs): # shutil works with folders nicely (recursion is done interally) # directory to be archived: root_dir\base_dir, unless base_dir is passed as absolute path. # when archive opened; base_dir will be found."""
assert fmt in {"zip", "tar", "gztar", "bztar", "xztar"} # .zip is added automatically by library, hence we'd like to avoid repeating it if user sent it.
return P(__import__('shutil').make_archive(base_name=str(op_path)[:-4] if str(op_path).endswith(".zip") else str(op_path), format=fmt, root_dir=str(root_dir), base_dir=str(base_dir), **kwargs)) # returned path possible have added extension.
def zip_file(ip_path, op_path, arcname=None, password=None, mode="w", **kwargs):
def zip_file(ip_path: str, op_path: str, arcname: Optional[str] = None, password: Optional[str] = None, mode: str = "w", **kwargs):
"""arcname determines the directory of the file being archived inside the archive. Defaults to same as original directory except for drive.
When changed, it should still include the file path in its end. If arcname = filename without any path, then, it will be in the root of the archive."""
import zipfile
Expand Down
46 changes: 23 additions & 23 deletions tst/file_management.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
import tempfile
# import tempfile

import pytest
from crocodile.file_management import *
# import pytest
# from crocodile.file_management import *


class Test_P:
@staticmethod
def test_copy():
folder = P.tmp(folder="__test__/test_folder").create()
file = P.tmp(file="__test__/test_file").write_text("file content")
dest = P.tmp(folder="__test__/test_destination")
# class Test_P:
# @staticmethod
# def test_copy():
# folder = P.tmp(folder="__test__/test_folder").create()
# file = P.tmp(file="__test__/test_file").write_text("file content")
# dest = P.tmp(folder="__test__/test_destination")

folder.copy(path=dest / "folder_copy_path_passed")
folder.copy(folder=dest / "folder_copy_folder_passed")
folder.copy(folder=dest / "folder_copy_folder_passed_2", name="name_passed_after_folder")
assert (dest / "folder_copy_path_passed").is_dir()
assert (dest / "folder_copy_folder_passed").is_dir()
assert (dest / "folder_copy_folder_passed_2/name_passed_after_folder").is_dir()
# folder.copy(path=dest / "folder_copy_path_passed")
# folder.copy(folder=dest / "folder_copy_folder_passed")
# folder.copy(folder=dest / "folder_copy_folder_passed_2", name="name_passed_after_folder")
# assert (dest / "folder_copy_path_passed").is_dir()
# assert (dest / "folder_copy_folder_passed").is_dir()
# assert (dest / "folder_copy_folder_passed_2/name_passed_after_folder").is_dir()

file.copy(path=dest / "file_copy_path_passed")
file.copy(folder=dest / "file_copy_folder_passed")
file.copy(folder=dest / "file_copy_folder_passed_2", name="name_passed_after_folder")
# file.copy(path=dest / "file_copy_path_passed")
# file.copy(folder=dest / "file_copy_folder_passed")
# file.copy(folder=dest / "file_copy_folder_passed_2", name="name_passed_after_folder")

@staticmethod
def test_zip():
folder = P.tmp(folder="__test__/test_folder")
folder.joinpath("test_content").touch()
# @staticmethod
# def test_zip():
# folder = P.tmp(folder="__test__/test_folder")
# folder.joinpath("test_content").touch()

folder.zip()
# folder.zip()

0 comments on commit 11d64fe

Please sign in to comment.