Cannot use other start method for multiprocessing #204
Comments
Thanks for reporting the issue.

Sorry for the typo, I will edit my message.
Hi @Guriido, thanks for the info. I tried your test script in several environments. However, we are aware that the multiprocessing module sometimes causes trouble. Could you provide more information about your environment? Thanks!
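For example, a quick way to dump the relevant versions is something like the following (a small sketch; the output of `pip freeze` plus your MPI library version works just as well):

```python
import platform

import chainer
import chainermn
import mpi4py

# Print the versions that matter for this issue.
print('python    :', platform.python_version())
print('chainer   :', chainer.__version__)
print('chainermn :', chainermn.__version__)
print('mpi4py    :', mpi4py.__version__)
```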
I am quite surprised that all the tests passed; maybe the problem is related to the Linux distribution? About the IB configuration: I understand the motivation behind the use of
CA 'mlx4_0'

It looks exactly the same as yours. 🤔 BTW, what is your
Since you could pass the test with a quite similar environment, I wanted to retest my script. However, I still get errors when using MultiprocessIterator with the MNIST train script. What concerns me is that the call to
About module versions:
MNIST + MultiprocessIterator sample code:

```python
#!/usr/bin/env python
from __future__ import print_function

import argparse

import chainer
import chainer.functions as F
import chainer.links as L
from chainer import training
from chainer.training import extensions
from mpi4py import MPI

import chainermn
import multiprocessing


class MLP(chainer.Chain):

    def __init__(self, n_units, n_out):
        super(MLP, self).__init__(
            # the size of the inputs to each layer will be inferred
            l1=L.Linear(784, n_units),  # n_in -> n_units
            l2=L.Linear(n_units, n_units),  # n_units -> n_units
            l3=L.Linear(n_units, n_out),  # n_units -> n_out
        )

    def __call__(self, x):
        h1 = F.relu(self.l1(x))
        h2 = F.relu(self.l2(h1))
        return self.l3(h2)


def main():
    parser = argparse.ArgumentParser(description='ChainerMN example: MNIST')
    parser.add_argument('--batchsize', '-b', type=int, default=100,
                        help='Number of images in each mini-batch')
    parser.add_argument('--communicator', type=str,
                        default='hierarchical', help='Type of communicator')
    parser.add_argument('--epoch', '-e', type=int, default=20,
                        help='Number of sweeps over the dataset to train')
    parser.add_argument('--gpu', '-g', action='store_true',
                        help='Use GPU')
    parser.add_argument('--out', '-o', default='result',
                        help='Directory to output the result')
    parser.add_argument('--resume', '-r', default='',
                        help='Resume the training from snapshot')
    parser.add_argument('--unit', '-u', type=int, default=1000,
                        help='Number of units')
    parser.add_argument('--loaderjob', '-j', type=int, default=0,
                        help='Number of parallel data loading processes')
    args = parser.parse_args()

    # Prepare ChainerMN communicator.
    if args.gpu:
        if args.communicator == 'naive':
            print("Error: 'naive' communicator does not support GPU.\n")
            exit(-1)
        comm = chainermn.create_communicator(args.communicator)
        device = comm.intra_rank
    else:
        if args.communicator != 'naive':
            print('Warning: using naive communicator '
                  'because only naive supports CPU-only execution')
        comm = chainermn.create_communicator('naive')
        device = -1

    if comm.mpi_comm.rank == 0:
        print('==========================================')
        print('Num process (COMM_WORLD): {}'.format(MPI.COMM_WORLD.Get_size()))
        if args.gpu:
            print('Using GPUs')
        print('Using {} communicator'.format(args.communicator))
        print('Num unit: {}'.format(args.unit))
        print('Num Minibatch-size: {}'.format(args.batchsize))
        print('Num epoch: {}'.format(args.epoch))
        print('==========================================')

    model = L.Classifier(MLP(args.unit, 10))
    if device >= 0:
        chainer.cuda.get_device(device).use()
        model.to_gpu()

    # Create a multi node optimizer from a standard Chainer optimizer.
    optimizer = chainermn.create_multi_node_optimizer(
        chainer.optimizers.Adam(), comm)
    optimizer.setup(model)

    # Split and distribute the dataset. Only worker 0 loads the whole dataset.
    # Datasets of worker 0 are evenly split and distributed to all workers.
    if comm.rank == 0:
        train, test = chainer.datasets.get_mnist()
    else:
        train, test = None, None
    train = chainermn.scatter_dataset(train, comm, shuffle=True)
    test = chainermn.scatter_dataset(test, comm, shuffle=True)

    if args.loaderjob == 0:
        train_iter = chainer.iterators.SerialIterator(train, args.batchsize)
        test_iter = chainer.iterators.SerialIterator(test, args.batchsize,
                                                     repeat=False, shuffle=False)
    else:
        multiprocessing.set_start_method('forkserver')
        train_iter = chainer.iterators.MultiprocessIterator(
            train, args.batchsize, n_processes=args.loaderjob)
        test_iter = chainer.iterators.MultiprocessIterator(
            test, args.batchsize, repeat=False, n_processes=args.loaderjob)

    updater = training.StandardUpdater(train_iter, optimizer, device=device)
    trainer = training.Trainer(updater, (args.epoch, 'epoch'), out=args.out)

    # Create a multi node evaluator from a standard Chainer evaluator.
    evaluator = extensions.Evaluator(test_iter, model, device=device)
    evaluator = chainermn.create_multi_node_evaluator(evaluator, comm)
    trainer.extend(evaluator)

    # Some display and output extensions are necessary only for one worker.
    # (Otherwise, there would just be repeated outputs.)
    if comm.rank == 0:
        trainer.extend(extensions.dump_graph('main/loss'))
        trainer.extend(extensions.LogReport())
        trainer.extend(extensions.PrintReport(
            ['epoch', 'main/loss', 'validation/main/loss',
             'main/accuracy', 'validation/main/accuracy', 'elapsed_time']))
        trainer.extend(extensions.ProgressBar())

    if args.resume:
        chainer.serializers.load_npz(args.resume, trainer)

    trainer.run()


if __name__ == '__main__':
    main()
```
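For what it's worth, the multiprocessing/MPI interaction can also be exercised without Chainer at all; a minimal sketch (not part of the scripts above) that sets a non-default start method while running under MPI:

```python
#!/usr/bin/env python
import multiprocessing


def square(x):
    return x * x


if __name__ == '__main__':
    from mpi4py import MPI  # importing mpi4py.MPI initializes the MPI runtime

    rank = MPI.COMM_WORLD.Get_rank()

    # Same call as in the MNIST script above, but with no iterator involved.
    multiprocessing.set_start_method('forkserver')
    with multiprocessing.Pool(processes=2) as pool:
        print('rank', rank, '->', pool.map(square, range(4)))
```

If this already hangs or crashes when launched with mpiexec on two processes, that would point at the start method / Open MPI combination rather than at the iterator.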
Thanks for the code. I tried your MNIST code with the following environments and start_methods, and they were all OK. They were all mpi4py 3.0.0, with 2 processes on physically different nodes (so IB is used). However, again, we know that there is (at least was) an issue. Hmm...
Thanks again for all the tests. Also, the crash occurs even when all the processes are running on the same node (so presumably IB is not the cause). What kind of crash do you experience in the Docker environment?
We use in-house Docker images, so I'm afraid I cannot give you information on that, unfortunately. However, we have experienced the "start_method" issue since the early stages of ChainerMN development, and we were not using Docker back then. So the issue can happen in non-Docker environments anyway. Let me investigate the issue a bit more with ImageNet and other code.
I have run further tests concerning the issues above. I ran all the tests in the same environment as above, with an altered version of the MNIST training script. Here is the report of the tests:
I attach the test script below, together with samples of what I called error A and error B.

NB: Even with a serial iterator for the test dataset, I get pretty decent speed when training ImageNet on a cluster, but this doesn't solve the problem.

test_script:

```python
#!/usr/bin/env python
from __future__ import print_function

import argparse

import chainer
import chainer.functions as F
import chainer.links as L
from chainer import training
from chainer.training import extensions
from mpi4py import MPI

import chainermn
import multiprocessing
import numpy


class MLP(chainer.Chain):

    def __init__(self, n_units, n_out):
        super(MLP, self).__init__(
            # the size of the inputs to each layer will be inferred
            l1=L.Linear(784, n_units),  # n_in -> n_units
            l2=L.Linear(n_units, n_units),  # n_units -> n_units
            l3=L.Linear(n_units, n_out),  # n_units -> n_out
        )

    def __call__(self, x):
        h1 = F.relu(self.l1(x))
        h2 = F.relu(self.l2(h1))
        return self.l3(h2)


def main():
    parser = argparse.ArgumentParser(description='ChainerMN example: MNIST')
    parser.add_argument('--batchsize', '-b', type=int, default=100,
                        help='Number of images in each mini-batch')
    parser.add_argument('--communicator', type=str,
                        default='hierarchical', help='Type of communicator')
    parser.add_argument('--epoch', '-e', type=int, default=60,
                        help='Number of sweeps over the dataset to train')
    parser.add_argument('--out', '-o', default='result',
                        help='Directory to output the result')
    parser.add_argument('--resume', '-r', default='',
                        help='Resume the training from snapshot')
    parser.add_argument('--unit', '-u', type=int, default=1000,
                        help='Number of units')
    parser.add_argument('--method', type=str, default='',
                        help='start methods: fork forkserver spawn')
    parser.add_argument('--scatter_no_comm', action='store_true',
                        help='do not use chainermn builtin scatter function')
    parser.add_argument('--loaderjob', '-j', type=int, default=0,
                        help='Number of parallel data loading processes')
    parser.add_argument('--double_buffering', action='store_true',
                        help='improves speed')
    parser.add_argument('--shuffle_seed', type=int, default=0,
                        help='Seed used to shuffle dataset during scattering')
    parser.add_argument('--test_mp', action='store_true',
                        help='use MultiProcess iterator for test set when also used for train')
    args = parser.parse_args()

    # Prepare ChainerMN communicator.
    if args.double_buffering:
        args.communicator = 'pure_nccl'
    comm = chainermn.create_communicator(args.communicator)
    device = comm.intra_rank

    if comm.mpi_comm.rank == 0:
        print('==========================================')
        print('Num process (COMM_WORLD): {}'.format(MPI.COMM_WORLD.Get_size()))
        print('Using GPUs')
        print('Using {} communicator'.format(args.communicator))
        print('Num unit: {}'.format(args.unit))
        print('Num Minibatch-size: {}'.format(args.batchsize))
        print('Num epoch: {}'.format(args.epoch))
        print('==========================================')

    model = L.Classifier(MLP(args.unit, 10))
    if device >= 0:
        chainer.cuda.get_device(device).use()
        model.to_gpu()

    initial_lr = 0.1

    # Create a multi node optimizer from a standard Chainer optimizer.
    optimizer = chainermn.create_multi_node_optimizer(
        chainer.optimizers.MomentumSGD(lr=initial_lr, momentum=0.9), comm,
        double_buffering=args.double_buffering)
    optimizer.setup(model)

    if args.scatter_no_comm:
        train, test = chainer.datasets.get_mnist()
        train = scatter_dataset_no_comm(train, comm, shuffle=True,
                                        seed=args.shuffle_seed)
        test = scatter_dataset_no_comm(test, comm, shuffle=True,
                                       seed=args.shuffle_seed)
    else:
        # Split and distribute the dataset. Only worker 0 loads the whole dataset.
        # Datasets of worker 0 are evenly split and distributed to all workers.
        if comm.rank == 0:
            train, test = chainer.datasets.get_mnist()
        else:
            train = None
            test = None
        train = chainermn.scatter_dataset(train, comm, shuffle=True)
        test = chainermn.scatter_dataset(test, comm)

    if args.loaderjob == 0:
        train_iter = chainer.iterators.SerialIterator(train, args.batchsize,
                                                      shuffle=False)
        test_iter = chainer.iterators.SerialIterator(test, args.batchsize,
                                                     repeat=False, shuffle=False)
    else:
        if args.method != '':
            multiprocessing.set_start_method(args.method)
        train_iter = chainer.iterators.MultiprocessIterator(
            train, args.batchsize, shuffle=False, n_processes=args.loaderjob)
        if args.test_mp:
            test_iter = chainer.iterators.MultiprocessIterator(
                test, args.batchsize, repeat=False, shuffle=False,
                n_processes=args.loaderjob)
        else:
            test_iter = chainer.iterators.SerialIterator(test, args.batchsize,
                                                         repeat=False, shuffle=False)

    updater = training.StandardUpdater(train_iter, optimizer, device=device)
    trainer = training.Trainer(updater, (args.epoch, 'epoch'), out=args.out)

    # Create a multi node evaluator from a standard Chainer evaluator.
    evaluator = extensions.Evaluator(test_iter, model, device=device)
    evaluator = chainermn.create_multi_node_evaluator(evaluator, comm)
    trainer.extend(evaluator)

    # Some display and output extensions are necessary only for one worker.
    # (Otherwise, there would just be repeated outputs.)
    if comm.rank == 0:
        # trainer.extend(extensions.dump_graph('main/loss'))
        trainer.extend(extensions.LogReport())
        trainer.extend(extensions.observe_lr(), trigger=(1, 'epoch'))
        trainer.extend(extensions.PrintReport(
            ['epoch', 'main/loss', 'validation/main/loss',
             'main/accuracy', 'validation/main/accuracy', 'elapsed_time', 'lr']))
        trainer.extend(extensions.ProgressBar())

    trainer.run()


def scatter_dataset_no_comm(dataset, comm, shuffle=False, seed=0):
    """Scatter the given dataset to the workers in the communicator.

    This function does not use MPI communication. The dataset of every
    worker has to be the same (assuming a shared file system such as NFS),
    and the seed has to be the same across all processes.

    The dataset is split into sub-datasets of almost equal sizes and
    scattered to the workers. To create a sub-dataset,
    ``chainer.datasets.SubDataset`` is used.

    Args:
        dataset: A dataset (e.g., ``list``, ``numpy.ndarray``,
            ``chainer.datasets.TupleDataset``, ...).
        comm: ChainerMN communicator or MPI4py communicator.
        shuffle (bool): If ``True``, the order of examples is shuffled
            before being scattered.
        seed (int): Seed of the generator used for the permutation of
            indexes. If an integer convertible to a 32-bit unsigned
            integer is specified, it is guaranteed that each sample in
            the given dataset always belongs to a specific subset.
            If ``None``, the permutation is changed randomly.

    Returns:
        Scattered dataset.
    """
    if hasattr(comm, 'mpi_comm'):
        comm = comm.mpi_comm
    assert hasattr(comm, 'send')
    assert hasattr(comm, 'recv')

    order = None
    n_total_samples = len(dataset)
    if shuffle is not None:
        order = numpy.random.RandomState(seed).permutation(n_total_samples)

    n_sub_samples = (n_total_samples + comm.size - 1) // comm.size
    b = n_total_samples * comm.rank // comm.size
    e = b + n_sub_samples
    return chainer.datasets.SubDataset(dataset, b, e, order)


if __name__ == '__main__':
    main()
```

error A:
error B:
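As a side note on the `scatter_dataset_no_comm` helper in the test script above, its index arithmetic can be checked standalone; a small sketch repeating the same computation for 10 samples over 3 workers:

```python
# Reproduce the slice computation from scatter_dataset_no_comm by hand.
n_total = 10
size = 3
n_sub = (n_total + size - 1) // size  # ceiling division, as in the helper
for rank in range(size):
    b = n_total * rank // size
    e = b + n_sub
    print('rank', rank, '-> indices', list(range(b, e)))
# rank 0 -> indices [0, 1, 2, 3]
# rank 1 -> indices [3, 4, 5, 6]
# rank 2 -> indices [6, 7, 8, 9]
```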
Thanks for the detailed report! I now have a Docker environment internally and will try to reproduce your problem.
This issue is not inherent to ChainerMN, so I was unsure where to submit it.

In the ImageNet training example, I cannot run the test without removing the `multiprocessing.set_start_method('forkserver')` call before the `MultiprocessIterator` creation (an addition suggested in this pull request). When I tried it, this kind of error occurred:

After further investigation, the problem seems to be that any start method other than `fork` in an Open MPI environment leads to this error. I also tried using `get_context()` instead of `set_start_method()`, as suggested in the Python docs, but this leads to the same result.
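For reference, the `get_context()` pattern from the Python docs looks roughly like this (a minimal sketch with illustrative names, not the actual ImageNet code):

```python
import multiprocessing


def load_sample(idx):
    # stand-in for a data-loading job (illustrative only)
    return idx * 2


if __name__ == '__main__':
    # Instead of changing the global start method with
    # multiprocessing.set_start_method('forkserver'), get a context bound
    # to one start method and create the worker pool from it.
    ctx = multiprocessing.get_context('forkserver')
    with ctx.Pool(processes=2) as pool:
        print(pool.map(load_sample, range(4)))
```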
I provide the following sample code to reproduce the error:
The environment is as follows: