
Distributed Evaluation #2775

Merged: 6 commits, Jul 16, 2020
Changes from 3 commits
51 changes: 51 additions & 0 deletions parlai/scripts/distributed_eval.py
@@ -0,0 +1,51 @@
#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

"""
Distributed evaluation script. NOT MEANT TO BE CALLED DIRECTLY BY USER.

This script is meant to be used in conjunction with
`SLURM <https://slurm.schedmd.com/>`_, which provides the environment variables
that describe the job layout.

An example sbatch script is below, for a setup with 2 hosts and 8 GPUs per host (16 GPUs total):

.. code-block:: bash

#!/bin/sh
#SBATCH --job-name=distributed_example
#SBATCH --output=/path/to/savepoint/stdout.%j
#SBATCH --error=/path/to/savepoint/stderr.%j
#SBATCH --partition=priority
#SBATCH --nodes=2
#SBATCH --time=0:10:00
#SBATCH --signal=SIGINT
#SBATCH --gres=gpu:8
#SBATCH --ntasks-per-node=8
#SBATCH --mem=64G
#SBATCH --cpus-per-task=10
srun python -u -m parlai.scripts.distributed_eval \
-m seq2seq -t convai2 --dict-file /path/to/dict-file
"""

import os

import parlai.scripts.eval_model as eval_model
import parlai.utils.distributed as distributed_utils


def main():
parser = eval_model.setup_args()
parser.add_distributed_training_args()
parser.add_argument('--port', type=int, default=61337, help='TCP port number')
opt = parser.parse_args(print_args=(os.environ['SLURM_PROCID'] == '0'))

with distributed_utils.slurm_distributed_context(opt) as opt:
return eval_model.eval_model(opt)


if __name__ == '__main__':
main()
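
For context, the SLURM variables this script (and the utilities it calls) relies on are the per-task environment variables set inside an sbatch allocation. The snippet below is purely illustrative and not part of the diff; it only prints the values that the `dist_train` helper removed from `distributed_train.py` (further down in this PR) used to read.

import os

# Illustrative only: per-task variables SLURM exports inside an sbatch job.
rank = int(os.environ['SLURM_PROCID'])        # global rank of this task
local_id = int(os.environ['SLURM_LOCALID'])   # task index on this host, used as the GPU id
node_list = os.environ['SLURM_JOB_NODELIST']  # compressed list of allocated hosts
print(f'task {rank} (local {local_id}) in allocation {node_list}')
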
49 changes: 3 additions & 46 deletions parlai/scripts/distributed_train.py
@@ -31,14 +31,9 @@
-m seq2seq -t convai2 --dict-file /path/to/dict-file
"""

import os
import socket
import subprocess

import parlai.scripts.train_model as single_train
import parlai.utils.logging as logging
from parlai.scripts.multiprocessing_train import multiprocess_train
from parlai.scripts.script import ParlaiScript
import parlai.utils.distributed as distributed_utils


def setup_args():
@@ -48,52 +43,14 @@ def setup_args():
return parser


def dist_train(opt, node_list):
# We can determine the init method automatically for Slurm.
try:
# Figure out the main host, and which rank we are.
hostnames = subprocess.check_output(
['scontrol', 'show', 'hostnames', node_list]
)
main_host = hostnames.split()[0].decode('utf-8')
distributed_rank = int(os.environ['SLURM_PROCID'])
if opt.get('model_parallel'):
# -1 signals to multiprocessing_train to use all GPUs available.
# (A value of None signals to multiprocessing_train to use the GPU
# corresponding to the rank.
device_id = -1
else:
device_id = int(os.environ['SLURM_LOCALID'])
port = opt['port']
logging.info(
f'Initializing host {socket.gethostname()} as rank {distributed_rank}, '
f'main is {main_host}'
)
# Begin distributed training
multiprocess_train(distributed_rank, opt, port, 0, device_id, main_host)
except subprocess.CalledProcessError as e:
# scontrol failed
raise e
except FileNotFoundError:
# Slurm is not installed
raise RuntimeError('SLURM does not appear to be installed.')


class DistributedTrain(ParlaiScript):
@classmethod
def setup_args(cls):
return setup_args()

def run(self):
# double check we're using SLURM
node_list = os.environ.get('SLURM_JOB_NODELIST')
if node_list is None:
raise RuntimeError(
'Does not appear to be in a SLURM environment. '
'You should not call this script directly; '
'see launch_distributed.py'
)
return dist_train(self.opt, node_list)
with distributed_utils.slurm_distributed_context(self.opt) as opt:
return single_train.TrainLoop(opt).train_model()


if __name__ == '__main__':
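
Both distributed_train.py (above) and the new distributed_eval.py replace the inline SLURM handling with the shared `distributed_utils.slurm_distributed_context` context manager. A minimal sketch of what such a context manager might look like follows, assuming it mirrors the logic deleted above (resolving the main host via `scontrol`, reading the rank and local device from SLURM variables, then handing off to the per-process setup); the actual ParlAI utility may differ in detail.

import os
import socket
import subprocess
from contextlib import contextmanager

import parlai.utils.logging as logging


@contextmanager
def slurm_distributed_context_sketch(opt):
    """Illustrative stand-in for distributed_utils.slurm_distributed_context."""
    node_list = os.environ.get('SLURM_JOB_NODELIST')
    if node_list is None:
        raise RuntimeError('Does not appear to be running inside SLURM.')
    # Resolve the rendezvous host and this process's rank/device from SLURM.
    hostnames = subprocess.check_output(['scontrol', 'show', 'hostnames', node_list])
    main_host = hostnames.split()[0].decode('utf-8')
    rank = int(os.environ['SLURM_PROCID'])
    device_id = -1 if opt.get('model_parallel') else int(os.environ['SLURM_LOCALID'])
    logging.info(
        f'Initializing host {socket.gethostname()} as rank {rank}, main is {main_host}'
    )
    # The real utility would perform the process-group setup here (as the old
    # inline multiprocess_train call did) before yielding the adjusted opt.
    opt['rank'], opt['gpu'] = rank, device_id
    yield opt
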
32 changes: 27 additions & 5 deletions parlai/scripts/eval_model.py
@@ -20,7 +20,11 @@
from parlai.core.params import ParlaiParser, print_announcements
from parlai.core.agents import create_agent
from parlai.core.logs import TensorboardLogger
from parlai.core.metrics import aggregate_named_reports, Metric
from parlai.core.metrics import (
aggregate_named_reports,
aggregate_unnamed_reports,
Metric,
)
from parlai.core.worlds import create_task
from parlai.utils.misc import TimeLogger, nice_report
from parlai.utils.world_logging import WorldLogger
@@ -30,6 +34,13 @@
import json
import random

from parlai.utils.distributed import (
is_primary_worker,
all_gather_list,
is_distributed,
get_rank,
)


def setup_args(parser=None):
if parser is None:
@@ -85,6 +96,8 @@ def setup_args(parser=None):


def _save_eval_stats(opt, report):
if not is_primary_worker():
return
report_fname = opt['report_filename']
if report_fname == '':
return
Expand Down Expand Up @@ -122,6 +135,10 @@ def _eval_single_world(opt, agent, task):
# max number of examples to evaluate
max_cnt = opt['num_examples'] if opt['num_examples'] > 0 else float('inf')
cnt = 0
total_cnt = world.num_examples()

if is_distributed():
logging.warn('Progress bar is approximate in distributed mode.')

while not world.epoch_done() and cnt < max_cnt:
cnt += opt.get('batchsize', 1)
@@ -134,18 +151,22 @@
if log_time.time() > log_every_n_secs:
report = world.report()
text, report = log_time.log(
report.get('exs', 0), min(max_cnt, world.num_examples()), report
report.get('exs', 0), min(max_cnt, total_cnt), report
)
logging.info(text)

report = world.report()
report = aggregate_unnamed_reports(all_gather_list(world.report()))
world.reset()

if world_logger is not None:
# dump world acts to file
world_logger.reset() # add final acts to logs
base_outfile = opt['report_filename'].split('.')[0]
outfile = base_outfile + f'_{task}_replies.jsonl'
if is_distributed():
rank = get_rank()
outfile = base_outfile + f'_{task}_{rank}_replies.jsonl'
else:
outfile = base_outfile + f'_{task}_replies.jsonl'
world_logger.write(outfile, world, file_format=opt['save_format'])

return report
@@ -195,6 +216,7 @@ def eval_model(opt, print_parser=None):
logging.info(
f'Finished evaluating tasks {tasks} using datatype {opt.get("datatype")}'
)

print(nice_report(report))
_save_eval_stats(opt, report)
return report
@@ -206,7 +228,7 @@ def setup_args(cls):
return setup_args()

def run(self):
return eval_model(self.opt)
return eval_model(self.opt, print_parser=self.parser)


if __name__ == '__main__':
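
The main behavioural change in eval_model.py is that each distributed worker now contributes its local report: `all_gather_list(world.report())` collects one report per rank and `aggregate_unnamed_reports` merges them. A toy sketch of that merge step is below; it assumes metric values combine by addition (ParlAI's Metric objects implement `__add__` so averages stay correct), and the names ending in `_sketch` are hypothetical.

from typing import Dict, List


def aggregate_unnamed_reports_sketch(reports: List[Dict[str, int]]) -> Dict[str, int]:
    """Toy merge of per-worker reports; the real version lives in parlai.core.metrics."""
    merged: Dict[str, int] = {}
    for report in reports:            # one report gathered from each rank
        for key, value in report.items():
            merged[key] = merged.get(key, 0) + value   # Metric.__add__ in ParlAI
    return merged


# e.g. two workers that each evaluated half of the examples:
print(aggregate_unnamed_reports_sketch([{'exs': 50}, {'exs': 50}]))   # {'exs': 100}
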
76 changes: 76 additions & 0 deletions parlai/scripts/multiprocessing_eval.py
@@ -0,0 +1,76 @@
#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.


"""
Main launch script for single-host, multi-GPU evaluation.

This is a drop-in replacement for eval_model.py. This script will launch N
subprocesses, each of which runs the full evaluation loop independently.

Uses torch.nn.parallel.DistributedDataParallel under the hood. Agents must
specifically implement the DistributedDataParallel wrapper, but all
TorchRankerAgents and TorchGeneratorAgents support this.
"""

import torch
import random
import os
import signal
import parlai.utils.distributed as distributed_utils
import parlai.scripts.eval_model as eval_model


def multiprocess_eval(
rank, opt, port=61337, rank_offset=0, gpu=None, hostname='localhost'
):
with distributed_utils.distributed_context(
rank, opt, port, rank_offset, gpu, hostname
) as opt:
return eval_model.eval_model(opt)


def launch_and_eval(opt, port):
"""
Launch evaluation worker subprocesses via torch.multiprocessing.spawn().
"""
# Launch multiple subprocesses
spawncontext = torch.multiprocessing.spawn(
multiprocess_eval,
# need to give a rank offset of 1 because the main process acts as rank 0,
# and spawn() does not let you control the starting rank
(opt, port, 1),
nprocs=opt['distributed_world_size'] - 1, # main proc will also run loop
join=False,
)

try:
retval = multiprocess_eval(0, opt, port)
spawncontext.join()
return retval
except KeyboardInterrupt:
# tell the subprocesses to stop too
for p in spawncontext.processes:
if p.is_alive():
os.kill(p.pid, signal.SIGINT)
raise


def setup_args():
parser = eval_model.setup_args()
parser.add_distributed_training_args()
parser.set_defaults(distributed_world_size=torch.cuda.device_count())
return parser


def main():
opt = setup_args().parse_args()
port = random.randint(32000, 48000)
return launch_and_eval(opt, port)


if __name__ == '__main__':
main()
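
The launch pattern in launch_and_eval is worth calling out: spawn() starts world_size - 1 children numbered from 0, a rank offset of 1 maps them to global ranks 1..world_size-1, and the parent process runs rank 0 itself. The stand-alone sketch below demonstrates just that numbering, with no GPUs or ParlAI involved; the function and variable names are illustrative.

import torch.multiprocessing as mp


def _worker(spawn_index, rank_offset, world_size):
    # spawn() numbers its children 0..nprocs-1; the offset maps them onto
    # global ranks 1..world_size-1 so the parent can serve as rank 0.
    rank = spawn_index + rank_offset
    print(f'hello from global rank {rank} of {world_size}')


def run(world_size=4):
    # children get a rank offset of 1 ...
    ctx = mp.spawn(_worker, args=(1, world_size), nprocs=world_size - 1, join=False)
    _worker(0, 0, world_size)  # ... while the parent runs rank 0 directly
    ctx.join()


if __name__ == '__main__':
    run()
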
61 changes: 3 additions & 58 deletions parlai/scripts/multiprocessing_train.py
@@ -18,74 +18,19 @@

import torch
import random
import copy
import os
import signal
import torch.distributed as dist
import parlai.scripts.train_model as single_train
import parlai.utils.distributed as distributed_utils
import parlai.utils.logging as logging
from parlai.scripts.script import ParlaiScript


def multiprocess_train(
rank, opt, port=61337, rank_offset=0, gpu=None, hostname='localhost'
):
"""
Subprocess which initializes distributed training, and begins training.

This should be launched n times for n GPUs; this is handled either in main
or via srun.

:param int rank: This process's rank - 1. (Starts at -1 ... n - 2). See comments.
:param opt: command line options
:param int port: A TCP port to use. This will need to be changed to run
multiple distributed training setups on the same machine.
:param int gpu: Which GPU to use. Defaults to using rank and local devices,
but must be manually specified when using many-hosts.
:param str hostname: Hostname of the main server.
"""
# Set per-host options
opt = copy.deepcopy(opt)
# we need to manually adjust the rank differently in multiprocessing
# and distributed train
rank = rank + rank_offset
opt['rank'] = rank
if gpu is None:
# default assumption is local GPUs
gpu = rank % torch.cuda.device_count()
opt['gpu'] = gpu
# make sure we don't just use whatever GPU was saved in the model file
if 'override' not in opt:
opt['override'] = {}
opt['override']['gpu'] = gpu

# Suppress output of workers except the main host.
if opt.get('verbose') or rank != 0:
print_prefix = 'rank:{:3d} |'.format(rank)
else:
print_prefix = None
suppress_output = not opt.get('verbose') and rank != 0

with distributed_utils.override_print(suppress_output, print_prefix):
# perform distributed setup, ensuring all hosts are ready
if opt['gpu'] != -1:
torch.cuda.set_device(opt['gpu'])
dist.init_process_group(
backend="nccl",
init_method="tcp://{}:{}".format(hostname, port),
world_size=opt['distributed_world_size'],
rank=rank,
)
logging.info("Distributed group initialized")

# manual_seed can be a noop without this
torch.cuda.init()
# make sure all parameters will be in sync
torch.manual_seed(42)
# force a sync so that no one gets ahead, and all are seeded together
distributed_utils.sync_object(None)

with distributed_utils.distributed_context(
rank, opt, port, rank_offset, gpu, hostname
) as opt:
# Run the actual training
return single_train.TrainLoop(opt).train()

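
The diff above removes the inline NCCL setup from multiprocess_train; both multiprocessing_train.py and the new multiprocessing_eval.py now delegate to distributed_utils.distributed_context. The sketch below shows a minimal context manager of that shape, built only from the torch.distributed calls visible in the removed code; the real ParlAI utility additionally handles output suppression, the override dict, and seeding, so treat this as an approximation.

import copy
from contextlib import contextmanager

import torch
import torch.distributed as dist


@contextmanager
def distributed_context_sketch(rank, opt, port=61337, rank_offset=0,
                               gpu=None, hostname='localhost'):
    """Illustrative stand-in for parlai.utils.distributed.distributed_context."""
    opt = copy.deepcopy(opt)
    rank = rank + rank_offset           # same rank adjustment as the removed code
    opt['rank'] = rank
    if gpu is None:
        gpu = rank % torch.cuda.device_count()   # default: one local GPU per rank
    opt['gpu'] = gpu
    if gpu != -1:
        torch.cuda.set_device(gpu)
    dist.init_process_group(
        backend='nccl',
        init_method=f'tcp://{hostname}:{port}',
        world_size=opt['distributed_world_size'],
        rank=rank,
    )
    try:
        yield opt
    finally:
        dist.destroy_process_group()    # teardown on exit (an assumption; not shown in the diff)
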
4 changes: 4 additions & 0 deletions parlai/tasks/self_feeding/agents.py
@@ -352,3 +352,7 @@ def __init__(self, opt, shared=None):
@staticmethod
def add_cmdline_args(argparser):
SelfFeedingTeacher.add_cmdline_args(argparser)


class DefaultTeacher(AllTeacher):
pass