From 2b26e34b27b2f33a0a8d29a5fb773b1d7665742b Mon Sep 17 00:00:00 2001 From: Sam Gross Date: Wed, 27 Mar 2024 15:58:16 +0000 Subject: [PATCH 1/3] gh-117293: Fix race condition in run_workers.py The worker thread may still be alive after it enqueues its last result, which can lead to a delay of 30 seconds after the test finishes. This happens much more frequently in the free-threaded build with the GIL disabled. This changes run_workers.py to keep track of live workers by enqueueing a `WorkerExited()` instance before the worker exits. --- Lib/test/libregrtest/run_workers.py | 38 +++++++++++++++++------------ 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/Lib/test/libregrtest/run_workers.py b/Lib/test/libregrtest/run_workers.py index 9cfe1b9d6fd07dc..6efa69b819912b8 100644 --- a/Lib/test/libregrtest/run_workers.py +++ b/Lib/test/libregrtest/run_workers.py @@ -79,8 +79,13 @@ class MultiprocessResult: err_msg: str | None = None +# Indicates that a worker thread has exited +class WorkerExited: + pass + ExcStr = str QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr] +QueueContent = QueueOutput | WorkerExited class ExitThread(Exception): @@ -376,8 +381,8 @@ def _runtest(self, test_name: TestName) -> MultiprocessResult: def run(self) -> None: fail_fast = self.runtests.fail_fast fail_env_changed = self.runtests.fail_env_changed - while not self._stopped: - try: + try: + while not self._stopped: try: test_name = next(self.pending) except StopIteration: @@ -396,11 +401,12 @@ def run(self) -> None: if mp_result.result.must_stop(fail_fast, fail_env_changed): break - except ExitThread: - break - except BaseException: - self.output.put((True, traceback.format_exc())) - break + except ExitThread: + pass + except BaseException: + self.output.put((True, traceback.format_exc())) + finally: + self.output.put(WorkerExited()) def _wait_completed(self) -> None: popen = self._popen @@ -458,8 +464,9 @@ def __init__(self, num_workers: int, 
runtests: RunTests, self.log = logger.log self.display_progress = logger.display_progress self.results: TestResults = results + self.live_worker_count = 0 - self.output: queue.Queue[QueueOutput] = queue.Queue() + self.output: queue.Queue[QueueContent] = queue.Queue() tests_iter = runtests.iter_tests() self.pending = MultiprocessIterator(tests_iter) self.timeout = runtests.timeout @@ -497,6 +504,7 @@ def start_workers(self) -> None: self.log(msg) for worker in self.workers: worker.start() + self.live_worker_count += 1 def stop_workers(self) -> None: start_time = time.monotonic() @@ -511,14 +519,18 @@ def _get_result(self) -> QueueOutput | None: # bpo-46205: check the status of workers every iteration to avoid # waiting forever on an empty queue. - while any(worker.is_alive() for worker in self.workers): + while self.live_worker_count > 0: if use_faulthandler: faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT, exit=True) # wait for a thread try: - return self.output.get(timeout=PROGRESS_UPDATE) + result = self.output.get(timeout=PROGRESS_UPDATE) + if isinstance(result, WorkerExited): + self.live_worker_count -= 1 + continue + return result except queue.Empty: pass @@ -528,12 +540,6 @@ def _get_result(self) -> QueueOutput | None: if running: self.log(running) - # all worker threads are done: consume pending results - try: - return self.output.get(timeout=0) - except queue.Empty: - return None - def display_result(self, mp_result: MultiprocessResult) -> None: result = mp_result.result pgo = self.runtests.pgo From eb5068784de2ce4d298e541e94040dece87bddff Mon Sep 17 00:00:00 2001 From: Sam Gross Date: Wed, 27 Mar 2024 15:21:08 -0400 Subject: [PATCH 2/3] Update Lib/test/libregrtest/run_workers.py Co-authored-by: Alex Waygood --- Lib/test/libregrtest/run_workers.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/test/libregrtest/run_workers.py b/Lib/test/libregrtest/run_workers.py index 6efa69b819912b8..cc56ebbfb1c2e9c 100644 --- 
a/Lib/test/libregrtest/run_workers.py +++ b/Lib/test/libregrtest/run_workers.py @@ -79,9 +79,8 @@ class MultiprocessResult: err_msg: str | None = None -# Indicates that a worker thread has exited class WorkerExited: - pass + """Indicates that a worker thread has exited""" ExcStr = str QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr] From 561016c834a803e3c5affdb501f56d0a8cd689aa Mon Sep 17 00:00:00 2001 From: Sam Gross Date: Thu, 28 Mar 2024 12:28:44 +0000 Subject: [PATCH 3/3] Rename WorkerExited to WorkerThreadExited --- Lib/test/libregrtest/run_workers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/test/libregrtest/run_workers.py b/Lib/test/libregrtest/run_workers.py index cc56ebbfb1c2e9c..235047cf2e563ca 100644 --- a/Lib/test/libregrtest/run_workers.py +++ b/Lib/test/libregrtest/run_workers.py @@ -79,12 +79,12 @@ class MultiprocessResult: err_msg: str | None = None -class WorkerExited: +class WorkerThreadExited: """Indicates that a worker thread has exited""" ExcStr = str QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr] -QueueContent = QueueOutput | WorkerExited +QueueContent = QueueOutput | WorkerThreadExited class ExitThread(Exception): @@ -405,7 +405,7 @@ def run(self) -> None: except BaseException: self.output.put((True, traceback.format_exc())) finally: - self.output.put(WorkerExited()) + self.output.put(WorkerThreadExited()) def _wait_completed(self) -> None: popen = self._popen @@ -526,7 +526,7 @@ def _get_result(self) -> QueueOutput | None: # wait for a thread try: result = self.output.get(timeout=PROGRESS_UPDATE) - if isinstance(result, WorkerExited): + if isinstance(result, WorkerThreadExited): self.live_worker_count -= 1 continue return result