Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Early support for @console_rule. #6088

Merged
merged 7 commits into from
Jul 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 89 additions & 24 deletions src/python/pants/bin/daemon_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from setproctitle import setproctitle as set_process_title

from pants.base.build_environment import get_buildroot
from pants.base.exiter import Exiter
from pants.bin.local_pants_runner import LocalPantsRunner
from pants.init.util import clean_global_runtime_state
Expand Down Expand Up @@ -72,30 +73,69 @@ class DaemonPantsRunner(ProcessManager):
N.B. this class is primarily used by the PailgunService in pantsd.
"""

def __init__(self, socket, exiter, args, env, target_roots, graph_helper, fork_lock,
deferred_exception=None):
@classmethod
def create(cls, sock, args, env, fork_lock, scheduler_service):
try:
# N.B. This will temporarily redirect stdio in the daemon's pre-fork context
# to the nailgun session. We'll later do this a second time post-fork, because
# threads.
with cls.nailgunned_stdio(sock, env, handle_stdin=False):
options, build_config, options_bootstrapper = LocalPantsRunner.parse_options(args, env)
subprocess_dir = options.for_global_scope().pants_subprocessdir
graph_helper, target_roots = scheduler_service.prefork(options, build_config)
deferred_exc = None
except Exception:
deferred_exc = sys.exc_info()
graph_helper = None
target_roots = None
options_bootstrapper = None
# N.B. This will be overridden with the correct value if options
# parsing is successful - otherwise it permits us to run just far
# enough to raise the deferred exception.
subprocess_dir = os.path.join(get_buildroot(), '.pids')

return cls(
sock,
args,
env,
fork_lock,
graph_helper,
target_roots,
subprocess_dir,
options_bootstrapper,
deferred_exc
)

def __init__(self, socket, args, env, fork_lock, graph_helper, target_roots,
metadata_base_dir, options_bootstrapper, deferred_exc):
"""
:param socket socket: A connected socket capable of speaking the nailgun protocol.
:param Exiter exiter: The Exiter instance for this run.
:param list args: The arguments (i.e. sys.argv) for this run.
:param dict env: The environment (i.e. os.environ) for this run.
:param TargetRoots target_roots: The `TargetRoots` for this run.
:param threading.RLock fork_lock: A lock to use during forking for thread safety.
:param LegacyGraphSession graph_helper: The LegacyGraphSession instance to use for BuildGraph
construction. In the event of an exception, this will be
None.
:param threading.RLock fork_lock: A lock to use during forking for thread safety.
:param Exception deferred_exception: A deferred exception from the daemon's graph construction.
:param TargetRoots target_roots: The `TargetRoots` for this run.
:param str metadata_base_dir: The ProcessManager metadata_base_dir from options.
:param OptionsBootstrapper options_bootstrapper: An OptionsBootstrapper to reuse.
:param Exception deferred_exception: A deferred exception from the daemon's pre-fork context.
If present, this will be re-raised in the client context.
"""
super(DaemonPantsRunner, self).__init__(name=self._make_identity())
super(DaemonPantsRunner, self).__init__(
name=self._make_identity(),
metadata_base_dir=metadata_base_dir
)
self._socket = socket
self._exiter = exiter
self._args = args
self._env = env
self._target_roots = target_roots
self._graph_helper = graph_helper
self._fork_lock = fork_lock
self._deferred_exception = deferred_exception
self._graph_helper = graph_helper
self._target_roots = target_roots
self._options_bootstrapper = options_bootstrapper
self._deferred_exception = deferred_exc

self._exiter = DaemonExiter(socket)

def _make_identity(self):
"""Generate a ProcessManager identity for a given pants run.
Expand All @@ -104,15 +144,16 @@ def _make_identity(self):
"""
return 'pantsd-run-{}'.format(datetime.datetime.now().strftime('%Y-%m-%dT%H_%M_%S_%f'))

@classmethod
@contextmanager
def _tty_stdio(self):
def _tty_stdio(cls, env):
"""Handles stdio redirection in the case of all stdio descriptors being the same tty."""
# If all stdio is a tty, there's only one logical I/O device (the tty device). This happens to
# be addressable as a file in OSX and Linux, so we take advantage of that and directly open the
# character device for output redirection - eliminating the need to directly marshall any
# interactive stdio back/forth across the socket and permitting full, correct tty control with
# no middle-man.
stdin_ttyname, stdout_ttyname, stderr_ttyname = NailgunProtocol.ttynames_from_env(self._env)
stdin_ttyname, stdout_ttyname, stderr_ttyname = NailgunProtocol.ttynames_from_env(env)
assert stdin_ttyname == stdout_ttyname == stderr_ttyname, (
'expected all stdio ttys to be the same, but instead got: {}\n'
'please file a bug at http://github.com/pantsbuild/pants'
Expand All @@ -125,15 +166,29 @@ def finalizer():
termios.tcdrain(tty_fileno)
yield finalizer

@classmethod
@contextmanager
def _pipe_stdio(self, sock, stdin_isatty, stdout_isatty, stderr_isatty):
def _pipe_stdio(cls, sock, stdin_isatty, stdout_isatty, stderr_isatty, handle_stdin):
"""Handles stdio redirection in the case of pipes and/or mixed pipes and ttys."""
stdio_writers = (
(ChunkType.STDOUT, stdout_isatty),
(ChunkType.STDERR, stderr_isatty)
)
types, ttys = zip(*(stdio_writers))
with NailgunStreamStdinReader.open(sock, stdin_isatty) as stdin_fd,\

@contextmanager
def maybe_handle_stdin(want):
if want:
# TODO: Launching this thread pre-fork to handle @rule input currently results
# in an unhandled SIGILL in `src/python/pants/engine/scheduler.py, line 313 in pre_fork`.
# More work to be done here in https://github.com/pantsbuild/pants/issues/6005
with NailgunStreamStdinReader.open(sock, stdin_isatty) as fd:
yield fd
else:
with open('/dev/null', 'rb') as fh:
yield fh.fileno()

with maybe_handle_stdin(handle_stdin) as stdin_fd,\
NailgunStreamWriter.open_multi(sock, types, ttys) as ((stdout_fd, stderr_fd), writer),\
stdio_as(stdout_fd=stdout_fd, stderr_fd=stderr_fd, stdin_fd=stdin_fd):
# N.B. This will be passed to and called by the `DaemonExiter` prior to sending an
Expand All @@ -151,17 +206,25 @@ def finalizer():
stderr.close()
yield finalizer

@classmethod
@contextmanager
def _nailgunned_stdio(self, sock):
def nailgunned_stdio(cls, sock, env, handle_stdin=True):
"""Redirects stdio to the connected socket speaking the nailgun protocol."""
# Determine output tty capabilities from the environment.
stdin_isatty, stdout_isatty, stderr_isatty = NailgunProtocol.isatty_from_env(self._env)
stdin_isatty, stdout_isatty, stderr_isatty = NailgunProtocol.isatty_from_env(env)
is_tty_capable = all((stdin_isatty, stdout_isatty, stderr_isatty))

if all((stdin_isatty, stdout_isatty, stderr_isatty)):
with self._tty_stdio() as finalizer:
if is_tty_capable:
with cls._tty_stdio(env) as finalizer:
yield finalizer
else:
with self._pipe_stdio(sock, stdin_isatty, stdout_isatty, stderr_isatty) as finalizer:
with cls._pipe_stdio(
sock,
stdin_isatty,
stdout_isatty,
stderr_isatty,
handle_stdin
) as finalizer:
yield finalizer

def _setup_sigint_handler(self):
Expand Down Expand Up @@ -225,7 +288,8 @@ def post_fork_child(self):
NailgunProtocol.send_pid(self._socket, bytes(os.getpgrp() * -1))

# Invoke a Pants run with stdio redirected and a proxied environment.
with self._nailgunned_stdio(self._socket) as finalizer, hermetic_environment_as(**self._env):
with self.nailgunned_stdio(self._socket, self._env) as finalizer,\
hermetic_environment_as(**self._env):
try:
# Setup the Exiter's finalizer.
self._exiter.set_finalizer(finalizer)
Expand All @@ -237,12 +301,13 @@ def post_fork_child(self):
self._raise_deferred_exc()

# Otherwise, conduct a normal run.
runner = LocalPantsRunner(
runner = LocalPantsRunner.create(
self._exiter,
self._args,
self._env,
target_roots=self._target_roots,
daemon_build_graph=self._graph_helper
self._target_roots,
self._graph_helper,
self._options_bootstrapper
)
runner.set_start_time(self._maybe_get_client_start_time_from_env(self._env))
runner.run()
Expand Down
Loading