2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -187,7 +187,7 @@ jobs:
export example_path="examples/contrib/cifar10"
# initial run
export stop_cmd="--stop_iteration=500"
export test_cmd="CI=1 python -u ${example_path}/main.py run --backend=nccl --num_procs_per_node=2"
export test_cmd="CI=1 python -u ${example_path}/main.py run --backend=nccl --nproc_per_node=2"
docker exec -it pthd /bin/bash -c "${test_cmd} ${stop_cmd}"
# resume
export resume_opt="--resume-from=/tmp/output-cifar10/resnet18_backend-nccl-2_stop-on-500/training_checkpoint_400.pt"
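For reference, a minimal sketch of the same stop-then-resume check run locally outside Docker/CircleCI; the paths and flags are taken from the CI job above, and a machine with 2 GPUs and NCCL is assumed:

```python
import os
import subprocess

example_path = "examples/contrib/cifar10"
env = {**os.environ, "CI": "1"}
base_cmd = [
    "python", "-u", f"{example_path}/main.py", "run",
    "--backend=nccl", "--nproc_per_node=2",
]

# initial run, stopped early at iteration 500
subprocess.run(base_cmd + ["--stop_iteration=500"], env=env, check=True)

# resume from the checkpoint written by the stopped run (path as in the CI config)
resume_opt = (
    "--resume-from=/tmp/output-cifar10/"
    "resnet18_backend-nccl-2_stop-on-500/training_checkpoint_400.pt"
)
subprocess.run(base_cmd + [resume_opt], env=env, check=True)
```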
2 changes: 1 addition & 1 deletion examples/contrib/cifar10/README.md
@@ -50,7 +50,7 @@ python -u -m torch.distributed.launch --nproc_per_node=2 --use_env main.py run -
or
```bash
# using function spawn inside the code
python -u main.py run --backend="nccl" --num_procs_per_node=2
python -u main.py run --backend="nccl" --nproc_per_node=2
```

If the user would like to provide an already downloaded dataset, the path can be set up in the parameters as
8 changes: 4 additions & 4 deletions examples/contrib/cifar10/main.py
@@ -133,7 +133,7 @@ def run(
backend=None,
resume_from=None,
log_every_iters=15,
num_procs_per_node=None,
nproc_per_node=None,
stop_iteration=None,
with_trains=False,
**spawn_kwargs
@@ -156,22 +156,22 @@ def run(
checkpoint_every (int): store training checkpoint every ``checkpoint_every`` iterations. Default, 200.
backend (str, optional): backend to use for distributed configuration. Possible values: None, "nccl", "xla-tpu",
"gloo" etc. Default, None.
num_procs_per_node (int, optional): optional argument to setup number of processes per node. It is useful,
nproc_per_node (int, optional): optional argument to set up the number of processes per node. It is useful
when the main Python process is spawning training as child processes.
resume_from (str, optional): path to checkpoint to use to resume the training from. Default, None.
log_every_iters (int): argument to log progress every ``log_every_iters`` iterations. It can be 0 to disable it.
Default, 15.
stop_iteration (int, optional): iteration to stop the training. Can be used to check resume from checkpoint.
with_trains (bool): if True, experiment Trains logger is setup. Default, False.
**spawn_kwargs: Other kwargs to spawn run in child processes: master_addr, master_port, node_rank, num_nodes
**spawn_kwargs: Other kwargs to spawn run in child processes: master_addr, master_port, node_rank, nnodes

"""
# catch all local parameters
config = locals()
config.update(config["spawn_kwargs"])
del config["spawn_kwargs"]

spawn_kwargs["num_procs_per_node"] = num_procs_per_node
spawn_kwargs["nproc_per_node"] = nproc_per_node

with idist.Parallel(backend=backend, **spawn_kwargs) as parallel:

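In short, `--nproc_per_node` now flows straight from the CLI into `idist.Parallel`. A minimal sketch of that pattern (the `training` body and config below are placeholders, not the example's actual code):

```python
import ignite.distributed as idist


def training(local_rank, config):
    # every spawned process enters here with its own local rank
    print(idist.get_rank(), ": run with config:", config)


def run(backend=None, nproc_per_node=None, **spawn_kwargs):
    config = {"batch_size": 512}
    spawn_kwargs["nproc_per_node"] = nproc_per_node
    with idist.Parallel(backend=backend, **spawn_kwargs) as parallel:
        parallel.run(training, config)


if __name__ == "__main__":
    # spawns two child processes on this node, each running `training`
    run(backend="gloo", nproc_per_node=2)
```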
2 changes: 1 addition & 1 deletion ignite/distributed/auto.py
@@ -68,7 +68,7 @@ def auto_dataloader(dataset, **kwargs):
kwargs["batch_size"] //= world_size

if "num_workers" in kwargs:
nproc = idist.get_ntasks_per_node()
nproc = idist.get_nproc_per_node()
kwargs["num_workers"] = (kwargs["num_workers"] + nproc - 1) // nproc

if "batch_sampler" not in kwargs:
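The renamed getter feeds a ceiling division that splits `num_workers` across the processes of a node. A small worked check of that arithmetic (values are illustrative, not from the PR):

```python
# Ceiling division keeps at least one loader worker per process.
def scale_num_workers(num_workers: int, nproc_per_node: int) -> int:
    return (num_workers + nproc_per_node - 1) // nproc_per_node


assert scale_num_workers(8, 4) == 2   # 8 workers shared by 4 processes on the node
assert scale_num_workers(10, 4) == 3  # rounded up, not truncated to 2
assert scale_num_workers(1, 4) == 1   # never drops to zero workers
```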
22 changes: 11 additions & 11 deletions ignite/distributed/comp_models/base.py
@@ -17,20 +17,20 @@ class ComputationModel(metaclass=ABCMeta):

def __init__(self):
self._backend = None
self._ntasks_per_node = None
self._nproc_per_node = None
self._nnodes = None
self._node = None

def _setup_attrs(self):
if self._ntasks_per_node is None:
self._ntasks_per_node = self._compute_ntasks_per_node() if self.get_world_size() > 1 else 1
if self._nproc_per_node is None:
self._nproc_per_node = self._compute_nproc_per_node() if self.get_world_size() > 1 else 1
if self._nnodes is None:
self._nnodes = self.get_world_size() // self._ntasks_per_node
self._nnodes = self.get_world_size() // self._nproc_per_node
if self._node is None:
self._node = self.get_rank() // self._ntasks_per_node
self._node = self.get_rank() // self._nproc_per_node

@abstractmethod
def _compute_ntasks_per_node(self) -> int:
def _compute_nproc_per_node(self) -> int:
pass

@abstractmethod
@@ -46,11 +46,11 @@ def get_world_size(self) -> int:
pass

@abstractmethod
def get_ntasks_per_node(self) -> int:
def get_nproc_per_node(self) -> int:
pass

@abstractmethod
def get_num_nodes(self) -> int:
def get_nnodes(self) -> int:
pass

@abstractmethod
@@ -181,10 +181,10 @@ def get_rank(self) -> int:
def get_world_size(self) -> int:
return 1

def get_ntasks_per_node(self) -> int:
def get_nproc_per_node(self) -> int:
return 1

def get_num_nodes(self) -> int:
def get_nnodes(self) -> int:
return 1

def get_node_rank(self) -> int:
@@ -201,7 +201,7 @@ def backend(self) -> None:
def finalize(self):
pass

def _compute_ntasks_per_node(self) -> int:
def _compute_nproc_per_node(self) -> int:
return 1

@staticmethod
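With the renamed attributes, `_setup_attrs` derives everything from the world size, the per-node process count and the rank. A small illustration with hypothetical values (2 nodes, 4 processes per node); the `local_rank` line is only for context and is not computed in this file:

```python
world_size = 8
nproc_per_node = 4  # what _compute_nproc_per_node() would return

nnodes = world_size // nproc_per_node      # 2 nodes
for rank in range(world_size):
    node = rank // nproc_per_node          # ranks 0-3 -> node 0, ranks 4-7 -> node 1
    local_rank = rank % nproc_per_node     # position of the process within its node
    print(rank, node, local_rank)
```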
18 changes: 9 additions & 9 deletions ignite/distributed/comp_models/native.py
@@ -92,7 +92,7 @@ def _init_from_context(self):
self._master_addr = None
self._setup_attrs()

def _compute_ntasks_per_node(self):
def _compute_nproc_per_node(self):
tensor = torch.tensor([self.get_local_rank() + 1]).to(self.device())
dist.all_reduce(tensor, op=dist.ReduceOp.MAX)
return tensor.item()
@@ -202,10 +202,10 @@ def get_rank(self) -> int:
def get_world_size(self) -> int:
return dist.get_world_size()

def get_ntasks_per_node(self) -> int:
return self._ntasks_per_node
def get_nproc_per_node(self) -> int:
return self._nproc_per_node

def get_num_nodes(self) -> int:
def get_nnodes(self) -> int:
return self._nnodes

def get_node_rank(self) -> int:
@@ -249,15 +249,15 @@ def spawn(
fn: Callable,
args: Tuple,
kwargs_dict: Optional[Mapping] = None,
num_procs_per_node: int = 1,
num_nodes: int = 1,
nproc_per_node: int = 1,
nnodes: int = 1,
node_rank: int = 0,
master_addr: str = "127.0.0.1",
master_port: int = 2222,
backend: str = "nccl",
**kwargs
):
world_size = num_nodes * num_procs_per_node
world_size = nnodes * nproc_per_node

spawn_kwargs = {
"join": kwargs.get("join", True),
@@ -269,14 +269,14 @@

mp.spawn(
_NativeDistModel._dist_worker_task_fn,
nprocs=num_procs_per_node,
nprocs=nproc_per_node,
args=(
backend,
fn,
args,
kwargs_dict,
world_size,
num_procs_per_node,
nproc_per_node,
node_rank,
master_addr,
master_port,
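For clarity, a standalone sketch of the renamed `_compute_nproc_per_node` trick used above: every process all-reduces `local_rank + 1` with MAX, so the result is the largest per-node process count, which equals `nproc_per_node` on homogeneous nodes. It assumes an already initialized default process group and one device per process:

```python
import torch
import torch.distributed as dist


def compute_nproc_per_node(local_rank: int, device: torch.device) -> int:
    # each rank contributes its local_rank + 1; the MAX across the whole job
    # is the number of processes on the most populated node
    tensor = torch.tensor([local_rank + 1], device=device)
    dist.all_reduce(tensor, op=dist.ReduceOp.MAX)
    return int(tensor.item())
```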
14 changes: 7 additions & 7 deletions ignite/distributed/comp_models/xla.py
@@ -60,7 +60,7 @@ def _init_from_context(self):
self._backend = "xla-tpu"
self._setup_attrs()

def _compute_ntasks_per_node(self):
def _compute_nproc_per_node(self):
tensor = torch.tensor([self.get_local_rank() + 1.0], dtype=torch.float).to(self.device())
xm.all_reduce("max", [tensor,])
return int(tensor.item())
@@ -74,10 +74,10 @@ def get_rank(self) -> int:
def get_world_size(self) -> int:
return xm.xrt_world_size()

def get_ntasks_per_node(self) -> int:
return self._ntasks_per_node
def get_nproc_per_node(self) -> int:
return self._nproc_per_node

def get_num_nodes(self) -> int:
def get_nnodes(self) -> int:
return self._nnodes

def get_node_rank(self) -> int:
@@ -107,8 +107,8 @@ def spawn(
fn: Callable,
args: Tuple,
kwargs_dict: Optional[Mapping] = None,
num_procs_per_node: int = 1,
num_nodes: int = 1,
nproc_per_node: int = 1,
nnodes: int = 1,
node_rank: int = 0,
backend: str = "xla-tpu",
**kwargs
@@ -121,7 +121,7 @@
xmp.spawn(
_XlaDistModel._dist_worker_task_fn,
args=(backend, fn, args, kwargs_dict),
nprocs=num_procs_per_node,
nprocs=nproc_per_node,
**kwargs,
)

66 changes: 32 additions & 34 deletions ignite/distributed/launcher.py
@@ -15,7 +15,7 @@ class Parallel:

- XLA on TPUs via `pytorch/xla <https://github.com/pytorch/xla>`_

Namely, it can 1) spawn ``num_procs_per_node`` child processes and initialize a processing group according to
Namely, it can 1) spawn ``nproc_per_node`` child processes and initialize a processing group according to
provided ``backend`` (useful for standalone scripts) or 2) only initialize a processing group given the ``backend``
(useful with tools like `torch.distributed.launch`_).

@@ -76,7 +76,7 @@ def training(local_rank, config, **kwargs):
print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
# ...

with idist.Parallel(backend="nccl", num_procs_per_node=4) as parallel:
with idist.Parallel(backend="nccl", nproc_per_node=4) as parallel:
parallel.run(training, config, a=1, b=2)


@@ -97,7 +97,7 @@ def training(local_rank, config, **kwargs):
print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
# ...

with idist.Parallel(backend="xla-tpu", num_procs_per_node=8) as parallel:
with idist.Parallel(backend="xla-tpu", nproc_per_node=8) as parallel:
parallel.run(training, config, a=1, b=2)


@@ -124,8 +124,8 @@ def training(local_rank, config, **kwargs):
# ...

dist_config = {
"num_procs_per_node": 8,
"num_nodes": 2,
"nproc_per_node": 8,
"nnodes": 2,
"node_rank": args.node_rank,
"master_addr": "master",
"master_port": 15000
@@ -138,25 +138,25 @@

Args:
backend (str, optional): backend to use: `nccl`, `gloo`, `xla-tpu`. If None, no distributed configuration.
num_procs_per_node (int, optional): optional argument, number of processes per
node to specify. If not None, :meth:`~ignite.distributed.Parallel.run` will spawn ``num_procs_per_node``
nproc_per_node (int, optional): optional argument, number of processes per
node to specify. If not None, :meth:`~ignite.distributed.Parallel.run` will spawn ``nproc_per_node``
processes that run input function with its arguments.
num_nodes (int, optional): optional argument, number of nodes participating in distributed configuration.
If not None, :meth:`~ignite.distributed.Parallel.run` will spawn ``num_procs_per_node``
processes that run input function with its arguments. Total world size is `num_procs_per_node * num_nodes`.
node_rank (int, optional): optional argument, current machine index. Mandatory argument if ``num_nodes`` is
nnodes (int, optional): optional argument, number of nodes participating in distributed configuration.
If not None, :meth:`~ignite.distributed.Parallel.run` will spawn ``nproc_per_node``
processes that run input function with its arguments. Total world size is `nproc_per_node * nnodes`.
node_rank (int, optional): optional argument, current machine index. Mandatory argument if ``nnodes`` is
specified and larger than one.
master_addr (str, optional): optional argument, master node TCP/IP address for torch native backends
(`nccl`, `gloo`). Mandatory argument if ``num_nodes`` is specified and larger than one.
(`nccl`, `gloo`). Mandatory argument if ``nnodes`` is specified and larger than one.
master_port (int, optional): optional argument, master node port for torch native backends
(`nccl`, `gloo`). Mandatory argument if ``master_addr`` is specified.
"""

def __init__(
self,
backend: str = None,
num_procs_per_node: Optional[int] = None,
num_nodes: Optional[int] = None,
nproc_per_node: Optional[int] = None,
nnodes: Optional[int] = None,
node_rank: Optional[int] = None,
master_addr: Optional[str] = None,
master_port: Optional[str] = None,
@@ -167,8 +167,8 @@ def __init__(
"Unknown backend '{}'. Available backends: {}".format(backend, idist.available_backends())
)
else:
arg_names = ["num_procs_per_node", "num_nodes", "node_rank", "master_addr", "master_port"]
arg_values = [num_procs_per_node, num_nodes, node_rank, master_addr, master_port]
arg_names = ["nproc_per_node", "nnodes", "node_rank", "master_addr", "master_port"]
arg_values = [nproc_per_node, nnodes, node_rank, master_addr, master_port]
for name, value in zip(arg_names, arg_values):
if value is not None:
raise ValueError(
@@ -181,39 +181,39 @@ def __init__(
# distributed_rank=0 <=> explicit rank 0, avoid call idist. Critical for TPU on Colab, avoid context is setup

if self.backend is not None:
if num_procs_per_node is not None:
if nproc_per_node is not None:
self._spawn_params = self._setup_spawn_params(
num_procs_per_node, num_nodes, node_rank, master_addr, master_port
nproc_per_node, nnodes, node_rank, master_addr, master_port
)

if self._spawn_params is not None:
self.logger.info("Initialized distributed launcher with backend: '{}'".format(self.backend))
msg = "\n\t".join(["{}: {}".format(k, v) for k, v in self._spawn_params.items() if v is not None])
self.logger.info("- Parameters to spawn processes: \n\t{}".format(msg))

def _setup_spawn_params(self, num_procs_per_node, num_nodes, node_rank, master_addr, master_port):
if num_procs_per_node < 1:
raise ValueError("Argument num_procs_per_node should positive, but given {}".format(num_procs_per_node))
if num_nodes is None:
num_nodes = 1
if num_nodes < 1:
raise ValueError("Argument num_nodes should positive, but given {}".format(num_nodes))
def _setup_spawn_params(self, nproc_per_node, nnodes, node_rank, master_addr, master_port):
if nproc_per_node < 1:
raise ValueError("Argument nproc_per_node should positive, but given {}".format(nproc_per_node))
if nnodes is None:
nnodes = 1
if nnodes < 1:
raise ValueError("Argument nnodes should positive, but given {}".format(nnodes))
if node_rank is None:
if num_nodes > 1:
if nnodes > 1:
raise ValueError("If number of nodes larger than one, arguments node_rank should be given")
node_rank = 0
if node_rank >= num_nodes or node_rank < 0:
if node_rank >= nnodes or node_rank < 0:
raise ValueError(
"Argument node_rank should be between 0 and {}, but given {}".format(num_nodes - 1, node_rank)
"Argument node_rank should be between 0 and {}, but given {}".format(nnodes - 1, node_rank)
)
if num_nodes > 1 and (master_addr is None or master_port is None):
if nnodes > 1 and (master_addr is None or master_port is None):
raise ValueError(
"If number of nodes larger than one, arguments master_addr and master_port "
"should be specified, but given master_addr={} and master_port={}".format(master_addr, master_port)
)
params = {
"num_procs_per_node": num_procs_per_node,
"num_nodes": num_nodes,
"nproc_per_node": nproc_per_node,
"nnodes": nnodes,
"node_rank": node_rank,
"master_addr": master_addr,
"master_port": master_port,
Expand All @@ -240,9 +240,7 @@ def training(local_rank, config, **kwargs):

"""
if self._spawn_params is not None:
self.logger.info(
"Spawn function '{}' in {} processes".format(func, self._spawn_params["num_procs_per_node"])
)
self.logger.info("Spawn function '{}' in {} processes".format(func, self._spawn_params["nproc_per_node"]))
idist.spawn(self.backend, func, args=args, kwargs_dict=kwargs, **self._spawn_params)
else:
self.logger.info("- Run '{}' in {} processes".format(func, idist.get_world_size()))
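A quick sketch (not part of the PR) of how the renamed validation in `_setup_spawn_params` surfaces to users; it assumes a torch build where the `gloo` backend is available, and the comments paraphrase the error messages from the code above:

```python
import ignite.distributed as idist

# nnodes > 1 without node_rank -> ValueError at construction time
try:
    idist.Parallel(backend="gloo", nproc_per_node=4, nnodes=2)
except ValueError as e:
    print(e)  # node_rank must be given when the number of nodes is larger than one

# node_rank given, but no master_addr / master_port -> ValueError as well
try:
    idist.Parallel(backend="gloo", nproc_per_node=4, nnodes=2, node_rank=0)
except ValueError as e:
    print(e)  # master_addr and master_port must be specified when nnodes > 1
```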