From 8e3731119a31cdcd6231749d1ef4177215be9063 Mon Sep 17 00:00:00 2001 From: Alex Al-Saffar Date: Thu, 31 Aug 2023 18:25:43 +0930 Subject: [PATCH] working on types --- myresources/crocodile/comms/notification.py | 6 +-- myresources/crocodile/core.py | 14 ++--- myresources/crocodile/file_management.py | 57 +++++++++++++-------- 3 files changed, 45 insertions(+), 32 deletions(-) diff --git a/myresources/crocodile/comms/notification.py b/myresources/crocodile/comms/notification.py index 2349c915..06a44ee5 100644 --- a/myresources/crocodile/comms/notification.py +++ b/myresources/crocodile/comms/notification.py @@ -1,6 +1,4 @@ -"""Notifications -""" # import numpy as np # import matplotlib.pyplot as plt import crocodile.toolbox as tb @@ -28,7 +26,7 @@ def get_source_of_truth(): return tb.P.home().joinpath("dotfiles/machineconfig/e def __init__(self, config: dict[str, Any]): self.config = config if config['encryption'].lower() == "ssl": self.server = smtplib.SMTP_SSL(host=self.config["smtp_host"], port=self.config["smtp_port"]) - elif config['encryption'].lower() == "tls": self.server = smtplib.SMTP(host=self.config["smtp_host"], port=self.config["smtp_port"]) + elif config['encryption'].lower() == "tls": self.server = smtplib.SMTP(host=self.config["smtp_host"], port=self.config["smtp_port"]) self.server.login(self.config['email_add'], password=self.config["password"]) def send_message(self, to: str, subject: str, body: str, txt_to_html: bool = True, attachments: Optional[list[Any]] = None): @@ -94,7 +92,7 @@ class PhoneNotification: # security concerns: avoid using this. def __init__(self, bulletpoint_token): pushbullet = tb.install_n_import("pushbullet") self.api = pushbullet.Pushbullet(bulletpoint_token) - def send_notification(self, title: str = "Note From Python", body: str = "A notfication"): self.api.push_note(title=title, body=body) + def send_notification(self, title: str="Note From Python", body: str = "A notfication"): self.api.push_note(title=title, body=body) @staticmethod def open_website(): tb.P(r"https://www.pushbullet.com/").readit() @staticmethod # https://www.youtube.com/watch?v=tbzPcKRZlHg diff --git a/myresources/crocodile/core.py b/myresources/crocodile/core.py index 1c9f67a5..eced5b64 100644 --- a/myresources/crocodile/core.py +++ b/myresources/crocodile/core.py @@ -140,7 +140,7 @@ def __array__(self): import numpy as np; return np.array(self.list) # compatibi def len(self) -> int: return len(self.list) # ================= call methods ===================================== def __getattr__(self, name: str) -> 'List[T]': return List(getattr(i, name) for i in self.list) # fallback position when __getattribute__ mechanism fails. - def __call__(self, *args: Any, **kwargs: Any) -> 'List[Any]': return List([i.__call__(*args, **kwargs) for i in self.list]) + def __call__(self, *args: Any, **kwargs: Any) -> 'List[Any]': return List([ii.__call__(*args, **kwargs) for ii in self.list]) # ======================== Access Methods ========================================== def __setitem__(self, key: int, value: T) -> None: self.list[key] = value def sample(self, size: int = 1, replace: bool = False, p: Optional[list[float]] = None) -> 'List[T]': @@ -209,7 +209,7 @@ def to_dataframe(self, names: Optional[list[str]] = None, minimal: bool = False, class Struct(Base): # inheriting from dict gives `get` method, should give `__contains__` but not working. # Inheriting from Base gives `save` method. """Use this class to keep bits and sundry items. Combines the power of dot notation in classes with strings in dictionaries to provide Pandas-like experience""" def __init__(self, dictionary: Union[dict[Any, Any], Type[object], None] = None, **kwargs: Any): - if dictionary is None or isinstance(dictionary, dict): final_dict = {} if dictionary is None else dictionary + if dictionary is None or isinstance(dictionary, dict): final_dict: dict[str, Any] = {} if dictionary is None else dictionary else: final_dict = (dict(dictionary) if dictionary.__class__.__name__ == "mappingproxy" else dictionary.__dict__) # type: ignore final_dict.update(kwargs) # type ignore @@ -270,17 +270,19 @@ def _pandas_repr(self, justify: int, return_str: bool = False, limit: int = 30): return res if not return_str else str(res) def print(self, dtype: bool = True, return_str: bool = False, justify: int = 30, as_config: bool = False, as_yaml: bool = False, limit: int = 50, title: str = "", attrs: bool = False, **kwargs: Any) -> Union[str, 'Struct']: _ = attrs + import pandas as pd if as_config and not return_str: install_n_import("rich").inspect(self, value=False, title=title, docs=False, sort=False); return self if not bool(self): res = f"Empty Struct." else: if as_yaml or as_config: res = __import__("yaml").dump(self.__dict__) if as_yaml else config(self.__dict__, justify=justify, **kwargs) else: - import pandas as pd tmp = self._pandas_repr(justify=justify, return_str=False, limit=limit) - assert isinstance(tmp, pd.DataFrame) - res = tmp.drop(columns=[] if dtype else ["dtype"]) + if isinstance(tmp, pd.DataFrame): + res = tmp.drop(columns=[] if dtype else ["dtype"]) + else: raise TypeError(f"Unexpected type {type(tmp)}") if not return_str: - if ("DataFrame" in res.__class__.__name__ and install_n_import("tabulate")): install_n_import("rich").print(res.to_markdown()) + if (isinstance(res, pd.DataFrame) and install_n_import("tabulate")): + install_n_import("rich").print(res.to_markdown()) else: print(res) return str(res) if return_str else self @staticmethod diff --git a/myresources/crocodile/file_management.py b/myresources/crocodile/file_management.py index fbac613a..c6a3737b 100644 --- a/myresources/crocodile/file_management.py +++ b/myresources/crocodile/file_management.py @@ -5,10 +5,11 @@ from crocodile.core import Struct, List, timestamp, randstr, validate_name, str2timedelta, Save, Path, install_n_import from datetime import datetime -from typing import Any, Optional, Union, Callable, TypeVar, TypeAlias +from typing import Any, Optional, Union, Callable, TypeVar, TypeAlias, Literal PLike: TypeAlias = Union[str, 'P', Path, None] +FILE_MODE: TypeAlias = Literal['r', 'w', 'x', 'a'] # %% =============================== Security ================================================ @@ -239,7 +240,7 @@ def size(self, units: Optional[str] = 'mb'): # ================================ total_size = self.stat().st_size if self.is_file() else sum([item.stat().st_size for item in self.rglob("*") if item.is_file()]) return round(total_size / dict(zip(List(['b', 'kb', 'mb', 'gb', 'B', 'KB', 'MB', 'GB']), 2 * [1024 ** item for item in range(4)]))[units], 1) def time(self, which: Optional[str] = ["m", "c", "a"][0], **kwargs: Any): return datetime.fromtimestamp({"m": self.stat().st_mtime, "a": self.stat().st_atime, "c": self.stat().st_ctime}[which], **kwargs) # m last mofidication of content, i.e. the time it was created. c last status change (its inode is changed, permissions, path, but not content) a: last access - def stats(self): return Struct(size=self.size(), content_mod_time=self.time(which="m"), attr_mod_time=self.time(which="c"), last_access_time=self.time(which="a"), group_id_owner=self.stat().st_gid, user_id_owner=self.stat().st_uid) + def stats(self) -> dict[str, Any]: return dict(size=self.size(), content_mod_time=self.time(which="m"), attr_mod_time=self.time(which="c"), last_access_time=self.time(which="a"), group_id_owner=self.stat().st_gid, user_id_owner=self.stat().st_uid) # ================================ String Nature management ==================================== def _type(self): return ("File" if self.is_file() else ("Dir" if self.is_dir() else "NotExist")) if self.absolute() else "Relative" def clickable(self, inlieu: bool = False) -> 'P': return self._return(self.expanduser().resolve().as_uri(), inlieu) @@ -383,20 +384,27 @@ def get_env(): return __import__("crocodile.environment").environment def share_on_cloud(self) -> 'P': return P(__import__("requests").put(f"https://transfer.sh/{self.expanduser().name}", self.expanduser().absolute().read_bytes()).text) def share_on_network(self, username: PLike = None, password: Optional[str] = None): from crocodile.meta import Terminal; Terminal(stdout=None).run(f"sharing {self} {('--username ' + username) if username else ''} {('--password ' + password) if password else ''}", shell="powershell") def to_qr(self, txt: bool = True, path: Union[str, 'P', None] = None): qrcode = install_n_import("qrcode"); qr = qrcode.QRCode(); qr.add_data(str(self) if "http" in str(self) else (self.read_text() if txt else self.read_bytes())); import io; f = io.StringIO(); qr.print_ascii(out=f); f.seek(0); print(f.read()); qr.make_image().save(path) if path is not None else None - def get_remote_path(self, root: str, os_specific: bool = False): return P(root) / (__import__('platform').system().lower() if os_specific else 'generic_os') / self.rel2home() + def get_remote_path(self, root: str, os_specific: bool = False) -> 'P': return P(root) / (__import__('platform').system().lower() if os_specific else 'generic_os') / self.rel2home() def to_cloud(self, cloud: str, remotepath: PLike = None, zip: bool = False, encrypt: bool = False, key: Optional[str] = None, pwd: Optional[str] = None, rel2home: bool = False, share: bool = False, verbose: bool = True, os_specific: bool = False, transfers: int = 10, root: str = "myhome") -> 'P': localpath, to_del = self.expanduser().absolute(), [] if zip: localpath = localpath.zip(inplace=False); to_del.append(localpath) if encrypt: localpath = localpath.encrypt(key=key, pwd=pwd, inplace=False); to_del.append(localpath) if remotepath is None: - remotepath = localpath.get_remote_path(root=root, os_specific=os_specific) if rel2home else (P(root) / localpath if root is not None else localpath) - else: remotepath = P(remotepath) - from crocodile.meta import Terminal; print(f"{'⬆️'*5} UPLOADING {localpath.as_posix()} to {cloud}:{remotepath.as_posix()}") - res = Terminal(stdout=None).run(f"""rclone copyto '{localpath.as_posix()}' '{cloud}:{remotepath.as_posix()}' {'--progress' if verbose else ''} --transfers={transfers}""", shell="powershell").capture(); [item.delete(sure=True) for item in to_del]; print(f"{'⬆️'*5} UPLOAD COMPLETED.") + rp = localpath.get_remote_path(root=root, os_specific=os_specific) if rel2home else (P(root) / localpath if root is not None else localpath) + else: rp = P(remotepath) + from crocodile.meta import Terminal; print(f"{'⬆️'*5} UPLOADING {localpath.as_posix()} to {cloud}:{rp.as_posix()}") + res = Terminal(stdout=None).run(f"""rclone copyto '{localpath.as_posix()}' '{cloud}:{rp.as_posix()}' {'--progress' if verbose else ''} --transfers={transfers}""", shell="powershell").capture() + _ = [item.delete(sure=True) for item in to_del]; print(f"{'⬆️'*5} UPLOAD COMPLETED.") assert res.is_successful(strict_err=False, strict_returcode=True), res.print(capture=False) - if share: print("🔗 SHARING FILE"); res = Terminal().run(f"""rclone link '{cloud}:{remotepath.as_posix()}'""", shell="powershell").capture(); return res.op2path(strict_err=True, strict_returncode=True) + if share: + print("🔗 SHARING FILE") + res = Terminal().run(f"""rclone link '{cloud}:{rp.as_posix()}'""", shell="powershell").capture() + tmp = res.op2path(strict_err=True, strict_returncode=True) + assert isinstance(tmp, P), f"Could not get link for {self}." + return tmp return self - def from_cloud(self, cloud: str, localpath: PLike = None, decrypt: bool = False, unzip: bool = False, key: Optional[str] = None, pwd: Optional[str] = None, rel2home: bool = False, overwrite: bool = True, merge: bool = False, os_specific: bool = False, transfers: int = 10, root: str = "myhome"): + def from_cloud(self, cloud: str, localpath: PLike = None, decrypt: bool = False, unzip: bool = False, # type: ignore + key: Optional[str] = None, pwd: Optional[str] = None, rel2home: bool = False, overwrite: bool = True, merge: bool = False, os_specific: bool = False, transfers: int = 10, root: str = "myhome"): remotepath = self # .expanduser().absolute() localpath = P(localpath).expanduser().absolute() if localpath is not None else P.home().joinpath(remotepath.rel2home()) if rel2home: remotepath = remotepath.get_remote_path(root=root, os_specific=os_specific) @@ -423,15 +431,15 @@ def str(self) -> str: return str(self) # or self._str def compress_folder(root_dir: str, op_path: str, base_dir: str, fmt: str = 'zip', **kwargs: Any): # shutil works with folders nicely (recursion is done interally) # directory to be archived: root_dir\base_dir, unless base_dir is passed as absolute path. # when archive opened; base_dir will be found.""" assert fmt in {"zip", "tar", "gztar", "bztar", "xztar"} # .zip is added automatically by library, hence we'd like to avoid repeating it if user sent it. return P(__import__('shutil').make_archive(base_name=str(op_path)[:-4] if str(op_path).endswith(".zip") else str(op_path), format=fmt, root_dir=str(root_dir), base_dir=str(base_dir), **kwargs)) # returned path possible have added extension. -def zip_file(ip_path: str, op_path: str, arcname: PLike = None, password: Optional[str] = None, mode: str = "w", **kwargs: Any): +def zip_file(ip_path: str, op_path: str, arcname: PLike = None, password: Optional[bytes] = None, mode: FILE_MODE = "w", **kwargs: Any): """arcname determines the directory of the file being archived inside the archive. Defaults to same as original directory except for drive. When changed, it should still include the file path in its end. If arcname = filename without any path, then, it will be in the root of the archive.""" import zipfile - with zipfile.ZipFile(str(op_path), mode=mode) as jungle_zip: + with zipfile.ZipFile(op_path, mode=mode) as jungle_zip: jungle_zip.setpassword(pwd=password) if password is not None else None jungle_zip.write(filename=str(ip_path), arcname=str(arcname) if arcname is not None else None, compress_type=zipfile.ZIP_DEFLATED, **kwargs) return P(op_path) -def unzip(ip_path: str, op_path: PLike = None, fname: PLike = None, password: Optional[str] = None, memory: bool = False, **kwargs: Any): +def unzip(ip_path: str, op_path: str, fname: PLike = None, password: Optional[str] = None, memory: bool = False, **kwargs: Any): with __import__("zipfile").ZipFile(str(ip_path), 'r') as zipObj: if memory: return Struct({name: zipObj.read(name) for name in zipObj.namelist()}) if fname is None else zipObj.read(fname) if fname is None: zipObj.extractall(op_path, pwd=password, **kwargs); return P(op_path) @@ -475,27 +483,32 @@ class Compression: class Cache: # This class helps to accelrate access to latest data coming from expensive function. The class has two flavours, memory-based and disk-based variants.""" - def __init__(self, source_func: Callable[[], T], expire: str = "1m", logger=None, path: PLike = None, save: Optional[Callable[[Any, Union[str, P]], None]] = Save.pickle, reader: Optional[Callable[[str], Any]] = Read.read) -> None: - self.cache = None # fridge content + def __init__(self, source_func: Callable[[], 'T'], expire: str = "1m", logger: Optional[Any] = None, path: PLike = None, save: Callable[[T, Union[str, P]], Any] = Save.pickle, reader: Callable[[Union[str, P]], T] = Read.read) -> None: + self.cache: Optional[T] = None # fridge content self.source_func = source_func # function which when called returns a fresh object to be frozen. self.path: P | None = P(path) if path else None # if path is passed, it will function as disk-based flavour. self.time_produced = datetime.now() # if path is None else - self.save, self.reader, self.logger, self.expire = save, reader, logger, expire + self.save = save + self.reader = reader + self.logger = logger + self.expire = str2timedelta(expire) @property - def age(self): return datetime.now() - self.time_produced if self.path is None else datetime.now() - self.path.stats().content_mod_time + def age(self): return datetime.now() - self.time_produced if self.path is None else datetime.now() - datetime.fromtimestamp(self.path.stat().st_mtime) def __setstate__(self, state: dict[str, Any]) -> None: self.__dict__.update(state); self.path = P.home() / self.path if self.path is not None else self.path def __getstate__(self) -> dict[str, Any]: state = self.__dict__.copy(); state["path"] = self.path.rel2home() if self.path is not None else state["path"]; return state # With this implementation, instances can be pickled and loaded up in different machine and still works. - def __call__(self, fresh: bool = False) -> T: + def __call__(self, fresh: bool = False) -> 'T': # type: ignore if self.path is None: # Memory Cache - if self.cache is None or fresh is True or self.age > str2timedelta(self.expire): + if self.cache is None or fresh is True or self.age > self.expire: self.cache, self.time_produced = self.source_func(), datetime.now() if self.logger: self.logger.debug(f"Updating / Saving data from {self.source_func}") elif self.logger: self.logger.debug(f"Using cached values. Lag = {self.age}.") - elif fresh or not self.path.exists() or self.age > str2timedelta(self.expire): # disk fridge + elif fresh or not self.path.exists() or self.age > self.expire: # disk fridge if self.logger: self.logger.debug(f"Updating & Saving {self.path} ...") - self.cache = self.source_func(); self.save(obj=self.cache, path=self.path) # fresh order, never existed or exists but expired. - elif self.age < str2timedelta(self.expire) and self.cache is None: self.cache = self.reader(self.path) # this implementation favours reading over pulling fresh at instantiation. # exists and not expired. else # use the one in memory self.cache - return self.cache + self.cache = self.source_func() + self.save(self.cache, self.path) # fresh order, never existed or exists but expired. + elif self.age < self.expire and self.cache is None: + self.cache = self.reader(self.path) # this implementation favours reading over pulling fresh at instantiation. # exists and not expired. else # use the one in memory self.cache + return self.cache # type: ignore if __name__ == '__main__':