Skip to content

gh-109162: Refactor libregrtest WorkerJob #109171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Lib/test/libregrtest/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def __init__(self, **kwargs) -> None:
self.ignore_tests = None
self.pgo = False
self.pgo_extended = False
self.worker_json = None

super().__init__(**kwargs)

Expand Down Expand Up @@ -205,7 +206,7 @@ def _create_parser():
group.add_argument('--wait', action='store_true',
help='wait for user input, e.g., allow a debugger '
'to be attached')
group.add_argument('--worker-args', metavar='ARGS')
group.add_argument('--worker-json', metavar='ARGS')
group.add_argument('-S', '--start', metavar='START',
help='the name of the test at which to start.' +
more_details)
Expand Down
10 changes: 5 additions & 5 deletions Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def _rerun_failed_tests(self, need_rerun, runtests: RunTests):

# Get tests to re-run
tests = [result.test_name for result in need_rerun]
match_tests = self.get_rerun_match(need_rerun)
match_tests_dict = self.get_rerun_match(need_rerun)

# Clear previously failed tests
self.rerun_bad.extend(self.bad)
Expand All @@ -346,7 +346,7 @@ def _rerun_failed_tests(self, need_rerun, runtests: RunTests):
# Re-run failed tests
self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses")
runtests = runtests.copy(tests=tuple(tests),
match_tests=match_tests,
match_tests_dict=match_tests_dict,
rerun=True,
forever=False)
self.set_tests(runtests)
Expand Down Expand Up @@ -742,7 +742,7 @@ def set_temp_dir(self):
self.tmp_dir = os.path.abspath(self.tmp_dir)

def is_worker(self):
return (self.ns.worker_args is not None)
return (self.ns.worker_json is not None)

def create_temp_dir(self):
os.makedirs(self.tmp_dir, exist_ok=True)
Expand Down Expand Up @@ -870,8 +870,8 @@ def action_run_tests(self):

def _main(self):
if self.is_worker():
from test.libregrtest.runtest_mp import run_tests_worker
run_tests_worker(self.ns.worker_args)
from test.libregrtest.runtest_mp import worker_process
worker_process(self.ns.worker_json)
return

if self.want_wait:
Expand Down
6 changes: 3 additions & 3 deletions Lib/test/libregrtest/runtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def get_rerun_match_tests(self) -> FilterTuple | None:
@dataclasses.dataclass(slots=True, frozen=True)
class RunTests:
tests: TestTuple
match_tests: FilterDict | None = None
match_tests_dict: FilterDict | None = None
rerun: bool = False
forever: bool = False

Expand All @@ -218,8 +218,8 @@ def copy(self, **override):
return RunTests(**state)

def get_match_tests(self, test_name) -> FilterTuple | None:
if self.match_tests is not None:
return self.match_tests.get(test_name, None)
if self.match_tests_dict is not None:
return self.match_tests_dict.get(test_name, None)
else:
return None

Expand Down
62 changes: 34 additions & 28 deletions Lib/test/libregrtest/runtest_mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@

@dataclasses.dataclass(slots=True)
class WorkerJob:
test_name: str
runtests: RunTests
namespace: Namespace
rerun: bool = False
match_tests: FilterTuple | None = None


Expand All @@ -70,6 +69,7 @@ def default(self, o: Any) -> dict[str, Any]:
def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
if "__worker_job__" in d:
d.pop('__worker_job__')
d['runtests'] = RunTests(**d['runtests'])
return WorkerJob(**d)
if "__namespace__" in d:
d.pop('__namespace__')
Expand All @@ -78,17 +78,16 @@ def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]:
return d


def _parse_worker_args(worker_json: str) -> tuple[Namespace, str]:
return json.loads(worker_json,
object_hook=_decode_worker_job)
def _parse_worker_json(worker_json: str) -> tuple[Namespace, str]:
return json.loads(worker_json, object_hook=_decode_worker_job)


def run_test_in_subprocess(worker_job: WorkerJob,
output_file: TextIO,
tmp_dir: str | None = None) -> subprocess.Popen:
def create_worker_process(worker_job: WorkerJob,
output_file: TextIO,
tmp_dir: str | None = None) -> subprocess.Popen:
ns = worker_job.namespace
python = ns.python
worker_args = json.dumps(worker_job, cls=_EncodeWorkerJob)
worker_json = json.dumps(worker_job, cls=_EncodeWorkerJob)

if python is not None:
executable = python
Expand All @@ -97,7 +96,7 @@ def run_test_in_subprocess(worker_job: WorkerJob,
cmd = [*executable, *support.args_from_interpreter_flags(),
'-u', # Unbuffered stdout and stderr
'-m', 'test.regrtest',
'--worker-args', worker_args]
'--worker-json', worker_json]

env = dict(os.environ)
if tmp_dir is not None:
Expand All @@ -122,31 +121,32 @@ def run_test_in_subprocess(worker_job: WorkerJob,
return subprocess.Popen(cmd, **kw)


def run_tests_worker(worker_json: str) -> NoReturn:
worker_job = _parse_worker_args(worker_json)
def worker_process(worker_json: str) -> NoReturn:
worker_job = _parse_worker_json(worker_json)
runtests = worker_job.runtests
ns = worker_job.namespace
test_name = worker_job.test_name
rerun = worker_job.rerun
match_tests = worker_job.match_tests
test_name = runtests.tests[0]
match_tests: FilterTuple | None = worker_job.match_tests

setup_tests(ns)

if rerun:
if runtests.rerun:
if match_tests:
matching = "matching: " + ", ".join(match_tests)
print(f"Re-running {test_name} in verbose mode ({matching})", flush=True)
else:
print(f"Re-running {test_name} in verbose mode", flush=True)
ns.verbose = True

if match_tests is not None:
ns.match_tests = match_tests
if match_tests is not None:
ns.match_tests = match_tests

result = runtest(ns, test_name)
print() # Force a newline (just in case)

# Serialize TestResult as dict in JSON
print(json.dumps(result, cls=EncodeTestResult), flush=True)
json.dump(result, sys.stdout, cls=EncodeTestResult)
sys.stdout.flush()
sys.exit(0)


Expand All @@ -173,7 +173,8 @@ def stop(self):
self.tests_iter = None


class MultiprocessResult(NamedTuple):
@dataclasses.dataclass(slots=True, frozen=True)
class MultiprocessResult:
result: TestResult
# bpo-45410: stderr is written into stdout to keep messages order
worker_stdout: str | None = None
Expand All @@ -198,7 +199,6 @@ def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None:
self.ns = runner.ns
self.timeout = runner.worker_timeout
self.regrtest = runner.regrtest
self.rerun = runner.rerun
self.current_test_name = None
self.start_time = None
self._popen = None
Expand Down Expand Up @@ -264,9 +264,8 @@ def mp_result_error(

def _run_process(self, worker_job, output_file: TextIO,
tmp_dir: str | None = None) -> int:
self.current_test_name = worker_job.test_name
try:
popen = run_test_in_subprocess(worker_job, output_file, tmp_dir)
popen = create_worker_process(worker_job, output_file, tmp_dir)

self._killed = False
self._popen = popen
Expand Down Expand Up @@ -316,6 +315,8 @@ def _run_process(self, worker_job, output_file: TextIO,
self.current_test_name = None

def _runtest(self, test_name: str) -> MultiprocessResult:
self.current_test_name = test_name

if sys.platform == 'win32':
# gh-95027: When stdout is not a TTY, Python uses the ANSI code
# page for the sys.stdout encoding. If the main process runs in a
Expand All @@ -324,15 +325,20 @@ def _runtest(self, test_name: str) -> MultiprocessResult:
else:
encoding = sys.stdout.encoding

match_tests = self.runtests.get_match_tests(test_name)
tests = (test_name,)
if self.runtests.rerun:
match_tests = self.runtests.get_match_tests(test_name)
else:
match_tests = None
worker_runtests = self.runtests.copy(tests=tests)
worker_job = WorkerJob(
worker_runtests,
namespace=self.ns,
match_tests=match_tests)

# gh-94026: Write stdout+stderr to a tempfile as workaround for
# non-blocking pipes on Emscripten with NodeJS.
with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file:
worker_job = WorkerJob(test_name,
namespace=self.ns,
rerun=self.rerun,
match_tests=match_tests)
# gh-93353: Check for leaked temporary files in the parent process,
# since the deletion of temporary files can happen late during
# Python finalization: too late for libregrtest.
Expand Down
8 changes: 4 additions & 4 deletions Lib/test/test_regrtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ def test_wait(self):
ns = libregrtest._parse_args(['--wait'])
self.assertTrue(ns.wait)

def test_worker_args(self):
ns = libregrtest._parse_args(['--worker-args', '[[], {}]'])
self.assertEqual(ns.worker_args, '[[], {}]')
self.checkError(['--worker-args'], 'expected one argument')
def test_worker_json(self):
ns = libregrtest._parse_args(['--worker-json', '[[], {}]'])
self.assertEqual(ns.worker_json, '[[], {}]')
self.checkError(['--worker-json'], 'expected one argument')

def test_start(self):
for opt in '-S', '--start':
Expand Down