Skip to content

Commit

Permalink
Add option to attach Subsystems to streaming workunits (#8720)
Browse files Browse the repository at this point in the history
### Problem

Now that we have the infrastructure to stream workunits while pants is running, we need a way to tell some component of the code to do something useful with those workunits. We also want plugins to be able to receive streamed workunits.

### Solution

A global Subsystem with a specific method `handle_workunits` can be registered to receive streaming workunits by using the new global option `--streaming-workunits-handlers`, which expects a list of Python import paths as strings. At the beginning of a pants run, `local_pants_runner` will dynamically import all specified Subsystem classes, and every time `StreamingWorkunitHandler` receives workunits from the engine, it will pass them along to the `handle_workunits` method of all registered Subsystems.

Each string argument must be a fully-qualified import path that ends with the name of the Subsystem class itself. For instance, `./pants --v2 --no-v1 --streaming-workunits-handlers="['pants.reporting.workunits.Workunits']" binary examples/src/python/example/hello/main/` will register a Subsystem subclass `Workunits` defined in the module `pants.reporting.workunits` to receive streaming workunits.
  • Loading branch information
gshuflin authored Dec 4, 2019
1 parent 33cb62b commit 1dcae2f
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 21 deletions.
7 changes: 5 additions & 2 deletions src/python/pants/bin/local_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pants.option.options_bootstrapper import OptionsBootstrapper
from pants.reporting.reporting import Reporting
from pants.reporting.streaming_workunit_handler import StreamingWorkunitHandler
from pants.subsystem.subsystem import Subsystem
from pants.util.contextutil import maybe_profiled


Expand Down Expand Up @@ -112,7 +113,7 @@ def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config,
# be merged with the zipkin_trace_v2 flag, since they both involve most
# of the same engine functionality, but for now is separate to avoid
# breaking functionality associated with zipkin tracing while iterating on streaming workunit reporting.
stream_workunits = options.for_scope('reporting').stream_workunits
stream_workunits = len(options.for_global_scope().streaming_workunits_handlers) != 0
graph_session = graph_scheduler_helper.new_session(zipkin_trace_v2, RunTracker.global_instance().run_id, v2_ui, should_report_workunits=stream_workunits)
return graph_session, graph_session.scheduler_session

Expand Down Expand Up @@ -320,7 +321,9 @@ def _run(self):
try:
self._maybe_handle_help()

streaming_reporter = StreamingWorkunitHandler(self._scheduler_session, callback=None)
streaming_handlers = self._options.for_global_scope().streaming_workunits_handlers
callbacks = Subsystem.get_streaming_workunit_callbacks(streaming_handlers)
streaming_reporter = StreamingWorkunitHandler(self._scheduler_session, callbacks=callbacks)
with streaming_reporter.session():
engine_result = self._maybe_run_v2()

Expand Down
6 changes: 6 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,12 @@ def register_options(cls, register):
register('--lock', advanced=True, type=bool, default=True,
help='Use a global lock to exclude other versions of pants from running during '
'critical operations.')
register('--streaming-workunits-handlers', type=list, member_type=str, default=[],
advanced=True,
help="Use this option to name Subsystems which will receive streaming workunit events. "
"For instance, `--streaming-workunits-handlers=\"['pants.reporting.workunit.Workunits']\"` will "
"register a Subsystem called Workunits defined in the module \"pants.reporting.workunit\"."
)

@classmethod
def validate_instance(cls, opts):
Expand Down
3 changes: 1 addition & 2 deletions src/python/pants/pantsd/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ def prepare_v1_graph_run_v2(self, options, options_bootstrapper):
build_id = RunTracker.global_instance().run_id
v2_ui = options.for_global_scope().v2_ui
zipkin_trace_v2 = options.for_scope('reporting').zipkin_trace_v2
stream_workunits = options.for_scope('reporting').stream_workunits
session = self._graph_helper.new_session(zipkin_trace_v2, build_id, v2_ui, should_report_workunits=stream_workunits)
session = self._graph_helper.new_session(zipkin_trace_v2, build_id, v2_ui)

if options.for_global_scope().loop:
fn = self._loop
Expand Down
2 changes: 0 additions & 2 deletions src/python/pants/reporting/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ def register_options(cls, register):
register('--zipkin-max-span-batch-size', advanced=True, type=int, default=100,
help='Spans in a Zipkin trace are sent to the Zipkin server in batches.'
'zipkin-max-span-batch-size sets the max size of one batch.')
register('--stream-workunits', advanced=True, type=bool, default=False,
help="If set to true, report workunit information while pants is running")

def initialize(self, run_tracker, all_options, start_time=None):
"""Initialize with the given RunTracker.
Expand Down
21 changes: 11 additions & 10 deletions src/python/pants/reporting/streaming_workunit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@

import threading
from contextlib import contextmanager
from typing import Any, Callable, Iterator, Optional
from typing import Any, Callable, Iterable, Iterator, Optional


DEFAULT_REPORT_INTERVAL_SECONDS = 10


class StreamingWorkunitHandler:
def __init__(self, scheduler: Any, callback: Optional[Callable], report_interval_seconds: float = DEFAULT_REPORT_INTERVAL_SECONDS):
def __init__(self, scheduler: Any, callbacks: Iterable[Callable] = (), report_interval_seconds: float = DEFAULT_REPORT_INTERVAL_SECONDS):
self.scheduler = scheduler
self.report_interval = report_interval_seconds
self.callback = callback
self.callbacks = callbacks
self._thread_runner: Optional[_InnerHandler] = None

def start(self) -> None:
if self.callback is not None:
self._thread_runner = _InnerHandler(self.scheduler, self.callback, self.report_interval)
if self.callbacks:
self._thread_runner = _InnerHandler(self.scheduler, self.callbacks, self.report_interval)
self._thread_runner.start()

def end(self) -> None:
Expand All @@ -28,8 +28,8 @@ def end(self) -> None:
# After stopping the thread, poll workunits one last time to make sure
# we report any workunits that were added after the last time the thread polled.
workunits = self.scheduler.poll_workunits()
if self.callback:
self.callback(workunits)
for callback in self.callbacks:
callback(workunits)

@contextmanager
def session(self) -> Iterator[None]:
Expand All @@ -44,17 +44,18 @@ def session(self) -> Iterator[None]:


class _InnerHandler(threading.Thread):
def __init__(self, scheduler: Any, callback: Callable, report_interval: float):
def __init__(self, scheduler: Any, callbacks: Iterable[Callable], report_interval: float):
super(_InnerHandler, self).__init__()
self.scheduler = scheduler
self.stop_request = threading.Event()
self.report_interval = report_interval
self.callback = callback
self.callbacks = callbacks

def run(self):
while not self.stop_request.isSet():
workunits = self.scheduler.poll_workunits()
self.callback(workunits)
for callback in self.callbacks:
callback(workunits)
self.stop_request.wait(timeout=self.report_interval)

def join(self, timeout=None):
Expand Down
59 changes: 58 additions & 1 deletion src/python/pants/subsystem/subsystem.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import importlib
import inspect
from typing import Dict, Optional, Tuple, Type, TypeVar, Union, cast
import logging
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union, cast

from pants.option.optionable import Optionable
from pants.option.options import Options
from pants.option.scope import ScopeInfo
from pants.subsystem.subsystem_client_mixin import SubsystemClientMixin, SubsystemDependency


logger = logging.getLogger(__name__)


class SubsystemError(Exception):
"""An error in a subsystem."""

Expand Down Expand Up @@ -169,3 +174,55 @@ def get_options(self):
:API: public
"""
return self._scoped_options

@staticmethod
def get_streaming_workunit_callbacks(subsystem_names: Iterable[str]) -> List[Callable]:
    """Resolve Subsystem import paths into `handle_workunits` callbacks.

    The returned callables are intended to be passed to StreamingWorkunitHandler.

    Each entry of `subsystem_names` must be a fully-qualified, absolute Python
    import path ending in the name of a class, for instance
    "pants.reporting.workunits.Workunit". For every entry this method:

      1. imports the module part of the path ("pants.reporting.workunits"),
      2. looks up the class ("Workunit") on that module,
      3. obtains the global instance of that class via `global_instance()`,
      4. collects the `handle_workunits` method of that instance, which is
         expected to take a single non-self argument - namely, a tuple of
         Python dictionaries representing workunits.

    An entry for which any of these steps fails is skipped and a warning is
    logged; the remaining entries are still processed, so one bad name does not
    discard the handlers that did resolve.

    :param subsystem_names: Import paths of the Subsystem classes to register.
    :returns: The resolved `handle_workunits` callables, in input order. The
      list is empty only if no entry could be resolved.
    """

    callables: List[Callable] = []

    for name in subsystem_names:
        try:
            # Everything before the last dot is the module path; the rest is the class name.
            module_name, _, class_name = name.rpartition(".")
            module = importlib.import_module(module_name)
            subsystem_class = getattr(module, class_name)
        except (AttributeError, ModuleNotFoundError, TypeError, ValueError) as e:
            # ValueError: empty module name (no dot in `name`).
            # TypeError: relative path such as ".foo.Bar", which import_module
            # rejects when no package is given.
            logger.warning(f"Invalid module name: {name}: {e}")
            continue
        except ImportError as e:
            logger.warning(f"Could not import {module_name}: {e}")
            continue

        try:
            subsystem = subsystem_class.global_instance()
        except AttributeError:
            logger.warning(f"{subsystem_class} is not a global subsystem.")
            continue

        # Reject a missing attribute and a non-callable one alike: anything that
        # cannot be called would only fail later, when workunits are delivered.
        handler = getattr(subsystem, "handle_workunits", None)
        if not callable(handler):
            logger.warning(f"{subsystem_class} does not have a method named `handle_workunits` defined.")
            continue
        callables.append(handler)

    return callables
2 changes: 1 addition & 1 deletion tests/python/pants_test/engine/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def add(self, workunits) -> None:
self.workunits.extend(workunits)

tracker = Tracker()
async_reporter = StreamingWorkunitHandler(scheduler, callback=tracker.add, report_interval_seconds=0.01)
async_reporter = StreamingWorkunitHandler(scheduler, callbacks=[tracker.add], report_interval_seconds=0.01)
with async_reporter.session():
scheduler.product_request(Fib, subjects=[0])

Expand Down
2 changes: 1 addition & 1 deletion tests/python/pants_test/subsystem/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ python_tests(
dependencies=[
'src/python/pants/option',
'src/python/pants/subsystem',
'src/python/pants/testutil:test_base',
],
tags = {"partially_type_checked"},
)

python_library(
Expand Down
45 changes: 43 additions & 2 deletions tests/python/pants_test/subsystem/test_subsystem.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import unittest
import logging

from pants.option.optionable import Optionable
from pants.option.scope import ScopeInfo
from pants.subsystem.subsystem import Subsystem
from pants.subsystem.subsystem_client_mixin import SubsystemClientMixin
from pants.testutil.test_base import TestBase


class WorkunitSubscriptableSubsystem(Subsystem):
    """Test fixture: a Subsystem that defines a `handle_workunits` method.

    It is referenced by import path in the tests of
    `Subsystem.get_streaming_workunit_callbacks`.
    """
    options_scope = "dummy scope"

    def handle_workunits(self, workunits):
        # Intentionally a no-op: this fixture only needs the method to be present.
        pass


class DummySubsystem(Subsystem):
Expand Down Expand Up @@ -41,9 +49,10 @@ def si(scope, subsystem_cls):
return ScopeInfo(scope, ScopeInfo.SUBSYSTEM, subsystem_cls)


class SubsystemTest(unittest.TestCase):
class SubsystemTest(TestBase):
def setUp(self):
DummySubsystem._options = DummyOptions()
WorkunitSubscriptableSubsystem._options = DummyOptions()

def test_global_instance(self):
# Verify that we get the same instance back every time.
Expand Down Expand Up @@ -266,3 +275,35 @@ def subsystem_dependencies(cls):

with self.assertRaises(SubsystemClientMixin.CycleException):
list(SubsystemB.subsystem_closure_iter())

def test_get_streaming_workunit_callbacks(self):
    # A Subsystem that defines `handle_workunits` resolves to exactly one callback.
    handler_path = "pants_test.subsystem.test_subsystem.WorkunitSubscriptableSubsystem"
    callbacks = Subsystem.get_streaming_workunit_callbacks([handler_path])
    assert len(callbacks) == 1

def test_streaming_workunit_callbacks_bad_module(self):
    # A path whose module cannot be imported produces one warning and no callbacks.
    missing_module_path = "nonexistent_module.AClassThatDoesntActuallyExist"
    with self.captured_logging(level=logging.WARNING) as log_capture:
        callbacks = Subsystem.get_streaming_workunit_callbacks([missing_module_path])
        logged = log_capture.warnings()
        assert len(logged) == 1
        assert len(callbacks) == 0
        assert "No module named 'nonexistent_module'" in logged[0]

def test_streaming_workunit_callbacks_good_module_bad_class(self):
    # The module imports fine, but the named class is absent: one warning, no callbacks.
    missing_class_path = "pants_test.subsystem.test_subsystem.ANonexistentClass"
    with self.captured_logging(level=logging.WARNING) as log_capture:
        callbacks = Subsystem.get_streaming_workunit_callbacks([missing_class_path])
        logged = log_capture.warnings()
        assert len(logged) == 1
        assert len(callbacks) == 0
        assert "module 'pants_test.subsystem.test_subsystem' has no attribute 'ANonexistentClass'" in logged[0]

def test_streaming_workunit_callbacks_with_invalid_subsystem(self):
    # DummySubsystem has no `handle_workunits`, so it is rejected with a warning.
    handlerless_path = "pants_test.subsystem.test_subsystem.DummySubsystem"
    with self.captured_logging(level=logging.WARNING) as log_capture:
        callbacks = Subsystem.get_streaming_workunit_callbacks([handlerless_path])
        logged = log_capture.warnings()
        assert len(logged) == 1
        assert "does not have a method named `handle_workunits` defined" in logged[0]
        assert len(callbacks) == 0

0 comments on commit 1dcae2f

Please sign in to comment.