Skip to content

Commit

Permalink
Install default_env on all nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
dongreenberg committed Sep 6, 2024
1 parent 0d6293d commit 0b8a327
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 59 deletions.
1 change: 1 addition & 0 deletions runhouse/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"bash ~/miniconda.sh -b -p ~/miniconda",
"source $HOME/miniconda/bin/activate",
]
CONDA_PREFERRED_PYTHON_VERSION = "3.10.9"

TEST_ORG = "test-org"
TESTING_LOG_LEVEL = "INFO"
Expand Down
73 changes: 46 additions & 27 deletions runhouse/resources/envs/conda_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import yaml

from runhouse.constants import ENVS_DIR
from runhouse.constants import CONDA_PREFERRED_PYTHON_VERSION, ENVS_DIR
from runhouse.globals import obj_store
from runhouse.logger import get_logger

Expand Down Expand Up @@ -59,18 +59,21 @@ def config(self, condensed=True):
def env_name(self):
return self.conda_yaml["name"]

def _create_conda_env(self, force: bool = False, cluster: "Cluster" = None):
def _create_conda_env(
self, force: bool = False, cluster: "Cluster" = None, node: Optional[str] = None
):
yaml_path = Path(ENVS_DIR) / f"{self.env_name}.yml"

env_exists = (
f"\n{self.env_name} "
in run_setup_command("conda info --envs", cluster=cluster)[1]
in run_setup_command("conda info --envs", cluster=cluster, node=node)[1]
)
run_setup_command(f"mkdir -p {ENVS_DIR}", cluster=cluster)
run_setup_command(f"mkdir -p {ENVS_DIR}", cluster=cluster, node=node)
yaml_exists = (
(Path(ENVS_DIR).expanduser() / f"{self.env_name}.yml").exists()
if not cluster
else run_setup_command(f"ls {yaml_path}", cluster=cluster)[0] == 0
else run_setup_command(f"ls {yaml_path}", cluster=cluster, node=node)[0]
== 0
)

if force or not (yaml_exists and env_exists):
Expand All @@ -87,19 +90,25 @@ def _create_conda_env(self, force: bool = False, cluster: "Cluster" = None):
subprocess.run(f'python -c "{python_commands}"', shell=True)
else:
contents = yaml.dump(self.conda_yaml)
run_setup_command(f"echo $'{contents}' > {yaml_path}", cluster=cluster)
run_setup_command(
f"echo $'{contents}' > {yaml_path}", cluster=cluster, node=node
)

# create conda env from yaml file
run_setup_command(f"conda env create -f {yaml_path}", cluster=cluster)
run_setup_command(
f"conda env create -f {yaml_path}", cluster=cluster, node=node
)

env_exists = (
f"\n{self.env_name} "
in run_setup_command("conda info --envs", cluster=cluster)[1]
in run_setup_command("conda info --envs", cluster=cluster, node=node)[1]
)
if not env_exists:
raise RuntimeError(f"conda env {self.env_name} not created properly.")

def install(self, force: bool = False, cluster: "Cluster" = None):
def install(
self, force: bool = False, cluster: "Cluster" = None, node: Optional[str] = None
):
"""Locally install packages and run setup commands.
Args:
Expand All @@ -109,32 +118,42 @@ def install(self, force: bool = False, cluster: "Cluster" = None):
on the cluster using SSH. (default: ``None``)
"""
if not any(["python" in dep for dep in self.conda_yaml["dependencies"]]):
status_codes = run_setup_command("python --version", cluster=cluster)
status_codes = run_setup_command(
"python --version", cluster=cluster, node=node
)
base_python_version = (
status_codes[1].split()[1] if status_codes[0] == 0 else "3.10.9"
status_codes[1].split()[1]
if status_codes[0] == 0
else CONDA_PREFERRED_PYTHON_VERSION
)
self.conda_yaml["dependencies"].append(f"python=={base_python_version}")
install_conda(cluster=cluster)
local_env_exists = (
f"\n{self.env_name} "
in run_setup_command("conda info --envs", cluster=cluster)[1]
in run_setup_command("conda info --envs", cluster=cluster, node=node)[1]
)

# Hash the config_for_rns to check if we need to create/install the conda env
env_config = self.config()
# Remove the name because auto-generated names will be different, but the installed components are the same
env_config.pop("name")
install_hash = hash(str(env_config))
# Check the existing hash
if local_env_exists and install_hash in obj_store.installed_envs and not force:
logger.debug("Env already installed, skipping")
return
obj_store.installed_envs[install_hash] = self.name

self._create_conda_env(force=force, cluster=cluster)

self._install_reqs(cluster=cluster)
self._run_setup_cmds(cluster=cluster)
# If we're doing the install remotely via SSH (e.g. for default_env), there is no cache
if not cluster:
# Hash the config_for_rns to check if we need to create/install the conda env
env_config = self.config()
# Remove the name because auto-generated names will be different, but the installed components are the same
env_config.pop("name")
install_hash = hash(str(env_config))
# Check the existing hash
if (
local_env_exists
and install_hash in obj_store.installed_envs
and not force
):
logger.debug("Env already installed, skipping")
return
obj_store.installed_envs[install_hash] = self.name

self._create_conda_env(force=force, cluster=cluster, node=node)

self._install_reqs(cluster=cluster, node=node)
self._run_setup_cmds(cluster=cluster, node=node)

return

Expand Down
46 changes: 28 additions & 18 deletions runhouse/resources/envs/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ def _secrets_to(self, system: Union[str, Cluster]):
new_secrets.append(secret.to(system=system, env=self))
return new_secrets

def _install_reqs(self, cluster: Cluster = None, reqs: List = None):
def _install_reqs(
self, cluster: Cluster = None, reqs: List = None, node: str = "all"
):
reqs = reqs or self.reqs
if reqs:
for package in reqs:
Expand All @@ -154,9 +156,11 @@ def _install_reqs(self, cluster: Cluster = None, reqs: List = None):
raise ValueError(f"package {package} not recognized")

logger.debug(f"Installing package: {str(pkg)}")
pkg._install(env=self, cluster=cluster)
pkg._install(env=self, cluster=cluster, node=node)

def _run_setup_cmds(self, cluster: Cluster = None, setup_cmds: List = None):
def _run_setup_cmds(
self, cluster: Cluster = None, setup_cmds: List = None, node: str = "all"
):
setup_cmds = setup_cmds or self.setup_cmds

if not setup_cmds:
Expand All @@ -165,31 +169,37 @@ def _run_setup_cmds(self, cluster: Cluster = None, setup_cmds: List = None):
for cmd in setup_cmds:
cmd = self._full_command(cmd)
run_setup_command(
cmd, cluster=cluster, env_vars=_process_env_vars(self.env_vars)
cmd,
cluster=cluster,
env_vars=_process_env_vars(self.env_vars),
node=node,
)

def install(self, force: bool = False, cluster: Cluster = None):
def install(self, force: bool = False, cluster: Cluster = None, node: str = "all"):
"""Locally install packages and run setup commands.
Args:
force (bool, optional): Whether to setup the installation again if the env already exists
on the cluster. (Default: ``False``)
cluster (Clsuter, optional): Cluster to install the env on. If not provided, env is installed
on the current cluster. (Default: ``None``)
node (str, optional): Node to install the env on. (Default: ``"all"``)
"""
# Hash the config_for_rns to check if we need to install
env_config = self.config()
# Remove the name because auto-generated names will be different, but the installed components are the same
env_config.pop("name")
install_hash = hash(str(env_config))
# Check the existing hash
if install_hash in obj_store.installed_envs and not force:
logger.debug("Env already installed, skipping")
return
obj_store.installed_envs[install_hash] = self.name

self._install_reqs(cluster=cluster)
self._run_setup_cmds(cluster=cluster)
# If we're doing the install remotely via SSH (e.g. for default_env), there is no cache
if not cluster:
# Hash the config_for_rns to check if we need to install
env_config = self.config()
# Remove the name because auto-generated names will be different, but the installed components are the same
env_config.pop("name")
install_hash = hash(str(env_config))
# Check the existing hash
if install_hash in obj_store.installed_envs and not force:
logger.debug("Env already installed, skipping")
return
obj_store.installed_envs[install_hash] = self.name

self._install_reqs(cluster=cluster, node=node)
self._run_setup_cmds(cluster=cluster, node=node)

def _full_command(self, command: str):
if self._run_cmd:
Expand Down
13 changes: 7 additions & 6 deletions runhouse/resources/envs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import subprocess

from pathlib import Path
from typing import Dict, List
from typing import Dict, List, Optional

import yaml

Expand Down Expand Up @@ -131,6 +131,7 @@ def run_setup_command(
cluster: "Cluster" = None,
env_vars: Dict = None,
stream_logs: bool = True,
node: Optional[str] = None,
):
"""
Helper function to run a command during possibly the cluster default env setup. If a cluster is provided,
Expand All @@ -152,14 +153,14 @@ def run_setup_command(
return run_with_logs(cmd, stream_logs=stream_logs, require_outputs=True)[:2]

return cluster._run_commands_with_runner(
[cmd], stream_logs=stream_logs, env_vars=env_vars
[cmd], stream_logs=stream_logs, env_vars=env_vars, node=node
)[0]


def install_conda(cluster: "Cluster" = None):
if run_setup_command("conda --version", cluster=cluster)[0] != 0:
def install_conda(cluster: "Cluster" = None, node: Optional[str] = None):
if run_setup_command("conda --version", cluster=cluster, node=node)[0] != 0:
logging.info("Conda is not installed. Installing...")
for cmd in CONDA_INSTALL_CMDS:
run_setup_command(cmd, cluster=cluster, stream_logs=True)
if run_setup_command("conda --version", cluster=cluster)[0] != 0:
run_setup_command(cmd, cluster=cluster, node=node, stream_logs=True)
if run_setup_command("conda --version", cluster=cluster, node=node)[0] != 0:
raise RuntimeError("Could not install Conda.")
8 changes: 7 additions & 1 deletion runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,11 @@ def _command_runner(self, node: Optional[str] = None) -> "CommandRunner":
SkySSHRunner,
)

if node == "all":
raise ValueError(
"CommandRunner can only be instantiated for individual nodes"
)

node = node or self.address

if (
Expand Down Expand Up @@ -518,7 +523,8 @@ def _sync_default_env_to_cluster(self):
logger.info(f"Using log level {log_level} on cluster's default env")

logger.info(f"Syncing default env {self._default_env.name} to cluster")
self._default_env.install(cluster=self)
for node in self.ips:
self._default_env.install(cluster=self, node=node)

def _sync_runhouse_to_cluster(
self,
Expand Down
6 changes: 3 additions & 3 deletions runhouse/resources/hardware/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,18 @@ def _unnamed_default_env_name(cluster_name):
return f"{cluster_name}_default_env"


def detect_cuda_version_or_cpu(cluster: "Cluster" = None):
def detect_cuda_version_or_cpu(cluster: "Cluster" = None, node: Optional[str] = None):
"""Return the CUDA version on the cluster. If we are on a CPU-only cluster return 'cpu'.
Note: A cpu-only machine may have the CUDA toolkit installed, which means nvcc will still return
a valid version. Also check if the NVIDIA driver is installed to confirm we are on a GPU."""

status_codes = run_setup_command("nvcc --version", cluster=cluster)
status_codes = run_setup_command("nvcc --version", cluster=cluster, node=node)
if not status_codes[0] == 0:
return "cpu"
cuda_version = status_codes[1].split("release ")[1].split(",")[0]

if run_setup_command("nvidia-smi", cluster=cluster)[0] == 0:
if run_setup_command("nvidia-smi", cluster=cluster, node=node)[0] == 0:
return cuda_version
return "cpu"

Expand Down
16 changes: 12 additions & 4 deletions runhouse/resources/packages/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,12 @@ def _reqs_install_cmd(
install_cmd = self._prepend_env_command(install_cmd, env=env)
return install_cmd

def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
def _install(
self,
env: Union[str, "Env"] = None,
cluster: "Cluster" = None,
node: Optional[str] = None,
):
"""Install package.
Args:
Expand All @@ -240,6 +245,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
dest=str(self.install_target.path_to_sync_to_on_cluster),
up=True,
contents=True,
node=node,
)

self.install_target.local_path = (
Expand All @@ -265,6 +271,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
retcode = run_setup_command(
f"python -c \"import importlib.util; exit(0) if importlib.util.find_spec('{self.install_target}') else exit(1)\"",
cluster=cluster,
node=node,
)[0]
if retcode != 0:
self.install_target = (
Expand All @@ -273,7 +280,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):

install_cmd = self._pip_install_cmd(env=env, cluster=cluster)
logger.info(f"Running via install_method pip: {install_cmd}")
retcode = run_setup_command(install_cmd, cluster=cluster)[0]
retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0]
if retcode != 0:
raise RuntimeError(
f"Pip install {install_cmd} failed, check that the package exists and is available for your platform."
Expand All @@ -282,7 +289,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
elif self.install_method == "conda":
install_cmd = self._conda_install_cmd(env=env, cluster=cluster)
logger.info(f"Running via install_method conda: {install_cmd}")
retcode = run_setup_command(install_cmd, cluster=cluster)[0]
retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0]
if retcode != 0:
raise RuntimeError(
f"Conda install {install_cmd} failed, check that the package exists and is "
Expand All @@ -293,7 +300,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
install_cmd = self._reqs_install_cmd(env=env, cluster=cluster)
if install_cmd:
logger.info(f"Running via install_method reqs: {install_cmd}")
retcode = run_setup_command(install_cmd, cluster=cluster)[0]
retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0]
if retcode != 0:
raise RuntimeError(
f"Reqs install {install_cmd} failed, check that the package exists and is available for your platform."
Expand All @@ -315,6 +322,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
) if not cluster else run_setup_command(
f"export PATH=$PATH;{self.install_target.full_local_path_str()}",
cluster=cluster,
node=node,
)
elif not cluster:
if Path(self.install_target).resolve().expanduser().exists():
Expand Down

0 comments on commit 0b8a327

Please sign in to comment.