Skip to content

Commit

Permalink
Connect to workunits
Browse files Browse the repository at this point in the history
  • Loading branch information
gshuflin committed Nov 27, 2019
1 parent 415538a commit 6897ef2
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 15 deletions.
38 changes: 36 additions & 2 deletions src/python/pants/bin/local_pants_runner.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import importlib
import logging
from contextlib import contextmanager
from typing import Callable, List

from pants.base.build_environment import get_buildroot
from pants.base.cmd_line_spec_parser import CmdLineSpecParser
Expand Down Expand Up @@ -112,7 +114,7 @@ def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config,
# be merged with the zipkin_trace_v2 flag, since they both involve most
# of the same engine functionality, but for now is separate to avoid
# breaking functionality associated with zipkin tracing while iterating on streaming workunit reporting.
stream_workunits = options.for_scope('reporting').stream_workunits
stream_workunits = len(options.for_global_scope().streaming_workunits_handlers) != 0
graph_session = graph_scheduler_helper.new_session(zipkin_trace_v2, RunTracker.global_instance().run_id, v2_ui, should_report_workunits=stream_workunits)
return graph_session, graph_session.scheduler_session

Expand Down Expand Up @@ -314,13 +316,45 @@ def _update_stats(self):
if engine_workunits:
self._run_tracker.report.bulk_record_workunits(engine_workunits)

@staticmethod
def get_streaming_workunit_callbacks(subsystem_names: List[str]) -> List[Callable]:
callables = []

for name in subsystem_names:
try:
module_name = '.'.join(name.split(".")[:-1])
class_name = name.split(".")[-1]
module = importlib.import_module(module_name)
subsystem_class = getattr(module, class_name)
except (IndexError, AttributeError) as e:
logger.warning(f"Invalid module name: {name}: {e}")
continue
except ImportError as e:
logger.warning(f"Could not import {module_name}: {e}")
continue
try:
subsystem = subsystem_class.global_instance()
except AttributeError:
logger.warning(f"{subsystem_class} is not a global subsystem")
continue

try:
callables.append(subsystem.handle_workunits)
except AttributeError:
logger.warning(f"{subsystem_class} does not have a `handle_workunits` method")
continue

return callables

def _run(self):
engine_result = PANTS_FAILED_EXIT_CODE
goal_runner_result = PANTS_FAILED_EXIT_CODE
try:
self._maybe_handle_help()

streaming_reporter = StreamingWorkunitHandler(self._scheduler_session, callback=None)
streaming_handlers = self._options.for_global_scope().streaming_workunits_handlers
callbacks = self.get_streaming_workunit_callbacks(streaming_handlers)
streaming_reporter = StreamingWorkunitHandler(self._scheduler_session, callbacks=callbacks)
with streaming_reporter.session():
engine_result = self._maybe_run_v2()

Expand Down
2 changes: 2 additions & 0 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,8 @@ def register_options(cls, register):
register('--lock', advanced=True, type=bool, default=True,
help='Use a global lock to exclude other versions of pants from running during '
'critical operations.')
register('--streaming-workunits-handlers', type=list, member_type=str, default=[],
advanced=True, help="Use this option to name Subsystems which will receive streaming workunit events")

@classmethod
def validate_instance(cls, opts):
Expand Down
2 changes: 0 additions & 2 deletions src/python/pants/reporting/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ def register_options(cls, register):
register('--zipkin-max-span-batch-size', advanced=True, type=int, default=100,
help='Spans in a Zipkin trace are sent to the Zipkin server in batches.'
'zipkin-max-span-batch-size sets the max size of one batch.')
register('--stream-workunits', advanced=True, type=bool, default=False,
help="If set to true, report workunit information while pants is running")

def initialize(self, run_tracker, all_options, start_time=None):
"""Initialize with the given RunTracker.
Expand Down
21 changes: 11 additions & 10 deletions src/python/pants/reporting/streaming_workunit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@

import threading
from contextlib import contextmanager
from typing import Any, Callable, Iterator, Optional
from typing import Any, Callable, Iterator, List, Optional


DEFAULT_REPORT_INTERVAL_SECONDS = 10


class StreamingWorkunitHandler:
def __init__(self, scheduler: Any, callback: Optional[Callable], report_interval_seconds: float = DEFAULT_REPORT_INTERVAL_SECONDS):
def __init__(self, scheduler: Any, callbacks: List[Callable] = [], report_interval_seconds: float = DEFAULT_REPORT_INTERVAL_SECONDS):
self.scheduler = scheduler
self.report_interval = report_interval_seconds
self.callback = callback
self.callbacks = callbacks
self._thread_runner: Optional[_InnerHandler] = None

def start(self) -> None:
if self.callback is not None:
self._thread_runner = _InnerHandler(self.scheduler, self.callback, self.report_interval)
if self.callbacks:
self._thread_runner = _InnerHandler(self.scheduler, self.callbacks, self.report_interval)
self._thread_runner.start()

def end(self) -> None:
Expand All @@ -28,8 +28,8 @@ def end(self) -> None:
# After stopping the thread, poll workunits one last time to make sure
# we report any workunits that were added after the last time the thread polled.
workunits = self.scheduler.poll_workunits()
if self.callback:
self.callback(workunits)
for callback in self.callbacks:
callback(workunits)

@contextmanager
def session(self) -> Iterator[None]:
Expand All @@ -44,17 +44,18 @@ def session(self) -> Iterator[None]:


class _InnerHandler(threading.Thread):
def __init__(self, scheduler: Any, callback: Callable, report_interval: float):
def __init__(self, scheduler: Any, callbacks: List[Callable], report_interval: float):
super(_InnerHandler, self).__init__()
self.scheduler = scheduler
self.stop_request = threading.Event()
self.report_interval = report_interval
self.callback = callback
self.callbacks = callbacks

def run(self):
while not self.stop_request.isSet():
workunits = self.scheduler.poll_workunits()
self.callback(workunits)
for callback in self.callbacks:
callback(workunits)
self.stop_request.wait(timeout=self.report_interval)

def join(self, timeout=None):
Expand Down
2 changes: 1 addition & 1 deletion tests/python/pants_test/engine/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def add(self, workunits) -> None:
self.workunits.extend(workunits)

tracker = Tracker()
async_reporter = StreamingWorkunitHandler(scheduler, callback=tracker.add, report_interval_seconds=0.01)
async_reporter = StreamingWorkunitHandler(scheduler, callbacks=[tracker.add], report_interval_seconds=0.01)
with async_reporter.session():
scheduler.product_request(Fib, subjects=[0])

Expand Down

0 comments on commit 6897ef2

Please sign in to comment.