From afbf24ea8a5aa0e0107ac3036407cffbd8080928 Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Sun, 18 Jun 2023 15:01:24 +0000 Subject: [PATCH 01/18] Refractor multi-node running command into dedicated functions Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/utils/dist.py | 80 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/monai/utils/dist.py b/monai/utils/dist.py index 47e6de4a98..e7f642dfcf 100644 --- a/monai/utils/dist.py +++ b/monai/utils/dist.py @@ -11,10 +11,13 @@ from __future__ import annotations +import logging +import subprocess import sys import warnings from collections.abc import Callable from logging import Filter +from typing import Any if sys.version_info >= (3, 8): from typing import Literal @@ -205,3 +208,80 @@ def __init__(self, rank: int | None = None, filter_fn: Callable = lambda rank: r def filter(self, *_args): return self.filter_fn(self.rank) + + +def prepare_dist_job_default(script, cmd_prefix: str = None, **kwargs: Any) -> str: + """ + Prepare the command for distributed job submission. + + Args: + script: the script to run in the distributed job. + cmd_prefix: the command prefix to run the script, e.g., "python" or "torchrun". + kwargs: the keyword arguments to be passed to the script. + + Returns: + the command to run the distributed job. + """ + cmd = cmd_prefix + if cmd is not None and not cmd.endswith(" "): + cmd += " " + cmd = "torchrun " if cmd is None else cmd + if "num_nodes" not in kwargs or "nproc_per_node" not in kwargs: + raise ValueError("num_nodes and nproc_per_node must be specified.") + cmd += f"{script}" + for k, v in kwargs.items(): + if isinstance(v, dict): + raise ValueError("Nested dict is not supported.") + elif isinstance(v, list): + raise ValueError("List is not supported.") + cmd += f" --{k} {str(v)}" + return cmd + + +def prepare_bcprun(script, cmd_prefix: str = None, **kwargs: Any) -> str: + """ + Prepare the command for distributed job submission using bcprun. + + Args: + script: the script to run in the distributed job. + cmd_prefix: the command prefix to run the script, e.g., "python". + kwargs: the keyword arguments to be passed to the script. + + Returns: + The command to run the script in the distributed job. + """ + bcprun_cmd = "bcprun " + hyperparam = kwargs.copy() + num_nodes = hyperparam.pop("n", None) + n_devices = hyperparam.pop("p", None) + if num_nodes is None or n_devices is None: + raise ValueError("num_nodes(n) and n_devices(p) must be specified.") + bcprun_cmd += f"-n {num_nodes} -p {n_devices} " + + cmd_prefix = "python " if cmd_prefix is None else cmd_prefix + if not cmd_prefix.endswith(" "): + cmd_prefix += " " + + bcprun_cmd += cmd_prefix + cmd += f"{script}" + for k, v in hyperparam.items(): + if isinstance(v, dict): + raise ValueError("Nested dict is not supported.") + elif isinstance(v, list): + raise ValueError("List is not supported.") + cmd += f" --{k} {str(v)}" + return cmd + + +def launch_dist_job_default(cmd: str) -> subprocess.CompletedProcess: + """ + Launch the distributed job using the command. + + Args: + cmd: the command to launch the distributed job. + + Returns: + The subprocess.CompletedProcess object that contains the information of the launched job. + """ + logging.info(f"Running command: {cmd}") + return subprocess.run(cmd, check=True, capture_output=True) From 178132c73d709124890326be90c3877f7185eb9d Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Wed, 21 Jun 2023 02:42:18 +0000 Subject: [PATCH 02/18] fix undefined name cmd Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/utils/dist.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/monai/utils/dist.py b/monai/utils/dist.py index e7f642dfcf..a2a43f0d71 100644 --- a/monai/utils/dist.py +++ b/monai/utils/dist.py @@ -263,14 +263,14 @@ def prepare_bcprun(script, cmd_prefix: str = None, **kwargs: Any) -> str: cmd_prefix += " " bcprun_cmd += cmd_prefix - cmd += f"{script}" + bcprun_cmd += f"{script}" for k, v in hyperparam.items(): if isinstance(v, dict): raise ValueError("Nested dict is not supported.") elif isinstance(v, list): raise ValueError("List is not supported.") - cmd += f" --{k} {str(v)}" - return cmd + bcprun_cmd += f" --{k} {str(v)}" + return bcprun_cmd def launch_dist_job_default(cmd: str) -> subprocess.CompletedProcess: From 7edf1e101dc92036ce655eeba1ba5104138d487b Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Wed, 21 Jun 2023 03:09:18 +0000 Subject: [PATCH 03/18] fix mypy Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/utils/dist.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monai/utils/dist.py b/monai/utils/dist.py index a2a43f0d71..9052398e59 100644 --- a/monai/utils/dist.py +++ b/monai/utils/dist.py @@ -210,7 +210,7 @@ def filter(self, *_args): return self.filter_fn(self.rank) -def prepare_dist_job_default(script, cmd_prefix: str = None, **kwargs: Any) -> str: +def prepare_dist_job_default(script: str, cmd_prefix: str | None = None, **kwargs: Any) -> str: """ Prepare the command for distributed job submission. @@ -238,7 +238,7 @@ def prepare_dist_job_default(script, cmd_prefix: str = None, **kwargs: Any) -> s return cmd -def prepare_bcprun(script, cmd_prefix: str = None, **kwargs: Any) -> str: +def prepare_bcprun(script: str, cmd_prefix: str | None = None, **kwargs: Any) -> str: """ Prepare the command for distributed job submission using bcprun. From 3ef23006877f18b34c6534a0bc72bdeaadf520c5 Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Sat, 24 Jun 2023 08:25:21 +0000 Subject: [PATCH 04/18] refractor Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/auto3dseg/utils.py | 136 ++++++++++++++++++++++++++++++++++++++- monai/utils/__init__.py | 1 + monai/utils/dist.py | 79 ----------------------- monai/utils/enums.py | 11 ++++ 4 files changed, 147 insertions(+), 80 deletions(-) diff --git a/monai/auto3dseg/utils.py b/monai/auto3dseg/utils.py index 2f5e1b26eb..5af3a4ce1d 100644 --- a/monai/auto3dseg/utils.py +++ b/monai/auto3dseg/utils.py @@ -15,6 +15,7 @@ import os import pickle import sys +import subprocess from copy import deepcopy from numbers import Number from typing import Any, cast @@ -28,7 +29,7 @@ from monai.config import PathLike from monai.data.meta_tensor import MetaTensor from monai.transforms import CropForeground, ToCupy -from monai.utils import min_version, optional_import +from monai.utils import min_version, optional_import, AlgoLaunchKeys, look_up_option __all__ = [ "get_foreground_image", @@ -372,3 +373,136 @@ def algo_from_pickle(pkl_filename: str, template_path: PathLike | None = None, * algo_meta_data.update({k: v}) return algo, algo_meta_data + +def list_to_python_fire_arg_str(args: list) -> str: + """ + Convert a list of arguments to a string that can be used in python-fire. + + Args: + args: the list of arguments. + + Returns: + the string that can be used in python-fire. + """ + args_str = ",".join(str(args)) + return f"'{args_str}'" + +def check_and_set_required_args(params: dict, required_args: list) -> str: + """ + """ + cmd_mod = "" + for arg in required_args: + val = params.pop(arg, None) + if val is None: + raise ValueError(f"The {arg} should be specified in the kwargs.") + cmd_mod += f" --{arg} {val}" + + return cmd_mod + +def check_and_set_optional_args(params: dict) -> str: + """ + """ + cmd_mod_opt = "" + for k, v in params.items(): + if isinstance(v, dict): + raise ValueError("Nested dict is not supported.") + elif isinstance(v, list): + v = list_to_python_fire_arg_str(v) + cmd_mod_opt += f" --{k} {str(v)}" + return cmd_mod_opt + + +def prepare_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: + """ + Prepare the command for job to run the script with the given arguments. + + Args: + cmd: the command or script to run in the distributed job. + cmd_prefix: the command prefix to run the script, e.g., "python", "python -m", "python3", "/opt/conda/bin/python3.8 ". + kwargs: the keyword arguments to be passed to the script. + + Returns: + the command to run the distributed job. + + Examples: + To prepare a subprocess command + "python train.py run -k --config 'a,b'", the function can be called as + - prepare_default("train.py run -k", config=['a','b']) + - prepare_default("train.py run -k --config 'a,b'") + + """ + params = kwargs.copy() + + if not cmd_prefix.endswith(" "): + cmd_prefix += " " # ensure a space after the command prefix so that the script can be appended + + return cmd_prefix + cmd + check_and_set_optional_args(params) + +def prepare_torchrun(cmd: str, **kwargs: Any) -> str: + """ + Prepare the command for multi-gpu/multi-node job execution using torchrun. + + Args: + cmd: the command or script to run in the distributed job. + cmd_prefix: the command prefix to run the script, e.g., "torchrun ", "python -m torch.distributed.launch ". + kwargs: the keyword arguments to be passed to the script. + + Returns: + the command to run the multi-gpu/multi-node job. + + Examples: + To prepare a subprocess command + + "torchrun --nnodes=1 --nproc_per_node=8 train.py run -k --config 'a,b'", the function can be called as + - prepare_torchrun("train.py run -k", config=['a','b'], nnodes=1, nproc_per_node=8) + - prepare_torchrun("train.py run -k --config 'a,b'", nnodes=1, nproc_per_node=8) + """ + params = kwargs.copy() + + torchrun_cmd = "torchrun " + check_and_set_required_args(params, ["nproc_per_node", "nnodes"]) + if not torchrun_cmd.endswith(" "): + torchrun_cmd += " " # ensure a space after the command prefix so that the script can be appended + + return torchrun_cmd + cmd + check_and_set_optional_args(params) + + +def prepare_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: + """ + Prepare the command for distributed job submission using bcprun. + + Args: + script: the script to run in the distributed job. + cmd_prefix: the command prefix to run the script, e.g., "python". + kwargs: the keyword arguments to be passed to the script. + + Returns: + The command to run the script in the distributed job. + + Examples: + To prepare a subprocess command + "bcprun -n 2 -p 8 -c python train.py run -k --config 'a,b'", the function can be called as + - prepare_bcprun("train.py run -k", config=['a','b'], n=2, p=8) + - prepare_bcprun("train.py run -k --config 'a,b'", n=2, p=8) + """ + params = kwargs.copy() + + if not cmd_prefix.endswith(" "): + cmd_prefix += " " + + bcprun_cmd = "bcprun " + check_and_set_required_args(params, ["n", "p"]) + " -c " + + return bcprun_cmd + cmd_prefix + cmd + check_and_set_optional_args(params) + + +def launch_dist_job_default(cmd: str) -> subprocess.CompletedProcess: + """ + Launch the distributed job using the command. + + Args: + cmd: the command to launch the distributed job. + + Returns: + The subprocess.CompletedProcess object that contains the information of the launched job. + """ + logging.info(f"Running command in subprocess: {cmd}") + return subprocess.run(cmd, check=True, capture_output=True) diff --git a/monai/utils/__init__.py b/monai/utils/__init__.py index 4a8e439f0a..ea61300634 100644 --- a/monai/utils/__init__.py +++ b/monai/utils/__init__.py @@ -60,6 +60,7 @@ UpsampleMode, Weight, WSIPatchKeys, + AlgoLaunchKeys, ) from .jupyter_utils import StatusMembers, ThreadContainer from .misc import ( diff --git a/monai/utils/dist.py b/monai/utils/dist.py index 9052398e59..123226d39c 100644 --- a/monai/utils/dist.py +++ b/monai/utils/dist.py @@ -11,8 +11,6 @@ from __future__ import annotations -import logging -import subprocess import sys import warnings from collections.abc import Callable @@ -208,80 +206,3 @@ def __init__(self, rank: int | None = None, filter_fn: Callable = lambda rank: r def filter(self, *_args): return self.filter_fn(self.rank) - - -def prepare_dist_job_default(script: str, cmd_prefix: str | None = None, **kwargs: Any) -> str: - """ - Prepare the command for distributed job submission. - - Args: - script: the script to run in the distributed job. - cmd_prefix: the command prefix to run the script, e.g., "python" or "torchrun". - kwargs: the keyword arguments to be passed to the script. - - Returns: - the command to run the distributed job. - """ - cmd = cmd_prefix - if cmd is not None and not cmd.endswith(" "): - cmd += " " - cmd = "torchrun " if cmd is None else cmd - if "num_nodes" not in kwargs or "nproc_per_node" not in kwargs: - raise ValueError("num_nodes and nproc_per_node must be specified.") - cmd += f"{script}" - for k, v in kwargs.items(): - if isinstance(v, dict): - raise ValueError("Nested dict is not supported.") - elif isinstance(v, list): - raise ValueError("List is not supported.") - cmd += f" --{k} {str(v)}" - return cmd - - -def prepare_bcprun(script: str, cmd_prefix: str | None = None, **kwargs: Any) -> str: - """ - Prepare the command for distributed job submission using bcprun. - - Args: - script: the script to run in the distributed job. - cmd_prefix: the command prefix to run the script, e.g., "python". - kwargs: the keyword arguments to be passed to the script. - - Returns: - The command to run the script in the distributed job. - """ - bcprun_cmd = "bcprun " - hyperparam = kwargs.copy() - num_nodes = hyperparam.pop("n", None) - n_devices = hyperparam.pop("p", None) - if num_nodes is None or n_devices is None: - raise ValueError("num_nodes(n) and n_devices(p) must be specified.") - bcprun_cmd += f"-n {num_nodes} -p {n_devices} " - - cmd_prefix = "python " if cmd_prefix is None else cmd_prefix - if not cmd_prefix.endswith(" "): - cmd_prefix += " " - - bcprun_cmd += cmd_prefix - bcprun_cmd += f"{script}" - for k, v in hyperparam.items(): - if isinstance(v, dict): - raise ValueError("Nested dict is not supported.") - elif isinstance(v, list): - raise ValueError("List is not supported.") - bcprun_cmd += f" --{k} {str(v)}" - return bcprun_cmd - - -def launch_dist_job_default(cmd: str) -> subprocess.CompletedProcess: - """ - Launch the distributed job using the command. - - Args: - cmd: the command to launch the distributed job. - - Returns: - The subprocess.CompletedProcess object that contains the information of the launched job. - """ - logging.info(f"Running command: {cmd}") - return subprocess.run(cmd, check=True, capture_output=True) diff --git a/monai/utils/enums.py b/monai/utils/enums.py index 572cd9293d..e6f7ea26cd 100644 --- a/monai/utils/enums.py +++ b/monai/utils/enums.py @@ -60,6 +60,7 @@ "BundleProperty", "BundlePropertyConfig", "AlgoKeys", + "AlgoLaunchKeys", ] @@ -692,3 +693,13 @@ class AlgoKeys(StrEnum): ALGO = "algo_instance" IS_TRAINED = "is_trained" SCORE = "best_metric" + +class AlgoLaunchKeys(StrEnum): + """ + Multi-node training start methods. + `DEFAULT` is the default method. + `FILE` is the method to start multi-node training from a python file. + `FUNCTION` is the method to start multi-node training from a python function. + """ + + NGC_BCP = "bcprun" From ad37c926b30ce58524ea17f3ed4a161927cf7823 Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Sat, 24 Jun 2023 16:29:00 +0000 Subject: [PATCH 05/18] refractor Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/apps/auto3dseg/bundle_gen.py | 112 +++++++++++++++-------------- monai/auto3dseg/utils.py | 59 ++++++--------- 2 files changed, 81 insertions(+), 90 deletions(-) diff --git a/monai/apps/auto3dseg/bundle_gen.py b/monai/apps/auto3dseg/bundle_gen.py index 8bd5cdd4f2..570fb25648 100644 --- a/monai/apps/auto3dseg/bundle_gen.py +++ b/monai/apps/auto3dseg/bundle_gen.py @@ -33,8 +33,9 @@ from monai.auto3dseg.utils import algo_to_pickle from monai.bundle.config_parser import ConfigParser from monai.config import PathLike -from monai.utils import ensure_tuple, run_cmd -from monai.utils.enums import AlgoKeys +from monai.utils import ensure_tuple, run_cmd, look_up_option +from monai.utils.enums import AlgoKeys, AlgoLaunchKeys +from monai.utils.dist import _create_torchrun, _create_bcprun, _create_default, _run_cmd_bcprun, _run_cmd_torchrun logger = get_logger(module_name=__name__) ALGO_HASH = os.environ.get("MONAI_ALGO_HASH", "b5c01d4") @@ -87,7 +88,7 @@ def __init__(self, template_path: PathLike): "CUDA_VISIBLE_DEVICES": ",".join([str(x) for x in range(torch.cuda.device_count())]), "n_devices": int(torch.cuda.device_count()), "NUM_NODES": int(os.environ.get("NUM_NODES", 1)), - "MN_START_METHOD": os.environ.get("MN_START_METHOD", "bcprun"), + "MN_START_METHOD": os.environ.get("MN_START_METHOD", AlgoLaunchKeys.NGC_BCP), "CMD_PREFIX": os.environ.get("CMD_PREFIX"), # type: ignore } @@ -175,36 +176,43 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: train_py = os.path.join(self.output_path, "scripts", "train.py") config_dir = os.path.join(self.output_path, "configs") + config_files = [] if os.path.isdir(config_dir): - base_cmd = "" for file in sorted(os.listdir(config_dir)): - if not (file.endswith("yaml") or file.endswith("json")): - continue - base_cmd += f"{train_py} run --config_file=" if len(base_cmd) == 0 else "," - # Python Fire may be confused by single-quoted WindowsPath - config_yaml = Path(os.path.join(config_dir, file)).as_posix() - base_cmd += f"'{config_yaml}'" - cmd: str | None = self.device_setting["CMD_PREFIX"] # type: ignore - # make sure cmd end with a space - if cmd is not None and not cmd.endswith(" "): - cmd += " " - if (int(self.device_setting["NUM_NODES"]) > 1 and self.device_setting["MN_START_METHOD"] == "bcprun") or ( - int(self.device_setting["NUM_NODES"]) <= 1 and int(self.device_setting["n_devices"]) <= 1 - ): - cmd = "python " if cmd is None else cmd - elif int(self.device_setting["NUM_NODES"]) > 1: - raise NotImplementedError( - f"{self.device_setting['MN_START_METHOD']} is not supported yet." - "Try modify BundleAlgo._create_cmd for your cluster." - ) + if (file.endswith("yaml") or file.endswith("json")): + # Python Fire may be confused by single-quoted WindowsPath + config_files.append(Path(os.path.join(config_dir, file)).as_posix()) + + if int(self.device_setting["NUM_NODES"]) > 1: + # multi-node command + # only bcprun is supported for now + try: + look_up_option(self.device_setting["MN_START_METHOD"], [AlgoLaunchKeys.NGC_BCP]) + except ValueError as err: + raise NotImplementedError( + f"{self.device_setting['MN_START_METHOD']} is not supported yet." + "Try modify BundleAlgo._create_cmd for your cluster." + ) from err + + return _create_bcprun( + f"{train_py} run", + cmd_prefix=self.device_setting.cmd_prefix, + config_file=config_files + **params, + ), "" + elif int(self.device_setting["n_devices"]) > 1: + return _create_torchrun( + f"{train_py} run", + config_file=config_files + **params, + ), "" else: - if cmd is None: - cmd = f"torchrun --nnodes={1:d} --nproc_per_node={self.device_setting['n_devices']:d} " - cmd += base_cmd - if params and isinstance(params, Mapping): - for k, v in params.items(): - cmd += f" --{k}={v}" - return cmd, "" + return _create_default( + f"{train_py} run", + cmd_prefix=self.device_setting.cmd_prefix, + config_file=config_files, + **params + ) def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProcess: """ @@ -217,33 +225,29 @@ def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProc ps_environ = os.environ.copy() ps_environ["CUDA_VISIBLE_DEVICES"] = str(self.device_setting["CUDA_VISIBLE_DEVICES"]) if int(self.device_setting["NUM_NODES"]) > 1: - if self.device_setting["MN_START_METHOD"] == "bcprun": - cmd_list = [ - "bcprun", - "-n", - str(self.device_setting["NUM_NODES"]), - "-p", - str(self.device_setting["n_devices"]), - "-c", - cmd, - ] - else: + try: + look_up_option(self.device_setting["MN_START_METHOD"], [AlgoLaunchKeys.NGC_BCP]) + except ValueError as err: raise NotImplementedError( - f"{self.device_setting['MN_START_METHOD']} is not supported yet. " + f"{self.device_setting['MN_START_METHOD']} is not supported yet." "Try modify BundleAlgo._run_cmd for your cluster." - ) - else: - cmd_list = cmd.split() - - _idx = 0 - for _idx, c in enumerate(cmd_list): - if "=" not in c: # remove variable assignments before the command such as "OMP_NUM_THREADS=1" - break - cmd_list = cmd_list[_idx:] - - logger.info(f"Launching: {' '.join(cmd_list)}") + ) from err - return run_cmd(cmd_list, env=ps_environ, check=True) + return _run_cmd_bcprun( + cmd, + n=self.device_setting["NUM_NODES"], + p=self.device_setting["n_devices"], + ) + elif int(self.device_setting["n_devices"]) > 1: + return _run_cmd_torchrun( + cmd, + nnodes=1, + nproc_per_node=self.device_setting["n_devices"], + env=ps_environ, + check=True + ) + else: + return run_cmd(cmd.split(), env=ps_environ, check=True) def train( self, train_params: None | dict = None, device_setting: None | dict = None diff --git a/monai/auto3dseg/utils.py b/monai/auto3dseg/utils.py index 5af3a4ce1d..1c58f79e01 100644 --- a/monai/auto3dseg/utils.py +++ b/monai/auto3dseg/utils.py @@ -29,7 +29,7 @@ from monai.config import PathLike from monai.data.meta_tensor import MetaTensor from monai.transforms import CropForeground, ToCupy -from monai.utils import min_version, optional_import, AlgoLaunchKeys, look_up_option +from monai.utils import min_version, optional_import, run_cmd __all__ = [ "get_foreground_image", @@ -412,7 +412,7 @@ def check_and_set_optional_args(params: dict) -> str: return cmd_mod_opt -def prepare_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: +def _create_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: """ Prepare the command for job to run the script with the given arguments. @@ -427,8 +427,8 @@ def prepare_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: Examples: To prepare a subprocess command "python train.py run -k --config 'a,b'", the function can be called as - - prepare_default("train.py run -k", config=['a','b']) - - prepare_default("train.py run -k --config 'a,b'") + - _create_default("train.py run -k", config=['a','b']) + - _create_default("train.py run -k --config 'a,b'") """ params = kwargs.copy() @@ -438,7 +438,7 @@ def prepare_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: return cmd_prefix + cmd + check_and_set_optional_args(params) -def prepare_torchrun(cmd: str, **kwargs: Any) -> str: +def _create_torchrun(cmd: str, **kwargs: Any) -> str: """ Prepare the command for multi-gpu/multi-node job execution using torchrun. @@ -454,19 +454,14 @@ def prepare_torchrun(cmd: str, **kwargs: Any) -> str: To prepare a subprocess command "torchrun --nnodes=1 --nproc_per_node=8 train.py run -k --config 'a,b'", the function can be called as - - prepare_torchrun("train.py run -k", config=['a','b'], nnodes=1, nproc_per_node=8) - - prepare_torchrun("train.py run -k --config 'a,b'", nnodes=1, nproc_per_node=8) + - _create_torchrun("train.py run -k", config=['a','b'], nnodes=1, nproc_per_node=8) + - _create_torchrun("train.py run -k --config 'a,b'", nnodes=1, nproc_per_node=8) """ params = kwargs.copy() - - torchrun_cmd = "torchrun " + check_and_set_required_args(params, ["nproc_per_node", "nnodes"]) - if not torchrun_cmd.endswith(" "): - torchrun_cmd += " " # ensure a space after the command prefix so that the script can be appended - - return torchrun_cmd + cmd + check_and_set_optional_args(params) + return cmd + check_and_set_optional_args(params) -def prepare_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: +def _create_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: """ Prepare the command for distributed job submission using bcprun. @@ -479,30 +474,22 @@ def prepare_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: The command to run the script in the distributed job. Examples: - To prepare a subprocess command + To prepare a subprocess command "bcprun -n 2 -p 8 -c python train.py run -k --config 'a,b'", the function can be called as - - prepare_bcprun("train.py run -k", config=['a','b'], n=2, p=8) - - prepare_bcprun("train.py run -k --config 'a,b'", n=2, p=8) + - _create_bcprun("train.py run -k", config=['a','b'], n=2, p=8) + - _create_bcprun("train.py run -k --config 'a,b'", n=2, p=8) """ - params = kwargs.copy() - - if not cmd_prefix.endswith(" "): - cmd_prefix += " " - - bcprun_cmd = "bcprun " + check_and_set_required_args(params, ["n", "p"]) + " -c " - - return bcprun_cmd + cmd_prefix + cmd + check_and_set_optional_args(params) + return _create_default(cmd, cmd_prefix, **kwargs) -def launch_dist_job_default(cmd: str) -> subprocess.CompletedProcess: - """ - Launch the distributed job using the command. - - Args: - cmd: the command to launch the distributed job. +def _run_cmd_torchrun(cmd: str, **kwargs): + params = kwargs.copy() + torchrun_args = check_and_set_required_args(params, ["nnodes", "nproc_per_node"]) + cmd_list = ["torchrun"] + torchrun_args.split(" ") + cmd.split(" ") + return run_cmd(cmd_list, **kwargs) - Returns: - The subprocess.CompletedProcess object that contains the information of the launched job. - """ - logging.info(f"Running command in subprocess: {cmd}") - return subprocess.run(cmd, check=True, capture_output=True) +def _run_cmd_bcprun(cmd: str, **kwargs): + params = kwargs.copy() + bcprun_args = check_and_set_required_args(params, ["n", "p"]) + cmd_list = ["bcprun"] + bcprun_args.split(" ") + ["-c"] + cmd + return run_cmd(cmd_list, **kwargs) From 1ff5bca1d00b6feb4fd7c2b01d630053b9eba17a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 24 Jun 2023 16:29:28 +0000 Subject: [PATCH 06/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- monai/apps/auto3dseg/bundle_gen.py | 1 - monai/auto3dseg/utils.py | 21 ++++++++++----------- monai/utils/dist.py | 1 - 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/monai/apps/auto3dseg/bundle_gen.py b/monai/apps/auto3dseg/bundle_gen.py index 570fb25648..5cffa96a35 100644 --- a/monai/apps/auto3dseg/bundle_gen.py +++ b/monai/apps/auto3dseg/bundle_gen.py @@ -18,7 +18,6 @@ import sys import time import warnings -from collections.abc import Mapping from copy import deepcopy from pathlib import Path from tempfile import TemporaryDirectory diff --git a/monai/auto3dseg/utils.py b/monai/auto3dseg/utils.py index 1c58f79e01..c05a611639 100644 --- a/monai/auto3dseg/utils.py +++ b/monai/auto3dseg/utils.py @@ -15,7 +15,6 @@ import os import pickle import sys -import subprocess from copy import deepcopy from numbers import Number from typing import Any, cast @@ -396,7 +395,7 @@ def check_and_set_required_args(params: dict, required_args: list) -> str: if val is None: raise ValueError(f"The {arg} should be specified in the kwargs.") cmd_mod += f" --{arg} {val}" - + return cmd_mod def check_and_set_optional_args(params: dict) -> str: @@ -415,7 +414,7 @@ def check_and_set_optional_args(params: dict) -> str: def _create_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: """ Prepare the command for job to run the script with the given arguments. - + Args: cmd: the command or script to run in the distributed job. cmd_prefix: the command prefix to run the script, e.g., "python", "python -m", "python3", "/opt/conda/bin/python3.8 ". @@ -423,19 +422,19 @@ def _create_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: Returns: the command to run the distributed job. - + Examples: To prepare a subprocess command "python train.py run -k --config 'a,b'", the function can be called as - _create_default("train.py run -k", config=['a','b']) - _create_default("train.py run -k --config 'a,b'") - + """ params = kwargs.copy() if not cmd_prefix.endswith(" "): cmd_prefix += " " # ensure a space after the command prefix so that the script can be appended - + return cmd_prefix + cmd + check_and_set_optional_args(params) def _create_torchrun(cmd: str, **kwargs: Any) -> str: @@ -446,20 +445,20 @@ def _create_torchrun(cmd: str, **kwargs: Any) -> str: cmd: the command or script to run in the distributed job. cmd_prefix: the command prefix to run the script, e.g., "torchrun ", "python -m torch.distributed.launch ". kwargs: the keyword arguments to be passed to the script. - + Returns: the command to run the multi-gpu/multi-node job. - + Examples: To prepare a subprocess command - + "torchrun --nnodes=1 --nproc_per_node=8 train.py run -k --config 'a,b'", the function can be called as - _create_torchrun("train.py run -k", config=['a','b'], nnodes=1, nproc_per_node=8) - _create_torchrun("train.py run -k --config 'a,b'", nnodes=1, nproc_per_node=8) """ params = kwargs.copy() return cmd + check_and_set_optional_args(params) - + def _create_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: """ @@ -472,7 +471,7 @@ def _create_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: Returns: The command to run the script in the distributed job. - + Examples: To prepare a subprocess command "bcprun -n 2 -p 8 -c python train.py run -k --config 'a,b'", the function can be called as diff --git a/monai/utils/dist.py b/monai/utils/dist.py index 123226d39c..47e6de4a98 100644 --- a/monai/utils/dist.py +++ b/monai/utils/dist.py @@ -15,7 +15,6 @@ import warnings from collections.abc import Callable from logging import Filter -from typing import Any if sys.version_info >= (3, 8): from typing import Literal From c0215f95f680e804a1d16ef5223ed3c7ca897359 Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Sun, 25 Jun 2023 11:53:38 +0000 Subject: [PATCH 07/18] fixes Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/apps/auto3dseg/bundle_gen.py | 5 ++--- monai/auto3dseg/utils.py | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/monai/apps/auto3dseg/bundle_gen.py b/monai/apps/auto3dseg/bundle_gen.py index 5cffa96a35..7360226cb2 100644 --- a/monai/apps/auto3dseg/bundle_gen.py +++ b/monai/apps/auto3dseg/bundle_gen.py @@ -29,12 +29,11 @@ from monai.apps import download_and_extract from monai.apps.utils import get_logger from monai.auto3dseg.algo_gen import Algo, AlgoGen -from monai.auto3dseg.utils import algo_to_pickle +from monai.auto3dseg.utils import algo_to_pickle, _create_torchrun, _create_bcprun, _create_default, _run_cmd_bcprun, _run_cmd_torchrun from monai.bundle.config_parser import ConfigParser from monai.config import PathLike from monai.utils import ensure_tuple, run_cmd, look_up_option from monai.utils.enums import AlgoKeys, AlgoLaunchKeys -from monai.utils.dist import _create_torchrun, _create_bcprun, _create_default, _run_cmd_bcprun, _run_cmd_torchrun logger = get_logger(module_name=__name__) ALGO_HASH = os.environ.get("MONAI_ALGO_HASH", "b5c01d4") @@ -211,7 +210,7 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: cmd_prefix=self.device_setting.cmd_prefix, config_file=config_files, **params - ) + ), "" def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProcess: """ diff --git a/monai/auto3dseg/utils.py b/monai/auto3dseg/utils.py index c05a611639..fb34110815 100644 --- a/monai/auto3dseg/utils.py +++ b/monai/auto3dseg/utils.py @@ -450,9 +450,8 @@ def _create_torchrun(cmd: str, **kwargs: Any) -> str: the command to run the multi-gpu/multi-node job. Examples: - To prepare a subprocess command - - "torchrun --nnodes=1 --nproc_per_node=8 train.py run -k --config 'a,b'", the function can be called as + To prepare a subprocess command to append to torchrun command (torchrun --nnodes=1 --nproc_per_node=8) + "train.py run -k --config 'a,b'", the function can be called as - _create_torchrun("train.py run -k", config=['a','b'], nnodes=1, nproc_per_node=8) - _create_torchrun("train.py run -k --config 'a,b'", nnodes=1, nproc_per_node=8) """ From 745599e9abe1b88fb87f1da722aad566114c0c13 Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Sun, 25 Jun 2023 14:47:44 +0000 Subject: [PATCH 08/18] fix ensemble test Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/apps/auto3dseg/bundle_gen.py | 9 +++++-- monai/auto3dseg/utils.py | 40 ++++++++++++++++-------------- tmp.py | 20 +++++++++++++++ 3 files changed, 48 insertions(+), 21 deletions(-) create mode 100644 tmp.py diff --git a/monai/apps/auto3dseg/bundle_gen.py b/monai/apps/auto3dseg/bundle_gen.py index 7360226cb2..1b71581861 100644 --- a/monai/apps/auto3dseg/bundle_gen.py +++ b/monai/apps/auto3dseg/bundle_gen.py @@ -25,6 +25,7 @@ from urllib.parse import urlparse import torch +import re from monai.apps import download_and_extract from monai.apps.utils import get_logger @@ -198,10 +199,10 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: config_file=config_files **params, ), "" - elif int(self.device_setting["n_devices"]) > 1: + elif int(self.device_setting["n_devices"]) > 1: return _create_torchrun( f"{train_py} run", - config_file=config_files + config_file=config_files, **params, ), "" else: @@ -222,6 +223,10 @@ def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProc ps_environ = os.environ.copy() ps_environ["CUDA_VISIBLE_DEVICES"] = str(self.device_setting["CUDA_VISIBLE_DEVICES"]) + + # delete pattern "VAR=VALUE" at the beginning of the string, with optional leading/trailing whitespaces + cmd = re.sub(r"^\s*\w+=.*?\s+", "", cmd) + if int(self.device_setting["NUM_NODES"]) > 1: try: look_up_option(self.device_setting["MN_START_METHOD"], [AlgoLaunchKeys.NGC_BCP]) diff --git a/monai/auto3dseg/utils.py b/monai/auto3dseg/utils.py index fb34110815..b7c74d5d4b 100644 --- a/monai/auto3dseg/utils.py +++ b/monai/auto3dseg/utils.py @@ -383,21 +383,9 @@ def list_to_python_fire_arg_str(args: list) -> str: Returns: the string that can be used in python-fire. """ - args_str = ",".join(str(args)) + args_str = ",".join([str(arg) for arg in args]) return f"'{args_str}'" -def check_and_set_required_args(params: dict, required_args: list) -> str: - """ - """ - cmd_mod = "" - for arg in required_args: - val = params.pop(arg, None) - if val is None: - raise ValueError(f"The {arg} should be specified in the kwargs.") - cmd_mod += f" --{arg} {val}" - - return cmd_mod - def check_and_set_optional_args(params: dict) -> str: """ """ @@ -482,12 +470,26 @@ def _create_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: def _run_cmd_torchrun(cmd: str, **kwargs): params = kwargs.copy() - torchrun_args = check_and_set_required_args(params, ["nnodes", "nproc_per_node"]) - cmd_list = ["torchrun"] + torchrun_args.split(" ") + cmd.split(" ") - return run_cmd(cmd_list, **kwargs) + + cmd_list = cmd.split(" ") + + # append arguments to the command list + torchrun_list = ["torchrun"] + required_args = ["nnodes", "nproc_per_node"] + for arg in required_args: + if arg not in params: + raise ValueError(f"Missing required argument {arg} for torchrun.") + torchrun_list += [f"--{arg}", str(params.pop(arg))] + torchrun_list += cmd_list + return run_cmd(torchrun_list, **params) def _run_cmd_bcprun(cmd: str, **kwargs): params = kwargs.copy() - bcprun_args = check_and_set_required_args(params, ["n", "p"]) - cmd_list = ["bcprun"] + bcprun_args.split(" ") + ["-c"] + cmd - return run_cmd(cmd_list, **kwargs) + cmd_list = ["bcprun"] + required_args = ["n", "p"] + for arg in required_args: + if arg not in params: + raise ValueError(f"Missing required argument {arg} for bcprun.") + cmd_list += [f"-{arg}", str(params.pop(arg))] + cmd_list += ["-c"] + cmd + return run_cmd(cmd_list, **params) diff --git a/tmp.py b/tmp.py new file mode 100644 index 0000000000..18ec023d63 --- /dev/null +++ b/tmp.py @@ -0,0 +1,20 @@ +import re + +def remove_env_var_pattern(input_string): + # Pattern to match "VAR=VALUE" at the beginning of the string, with optional leading/trailing whitespaces + pattern = r"^\s*\w+=.*?\s+" + + # re.sub() replaces the matched pattern with an empty string + result = re.sub(r"^\s*\w+=.*?\s+", "", input_string) + + return result + +s = "OMP_NUM_THREAD=1 CUDA_VISIBLE_DEVICES=0,1 python train.py -k --abc=d --num 2" + +# Remove "OMP_NUM_THREAD=1 " +s = remove_env_var_pattern(s) + +# Remove "CUDA_VISIBLE_DEVICES=0,1 " +s = remove_env_var_pattern(s) + +print(s) # Should print: "python train.py -k --abc=d --num 2" \ No newline at end of file From 6b94a9883bc9c11caa17c3c315ab5e461698be4a Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Sun, 25 Jun 2023 14:48:06 +0000 Subject: [PATCH 09/18] fix wrong file Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- tmp.py | 20 -------------------- 1 file changed, 20 deletions(-) delete mode 100644 tmp.py diff --git a/tmp.py b/tmp.py deleted file mode 100644 index 18ec023d63..0000000000 --- a/tmp.py +++ /dev/null @@ -1,20 +0,0 @@ -import re - -def remove_env_var_pattern(input_string): - # Pattern to match "VAR=VALUE" at the beginning of the string, with optional leading/trailing whitespaces - pattern = r"^\s*\w+=.*?\s+" - - # re.sub() replaces the matched pattern with an empty string - result = re.sub(r"^\s*\w+=.*?\s+", "", input_string) - - return result - -s = "OMP_NUM_THREAD=1 CUDA_VISIBLE_DEVICES=0,1 python train.py -k --abc=d --num 2" - -# Remove "OMP_NUM_THREAD=1 " -s = remove_env_var_pattern(s) - -# Remove "CUDA_VISIBLE_DEVICES=0,1 " -s = remove_env_var_pattern(s) - -print(s) # Should print: "python train.py -k --abc=d --num 2" \ No newline at end of file From 906a6e83ee1c8457e4fb6f0711963d91fd2e44b5 Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Sun, 25 Jun 2023 14:59:09 +0000 Subject: [PATCH 10/18] Update ensemble Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/apps/auto3dseg/auto_runner.py | 4 +-- monai/apps/auto3dseg/ensemble_builder.py | 37 +++++++++--------------- 2 files changed, 15 insertions(+), 26 deletions(-) diff --git a/monai/apps/auto3dseg/auto_runner.py b/monai/apps/auto3dseg/auto_runner.py index 031f8358d1..6c8f61ecc4 100644 --- a/monai/apps/auto3dseg/auto_runner.py +++ b/monai/apps/auto3dseg/auto_runner.py @@ -29,7 +29,7 @@ from monai.auto3dseg.utils import algo_to_pickle from monai.bundle import ConfigParser from monai.transforms import SaveImage -from monai.utils import AlgoKeys, has_option, look_up_option, optional_import +from monai.utils import AlgoKeys, has_option, look_up_option, optional_import, AlgoLaunchKeys from monai.utils.misc import check_kwargs_exist_in_class_init, run_cmd logger = get_logger(module_name=__name__) @@ -521,7 +521,7 @@ def set_device_info( self.device_setting["NUM_NODES"] = num_nodes if mn_start_method is None: - mn_start_method = os.environ.get("MN_START_METHOD", "bcprun") + mn_start_method = os.environ.get("MN_START_METHOD", AlgoLaunchKeys.NGC_BCP) self.device_setting["MN_START_METHOD"] = mn_start_method if cmd_prefix is None: diff --git a/monai/apps/auto3dseg/ensemble_builder.py b/monai/apps/auto3dseg/ensemble_builder.py index afb15d5d3e..cc3897983a 100644 --- a/monai/apps/auto3dseg/ensemble_builder.py +++ b/monai/apps/auto3dseg/ensemble_builder.py @@ -26,12 +26,12 @@ from monai.apps.auto3dseg.utils import get_name_from_algo_id, import_bundle_algo_history from monai.apps.utils import get_logger from monai.auto3dseg import concat_val_to_np -from monai.auto3dseg.utils import datafold_read +from monai.auto3dseg.utils import datafold_read, _create_torchrun, _create_bcprun, _create_default, _run_cmd_bcprun, _run_cmd_torchrun from monai.bundle import ConfigParser from monai.data import partition_dataset from monai.transforms import MeanEnsemble, SaveImage, VoteEnsemble from monai.utils import RankFilter, deprecated_arg -from monai.utils.enums import AlgoKeys +from monai.utils.enums import AlgoKeys, AlgoLaunchKeys from monai.utils.misc import check_kwargs_exist_in_class_init, prob2class, run_cmd from monai.utils.module import look_up_option, optional_import @@ -446,7 +446,7 @@ def __init__( "CUDA_VISIBLE_DEVICES": ",".join([str(x) for x in range(torch.cuda.device_count())]), "n_devices": torch.cuda.device_count(), "NUM_NODES": int(os.environ.get("NUM_NODES", 1)), - "MN_START_METHOD": os.environ.get("MN_START_METHOD", "bcprun"), + "MN_START_METHOD": os.environ.get("MN_START_METHOD", AlgoLaunchKeys.NGC_BCP), "CMD_PREFIX": os.environ.get("CMD_PREFIX"), # type: ignore } @@ -642,34 +642,23 @@ def _create_cmd(self) -> None: # define env for subprocess ps_environ = os.environ.copy() ps_environ["CUDA_VISIBLE_DEVICES"] = str(self.device_setting["CUDA_VISIBLE_DEVICES"]) - cmd: str | None = self.device_setting["CMD_PREFIX"] # type: ignore - if cmd is not None and not str(cmd).endswith(" "): - cmd += " " if int(self.device_setting["NUM_NODES"]) > 1: - if self.device_setting["MN_START_METHOD"] != "bcprun": + if self.device_setting["MN_START_METHOD"] != AlgoLaunchKeys.NGC_BCP: raise NotImplementedError( f"{self.device_setting['MN_START_METHOD']} is not supported yet. " "Try modify EnsembleRunner._create_cmd for your cluster." ) logger.info(f"Ensembling on {self.device_setting['NUM_NODES']} nodes!") - cmd = "python " if cmd is None else cmd - cmd = f"{cmd} -m {base_cmd}" - cmd_list = [ - "bcprun", - "-n", - str(self.device_setting["NUM_NODES"]), - "-p", - str(self.device_setting["n_devices"]), - "-c", - cmd, - ] + cmd = _create_bcprun(base_cmd, cmd_prefix=self.device_setting.cmd_prefix) + _run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"]) else: logger.info(f"Ensembling using {self.device_setting['n_devices']} GPU!") - if cmd is None: - cmd = f"torchrun --nnodes={1:d} --nproc_per_node={self.device_setting['n_devices']:d} " - cmd = f"{cmd} -m {base_cmd}" - cmd_list = cmd.split() - - run_cmd(cmd_list, env=ps_environ, check=True) + cmd = _create_torchrun(base_cmd) + _run_cmd_torchrun(cmd, + nnodes=1, + nproc_per_node=self.device_setting["n_devices"], + env=ps_environ, + check=True + ) return From 82a2296d41bd1ec86374e6a372af9d22dee4ad65 Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Sun, 25 Jun 2023 15:38:03 +0000 Subject: [PATCH 11/18] fix integration Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/apps/auto3dseg/auto_runner.py | 4 ++-- monai/apps/auto3dseg/bundle_gen.py | 12 ++++++------ monai/apps/auto3dseg/ensemble_builder.py | 10 +++++----- monai/auto3dseg/utils.py | 2 +- monai/utils/__init__.py | 1 - monai/utils/enums.py | 11 ----------- 6 files changed, 14 insertions(+), 26 deletions(-) diff --git a/monai/apps/auto3dseg/auto_runner.py b/monai/apps/auto3dseg/auto_runner.py index 6c8f61ecc4..031f8358d1 100644 --- a/monai/apps/auto3dseg/auto_runner.py +++ b/monai/apps/auto3dseg/auto_runner.py @@ -29,7 +29,7 @@ from monai.auto3dseg.utils import algo_to_pickle from monai.bundle import ConfigParser from monai.transforms import SaveImage -from monai.utils import AlgoKeys, has_option, look_up_option, optional_import, AlgoLaunchKeys +from monai.utils import AlgoKeys, has_option, look_up_option, optional_import from monai.utils.misc import check_kwargs_exist_in_class_init, run_cmd logger = get_logger(module_name=__name__) @@ -521,7 +521,7 @@ def set_device_info( self.device_setting["NUM_NODES"] = num_nodes if mn_start_method is None: - mn_start_method = os.environ.get("MN_START_METHOD", AlgoLaunchKeys.NGC_BCP) + mn_start_method = os.environ.get("MN_START_METHOD", "bcprun") self.device_setting["MN_START_METHOD"] = mn_start_method if cmd_prefix is None: diff --git a/monai/apps/auto3dseg/bundle_gen.py b/monai/apps/auto3dseg/bundle_gen.py index 1b71581861..f6d4822778 100644 --- a/monai/apps/auto3dseg/bundle_gen.py +++ b/monai/apps/auto3dseg/bundle_gen.py @@ -34,7 +34,7 @@ from monai.bundle.config_parser import ConfigParser from monai.config import PathLike from monai.utils import ensure_tuple, run_cmd, look_up_option -from monai.utils.enums import AlgoKeys, AlgoLaunchKeys +from monai.utils.enums import AlgoKeys logger = get_logger(module_name=__name__) ALGO_HASH = os.environ.get("MONAI_ALGO_HASH", "b5c01d4") @@ -87,7 +87,7 @@ def __init__(self, template_path: PathLike): "CUDA_VISIBLE_DEVICES": ",".join([str(x) for x in range(torch.cuda.device_count())]), "n_devices": int(torch.cuda.device_count()), "NUM_NODES": int(os.environ.get("NUM_NODES", 1)), - "MN_START_METHOD": os.environ.get("MN_START_METHOD", AlgoLaunchKeys.NGC_BCP), + "MN_START_METHOD": os.environ.get("MN_START_METHOD", "bcprun"), "CMD_PREFIX": os.environ.get("CMD_PREFIX"), # type: ignore } @@ -186,7 +186,7 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: # multi-node command # only bcprun is supported for now try: - look_up_option(self.device_setting["MN_START_METHOD"], [AlgoLaunchKeys.NGC_BCP]) + look_up_option(self.device_setting["MN_START_METHOD"], ["bcprun"]) except ValueError as err: raise NotImplementedError( f"{self.device_setting['MN_START_METHOD']} is not supported yet." @@ -195,7 +195,7 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: return _create_bcprun( f"{train_py} run", - cmd_prefix=self.device_setting.cmd_prefix, + cmd_prefix=self.device_setting["CMD_PREFIX"], config_file=config_files **params, ), "" @@ -208,7 +208,7 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: else: return _create_default( f"{train_py} run", - cmd_prefix=self.device_setting.cmd_prefix, + cmd_prefix=self.device_setting["CMD_PREFIX"], config_file=config_files, **params ), "" @@ -229,7 +229,7 @@ def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProc if int(self.device_setting["NUM_NODES"]) > 1: try: - look_up_option(self.device_setting["MN_START_METHOD"], [AlgoLaunchKeys.NGC_BCP]) + look_up_option(self.device_setting["MN_START_METHOD"], ["bcprun"]) except ValueError as err: raise NotImplementedError( f"{self.device_setting['MN_START_METHOD']} is not supported yet." diff --git a/monai/apps/auto3dseg/ensemble_builder.py b/monai/apps/auto3dseg/ensemble_builder.py index cc3897983a..dcf0b3a979 100644 --- a/monai/apps/auto3dseg/ensemble_builder.py +++ b/monai/apps/auto3dseg/ensemble_builder.py @@ -31,7 +31,7 @@ from monai.data import partition_dataset from monai.transforms import MeanEnsemble, SaveImage, VoteEnsemble from monai.utils import RankFilter, deprecated_arg -from monai.utils.enums import AlgoKeys, AlgoLaunchKeys +from monai.utils.enums import AlgoKeys from monai.utils.misc import check_kwargs_exist_in_class_init, prob2class, run_cmd from monai.utils.module import look_up_option, optional_import @@ -446,7 +446,7 @@ def __init__( "CUDA_VISIBLE_DEVICES": ",".join([str(x) for x in range(torch.cuda.device_count())]), "n_devices": torch.cuda.device_count(), "NUM_NODES": int(os.environ.get("NUM_NODES", 1)), - "MN_START_METHOD": os.environ.get("MN_START_METHOD", AlgoLaunchKeys.NGC_BCP), + "MN_START_METHOD": os.environ.get("MN_START_METHOD", "bcprun"), "CMD_PREFIX": os.environ.get("CMD_PREFIX"), # type: ignore } @@ -643,18 +643,18 @@ def _create_cmd(self) -> None: ps_environ = os.environ.copy() ps_environ["CUDA_VISIBLE_DEVICES"] = str(self.device_setting["CUDA_VISIBLE_DEVICES"]) if int(self.device_setting["NUM_NODES"]) > 1: - if self.device_setting["MN_START_METHOD"] != AlgoLaunchKeys.NGC_BCP: + if self.device_setting["MN_START_METHOD"] != "bcprun": raise NotImplementedError( f"{self.device_setting['MN_START_METHOD']} is not supported yet. " "Try modify EnsembleRunner._create_cmd for your cluster." ) logger.info(f"Ensembling on {self.device_setting['NUM_NODES']} nodes!") - cmd = _create_bcprun(base_cmd, cmd_prefix=self.device_setting.cmd_prefix) + cmd = _create_bcprun("-m " + base_cmd, cmd_prefix=self.device_setting["CMD_PREFIX"]) _run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"]) else: logger.info(f"Ensembling using {self.device_setting['n_devices']} GPU!") - cmd = _create_torchrun(base_cmd) + cmd = _create_torchrun("-m " + base_cmd) _run_cmd_torchrun(cmd, nnodes=1, nproc_per_node=self.device_setting["n_devices"], diff --git a/monai/auto3dseg/utils.py b/monai/auto3dseg/utils.py index b7c74d5d4b..5d05d5d4e2 100644 --- a/monai/auto3dseg/utils.py +++ b/monai/auto3dseg/utils.py @@ -471,7 +471,7 @@ def _create_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: def _run_cmd_torchrun(cmd: str, **kwargs): params = kwargs.copy() - cmd_list = cmd.split(" ") + cmd_list = cmd.split() # append arguments to the command list torchrun_list = ["torchrun"] diff --git a/monai/utils/__init__.py b/monai/utils/__init__.py index b052be19e5..5fa62ed36b 100644 --- a/monai/utils/__init__.py +++ b/monai/utils/__init__.py @@ -60,7 +60,6 @@ UpsampleMode, Weight, WSIPatchKeys, - AlgoLaunchKeys, ) from .jupyter_utils import StatusMembers, ThreadContainer from .misc import ( diff --git a/monai/utils/enums.py b/monai/utils/enums.py index e6f7ea26cd..572cd9293d 100644 --- a/monai/utils/enums.py +++ b/monai/utils/enums.py @@ -60,7 +60,6 @@ "BundleProperty", "BundlePropertyConfig", "AlgoKeys", - "AlgoLaunchKeys", ] @@ -693,13 +692,3 @@ class AlgoKeys(StrEnum): ALGO = "algo_instance" IS_TRAINED = "is_trained" SCORE = "best_metric" - -class AlgoLaunchKeys(StrEnum): - """ - Multi-node training start methods. - `DEFAULT` is the default method. - `FILE` is the method to start multi-node training from a python file. - `FUNCTION` is the method to start multi-node training from a python function. - """ - - NGC_BCP = "bcprun" From 8214299ed7cb26c9c0d6bc686324ca5856a4811d Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Sun, 25 Jun 2023 15:52:22 +0000 Subject: [PATCH 12/18] update docstrings Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/auto3dseg/utils.py | 45 +++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/monai/auto3dseg/utils.py b/monai/auto3dseg/utils.py index 5d05d5d4e2..8fb95099ff 100644 --- a/monai/auto3dseg/utils.py +++ b/monai/auto3dseg/utils.py @@ -401,7 +401,7 @@ def check_and_set_optional_args(params: dict) -> str: def _create_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: """ - Prepare the command for job to run the script with the given arguments. + Prepare the command for subprocess to run the script with the given arguments. Args: cmd: the command or script to run in the distributed job. @@ -409,7 +409,7 @@ def _create_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: kwargs: the keyword arguments to be passed to the script. Returns: - the command to run the distributed job. + the command to run with ``subprocess``. Examples: To prepare a subprocess command @@ -431,17 +431,17 @@ def _create_torchrun(cmd: str, **kwargs: Any) -> str: Args: cmd: the command or script to run in the distributed job. - cmd_prefix: the command prefix to run the script, e.g., "torchrun ", "python -m torch.distributed.launch ". kwargs: the keyword arguments to be passed to the script. Returns: - the command to run the multi-gpu/multi-node job. + the command to append to ``torchrun`` Examples: - To prepare a subprocess command to append to torchrun command (torchrun --nnodes=1 --nproc_per_node=8) - "train.py run -k --config 'a,b'", the function can be called as - - _create_torchrun("train.py run -k", config=['a','b'], nnodes=1, nproc_per_node=8) - - _create_torchrun("train.py run -k --config 'a,b'", nnodes=1, nproc_per_node=8) + For command "torchrun --nnodes=1 --nproc_per_node=8 train.py run -k --config 'a,b'", + it only prepares command after the torchrun arguments, i.e., "train.py run -k --config 'a,b'". + The function can be called as + - _create_torchrun("train.py run -k", config=['a','b']) + - _create_torchrun("train.py run -k --config 'a,b'") """ params = kwargs.copy() return cmd + check_and_set_optional_args(params) @@ -449,7 +449,7 @@ def _create_torchrun(cmd: str, **kwargs: Any) -> str: def _create_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: """ - Prepare the command for distributed job submission using bcprun. + Prepare the command for distributed job running using bcprun. Args: script: the script to run in the distributed job. @@ -460,15 +460,26 @@ def _create_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: The command to run the script in the distributed job. Examples: - To prepare a subprocess command - "bcprun -n 2 -p 8 -c python train.py run -k --config 'a,b'", the function can be called as + For command "bcprun -n 2 -p 8 -c python train.py run -k --config 'a,b'", + it only prepares command after the bcprun arguments, i.e., "train.py run -k --config 'a,b'". + the function can be called as - _create_bcprun("train.py run -k", config=['a','b'], n=2, p=8) - _create_bcprun("train.py run -k --config 'a,b'", n=2, p=8) """ - return _create_default(cmd, cmd_prefix, **kwargs) + return _create_default(cmd, cmd_prefix=cmd_prefix, **kwargs) def _run_cmd_torchrun(cmd: str, **kwargs): + """ + Run the command with torchrun. + + Args: + cmd: the command to run. Typically it is prepared by ``_create_torchrun``. + kwargs: the keyword arguments to be passed to the ``torchrun``. + + Return: + the return code of the subprocess command. + """ params = kwargs.copy() cmd_list = cmd.split() @@ -484,6 +495,16 @@ def _run_cmd_torchrun(cmd: str, **kwargs): return run_cmd(torchrun_list, **params) def _run_cmd_bcprun(cmd: str, **kwargs): + """ + Run the command with bcprun. + + Args: + cmd: the command to run. Typically it is prepared by ``_create_bcprun``. + kwargs: the keyword arguments to be passed to the ``bcprun``. + + Returns: + the return code of the subprocess command. + """ params = kwargs.copy() cmd_list = ["bcprun"] required_args = ["n", "p"] From 93bdd6385aef461b749e86403d339d62a5a8c51e Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Sun, 25 Jun 2023 15:52:49 +0000 Subject: [PATCH 13/18] autofix Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/apps/auto3dseg/bundle_gen.py | 57 +++++++++++------------- monai/apps/auto3dseg/ensemble_builder.py | 16 ++++--- monai/auto3dseg/utils.py | 14 +++--- 3 files changed, 45 insertions(+), 42 deletions(-) diff --git a/monai/apps/auto3dseg/bundle_gen.py b/monai/apps/auto3dseg/bundle_gen.py index f6d4822778..3463eddb97 100644 --- a/monai/apps/auto3dseg/bundle_gen.py +++ b/monai/apps/auto3dseg/bundle_gen.py @@ -13,6 +13,7 @@ import importlib import os +import re import shutil import subprocess import sys @@ -25,15 +26,21 @@ from urllib.parse import urlparse import torch -import re from monai.apps import download_and_extract from monai.apps.utils import get_logger from monai.auto3dseg.algo_gen import Algo, AlgoGen -from monai.auto3dseg.utils import algo_to_pickle, _create_torchrun, _create_bcprun, _create_default, _run_cmd_bcprun, _run_cmd_torchrun +from monai.auto3dseg.utils import ( + _create_bcprun, + _create_default, + _create_torchrun, + _run_cmd_bcprun, + _run_cmd_torchrun, + algo_to_pickle, +) from monai.bundle.config_parser import ConfigParser from monai.config import PathLike -from monai.utils import ensure_tuple, run_cmd, look_up_option +from monai.utils import ensure_tuple, look_up_option, run_cmd from monai.utils.enums import AlgoKeys logger = get_logger(module_name=__name__) @@ -178,7 +185,7 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: config_files = [] if os.path.isdir(config_dir): for file in sorted(os.listdir(config_dir)): - if (file.endswith("yaml") or file.endswith("json")): + if file.endswith("yaml") or file.endswith("json"): # Python Fire may be confused by single-quoted WindowsPath config_files.append(Path(os.path.join(config_dir, file)).as_posix()) @@ -193,25 +200,21 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: "Try modify BundleAlgo._create_cmd for your cluster." ) from err - return _create_bcprun( - f"{train_py} run", - cmd_prefix=self.device_setting["CMD_PREFIX"], - config_file=config_files - **params, - ), "" + return ( + _create_bcprun( + f"{train_py} run", cmd_prefix=self.device_setting["CMD_PREFIX"], config_file=config_files**params + ), + "", + ) elif int(self.device_setting["n_devices"]) > 1: - return _create_torchrun( - f"{train_py} run", - config_file=config_files, - **params, - ), "" + return _create_torchrun(f"{train_py} run", config_file=config_files, **params), "" else: - return _create_default( - f"{train_py} run", - cmd_prefix=self.device_setting["CMD_PREFIX"], - config_file=config_files, - **params - ), "" + return ( + _create_default( + f"{train_py} run", cmd_prefix=self.device_setting["CMD_PREFIX"], config_file=config_files, **params + ), + "", + ) def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProcess: """ @@ -236,18 +239,10 @@ def _run_cmd(self, cmd: str, devices_info: str = "") -> subprocess.CompletedProc "Try modify BundleAlgo._run_cmd for your cluster." ) from err - return _run_cmd_bcprun( - cmd, - n=self.device_setting["NUM_NODES"], - p=self.device_setting["n_devices"], - ) + return _run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"]) elif int(self.device_setting["n_devices"]) > 1: return _run_cmd_torchrun( - cmd, - nnodes=1, - nproc_per_node=self.device_setting["n_devices"], - env=ps_environ, - check=True + cmd, nnodes=1, nproc_per_node=self.device_setting["n_devices"], env=ps_environ, check=True ) else: return run_cmd(cmd.split(), env=ps_environ, check=True) diff --git a/monai/apps/auto3dseg/ensemble_builder.py b/monai/apps/auto3dseg/ensemble_builder.py index dcf0b3a979..56c09a0473 100644 --- a/monai/apps/auto3dseg/ensemble_builder.py +++ b/monai/apps/auto3dseg/ensemble_builder.py @@ -26,7 +26,14 @@ from monai.apps.auto3dseg.utils import get_name_from_algo_id, import_bundle_algo_history from monai.apps.utils import get_logger from monai.auto3dseg import concat_val_to_np -from monai.auto3dseg.utils import datafold_read, _create_torchrun, _create_bcprun, _create_default, _run_cmd_bcprun, _run_cmd_torchrun +from monai.auto3dseg.utils import ( + _create_bcprun, + _create_default, + _create_torchrun, + _run_cmd_bcprun, + _run_cmd_torchrun, + datafold_read, +) from monai.bundle import ConfigParser from monai.data import partition_dataset from monai.transforms import MeanEnsemble, SaveImage, VoteEnsemble @@ -655,10 +662,7 @@ def _create_cmd(self) -> None: else: logger.info(f"Ensembling using {self.device_setting['n_devices']} GPU!") cmd = _create_torchrun("-m " + base_cmd) - _run_cmd_torchrun(cmd, - nnodes=1, - nproc_per_node=self.device_setting["n_devices"], - env=ps_environ, - check=True + _run_cmd_torchrun( + cmd, nnodes=1, nproc_per_node=self.device_setting["n_devices"], env=ps_environ, check=True ) return diff --git a/monai/auto3dseg/utils.py b/monai/auto3dseg/utils.py index 8fb95099ff..97a8cba5af 100644 --- a/monai/auto3dseg/utils.py +++ b/monai/auto3dseg/utils.py @@ -373,6 +373,7 @@ def algo_from_pickle(pkl_filename: str, template_path: PathLike | None = None, * return algo, algo_meta_data + def list_to_python_fire_arg_str(args: list) -> str: """ Convert a list of arguments to a string that can be used in python-fire. @@ -386,9 +387,9 @@ def list_to_python_fire_arg_str(args: list) -> str: args_str = ",".join([str(arg) for arg in args]) return f"'{args_str}'" + def check_and_set_optional_args(params: dict) -> str: - """ - """ + """ """ cmd_mod_opt = "" for k, v in params.items(): if isinstance(v, dict): @@ -425,6 +426,7 @@ def _create_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: return cmd_prefix + cmd + check_and_set_optional_args(params) + def _create_torchrun(cmd: str, **kwargs: Any) -> str: """ Prepare the command for multi-gpu/multi-node job execution using torchrun. @@ -469,6 +471,7 @@ def _create_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: return _create_default(cmd, cmd_prefix=cmd_prefix, **kwargs) + def _run_cmd_torchrun(cmd: str, **kwargs): """ Run the command with torchrun. @@ -476,7 +479,7 @@ def _run_cmd_torchrun(cmd: str, **kwargs): Args: cmd: the command to run. Typically it is prepared by ``_create_torchrun``. kwargs: the keyword arguments to be passed to the ``torchrun``. - + Return: the return code of the subprocess command. """ @@ -494,14 +497,15 @@ def _run_cmd_torchrun(cmd: str, **kwargs): torchrun_list += cmd_list return run_cmd(torchrun_list, **params) + def _run_cmd_bcprun(cmd: str, **kwargs): """ Run the command with bcprun. Args: cmd: the command to run. Typically it is prepared by ``_create_bcprun``. - kwargs: the keyword arguments to be passed to the ``bcprun``. - + kwargs: the keyword arguments to be passed to the ``bcprun``. + Returns: the return code of the subprocess command. """ From f4354782c3a4d01a7f128189de0a0b3d63228ab9 Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Sun, 25 Jun 2023 16:00:09 +0000 Subject: [PATCH 14/18] fix mypy Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/apps/auto3dseg/bundle_gen.py | 10 ++++++++-- monai/apps/auto3dseg/ensemble_builder.py | 13 +++---------- monai/auto3dseg/utils.py | 7 ++++--- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/monai/apps/auto3dseg/bundle_gen.py b/monai/apps/auto3dseg/bundle_gen.py index 3463eddb97..7778d13527 100644 --- a/monai/apps/auto3dseg/bundle_gen.py +++ b/monai/apps/auto3dseg/bundle_gen.py @@ -202,7 +202,10 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: return ( _create_bcprun( - f"{train_py} run", cmd_prefix=self.device_setting["CMD_PREFIX"], config_file=config_files**params + f"{train_py} run", + cmd_prefix=str(self.device_setting["CMD_PREFIX"]), + config_file=config_files, + **params, ), "", ) @@ -211,7 +214,10 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: else: return ( _create_default( - f"{train_py} run", cmd_prefix=self.device_setting["CMD_PREFIX"], config_file=config_files, **params + f"{train_py} run", + cmd_prefix=str(self.device_setting["CMD_PREFIX"]), + config_file=config_files, + **params, ), "", ) diff --git a/monai/apps/auto3dseg/ensemble_builder.py b/monai/apps/auto3dseg/ensemble_builder.py index 56c09a0473..fae281cb28 100644 --- a/monai/apps/auto3dseg/ensemble_builder.py +++ b/monai/apps/auto3dseg/ensemble_builder.py @@ -26,20 +26,13 @@ from monai.apps.auto3dseg.utils import get_name_from_algo_id, import_bundle_algo_history from monai.apps.utils import get_logger from monai.auto3dseg import concat_val_to_np -from monai.auto3dseg.utils import ( - _create_bcprun, - _create_default, - _create_torchrun, - _run_cmd_bcprun, - _run_cmd_torchrun, - datafold_read, -) +from monai.auto3dseg.utils import _create_bcprun, _create_torchrun, _run_cmd_bcprun, _run_cmd_torchrun, datafold_read from monai.bundle import ConfigParser from monai.data import partition_dataset from monai.transforms import MeanEnsemble, SaveImage, VoteEnsemble from monai.utils import RankFilter, deprecated_arg from monai.utils.enums import AlgoKeys -from monai.utils.misc import check_kwargs_exist_in_class_init, prob2class, run_cmd +from monai.utils.misc import check_kwargs_exist_in_class_init, prob2class from monai.utils.module import look_up_option, optional_import tqdm, has_tqdm = optional_import("tqdm", name="tqdm") @@ -656,7 +649,7 @@ def _create_cmd(self) -> None: "Try modify EnsembleRunner._create_cmd for your cluster." ) logger.info(f"Ensembling on {self.device_setting['NUM_NODES']} nodes!") - cmd = _create_bcprun("-m " + base_cmd, cmd_prefix=self.device_setting["CMD_PREFIX"]) + cmd = _create_bcprun("-m " + base_cmd, cmd_prefix=str(self.device_setting["CMD_PREFIX"])) _run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"]) else: diff --git a/monai/auto3dseg/utils.py b/monai/auto3dseg/utils.py index 97a8cba5af..7a4d79cb8d 100644 --- a/monai/auto3dseg/utils.py +++ b/monai/auto3dseg/utils.py @@ -14,6 +14,7 @@ import logging import os import pickle +import subprocess import sys from copy import deepcopy from numbers import Number @@ -472,7 +473,7 @@ def _create_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: return _create_default(cmd, cmd_prefix=cmd_prefix, **kwargs) -def _run_cmd_torchrun(cmd: str, **kwargs): +def _run_cmd_torchrun(cmd: str, **kwargs: Any) -> subprocess.CompletedProcess: """ Run the command with torchrun. @@ -498,7 +499,7 @@ def _run_cmd_torchrun(cmd: str, **kwargs): return run_cmd(torchrun_list, **params) -def _run_cmd_bcprun(cmd: str, **kwargs): +def _run_cmd_bcprun(cmd: str, **kwargs: Any) -> subprocess.CompletedProcess: """ Run the command with bcprun. @@ -516,5 +517,5 @@ def _run_cmd_bcprun(cmd: str, **kwargs): if arg not in params: raise ValueError(f"Missing required argument {arg} for bcprun.") cmd_list += [f"-{arg}", str(params.pop(arg))] - cmd_list += ["-c"] + cmd + cmd_list.extend(["-c", cmd]) return run_cmd(cmd_list, **params) From 68793787c49f329d987d220acc44348ebb8d20f0 Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Tue, 4 Jul 2023 14:16:08 +0000 Subject: [PATCH 15/18] fix comments Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/apps/auto3dseg/bundle_gen.py | 12 ++++++------ monai/apps/auto3dseg/ensemble_builder.py | 12 +++++++++--- monai/auto3dseg/utils.py | 24 ++++++++++++------------ 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/monai/apps/auto3dseg/bundle_gen.py b/monai/apps/auto3dseg/bundle_gen.py index 264e64080e..d81a23c04f 100644 --- a/monai/apps/auto3dseg/bundle_gen.py +++ b/monai/apps/auto3dseg/bundle_gen.py @@ -31,9 +31,9 @@ from monai.apps.utils import get_logger from monai.auto3dseg.algo_gen import Algo, AlgoGen from monai.auto3dseg.utils import ( - _create_bcprun, - _create_default, - _create_torchrun, + _prepare_cmd_bcprun, + _prepare_cmd_default, + _prepare_cmd_torchrun, _run_cmd_bcprun, _run_cmd_torchrun, algo_to_pickle, @@ -201,7 +201,7 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: ) from err return ( - _create_bcprun( + _prepare_cmd_bcprun( f"{train_py} run", cmd_prefix=str(self.device_setting["CMD_PREFIX"]), config_file=config_files, @@ -210,10 +210,10 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: "", ) elif int(self.device_setting["n_devices"]) > 1: - return _create_torchrun(f"{train_py} run", config_file=config_files, **params), "" + return _prepare_cmd_torchrun(f"{train_py} run", config_file=config_files, **params), "" else: return ( - _create_default( + _prepare_cmd_default( f"{train_py} run", cmd_prefix=str(self.device_setting["CMD_PREFIX"]), config_file=config_files, diff --git a/monai/apps/auto3dseg/ensemble_builder.py b/monai/apps/auto3dseg/ensemble_builder.py index fae281cb28..93854d7dfa 100644 --- a/monai/apps/auto3dseg/ensemble_builder.py +++ b/monai/apps/auto3dseg/ensemble_builder.py @@ -26,7 +26,13 @@ from monai.apps.auto3dseg.utils import get_name_from_algo_id, import_bundle_algo_history from monai.apps.utils import get_logger from monai.auto3dseg import concat_val_to_np -from monai.auto3dseg.utils import _create_bcprun, _create_torchrun, _run_cmd_bcprun, _run_cmd_torchrun, datafold_read +from monai.auto3dseg.utils import ( + _prepare_cmd_bcprun, + _prepare_cmd_torchrun, + _run_cmd_bcprun, + _run_cmd_torchrun, + datafold_read, +) from monai.bundle import ConfigParser from monai.data import partition_dataset from monai.transforms import MeanEnsemble, SaveImage, VoteEnsemble @@ -649,12 +655,12 @@ def _create_cmd(self) -> None: "Try modify EnsembleRunner._create_cmd for your cluster." ) logger.info(f"Ensembling on {self.device_setting['NUM_NODES']} nodes!") - cmd = _create_bcprun("-m " + base_cmd, cmd_prefix=str(self.device_setting["CMD_PREFIX"])) + cmd = _prepare_cmd_bcprun("-m " + base_cmd, cmd_prefix=str(self.device_setting["CMD_PREFIX"])) _run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"]) else: logger.info(f"Ensembling using {self.device_setting['n_devices']} GPU!") - cmd = _create_torchrun("-m " + base_cmd) + cmd = _prepare_cmd_torchrun("-m " + base_cmd) _run_cmd_torchrun( cmd, nnodes=1, nproc_per_node=self.device_setting["n_devices"], env=ps_environ, check=True ) diff --git a/monai/auto3dseg/utils.py b/monai/auto3dseg/utils.py index 7a4d79cb8d..91aabcdbd2 100644 --- a/monai/auto3dseg/utils.py +++ b/monai/auto3dseg/utils.py @@ -401,7 +401,7 @@ def check_and_set_optional_args(params: dict) -> str: return cmd_mod_opt -def _create_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: +def _prepare_cmd_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: """ Prepare the command for subprocess to run the script with the given arguments. @@ -416,8 +416,8 @@ def _create_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: Examples: To prepare a subprocess command "python train.py run -k --config 'a,b'", the function can be called as - - _create_default("train.py run -k", config=['a','b']) - - _create_default("train.py run -k --config 'a,b'") + - _prepare_cmd_default("train.py run -k", config=['a','b']) + - _prepare_cmd_default("train.py run -k --config 'a,b'") """ params = kwargs.copy() @@ -428,7 +428,7 @@ def _create_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: return cmd_prefix + cmd + check_and_set_optional_args(params) -def _create_torchrun(cmd: str, **kwargs: Any) -> str: +def _prepare_cmd_torchrun(cmd: str, **kwargs: Any) -> str: """ Prepare the command for multi-gpu/multi-node job execution using torchrun. @@ -443,14 +443,14 @@ def _create_torchrun(cmd: str, **kwargs: Any) -> str: For command "torchrun --nnodes=1 --nproc_per_node=8 train.py run -k --config 'a,b'", it only prepares command after the torchrun arguments, i.e., "train.py run -k --config 'a,b'". The function can be called as - - _create_torchrun("train.py run -k", config=['a','b']) - - _create_torchrun("train.py run -k --config 'a,b'") + - _prepare_cmd_torchrun("train.py run -k", config=['a','b']) + - _prepare_cmd_torchrun("train.py run -k --config 'a,b'") """ params = kwargs.copy() return cmd + check_and_set_optional_args(params) -def _create_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: +def _prepare_cmd_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: """ Prepare the command for distributed job running using bcprun. @@ -466,11 +466,11 @@ def _create_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: For command "bcprun -n 2 -p 8 -c python train.py run -k --config 'a,b'", it only prepares command after the bcprun arguments, i.e., "train.py run -k --config 'a,b'". the function can be called as - - _create_bcprun("train.py run -k", config=['a','b'], n=2, p=8) - - _create_bcprun("train.py run -k --config 'a,b'", n=2, p=8) + - _prepare_cmd_bcprun("train.py run -k", config=['a','b'], n=2, p=8) + - _prepare_cmd_bcprun("train.py run -k --config 'a,b'", n=2, p=8) """ - return _create_default(cmd, cmd_prefix=cmd_prefix, **kwargs) + return _prepare_cmd_default(cmd, cmd_prefix=cmd_prefix, **kwargs) def _run_cmd_torchrun(cmd: str, **kwargs: Any) -> subprocess.CompletedProcess: @@ -478,7 +478,7 @@ def _run_cmd_torchrun(cmd: str, **kwargs: Any) -> subprocess.CompletedProcess: Run the command with torchrun. Args: - cmd: the command to run. Typically it is prepared by ``_create_torchrun``. + cmd: the command to run. Typically it is prepared by ``_prepare_cmd_torchrun``. kwargs: the keyword arguments to be passed to the ``torchrun``. Return: @@ -504,7 +504,7 @@ def _run_cmd_bcprun(cmd: str, **kwargs: Any) -> subprocess.CompletedProcess: Run the command with bcprun. Args: - cmd: the command to run. Typically it is prepared by ``_create_bcprun``. + cmd: the command to run. Typically it is prepared by ``_prepare_cmd_bcprun``. kwargs: the keyword arguments to be passed to the ``bcprun``. Returns: From eff330504a902499db4b7c8b6a85ae95f94256d8 Mon Sep 17 00:00:00 2001 From: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> Date: Wed, 5 Jul 2023 13:03:57 +0000 Subject: [PATCH 16/18] fix test error Signed-off-by: Mingxin <18563433+mingxin-zheng@users.noreply.github.com> --- monai/apps/auto3dseg/bundle_gen.py | 4 ++-- monai/apps/auto3dseg/ensemble_builder.py | 2 +- monai/auto3dseg/utils.py | 6 ++++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/monai/apps/auto3dseg/bundle_gen.py b/monai/apps/auto3dseg/bundle_gen.py index d81a23c04f..1cc02ac8b4 100644 --- a/monai/apps/auto3dseg/bundle_gen.py +++ b/monai/apps/auto3dseg/bundle_gen.py @@ -203,7 +203,7 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: return ( _prepare_cmd_bcprun( f"{train_py} run", - cmd_prefix=str(self.device_setting["CMD_PREFIX"]), + cmd_prefix=self.device_setting["CMD_PREFIX"], config_file=config_files, **params, ), @@ -215,7 +215,7 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: return ( _prepare_cmd_default( f"{train_py} run", - cmd_prefix=str(self.device_setting["CMD_PREFIX"]), + cmd_prefix=self.device_setting["CMD_PREFIX"], config_file=config_files, **params, ), diff --git a/monai/apps/auto3dseg/ensemble_builder.py b/monai/apps/auto3dseg/ensemble_builder.py index 93854d7dfa..3ca8ee0f6d 100644 --- a/monai/apps/auto3dseg/ensemble_builder.py +++ b/monai/apps/auto3dseg/ensemble_builder.py @@ -655,7 +655,7 @@ def _create_cmd(self) -> None: "Try modify EnsembleRunner._create_cmd for your cluster." ) logger.info(f"Ensembling on {self.device_setting['NUM_NODES']} nodes!") - cmd = _prepare_cmd_bcprun("-m " + base_cmd, cmd_prefix=str(self.device_setting["CMD_PREFIX"])) + cmd = _prepare_cmd_bcprun("-m " + base_cmd, cmd_prefix=self.device_setting["CMD_PREFIX"]) _run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"]) else: diff --git a/monai/auto3dseg/utils.py b/monai/auto3dseg/utils.py index 91aabcdbd2..0da4ee9f26 100644 --- a/monai/auto3dseg/utils.py +++ b/monai/auto3dseg/utils.py @@ -401,7 +401,7 @@ def check_and_set_optional_args(params: dict) -> str: return cmd_mod_opt -def _prepare_cmd_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: +def _prepare_cmd_default(cmd: str, cmd_prefix: str | None = None, **kwargs: Any) -> str: """ Prepare the command for subprocess to run the script with the given arguments. @@ -422,6 +422,8 @@ def _prepare_cmd_default(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> """ params = kwargs.copy() + cmd_prefix = cmd_prefix or "python" + if not cmd_prefix.endswith(" "): cmd_prefix += " " # ensure a space after the command prefix so that the script can be appended @@ -450,7 +452,7 @@ def _prepare_cmd_torchrun(cmd: str, **kwargs: Any) -> str: return cmd + check_and_set_optional_args(params) -def _prepare_cmd_bcprun(cmd: str, cmd_prefix: str = "python", **kwargs: Any) -> str: +def _prepare_cmd_bcprun(cmd: str, cmd_prefix: str | None = None, **kwargs: Any) -> str: """ Prepare the command for distributed job running using bcprun. From 39efc625159be40c294bab00df3d6b2eaee7897b Mon Sep 17 00:00:00 2001 From: monai-bot Date: Wed, 5 Jul 2023 14:49:12 +0000 Subject: [PATCH 17/18] [MONAI] code formatting Signed-off-by: monai-bot --- monai/apps/auto3dseg/bundle_gen.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/monai/apps/auto3dseg/bundle_gen.py b/monai/apps/auto3dseg/bundle_gen.py index 1cc02ac8b4..930d4419fd 100644 --- a/monai/apps/auto3dseg/bundle_gen.py +++ b/monai/apps/auto3dseg/bundle_gen.py @@ -202,10 +202,7 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: return ( _prepare_cmd_bcprun( - f"{train_py} run", - cmd_prefix=self.device_setting["CMD_PREFIX"], - config_file=config_files, - **params, + f"{train_py} run", cmd_prefix=self.device_setting["CMD_PREFIX"], config_file=config_files, **params ), "", ) @@ -214,10 +211,7 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: else: return ( _prepare_cmd_default( - f"{train_py} run", - cmd_prefix=self.device_setting["CMD_PREFIX"], - config_file=config_files, - **params, + f"{train_py} run", cmd_prefix=self.device_setting["CMD_PREFIX"], config_file=config_files, **params ), "", ) From 93d6e052d1a3053e7071c79681b482c6ddc565b8 Mon Sep 17 00:00:00 2001 From: Wenqi Li Date: Wed, 5 Jul 2023 16:29:18 +0100 Subject: [PATCH 18/18] fixes mypy Signed-off-by: Wenqi Li --- monai/apps/auto3dseg/bundle_gen.py | 12 +++++++++--- monai/apps/auto3dseg/ensemble_builder.py | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/monai/apps/auto3dseg/bundle_gen.py b/monai/apps/auto3dseg/bundle_gen.py index 930d4419fd..69cb25cc0e 100644 --- a/monai/apps/auto3dseg/bundle_gen.py +++ b/monai/apps/auto3dseg/bundle_gen.py @@ -95,7 +95,7 @@ def __init__(self, template_path: PathLike): "n_devices": int(torch.cuda.device_count()), "NUM_NODES": int(os.environ.get("NUM_NODES", 1)), "MN_START_METHOD": os.environ.get("MN_START_METHOD", "bcprun"), - "CMD_PREFIX": os.environ.get("CMD_PREFIX"), # type: ignore + "CMD_PREFIX": os.environ.get("CMD_PREFIX", ""), # type: ignore } def pre_check_skip_algo(self, skip_bundlegen: bool = False, skip_info: str = "") -> tuple[bool, str]: @@ -202,7 +202,10 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: return ( _prepare_cmd_bcprun( - f"{train_py} run", cmd_prefix=self.device_setting["CMD_PREFIX"], config_file=config_files, **params + f"{train_py} run", + cmd_prefix=f"{self.device_setting['CMD_PREFIX']}", + config_file=config_files, + **params, ), "", ) @@ -211,7 +214,10 @@ def _create_cmd(self, train_params: None | dict = None) -> tuple[str, str]: else: return ( _prepare_cmd_default( - f"{train_py} run", cmd_prefix=self.device_setting["CMD_PREFIX"], config_file=config_files, **params + f"{train_py} run", + cmd_prefix=f"{self.device_setting['CMD_PREFIX']}", + config_file=config_files, + **params, ), "", ) diff --git a/monai/apps/auto3dseg/ensemble_builder.py b/monai/apps/auto3dseg/ensemble_builder.py index 3ca8ee0f6d..ce2c2895d6 100644 --- a/monai/apps/auto3dseg/ensemble_builder.py +++ b/monai/apps/auto3dseg/ensemble_builder.py @@ -655,7 +655,7 @@ def _create_cmd(self) -> None: "Try modify EnsembleRunner._create_cmd for your cluster." ) logger.info(f"Ensembling on {self.device_setting['NUM_NODES']} nodes!") - cmd = _prepare_cmd_bcprun("-m " + base_cmd, cmd_prefix=self.device_setting["CMD_PREFIX"]) + cmd = _prepare_cmd_bcprun("-m " + base_cmd, cmd_prefix=f"{self.device_setting['CMD_PREFIX']}") _run_cmd_bcprun(cmd, n=self.device_setting["NUM_NODES"], p=self.device_setting["n_devices"]) else: