From e1a543ea86535676f86510f5e615747ffab445f1 Mon Sep 17 00:00:00 2001 From: Bernat Gabor Date: Thu, 3 Jan 2019 11:45:37 +0000 Subject: [PATCH] Add progress spinner to parallel and dependencies --- src/tox/config/__init__.py | 12 ++- src/tox/config/parallel.py | 17 ++- src/tox/session.py | 125 +++++++++++++++------- src/tox/{util.py => util/__init__.py} | 0 src/tox/util/graph.py | 65 ++++++++++++ src/tox/util/spinner.py | 147 ++++++++++++++++++++++++++ tests/unit/session/test_parallel.py | 9 +- tox.ini | 3 +- 8 files changed, 330 insertions(+), 48 deletions(-) rename src/tox/{util.py => util/__init__.py} (100%) create mode 100644 src/tox/util/graph.py create mode 100644 src/tox/util/spinner.py diff --git a/src/tox/config/__init__.py b/src/tox/config/__init__.py index 2de0e1e72c..e2f747ffd4 100644 --- a/src/tox/config/__init__.py +++ b/src/tox/config/__init__.py @@ -21,8 +21,7 @@ import tox from tox.constants import INFO from tox.interpreters import Interpreters, NoInterpreterInfo -from .parallel import add_parallel_flags, ENV_VAR_KEY as PARALLEL_ENV_VAR_KEY - +from .parallel import add_parallel_flags, ENV_VAR_KEY as PARALLEL_ENV_VAR_KEY, add_parallel_config hookimpl = tox.hookimpl """DEPRECATED - REMOVE - this is left for compatibility with plugins importing this from here. @@ -804,6 +803,8 @@ def develop(testenv_config, value): help="list of extras to install with the source distribution or develop install", ) + add_parallel_config(parser) + def cli_skip_missing_interpreter(parser): class SkipMissingInterpreterAction(argparse.Action): @@ -1045,7 +1046,7 @@ def __init__(self, config, ini_path, ini_data): # noqa # factors stated in config envlist stated_envlist = reader.getstring("envlist", replace=False) if stated_envlist: - for env in _split_env(stated_envlist): + for env in config.envlist: known_factors.update(env.split("-")) # configure testenvs @@ -1124,6 +1125,9 @@ def make_envconfig(self, name, section, subs, config, replace=True): res = reader.getlist(env_attr.name, sep=" ") elif atype == "line-list": res = reader.getlist(env_attr.name, sep="\n") + elif atype == "env-list": + res = reader.getstring(env_attr.name, replace=False) + res = tuple(_split_env(res)) else: raise ValueError("unknown type {!r}".format(atype)) if env_attr.postprocess: @@ -1173,6 +1177,8 @@ def _getenvdata(self, reader, config): def _split_env(env): """if handed a list, action="append" was used for -e """ + if env is None: + return [] if not isinstance(env, list): env = [e.split("#", 1)[0].strip() for e in env.split("\n")] env = ",".join([e for e in env if e]) diff --git a/src/tox/config/parallel.py b/src/tox/config/parallel.py index d70d6df481..0c86440082 100644 --- a/src/tox/config/parallel.py +++ b/src/tox/config/parallel.py @@ -2,7 +2,7 @@ from argparse import ArgumentTypeError -ENV_VAR_KEY = "_PARALLEL_TOXENV" +ENV_VAR_KEY = "TOX_PARALLEL_ENV" OFF_VALUE = 0 DEFAULT_PARALLEL = OFF_VALUE @@ -57,3 +57,18 @@ def add_parallel_flags(parser): dest="parallel_live", help="connect to stdout while running environments", ) + + +def add_parallel_config(parser): + parser.add_testenv_attribute( + "depends", + type="env-list", + help="tox environments that this environment depends on (must be run after those)", + ) + parser.add_testenv_attribute( + "parallel_show_output", + type="bool", + default=False, + help="if set to True the content of the output will always be shown " + "when running in parallel mode", + ) diff --git a/src/tox/session.py b/src/tox/session.py index c5f47e5e94..6df9821c51 100644 --- a/src/tox/session.py +++ b/src/tox/session.py @@ -13,8 +13,9 @@ import subprocess import sys import time +from collections import OrderedDict from contextlib import contextmanager -from threading import Semaphore, Thread +from threading import Event, Semaphore, Thread import pkg_resources import py @@ -25,6 +26,8 @@ from tox.config.parallel import OFF_VALUE as PARALLEL_OFF from tox.result import ResultLog from tox.util import set_os_env_var +from tox.util.graph import stable_topological_sort +from tox.util.spinner import Spinner from tox.venv import VirtualEnv @@ -399,8 +402,15 @@ def __init__(self, config, popen=subprocess.Popen, Report=Reporter): self.venvlist = [self.getvenv(x) for x in self.evaluated_env_list()] except LookupError: raise SystemExit(1) - except tox.exception.ConfigError as e: - self.report.error(str(e)) + except tox.exception.ConfigError as exception: + self.report.error(str(exception)) + raise SystemExit(1) + try: + self.venv_order = stable_topological_sort( + OrderedDict((v.name, v.envconfig.depends) for v in self.venvlist) + ) + except ValueError as exception: + self.report.error("circular dependency detected: {}".format(exception)) raise SystemExit(1) self._actions = [] @@ -465,7 +475,8 @@ def cleanup(self): try: yield finally: - for tox_env in self.venvlist: + for name in self.venv_order: + tox_env = self.getvenv(name) if ( hasattr(tox_env, "package") and isinstance(tox_env.package, py.path.local) @@ -575,7 +586,8 @@ def subcommand_test(self): if self.config.skipsdist: self.report.info("skipping sdist step") else: - for venv in self.venvlist: + for name in self.venv_order: + venv = self.getvenv(name) if not venv.envconfig.skip_install: venv.package = self.hook.tox_package(session=self, venv=venv) if not venv.package: @@ -593,7 +605,8 @@ def subcommand_test(self): return retcode def run_sequential(self): - for venv in self.venvlist: + for name in self.venv_order: + venv = self.getvenv(name) if self.setupenv(venv): if venv.envconfig.skip_install: self.finishvenv(venv) @@ -625,45 +638,78 @@ def run_parallel(self): max_parallel = self.config.option.parallel if max_parallel is None: - max_parallel = len(self.venvlist) + max_parallel = len(self.venv_order) semaphore = Semaphore(max_parallel) + finished = Event() sink = None if live_out else subprocess.PIPE - def run_in_thread(venv, env): - try: - env[PARALLEL_ENV_VAR_KEY] = venv.envconfig.envname - args_sub = list(args) - if hasattr(venv, "package"): - args_sub.insert(position, str(venv.package)) - args_sub.insert(position, "--installpkg") - run = subprocess.Popen( - args_sub, env=env, stdout=sink, stderr=sink, universal_newlines=True - ) - res = run.wait() - finally: - semaphore.release() - if res is not None: - venv.status = "skipped tests" if self.config.option.notest else res + show_progress = not live_out and self.report.verbosity > Verbosity.QUIET + with Spinner(enabled=show_progress) as spinner: + + def run_in_thread(tox_env, os_env): + res = None + env_name = tox_env.envconfig.envname + try: + os_env[PARALLEL_ENV_VAR_KEY] = env_name + args_sub = list(args) + if hasattr(tox_env, "package"): + args_sub.insert(position, str(tox_env.package)) + args_sub.insert(position, "--installpkg") + process = subprocess.Popen( + args_sub, + env=os_env, + stdout=sink, + stderr=sink, + stdin=None, + universal_newlines=True, + ) + res = process.wait() + finally: + done.add(env_name) + semaphore.release() + finished.set() + report = spinner.succeed + if self.config.option.notest: + report = spinner.fail + elif res: + report = spinner.fail + report(env_name) + + tox_env.status = "skipped tests" if self.config.option.notest else res if not live_out: - venv.out, venv.err = run.communicate() - if res: - message = "Failed {} under process {}, stdout:\n{}{}".format( - venv.name, - run.pid, - venv.out, - "\nstderr:\n{}".format(venv.err) if venv.err else "", + out, err = process.communicate() + if res or tox_env.envconfig.parallel_show_output: + outcome = ( + "Failed {} under process {}, stdout:\n".format(env_name, process.pid) + if res + else "" + ) + message = "{}{}{}".format( + outcome, out, "\nstderr:\n{}".format(err) if err else "" ).rstrip() self.report.logline_if(Verbosity.QUIET, message) - threads = [] - for venv in self.venvlist: - semaphore.acquire(blocking=True) - thread = Thread(target=run_in_thread, args=(venv, os.environ.copy())) - thread.start() - threads.append(thread) - - for thread in threads: - thread.join() + threads = [] + todo = OrderedDict( + (i, set(self.getvenv(i).envconfig.depends)) for i in self.venv_order + ) + done = set() + while todo: + for name, depends in list(todo.items()): + if depends - done: + continue + del todo[name] + venv = self.getvenv(name) + semaphore.acquire(blocking=True) + spinner.add(venv.name) + thread = Thread(target=run_in_thread, args=(venv, os.environ.copy())) + thread.start() + threads.append(thread) + finished.wait() + finished.clear() + + for thread in threads: + thread.join() def runenvreport(self, venv): """ @@ -692,7 +738,8 @@ def _summary(self): if not is_parallel_child: self.report.startsummary() exit_code = 0 - for venv in self.venvlist: + for name in self.venv_order: + venv = self.getvenv(name) reporter = self.report.good status = venv.status if isinstance(status, tox.exception.InterpreterNotFound): diff --git a/src/tox/util.py b/src/tox/util/__init__.py similarity index 100% rename from src/tox/util.py rename to src/tox/util/__init__.py diff --git a/src/tox/util/graph.py b/src/tox/util/graph.py new file mode 100644 index 0000000000..439d12d024 --- /dev/null +++ b/src/tox/util/graph.py @@ -0,0 +1,65 @@ +from collections import OrderedDict, defaultdict + + +def stable_topological_sort(graph): + to_order = set(graph.keys()) # keep a log of what we need to order + + # normalize graph - fill missing nodes (assume no dependency) + for values in graph.values(): + for value in values: + if value not in graph: + graph[value] = tuple() + + inverse_graph = defaultdict(set) + for key, depends in graph.items(): + for depend in depends: + inverse_graph[depend].add(key) + + topology = [] + degree = {k: len(v) for k, v in graph.items()} + ready_to_visit = {n for n, d in degree.items() if not d} + need_to_visit = OrderedDict((i, None) for i in graph.keys()) + while need_to_visit: + # to keep stable, pick the first node ready to visit in the original order + for node in need_to_visit: + if node in ready_to_visit: + break + else: + break + del need_to_visit[node] + + topology.append(node) + + # decrease degree for nodes we're going too + for to_node in inverse_graph[node]: + degree[to_node] -= 1 + if not degree[to_node]: # if a node has no more incoming node it's ready to visit + ready_to_visit.add(to_node) + + result = [n for n in topology if n in to_order] # filter out missing nodes we extended + + if len(result) < len(to_order): + identify_cycle(graph) + raise ValueError("could not order tox environments and failed to detect circle") + return result + + +def identify_cycle(graph): + path = set() + visited = set() + + def visit(vertex): + if vertex in visited: + return None + visited.add(vertex) + path.add(vertex) + for neighbour in graph.get(vertex, ()): + if neighbour in path or visit(neighbour): + return path + path.remove(vertex) + return None + + for node in graph: + result = visit(node) + if result is not None: + raise ValueError(" | ".join(result)) diff --git a/src/tox/util/spinner.py b/src/tox/util/spinner.py new file mode 100644 index 0000000000..df1c9221af --- /dev/null +++ b/src/tox/util/spinner.py @@ -0,0 +1,147 @@ +"""A minimal non-colored version of https://pypi.org/project/halo, to track list progress""" +import os +import sys +import threading +import time +from datetime import datetime +from threading import RLock + +threads = [] + +if os.name == "nt": + import ctypes + + class _CursorInfo(ctypes.Structure): + _fields_ = [("size", ctypes.c_int), ("visible", ctypes.c_byte)] + + +class Spinner(object): + CLEAR_LINE = "\033[K" + stream = sys.stdout + refresh_rate = 0.1 + max_width = 120 + frames = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] + + def __init__(self, enabled=True): + self._lock = RLock() + self._envs = dict() + self._frame_index = 0 + self.enabled = enabled + + def add(self, name): + self._envs[name] = datetime.now() + + def clear(self): + if self.enabled: + with self._lock: + self.stream.write("\r") + self.stream.write(self.CLEAR_LINE) + + def render(self): + while not self._stop_spinner.is_set(): + if self.enabled: + with self._lock: + frame = self.frame() + output = "\r{}".format(frame) + self.clear() + self.stream.write(output) + time.sleep(self.refresh_rate) + return self + + def frame(self): + frame = self.frames[self._frame_index] + self._frame_index += 1 + self._frame_index = self._frame_index % len(self.frames) + text_frame = "[{}] {}".format(len(self._envs), " | ".join(self._envs)) + if len(text_frame) > self.max_width - 1: + text_frame = "{}...".format(text_frame[: self.max_width - 1 - 3]) + return u"{} {}".format(*[(frame, text_frame)][0]) + + def __enter__(self): + if self.enabled: + if self.stream.isatty(): + if os.name == "nt": + ci = _CursorInfo() + handle = ctypes.windll.kernel32.GetStdHandle(-11) + ctypes.windll.kernel32.GetConsoleCursorInfo(handle, ctypes.byref(ci)) + ci.visible = False + ctypes.windll.kernel32.SetConsoleCursorInfo(handle, ctypes.byref(ci)) + elif os.name == "posix": + self.stream.write("\033[?25l") + self.stream.flush() + + self._stop_spinner = threading.Event() + self._spinner_thread = threading.Thread(target=self.render) + self._spinner_thread.setDaemon(True) + self._spinner_id = self._spinner_thread.name + self._spinner_thread.start() + return self + + def succeed(self, key): + self.finalize(key, "✔ OK") + + def fail(self, key): + self.finalize(key, "✖ FAIL") + + def skip(self, key): + self.finalize(key, "⚠ SKIP") + + def finalize(self, key, status): + with self._lock: + start_at = self._envs[key] + del self._envs[key] + if not self._envs: + self.__exit__(None, None, None) + if self.enabled: + self.clear() + self.stream.write( + "{} {} in {}{}".format( + status, key, td_human_readable(datetime.now() - start_at), os.linesep + ) + ) + + def __exit__(self, exc_type, exc_val, exc_tb): + if not self._stop_spinner.is_set(): + if self._spinner_thread: + self._stop_spinner.set() + self._spinner_thread.join() + + self._frame_index = 0 + self._spinner_id = None + if self.enabled: + self.clear() + if self.stream.isatty(): + if os.name == "nt": + ci = _CursorInfo() + handle = ctypes.windll.kernel32.GetStdHandle(-11) + ctypes.windll.kernel32.GetConsoleCursorInfo(handle, ctypes.byref(ci)) + ci.visible = True + ctypes.windll.kernel32.SetConsoleCursorInfo(handle, ctypes.byref(ci)) + elif os.name == "posix": + self.stream.write("\033[?25h") + self.stream.flush() + + return self + + +def td_human_readable(delta): + seconds = int(delta.total_seconds()) + periods = [ + ("year", 60 * 60 * 24 * 365), + ("month", 60 * 60 * 24 * 30), + ("day", 60 * 60 * 24), + ("hour", 60 * 60), + ("minute", 60), + ("second", 1), + ] + + texts = [] + for period_name, period_seconds in periods: + if seconds > period_seconds or period_seconds == 1: + period_value, seconds = divmod(seconds, period_seconds) + if period_name == "second": + ms = delta.total_seconds() - int(delta.total_seconds()) + period_value += round(ms, 3) + has_s = "s" if period_value > 1 else "" + texts.append("{} {}{}".format(period_value, period_name, has_s)) + return ", ".join(texts) diff --git a/tests/unit/session/test_parallel.py b/tests/unit/session/test_parallel.py index 84d59653af..0d8de35f3b 100644 --- a/tests/unit/session/test_parallel.py +++ b/tests/unit/session/test_parallel.py @@ -1,7 +1,7 @@ import os -def test_parallel_live(cmd, initproj): +def test_parallel_live(cmd, initproj, mock_venv): initproj( "pkg123-0.7", filedefs={ @@ -17,7 +17,7 @@ def test_parallel_live(cmd, initproj): assert result.ret == 0, "{}{}{}".format(result.err, os.linesep, result.out) -def test_parallel(cmd, initproj): +def test_parallel(cmd, initproj, mock_venv): initproj( "pkg123-0.7", filedefs={ @@ -33,7 +33,7 @@ def test_parallel(cmd, initproj): assert result.ret == 0, "{}{}{}".format(result.err, os.linesep, result.out) -def test_parallel_error_report(cmd, initproj): +def test_parallel_error_report(cmd, initproj, mock_venv): initproj( "pkg123-0.7", filedefs={ @@ -41,7 +41,8 @@ def test_parallel_error_report(cmd, initproj): [tox] envlist = a [testenv] - commands=python -c "import sys, os; sys.stderr.write(str(12345) + os.linesep); raise SystemExit(17)" + commands=python -c "import sys, os; sys.stderr.write(str(12345) + os.linesep);\ + raise SystemExit(17)" """ }, ) diff --git a/tox.ini b/tox.ini index 219ef4d2b5..c40d794bdb 100644 --- a/tox.ini +++ b/tox.ini @@ -29,7 +29,6 @@ commands = pytest {posargs:\ --junitxml={env:JUNIT_XML_FILE:{toxworkdir}/.test.{envname}.xml} \ . } - [testenv:docs] description = invoke sphinx-build to build the HTML docs basepython = python3.7 @@ -80,6 +79,8 @@ commands = coverage erase coverage xml -o {toxworkdir}/coverage.xml coverage html -d {toxworkdir}/htmlcov diff-cover --compare-branch {env:DIFF_AGAINST:origin/master} {toxworkdir}/coverage.xml +depends = py27, py34, py35, py36, py37, pypy, pypy3 +parallel_show_output = True [testenv:coveralls] description = [only run on CI]: upload coverage data to codecov (depends on coverage running first)