Skip to content

Commit

Permalink
Refactor result output and duration tracking (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyreese authored Oct 14, 2024
1 parent 3549416 commit 8e7bb2c
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 42 deletions.
2 changes: 1 addition & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ venv: .venv
echo 'run `source .venv/bin/activate` to activate virtualenv'

test:
python -m unittest_ft -s $(PKG).tests
python -m unittest_ft -rs $(PKG).tests
python -m mypy -p $(PKG)

lint:
Expand Down
16 changes: 9 additions & 7 deletions unittest_ft/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@


@click.command()
@click.option("--debug/--quiet", default=None, help="Increase or decrease logging")
@click.option("--debug", default=None, help="Enable debug logging")
@click.option("--verbose", "-v", is_flag=True, help="Verbose output")
@click.option("--quiet", "-q", is_flag=True, help="Quiet output")
@click.option("--stress-test", "-s", is_flag=True, help="Run every test 10 times")
@click.option("--randomize", "-r", is_flag=True, help="Randomize test order")
@click.option(
Expand All @@ -26,24 +28,24 @@
)
@click.argument("module")
def main(
debug: bool | None,
debug: bool,
verbose: bool,
quiet: bool,
module: str,
randomize: bool,
stress_test: bool,
threads: int,
) -> NoReturn:
logging.basicConfig(
level=(
logging.DEBUG
if debug
else (logging.WARNING if debug is None else logging.ERROR)
),
level=(logging.DEBUG if debug else logging.WARNING),
stream=sys.stderr,
)
verbosity = 2 if verbose else 0 if quiet else 1
result = run(
module,
randomize=randomize,
stress_test=stress_test,
threads=threads,
verbosity=verbosity,
)
sys.exit(0 if result.wasSuccessful() else 1)
104 changes: 70 additions & 34 deletions unittest_ft/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
import logging
import os
import random
import sys
import time
from concurrent.futures import as_completed, ThreadPoolExecutor
from typing import Generator
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Any, Generator, TextIO
from unittest import TestCase, TestLoader, TestResult, TestSuite

from rich import print
# from rich import print
from typing_extensions import Self

LOG = logging.getLogger(__name__)
Expand All @@ -21,17 +22,58 @@


class FTTestResult(TestResult):
def __init__(
self,
stream: TextIO | None = None,
descriptions: bool | None = None,
verbosity: int | None = None,
) -> None:
super().__init__(stream=stream, descriptions=descriptions, verbosity=verbosity)
self.verbosity = verbosity or 1
self.before = time.monotonic_ns()
self.duration = 0
self.collected_duration = 0

def stopTest(self, test: Any) -> None:
super().stopTest(test)
self.duration = time.monotonic_ns() - self.before

def stopTestRun(self) -> None:
super().stopTestRun()
self.duration = time.monotonic_ns() - self.before

def __str__(self) -> str:
items = [(f"ERROR: {test_case}", trace) for test_case, trace in self.errors]
items += [(f"FAIL: {test_case}", trace) for test_case, trace in self.failures]

longest = max(len(label) for label, _ in items) if items else 70

msg = "\n".join(
msg = "\n"
msg += "\n".join(
f"{'=' * longest}\n{label}\n{'-' * longest}\n{trace}"
for label, trace in items
)
msg += "-" * longest
msg += f"\nRan {self.testsRun} tests in {format_ns(self.duration)}"

saved = self.collected_duration - self.duration
if saved > 0 and (saved / self.duration) > 0.10:
msg += f" (saved {format_ns(self.collected_duration - self.duration)})"
msg += "\n\n"

if self.wasSuccessful():
msg += "OK"
else:
parts = []
if self.errors:
parts += [f"errors={len(self.errors)}"]
if self.failures:
parts += [f"failures={len(self.failures)}"]
if self.skipped:
parts += [f"skipped={len(self.skipped)}"]
if self.expectedFailures:
parts += [f"expected failures={len(self.expectedFailures)}"]
msg += f"FAILED ({', '.join(parts)})"

return msg

Expand All @@ -48,6 +90,8 @@ def __add__(self, other: object) -> FTTestResult:
result.unexpectedSuccesses = (
self.unexpectedSuccesses + other.unexpectedSuccesses
)
if isinstance(other, FTTestResult):
result.collected_duration = self.duration + other.duration
return result

def __iadd__(self, other: object) -> Self:
Expand All @@ -60,6 +104,8 @@ def __iadd__(self, other: object) -> Self:
self.skipped += other.skipped
self.testsRun += other.testsRun
self.unexpectedSuccesses += other.unexpectedSuccesses
if isinstance(other, FTTestResult):
self.collected_duration += other.duration
return self


Expand All @@ -71,17 +117,15 @@ def get_individual_tests(suite: TestSuite) -> Generator[TestCase, None, None]:
yield test


def run_single_test(test_id: str) -> tuple[str, TestResult, int]:
def run_single_test(test_id: str) -> tuple[str, FTTestResult]:
LOG.debug("Loading test %s", test_id)
loader = TestLoader()
result = FTTestResult(descriptions=True, verbosity=2)
suite = loader.loadTestsFromName(test_id)
LOG.debug("Running test %s", test_id)
before = time.monotonic_ns()
result = FTTestResult(descriptions=True, verbosity=2)
suite.run(result)
duration = time.monotonic_ns() - before
LOG.debug("Finished test %s", test_id)
return (test_id, result, duration)
return (test_id, result)


def format_ns(duration: int) -> str:
Expand All @@ -97,13 +141,13 @@ def run(
randomize: bool = False,
stress_test: bool = False,
threads: int = DEFAULT_THREADS,
verbosity: int = 1,
) -> TestResult:
loaded_module = importlib.import_module(module)
loader = TestLoader()
suite = loader.loadTestsFromModule(loaded_module)
LOG.debug("loaded %d test cases from %s", suite.countTestCases(), module)

before = time.monotonic_ns()
test_ids = [test.id() for test in get_individual_tests(suite)]
if stress_test:
test_ids = test_ids * 10
Expand All @@ -114,33 +158,25 @@ def run(

LOG.debug("ready to run %d tests:\n %s", len(test_ids), "\n ".join(test_ids))
pool = ThreadPoolExecutor(max_workers=threads)
futs = [pool.submit(run_single_test, test_id) for test_id in test_ids]
futs = {pool.submit(run_single_test, test_id) for test_id in test_ids}

test_duration = 0
stream = sys.stdout
result = FTTestResult()
while futs:
done, futs = wait(futs, timeout=0.1, return_when=FIRST_COMPLETED)
for fut in done:
test_id, test_result = fut.result()
if verbosity == 2:
stream.write(
f"{test_id} ... {'OK' if test_result.wasSuccessful() else 'FAIL'} "
f" {format_ns(test_result.duration)}\n"
)
elif verbosity == 1:
stream.write("." if test_result.wasSuccessful() else "F")
stream.flush()
result += test_result
result.stopTestRun()

for fut in as_completed(futs):
test_id, test_result, duration = fut.result()
print(
f"{test_id} ... {'OK' if test_result.wasSuccessful() else 'FAIL'} "
f" {format_ns(duration)}"
)

test_duration += duration
result += test_result

runner_duration = time.monotonic_ns() - before

print()
print(result)
print(
f"Ran {len(test_ids)} tests in {format_ns(runner_duration)} "
f"(saved {format_ns(test_duration - runner_duration)})\n"
)

if result.wasSuccessful():
print("OK")
else:
print("FAILED")

return result

0 comments on commit 8e7bb2c

Please sign in to comment.