Skip to content

Commit

Permalink
Monitor subprocesses to avoid zombies (#18218)
Browse files Browse the repository at this point in the history
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
awaelchli and pre-commit-ci[bot] authored Aug 8, 2023
1 parent e42cb0d commit 7e13eb7
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 45 deletions.
3 changes: 3 additions & 0 deletions src/lightning/fabric/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Allowed accessing rank information in the main process before processes are launched when using the `XLAStrategy` ([#18194](https://github.com/Lightning-AI/lightning/pull/18194))


- Added automatic process cleanup to avoid zombie child processes and stalls when exceptions are raised ([#18218](https://github.com/Lightning-AI/lightning/pull/18218))


### Changed

- Allow using iterable-style datasets with TPUs ([#17331](https://github.com/Lightning-AI/lightning/pull/17331))
Expand Down
35 changes: 0 additions & 35 deletions src/lightning/fabric/strategies/launchers/base.py

This file was deleted.

65 changes: 63 additions & 2 deletions src/lightning/fabric/strategies/launchers/subprocess_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import signal
import subprocess
import sys
from typing import Any, Callable, Optional, Sequence, Tuple
import time
from threading import Thread
from typing import Any, Callable, List, Optional, Sequence, Tuple

from lightning_utilities.core.imports import RequirementCache

from lightning.fabric.plugins.environments.cluster_environment import ClusterEnvironment
from lightning.fabric.strategies.launchers.launcher import _Launcher

_logger = logging.getLogger(__name__)
_HYDRA_AVAILABLE = RequirementCache("hydra-core")


Expand Down Expand Up @@ -71,6 +76,7 @@ def __init__(
self.cluster_environment = cluster_environment
self.num_processes = num_processes
self.num_nodes = num_nodes
self.procs: List[subprocess.Popen] = [] # launched child subprocesses, does not include the launcher

@property
def is_interactive_compatible(self) -> bool:
Expand All @@ -87,6 +93,7 @@ def launch(self, function: Callable, *args: Any, **kwargs: Any) -> Any:
"""
if not self.cluster_environment.creates_processes_externally:
self._call_children_scripts()
_launch_process_observer(self.procs)
return function(*args, **kwargs)

def _call_children_scripts(self) -> None:
Expand Down Expand Up @@ -122,9 +129,13 @@ def _call_children_scripts(self) -> None:
command, cwd = _hydra_subprocess_cmd(local_rank=local_rank)
else:
command = _basic_subprocess_cmd()
subprocess.Popen(command, env=env_copy, cwd=cwd)

proc = subprocess.Popen(command, env=env_copy, cwd=cwd)
self.procs.append(proc)

def _check_can_spawn_children(self) -> None:
if len(self.procs) > 0:
raise RuntimeError("The launcher can only create subprocesses once.")
if self.cluster_environment.local_rank() != 0:
raise RuntimeError(
"Lightning attempted to launch new distributed processes with `local_rank > 0`. This should not happen."
Expand Down Expand Up @@ -159,3 +170,53 @@ def _hydra_subprocess_cmd(local_rank: int) -> Tuple[Sequence[str], str]:
# Set output_subdir null since we don't want different subprocesses trying to write to config.yaml
command += [f"hydra.run.dir={rundir}", f"hydra.job.name=train_ddp_process_{local_rank}", "hydra.output_subdir=null"]
return command, cwd


def _launch_process_observer(child_processes: List[subprocess.Popen]) -> None:
"""Launches a thread that runs along the main process and monitors the health of all processes."""
monitor_thread = Thread(
target=_ChildProcessObserver(child_processes=child_processes, main_pid=os.getpid()),
daemon=True, # thread stops if the main process exits
)
monitor_thread.start()


class _ChildProcessObserver:
def __init__(self, main_pid: int, child_processes: List[subprocess.Popen], sleep_period: int = 5) -> None:
self._main_pid = main_pid
self._child_processes = child_processes
self._sleep_period = sleep_period
# Note: SIGTERM is not aggressive enough to terminate processes hanging in collectives
self._termination_signal = signal.SIGTERM if sys.platform == "win32" else signal.SIGKILL
self._finished = False

def __call__(self) -> None:
while not self._finished:
time.sleep(self._sleep_period)
self._finished = self._run()

def _run(self) -> bool:
"""Runs once over all child processes to check whether they are still running."""
for proc in self._child_processes:
proc.poll()

return_codes = [proc.returncode for proc in self._child_processes]
if all(return_code == 0 for return_code in return_codes):
return True

for proc in self._child_processes:
if proc.returncode:
_logger.info(
f"Child process with PID {proc.pid} terminated with code {proc.returncode}."
f" Forcefully terminating all other processes to avoid zombies 🧟"
)
self._terminate_all()
return True

return False

def _terminate_all(self) -> None:
"""Terminates the main process and all its children."""
for p in self._child_processes:
p.send_signal(self._termination_signal)
os.kill(self._main_pid, self._termination_signal)
3 changes: 3 additions & 0 deletions src/lightning/pytorch/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Added support for true half-precision training via `Trainer(precision="16-true"|"bf16-true")` ([#18193](https://github.com/Lightning-AI/lightning/pull/18193))


- Added automatic process cleanup to avoid zombie child processes and stalls when exceptions are raised ([#18218](https://github.com/Lightning-AI/lightning/pull/18218))


### Changed

- Removed the limitation to call `self.trainer.model.parameters()` in `LightningModule.configure_optimizers()` ([#17309](https://github.com/Lightning-AI/lightning/pull/17309))
Expand Down
11 changes: 9 additions & 2 deletions src/lightning/pytorch/strategies/launchers/subprocess_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

import lightning.pytorch as pl
from lightning.fabric.plugins import ClusterEnvironment
from lightning.fabric.strategies.launchers.subprocess_script import _basic_subprocess_cmd, _hydra_subprocess_cmd
from lightning.fabric.strategies.launchers.subprocess_script import (
_basic_subprocess_cmd,
_hydra_subprocess_cmd,
_launch_process_observer,
)
from lightning.pytorch.strategies.launchers.launcher import _Launcher
from lightning.pytorch.trainer.connectors.signal_connector import _SIGNUM

Expand Down Expand Up @@ -70,7 +74,7 @@ def __init__(self, cluster_environment: ClusterEnvironment, num_processes: int,
self.cluster_environment = cluster_environment
self.num_processes = num_processes
self.num_nodes = num_nodes
self.procs: List[subprocess.Popen] = [] # launched subprocesses. does not include the launcher
self.procs: List[subprocess.Popen] = [] # launched child subprocesses, does not include the launcher

@property
def is_interactive_compatible(self) -> bool:
Expand All @@ -88,6 +92,7 @@ def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"]
"""
if not self.cluster_environment.creates_processes_externally:
self._call_children_scripts()
_launch_process_observer(self.procs)
return function(*args, **kwargs)

def kill(self, signum: _SIGNUM) -> None:
Expand Down Expand Up @@ -134,6 +139,8 @@ def _call_children_scripts(self) -> None:
self.procs.append(new_process)

def _check_can_spawn_children(self) -> None:
if len(self.procs) > 0:
raise RuntimeError("The launcher can only create subprocesses once.")
if self.cluster_environment.local_rank() != 0:
raise RuntimeError(
"Lightning attempted to launch new distributed processes with `local_rank > 0`. This should not happen."
Expand Down
59 changes: 54 additions & 5 deletions tests/tests_fabric/strategies/launchers/test_subprocess_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import sys
from unittest import mock
from unittest.mock import ANY, Mock

import pytest

import lightning.fabric
from lightning.fabric.strategies.launchers.subprocess_script import _HYDRA_AVAILABLE, _SubprocessScriptLauncher
from lightning.fabric.strategies.launchers.subprocess_script import (
_ChildProcessObserver,
_HYDRA_AVAILABLE,
_SubprocessScriptLauncher,
)


def test_subprocess_script_launcher_interactive_compatible():
Expand All @@ -27,17 +33,24 @@ def test_subprocess_script_launcher_interactive_compatible():


@mock.patch("lightning.fabric.strategies.launchers.subprocess_script.subprocess.Popen")
def test_subprocess_script_launcher_error_launching_on_non_zero_rank(popen_mock):
@mock.patch("lightning.fabric.strategies.launchers.subprocess_script.Thread")
def test_subprocess_script_launcher_can_launch(*_):
cluster_env = Mock()
cluster_env.creates_processes_externally = False
cluster_env.local_rank.return_value = 1
launcher = _SubprocessScriptLauncher(cluster_env, num_processes=2, num_nodes=1)

with pytest.raises(RuntimeError, match="attempted to launch new distributed processes with `local_rank > 0`"):
launcher.launch(Mock())

launcher.procs = [Mock()] # there are already processes running
with pytest.raises(RuntimeError, match="The launcher can only create subprocesses once"):
launcher.launch(Mock())


@mock.patch("lightning.fabric.strategies.launchers.subprocess_script.subprocess.Popen")
def test_subprocess_script_launcher_external_processes(popen_mock):
@mock.patch("lightning.fabric.strategies.launchers.subprocess_script.Thread")
def test_subprocess_script_launcher_external_processes(_, popen_mock):
cluster_env = Mock()
cluster_env.creates_processes_externally = True
function = Mock()
Expand All @@ -48,7 +61,8 @@ def test_subprocess_script_launcher_external_processes(popen_mock):


@mock.patch("lightning.fabric.strategies.launchers.subprocess_script.subprocess.Popen")
def test_subprocess_script_launcher_launch_processes(popen_mock):
@mock.patch("lightning.fabric.strategies.launchers.subprocess_script.Thread")
def test_subprocess_script_launcher_launch_processes(_, popen_mock):
cluster_env = Mock()
cluster_env.creates_processes_externally = False
cluster_env.local_rank.return_value = 0
Expand Down Expand Up @@ -80,7 +94,8 @@ def test_subprocess_script_launcher_launch_processes(popen_mock):

@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="hydra-core is required")
@mock.patch("lightning.fabric.strategies.launchers.subprocess_script.subprocess.Popen")
def test_subprocess_script_launcher_hydra_in_use(popen_mock, monkeypatch):
@mock.patch("lightning.fabric.strategies.launchers.subprocess_script.Thread")
def test_subprocess_script_launcher_hydra_in_use(_, popen_mock, monkeypatch):
basic_command = Mock(return_value="basic_command")
hydra_command = Mock(return_value=("hydra_command", "hydra_cwd"))
monkeypatch.setattr(lightning.fabric.strategies.launchers.subprocess_script, "_basic_subprocess_cmd", basic_command)
Expand Down Expand Up @@ -121,3 +136,37 @@ def simulate_launch():
simulate_launch()
popen_mock.assert_called_with("hydra_command", env=ANY, cwd="hydra_cwd")
popen_mock.reset_mock()


@mock.patch("lightning.fabric.strategies.launchers.subprocess_script.os.kill")
@mock.patch("lightning.fabric.strategies.launchers.subprocess_script.time.sleep")
def test_child_process_observer(sleep_mock, os_kill_mock):
# Case 1: All processes are running and did not exit yet
processes = [Mock(returncode=None), Mock(returncode=None)]
observer = _ChildProcessObserver(main_pid=1234, child_processes=processes)
finished = observer._run() # call _run() directly to simulate while loop
assert not finished

# Case 2: All processes have finished with exit code 0 (success)
processes = [Mock(returncode=0), Mock(returncode=0)]
observer = _ChildProcessObserver(main_pid=1234, child_processes=processes)
finished = observer._run() # call _run() directly to simulate while loop
assert finished

# Case 3: One process has finished with exit code 1 (failure)
processes = [Mock(returncode=0), Mock(returncode=1)]
observer = _ChildProcessObserver(main_pid=1234, child_processes=processes)
finished = observer._run() # call _run() directly to simulate while loop
assert finished
expected_signal = signal.SIGTERM if sys.platform == "win32" else signal.SIGKILL
processes[0].send_signal.assert_called_once_with(expected_signal)
processes[1].send_signal.assert_called_once_with(expected_signal)
os_kill_mock.assert_called_once_with(1234, expected_signal)

# The main routine stops
observer = _ChildProcessObserver(main_pid=1234, child_processes=[Mock(), Mock()])
observer._run = Mock()
assert not observer._finished
observer()
assert observer._finished
sleep_mock.assert_called_once_with(5)
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import subprocess
import sys
from unittest import mock
from unittest.mock import Mock

import pytest
Expand Down Expand Up @@ -77,7 +78,8 @@ def test_ddp_with_hydra_runjob(subdir, tmp_path, monkeypatch):
assert len(logs) == devices


def test_kill():
@mock.patch("lightning.fabric.strategies.launchers.subprocess_script.Thread")
def test_kill(_):
launcher = _SubprocessScriptLauncher(Mock(), 1, 1)
proc0 = Mock(autospec=subprocess.Popen)
proc1 = Mock(autospec=subprocess.Popen)
Expand Down

0 comments on commit 7e13eb7

Please sign in to comment.