Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to attach Subsystems to streaming workunits #8720

Merged
merged 6 commits into from
Dec 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/python/pants/bin/local_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pants.option.options_bootstrapper import OptionsBootstrapper
from pants.reporting.reporting import Reporting
from pants.reporting.streaming_workunit_handler import StreamingWorkunitHandler
from pants.subsystem.subsystem import Subsystem
from pants.util.contextutil import maybe_profiled


Expand Down Expand Up @@ -112,7 +113,7 @@ def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config,
# be merged with the zipkin_trace_v2 flag, since they both involve most
# of the same engine functionality, but for now is separate to avoid
# breaking functionality associated with zipkin tracing while iterating on streaming workunit reporting.
stream_workunits = options.for_scope('reporting').stream_workunits
stream_workunits = len(options.for_global_scope().streaming_workunits_handlers) != 0
graph_session = graph_scheduler_helper.new_session(zipkin_trace_v2, RunTracker.global_instance().run_id, v2_ui, should_report_workunits=stream_workunits)
return graph_session, graph_session.scheduler_session

Expand Down Expand Up @@ -320,7 +321,9 @@ def _run(self):
try:
self._maybe_handle_help()

streaming_reporter = StreamingWorkunitHandler(self._scheduler_session, callback=None)
streaming_handlers = self._options.for_global_scope().streaming_workunits_handlers
callbacks = Subsystem.get_streaming_workunit_callbacks(streaming_handlers)
streaming_reporter = StreamingWorkunitHandler(self._scheduler_session, callbacks=callbacks)
with streaming_reporter.session():
engine_result = self._maybe_run_v2()

Expand Down
6 changes: 6 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,12 @@ def register_options(cls, register):
register('--lock', advanced=True, type=bool, default=True,
help='Use a global lock to exclude other versions of pants from running during '
'critical operations.')
register('--streaming-workunits-handlers', type=list, member_type=str, default=[],
advanced=True,
help="Use this option to name Subsystems which will receive streaming workunit events. "
"For instance, `--streaming-workunits-handlers=\"['pants.reporting.workunit.Workunits']\"` will "
"register a Subsystem called Workunits defined in the module \"pants.reporting.workunit\"."
)

@classmethod
def validate_instance(cls, opts):
Expand Down
3 changes: 1 addition & 2 deletions src/python/pants/pantsd/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ def prepare_v1_graph_run_v2(self, options, options_bootstrapper):
build_id = RunTracker.global_instance().run_id
v2_ui = options.for_global_scope().v2_ui
zipkin_trace_v2 = options.for_scope('reporting').zipkin_trace_v2
stream_workunits = options.for_scope('reporting').stream_workunits
session = self._graph_helper.new_session(zipkin_trace_v2, build_id, v2_ui, should_report_workunits=stream_workunits)
session = self._graph_helper.new_session(zipkin_trace_v2, build_id, v2_ui)

if options.for_global_scope().loop:
fn = self._loop
Expand Down
2 changes: 0 additions & 2 deletions src/python/pants/reporting/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ def register_options(cls, register):
register('--zipkin-max-span-batch-size', advanced=True, type=int, default=100,
help='Spans in a Zipkin trace are sent to the Zipkin server in batches.'
'zipkin-max-span-batch-size sets the max size of one batch.')
register('--stream-workunits', advanced=True, type=bool, default=False,
help="If set to true, report workunit information while pants is running")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's necessary in this case, but just bringing it up in case it is: Should this option removal follow some kind of deprecation cycle?


def initialize(self, run_tracker, all_options, start_time=None):
"""Initialize with the given RunTracker.
Expand Down
21 changes: 11 additions & 10 deletions src/python/pants/reporting/streaming_workunit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@

import threading
from contextlib import contextmanager
from typing import Any, Callable, Iterator, Optional
from typing import Any, Callable, Iterable, Iterator, Optional


DEFAULT_REPORT_INTERVAL_SECONDS = 10


class StreamingWorkunitHandler:
def __init__(self, scheduler: Any, callback: Optional[Callable], report_interval_seconds: float = DEFAULT_REPORT_INTERVAL_SECONDS):
def __init__(self, scheduler: Any, callbacks: Iterable[Callable] = (), report_interval_seconds: float = DEFAULT_REPORT_INTERVAL_SECONDS):
self.scheduler = scheduler
self.report_interval = report_interval_seconds
self.callback = callback
self.callbacks = callbacks
self._thread_runner: Optional[_InnerHandler] = None

def start(self) -> None:
if self.callback is not None:
self._thread_runner = _InnerHandler(self.scheduler, self.callback, self.report_interval)
if self.callbacks:
self._thread_runner = _InnerHandler(self.scheduler, self.callbacks, self.report_interval)
self._thread_runner.start()

def end(self) -> None:
Expand All @@ -28,8 +28,8 @@ def end(self) -> None:
# After stopping the thread, poll workunits one last time to make sure
# we report any workunits that were added after the last time the thread polled.
workunits = self.scheduler.poll_workunits()
if self.callback:
self.callback(workunits)
for callback in self.callbacks:
callback(workunits)

@contextmanager
def session(self) -> Iterator[None]:
Expand All @@ -44,17 +44,18 @@ def session(self) -> Iterator[None]:


class _InnerHandler(threading.Thread):
def __init__(self, scheduler: Any, callback: Callable, report_interval: float):
def __init__(self, scheduler: Any, callbacks: Iterable[Callable], report_interval: float):
super(_InnerHandler, self).__init__()
self.scheduler = scheduler
self.stop_request = threading.Event()
self.report_interval = report_interval
self.callback = callback
self.callbacks = callbacks

def run(self):
while not self.stop_request.isSet():
workunits = self.scheduler.poll_workunits()
self.callback(workunits)
for callback in self.callbacks:
callback(workunits)
self.stop_request.wait(timeout=self.report_interval)

def join(self, timeout=None):
Expand Down
59 changes: 58 additions & 1 deletion src/python/pants/subsystem/subsystem.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import importlib
import inspect
from typing import Dict, Optional, Tuple, Type, TypeVar, Union, cast
import logging
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Type, TypeVar, Union, cast

from pants.option.optionable import Optionable
from pants.option.options import Options
from pants.option.scope import ScopeInfo
from pants.subsystem.subsystem_client_mixin import SubsystemClientMixin, SubsystemDependency


logger = logging.getLogger(__name__)
gshuflin marked this conversation as resolved.
Show resolved Hide resolved


class SubsystemError(Exception):
"""An error in a subsystem."""

Expand Down Expand Up @@ -169,3 +174,55 @@ def get_options(self):
:API: public
"""
return self._scoped_options

@staticmethod
def get_streaming_workunit_callbacks(subsystem_names: Iterable[str]) -> List[Callable]:
"""
This method is used to dynamically generate a list of callables
intended to be passed to StreamingWorkunitHandler. The caller provides a
collection of strings representing a Python import path to a class that
implements the `Subsystem` class. It will then inspect these classes for
the presence of a special method called `handle_workunits`, which expects a
single non-self argument - namely, a tuple of Python dictionaries
representing workunits.

For instance, you might invoke this method with something like:

`Subsystem.get_streaming_workunit_callbacks(["pants.reporting.workunits.Workunit"])`

And this will result in the method attempting to dynamically-import a
module called "pants.reporting.workunits", inspecting it for the presence
of a class called `Workunit`, getting a global instance of this Subsystem,
and returning a list containing a single reference to the
`handle_workunits` method defined on it - and returning an empty list and
emitting warnings if any of these steps fail.
"""
gshuflin marked this conversation as resolved.
Show resolved Hide resolved

callables = []

for name in subsystem_names:
try:
name_components = name.split(".")
module_name = ".".join(name_components[:-1])
class_name = name_components[-1]
module = importlib.import_module(module_name)
subsystem_class = getattr(module, class_name)
except (IndexError, AttributeError, ModuleNotFoundError, ValueError) as e:
logger.warning(f"Invalid module name: {name}: {e}")
continue
except ImportError as e:
logger.warning(f"Could not import {module_name}: {e}")
continue
try:
subsystem = subsystem_class.global_instance()
except AttributeError:
logger.warning(f"{subsystem_class} is not a global subsystem.")
continue

try:
callables.append(subsystem.handle_workunits)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some way we can define an interface or mixin or something of the sort to capture all these constraints? (i.e. the component should have a method called handle_workunits, it should be a global subsystem...).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add checking for the signature of handle_workunits?

except AttributeError:
logger.warning(f"{subsystem_class} does not have a method named `handle_workunits` defined.")
continue

return callables
2 changes: 1 addition & 1 deletion tests/python/pants_test/engine/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def add(self, workunits) -> None:
self.workunits.extend(workunits)

tracker = Tracker()
async_reporter = StreamingWorkunitHandler(scheduler, callback=tracker.add, report_interval_seconds=0.01)
async_reporter = StreamingWorkunitHandler(scheduler, callbacks=[tracker.add], report_interval_seconds=0.01)
with async_reporter.session():
scheduler.product_request(Fib, subjects=[0])

Expand Down
2 changes: 1 addition & 1 deletion tests/python/pants_test/subsystem/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ python_tests(
dependencies=[
'src/python/pants/option',
'src/python/pants/subsystem',
'src/python/pants/testutil:test_base',
],
tags = {"partially_type_checked"},
)

python_library(
Expand Down
45 changes: 43 additions & 2 deletions tests/python/pants_test/subsystem/test_subsystem.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import unittest
import logging

from pants.option.optionable import Optionable
from pants.option.scope import ScopeInfo
from pants.subsystem.subsystem import Subsystem
from pants.subsystem.subsystem_client_mixin import SubsystemClientMixin
from pants.testutil.test_base import TestBase


class WorkunitSubscriptableSubsystem(Subsystem):
options_scope = "dummy scope"

def handle_workunits(self, workunits):
pass


class DummySubsystem(Subsystem):
Expand Down Expand Up @@ -41,9 +49,10 @@ def si(scope, subsystem_cls):
return ScopeInfo(scope, ScopeInfo.SUBSYSTEM, subsystem_cls)


class SubsystemTest(unittest.TestCase):
class SubsystemTest(TestBase):
def setUp(self):
DummySubsystem._options = DummyOptions()
WorkunitSubscriptableSubsystem._options = DummyOptions()

def test_global_instance(self):
# Verify that we get the same instance back every time.
Expand Down Expand Up @@ -266,3 +275,35 @@ def subsystem_dependencies(cls):

with self.assertRaises(SubsystemClientMixin.CycleException):
list(SubsystemB.subsystem_closure_iter())

def test_get_streaming_workunit_callbacks(self):
import_str = "pants_test.subsystem.test_subsystem.WorkunitSubscriptableSubsystem"
callables_list = Subsystem.get_streaming_workunit_callbacks([import_str])
assert len(callables_list) == 1
Eric-Arellano marked this conversation as resolved.
Show resolved Hide resolved

def test_streaming_workunit_callbacks_bad_module(self):
import_str = "nonexistent_module.AClassThatDoesntActuallyExist"
with self.captured_logging(level = logging.WARNING) as captured:
callables_list = Subsystem.get_streaming_workunit_callbacks([import_str])
warnings = captured.warnings()
assert len(warnings) == 1
assert len(callables_list) == 0
assert "No module named 'nonexistent_module'" in warnings[0]

def test_streaming_workunit_callbacks_good_module_bad_class(self):
import_str = "pants_test.subsystem.test_subsystem.ANonexistentClass"
with self.captured_logging(level = logging.WARNING) as captured:
callables_list = Subsystem.get_streaming_workunit_callbacks([import_str])
warnings = captured.warnings()
assert len(warnings) == 1
assert len(callables_list) == 0
assert "module 'pants_test.subsystem.test_subsystem' has no attribute 'ANonexistentClass'" in warnings[0]

def test_streaming_workunit_callbacks_with_invalid_subsystem(self):
gshuflin marked this conversation as resolved.
Show resolved Hide resolved
import_str = "pants_test.subsystem.test_subsystem.DummySubsystem"
with self.captured_logging(level = logging.WARNING) as captured:
callables_list = Subsystem.get_streaming_workunit_callbacks([import_str])
Eric-Arellano marked this conversation as resolved.
Show resolved Hide resolved
warnings = captured.warnings()
assert len(warnings) == 1
assert "does not have a method named `handle_workunits` defined" in warnings[0]
Eric-Arellano marked this conversation as resolved.
Show resolved Hide resolved
assert len(callables_list) == 0