Skip to content

Commit

Permalink
Hook up streaming workunits to subsystems
Browse files Browse the repository at this point in the history
  • Loading branch information
gshuflin committed Dec 3, 2019
1 parent 17df302 commit b7c5fb5
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 18 deletions.
7 changes: 5 additions & 2 deletions src/python/pants/bin/local_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pants.option.options_bootstrapper import OptionsBootstrapper
from pants.reporting.reporting import Reporting
from pants.reporting.streaming_workunit_handler import StreamingWorkunitHandler
from pants.subsystem.subsystem import Subsystem
from pants.util.contextutil import maybe_profiled


Expand Down Expand Up @@ -112,7 +113,7 @@ def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config,
# be merged with the zipkin_trace_v2 flag, since they both involve most
# of the same engine functionality, but for now is separate to avoid
# breaking functionality associated with zipkin tracing while iterating on streaming workunit reporting.
stream_workunits = options.for_scope('reporting').stream_workunits
stream_workunits = len(options.for_global_scope().streaming_workunits_handlers) != 0
graph_session = graph_scheduler_helper.new_session(zipkin_trace_v2, RunTracker.global_instance().run_id, v2_ui, should_report_workunits=stream_workunits)
return graph_session, graph_session.scheduler_session

Expand Down Expand Up @@ -320,7 +321,9 @@ def _run(self):
try:
self._maybe_handle_help()

streaming_reporter = StreamingWorkunitHandler(self._scheduler_session, callback=None)
streaming_handlers = self._options.for_global_scope().streaming_workunits_handlers
callbacks = Subsystem.get_streaming_workunit_callbacks(streaming_handlers)
streaming_reporter = StreamingWorkunitHandler(self._scheduler_session, callbacks=callbacks)
with streaming_reporter.session():
engine_result = self._maybe_run_v2()

Expand Down
6 changes: 6 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,12 @@ def register_options(cls, register):
register('--lock', advanced=True, type=bool, default=True,
help='Use a global lock to exclude other versions of pants from running during '
'critical operations.')
register('--streaming-workunits-handlers', type=list, member_type=str, default=[],
advanced=True,
help="Use this option to name Subsystems which will receive streaming workunit events. "
"""For instance, `--streaming-workunits-handlers="['pants.reporting.workunit.Workunits']"` will """
"""register a Subsystem called Workunits defined in the module "pants.reporting.workunit"."""
)

@classmethod
def validate_instance(cls, opts):
Expand Down
2 changes: 0 additions & 2 deletions src/python/pants/reporting/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ def register_options(cls, register):
register('--zipkin-max-span-batch-size', advanced=True, type=int, default=100,
help='Spans in a Zipkin trace are sent to the Zipkin server in batches.'
'zipkin-max-span-batch-size sets the max size of one batch.')
register('--stream-workunits', advanced=True, type=bool, default=False,
help="If set to true, report workunit information while pants is running")

def initialize(self, run_tracker, all_options, start_time=None):
"""Initialize with the given RunTracker.
Expand Down
21 changes: 11 additions & 10 deletions src/python/pants/reporting/streaming_workunit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@

import threading
from contextlib import contextmanager
from typing import Any, Callable, Iterator, Optional
from typing import Any, Callable, Iterable, Iterator, Optional


DEFAULT_REPORT_INTERVAL_SECONDS = 10


class StreamingWorkunitHandler:
def __init__(self, scheduler: Any, callback: Optional[Callable], report_interval_seconds: float = DEFAULT_REPORT_INTERVAL_SECONDS):
def __init__(self, scheduler: Any, callbacks: Iterable[Callable] = (), report_interval_seconds: float = DEFAULT_REPORT_INTERVAL_SECONDS):
self.scheduler = scheduler
self.report_interval = report_interval_seconds
self.callback = callback
self.callbacks = callbacks
self._thread_runner: Optional[_InnerHandler] = None

def start(self) -> None:
if self.callback is not None:
self._thread_runner = _InnerHandler(self.scheduler, self.callback, self.report_interval)
if self.callbacks:
self._thread_runner = _InnerHandler(self.scheduler, self.callbacks, self.report_interval)
self._thread_runner.start()

def end(self) -> None:
Expand All @@ -28,8 +28,8 @@ def end(self) -> None:
# After stopping the thread, poll workunits one last time to make sure
# we report any workunits that were added after the last time the thread polled.
workunits = self.scheduler.poll_workunits()
if self.callback:
self.callback(workunits)
for callback in self.callbacks:
callback(workunits)

@contextmanager
def session(self) -> Iterator[None]:
Expand All @@ -44,17 +44,18 @@ def session(self) -> Iterator[None]:


class _InnerHandler(threading.Thread):
def __init__(self, scheduler: Any, callback: Callable, report_interval: float):
def __init__(self, scheduler: Any, callbacks: Iterable[Callable], report_interval: float):
super(_InnerHandler, self).__init__()
self.scheduler = scheduler
self.stop_request = threading.Event()
self.report_interval = report_interval
self.callback = callback
self.callbacks = callbacks

def run(self):
while not self.stop_request.isSet():
workunits = self.scheduler.poll_workunits()
self.callback(workunits)
for callback in self.callbacks:
callback(workunits)
self.stop_request.wait(timeout=self.report_interval)

def join(self, timeout=None):
Expand Down
59 changes: 58 additions & 1 deletion src/python/pants/subsystem/subsystem.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import importlib
import inspect
from typing import Dict, Optional, Tuple, Type, TypeVar, Union, cast
import logging
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union, cast

from pants.option.optionable import Optionable
from pants.option.options import Options
from pants.option.scope import ScopeInfo
from pants.subsystem.subsystem_client_mixin import SubsystemClientMixin, SubsystemDependency


logger = logging.getLogger(__name__)


class SubsystemError(Exception):
"""An error in a subsystem."""

Expand Down Expand Up @@ -169,3 +174,55 @@ def get_options(self):
:API: public
"""
return self._scoped_options

@staticmethod
def get_streaming_workunit_callbacks(subsystem_names: Iterable[str]) -> List[Callable]:

"""
This method is used to dynamically generate a list of callables
intended to be passed to StreamingWorkunitHandler. The caller provides a
collection of strings representing a Python import path to a class that
implements the `Subsystem` class. It will then inspect these classes for
the presence of a special method called `handle_workunits`, which expects a
single non-self argument - namely, a tuple of Python dictionaries
representing workunits.
For instance, you might invoke this method with something like:
`Subsystem.get_streaming_workunit_callbacks(["pants.reporting.workunits.Workunit"])`
And this will result in the method attempting to dynamically-import a
module called "pants.reporting.workunits", inspecting it for the presence
of a class called `Workunit`, getting a global instance of this Subsystem,
and returning a list containing a single reference to the
`handle_workunits` method defined on it - and returning an empty list and
emitting warnings if any of these steps fail.
"""

callables = []

for name in subsystem_names:
try:
module_name = '.'.join(name.split(".")[:-1])
class_name = name.split(".")[-1]
module = importlib.import_module(module_name)
subsystem_class = getattr(module, class_name)
except (IndexError, AttributeError, ModuleNotFoundError, ValueError) as e:
logger.warning(f"Invalid module name: {name}: {e}")
continue
except ImportError as e:
logger.warning(f"Could not import {module_name}: {e}")
continue
try:
subsystem = subsystem_class.global_instance()
except AttributeError:
logger.warning(f"{subsystem_class} is not a global subsystem.")
continue

try:
callables.append(subsystem.handle_workunits)
except AttributeError:
logger.warning(f"{subsystem_class} does not have a method named `handle_workunits` defined.")
continue

return callables
2 changes: 1 addition & 1 deletion tests/python/pants_test/engine/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def add(self, workunits) -> None:
self.workunits.extend(workunits)

tracker = Tracker()
async_reporter = StreamingWorkunitHandler(scheduler, callback=tracker.add, report_interval_seconds=0.01)
async_reporter = StreamingWorkunitHandler(scheduler, callbacks=[tracker.add], report_interval_seconds=0.01)
with async_reporter.session():
scheduler.product_request(Fib, subjects=[0])

Expand Down
1 change: 1 addition & 0 deletions tests/python/pants_test/subsystem/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ python_tests(
dependencies=[
'src/python/pants/option',
'src/python/pants/subsystem',
'src/python/pants/testutil:test_base',
],
)

Expand Down
27 changes: 25 additions & 2 deletions tests/python/pants_test/subsystem/test_subsystem.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import unittest
import logging

from pants.option.optionable import Optionable
from pants.option.scope import ScopeInfo
from pants.subsystem.subsystem import Subsystem
from pants.subsystem.subsystem_client_mixin import SubsystemClientMixin
from pants.testutil.test_base import TestBase


class WorkunitSubscriptableSubsystem(Subsystem):
options_scope = "dummy scope"

def handle_workunits(self, workunits):
pass


class DummySubsystem(Subsystem):
Expand Down Expand Up @@ -41,9 +49,10 @@ def si(scope, subsystem_cls):
return ScopeInfo(scope, ScopeInfo.SUBSYSTEM, subsystem_cls)


class SubsystemTest(unittest.TestCase):
class SubsystemTest(TestBase):
def setUp(self):
DummySubsystem._options = DummyOptions()
WorkunitSubscriptableSubsystem._options = DummyOptions()

def test_global_instance(self):
# Verify that we get the same instance back every time.
Expand Down Expand Up @@ -266,3 +275,17 @@ def subsystem_dependencies(cls):

with self.assertRaises(SubsystemClientMixin.CycleException):
list(SubsystemB.subsystem_closure_iter())

def test_get_streaming_workunit_callbacks(self):
import_str = "pants_test.subsystem.test_subsystem.WorkunitSubscriptableSubsystem"
callables_list = Subsystem.get_streaming_workunit_callbacks([import_str])
assert len(callables_list) == 1

def test_streaming_workunit_callbacks_with_invalid_subsystem(self):
import_str = "pants_test.subsystem.test_subsystem.DummySubsystem"
with self.captured_logging(level = logging.WARNING) as captured:
callables_list = Subsystem.get_streaming_workunit_callbacks([import_str])
warnings = captured.warnings()
assert len(warnings) == 1
assert "does not have a method named `handle_workunits` defined" in warnings[0]
assert len(callables_list) == 0

0 comments on commit b7c5fb5

Please sign in to comment.