Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speedup regr_test.py by running test cases concurrently #10714

Merged
merged 7 commits into from
Sep 23, 2023
170 changes: 121 additions & 49 deletions tests/regr_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
from __future__ import annotations

import argparse
import concurrent.futures
import os
import queue
import re
import shutil
import subprocess
import sys
import tempfile
import threading
from contextlib import ExitStack, suppress
from dataclasses import dataclass
from enum import IntEnum
from itertools import product
from pathlib import Path
Expand All @@ -24,7 +29,6 @@
get_mypy_req,
make_venv,
print_error,
print_success_msg,
testcase_dir_from_package_name,
)

Expand Down Expand Up @@ -103,18 +107,20 @@ class Verbosity(IntEnum):
),
)

_PRINT_QUEUE: queue.SimpleQueue[str] = queue.SimpleQueue()


def verbose_log(msg: str) -> None:
print(colored("\n" + msg, "blue"))
_PRINT_QUEUE.put(colored(msg, "blue"))


def setup_testcase_dir(package: PackageInfo, tempdir: Path, new_test_case_dir: Path, verbosity: Verbosity) -> None:
def setup_testcase_dir(package: PackageInfo, tempdir: Path, verbosity: Verbosity) -> None:
if verbosity is verbosity.VERBOSE:
verbose_log(f"Setting up testcase dir in {tempdir}")
verbose_log(f"{package.name}: Setting up testcase dir in {tempdir}")
# --warn-unused-ignores doesn't work for files inside typeshed.
# SO, to work around this, we copy the test_cases directory into a TemporaryDirectory,
# and run the test cases inside of that.
shutil.copytree(package.test_case_directory, new_test_case_dir)
shutil.copytree(package.test_case_directory, tempdir / TEST_CASES)
if package.is_stdlib:
return

Expand All @@ -137,16 +143,15 @@ def setup_testcase_dir(package: PackageInfo, tempdir: Path, new_test_case_dir: P
shutil.copytree(Path("stubs", requirement), new_typeshed / "stubs" / requirement)

if requirements.external_pkgs:
if verbosity is Verbosity.VERBOSE:
verbose_log(f"Setting up venv in {tempdir / VENV_DIR}")
pip_exe = make_venv(tempdir / VENV_DIR).pip_exe
pip_command = [pip_exe, "install", get_mypy_req(), *requirements.external_pkgs]
# Use --no-cache-dir to avoid issues with concurrent read/writes to the cache
pip_command = [pip_exe, "install", get_mypy_req(), *requirements.external_pkgs, "--no-cache-dir"]
if verbosity is Verbosity.VERBOSE:
verbose_log(f"{pip_command=}")
verbose_log(f"{package.name}: Setting up venv in {tempdir / VENV_DIR}. {pip_command=}\n")
try:
subprocess.run(pip_command, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
print(e.stderr)
_PRINT_QUEUE.put(f"{package.name}\n{e.stderr}")
raise


Expand All @@ -155,10 +160,6 @@ def run_testcases(
) -> subprocess.CompletedProcess[str]:
env_vars = dict(os.environ)
new_test_case_dir = tempdir / TEST_CASES
testcasedir_already_setup = new_test_case_dir.exists() and new_test_case_dir.is_dir()

if not testcasedir_already_setup:
setup_testcase_dir(package, tempdir=tempdir, new_test_case_dir=new_test_case_dir, verbosity=verbosity)

# "--enable-error-code ignore-without-code" is purposefully omitted.
# See https://github.com/python/typeshed/pull/8083
Expand Down Expand Up @@ -202,39 +203,103 @@ def run_testcases(

mypy_command = [python_exe, "-m", "mypy"] + flags
if verbosity is Verbosity.VERBOSE:
verbose_log(f"{mypy_command=}")
description = f"{package.name}/{version}/{platform}"
msg = f"{description}: {mypy_command=}\n"
if "MYPYPATH" in env_vars:
verbose_log(f"{env_vars['MYPYPATH']=}")
msg += f"{description}: {env_vars['MYPYPATH']=}"
else:
verbose_log("MYPYPATH not set")
msg += f"{description}: MYPYPATH not set"
msg += "\n"
verbose_log(msg)
return subprocess.run(mypy_command, capture_output=True, text=True, env=env_vars)


def test_testcase_directory(
package: PackageInfo, version: str, platform: str, *, verbosity: Verbosity, tempdir: Path
) -> ReturnCode:
msg = f"Running mypy --platform {platform} --python-version {version} on the "
msg += "standard library test cases..." if package.is_stdlib else f"test cases for {package.name!r}..."
@dataclass(frozen=True)
class Result:
code: int
command_run: str
stderr: str
stdout: str
test_case_dir: Path
tempdir: Path

def print_description(self, *, verbosity: Verbosity) -> None:
if self.code:
print(f"{self.command_run}:", end=" ")
print_error("FAILURE\n")
replacements = (str(self.tempdir / TEST_CASES), str(self.test_case_dir))
if self.stderr:
print_error(self.stderr, fix_path=replacements)
if self.stdout:
print_error(self.stdout, fix_path=replacements)


def test_testcase_directory(package: PackageInfo, version: str, platform: str, *, verbosity: Verbosity, tempdir: Path) -> Result:
msg = f"mypy --platform {platform} --python-version {version} on the "
msg += "standard library test cases" if package.is_stdlib else f"test cases for {package.name!r}"
if verbosity > Verbosity.QUIET:
print(msg, end=" ", flush=True)

result = run_testcases(package=package, version=version, platform=platform, tempdir=tempdir, verbosity=verbosity)

if result.returncode:
if verbosity is Verbosity.QUIET:
# We'll already have printed this if --verbosity QUIET wasn't passed.
# If --verbosity QUIET was passed, only print this if there were errors.
# If there are errors, the output is inscrutable if this isn't printed.
print(msg, end=" ")
print_error("failure\n")
replacements = (str(tempdir / TEST_CASES), str(package.test_case_directory))
if result.stderr:
print_error(result.stderr, fix_path=replacements)
if result.stdout:
print_error(result.stdout, fix_path=replacements)
elif verbosity > Verbosity.QUIET:
print_success_msg()
return result.returncode
_PRINT_QUEUE.put(f"Running {msg}...")

proc_info = run_testcases(package=package, version=version, platform=platform, tempdir=tempdir, verbosity=verbosity)
return Result(
code=proc_info.returncode,
command_run=msg,
stderr=proc_info.stderr,
stdout=proc_info.stdout,
test_case_dir=package.test_case_directory,
tempdir=tempdir,
)


def print_queued_messages(ev: threading.Event) -> None:
while not ev.is_set():
with suppress(queue.Empty):
print(_PRINT_QUEUE.get(timeout=0.5), flush=True)
while True:
try:
msg = _PRINT_QUEUE.get_nowait()
except queue.Empty:
return
else:
print(msg, flush=True)


def concurrently_run_testcases(
stack: ExitStack,
testcase_directories: list[PackageInfo],
verbosity: Verbosity,
platforms_to_test: list[str],
versions_to_test: list[str],
) -> list[Result]:
packageinfo_to_tempdir = {
package_info: Path(stack.enter_context(tempfile.TemporaryDirectory())) for package_info in testcase_directories
}

event = threading.Event()
printer_thread = threading.Thread(target=print_queued_messages, args=(event,))
printer_thread.start()

with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
# Each temporary directory may be used by multiple processes concurrently during the next step;
# must make sure that they're all setup correctly before starting the next step,
# in order to avoid race conditions
testcase_futures = [
executor.submit(setup_testcase_dir, package, tempdir, verbosity)
for package, tempdir in packageinfo_to_tempdir.items()
]
concurrent.futures.wait(testcase_futures)

mypy_futures = [
executor.submit(test_testcase_directory, testcase_dir, version, platform, verbosity=verbosity, tempdir=tempdir)
for (testcase_dir, tempdir), platform, version in product(
packageinfo_to_tempdir.items(), platforms_to_test, versions_to_test
)
]
results = [future.result() for future in mypy_futures]

event.set()
printer_thread.join()
return results


def main() -> ReturnCode:
Expand All @@ -253,16 +318,23 @@ def main() -> ReturnCode:
versions_to_test = args.versions_to_test or [f"3.{sys.version_info[1]}"]

code = 0
for testcase_dir in testcase_directories:
with tempfile.TemporaryDirectory() as td:
tempdir = Path(td)
for platform, version in product(platforms_to_test, versions_to_test):
this_code = test_testcase_directory(testcase_dir, version, platform, verbosity=verbosity, tempdir=tempdir)
code = max(code, this_code)
results: list[Result] | None = None

with ExitStack() as stack:
results = concurrently_run_testcases(stack, testcase_directories, verbosity, platforms_to_test, versions_to_test)

assert results is not None
print()

for result in results:
result.print_description(verbosity=verbosity)

code = max(result.code for result in results)

if code:
print_error("\nTest completed with errors")
print_error("Test completed with errors")
else:
print(colored("\nTest completed successfully!", "green"))
print(colored("Test completed successfully!", "green"))

return code

Expand Down