Skip to content

Commit

Permalink
Add progress spinner to parallel and dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborbernat committed Jan 3, 2019
1 parent 64983d4 commit 4660bef
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 45 deletions.
12 changes: 9 additions & 3 deletions src/tox/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import tox
from tox.constants import INFO
from tox.interpreters import Interpreters, NoInterpreterInfo
from .parallel import add_parallel_flags, ENV_VAR_KEY as PARALLEL_ENV_VAR_KEY

from .parallel import add_parallel_flags, ENV_VAR_KEY as PARALLEL_ENV_VAR_KEY, add_parallel_config

hookimpl = tox.hookimpl
"""DEPRECATED - REMOVE - this is left for compatibility with plugins importing this from here.
Expand Down Expand Up @@ -804,6 +803,8 @@ def develop(testenv_config, value):
help="list of extras to install with the source distribution or develop install",
)

add_parallel_config(parser)


def cli_skip_missing_interpreter(parser):
class SkipMissingInterpreterAction(argparse.Action):
Expand Down Expand Up @@ -1045,7 +1046,7 @@ def __init__(self, config, ini_path, ini_data): # noqa
# factors stated in config envlist
stated_envlist = reader.getstring("envlist", replace=False)
if stated_envlist:
for env in _split_env(stated_envlist):
for env in config.envlist:
known_factors.update(env.split("-"))

# configure testenvs
Expand Down Expand Up @@ -1124,6 +1125,9 @@ def make_envconfig(self, name, section, subs, config, replace=True):
res = reader.getlist(env_attr.name, sep=" ")
elif atype == "line-list":
res = reader.getlist(env_attr.name, sep="\n")
elif atype == "env-list":
res = reader.getstring(env_attr.name, replace=False)
res = tuple(_split_env(res))
else:
raise ValueError("unknown type {!r}".format(atype))
if env_attr.postprocess:
Expand Down Expand Up @@ -1173,6 +1177,8 @@ def _getenvdata(self, reader, config):

def _split_env(env):
"""if handed a list, action="append" was used for -e """
if env is None:
return []
if not isinstance(env, list):
env = [e.split("#", 1)[0].strip() for e in env.split("\n")]
env = ",".join([e for e in env if e])
Expand Down
17 changes: 16 additions & 1 deletion src/tox/config/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from argparse import ArgumentTypeError

ENV_VAR_KEY = "_PARALLEL_TOXENV"
ENV_VAR_KEY = "TOX_PARALLEL_ENV"
OFF_VALUE = 0
DEFAULT_PARALLEL = OFF_VALUE

Expand Down Expand Up @@ -57,3 +57,18 @@ def add_parallel_flags(parser):
dest="parallel_live",
help="connect to stdout while running environments",
)


def add_parallel_config(parser):
parser.add_testenv_attribute(
"depends",
type="env-list",
help="tox environments that this environment depends on (must be run after those)",
)
parser.add_testenv_attribute(
"parallel_show_output",
type="bool",
default=False,
help="if set to True the content of the output will always be shown "
"when running in parallel mode",
)
125 changes: 86 additions & 39 deletions src/tox/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
import subprocess
import sys
import time
from collections import OrderedDict
from contextlib import contextmanager
from threading import Semaphore, Thread
from threading import Event, Semaphore, Thread

import pkg_resources
import py
Expand All @@ -25,6 +26,8 @@
from tox.config.parallel import OFF_VALUE as PARALLEL_OFF
from tox.result import ResultLog
from tox.util import set_os_env_var
from tox.util.graph import stable_topological_sort
from tox.util.spinner import Spinner
from tox.venv import VirtualEnv


Expand Down Expand Up @@ -399,8 +402,15 @@ def __init__(self, config, popen=subprocess.Popen, Report=Reporter):
self.venvlist = [self.getvenv(x) for x in self.evaluated_env_list()]
except LookupError:
raise SystemExit(1)
except tox.exception.ConfigError as e:
self.report.error(str(e))
except tox.exception.ConfigError as exception:
self.report.error(str(exception))
raise SystemExit(1)
try:
self.venv_order = stable_topological_sort(
OrderedDict((v.name, v.envconfig.depends) for v in self.venvlist)
)
except ValueError as exception:
self.report.error("circular dependency detected: {}".format(exception))
raise SystemExit(1)
self._actions = []

Expand Down Expand Up @@ -465,7 +475,8 @@ def cleanup(self):
try:
yield
finally:
for tox_env in self.venvlist:
for name in self.venv_order:
tox_env = self.getvenv(name)
if (
hasattr(tox_env, "package")
and isinstance(tox_env.package, py.path.local)
Expand Down Expand Up @@ -575,7 +586,8 @@ def subcommand_test(self):
if self.config.skipsdist:
self.report.info("skipping sdist step")
else:
for venv in self.venvlist:
for name in self.venv_order:
venv = self.getvenv(name)
if not venv.envconfig.skip_install:
venv.package = self.hook.tox_package(session=self, venv=venv)
if not venv.package:
Expand All @@ -593,7 +605,8 @@ def subcommand_test(self):
return retcode

def run_sequential(self):
for venv in self.venvlist:
for name in self.venv_order:
venv = self.getvenv(name)
if self.setupenv(venv):
if venv.envconfig.skip_install:
self.finishvenv(venv)
Expand Down Expand Up @@ -625,45 +638,78 @@ def run_parallel(self):

max_parallel = self.config.option.parallel
if max_parallel is None:
max_parallel = len(self.venvlist)
max_parallel = len(self.venv_order)
semaphore = Semaphore(max_parallel)
finished = Event()
sink = None if live_out else subprocess.PIPE

def run_in_thread(venv, env):
try:
env[PARALLEL_ENV_VAR_KEY] = venv.envconfig.envname
args_sub = list(args)
if hasattr(venv, "package"):
args_sub.insert(position, str(venv.package))
args_sub.insert(position, "--installpkg")
run = subprocess.Popen(
args_sub, env=env, stdout=sink, stderr=sink, universal_newlines=True
)
res = run.wait()
finally:
semaphore.release()
if res is not None:
venv.status = "skipped tests" if self.config.option.notest else res
show_progress = not live_out and self.report.verbosity > Verbosity.QUIET
with Spinner(enabled=show_progress) as spinner:

def run_in_thread(tox_env, os_env):
res = None
env_name = tox_env.envconfig.envname
try:
os_env[PARALLEL_ENV_VAR_KEY] = env_name
args_sub = list(args)
if hasattr(tox_env, "package"):
args_sub.insert(position, str(tox_env.package))
args_sub.insert(position, "--installpkg")
process = subprocess.Popen(
args_sub,
env=os_env,
stdout=sink,
stderr=sink,
stdin=None,
universal_newlines=True,
)
res = process.wait()
finally:
done.add(env_name)
semaphore.release()
finished.set()
report = spinner.succeed
if self.config.option.notest:
report = spinner.fail
elif res:
report = spinner.fail
report(env_name)

tox_env.status = "skipped tests" if self.config.option.notest else res
if not live_out:
venv.out, venv.err = run.communicate()
if res:
message = "Failed {} under process {}, stdout:\n{}{}".format(
venv.name,
run.pid,
venv.out,
"\nstderr:\n{}".format(venv.err) if venv.err else "",
out, err = process.communicate()
if res or tox_env.envconfig.parallel_show_output:
outcome = (
"Failed {} under process {}, stdout:\n".format(env_name, process.pid)
if res
else ""
)
message = "{}{}{}".format(
outcome, out, "\nstderr:\n{}".format(err) if err else ""
).rstrip()
self.report.logline_if(Verbosity.QUIET, message)

threads = []
for venv in self.venvlist:
semaphore.acquire(blocking=True)
thread = Thread(target=run_in_thread, args=(venv, os.environ.copy()))
thread.start()
threads.append(thread)

for thread in threads:
thread.join()
threads = []
todo = OrderedDict(
(i, set(self.getvenv(i).envconfig.depends)) for i in self.venv_order
)
done = set()
while todo:
for name, depends in list(todo.items()):
if depends - done:
continue
del todo[name]
venv = self.getvenv(name)
semaphore.acquire(blocking=True)
spinner.add(venv.name)
thread = Thread(target=run_in_thread, args=(venv, os.environ.copy()))
thread.start()
threads.append(thread)
finished.wait()
finished.clear()

for thread in threads:
thread.join()

def runenvreport(self, venv):
"""
Expand Down Expand Up @@ -692,7 +738,8 @@ def _summary(self):
if not is_parallel_child:
self.report.startsummary()
exit_code = 0
for venv in self.venvlist:
for name in self.venv_order:
venv = self.getvenv(name)
reporter = self.report.good
status = venv.status
if isinstance(status, tox.exception.InterpreterNotFound):
Expand Down
File renamed without changes.
65 changes: 65 additions & 0 deletions src/tox/util/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from collections import OrderedDict, defaultdict


def stable_topological_sort(graph):
to_order = set(graph.keys()) # keep a log of what we need to order

# normalize graph - fill missing nodes (assume no dependency)
for values in graph.values():
for value in values:
if value not in graph:
graph[value] = tuple()

inverse_graph = defaultdict(set)
for key, depends in graph.items():
for depend in depends:
inverse_graph[depend].add(key)

topology = []
degree = {k: len(v) for k, v in graph.items()}
ready_to_visit = {n for n, d in degree.items() if not d}
need_to_visit = OrderedDict((i, None) for i in graph.keys())
while need_to_visit:
# to keep stable, pick the first node ready to visit in the original order
for node in need_to_visit:
if node in ready_to_visit:
break
else:
break
del need_to_visit[node]

topology.append(node)

# decrease degree for nodes we're going too
for to_node in inverse_graph[node]:
degree[to_node] -= 1
if not degree[to_node]: # if a node has no more incoming node it's ready to visit
ready_to_visit.add(to_node)

result = [n for n in topology if n in to_order] # filter out missing nodes we extended

if len(result) < len(to_order):
identify_cycle(graph)
raise ValueError("could not order tox environments and failed to detect circle")
return result


def identify_cycle(graph):
path = set()
visited = set()

def visit(vertex):
if vertex in visited:
return None
visited.add(vertex)
path.add(vertex)
for neighbour in graph.get(vertex, ()):
if neighbour in path or visit(neighbour):
return path
path.remove(vertex)
return None

for node in graph:
result = visit(node)
if result is not None:
raise ValueError(" | ".join(result))
Loading

0 comments on commit 4660bef

Please sign in to comment.