Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor multi-node running command into dedicated functions #6623

Merged
merged 27 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
afbf24e
Refactor multi-node running command into dedicated functions
mingxin-zheng Jun 18, 2023
9fccf63
Merge branch 'dev' into fix-6567
mingxin-zheng Jun 19, 2023
178132c
fix undefined name cmd
mingxin-zheng Jun 21, 2023
15f136b
Merge branch 'fix-6567' of https://github.com/mingxin-zheng/MONAI int…
mingxin-zheng Jun 21, 2023
7edf1e1
fix mypy
mingxin-zheng Jun 21, 2023
c1a6c89
Merge branch 'dev' into fix-6567
mingxin-zheng Jun 24, 2023
3ef2300
refactor
mingxin-zheng Jun 24, 2023
6764c7c
Merge branch 'fix-6567' of https://github.com/mingxin-zheng/MONAI int…
mingxin-zheng Jun 24, 2023
ad37c92
refactor
mingxin-zheng Jun 24, 2023
1ff5bca
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 24, 2023
e4bfbec
Merge branch 'dev' into fix-6567
mingxin-zheng Jun 24, 2023
92b6a10
Merge branch 'fix-6567' of https://github.com/mingxin-zheng/MONAI int…
mingxin-zheng Jun 24, 2023
c0215f9
fixes
mingxin-zheng Jun 25, 2023
745599e
fix ensemble test
mingxin-zheng Jun 25, 2023
6b94a98
fix wrong file
mingxin-zheng Jun 25, 2023
906a6e8
Update ensemble
mingxin-zheng Jun 25, 2023
82a2296
fix integration
mingxin-zheng Jun 25, 2023
8214299
update docstrings
mingxin-zheng Jun 25, 2023
93bdd63
autofix
mingxin-zheng Jun 25, 2023
f435478
fix mypy
mingxin-zheng Jun 25, 2023
b0dec5b
Merge branch 'dev' into fix-6567
mingxin-zheng Jul 4, 2023
6879378
fix comments
mingxin-zheng Jul 4, 2023
d04d112
Merge branch 'dev' into fix-6567
wyli Jul 5, 2023
eff3305
fix test error
mingxin-zheng Jul 5, 2023
5ac9573
Merge branch 'fix-6567' of https://github.com/mingxin-zheng/MONAI int…
mingxin-zheng Jul 5, 2023
39efc62
[MONAI] code formatting
monai-bot Jul 5, 2023
93d6e05
fixes mypy
wyli Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 61 additions & 53 deletions monai/apps/auto3dseg/bundle_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@

import importlib
import os
import re
import shutil
import subprocess
import sys
import time
import warnings
from collections.abc import Mapping
from copy import deepcopy
from pathlib import Path
from tempfile import TemporaryDirectory
Expand All @@ -30,10 +30,17 @@
from monai.apps import download_and_extract
from monai.apps.utils import get_logger
from monai.auto3dseg.algo_gen import Algo, AlgoGen
from monai.auto3dseg.utils import algo_to_pickle
from monai.auto3dseg.utils import (
_create_bcprun,
_create_default,
_create_torchrun,
_run_cmd_bcprun,
_run_cmd_torchrun,
algo_to_pickle,
)
from monai.bundle.config_parser import ConfigParser
from monai.config import PathLike
from monai.utils import ensure_tuple, run_cmd
from monai.utils import ensure_tuple, look_up_option, run_cmd
from monai.utils.enums import AlgoKeys

logger = get_logger(module_name=__name__)
Expand Down Expand Up @@ -175,36 +182,45 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]:
train_py = os.path.join(self.output_path, "scripts", "train.py")
config_dir = os.path.join(self.output_path, "configs")

config_files = []
if os.path.isdir(config_dir):
base_cmd = ""
for file in sorted(os.listdir(config_dir)):
if not (file.endswith("yaml") or file.endswith("json")):
continue
base_cmd += f"{train_py} run --config_file=" if len(base_cmd) == 0 else ","
# Python Fire may be confused by single-quoted WindowsPath
config_yaml = Path(os.path.join(config_dir, file)).as_posix()
base_cmd += f"'{config_yaml}'"
cmd: str | None = self.device_setting["CMD_PREFIX"] # type: ignore
# make sure cmd end with a space
if cmd is not None and not cmd.endswith(" "):
cmd += " "
if (int(self.device_setting["NUM_NODES"]) > 1 and self.device_setting["MN_START_METHOD"] == "bcprun") or (
int(self.device_setting["NUM_NODES"]) <= 1 and int(self.device_setting["n_devices"]) <= 1
):
cmd = "python " if cmd is None else cmd
elif int(self.device_setting["NUM_NODES"]) > 1:
raise NotImplementedError(
f"{self.device_setting['MN_START_METHOD']} is not supported yet."
"Try modify BundleAlgo._create_cmd for your cluster."
if file.endswith("yaml") or file.endswith("json"):
# Python Fire may be confused by single-quoted WindowsPath
config_files.append(Path(os.path.join(config_dir, file)).as_posix())

if int(self.device_setting["NUM_NODES"]) > 1:
wyli marked this conversation as resolved.
Show resolved Hide resolved
# multi-node command
# only bcprun is supported for now
try:
look_up_option(self.device_setting["MN_START_METHOD"], ["bcprun"])
except ValueError as err:
raise NotImplementedError(
f"{self.device_setting['MN_START_METHOD']} is not supported yet."
"Try modify BundleAlgo._create_cmd for your cluster."
) from err

return (
_create_bcprun(
f"{train_py} run",
cmd_prefix=str(self.device_setting["CMD_PREFIX"]),
config_file=config_files,
**params,
),
"",
)
elif int(self.device_setting["n_devices"]) > 1:
return _create_torchrun(f"{train_py} run", config_file=config_files, **params), ""
else:
if cmd is None:
cmd = f"torchrun --nnodes={1:d} --nproc_per_node={self.device_setting['n_devices']:d} "
cmd += base_cmd
if params and isinstance(params, Mapping):
for k, v in params.items():
cmd += f" --{k}={v}"
return cmd, ""
return (
_create_default(
f"{train_py} run",
cmd_prefix=str(self.device_setting["CMD_PREFIX"]),
config_file=config_files,
**params,
),
"",
)

def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProcess:
"""
Expand All @@ -216,34 +232,26 @@ def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProc

ps_environ = os.environ.copy()
ps_environ["CUDA_VISIBLE_DEVICES"] = str(self.device_setting["CUDA_VISIBLE_DEVICES"])

# delete pattern "VAR=VALUE" at the beginning of the string, with optional leading/trailing whitespaces
cmd = re.sub(r"^\s*\w+=.*?\s+", "", cmd)

if int(self.device_setting["NUM_NODES"]) > 1:
if self.device_setting["MN_START_METHOD"] == "bcprun":
cmd_list = [
"bcprun",
"-n",
str(self.device_setting["NUM_NODES"]),
"-p",
str(self.device_setting["n_devices"]),
"-c",
cmd,
]
else:
try:
look_up_option(self.device_setting["MN_START_METHOD"], ["bcprun"])
except ValueError as err:
raise NotImplementedError(
f"{self.device_setting['MN_START_METHOD']} is not supported yet. "
f"{self.device_setting['MN_START_METHOD']} is not supported yet."
"Try modify BundleAlgo._run_cmd for your cluster."
)
else:
cmd_list = cmd.split()

_idx = 0
for _idx, c in enumerate(cmd_list):
if "=" not in c: # remove variable assignments before the command such as "OMP_NUM_THREADS=1"
break
cmd_list = cmd_list[_idx:]
) from err

logger.info(f"Launching: {' '.join(cmd_list)}")

return run_cmd(cmd_list, env=ps_environ, check=True)
return _run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"])
elif int(self.device_setting["n_devices"]) > 1:
return _run_cmd_torchrun(
cmd, nnodes=1, nproc_per_node=self.device_setting["n_devices"], env=ps_environ, check=True
)
else:
return run_cmd(cmd.split(), env=ps_environ, check=True)

def train(
self, train_params: None | dict = None, device_setting: None | dict = None
Expand Down
30 changes: 8 additions & 22 deletions monai/apps/auto3dseg/ensemble_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
from monai.apps.auto3dseg.utils import get_name_from_algo_id, import_bundle_algo_history
from monai.apps.utils import get_logger
from monai.auto3dseg import concat_val_to_np
from monai.auto3dseg.utils import datafold_read
from monai.auto3dseg.utils import _create_bcprun, _create_torchrun, _run_cmd_bcprun, _run_cmd_torchrun, datafold_read
from monai.bundle import ConfigParser
from monai.data import partition_dataset
from monai.transforms import MeanEnsemble, SaveImage, VoteEnsemble
from monai.utils import RankFilter, deprecated_arg
from monai.utils.enums import AlgoKeys
from monai.utils.misc import check_kwargs_exist_in_class_init, prob2class, run_cmd
from monai.utils.misc import check_kwargs_exist_in_class_init, prob2class
from monai.utils.module import look_up_option, optional_import

tqdm, has_tqdm = optional_import("tqdm", name="tqdm")
Expand Down Expand Up @@ -642,34 +642,20 @@ def _create_cmd(self) -> None:
# define env for subprocess
ps_environ = os.environ.copy()
ps_environ["CUDA_VISIBLE_DEVICES"] = str(self.device_setting["CUDA_VISIBLE_DEVICES"])
cmd: str | None = self.device_setting["CMD_PREFIX"] # type: ignore
if cmd is not None and not str(cmd).endswith(" "):
cmd += " "
if int(self.device_setting["NUM_NODES"]) > 1:
if self.device_setting["MN_START_METHOD"] != "bcprun":
raise NotImplementedError(
f"{self.device_setting['MN_START_METHOD']} is not supported yet. "
"Try modify EnsembleRunner._create_cmd for your cluster."
)
logger.info(f"Ensembling on {self.device_setting['NUM_NODES']} nodes!")
cmd = "python " if cmd is None else cmd
cmd = f"{cmd} -m {base_cmd}"
cmd_list = [
"bcprun",
"-n",
str(self.device_setting["NUM_NODES"]),
"-p",
str(self.device_setting["n_devices"]),
"-c",
cmd,
]
cmd = _create_bcprun("-m " + base_cmd, cmd_prefix=str(self.device_setting["CMD_PREFIX"]))
_run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"])

else:
logger.info(f"Ensembling using {self.device_setting['n_devices']} GPU!")
if cmd is None:
cmd = f"torchrun --nnodes={1:d} --nproc_per_node={self.device_setting['n_devices']:d} "
cmd = f"{cmd} -m {base_cmd}"
cmd_list = cmd.split()

run_cmd(cmd_list, env=ps_environ, check=True)
cmd = _create_torchrun("-m " + base_cmd)
_run_cmd_torchrun(
cmd, nnodes=1, nproc_per_node=self.device_setting["n_devices"], env=ps_environ, check=True
)
return
149 changes: 148 additions & 1 deletion monai/auto3dseg/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import logging
import os
import pickle
import subprocess
import sys
from copy import deepcopy
from numbers import Number
Expand All @@ -28,7 +29,7 @@
from monai.config import PathLike
from monai.data.meta_tensor import MetaTensor
from monai.transforms import CropForeground, ToCupy
from monai.utils import min_version, optional_import
from monai.utils import min_version, optional_import, run_cmd

__all__ = [
"get_foreground_image",
Expand Down Expand Up @@ -372,3 +373,149 @@ def algo_from_pickle(pkl_filename: str, template_path: PathLike | None = None, *
algo_meta_data.update({k: v})

return algo, algo_meta_data


def list_to_python_fire_arg_str(args: list) -> str:
    """
    Render a list of argument values as a single-quoted, comma-separated string
    that python-fire parses as one argument.

    Args:
        args: argument values to serialize.

    Returns:
        a string of the form ``'v1,v2,...'`` suitable for a python-fire command line.
    """
    joined = ",".join(str(item) for item in args)
    return "'" + joined + "'"


def check_and_set_optional_args(params: dict) -> str:
    """
    Convert a dict of optional keyword arguments into command-line option text.

    Each ``key: value`` pair becomes `` --key value``. List values are first
    serialized with ``list_to_python_fire_arg_str`` so python-fire parses them
    as a single argument.

    Args:
        params: mapping of option names to values; nested dicts are not supported.

    Returns:
        the concatenated option string (each option carries a leading space),
        or an empty string if ``params`` is empty.

    Raises:
        ValueError: if any value is a dict (nested dicts are unsupported).
    """
    cmd_mod_opt = ""
    for k, v in params.items():
        if isinstance(v, dict):
            raise ValueError("Nested dict is not supported.")
        elif isinstance(v, list):
            v = list_to_python_fire_arg_str(v)
        cmd_mod_opt += f" --{k} {str(v)}"
    return cmd_mod_opt


def _create_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str:
    """
    Prepare the command for subprocess to run the script with the given arguments.

    Args:
        cmd: the command or script to run in the distributed job.
        cmd_prefix: the command prefix to run the script, e.g., "python", "python -m",
            "python3", "/opt/conda/bin/python3.8 ".
        kwargs: the keyword arguments to be passed to the script.

    Returns:
        the command to run with ``subprocess``.

    Examples:
        To prepare a subprocess command
        "python train.py run -k --config 'a,b'", the function can be called as
        - _create_default("train.py run -k", config=['a','b'])
        - _create_default("train.py run -k --config 'a,b'")

    """
    # ensure a space after the command prefix so that the script can be appended
    prefix = cmd_prefix if cmd_prefix.endswith(" ") else f"{cmd_prefix} "
    return f"{prefix}{cmd}{check_and_set_optional_args(dict(kwargs))}"


def _create_torchrun(cmd: str, **kwargs: Any) -> str:
    """
    Prepare the command for multi-gpu/multi-node job execution using torchrun.

    Args:
        cmd: the command or script to run in the distributed job.
        kwargs: the keyword arguments to be passed to the script.

    Returns:
        the command to append to ``torchrun``

    Examples:
        For command "torchrun --nnodes=1 --nproc_per_node=8 train.py run -k --config 'a,b'",
        it only prepares command after the torchrun arguments, i.e., "train.py run -k --config 'a,b'".
        The function can be called as
        - _create_torchrun("train.py run -k", config=['a','b'])
        - _create_torchrun("train.py run -k --config 'a,b'")
    """
    # torchrun itself is prepended later by ``_run_cmd_torchrun``; only append script options here
    return cmd + check_and_set_optional_args(dict(kwargs))


def _create_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str:
    """
    Prepare the command for distributed job running using bcprun.

    Args:
        cmd: the command or script to run in the distributed job.
        cmd_prefix: the command prefix to run the script, e.g., "python".
        kwargs: the keyword arguments to be passed to the script.

    Returns:
        The command to run the script in the distributed job.

    Examples:
        For command "bcprun -n 2 -p 8 -c python train.py run -k --config 'a,b'",
        it only prepares command after the bcprun arguments, i.e., "python train.py run -k --config 'a,b'".
        the function can be called as
        - _create_bcprun("train.py run -k", config=['a','b'], n=2, p=8)
        - _create_bcprun("train.py run -k --config 'a,b'", n=2, p=8)
    """
    # bcprun runs the plain "prefix + script + options" form, so delegate to the default builder;
    # the bcprun launcher arguments (-n/-p/-c) are added later by ``_run_cmd_bcprun``
    return _create_default(cmd, cmd_prefix=cmd_prefix, **kwargs)


def _run_cmd_torchrun(cmd: str, **kwargs: Any) -> subprocess.CompletedProcess:
    """
    Run the command with torchrun.

    Args:
        cmd: the command to run. Typically it is prepared by ``_create_torchrun``.
        kwargs: must include ``nnodes`` and ``nproc_per_node`` (consumed as torchrun
            launcher arguments); any remaining entries are forwarded to ``run_cmd``.

    Return:
        the completed process object returned by ``run_cmd``.

    Raises:
        ValueError: if ``nnodes`` or ``nproc_per_node`` is missing.
    """
    opts = dict(kwargs)
    launcher = ["torchrun"]
    # pop the launcher arguments so the remainder can be forwarded to run_cmd
    for required in ("nnodes", "nproc_per_node"):
        if required not in opts:
            raise ValueError(f"Missing required argument {required} for torchrun.")
        launcher.extend([f"--{required}", str(opts.pop(required))])
    return run_cmd(launcher + cmd.split(), **opts)


def _run_cmd_bcprun(cmd: str, **kwargs: Any) -> subprocess.CompletedProcess:
    """
    Run the command with bcprun.

    Args:
        cmd: the command to run. Typically it is prepared by ``_create_bcprun``.
        kwargs: must include ``n`` and ``p`` (consumed as bcprun launcher
            arguments); any remaining entries are forwarded to ``run_cmd``.

    Returns:
        the completed process object returned by ``run_cmd``.

    Raises:
        ValueError: if ``n`` or ``p`` is missing.
    """
    opts = dict(kwargs)
    launcher = ["bcprun"]
    # pop the launcher arguments so the remainder can be forwarded to run_cmd
    for required in ("n", "p"):
        if required not in opts:
            raise ValueError(f"Missing required argument {required} for bcprun.")
        launcher.extend([f"-{required}", str(opts.pop(required))])
    launcher.extend(["-c", cmd])
    return run_cmd(launcher, **opts)