Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Install default_env on all nodes #1240

Merged
merged 2 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions runhouse/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
"bash ~/miniconda.sh -b -p ~/miniconda",
"source $HOME/miniconda/bin/activate",
]
# TODO: should this default to the user's local Python version?
# from platform import python_version; python_version()
CONDA_PREFERRED_PYTHON_VERSION = "3.10.9"

TEST_ORG = "test-org"
TESTING_LOG_LEVEL = "INFO"
Expand Down
73 changes: 46 additions & 27 deletions runhouse/resources/envs/conda_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import yaml

from runhouse.constants import ENVS_DIR
from runhouse.constants import CONDA_PREFERRED_PYTHON_VERSION, ENVS_DIR
from runhouse.globals import obj_store
from runhouse.logger import get_logger

Expand Down Expand Up @@ -59,18 +59,21 @@ def config(self, condensed=True):
def env_name(self):
return self.conda_yaml["name"]

def _create_conda_env(self, force: bool = False, cluster: "Cluster" = None):
def _create_conda_env(
self, force: bool = False, cluster: "Cluster" = None, node: Optional[str] = None
):
yaml_path = Path(ENVS_DIR) / f"{self.env_name}.yml"

env_exists = (
f"\n{self.env_name} "
in run_setup_command("conda info --envs", cluster=cluster)[1]
in run_setup_command("conda info --envs", cluster=cluster, node=node)[1]
)
run_setup_command(f"mkdir -p {ENVS_DIR}", cluster=cluster)
run_setup_command(f"mkdir -p {ENVS_DIR}", cluster=cluster, node=node)
yaml_exists = (
(Path(ENVS_DIR).expanduser() / f"{self.env_name}.yml").exists()
if not cluster
else run_setup_command(f"ls {yaml_path}", cluster=cluster)[0] == 0
else run_setup_command(f"ls {yaml_path}", cluster=cluster, node=node)[0]
== 0
)

if force or not (yaml_exists and env_exists):
Expand All @@ -87,19 +90,25 @@ def _create_conda_env(self, force: bool = False, cluster: "Cluster" = None):
subprocess.run(f'python -c "{python_commands}"', shell=True)
else:
contents = yaml.dump(self.conda_yaml)
run_setup_command(f"echo $'{contents}' > {yaml_path}", cluster=cluster)
run_setup_command(
f"echo $'{contents}' > {yaml_path}", cluster=cluster, node=node
)

# create conda env from yaml file
run_setup_command(f"conda env create -f {yaml_path}", cluster=cluster)
run_setup_command(
f"conda env create -f {yaml_path}", cluster=cluster, node=node
)

env_exists = (
f"\n{self.env_name} "
in run_setup_command("conda info --envs", cluster=cluster)[1]
in run_setup_command("conda info --envs", cluster=cluster, node=node)[1]
)
if not env_exists:
raise RuntimeError(f"conda env {self.env_name} not created properly.")

def install(self, force: bool = False, cluster: "Cluster" = None):
def install(
self, force: bool = False, cluster: "Cluster" = None, node: Optional[str] = None
):
"""Locally install packages and run setup commands.

Args:
Expand All @@ -109,32 +118,42 @@ def install(self, force: bool = False, cluster: "Cluster" = None):
on the cluster using SSH. (default: ``None``)
"""
if not any(["python" in dep for dep in self.conda_yaml["dependencies"]]):
status_codes = run_setup_command("python --version", cluster=cluster)
status_codes = run_setup_command(
"python --version", cluster=cluster, node=node
)
base_python_version = (
status_codes[1].split()[1] if status_codes[0] == 0 else "3.10.9"
status_codes[1].split()[1]
if status_codes[0] == 0
else CONDA_PREFERRED_PYTHON_VERSION
)
self.conda_yaml["dependencies"].append(f"python=={base_python_version}")
install_conda(cluster=cluster)
local_env_exists = (
f"\n{self.env_name} "
in run_setup_command("conda info --envs", cluster=cluster)[1]
in run_setup_command("conda info --envs", cluster=cluster, node=node)[1]
)

# Hash the config_for_rns to check if we need to create/install the conda env
env_config = self.config()
# Remove the name because auto-generated names will be different, but the installed components are the same
env_config.pop("name")
install_hash = hash(str(env_config))
# Check the existing hash
if local_env_exists and install_hash in obj_store.installed_envs and not force:
logger.debug("Env already installed, skipping")
return
obj_store.installed_envs[install_hash] = self.name

self._create_conda_env(force=force, cluster=cluster)

self._install_reqs(cluster=cluster)
self._run_setup_cmds(cluster=cluster)
# If we're doing the install remotely via SSH (e.g. for default_env), there is no cache
if not cluster:
# Hash the config_for_rns to check if we need to create/install the conda env
env_config = self.config()
# Remove the name because auto-generated names will be different, but the installed components are the same
env_config.pop("name")
install_hash = hash(str(env_config))
# Check the existing hash
if (
local_env_exists
and install_hash in obj_store.installed_envs
and not force
):
logger.debug("Env already installed, skipping")
return
obj_store.installed_envs[install_hash] = self.name

self._create_conda_env(force=force, cluster=cluster, node=node)

self._install_reqs(cluster=cluster, node=node)
self._run_setup_cmds(cluster=cluster, node=node)

return

Expand Down
66 changes: 39 additions & 27 deletions runhouse/resources/envs/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ def _secrets_to(self, system: Union[str, Cluster]):
new_secrets.append(secret.to(system=system, env=self))
return new_secrets

def _install_reqs(self, cluster: Cluster = None, reqs: List = None):
def _install_reqs(
self, cluster: Cluster = None, reqs: List = None, node: str = "all"
):
reqs = reqs or self.reqs
if reqs:
for package in reqs:
Expand All @@ -154,9 +156,11 @@ def _install_reqs(self, cluster: Cluster = None, reqs: List = None):
raise ValueError(f"package {package} not recognized")

logger.debug(f"Installing package: {str(pkg)}")
pkg._install(env=self, cluster=cluster)
pkg._install(env=self, cluster=cluster, node=node)

def _run_setup_cmds(self, cluster: Cluster = None, setup_cmds: List = None):
def _run_setup_cmds(
self, cluster: Cluster = None, setup_cmds: List = None, node: str = "all"
):
setup_cmds = setup_cmds or self.setup_cmds

if not setup_cmds:
Expand All @@ -165,31 +169,37 @@ def _run_setup_cmds(self, cluster: Cluster = None, setup_cmds: List = None):
for cmd in setup_cmds:
cmd = self._full_command(cmd)
run_setup_command(
cmd, cluster=cluster, env_vars=_process_env_vars(self.env_vars)
cmd,
cluster=cluster,
env_vars=_process_env_vars(self.env_vars),
node=node,
)

def install(self, force: bool = False, cluster: Cluster = None):
def install(self, force: bool = False, cluster: Cluster = None, node: str = "all"):
"""Locally install packages and run setup commands.

Args:
force (bool, optional): Whether to setup the installation again if the env already exists
on the cluster. (Default: ``False``)
cluster (Cluster, optional): Cluster to install the env on. If not provided, env is installed
on the current cluster. (Default: ``None``)
node (str, optional): Node to install the env on. (Default: ``"all"``)
"""
# Hash the config_for_rns to check if we need to install
env_config = self.config()
# Remove the name because auto-generated names will be different, but the installed components are the same
env_config.pop("name")
install_hash = hash(str(env_config))
# Check the existing hash
if install_hash in obj_store.installed_envs and not force:
logger.debug("Env already installed, skipping")
return
obj_store.installed_envs[install_hash] = self.name

self._install_reqs(cluster=cluster)
self._run_setup_cmds(cluster=cluster)
# If we're doing the install remotely via SSH (e.g. for default_env), there is no cache
if not cluster:
# Hash the config_for_rns to check if we need to install
env_config = self.config()
# Remove the name because auto-generated names will be different, but the installed components are the same
env_config.pop("name")
install_hash = hash(str(env_config))
# Check the existing hash
if install_hash in obj_store.installed_envs and not force:
logger.debug("Env already installed, skipping")
return
obj_store.installed_envs[install_hash] = self.name

self._install_reqs(cluster=cluster, node=node)
self._run_setup_cmds(cluster=cluster, node=node)

def _full_command(self, command: str):
if self._run_cmd:
Expand All @@ -205,7 +215,7 @@ def _run_command(self, command: str, **kwargs):
def to(
self,
system: Union[str, Cluster],
node_idx: int = None,
node_idx: Optional[int] = None,
path: str = None,
force_install: bool = False,
):
Expand All @@ -227,19 +237,21 @@ def to(
>>> s3_env = env.to("s3", path="s3_bucket/my_env")
"""
system = _get_cluster_from(system)
if (
isinstance(system, Cluster)
and node_idx is not None
and node_idx >= len(system.ips)
):
raise ValueError(
f"Cluster {system.name} has only {len(system.ips)} nodes. Requested node index {node_idx} is out of bounds."
)

new_env = copy.deepcopy(self)
new_env.reqs, new_env.working_dir = self._reqs_to(system, path)

if isinstance(system, Cluster):
if node_idx is not None:
if node_idx >= len(system.ips):
raise ValueError(
f"Cluster {system.name} has only {len(system.ips)} nodes. Requested node index {node_idx} is out of bounds."
)

if new_env.compute is None:
new_env.compute = {}

new_env.compute = new_env.compute or {}
new_env.compute["node_idx"] = node_idx

key = (
Expand Down
13 changes: 7 additions & 6 deletions runhouse/resources/envs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import subprocess

from pathlib import Path
from typing import Dict, List
from typing import Dict, List, Optional

import yaml

Expand Down Expand Up @@ -131,6 +131,7 @@ def run_setup_command(
cluster: "Cluster" = None,
env_vars: Dict = None,
stream_logs: bool = True,
node: Optional[str] = None,
):
"""
Helper function to run a command, possibly during the cluster's default env setup. If a cluster is provided,
Expand All @@ -152,14 +153,14 @@ def run_setup_command(
return run_with_logs(cmd, stream_logs=stream_logs, require_outputs=True)[:2]

return cluster._run_commands_with_runner(
[cmd], stream_logs=stream_logs, env_vars=env_vars
[cmd], stream_logs=stream_logs, env_vars=env_vars, node=node
)[0]


def install_conda(cluster: "Cluster" = None):
if run_setup_command("conda --version", cluster=cluster)[0] != 0:
def install_conda(cluster: "Cluster" = None, node: Optional[str] = None):
if run_setup_command("conda --version", cluster=cluster, node=node)[0] != 0:
logging.info("Conda is not installed. Installing...")
for cmd in CONDA_INSTALL_CMDS:
run_setup_command(cmd, cluster=cluster, stream_logs=True)
if run_setup_command("conda --version", cluster=cluster)[0] != 0:
run_setup_command(cmd, cluster=cluster, node=node, stream_logs=True)
if run_setup_command("conda --version", cluster=cluster, node=node)[0] != 0:
raise RuntimeError("Could not install Conda.")
8 changes: 7 additions & 1 deletion runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,11 @@ def _command_runner(self, node: Optional[str] = None) -> "CommandRunner":
SkySSHRunner,
)

if node == "all":
raise ValueError(
"CommandRunner can only be instantiated for individual nodes"
)

node = node or self.address

if (
Expand Down Expand Up @@ -518,7 +523,8 @@ def _sync_default_env_to_cluster(self):
logger.info(f"Using log level {log_level} on cluster's default env")

logger.info(f"Syncing default env {self._default_env.name} to cluster")
self._default_env.install(cluster=self)
for node in self.ips:
self._default_env.install(cluster=self, node=node)

def _sync_runhouse_to_cluster(
self,
Expand Down
6 changes: 3 additions & 3 deletions runhouse/resources/hardware/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,18 @@ def _unnamed_default_env_name(cluster_name):
return f"{cluster_name}_default_env"


def detect_cuda_version_or_cpu(cluster: "Cluster" = None):
def detect_cuda_version_or_cpu(cluster: "Cluster" = None, node: Optional[str] = None):
"""Return the CUDA version on the cluster. If we are on a CPU-only cluster return 'cpu'.

Note: A cpu-only machine may have the CUDA toolkit installed, which means nvcc will still return
a valid version. Also check if the NVIDIA driver is installed to confirm we are on a GPU."""

status_codes = run_setup_command("nvcc --version", cluster=cluster)
status_codes = run_setup_command("nvcc --version", cluster=cluster, node=node)
if not status_codes[0] == 0:
return "cpu"
cuda_version = status_codes[1].split("release ")[1].split(",")[0]

if run_setup_command("nvidia-smi", cluster=cluster)[0] == 0:
if run_setup_command("nvidia-smi", cluster=cluster, node=node)[0] == 0:
return cuda_version
return "cpu"

Expand Down
16 changes: 12 additions & 4 deletions runhouse/resources/packages/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,12 @@ def _reqs_install_cmd(
install_cmd = self._prepend_env_command(install_cmd, env=env)
return install_cmd

def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
def _install(
self,
env: Union[str, "Env"] = None,
cluster: "Cluster" = None,
node: Optional[str] = None,
):
"""Install package.

Args:
Expand All @@ -240,6 +245,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
dest=str(self.install_target.path_to_sync_to_on_cluster),
up=True,
contents=True,
node=node,
)

self.install_target.local_path = (
Expand All @@ -265,6 +271,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
retcode = run_setup_command(
f"python -c \"import importlib.util; exit(0) if importlib.util.find_spec('{self.install_target}') else exit(1)\"",
cluster=cluster,
node=node,
)[0]
if retcode != 0:
self.install_target = (
Expand All @@ -273,7 +280,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):

install_cmd = self._pip_install_cmd(env=env, cluster=cluster)
logger.info(f"Running via install_method pip: {install_cmd}")
retcode = run_setup_command(install_cmd, cluster=cluster)[0]
retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0]
if retcode != 0:
raise RuntimeError(
f"Pip install {install_cmd} failed, check that the package exists and is available for your platform."
Expand All @@ -282,7 +289,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
elif self.install_method == "conda":
install_cmd = self._conda_install_cmd(env=env, cluster=cluster)
logger.info(f"Running via install_method conda: {install_cmd}")
retcode = run_setup_command(install_cmd, cluster=cluster)[0]
retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0]
if retcode != 0:
raise RuntimeError(
f"Conda install {install_cmd} failed, check that the package exists and is "
Expand All @@ -293,7 +300,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
install_cmd = self._reqs_install_cmd(env=env, cluster=cluster)
if install_cmd:
logger.info(f"Running via install_method reqs: {install_cmd}")
retcode = run_setup_command(install_cmd, cluster=cluster)[0]
retcode = run_setup_command(install_cmd, cluster=cluster, node=node)[0]
if retcode != 0:
raise RuntimeError(
f"Reqs install {install_cmd} failed, check that the package exists and is available for your platform."
Expand All @@ -315,6 +322,7 @@ def _install(self, env: Union[str, "Env"] = None, cluster: "Cluster" = None):
) if not cluster else run_setup_command(
f"export PATH=$PATH;{self.install_target.full_local_path_str()}",
cluster=cluster,
node=node,
)
elif not cluster:
if Path(self.install_target).resolve().expanduser().exists():
Expand Down
Loading
Loading