From 2c662b28778952ccfc86ef79aaeec93f7bcb4fea Mon Sep 17 00:00:00 2001 From: dongreenberg Date: Fri, 6 Sep 2024 17:44:51 +0200 Subject: [PATCH] Install default_env on all nodes --- runhouse/constants.py | 3 ++ runhouse/resources/envs/conda_env.py | 73 ++++++++++++++++---------- runhouse/resources/envs/env.py | 46 +++++++++------- runhouse/resources/envs/utils.py | 13 ++--- runhouse/resources/hardware/cluster.py | 8 ++- runhouse/resources/hardware/utils.py | 6 +-- runhouse/resources/packages/package.py | 16 ++++-- 7 files changed, 106 insertions(+), 59 deletions(-) diff --git a/runhouse/constants.py b/runhouse/constants.py index 103cad8f5..c936107a9 100644 --- a/runhouse/constants.py +++ b/runhouse/constants.py @@ -59,6 +59,9 @@ "bash ~/miniconda.sh -b -p ~/miniconda", "source $HOME/miniconda/bin/activate", ] +# TODO should default to user's local Python version? +# from platform import python_version; python_version() +CONDA_PREFERRED_PYTHON_VERSION = "3.10.9" TEST_ORG = "test-org" TESTING_LOG_LEVEL = "INFO" diff --git a/runhouse/resources/envs/conda_env.py b/runhouse/resources/envs/conda_env.py index b33ada436..ccdd2ccd4 100644 --- a/runhouse/resources/envs/conda_env.py +++ b/runhouse/resources/envs/conda_env.py @@ -5,7 +5,7 @@ import yaml -from runhouse.constants import ENVS_DIR +from runhouse.constants import CONDA_PREFERRED_PYTHON_VERSION, ENVS_DIR from runhouse.globals import obj_store from runhouse.logger import get_logger @@ -59,18 +59,21 @@ def config(self, condensed=True): def env_name(self): return self.conda_yaml["name"] - def _create_conda_env(self, force: bool = False, cluster: "Cluster" = None): + def _create_conda_env( + self, force: bool = False, cluster: "Cluster" = None, node: Optional[str] = None + ): yaml_path = Path(ENVS_DIR) / f"{self.env_name}.yml" env_exists = ( f"\n{self.env_name} " - in run_setup_command("conda info --envs", cluster=cluster)[1] + in run_setup_command("conda info --envs", cluster=cluster, node=node)[1] ) - run_setup_command(f"mkdir -p {ENVS_DIR}", cluster=cluster) + run_setup_command(f"mkdir -p {ENVS_DIR}", cluster=cluster, node=node) yaml_exists = ( (Path(ENVS_DIR).expanduser() / f"{self.env_name}.yml").exists() if not cluster - else run_setup_command(f"ls {yaml_path}", cluster=cluster)[0] == 0 + else run_setup_command(f"ls {yaml_path}", cluster=cluster, node=node)[0] + == 0 ) if force or not (yaml_exists and env_exists): @@ -87,19 +90,25 @@ def _create_conda_env(self, force: bool = False, cluster: "Cluster" = None): subprocess.run(f'python -c "{python_commands}"', shell=True) else: contents = yaml.dump(self.conda_yaml) - run_setup_command(f"echo $'{contents}' > {yaml_path}", cluster=cluster) + run_setup_command( + f"echo $'{contents}' > {yaml_path}", cluster=cluster, node=node + ) # create conda env from yaml file - run_setup_command(f"conda env create -f {yaml_path}", cluster=cluster) + run_setup_command( + f"conda env create -f {yaml_path}", cluster=cluster, node=node + ) env_exists = ( f"\n{self.env_name} " - in run_setup_command("conda info --envs", cluster=cluster)[1] + in run_setup_command("conda info --envs", cluster=cluster, node=node)[1] ) if not env_exists: raise RuntimeError(f"conda env {self.env_name} not created properly.") - def install(self, force: bool = False, cluster: "Cluster" = None): + def install( + self, force: bool = False, cluster: "Cluster" = None, node: Optional[str] = None + ): """Locally install packages and run setup commands. Args: @@ -109,32 +118,42 @@ def install(self, force: bool = False, cluster: "Cluster" = None): on the cluster using SSH. (default: ``None``) """ if not any(["python" in dep for dep in self.conda_yaml["dependencies"]]): - status_codes = run_setup_command("python --version", cluster=cluster) + status_codes = run_setup_command( + "python --version", cluster=cluster, node=node + ) base_python_version = ( - status_codes[1].split()[1] if status_codes[0] == 0 else "3.10.9" + status_codes[1].split()[1] + if status_codes[0] == 0 + else CONDA_PREFERRED_PYTHON_VERSION ) self.conda_yaml["dependencies"].append(f"python=={base_python_version}") install_conda(cluster=cluster) local_env_exists = ( f"\n{self.env_name} " - in run_setup_command("conda info --envs", cluster=cluster)[1] + in run_setup_command("conda info --envs", cluster=cluster, node=node)[1] ) - # Hash the config_for_rns to check if we need to create/install the conda env - env_config = self.config() - # Remove the name because auto-generated names will be different, but the installed components are the same - env_config.pop("name") - install_hash = hash(str(env_config)) - # Check the existing hash - if local_env_exists and install_hash in obj_store.installed_envs and not force: - logger.debug("Env already installed, skipping") - return - obj_store.installed_envs[install_hash] = self.name - - self._create_conda_env(force=force, cluster=cluster) - - self._install_reqs(cluster=cluster) - self._run_setup_cmds(cluster=cluster) + # If we're doing the install remotely via SSH (e.g. for default_env), there is no cache + if not cluster: + # Hash the config_for_rns to check if we need to create/install the conda env + env_config = self.config() + # Remove the name because auto-generated names will be different, but the installed components are the same + env_config.pop("name") + install_hash = hash(str(env_config)) + # Check the existing hash + if ( + local_env_exists + and install_hash in obj_store.installed_envs + and not force + ): + logger.debug("Env already installed, skipping") + return + obj_store.installed_envs[install_hash] = self.name + + self._create_conda_env(force=force, cluster=cluster, node=node) + + self._install_reqs(cluster=cluster, node=node) + self._run_setup_cmds(cluster=cluster, node=node) return diff --git a/runhouse/resources/envs/env.py b/runhouse/resources/envs/env.py index 41e011f40..d06b94522 100644 --- a/runhouse/resources/envs/env.py +++ b/runhouse/resources/envs/env.py @@ -140,7 +140,9 @@ def _secrets_to(self, system: Union[str, Cluster]): new_secrets.append(secret.to(system=system, env=self)) return new_secrets - def _install_reqs(self, cluster: Cluster = None, reqs: List = None): + def _install_reqs( + self, cluster: Cluster = None, reqs: List = None, node: str = "all" + ): reqs = reqs or self.reqs if reqs: for package in reqs: @@ -154,9 +156,11 @@ def _install_reqs(self, cluster: Cluster = None, reqs: List = None): raise ValueError(f"package {package} not recognized") logger.debug(f"Installing package: {str(pkg)}") - pkg._install(env=self, cluster=cluster) + pkg._install(env=self, cluster=cluster, node=node) - def _run_setup_cmds(self, cluster: Cluster = None, setup_cmds: List = None): + def _run_setup_cmds( + self, cluster: Cluster = None, setup_cmds: List = None, node: str = "all" + ): setup_cmds = setup_cmds or self.setup_cmds if not setup_cmds: @@ -165,10 +169,13 @@ def _run_setup_cmds(self, cluster: Cluster = None, setup_cmds: List = None): for cmd in setup_cmds: cmd = self._full_command(cmd) run_setup_command( - cmd, cluster=cluster, env_vars=_process_env_vars(self.env_vars) + cmd, + cluster=cluster, + env_vars=_process_env_vars(self.env_vars), + node=node, ) - def install(self, force: bool = False, cluster: Cluster = None): + def install(self, force: bool = False, cluster: Cluster = None, node: str = "all"): """Locally install packages and run setup commands. Args: @@ -176,20 +183,23 @@ def install(self, force: bool = False, cluster: Cluster = None): on the cluster. (Default: ``False``) cluster (Clsuter, optional): Cluster to install the env on. If not provided, env is installed on the current cluster. (Default: ``None``) + node (str, optional): Node to install the env on. (Default: ``"all"``) """ - # Hash the config_for_rns to check if we need to install - env_config = self.config() - # Remove the name because auto-generated names will be different, but the installed components are the same - env_config.pop("name") - install_hash = hash(str(env_config)) - # Check the existing hash - if install_hash in obj_store.installed_envs and not force: - logger.debug("Env already installed, skipping") - return - obj_store.installed_envs[install_hash] = self.name - - self._install_reqs(cluster=cluster) - self._run_setup_cmds(cluster=cluster) + # If we're doing the install remotely via SSH (e.g. for default_env), there is no cache + if not cluster: + # Hash the config_for_rns to check if we need to install + env_config = self.config() + # Remove the name because auto-generated names will be different, but the installed components are the same + env_config.pop("name") + install_hash = hash(str(env_config)) + # Check the existing hash + if install_hash in obj_store.installed_envs and not force: + logger.debug("Env already installed, skipping") + return + obj_store.installed_envs[install_hash] = self.name + + self._install_reqs(cluster=cluster, node=node) + self._run_setup_cmds(cluster=cluster, node=node) def _full_command(self, command: str): if self._run_cmd: diff --git a/runhouse/resources/envs/utils.py b/runhouse/resources/envs/utils.py index 1c4182627..5ef8465e2 100644 --- a/runhouse/resources/envs/utils.py +++ b/runhouse/resources/envs/utils.py @@ -2,7 +2,7 @@ import subprocess from pathlib import Path -from typing import Dict, List +from typing import Dict, List, Optional import yaml @@ -131,6 +131,7 @@ def run_setup_command( cluster: "Cluster" = None, env_vars: Dict = None, stream_logs: bool = True, + node: Optional[str] = None, ): """ Helper function to run a command during possibly the cluster default env setup. If a cluster is provided, @@ -152,14 +153,14 @@ def run_setup_command( return run_with_logs(cmd, stream_logs=stream_logs, require_outputs=True)[:2] return cluster._run_commands_with_runner( - [cmd], stream_logs=stream_logs, env_vars=env_vars + [cmd], stream_logs=stream_logs, env_vars=env_vars, node=node )[0] -def install_conda(cluster: "Cluster" = None): - if run_setup_command("conda --version", cluster=cluster)[0] != 0: +def install_conda(cluster: "Cluster" = None, node: Optional[str] = None): + if run_setup_command("conda --version", cluster=cluster, node=node)[0] != 0: logging.info("Conda is not installed. Installing...") for cmd in CONDA_INSTALL_CMDS: - run_setup_command(cmd, cluster=cluster, stream_logs=True) - if run_setup_command("conda --version", cluster=cluster)[0] != 0: + run_setup_command(cmd, cluster=cluster, node=node, stream_logs=True) + if run_setup_command("conda --version", cluster=cluster, node=node)[0] != 0: raise RuntimeError("Could not install Conda.") diff --git a/runhouse/resources/hardware/cluster.py b/runhouse/resources/hardware/cluster.py index b7e5e096c..2fd0cf044 100644 --- a/runhouse/resources/hardware/cluster.py +++ b/runhouse/resources/hardware/cluster.py @@ -444,6 +444,11 @@ def _command_runner(self, node: Optional[str] = None) -> "CommandRunner": SkySSHRunner, ) + if node == "all": + raise ValueError( + "CommandRunner can only be instantiated for individual nodes" + ) + node = node or self.address if ( @@ -518,7 +523,8 @@ def _sync_default_env_to_cluster(self): logger.info(f"Using log level {log_level} on cluster's default env") logger.info(f"Syncing default env {self._default_env.name} to cluster") - self._default_env.install(cluster=self) + for node in self.ips: + self._default_env.install(cluster=self, node=node) def _sync_runhouse_to_cluster( self, diff --git a/runhouse/resources/hardware/utils.py b/runhouse/resources/hardware/utils.py index 5105c9552..87e2fec6a 100644 --- a/runhouse/resources/hardware/utils.py +++ b/runhouse/resources/hardware/utils.py @@ -125,18 +125,18 @@ def _unnamed_default_env_name(cluster_name): return f"{cluster_name}_default_env" -def detect_cuda_version_or_cpu(cluster: "Cluster" = None): +def detect_cuda_version_or_cpu(cluster: "Cluster" = None, node: Optional[str] = None): """Return the CUDA version on the cluster. If we are on a CPU-only cluster return 'cpu'. Note: A cpu-only machine may have the CUDA toolkit installed, which means nvcc will still return a valid version. Also check if the NVIDIA driver is installed to confirm we are on a GPU.""" - status_codes = run_setup_command("nvcc --version", cluster=cluster) + status_codes = run_setup_command("nvcc --version", cluster=cluster, node=node) if not status_codes[0] == 0: return "cpu" cuda_version = status_codes[1].split("release ")[1].split(",")[0] - if run_setup_command("nvidia-smi", cluster=cluster)[0] == 0: + if run_setup_command("nvidia-smi", cluster=cluster, node=node)[0] == 0: return cuda_version return "cpu" diff --git a/runhouse/resources/packages/package.py b/runhouse/resources/packages/package.py index 03f44f65e..fb5099c76 100644 --- a/runhouse/resources/packages/package.py +++ b/runhouse/resources/packages/package.py @@ -223,7 +223,12 @@ def _reqs_install_cmd( install_cmd = self._prepend_env_command(install_cmd, env=env) return install_cmd - def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): + def _install( + self, + env: Union[str, "Env"] = None, + cluster: "Cluster" = None, + node: Optional[str] = None, + ): """Install package. Args: @@ -240,6 +245,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): dest=str(self.install_target.path_to_sync_to_on_cluster), up=True, contents=True, + node=node, ) self.install_target.local_path = ( @@ -265,6 +271,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): retcode = run_setup_command( f"python -c \"import importlib.util; exit(0) if importlib.util.find_spec('{self.install_target}') else exit(1)\"", cluster=cluster, + node=node, )[0] if retcode != 0: self.install_target = ( @@ -273,7 +280,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): install_cmd = self._pip_install_cmd(env=env, cluster=cluster) logger.info(f"Running via install_method pip: {install_cmd}") - retcode = run_setup_command(install_cmd, cluster=cluster)[0] + retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0] if retcode != 0: raise RuntimeError( f"Pip install {install_cmd} failed, check that the package exists and is available for your platform." @@ -282,7 +289,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): elif self.install_method == "conda": install_cmd = self._conda_install_cmd(env=env, cluster=cluster) logger.info(f"Running via install_method conda: {install_cmd}") - retcode = run_setup_command(install_cmd, cluster=cluster)[0] + retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0] if retcode != 0: raise RuntimeError( f"Conda install {install_cmd} failed, check that the package exists and is " @@ -293,7 +300,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): install_cmd = self._reqs_install_cmd(env=env, cluster=cluster) if install_cmd: logger.info(f"Running via install_method reqs: {install_cmd}") - retcode = run_setup_command(install_cmd, cluster=cluster)[0] + retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0] if retcode != 0: raise RuntimeError( f"Reqs install {install_cmd} failed, check that the package exists and is available for your platform." @@ -315,6 +322,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None): ) if not cluster else run_setup_command( f"export PATH=$PATH;{self.install_target.full_local_path_str()}", cluster=cluster, + node=node, ) elif not cluster: if Path(self.install_target).resolve().expanduser().exists():