Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-109162: libregrtest: move code around #109253

Merged
merged 1 commit into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion Lib/test/libregrtest/findtests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import os
import sys
import unittest

from .utils import StrPath, TestName, TestList
from test import support

from .utils import (
StrPath, TestName, TestTuple, TestList, FilterTuple,
abs_module_name, count, printlist)


# If these test directories are encountered recurse into them and treat each
Expand Down Expand Up @@ -56,3 +62,37 @@ def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
else:
splitted.append(name)
return splitted


def _list_cases(suite):
for test in suite:
if isinstance(test, unittest.loader._FailedTest):
continue
if isinstance(test, unittest.TestSuite):
_list_cases(test)
elif isinstance(test, unittest.TestCase):
if support.match_test(test):
print(test.id())

def list_cases(tests: TestTuple, *,
match_tests: FilterTuple | None = None,
ignore_tests: FilterTuple | None = None,
test_dir: StrPath | None = None):
support.verbose = False
support.set_match_tests(match_tests, ignore_tests)

skipped = []
for test_name in tests:
module_name = abs_module_name(test_name, test_dir)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
_list_cases(suite)
except unittest.SkipTest:
skipped.append(test_name)

if skipped:
sys.stdout.flush()
stderr = sys.stderr
print(file=stderr)
print(count(len(skipped), "test"), "skipped:", file=stderr)
printlist(skipped, file=stderr)
101 changes: 9 additions & 92 deletions Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import locale
import os
import platform
import random
import re
import sys
import time
import unittest

from test import support
from test.support import os_helper

from .cmdline import _parse_args, Namespace
from .findtests import findtests, split_test_packages
from .findtests import findtests, split_test_packages, list_cases
from .logger import Logger
from .result import State
from .runtests import RunTests, HuntRefleak
Expand All @@ -22,8 +19,8 @@
from .utils import (
StrPath, StrJSON, TestName, TestList, TestTuple, FilterTuple,
strip_py_suffix, count, format_duration,
printlist, get_build_info, get_temp_dir, get_work_dir, exit_timeout,
abs_module_name)
printlist, get_temp_dir, get_work_dir, exit_timeout,
display_header, cleanup_temp_dir)


class Regrtest:
Expand Down Expand Up @@ -214,36 +211,6 @@ def list_tests(tests: TestTuple):
for name in tests:
print(name)

def _list_cases(self, suite):
for test in suite:
if isinstance(test, unittest.loader._FailedTest):
continue
if isinstance(test, unittest.TestSuite):
self._list_cases(test)
elif isinstance(test, unittest.TestCase):
if support.match_test(test):
print(test.id())

def list_cases(self, tests: TestTuple):
support.verbose = False
support.set_match_tests(self.match_tests, self.ignore_tests)

skipped = []
for test_name in tests:
module_name = abs_module_name(test_name, self.test_dir)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
self._list_cases(suite)
except unittest.SkipTest:
skipped.append(test_name)

if skipped:
sys.stdout.flush()
stderr = sys.stderr
print(file=stderr)
print(count(len(skipped), "test"), "skipped:", file=stderr)
printlist(skipped, file=stderr)

def _rerun_failed_tests(self, runtests: RunTests):
# Configure the runner to re-run tests
if self.num_workers == 0:
Expand Down Expand Up @@ -363,45 +330,6 @@ def run_tests_sequentially(self, runtests):

return tracer

@staticmethod
def display_header():
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
print("==", platform.platform(aliased=True),
"%s-endian" % sys.byteorder)
print("== Python build:", ' '.join(get_build_info()))
print("== cwd:", os.getcwd())
cpu_count = os.cpu_count()
if cpu_count:
print("== CPU count:", cpu_count)
print("== encodings: locale=%s, FS=%s"
% (locale.getencoding(), sys.getfilesystemencoding()))

# This makes it easier to remember what to set in your local
# environment when trying to reproduce a sanitizer failure.
asan = support.check_sanitizer(address=True)
msan = support.check_sanitizer(memory=True)
ubsan = support.check_sanitizer(ub=True)
sanitizers = []
if asan:
sanitizers.append("address")
if msan:
sanitizers.append("memory")
if ubsan:
sanitizers.append("undefined behavior")
if not sanitizers:
return

print(f"== sanitizers: {', '.join(sanitizers)}")
for sanitizer, env_var in (
(asan, "ASAN_OPTIONS"),
(msan, "MSAN_OPTIONS"),
(ubsan, "UBSAN_OPTIONS"),
):
options= os.environ.get(env_var)
if sanitizer and options is not None:
print(f"== {env_var}={options!r}")

def get_state(self):
state = self.results.get_state(self.fail_env_changed)
if self.first_state:
Expand Down Expand Up @@ -445,20 +373,6 @@ def display_summary(self):
state = self.get_state()
print(f"Result: {state}")

@staticmethod
def cleanup_temp_dir(tmp_dir: StrPath):
import glob

path = os.path.join(glob.escape(tmp_dir), 'test_python_*')
print("Cleanup %s directory" % tmp_dir)
for name in glob.glob(path):
if os.path.isdir(name):
print("Remove directory: %s" % name)
os_helper.rmtree(name)
else:
print("Remove file: %s" % name)
os_helper.unlink(name)

def create_run_tests(self, tests: TestTuple):
return RunTests(
tests,
Expand Down Expand Up @@ -496,7 +410,7 @@ def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
if (self.want_header
or not(self.pgo or self.quiet or self.single_test_run
or tests or self.cmdline_args)):
self.display_header()
display_header()

if self.randomize:
print("Using random seed", self.random_seed)
Expand Down Expand Up @@ -554,7 +468,7 @@ def main(self, tests: TestList | None = None):
self.tmp_dir = get_temp_dir(self.tmp_dir)

if self.want_cleanup:
self.cleanup_temp_dir(self.tmp_dir)
cleanup_temp_dir(self.tmp_dir)
sys.exit(0)

if self.want_wait:
Expand All @@ -567,7 +481,10 @@ def main(self, tests: TestList | None = None):
if self.want_list_tests:
self.list_tests(selected)
elif self.want_list_cases:
self.list_cases(selected)
list_cases(selected,
match_tests=self.match_tests,
ignore_tests=self.ignore_tests,
test_dir=self.test_dir)
else:
exitcode = self.run_tests(selected, tests)

Expand Down
55 changes: 55 additions & 0 deletions Lib/test/libregrtest/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import atexit
import contextlib
import faulthandler
import locale
import math
import os.path
import platform
import random
import sys
import sysconfig
Expand Down Expand Up @@ -524,3 +526,56 @@ def adjust_rlimit_nofile():
except (ValueError, OSError) as err:
print_warning(f"Unable to raise RLIMIT_NOFILE from {fd_limit} to "
f"{new_fd_limit}: {err}.")


def display_header():
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
print("==", platform.platform(aliased=True),
"%s-endian" % sys.byteorder)
print("== Python build:", ' '.join(get_build_info()))
print("== cwd:", os.getcwd())
cpu_count = os.cpu_count()
if cpu_count:
print("== CPU count:", cpu_count)
print("== encodings: locale=%s, FS=%s"
% (locale.getencoding(), sys.getfilesystemencoding()))

# This makes it easier to remember what to set in your local
# environment when trying to reproduce a sanitizer failure.
asan = support.check_sanitizer(address=True)
msan = support.check_sanitizer(memory=True)
ubsan = support.check_sanitizer(ub=True)
sanitizers = []
if asan:
sanitizers.append("address")
if msan:
sanitizers.append("memory")
if ubsan:
sanitizers.append("undefined behavior")
if not sanitizers:
return

print(f"== sanitizers: {', '.join(sanitizers)}")
for sanitizer, env_var in (
(asan, "ASAN_OPTIONS"),
(msan, "MSAN_OPTIONS"),
(ubsan, "UBSAN_OPTIONS"),
):
options= os.environ.get(env_var)
if sanitizer and options is not None:
print(f"== {env_var}={options!r}")


def cleanup_temp_dir(tmp_dir: StrPath):
import glob

path = os.path.join(glob.escape(tmp_dir), 'test_python_*')
print("Cleanup %s directory" % tmp_dir)
for name in glob.glob(path):
if os.path.isdir(name):
print("Remove directory: %s" % name)
os_helper.rmtree(name)
else:
print("Remove file: %s" % name)
os_helper.unlink(name)