diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6c09c2f3216c1..18523ad7e0705 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -125,6 +125,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 
 - Added `ModelSummary` callback ([#9344](https://github.com/PyTorchLightning/pytorch-lightning/pull/9344))
 
+- Added `PL_RECONCILE_PROCESS` environment variable to enable process reconciliation regardless of cluster environment settings ([#9389](https://github.com/PyTorchLightning/pytorch-lightning/pull/9389))
+
+
 ### Changed
 
 - `pytorch_lightning.loggers.neptune.NeptuneLogger` is now consistent with new [neptune-client](https://github.com/neptune-ai/neptune-client) API ([#6867](https://github.com/PyTorchLightning/pytorch-lightning/pull/6867)).
@@ -378,6 +381,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 
 - Fixed collision of user argument when using ShardedDDP ([#9512](https://github.com/PyTorchLightning/pytorch-lightning/pull/9512))
 
+- Fixed error reporting in DDP process reconciliation when processes are launched by an external agent ([#9389](https://github.com/PyTorchLightning/pytorch-lightning/pull/9389))
+
+
 ## [1.4.5] - 2021-08-31
 
 - Fixed reduction using `self.log(sync_dict=True, reduce_fx={mean,max})` ([#9142](https://github.com/PyTorchLightning/pytorch-lightning/pull/9142))
diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py
index 647ff764f7c89..df0f658bf712a 100644
--- a/pytorch_lightning/plugins/training_type/ddp.py
+++ b/pytorch_lightning/plugins/training_type/ddp.py
@@ -126,6 +126,7 @@ def __init__(
         self._model_averaging_period = model_averaging_period
         self._pids: Optional[List[int]] = None
         self._sync_dir: Optional[str] = None
+        self._rank_0_has_called_call_children_scripts: bool = False
         self.set_world_ranks()
 
     @property
@@ -252,6 +253,8 @@ def _call_children_scripts(self):
             delay = np.random.uniform(1, 5, 1)[0]
             sleep(delay)
 
+        self._rank_0_has_called_call_children_scripts = True
+
     def setup_distributed(self):
         reset_seed()
 
@@ -373,7 +376,9 @@ def determine_ddp_device_ids(self):
 
     def pre_dispatch(self):
         # share ddp pids to all processes
-        self._share_information_to_prevent_deadlock()
+        self._rank_0_has_called_call_children_scripts = self.broadcast(self._rank_0_has_called_call_children_scripts)
+        if self._should_run_deadlock_detection():
+            self._share_information_to_prevent_deadlock()
 
         # move the model to the correct device
         self.model_to_device()
@@ -454,7 +459,16 @@ def register_plugins(cls, plugin_registry: Dict) -> None:
             find_unused_parameters=False,
         )
 
-    def _share_information_to_prevent_deadlock(self):
+    def _should_run_deadlock_detection(self) -> bool:
+        """Determines whether the plugin will perform process reconciliation in case of errors.
+
+        If the environment variable `PL_RECONCILE_PROCESS` is set, run detection regardless of the cluster environment.
+        By default this is disabled. Otherwise, if the cluster environment creates the processes, allow the scheduler /
+        parent process to perform the process termination, external to Lightning.
+        """
+        return os.getenv("PL_RECONCILE_PROCESS", "0") == "1" or self._rank_0_has_called_call_children_scripts
+
+    def _share_information_to_prevent_deadlock(self) -> None:
         self._share_pids()
 
         # there should be a unique sync_dir per nodes.
@@ -470,17 +484,20 @@ def _share_information_to_prevent_deadlock(self):
 
         self._sync_dir = sync_dirs[self.node_rank]
 
-    def _share_pids(self):
+    def _share_pids(self) -> None:
         """Make all DDP processes aware of all processes pids."""
         self.barrier()
         pids = self.all_gather(torch.tensor(os.getpid(), device=self.root_device))
         pids = pids.cpu().numpy().tolist()
         self._pids = pids if isinstance(pids, list) else [pids]
 
-    def reconciliate_processes(self, trace: str):
+    def reconciliate_processes(self, trace: str) -> None:
         if self.world_size < 2:
             return
 
+        if not self._should_run_deadlock_detection():
+            return
+
         sync_dir = self._sync_dir
 
         if not sync_dir:
diff --git a/tests/plugins/environments/torch_elastic_deadlock.py b/tests/plugins/environments/torch_elastic_deadlock.py
index ac2348285d9af..a34f2dfcb9847 100644
--- a/tests/plugins/environments/torch_elastic_deadlock.py
+++ b/tests/plugins/environments/torch_elastic_deadlock.py
@@ -7,7 +7,7 @@
 from pytorch_lightning.utilities.exceptions import DeadlockDetectedException
 from tests.helpers.boring_model import BoringModel
 
-if os.getenv("PL_RUNNING_SPECIAL_TESTS", "0") == "1":
+if os.getenv("PL_RUNNING_SPECIAL_TESTS", "0") == "1" and os.getenv("PL_RECONCILE_PROCESS", "0") == "1":
 
     class CustomException(Exception):
         pass
diff --git a/tests/special_tests.sh b/tests/special_tests.sh
index 78599a54c963b..1346cea295d54 100755
--- a/tests/special_tests.sh
+++ b/tests/special_tests.sh
@@ -80,7 +80,7 @@ fi
 
 # TODO: enable when CI uses torch>=1.9
 # test deadlock is properly handled with TorchElastic.
-# LOGS=$(PL_RUNNING_SPECIAL_TESTS=1 python -m torch.distributed.run --nproc_per_node=2 --max_restarts 0 -m coverage run --source pytorch_lightning -a tests/plugins/environments/torch_elastic_deadlock.py | grep "SUCCEEDED")
+# LOGS=$(PL_RUNNING_SPECIAL_TESTS=1 PL_RECONCILE_PROCESS=1 python -m torch.distributed.run --nproc_per_node=2 --max_restarts 0 -m coverage run --source pytorch_lightning -a tests/plugins/environments/torch_elastic_deadlock.py | grep "SUCCEEDED")
 # if [ -z "$LOGS" ]; then
 #     exit 1
 # fi
diff --git a/tests/trainer/test_trainer.py b/tests/trainer/test_trainer.py
index 308f37bf47917..3549730312276 100644
--- a/tests/trainer/test_trainer.py
+++ b/tests/trainer/test_trainer.py
@@ -1823,13 +1823,14 @@ def test_exception_when_lightning_module_is_not_set_on_trainer():
         trainer.predict()
 
 
+class CustomException(Exception):
+    pass
+
+
 @RunIf(min_gpus=2, special=True)
 def test_ddp_terminate_when_deadlock_is_detected(tmpdir):
     """Test that DDP kills the remaining processes when only one rank is throwing an exception."""
 
-    class CustomException(Exception):
-        pass
-
     class TestModel(BoringModel):
         def training_step(self, batch, batch_idx):
             if batch_idx == 1 and self.trainer.is_global_zero:
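
Note on the combined behavior (not part of the patch itself): with this diff applied, process reconciliation runs only when rank 0 spawned the worker processes via `_call_children_scripts`, or when the user opts in with `PL_RECONCILE_PROCESS=1`; otherwise termination is left to the external scheduler / parent process. Below is a minimal standalone sketch of that gate, where the hypothetical `launched_by_lightning` parameter stands in for the broadcast `_rank_0_has_called_call_children_scripts` flag:

import os


def should_run_deadlock_detection(launched_by_lightning: bool) -> bool:
    # Mirrors DDPPlugin._should_run_deadlock_detection from the diff above:
    # explicit opt-in via the environment variable, or implicit opt-in when
    # Lightning itself created the worker processes.
    return os.getenv("PL_RECONCILE_PROCESS", "0") == "1" or launched_by_lightning


if __name__ == "__main__":
    # Processes launched by an external agent (e.g. torch.distributed.run):
    # reconciliation is disabled by default ...
    os.environ.pop("PL_RECONCILE_PROCESS", None)
    assert not should_run_deadlock_detection(launched_by_lightning=False)

    # ... and enabled once the job exports PL_RECONCILE_PROCESS=1, which is
    # what the (still commented-out) TorchElastic special test now sets.
    os.environ["PL_RECONCILE_PROCESS"] = "1"
    assert should_run_deadlock_detection(launched_by_lightning=False)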