Skip to content

Commit 0553fdf

Browse files
authored
gh-109162: Refactor libregrtest.runtest_mp (#109205)
* Add attributes to Regrtest and RunTests: * fail_env_changed * num_workers * Rename MultiprocessTestRunner to RunWorkers. Add num_workers parameters to RunWorkers constructor. Remove RunWorkers.ns attribute. * Rename TestWorkerProcess to WorkerThread. * get_running() now returns a string like: "running (...): ...". * Regrtest.action_run_tests() now selects the number of worker processes, instead of the command line parser.
1 parent 0c0f254 commit 0553fdf

File tree

4 files changed

+57
-56
lines changed

4 files changed

+57
-56
lines changed

Lib/test/libregrtest/cmdline.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import argparse
2-
import os
2+
import os.path
33
import shlex
44
import sys
55
from test.support import os_helper
@@ -410,10 +410,6 @@ def _parse_args(args, **kwargs):
410410
if ns.timeout is not None:
411411
if ns.timeout <= 0:
412412
ns.timeout = None
413-
if ns.use_mp is not None:
414-
if ns.use_mp <= 0:
415-
# Use all cores + extras for tests that like to sleep
416-
ns.use_mp = 2 + (os.cpu_count() or 1)
417413
if ns.use:
418414
for a in ns.use:
419415
for r in a:

Lib/test/libregrtest/main.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,18 @@ def __init__(self, ns: Namespace):
8383
self.fromfile: str | None = ns.fromfile
8484
self.starting_test: str | None = ns.start
8585

86+
# Run tests
87+
if ns.use_mp is None:
88+
num_workers = 0 # run sequentially
89+
elif ns.use_mp <= 0:
90+
num_workers = -1 # use the number of CPUs
91+
else:
92+
num_workers = ns.use_mp
93+
self.num_workers: int = num_workers
94+
8695
# Options to run tests
8796
self.fail_fast: bool = ns.failfast
97+
self.fail_env_changed: bool = ns.fail_env_changed
8898
self.forever: bool = ns.forever
8999
self.randomize: bool = ns.randomize
90100
self.random_seed: int | None = ns.random_seed
@@ -150,7 +160,6 @@ def get_executed(self):
150160
| set(self.run_no_tests))
151161

152162
def accumulate_result(self, result, rerun=False):
153-
fail_env_changed = self.ns.fail_env_changed
154163
test_name = result.test_name
155164

156165
match result.state:
@@ -167,7 +176,7 @@ def accumulate_result(self, result, rerun=False):
167176
case State.DID_NOT_RUN:
168177
self.run_no_tests.append(test_name)
169178
case _:
170-
if result.is_failed(fail_env_changed):
179+
if result.is_failed(self.fail_env_changed):
171180
self.bad.append(test_name)
172181
self.need_rerun.append(result)
173182
else:
@@ -339,9 +348,8 @@ def get_rerun_match(self, rerun_list) -> FilterDict:
339348

340349
def _rerun_failed_tests(self, need_rerun, runtests: RunTests):
341350
# Configure the runner to re-run tests
342-
ns = self.ns
343-
if ns.use_mp is None:
344-
ns.use_mp = 1
351+
if self.num_workers == 0:
352+
self.num_workers = 1
345353

346354
# Get tests to re-run
347355
tests = [result.test_name for result in need_rerun]
@@ -363,7 +371,7 @@ def _rerun_failed_tests(self, need_rerun, runtests: RunTests):
363371
match_tests_dict=match_tests_dict,
364372
output_on_failure=False)
365373
self.set_tests(runtests)
366-
self._run_tests_mp(runtests)
374+
self._run_tests_mp(runtests, self.num_workers)
367375
return runtests
368376

369377
def rerun_failed_tests(self, need_rerun, runtests: RunTests):
@@ -471,7 +479,6 @@ def run_test(self, test_name: str, runtests: RunTests, tracer):
471479
def run_tests_sequentially(self, runtests):
472480
ns = self.ns
473481
coverage = ns.trace
474-
fail_env_changed = ns.fail_env_changed
475482

476483
if coverage:
477484
import trace
@@ -503,7 +510,7 @@ def run_tests_sequentially(self, runtests):
503510
if module not in save_modules and module.startswith("test."):
504511
support.unload(module)
505512

506-
if result.must_stop(self.fail_fast, fail_env_changed):
513+
if result.must_stop(self.fail_fast, self.fail_env_changed):
507514
break
508515

509516
previous_test = str(result)
@@ -564,12 +571,10 @@ def no_tests_run(self):
564571
self.environment_changed))
565572

566573
def get_tests_state(self):
567-
fail_env_changed = self.ns.fail_env_changed
568-
569574
result = []
570575
if self.bad:
571576
result.append("FAILURE")
572-
elif fail_env_changed and self.environment_changed:
577+
elif self.fail_env_changed and self.environment_changed:
573578
result.append("ENV CHANGED")
574579
elif self.no_tests_run():
575580
result.append("NO TESTS RAN")
@@ -585,8 +590,9 @@ def get_tests_state(self):
585590
result = '%s then %s' % (self.first_state, result)
586591
return result
587592

588-
def _run_tests_mp(self, runtests: RunTests) -> None:
589-
from test.libregrtest.runtest_mp import run_tests_multiprocess
593+
def _run_tests_mp(self, runtests: RunTests, num_workers: int) -> None:
594+
from test.libregrtest.runtest_mp import RunWorkers
595+
590596
# If we're on windows and this is the parent runner (not a worker),
591597
# track the load average.
592598
if sys.platform == 'win32':
@@ -600,7 +606,7 @@ def _run_tests_mp(self, runtests: RunTests) -> None:
600606
print(f'Failed to create WindowsLoadTracker: {error}')
601607

602608
try:
603-
run_tests_multiprocess(self, runtests)
609+
RunWorkers(self, runtests, num_workers).run()
604610
finally:
605611
if self.win_load_tracker is not None:
606612
self.win_load_tracker.close()
@@ -618,8 +624,8 @@ def set_tests(self, runtests: RunTests):
618624
def run_tests(self, runtests: RunTests):
619625
self.first_runtests = runtests
620626
self.set_tests(runtests)
621-
if self.ns.use_mp:
622-
self._run_tests_mp(runtests)
627+
if self.num_workers:
628+
self._run_tests_mp(runtests, self.num_workers)
623629
tracer = None
624630
else:
625631
tracer = self.run_tests_sequentially(runtests)
@@ -843,7 +849,7 @@ def get_exitcode(self):
843849
exitcode = EXITCODE_BAD_TEST
844850
elif self.interrupted:
845851
exitcode = EXITCODE_INTERRUPTED
846-
elif self.ns.fail_env_changed and self.environment_changed:
852+
elif self.fail_env_changed and self.environment_changed:
847853
exitcode = EXITCODE_ENV_CHANGED
848854
elif self.no_tests_run():
849855
exitcode = EXITCODE_NO_TESTS_RAN
@@ -866,6 +872,10 @@ def action_run_tests(self):
866872
if self.randomize:
867873
print("Using random seed", self.random_seed)
868874

875+
if self.num_workers < 0:
876+
# Use all cores + extras for tests that like to sleep
877+
self.num_workers = 2 + (os.cpu_count() or 1)
878+
869879
runtests = RunTests(
870880
tuple(self.selected),
871881
fail_fast=self.fail_fast,

Lib/test/libregrtest/runtest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ def get_rerun_match_tests(self) -> FilterTuple | None:
217217
class RunTests:
218218
tests: TestTuple
219219
fail_fast: bool = False
220+
fail_env_changed: bool = False
220221
match_tests: FilterTuple | None = None
221222
ignore_tests: FilterTuple | None = None
222223
match_tests_dict: FilterDict | None = None

Lib/test/libregrtest/runtest_mp.py

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
from test.support import os_helper
1717
from test.support import TestStats
1818

19-
from test.libregrtest.cmdline import Namespace
2019
from test.libregrtest.main import Regrtest
2120
from test.libregrtest.runtest import (
2221
run_single_test, TestResult, State, PROGRESS_MIN_TIME,
@@ -150,14 +149,13 @@ class ExitThread(Exception):
150149
pass
151150

152151

153-
class TestWorkerProcess(threading.Thread):
154-
def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
152+
class WorkerThread(threading.Thread):
153+
def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
155154
super().__init__()
156155
self.worker_id = worker_id
157156
self.runtests = runner.runtests
158157
self.pending = runner.pending
159158
self.output = runner.output
160-
self.ns = runner.ns
161159
self.timeout = runner.worker_timeout
162160
self.regrtest = runner.regrtest
163161
self.current_test_name = None
@@ -167,7 +165,7 @@ def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
167165
self._stopped = False
168166

169167
def __repr__(self) -> str:
170-
info = [f'TestWorkerProcess #{self.worker_id}']
168+
info = [f'WorkerThread #{self.worker_id}']
171169
if self.is_alive():
172170
info.append("running")
173171
else:
@@ -203,7 +201,7 @@ def _kill(self) -> None:
203201
else:
204202
popen.kill()
205203
except ProcessLookupError:
206-
# popen.kill(): the process completed, the TestWorkerProcess thread
204+
# popen.kill(): the process completed, the WorkerThread thread
207205
# read its exit status, but Popen.send_signal() read the returncode
208206
# just before Popen.wait() set returncode.
209207
pass
@@ -362,7 +360,7 @@ def _runtest(self, test_name: str) -> MultiprocessResult:
362360

363361
def run(self) -> None:
364362
fail_fast = self.runtests.fail_fast
365-
fail_env_changed = self.ns.fail_env_changed
363+
fail_env_changed = self.runtests.fail_env_changed
366364
while not self._stopped:
367365
try:
368366
try:
@@ -394,10 +392,10 @@ def _wait_completed(self) -> None:
394392
f"{exc!r}")
395393

396394
def wait_stopped(self, start_time: float) -> None:
397-
# bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop()
395+
# bpo-38207: RunWorkers.stop_workers() called self.stop()
398396
# which killed the process. Sometimes, killing the process from the
399397
# main thread does not interrupt popen.communicate() in
400-
# TestWorkerProcess thread. This loop with a timeout is a workaround
398+
# WorkerThread thread. This loop with a timeout is a workaround
401399
# for that.
402400
#
403401
# Moreover, if this method fails to join the thread, it is likely
@@ -417,7 +415,7 @@ def wait_stopped(self, start_time: float) -> None:
417415
break
418416

419417

420-
def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
418+
def get_running(workers: list[WorkerThread]) -> list[str]:
421419
running = []
422420
for worker in workers:
423421
current_test_name = worker.current_test_name
@@ -427,18 +425,17 @@ def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]:
427425
if dt >= PROGRESS_MIN_TIME:
428426
text = '%s (%s)' % (current_test_name, format_duration(dt))
429427
running.append(text)
430-
return running
428+
if not running:
429+
return None
430+
return f"running ({len(running)}): {', '.join(running)}"
431431

432432

433-
class MultiprocessTestRunner:
434-
def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
435-
ns = regrtest.ns
436-
433+
class RunWorkers:
434+
def __init__(self, regrtest: Regrtest, runtests: RunTests, num_workers: int) -> None:
437435
self.regrtest = regrtest
436+
self.log = regrtest.log
437+
self.num_workers = num_workers
438438
self.runtests = runtests
439-
self.rerun = runtests.rerun
440-
self.log = self.regrtest.log
441-
self.ns = ns
442439
self.output: queue.Queue[QueueOutput] = queue.Queue()
443440
tests_iter = runtests.iter_tests()
444441
self.pending = MultiprocessIterator(tests_iter)
@@ -453,9 +450,8 @@ def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None:
453450
self.workers = None
454451

455452
def start_workers(self) -> None:
456-
use_mp = self.ns.use_mp
457-
self.workers = [TestWorkerProcess(index, self)
458-
for index in range(1, use_mp + 1)]
453+
self.workers = [WorkerThread(index, self)
454+
for index in range(1, self.num_workers + 1)]
459455
msg = f"Run tests in parallel using {len(self.workers)} child processes"
460456
if self.timeout:
461457
msg += (" (timeout: %s, worker timeout: %s)"
@@ -489,10 +485,11 @@ def _get_result(self) -> QueueOutput | None:
489485
except queue.Empty:
490486
pass
491487

492-
# display progress
493-
running = get_running(self.workers)
494-
if running and not pgo:
495-
self.log('running: %s' % ', '.join(running))
488+
if not pgo:
489+
# display progress
490+
running = get_running(self.workers)
491+
if running:
492+
self.log(running)
496493

497494
# all worker threads are done: consume pending results
498495
try:
@@ -510,9 +507,10 @@ def display_result(self, mp_result: MultiprocessResult) -> None:
510507
text += ' (%s)' % mp_result.err_msg
511508
elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
512509
text += ' (%s)' % format_duration(result.duration)
513-
running = get_running(self.workers)
514-
if running and not pgo:
515-
text += ' -- running: %s' % ', '.join(running)
510+
if not pgo:
511+
running = get_running(self.workers)
512+
if running:
513+
text += f' -- {running}'
516514
self.regrtest.display_progress(self.test_index, text)
517515

518516
def _process_result(self, item: QueueOutput) -> bool:
@@ -537,9 +535,9 @@ def _process_result(self, item: QueueOutput) -> bool:
537535

538536
return result
539537

540-
def run_tests(self) -> None:
538+
def run(self) -> None:
541539
fail_fast = self.runtests.fail_fast
542-
fail_env_changed = self.ns.fail_env_changed
540+
fail_env_changed = self.runtests.fail_env_changed
543541

544542
self.start_workers()
545543

@@ -566,10 +564,6 @@ def run_tests(self) -> None:
566564
self.stop_workers()
567565

568566

569-
def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None:
570-
MultiprocessTestRunner(regrtest, runtests).run_tests()
571-
572-
573567
class EncodeTestResult(json.JSONEncoder):
574568
"""Encode a TestResult (sub)class object into a JSON dict."""
575569

0 commit comments

Comments
 (0)