Skip to content

Commit

Permalink
bpo-46205: exit if no workers are alive in runtest_mp
Browse files Browse the repository at this point in the history
There is a race condition in runtest_mp where if a worker already
pushed its final output to the queue, but is still alive, then the
the main thread waits forever on the the now-empty queue.

The workers now indicate they have finished by putting a sentintel
"WorkerExited" object in the output queue.
  • Loading branch information
colesbury committed Jan 7, 2022
1 parent 74d1663 commit 406e5d7
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 21 deletions.
47 changes: 26 additions & 21 deletions Lib/test/libregrtest/runtest_mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class MultiprocessResult(NamedTuple):

ExcStr = str
QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr]
WorkerExited = object()


class ExitThread(Exception):
Expand Down Expand Up @@ -287,23 +288,26 @@ def _runtest(self, test_name: str) -> MultiprocessResult:
return MultiprocessResult(result, stdout, err_msg)

def run(self) -> None:
while not self._stopped:
try:
try:
while not self._stopped:
try:
test_name = next(self.pending)
except StopIteration:
break
try:
test_name = next(self.pending)
except StopIteration:
break

mp_result = self._runtest(test_name)
self.output.put((False, mp_result))
mp_result = self._runtest(test_name)
self.output.put((False, mp_result))

if must_stop(mp_result.result, self.ns):
if must_stop(mp_result.result, self.ns):
break
except ExitThread:
break
except ExitThread:
break
except BaseException:
self.output.put((True, traceback.format_exc()))
break
except BaseException:
self.output.put((True, traceback.format_exc()))
break
finally:
self.output.put(WorkerExited)

def _wait_completed(self) -> None:
popen = self._popen
Expand Down Expand Up @@ -360,6 +364,7 @@ def __init__(self, regrtest: Regrtest) -> None:
self.regrtest = regrtest
self.log = self.regrtest.log
self.ns = regrtest.ns
self.num_alive_workers = 0
self.output: queue.Queue[QueueOutput] = queue.Queue()
self.pending = MultiprocessIterator(self.regrtest.tests)
if self.ns.timeout is not None:
Expand All @@ -383,6 +388,7 @@ def start_workers(self) -> None:
self.log(msg)
for worker in self.workers:
worker.start()
self.num_alive_workers = len(self.workers)

def stop_workers(self) -> None:
start_time = time.monotonic()
Expand All @@ -392,13 +398,6 @@ def stop_workers(self) -> None:
worker.wait_stopped(start_time)

def _get_result(self) -> QueueOutput | None:
if not any(worker.is_alive() for worker in self.workers):
# all worker threads are done: consume pending results
try:
return self.output.get(timeout=0)
except queue.Empty:
return None

use_faulthandler = (self.ns.timeout is not None)
timeout = PROGRESS_UPDATE
while True:
Expand All @@ -408,7 +407,13 @@ def _get_result(self) -> QueueOutput | None:

# wait for a thread
try:
return self.output.get(timeout=timeout)
result = self.output.get(timeout=timeout)
if result is WorkerExited:
self.num_alive_workers -= 1
if self.num_alive_workers == 0:
return None
continue
return result
except queue.Empty:
pass

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix hang in runtest_mp due to race condition

0 comments on commit 406e5d7

Please sign in to comment.