Skip to content

Commit

Permalink
added mypy settings
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Al-Saffar committed Aug 29, 2023
1 parent bfd1c88 commit 043fe7e
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 66 deletions.
4 changes: 2 additions & 2 deletions myresources/crocodile/cluster/data_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ def transfer_sh(machine: RemoteMachine):
if machine.config.copy_repo:
tmp_file = tb.P(machine.job_params.repo_path_rh).expanduser().zip_n_encrypt()
cloud_download_py_script += f"print('Downloading `{tmp_file.collapseuser()}`.')\n"
cloud_download_py_script += f"tb.P(r'{tmp_file.transfer_sh()}').download(folder=r'{tb.P(machine.job_params.repo_path_rh).parent}').decrypt_n_unzip()\n"
cloud_download_py_script += f"tb.P(r'{tmp_file.share_on_cloud()}').download(folder=r'{tb.P(machine.job_params.repo_path_rh).parent}').decrypt_n_unzip()\n"
tmp_file.delete(sure=True)

# download data
for _idx, item in enumerate(machine.data):
cloud_download_py_script += f"tb.P(r'{tb.P(item).transfer_sh()}').download(folder=r'{item.collapseuser().parent}')\n"
cloud_download_py_script += f"tb.P(r'{tb.P(item).share_on_cloud()}').download(folder=r'{item.collapseuser().parent}')\n"

# save cloud_download_script_py
machine.resources.cloud_download_py_script_path.expanduser().write_text(cloud_download_py_script, encoding="utf-8")
Expand Down
3 changes: 2 additions & 1 deletion myresources/crocodile/cluster/session_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import crocodile.toolbox as tb
import time
from typing import Optional


class Zellij:
Expand All @@ -20,7 +21,7 @@ def new_sess_name(self) -> str:
return tmp
# def __getstate__(self): return self.__dict__
# def __setstate__(self, state): self.__dict__.update(state)
def get_ssh_command(self, sess_name=None): return f"zellij attach {sess_name or self.get_new_session_name()} -c " # -c means create if not exists.
def get_ssh_command(self, sess_name: Optional[str] = None): return f"zellij attach {sess_name or self.get_new_session_name()} -c " # -c means create if not exists.
def get_new_session_string(self): return f"{self.ssh.get_ssh_conn_str()} -t {self.get_ssh_command()}"
def open_console(self): return tb.Terminal().run_async(self.get_new_session_string(), shell="pwsh")
def get_new_session_name(self):
Expand Down
5 changes: 3 additions & 2 deletions myresources/crocodile/cluster/template_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@
import crocodile.toolbox as tb
from crocodile.cluster.distribute import WorkloadParams
from crocodile.cluster.distribute import RemoteMachineConfig, LoadCriterion, Cluster, ThreadLoadCalculator
from typing import Any


class ExpensiveComputation:
@staticmethod
def func_single_job(workload_params: WorkloadParams, *args, **kwargs) -> tb.P:
def func_single_job(workload_params: WorkloadParams, *args: Any, **kwargs: Any) -> tb.P:
from crocodile.cluster.utils import expensive_function
res = expensive_function(workload_params=workload_params, *args, **kwargs)
return res

@staticmethod
def func(workload_params: WorkloadParams, *args, **kwargs) -> tb.P:
def func(workload_params: WorkloadParams, *args: Any, **kwargs: Any) -> tb.P:
per_job_workload_params = tb.L(range(workload_params.idx_start, workload_params.idx_end, 1)).split(to=workload_params.jobs).apply(lambda sub_list: WorkloadParams(idx_start=sub_list.list[0], idx_end=sub_list.list[-1] + 1, idx_max=workload_params.idx_max, jobs=workload_params.jobs))
res: list[tb.P] = tb.L(per_job_workload_params).apply(lambda a_workload_params: ExpensiveComputation.func_single_job(*args, workload_params=a_workload_params, **kwargs), jobs=workload_params.jobs).list
return res[0]
Expand Down
75 changes: 40 additions & 35 deletions myresources/crocodile/core.py

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions myresources/crocodile/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class DBMS:
* Always use sqlalchemy API and avoid sql-dielect specific language.
* Engine is provided externally. It is the end-user's business to make this engine.
"""
def __init__(self, engine, db=None, sch=None, vws=False):
def __init__(self, engine: Engine, db=None, sch: Optional[str] = None, vws=False):
self.eng: Engine = engine
self.con = None
self.ses = None
Expand All @@ -46,7 +46,7 @@ def __init__(self, engine, db=None, sch=None, vws=False):
# self.ip_formatter: Optional[Any] = None
# self.db_specs: Optional[Any] = None

def refresh(self, sch=None) -> 'Self': # fails if multiple schemas are there and None is specified
def refresh(self, sch: Optional[str] = None) -> 'Self': # fails if multiple schemas are there and None is specified
self.con = self.eng.connect()
self.ses = sessionmaker()(bind=self.eng) # ORM style
self.meta = MetaData()
Expand All @@ -58,14 +58,14 @@ def refresh(self, sch=None) -> 'Self': # fails if multiple schemas are there an
return self

def __getstate__(self): return Struct(self.__dict__.copy()).delete(keys=["eng", "con", "ses", "insp", "meta"]).update(path=self.path.collapseuser(strict=False)).__dict__
def __setstate__(self, state): self.__dict__.update(state); self.eng = self.make_sql_engine(self.path); self.refresh()
def __setstate__(self, state: dict[str, Any]): self.__dict__.update(state); self.eng = self.make_sql_engine(self.path); self.refresh()

@classmethod
def from_local_db(cls, path=None, echo=False, share_across_threads=False, **kwargs): return cls(engine=cls.make_sql_engine(path=path, echo=echo, share_across_threads=share_across_threads, **kwargs))
def from_local_db(cls, path=None, echo: bool = False, share_across_threads: bool = False, **kwargs: Any): return cls(engine=cls.make_sql_engine(path=path, echo=echo, share_across_threads=share_across_threads, **kwargs))
def __repr__(self): return f"DataBase @ {self.eng}"
def get_columns(self, table, sch=None):
def get_columns(self, table: str, sch=None):
return self.meta.tables[self._get_table_identifier(table=table, sch=sch)].exported_columns.keys()
def close(self, sleep=2):
def close(self, sleep: int = 2):
print(f"Terminating database `{self.path.as_uri() if 'memory' not in self.path else self.path}`")
self.con.close()
self.ses.close()
Expand All @@ -77,7 +77,7 @@ def _get_table_identifier(self, table, sch):
else: return table

@staticmethod
def make_sql_engine(path=None, echo=False, dialect="sqlite", driver=["pysqlite", "DBAPI"][0], pool_size=5, share_across_threads=True, **kwargs):
def make_sql_engine(path=None, echo: bool = False, dialect: str = "sqlite", driver: str = ["pysqlite", "DBAPI"][0], pool_size: int = 5, share_across_threads: bool = True, **kwargs: Any):
"""Establish lazy initialization with database"""
if str(path) == "memory":
print("Linking to in-memory database.")
Expand All @@ -102,25 +102,25 @@ def execute_begin_once(self, command, res_func=lambda x: x.all(), df=False):
result = res_func(result)
return result if not df else pd.DataFrame(result)

def execute(self, command, df=False):
def execute(self, command: str, df=False):
with self.eng.begin() as conn: result = conn.execute(text(command))
return result if not df else pd.DataFrame(result)

def execute_script(self, command, df=False):
def execute_script(self, command: str, df=False):
with self.eng.begin() as conn: result = conn.executescript(text(command))
return result if not df else pd.DataFrame(result)

# ========================== TABLES =====================================
def read_table(self, table, sch=None, size=100):
def read_table(self, table: str, sch: Optional[str] = None, size: int = 100):
res = self.con.execute(text(f'''SELECT * FROM "{self._get_table_identifier(table, sch)}"'''))
return pd.DataFrame(res.fetchmany(size))

def insert_dicts(self, table, *mydicts):
def insert_dicts(self, table: str, *mydicts):
cmd = f"""INSERT INTO {table} VALUES """
for mydict in mydicts: cmd += f"""({tuple(mydict)}), """
self.execute_begin_once(cmd)

def describe_table(self, table, sch=None, dtype=True):
def describe_table(self, table: str, sch=None, dtype=True):
print(table.center(100, "="))
self.refresh()
tbl = self.meta.tables[table]
Expand Down
4 changes: 2 additions & 2 deletions myresources/crocodile/deeplearning.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def save(self):
def __getstate__(self) -> dict[str, Any]: return self.__dict__
def __setstate__(self, state: dict): return self.__dict__.update(state)
@classmethod
def from_saved_data(cls, path, *args, **kwargs) -> 'HParams':
def from_saved_data(cls, path, *args: Any, **kwargs: Any) -> 'HParams':
data: dict = tb.Read.vanilla_pickle(path=tb.P(path) / cls.subpath / "hparams.HParams.dat.pkl", *args, **kwargs)
return cls(**data)
def __repr__(self, **kwargs): return "HParams Object with specs:\n" + tb.Struct(self.__dict__).print(as_config=True, return_str=True)
Expand Down Expand Up @@ -144,7 +144,7 @@ def __getstate__(self) -> dict[str, Any]:
def __setstate__(self, state): return self.__dict__.update(state)
def __repr__(self): return f"DataReader Object with these keys: \n" + tb.Struct(self.__dict__).print(as_config=False, return_str=True)

def split_the_data(self, *args, **kwargs) -> None:
def split_the_data(self, *args: Any, **kwargs: Any) -> None:
from sklearn.model_selection import train_test_split
result = train_test_split(*args, test_size=self.hp.test_split, shuffle=self.hp.shuffle, random_state=self.hp.seed, **kwargs)
self.split = dict(train_loader=None, test_loader=None)
Expand Down
2 changes: 1 addition & 1 deletion myresources/crocodile/deeplearning_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
# # if self.save_type in {"obj", "both"}: super(HyperParam, self).save(path=self.save_dir.joinpath(self.subpath / "hparams.HyperParam.pkl"), add_suffix=False, data_only=False, desc="")

# @classmethod
# def from_saved_data(cls, path, *args, **kwargs) -> 'HParams': return tb.Read.pickle(path=tb.P(path) / cls.subpath / "hparams.HParams.dat.pkl", *args, **kwargs)
# def from_saved_data(cls, path, *args: Any, **kwargs: Any) -> 'HParams': return tb.Read.pickle(path=tb.P(path) / cls.subpath / "hparams.HParams.dat.pkl", *args, **kwargs)
# def __repr__(self, **kwargs): return "HParams Object with specs:\n" + tb.Struct(self.__dict__).print(as_config=True, return_str=True)
# @property
# def pkg(self): return __import__("tensorflow") if self.pkg_name == "tensorflow" else (__import__("torch") if self.pkg_name == "torch" else ValueError(f"pkg_name must be either `tensorflow` or `torch`"))
Expand Down
4 changes: 2 additions & 2 deletions myresources/crocodile/file_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def start(self, opener=None):
__import__("subprocess").Popen(f"powershell start '{self.expanduser().resolve().str}'" if opener is None else rf'powershell {opener} \'{self}\''); return self # fails for folders. Start must be passed, but is not defined.
elif __import__("sys").platform == 'linux': __import__("subprocess").call(["xdg-open", self.expanduser().resolve().str]); return self # works for files and folders alike
else: __import__("subprocess").call(["open", self.expanduser().resolve().str]); return self # works for files and folders alike # mac
def __call__(self, *args, **kwargs) -> 'P': self.start(*args, **kwargs); return self
def __call__(self, *args: Any, **kwargs: Any) -> 'P': self.start(*args, **kwargs); return self
def append_text(self, appendix) -> 'P': self.write_text(self.read_text() + appendix); return self
def cache_from(self, source_func, expire="1w", save=Save.vanilla_pickle, reader=Read.read, **kwargs): return Cache(source_func=source_func, path=self, expire=expire, save=save, reader=reader, **kwargs)
def modify_text(self, txt_search, txt_alt, replace_line: bool = False, notfound_append: bool = False, prepend: bool = False, encoding=None):
Expand Down Expand Up @@ -193,7 +193,7 @@ def items(self) -> List[str]: return List(self.parts)
def __len__(self) -> int: return len(self.parts)
def __contains__(self, item): return P(item).as_posix() in self.as_posix()
def __iter__(self): return self.parts.__iter__()
def __deepcopy__(self, *args, **kwargs) -> 'P': return P(str(self))
def __deepcopy__(self, *args: Any, **kwargs: Any) -> 'P': return P(str(self))
def __getstate__(self) -> str: return str(self)
def __setstate__(self, state): self._str = str(state)
def __add__(self, other) -> 'P': return self.parent.joinpath(self.name + str(other)) # used append and prepend if the addition wanted to be before suffix.
Expand Down
4 changes: 2 additions & 2 deletions myresources/crocodile/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Null:
def __init__(self, return_='self'): self.return_ = return_
def __getattr__(self, item) -> 'Null': _ = item; return self if self.return_ == 'self' else self.return_
def __getitem__(self, item) -> 'Null': _ = item; return self if self.return_ == 'self' else self.return_
def __call__(self, *args, **kwargs) -> 'Null': return self if self.return_ == 'self' else self.return_
def __call__(self, *args: Any, **kwargs: Any) -> 'Null': return self if self.return_ == 'self' else self.return_
def __len__(self): return 0
def __bool__(self): return False
def __contains__(self, item): _ = self, item; return False
Expand Down Expand Up @@ -64,7 +64,7 @@ class Response:
@staticmethod
def from_completed_process(cp: subprocess.CompletedProcess): (resp := Terminal.Response(cmd=cp.args)).output.update(dict(stdout=cp.stdout, stderr=cp.stderr, returncode=cp.returncode)); return resp
def __init__(self, stdin=None, stdout=None, stderr=None, cmd: Optional[str] = None, desc: str =""): self.std, self.output, self.input, self.desc = dict(stdin=stdin, stdout=stdout, stderr=stderr), dict(stdin="", stdout="", stderr="", returncode=None), cmd, desc # input command
def __call__(self, *args, **kwargs) -> Optional[str]: return self.op.rstrip() if type(self.op) is str else None
def __call__(self, *args: Any, **kwargs: Any) -> Optional[str]: return self.op.rstrip() if type(self.op) is str else None
op = property(lambda self: self.output["stdout"])
ip = property(lambda self: self.output["stdin"])
err = property(lambda self: self.output["stderr"])
Expand Down
15 changes: 8 additions & 7 deletions myresources/crocodile/msc/obfuscater.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@

import crocodile.toolbox as tb
from typing import Any, Optional


class Obfuscator(tb.Base):
def __init__(self, directory="", noun=False, suffix="same", *args, **kwargs) -> None:
def __init__(self, directory: str = "", noun: bool = False, suffix: str = "same", *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.directory = tb.P(directory).expanduser().absolute()
assert self.directory.is_dir()
Expand All @@ -13,12 +14,12 @@ def __init__(self, directory="", noun=False, suffix="same", *args, **kwargs) ->
self.noun = noun
self.suffix = suffix

def __getstate__(self):
def __getstate__(self) -> dict[str, Any]:
state = self.__dict__.copy()
state['directory'] = self.directory.collapseuser(strict=False)
return state

def __setstate__(self, state: dict):
def __setstate__(self, state: dict[str, Any]) -> None:
self.__dict__ = state
self.directory = self.directory.expanduser()

Expand All @@ -30,7 +31,7 @@ def deobfuscate(self, path: tb.P) -> tb.P:
def obfuscate(self, path: tb.P) -> tb.P:
return self.directory.parent.joinpath(self.real_to_phony[path.relative_to(self.directory).as_posix()])

def execute_map(self, forward=True):
def execute_map(self, forward: bool = True) -> None:
root = self.directory if forward else self.obfuscate(self.directory)
self._execute_map(root, forward=forward)

Expand All @@ -45,7 +46,7 @@ def symlink_to_phoney(self, base=None):
@staticmethod
def lcd(path1, path2): return tb.P("/".join([part1 for part1, part2 in zip(tb.P(path1).expanduser().absolute().parts, tb.P(path2).expanduser().absolute().parts) if part1 == part2]))

def _execute_map(self, path: tb.P, forward):
def _execute_map(self, path: tb.P, forward: bool) -> None:
assert path.is_dir()
children = path.search("*", r=False)
for child in children:
Expand Down Expand Up @@ -97,11 +98,11 @@ def display(self):
print(v)
print('-' * 100)

def save(self, **kwargs):
def save(self, **kwargs: Any):
_ = kwargs
super().save(self.directory.append("_obfuscater.pkl"))

def update_symlinks(self, directory=None):
def update_symlinks(self, directory: Optional[str] = None):
for path in (directory or self.directory).search("*", r=False):
if path.is_symlink(): continue
if path.is_dir():
Expand Down

0 comments on commit 043fe7e

Please sign in to comment.