From 88515b8a2dfa6af4b1d86bc513c53187c76586b9 Mon Sep 17 00:00:00 2001
From: Stephen Roller
Date: Fri, 18 Sep 2020 15:06:34 -0700
Subject: [PATCH 1/5] [dist] Allow non-tcp based distributed setup.

---
 parlai/core/metrics.py                  |  4 ++++
 parlai/scripts/multiprocessing_train.py |  3 ++-
 parlai/scripts/train_model.py           | 14 +++++++-------
 parlai/utils/distributed.py             |  6 +++---
 4 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/parlai/core/metrics.py b/parlai/core/metrics.py
index 56354470071..ec9f9fb11aa 100644
--- a/parlai/core/metrics.py
+++ b/parlai/core/metrics.py
@@ -592,6 +592,10 @@ def aggregate_unnamed_reports(reports: List[Dict[str, Metric]]) -> Dict[str, Met
     return m
 
 
+def dict_report(report: Dict[str, Metric]):
+    return {k: v.value() if isinstance(v, Metric) else v for k, v in report.items()}
+
+
 class Metrics(object):
     """
     Metrics aggregator.
diff --git a/parlai/scripts/multiprocessing_train.py b/parlai/scripts/multiprocessing_train.py
index b27fcbf4d76..85adefd6ff6 100644
--- a/parlai/scripts/multiprocessing_train.py
+++ b/parlai/scripts/multiprocessing_train.py
@@ -35,8 +35,9 @@
 def multiprocess_train(
     rank, opt, port=61337, rank_offset=0, gpu=None, hostname='localhost'
 ):
+    init_method = f"tcp://{hostname}:{port}"
     with distributed_utils.distributed_context(
-        rank, opt, port, rank_offset, gpu, hostname
+        rank, opt, rank_offset, gpu, init_method=init_method
     ) as opt:
         # Run the actual training
         opt['multiprocessing'] = True
diff --git a/parlai/scripts/train_model.py b/parlai/scripts/train_model.py
index f239f2b2bb7..f4bdc75de5c 100644
--- a/parlai/scripts/train_model.py
+++ b/parlai/scripts/train_model.py
@@ -28,13 +28,16 @@
 import json
 import numpy as np
 import signal
-from typing import Dict
 
 from parlai.core.metrics import Metric
 from parlai.core.agents import create_agent, create_agent_from_shared
 from parlai.core.exceptions import StopTrainException
 from parlai.core.logs import TensorboardLogger
-from parlai.core.metrics import aggregate_named_reports, aggregate_unnamed_reports
+from parlai.core.metrics import (
+    aggregate_named_reports,
+    aggregate_unnamed_reports,
+    dict_report,
+)
 from parlai.core.params import ParlaiParser, print_announcements
 from parlai.core.worlds import create_task
 from parlai.scripts.build_dict import build_dict, setup_args as setup_dict_args
@@ -384,9 +387,6 @@ def save_model(self, suffix=None):
         except KeyboardInterrupt:
             pass
 
-    def _safe_report(self, report: Dict[str, Metric]):
-        return {k: v.value() if isinstance(v, Metric) else v for k, v in report.items()}
-
     def _save_train_stats(self, suffix=None):
         fn = self.opt['model_file']
         if suffix:
@@ -423,7 +423,7 @@ def validate(self):
         valid_report = self._run_eval(
             self.valid_worlds, opt, 'valid', opt['validation_max_exs']
         )
-        v = self._safe_report(valid_report.copy())
+        v = dict_report(valid_report)
         v['train_time'] = self.train_time.time()
         v['parleys'] = self.parleys
         v['total_exs'] = self._total_exs
@@ -613,7 +613,7 @@ def log(self):
         train_report = self._sync_metrics(train_report)
         self.world.reset_metrics()
 
-        train_report_trainstats = self._safe_report(train_report)
+        train_report_trainstats = dict_report(train_report)
         train_report_trainstats['total_epochs'] = self._total_epochs
         train_report_trainstats['total_exs'] = self._total_exs
         train_report_trainstats['parleys'] = self.parleys
diff --git a/parlai/utils/distributed.py b/parlai/utils/distributed.py
index 81e372d6139..24204933b09 100644
--- a/parlai/utils/distributed.py
+++ b/parlai/utils/distributed.py
@@ -269,7 +269,7 @@ def sync_parameters(model: torch.nn.Module) -> bool:
 
 @contextlib.contextmanager
 def distributed_context(
-    rank, opt, port=61337, rank_offset=0, gpu=None, hostname='localhost'
+    rank, opt, rank_offset=0, gpu=None, init_method="tcp://localhost:61337"
 ):
     """
     A context which wraps initialization of a distributed/multiprocessing run.
@@ -322,7 +322,7 @@ def distributed_context(
         torch.cuda.set_device(opt['gpu'])
     dist.init_process_group(
         backend="nccl",
-        init_method="tcp://{}:{}".format(hostname, port),
+        init_method=init_method,
         world_size=opt['distributed_world_size'],
         rank=rank,
     )
@@ -379,7 +379,7 @@ def slurm_distributed_context(opt):
         )
         # Begin distributed training
         with distributed_context(
-            distributed_rank, opt, port, 0, device_id, main_host
+            distributed_rank, opt, 0, device_id, init_method="tcp://{main_host}:{port}"
        ) as opt:
             yield opt
     except subprocess.CalledProcessError as e:
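A note on what this first patch enables: distributed_context no longer assumes TCP; whatever init_method string the caller supplies is forwarded verbatim to torch.distributed.init_process_group. A minimal sketch under assumed names (the file path is hypothetical; any rendezvous scheme torch.distributed supports, e.g. tcp://, file://, or env://, passes through the same way):

    from parlai.utils import distributed as distributed_utils

    def run_worker(rank, opt, gpu=None):
        # file:// rendezvous on a shared filesystem instead of a TCP port;
        # the path below is a made-up example.
        init_method = "file:///tmp/parlai_dist_init"
        with distributed_utils.distributed_context(
            rank, opt, rank_offset=0, gpu=gpu, init_method=init_method
        ) as opt:
            ...  # training or evaluation proceeds as usual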
From 87f58638b00c71eb57bd2d1ecac86cf3ed48eb98 Mon Sep 17 00:00:00 2001
From: Stephen Roller
Date: Fri, 18 Sep 2020 15:13:14 -0700
Subject: [PATCH 2/5] Also catch mp_eval.

---
 parlai/scripts/multiprocessing_eval.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/parlai/scripts/multiprocessing_eval.py b/parlai/scripts/multiprocessing_eval.py
index a32c1e75810..c31627bcd23 100644
--- a/parlai/scripts/multiprocessing_eval.py
+++ b/parlai/scripts/multiprocessing_eval.py
@@ -39,8 +39,9 @@ def multiprocess_eval(
 
     Invoked by launch_and_eval, not instantiated directly.
     """
+    init_method = f'tcp://{hostname}:{port}'
     with distributed_utils.distributed_context(
-        rank, opt, port, rank_offset, gpu, hostname
+        rank, opt, rank_offset, gpu, init_method=init_method
     ) as opt:
         opt['multiprocessing'] = True
         return eval_model.eval_model(opt)

From c1ce07c3bb24f73d2b08c8d1d99c6edd34c4a146 Mon Sep 17 00:00:00 2001
From: Stephen Roller
Date: Fri, 18 Sep 2020 15:15:22 -0700
Subject: [PATCH 3/5] Lint.

---
 parlai/utils/distributed.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/parlai/utils/distributed.py b/parlai/utils/distributed.py
index 24204933b09..f040faf38a5 100644
--- a/parlai/utils/distributed.py
+++ b/parlai/utils/distributed.py
@@ -285,14 +285,12 @@ def distributed_context(
         non-primary workers.
 
     :param opt:
         command line options
-    :param int port:
-        A TCP port to use. This will need to be changed to run multiple distributed training setups on the same machine.
     :param int gpu:
         Which GPU to use. Defaults to using rank and local devices, but must be
         manually specified when using many-hosts.
-    :param str hostname:
-        Hostname of the main server.
+    :param str init_method:
+        Init method, such as tcp://localhost:61337. See torch.distributed docs.
     """
     # Set per-host options
     opt = copy.deepcopy(opt)
@@ -379,7 +377,7 @@ def slurm_distributed_context(opt):
         )
         # Begin distributed training
         with distributed_context(
-            distributed_rank, opt, 0, device_id, init_method="tcp://{main_host}:{port}"
+            distributed_rank, opt, 0, device_id, init_method=f"tcp://{main_host}:{port}"
         ) as opt:
             yield opt
     except subprocess.CalledProcessError as e:
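Note that the second hunk of PATCH 3/5 is a real fix, not just lint: PATCH 1/5 wrote the slurm init method as "tcp://{main_host}:{port}" without the f prefix, so Python would have handed the literal, uninterpolated string to torch.distributed. A quick illustration (the hostname and port values are examples):

    main_host, port = "node001", 61337
    print("tcp://{main_host}:{port}")   # tcp://{main_host}:{port}  (braces kept verbatim, broken)
    print(f"tcp://{main_host}:{port}")  # tcp://node001:61337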
From e5803aa83dc5c8ecd55a6c4c88a2eadc13e67b28 Mon Sep 17 00:00:00 2001
From: Stephen Roller
Date: Sun, 20 Sep 2020 08:25:39 -0700
Subject: [PATCH 4/5] Docstring correction.

---
 parlai/utils/distributed.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parlai/utils/distributed.py b/parlai/utils/distributed.py
index f040faf38a5..97004110046 100644
--- a/parlai/utils/distributed.py
+++ b/parlai/utils/distributed.py
@@ -290,7 +290,7 @@ def distributed_context(
         Which GPU to use. Defaults to using rank and local devices, but must be
         manually specified when using many-hosts.
     :param str init_method:
-        Init method, such as tcp://localhost:61337. See torch.distributed docs.
+        Init method, such as ``tcp://localhost:61337``. See torch.distributed docs.
     """
     # Set per-host options
     opt = copy.deepcopy(opt)

From 55d8b8463356effd0c38332822a41704aa4c5b0d Mon Sep 17 00:00:00 2001
From: Stephen Roller
Date: Tue, 22 Sep 2020 09:36:03 -0700
Subject: [PATCH 5/5] Importing chunks from other patch.

---
 parlai/utils/logging.py | 12 ++++++++++--
 parlai/utils/misc.py    | 23 ++++++++++++-----------
 2 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/parlai/utils/logging.py b/parlai/utils/logging.py
index 53e2af698ed..9ec511b154d 100644
--- a/parlai/utils/logging.py
+++ b/parlai/utils/logging.py
@@ -4,6 +4,7 @@
 # This source code is licensed under the MIT license found in the
 # LICENSE file in the root directory of this source tree.
 
+import os
 import sys
 import logging
 
@@ -48,6 +49,8 @@
 
 
 def _is_interactive():
+    if os.environ.get('PARLAI_FORCE_COLOR'):
+        return True
     try:
         __IPYTHON__
         return True
@@ -72,19 +75,20 @@ def __init__(self, name, console_level=INFO):
         self.streamHandler = logging.StreamHandler(sys.stdout)
         # Log to stdout levels: console_level and above
         self.prefix = None
+        self.interactive = _is_interactive()
         self.streamHandler.setFormatter(self._build_formatter())
         super().addHandler(self.streamHandler)
 
     def _build_formatter(self):
         prefix_format = f'{self.prefix} ' if self.prefix else ''
-        if COLORED_LOGS and _is_interactive():
+        if COLORED_LOGS and self.interactive:
             return coloredlogs.ColoredFormatter(
                 prefix_format + COLORED_FORMAT,
                 datefmt=CONSOLE_DATE_FORMAT,
                 level_styles=COLORED_LEVEL_STYLES,
                 field_styles={},
             )
-        elif _is_interactive():
+        elif self.interactive:
             return logging.Formatter(
                 prefix_format + CONSOLE_FORMAT, datefmt=CONSOLE_DATE_FORMAT
             )
@@ -93,6 +97,10 @@ def _build_formatter(self):
                 prefix_format + LOGFILE_FORMAT, datefmt=LOGFILE_DATE_FORMAT
             )
 
+    def force_interactive(self):
+        self.interactive = True
+        self.streamHandler.setFormatter(self._build_formatter())
+
     def log(self, msg, level=INFO):
         """
         Default Logging function.
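Because the logging change caches _is_interactive() at construction time, PARLAI_FORCE_COLOR only takes effect if it is set before the logger is built; force_interactive() covers the after-the-fact case. A usage sketch, assuming the module-level logger instance that parlai.utils.logging defines:

    import os

    # Option 1: set the variable before ParlAI constructs its logger.
    os.environ['PARLAI_FORCE_COLOR'] = '1'
    import parlai.utils.logging as plogging

    # Option 2: flip an already-constructed logger at runtime.
    plogging.logger.force_interactive()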
diff --git a/parlai/utils/misc.py b/parlai/utils/misc.py
index f0b33b05f6c..80b40f6f29d 100644
--- a/parlai/utils/misc.py
+++ b/parlai/utils/misc.py
@@ -16,6 +16,7 @@
 import re
 import shutil
 import json
+import os
 
 from parlai.core.message import Message
 from parlai.utils.strings import colorize
@@ -357,6 +358,11 @@ def float_formatter(f: Union[float, int]) -> str:
 
 
 def _line_width():
+    if os.environ.get('PARLAI_FORCE_WIDTH'):
+        try:
+            return int(os.environ['PARLAI_FORCE_WIDTH'])
+        except ValueError:
+            pass
     try:
         # if we're in an interactive ipython notebook, hardcode a longer width
         __IPYTHON__
@@ -410,17 +416,12 @@ def nice_report(report) -> str:
         df = pd.DataFrame([output])
         df.columns = pd.MultiIndex.from_tuples(df.columns)
         df = df.stack().transpose().droplevel(0, axis=1)
-        result = (
-            "   "
-            + df.to_string(
-                na_rep="",
-                line_width=line_width - 3,  # -3 for the extra spaces we add
-                float_format=float_formatter,
-                index=df.shape[0] > 1,
-            )
-            .replace("\n\n", "\n")
-            .replace("\n", "\n   ")
-        )
+        result = "   " + df.to_string(
+            na_rep="",
+            line_width=line_width - 3,  # -3 for the extra spaces we add
+            float_format=float_formatter,
+            index=df.shape[0] > 1,
+        ).replace("\n\n", "\n").replace("\n", "\n   ")
         result = re.sub(r"\s+$", "", result)
         return result
     else:
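For completeness, the PARLAI_FORCE_WIDTH override added to _line_width() accepts only a parseable integer; anything else raises ValueError inside the try block, is swallowed, and the function falls through to the usual ipython/terminal detection. Example values:

    import os
    os.environ['PARLAI_FORCE_WIDTH'] = '120'   # nice_report wraps at 120 columns
    os.environ['PARLAI_FORCE_WIDTH'] = 'wide'  # int() raises ValueError -> ignored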