Skip to content

Commit

Permalink
Avoid hanging forever after a parallel job was killed (#7834) (#7930)
Browse files Browse the repository at this point in the history
* Replace multiprocessing.pool with concurrent.futures.ProcessPoolExecutor to avoid deadlocks.

With a multiprocessing.Pool, if a worker process terminates uncleanly
(for example, due to OOM or a segmentation fault), the pool silently
replaces that process, but the work the process was supposed to do
is never done, causing pylint to hang indefinitely.
A concurrent.futures.ProcessPoolExecutor instead raises a
BrokenProcessPool exception in that case, avoiding the hang.

Co-authored-by: Daniël van Noord <13665637+DanielNoord@users.noreply.github.com>
(cherry picked from commit 5eca8ec)

Co-authored-by: Daniel <daniel.werner@scalableminds.com>
  • Loading branch information
github-actions[bot] and daniel-wer authored Dec 13, 2022
1 parent 4655b92 commit 391323e
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 17 deletions.
4 changes: 4 additions & 0 deletions doc/whatsnew/fragments/3899.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Pylint will no longer deadlock if a parallel job is killed; it now fails
immediately instead.

Closes #3899
21 changes: 11 additions & 10 deletions pylint/lint/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,23 @@
except ImportError:
multiprocessing = None # type: ignore[assignment]

try:
from concurrent.futures import ProcessPoolExecutor
except ImportError:
ProcessPoolExecutor = None # type: ignore[assignment,misc]

if TYPE_CHECKING:
from pylint.lint import PyLinter

# PyLinter object used by worker processes when checking files using multiprocessing
# PyLinter object used by worker processes when checking files using parallel mode
# should only be used by the worker processes
_worker_linter: PyLinter | None = None


def _worker_initialize(
linter: bytes, arguments: None | str | Sequence[str] = None
) -> None:
"""Function called to initialize a worker for a Process within a multiprocessing
Pool.
"""Function called to initialize a worker for a Process within a concurrent Pool.
:param linter: A linter-class (PyLinter) instance pickled with dill
:param arguments: File or module name(s) to lint and to be added to sys.path
Expand Down Expand Up @@ -137,9 +141,9 @@ def check_parallel(
# is identical to the linter object here. This is required so that
# a custom PyLinter object can be used.
initializer = functools.partial(_worker_initialize, arguments=arguments)
with multiprocessing.Pool(
jobs, initializer=initializer, initargs=[dill.dumps(linter)]
) as pool:
with ProcessPoolExecutor(
max_workers=jobs, initializer=initializer, initargs=(dill.dumps(linter),)
) as executor:
linter.open()
all_stats = []
all_mapreduce_data: defaultdict[
Expand All @@ -158,7 +162,7 @@ def check_parallel(
stats,
msg_status,
mapreduce_data,
) in pool.imap_unordered(_worker_check_single_file, files):
) in executor.map(_worker_check_single_file, files):
linter.file_state.base_name = base_name
linter.file_state._is_base_filestate = False
linter.set_current_module(module, file_path)
Expand All @@ -168,8 +172,5 @@ def check_parallel(
all_mapreduce_data[worker_idx].append(mapreduce_data)
linter.msg_status |= msg_status

pool.close()
pool.join()

_merge_mapreduce_data(linter, all_mapreduce_data)
linter.stats = merge_stats([linter.stats] + all_stats)
9 changes: 7 additions & 2 deletions pylint/lint/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
except ImportError:
multiprocessing = None # type: ignore[assignment]

try:
from concurrent.futures import ProcessPoolExecutor
except ImportError:
ProcessPoolExecutor = None # type: ignore[assignment,misc]


def _query_cpu() -> int | None:
"""Try to determine number of CPUs allotted in a docker container.
Expand Down Expand Up @@ -185,9 +190,9 @@ def __init__(
)
sys.exit(32)
if linter.config.jobs > 1 or linter.config.jobs == 0:
if multiprocessing is None:
if ProcessPoolExecutor is None:
print(
"Multiprocessing library is missing, fallback to single process",
"concurrent.futures module is missing, fallback to single process",
file=sys.stderr,
)
linter.set_option("jobs", 1)
Expand Down
35 changes: 30 additions & 5 deletions tests/test_check_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from __future__ import annotations

import argparse
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from pickle import PickleError

import dill
Expand Down Expand Up @@ -182,10 +183,10 @@ def test_worker_initialize_pickling(self) -> None:
"""
linter = PyLinter(reporter=Reporter())
linter.attribute = argparse.ArgumentParser() # type: ignore[attr-defined]
with multiprocessing.Pool(
2, initializer=worker_initialize, initargs=[dill.dumps(linter)]
) as pool:
pool.imap_unordered(print, [1, 2])
with ProcessPoolExecutor(
max_workers=2, initializer=worker_initialize, initargs=(dill.dumps(linter),)
) as executor:
executor.map(print, [1, 2])

def test_worker_check_single_file_uninitialised(self) -> None:
pylint.lint.parallel._worker_linter = None
Expand Down Expand Up @@ -571,3 +572,27 @@ def test_map_reduce(self, num_files, num_jobs, num_checkers):
assert str(stats_single_proc.by_msg) == str(
stats_check_parallel.by_msg
), "Single-proc and check_parallel() should return the same thing"

@pytest.mark.timeout(5)
def test_no_deadlock_due_to_initializer_error(self) -> None:
"""Tests that an error in the initializer for the parallel jobs doesn't
lead to a deadlock.
"""
linter = PyLinter(reporter=Reporter())

linter.register_checker(SequentialTestChecker(linter))

# Create a dummy file, the actual contents of which will be ignored by the
# registered test checkers, but it will trigger at least a single job to be run.
single_file_container = _gen_file_datas(count=1)

# The error in the initializer should trigger a BrokenProcessPool exception
with pytest.raises(BrokenProcessPool):
check_parallel(
linter,
jobs=1,
files=iter(single_file_container),
# This will trigger an exception in the initializer for the parallel jobs
# because arguments has to be an Iterable.
arguments=1, # type: ignore[arg-type]
)

0 comments on commit 391323e

Please sign in to comment.