Skip to content

Commit

Permalink
Add a --loop flag, to allow for running continuously (#6270)
Browse files Browse the repository at this point in the history
### Problem

The ability to run a command continuously as files change has been a long-standing feature request, and something which is implemented in competing tools. It is very useful for low-latency compiler and test feedback.

### Solution

Builds atop @kwlzn 's work in #6088 and adds a `--loop` flag which re-runs all `@console_rule`s whenever input files have changed.

Adds a covering integration test (and refactors the pantsd test harness a bit to do so). 

### Result

```
./pants --enable-pantsd --v2 --no-v1 --loop list ${dir}::
```
... will continuously list the contents of a directory when files under the directory are invalidated.
  • Loading branch information
Stu Hood authored Aug 7, 2018
1 parent eac5ecf commit 008ce42
Show file tree
Hide file tree
Showing 14 changed files with 414 additions and 215 deletions.
33 changes: 26 additions & 7 deletions src/python/pants/engine/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import ast
import functools
import inspect
import logging
from abc import abstractproperty
Expand Down Expand Up @@ -33,15 +34,20 @@ def visit_Call(self, node):
self.gets.append(Get.extract_constraints(node))


class GoalProduct(object):
class _GoalProduct(object):
"""GoalProduct is a factory for anonymous singleton types representing the execution of goals.
The created types are returned by `@console_rule` instances, which may not have any outputs
of their own.
"""
PRODUCT_MAP = {}

@staticmethod
def _synthesize_goal_product(name):
product_type_name = '{}GoalExecution'.format(name.capitalize())
if PY2:
product_type_name = product_type_name.encode('utf-8')
return type(product_type_name, (datatype(['result']),), {})
return type(product_type_name, (datatype([]),), {})

@classmethod
def for_name(cls, name):
Expand Down Expand Up @@ -85,10 +91,23 @@ def resolve_type(name):
rule_visitor.visit(node)
gets.update(Get(resolve_type(p), resolve_type(s)) for p, s in rule_visitor.gets)

func._rule = TaskRule(output_type, input_selectors, func, input_gets=list(gets))
func.output_type = output_type
func.goal = for_goal
return func
# For @console_rule, redefine the function to avoid needing a literal return of the output type.
if for_goal:
def goal_and_return(*args, **kwargs):
res = func(*args, **kwargs)
if res is not None:
raise Exception('A @console_rule should not have a return value.')
return output_type()
functools.update_wrapper(goal_and_return, func)
wrapped_func = goal_and_return
else:
wrapped_func = func

wrapped_func._rule = TaskRule(output_type, input_selectors, wrapped_func, input_gets=list(gets))
wrapped_func.output_type = output_type
wrapped_func.goal = for_goal

return wrapped_func
return wrapper


Expand All @@ -97,7 +116,7 @@ def rule(output_type, input_selectors):


def console_rule(goal_name, input_selectors):
output_type = GoalProduct.for_name(goal_name)
output_type = _GoalProduct.for_name(goal_name)
return _make_rule(output_type, input_selectors, goal_name)


Expand Down
14 changes: 8 additions & 6 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from pants.util.contextutil import temporary_file_path
from pants.util.dirutil import check_no_overlapping_paths
from pants.util.objects import Collection, SubclassesOf, datatype
from pants.util.strutil import pluralize


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -69,7 +70,7 @@ def failure(cls, error):


class ExecutionError(Exception):
def __init__(self, message, wrapped_exceptions):
def __init__(self, message, wrapped_exceptions=None):
super(ExecutionError, self).__init__(message)
self.wrapped_exceptions = wrapped_exceptions or ()

Expand Down Expand Up @@ -403,6 +404,8 @@ class SchedulerSession(object):
a Session.
"""

execution_error_type = ExecutionError

def __init__(self, scheduler, session):
self._scheduler = scheduler
self._session = session
Expand Down Expand Up @@ -541,19 +544,18 @@ def products_request(self, products, subjects):
throw_root_states = tuple(state for root, state in result.root_products if type(state) is Throw)
if throw_root_states:
unique_exceptions = tuple(set(t.exc for t in throw_root_states))
exception_noun = pluralize(len(unique_exceptions), 'Exception')

if self._scheduler.include_trace_on_error:
cumulative_trace = '\n'.join(self.trace(request))
raise ExecutionError(
'Received unexpected Throw state(s):\n{}'.format(cumulative_trace),
'{} encountered:\n{}'.format(exception_noun, cumulative_trace),
unique_exceptions,
)

if len(unique_exceptions) == 1:
raise throw_root_states[0].exc
else:
raise ExecutionError(
'Multiple exceptions encountered:\n {}'.format(
'{} encountered:\n {}'.format(
exception_noun,
'\n '.join('{}: {}'.format(type(t).__name__, str(t)) for t in unique_exceptions)),
unique_exceptions
)
Expand Down
18 changes: 10 additions & 8 deletions src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,20 +168,20 @@ def _determine_subjects(target_roots):
"""
return target_roots.specs or []

def _execute(self, *args, **kwargs):
request = self.scheduler_session.execution_request(*args, **kwargs)
result = self.scheduler_session.execute(request)
if result.error:
raise result.error

def warm_product_graph(self, target_roots):
"""Warm the scheduler's `ProductGraph` with `TransitiveHydratedTargets` products.
This method raises only fatal errors, and does not consider failed roots in the execution
graph: in the v1 codepath, failed roots are accounted for post-fork.
:param TargetRoots target_roots: The targets root of the request.
"""
logger.debug('warming target_roots for: %r', target_roots)
subjects = self._determine_subjects(target_roots)
self._execute([TransitiveHydratedTargets], subjects)
request = self.scheduler_session.execution_request([TransitiveHydratedTargets], subjects)
result = self.scheduler_session.execute(request)
if result.error:
raise result.error

def validate_goals(self, goals):
"""Checks for @console_rules that satisfy requested goals.
Expand All @@ -195,6 +195,8 @@ def validate_goals(self, goals):
def run_console_rules(self, goals, target_roots):
"""Runs @console_rules sequentially and interactively by requesting their implicit Goal products.
For retryable failures, raises scheduler.ExecutionError.
:param list goals: The list of requested goal names as passed on the commandline.
:param TargetRoots target_roots: The targets root of the request.
"""
Expand All @@ -204,7 +206,7 @@ def run_console_rules(self, goals, target_roots):
for goal in goals:
goal_product = self.goal_map[goal]
logger.debug('requesting {} to satisfy execution of `{}` goal'.format(goal_product, goal))
self._execute([goal_product], subjects)
self.scheduler_session.product_request(goal_product, subjects)

def create_build_graph(self, target_roots, build_root=None):
"""Construct and return a `BuildGraph` given a set of input specs.
Expand Down
2 changes: 2 additions & 0 deletions src/python/pants/init/options_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ def create(cls, options_bootstrapper, build_configuration, init_subsystems=True)
# Parse and register options.
options = cls._construct_options(options_bootstrapper, build_configuration)

GlobalOptionsRegistrar.validate_instance(options.for_global_scope())

if init_subsystems:
Subsystem.set_options(options)

Expand Down
37 changes: 31 additions & 6 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
get_pants_cachedir, get_pants_configdir, pants_version)
from pants.option.arg_splitter import GLOBAL_SCOPE
from pants.option.custom_types import dir_option
from pants.option.errors import OptionsError
from pants.option.optionable import Optionable
from pants.option.scope import ScopeInfo
from pants.subsystem.subsystem_client_mixin import SubsystemClientMixin
Expand Down Expand Up @@ -125,6 +126,7 @@ def register_bootstrap_options(cls, register):
register('-q', '--quiet', type=bool, recursive=True, daemon=False,
help='Squelches most console output. NOTE: Some tasks default to behaving quietly: '
'inverting this option supports making them noisier than they would be otherwise.')

# Not really needed in bootstrap options, but putting it here means it displays right
# after -l and -q in help output, which is conveniently contextual.
register('--colors', type=bool, default=sys.stdout.isatty(), recursive=True, daemon=False,
Expand Down Expand Up @@ -236,12 +238,6 @@ def register_bootstrap_options(cls, register):
register('--enable-pantsd', advanced=True, type=bool, default=False,
help='Enables use of the pants daemon (and implicitly, the v2 engine). (Beta)')

# Toggles v1/v2 `Task` vs `@rule` pipelines on/off.
register('--v1', advanced=True, type=bool, default=True,
help='Enables execution of v1 Tasks.')
register('--v2', advanced=True, type=bool, default=False,
help='Enables execution of v2 @rules.')

# These facilitate configuring the native engine.
register('--native-engine-visualize-to', advanced=True, default=None, type=dir_option, daemon=False,
help='A directory to write execution and rule graphs to as `dot` files. The contents '
Expand Down Expand Up @@ -339,6 +335,19 @@ def register_options(cls, register):
"tags ('-' prefix). Useful with ::, to find subsets of targets "
"(e.g., integration tests.)")

# Toggles v1/v2 `Task` vs `@rule` pipelines on/off.
register('--v1', advanced=True, type=bool, default=True,
help='Enables execution of v1 Tasks.')
register('--v2', advanced=True, type=bool, default=False,
help='Enables execution of v2 @console_rules.')

loop_flag = '--loop'
register(loop_flag, type=bool,
help='Run v2 @console_rules continuously as file changes are detected. Requires '
'`--v2`, and is best utilized with `--v2 --no-v1`.')
register('--loop-max', type=int, default=2**32, advanced=True,
help='The maximum number of times to loop when `{}` is specified.'.format(loop_flag))

register('-t', '--timeout', advanced=True, type=int, metavar='<seconds>',
help='Number of seconds to wait for http connections.')
# TODO: After moving to the new options system these abstraction leaks can go away.
Expand All @@ -358,3 +367,19 @@ def register_options(cls, register):
register('--lock', advanced=True, type=bool, default=True,
help='Use a global lock to exclude other versions of pants from running during '
'critical operations.')

@classmethod
def validate_instance(cls, opts):
"""Validates an instance of global options for cases that are not prohibited via registration.
For example: mutually exclusive options may be registered by passing a `mutually_exclusive_group`,
but when multiple flags must be specified together, it can be necessary to specify post-parse
checks.
Raises pants.option.errors.OptionsError on validation failure.
"""
if opts.loop and (not opts.v2 or opts.v1):
raise OptionsError('The --loop option only works with @console_rules, and thus requires '
'`--v2 --no-v1` to function as expected.')
if opts.loop and not opts.enable_pantsd:
raise OptionsError('The --loop option requires `--enable-pantsd`, in order to watch files.')
63 changes: 59 additions & 4 deletions src/python/pants/pantsd/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import os
import queue
import sys
import threading
from builtins import open

Expand Down Expand Up @@ -57,6 +58,8 @@ def __init__(
self._watchman_is_running = threading.Event()
self._invalidating_files = set()

self._loop_condition = LoopCondition()

@staticmethod
def _combined_invalidating_fileset_from_globs(glob_strs, root):
return set.union(*(Fileset.globs(glob_str, root=root)() for glob_str in glob_strs))
Expand Down Expand Up @@ -116,11 +119,12 @@ def _check_pid_changed(self):
def _handle_batch_event(self, files):
self._logger.debug('handling change event for: %s', files)

with self.lifecycle_lock:
self._maybe_invalidate_scheduler_batch(files)
self._maybe_invalidate_scheduler_batch(files)

with self.fork_lock:
self._scheduler.invalidate_files(files)
invalidated = self._scheduler.invalidate_files(files)
if invalidated:
self._loop_condition.notify_all()

def _process_event_queue(self):
"""File event notification queue processor."""
Expand Down Expand Up @@ -172,6 +176,28 @@ def prefork(self, options, build_config):
self._watchman_is_running.wait()

session = self._graph_helper.new_session()
if options.for_global_scope().loop:
return session, self._prefork_loop(session, options)
else:
return session, self._prefork_body(session, options)

def _prefork_loop(self, session, options):
# TODO: See https://github.com/pantsbuild/pants/issues/6288 regarding Ctrl+C handling.
iterations = options.for_global_scope().loop_max
target_roots = None
while iterations and not self.is_killed:
try:
target_roots = self._prefork_body(session, options)
except session.scheduler_session.execution_error_type as e:
# Render retryable exceptions raised by the Scheduler.
print(e, file=sys.stderr)

iterations -= 1
while iterations and not self.is_killed and not self._loop_condition.wait(timeout=1):
continue
return target_roots

def _prefork_body(self, session, options):
with self.fork_lock:
global_options = options.for_global_scope()
target_roots = TargetRootsCalculator.create(
Expand All @@ -192,9 +218,38 @@ def prefork(self, options, build_config):
# N.B. @console_rules run pre-fork in order to cache the products they request during execution.
session.run_console_rules(options.goals, target_roots)

return session, target_roots
return target_roots

def run(self):
"""Main service entrypoint."""
while not self.is_killed:
self._process_event_queue()


class LoopCondition(object):
"""A wrapped condition variable to handle deciding when loop consumers should re-run.
Any number of threads may wait and/or notify the condition.
"""

def __init__(self):
super(LoopCondition, self).__init__()
self._condition = threading.Condition(threading.Lock())
self._iteration = 0

def notify_all(self):
"""Notifies all threads waiting for the condition."""
with self._condition:
self._iteration += 1
self._condition.notify_all()

def wait(self, timeout):
"""Waits for the condition for at most the given timeout and returns True if the condition triggered.
Generally called in a loop until the condition triggers.
"""

with self._condition:
previous_iteration = self._iteration
self._condition.wait(timeout)
return previous_iteration != self._iteration
2 changes: 1 addition & 1 deletion tests/python/pants_test/engine/legacy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ python_tests(
name = 'console_rule_integration',
sources = [ 'test_console_rule_integration.py' ],
dependencies = [
'tests/python/pants_test:int-test',
'tests/python/pants_test/pantsd:pantsd_integration_test_base',
],
tags = {'integration'},
timeout = 300,
Expand Down
Loading

0 comments on commit 008ce42

Please sign in to comment.