Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-109162: Refactor Regrtest.action_run_tests() #109170

Merged
merged 1 commit into from
Sep 9, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 49 additions & 49 deletions Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ def __init__(self, ns: Namespace):
# used by --slow
self.test_times = []

# used by --coverage, trace.Trace instance
self.tracer = None

# used to display the progress bar "[ 3/100]"
self.start_time = time.perf_counter()
self.test_count_text = ''
Expand Down Expand Up @@ -443,28 +440,18 @@ def display_result(self):
print(count(len(self.run_no_tests), "test"), "run no tests:")
printlist(self.run_no_tests)

def run_test(self, test_index, test_name, previous_test, save_modules):
text = test_name
if previous_test:
text = '%s -- %s' % (text, previous_test)
self.display_progress(test_index, text)

if self.tracer:
def run_test(self, test_name: str, runtests: RunTests, tracer):
if tracer is not None:
# If we're tracing code coverage, then we don't exit with status
# if on a false return value from main.
cmd = ('result = runtest(self.ns, test_name); '
'self.accumulate_result(result)')
cmd = ('result = runtest(self.ns, test_name)')
ns = dict(locals())
self.tracer.runctx(cmd, globals=globals(), locals=ns)
tracer.runctx(cmd, globals=globals(), locals=ns)
result = ns['result']
else:
result = runtest(self.ns, test_name)
self.accumulate_result(result)

# Unload the newly imported modules (best effort finalization)
for module in sys.modules.keys():
if module not in save_modules and module.startswith("test."):
support.unload(module)
self.accumulate_result(result)

return result

Expand All @@ -477,7 +464,9 @@ def run_tests_sequentially(self, runtests):

if coverage:
import trace
self.tracer = trace.Trace(trace=False, count=True)
tracer = trace.Trace(trace=False, count=True)
else:
tracer = None

save_modules = sys.modules.keys()

Expand All @@ -491,8 +480,17 @@ def run_tests_sequentially(self, runtests):
for test_index, test_name in enumerate(tests_iter, 1):
start_time = time.perf_counter()

result = self.run_test(test_index, test_name,
previous_test, save_modules)
text = test_name
if previous_test:
text = '%s -- %s' % (text, previous_test)
self.display_progress(test_index, text)

result = self.run_test(test_name, runtests, tracer)

# Unload the newly imported modules (best effort finalization)
for module in sys.modules.keys():
if module not in save_modules and module.startswith("test."):
support.unload(module)

if result.must_stop(fail_fast, fail_env_changed):
break
Expand All @@ -508,6 +506,8 @@ def run_tests_sequentially(self, runtests):
if previous_test:
print(previous_test)

return tracer

def display_header(self):
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
Expand Down Expand Up @@ -604,45 +604,28 @@ def set_tests(self, runtests: RunTests):
self.test_count_text = '/{}'.format(len(self.tests))
self.test_count_width = len(self.test_count_text) - 1

def run_tests(self):
# For a partial run, we do not need to clutter the output.
if (self.want_header
or not(self.ns.pgo or self.ns.quiet or self.ns.single
or self.tests or self.ns.args)):
self.display_header()

if self.ns.huntrleaks:
warmup, repetitions, _ = self.ns.huntrleaks
if warmup < 3:
msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
"3 warmup repetitions can give false positives!")
print(msg, file=sys.stdout, flush=True)

if self.randomize:
print("Using random seed", self.random_seed)

tests = self.selected
runtests = RunTests(tuple(tests), forever=self.forever)
def run_tests(self, runtests: RunTests):
self.first_runtests = runtests
self.set_tests(runtests)
if self.ns.use_mp:
self._run_tests_mp(runtests)
tracer = None
else:
self.run_tests_sequentially(runtests)
return runtests
tracer = self.run_tests_sequentially(runtests)
return tracer

def finalize(self):
def finalize_tests(self, tracer):
if self.next_single_filename:
if self.next_single_test:
with open(self.next_single_filename, 'w') as fp:
fp.write(self.next_single_test + '\n')
else:
os.unlink(self.next_single_filename)

if self.tracer:
r = self.tracer.results()
r.write_results(show_missing=True, summary=True,
coverdir=self.ns.coverdir)
if tracer is not None:
results = tracer.results()
results.write_results(show_missing=True, summary=True,
coverdir=self.ns.coverdir)

if self.ns.runleaks:
os.system("leaks %d" % os.getpid())
Expand Down Expand Up @@ -858,15 +841,32 @@ def get_exitcode(self):
return exitcode

def action_run_tests(self):
runtests = self.run_tests()
if self.ns.huntrleaks:
warmup, repetitions, _ = self.ns.huntrleaks
if warmup < 3:
msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
"3 warmup repetitions can give false positives!")
print(msg, file=sys.stdout, flush=True)

# For a partial run, we do not need to clutter the output.
if (self.want_header
or not(self.ns.pgo or self.ns.quiet or self.ns.single
or self.tests or self.ns.args)):
self.display_header()

if self.randomize:
print("Using random seed", self.random_seed)

runtests = RunTests(tuple(self.selected), forever=self.forever)
tracer = self.run_tests(runtests)
self.display_result()

need_rerun = self.need_rerun
if self.ns.rerun and need_rerun:
self.rerun_failed_tests(need_rerun, runtests)

self.display_summary()
self.finalize()
self.finalize_tests(tracer)

def _main(self):
if self.is_worker():
Expand Down