Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-36670: Multiple regrtest bugfixes #16511

Merged
merged 1 commit into from
Oct 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,6 @@ def run_tests(self):
self.run_tests_sequential()

def finalize(self):
if self.win_load_tracker is not None:
self.win_load_tracker.close()
self.win_load_tracker = None

if self.next_single_filename:
if self.next_single_test:
with open(self.next_single_filename, 'w') as fp:
Expand Down Expand Up @@ -680,11 +676,16 @@ def _main(self, tests, kwargs):
# typeperf.exe for x64, x86 or ARM
print(f'Failed to create WindowsLoadTracker: {error}')

self.run_tests()
self.display_result()

if self.ns.verbose2 and self.bad:
self.rerun_failed_tests()
try:
self.run_tests()
self.display_result()

if self.ns.verbose2 and self.bad:
self.rerun_failed_tests()
finally:
if self.win_load_tracker is not None:
self.win_load_tracker.close()
self.win_load_tracker = None

self.finalize()

Expand Down
222 changes: 113 additions & 109 deletions Lib/test/libregrtest/runtest_mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
format_test_result, TestResult, is_failed, TIMEOUT)
from test.libregrtest.setup import setup_tests
from test.libregrtest.utils import format_duration
from test.libregrtest.utils import format_duration, print_warning


# Display the running tests if nothing happened last N seconds
Expand Down Expand Up @@ -103,9 +103,10 @@ class ExitThread(Exception):
pass


class MultiprocessThread(threading.Thread):
def __init__(self, pending, output, ns, timeout):
class TestWorkerProcess(threading.Thread):
def __init__(self, worker_id, pending, output, ns, timeout):
super().__init__()
self.worker_id = worker_id
self.pending = pending
self.output = output
self.ns = ns
Expand All @@ -114,12 +115,16 @@ def __init__(self, pending, output, ns, timeout):
self.start_time = None
self._popen = None
self._killed = False
self._stopped = False

def __repr__(self):
info = ['MultiprocessThread']
test = self.current_test_name
info = [f'TestWorkerProcess #{self.worker_id}']
if self.is_alive():
info.append('alive')
dt = time.monotonic() - self.start_time
info.append("running for %s" % format_duration(dt))
else:
info.append('stopped')
test = self.current_test_name
if test:
info.append(f'test={test}')
popen = self._popen
Expand All @@ -128,53 +133,24 @@ def __repr__(self):
return '<%s>' % ' '.join(info)

def _kill(self):
dt = time.monotonic() - self.start_time
if self._killed:
return
self._killed = True

popen = self._popen
pid = popen.pid
print("Kill worker process %s running for %.1f sec" % (pid, dt),
file=sys.stderr, flush=True)
if popen is None:
return

print(f"Kill {self}", file=sys.stderr, flush=True)
try:
popen.kill()
return True
except OSError as exc:
print("WARNING: Failed to kill worker process %s: %r" % (pid, exc),
file=sys.stderr, flush=True)
return False

def _close_wait(self):
popen = self._popen

# stdout and stderr must be closed to ensure that communicate()
# does not hang
popen.stdout.close()
popen.stderr.close()

try:
popen.wait(JOIN_TIMEOUT)
except (subprocess.TimeoutExpired, OSError) as exc:
print("WARNING: Failed to wait for worker process %s "
"completion (timeout=%.1f sec): %r"
% (popen.pid, JOIN_TIMEOUT, exc),
file=sys.stderr, flush=True)

def kill(self):
"""
Kill the current process (if any).

This method can be called by the thread running the process,
or by another thread.
"""
self._killed = True

if self._popen is None:
return

if not self._kill():
return
print_warning(f"Failed to kill {self}: {exc!r}")

self._close_wait()
def stop(self):
# Method called from a different thread to stop this thread
self._stopped = True
self._kill()

def mp_result_error(self, test_name, error_type, stdout='', stderr='',
err_msg=None):
Expand All @@ -190,59 +166,69 @@ def _timedout(self, test_name):
try:
stdout, stderr = popen.communicate(timeout=JOIN_TIMEOUT)
except (subprocess.TimeoutExpired, OSError) as exc:
print("WARNING: Failed to read worker process %s output "
"(timeout=%.1f sec): %r"
% (popen.pid, JOIN_TIMEOUT, exc),
file=sys.stderr, flush=True)

self._close_wait()
print_warning(f"Failed to read {self} output "
f"(timeout={format_duration(JOIN_TIMEOUT)}): "
f"{exc!r}")

return self.mp_result_error(test_name, TIMEOUT, stdout, stderr)

def _runtest(self, test_name):
try:
self.start_time = time.monotonic()
self.current_test_name = test_name
def _run_process(self, test_name):
self.start_time = time.monotonic()

self.current_test_name = test_name
try:
self._killed = False
self._popen = run_test_in_subprocess(test_name, self.ns)
popen = self._popen
except:
self.current_test_name = None
raise

try:
if self._stopped:
# If kill() has been called before self._popen is set,
# self._popen is still running. Call again kill()
# to ensure that the process is killed.
self._kill()
raise ExitThread

try:
try:
if self._killed:
# If kill() has been called before self._popen is set,
# self._popen is still running. Call again kill()
# to ensure that the process is killed.
self.kill()
raise ExitThread

try:
stdout, stderr = popen.communicate(timeout=self.timeout)
except subprocess.TimeoutExpired:
if self._killed:
# kill() has been called: communicate() fails
# on reading closed stdout/stderr
raise ExitThread

return self._timedout(test_name)
except OSError:
if self._killed:
# kill() has been called: communicate() fails
# on reading closed stdout/stderr
raise ExitThread
raise
except:
self.kill()
raise
finally:
self._close_wait()
stdout, stderr = popen.communicate(timeout=self.timeout)
except subprocess.TimeoutExpired:
if self._stopped:
# kill() has been called: communicate() fails
# on reading closed stdout/stderr
raise ExitThread

return self._timedout(test_name)
except OSError:
if self._stopped:
# kill() has been called: communicate() fails
# on reading closed stdout/stderr
raise ExitThread
raise

retcode = popen.returncode
stdout = stdout.strip()
stderr = stderr.rstrip()

return (retcode, stdout, stderr)
except:
self._kill()
raise
finally:
self.current_test_name = None
self._wait_completed()
self._popen = None
self.current_test_name = None

def _runtest(self, test_name):
result = self._run_process(test_name)

stdout = stdout.strip()
stderr = stderr.rstrip()
if isinstance(result, MultiprocessResult):
# _timedout() case
return result

retcode, stdout, stderr = result

err_msg = None
if retcode != 0:
Expand All @@ -266,7 +252,7 @@ def _runtest(self, test_name):
return MultiprocessResult(result, stdout, stderr, err_msg)

def run(self):
while not self._killed:
while not self._stopped:
try:
try:
test_name = next(self.pending)
Expand All @@ -284,6 +270,33 @@ def run(self):
self.output.put((True, traceback.format_exc()))
break

def _wait_completed(self):
popen = self._popen

# stdout and stderr must be closed to ensure that communicate()
# does not hang
popen.stdout.close()
popen.stderr.close()

try:
popen.wait(JOIN_TIMEOUT)
except (subprocess.TimeoutExpired, OSError) as exc:
print_warning(f"Failed to wait for {self} completion "
f"(timeout={format_duration(JOIN_TIMEOUT)}): "
f"{exc!r}")

def wait_stopped(self, start_time):
while True:
# Write a message every second
self.join(1.0)
if not self.is_alive():
break
dt = time.monotonic() - start_time
print(f"Waiting for {self} thread for {format_duration(dt)}", flush=True)
if dt > JOIN_TIMEOUT:
print_warning(f"Failed to join {self} in {format_duration(dt)}")
break


def get_running(workers):
running = []
Expand All @@ -298,7 +311,7 @@ def get_running(workers):
return running


class MultiprocessRunner:
class MultiprocessTestRunner:
def __init__(self, regrtest):
self.regrtest = regrtest
self.ns = regrtest.ns
Expand All @@ -311,30 +324,20 @@ def __init__(self, regrtest):
self.workers = None

def start_workers(self):
self.workers = [MultiprocessThread(self.pending, self.output,
self.ns, self.worker_timeout)
for _ in range(self.ns.use_mp)]
self.workers = [TestWorkerProcess(index, self.pending, self.output,
self.ns, self.worker_timeout)
for index in range(1, self.ns.use_mp + 1)]
print("Run tests in parallel using %s child processes"
% len(self.workers))
for worker in self.workers:
worker.start()

def wait_workers(self):
def stop_workers(self):
start_time = time.monotonic()
for worker in self.workers:
worker.kill()
worker.stop()
for worker in self.workers:
while True:
worker.join(1.0)
if not worker.is_alive():
break
dt = time.monotonic() - start_time
print("Wait for regrtest worker %r for %.1f sec" % (worker, dt),
flush=True)
if dt > JOIN_TIMEOUT:
print("Warning -- failed to join a regrtest worker %s"
% worker, flush=True)
break
worker.wait_stopped(start_time)

def _get_result(self):
if not any(worker.is_alive() for worker in self.workers):
Expand Down Expand Up @@ -418,10 +421,11 @@ def run_tests(self):
if self.ns.timeout is not None:
faulthandler.cancel_dump_traceback_later()

# a test failed (and --failfast is set) or all tests completed
self.pending.stop()
self.wait_workers()
# Always ensure that all worker processes are no longer
# worker when we exit this function
self.pending.stop()
self.stop_workers()


def run_tests_multiprocess(regrtest):
MultiprocessRunner(regrtest).run_tests()
MultiprocessTestRunner(regrtest).run_tests()
Loading