Skip to content

Commit

Permalink
Allow nailgun execution for RscCompile by bundling together the tool …
Browse files Browse the repository at this point in the history
…classpaths (#7092)

*Resolves #7089.*

### Problem

`RscCompile` is the one task in pants which invokes multiple JVM tools over the course of its run, as a consequence of [using outlining to generate semantic information with rsc which zinc can compile against](https://github.com/twitter/rsc/blob/master/docs/compiler.md#typechecking-in-rsc-mid-2018). This doesn't play nice with our `NailgunExecutor`, and causes error messages like the following when `--worker-count` > 1 (see #7089):

```
metacp(jdk) failed: Problem launching via NailgunClient(host=u'127.0.0.1', port=55511, workdir=u'/Users/dmcclanahan/tools/pants') command scala.meta.cli.Metacp --verbose --out .pants.d/tmp/tmpX8iJid.pants.d/compile/rsc/a04416cba788/--jdk--/index /Library/Java/JavaVirtualMachines/TwitterJDK/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/TwitterJDK/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/TwitterJDK/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/TwitterJDK/Contents/Home/lib/tools.jar: (u'Problem talking to nailgun server (address: 127.0.0.1:55511, remote_pid=<remote PID chunk not yet received!>, remote_pgrp=<remote PGRP chunk not yet received!>): TruncatedHeaderError(u"Failed to read nailgun chunk header (TruncatedRead(u\'Expected 5 bytes before socket shutdown, instead received 0\',)).",)', TruncatedHeaderError(u"Failed to read nailgun chunk header (TruncatedRead(u'Expected 5 bytes before socket shutdown, instead received 0',)).",))
                     E   	                   Traceback:
                     E   	                     File "/Users/dmcclanahan/tools/pants/src/python/pants/backend/jvm/tasks/jvm_compile/execution_graph.py", line 276, in worker
                     E   	                       work()
                     E   	                   
                     E   	                     File "/Users/dmcclanahan/tools/pants/src/python/pants/backend/jvm/tasks/jvm_compile/execution_graph.py", line 44, in __call__
                     E   	                       self.fn()
                     E   	                   
                     E   	                     File "/Users/dmcclanahan/tools/pants/src/python/pants/backend/jvm/tasks/jvm_compile/rsc/rsc_compile.py", line 339, in work_for_vts_rsc_jdk
                     E   	                       output_dir=rsc_index_dir)
                     E   	                   
                     E   	                     File "/Users/dmcclanahan/tools/pants/src/python/pants/backend/jvm/tasks/jvm_compile/rsc/rsc_compile.py", line 819, in _runtool
                     E   	                       dist=distribution
                     E   	                   
                     E   	                     File "/Users/dmcclanahan/tools/pants/src/python/pants/backend/jvm/tasks/nailgun_task.py", line 111, in runjava
                     E   	                       raise TaskError(e)
                     E   	                   [info] Compiling 1 Java source to /Users/dmcclanahan/tools/pants/.pants.d/tmp/tmpX8iJid.pants.d/compile/rsc/a04416cba788/examples.src.java.org.pantsbuild.example.hello.greet.greet/current/zinc/classes ...
                     E   	                       [info] Done compiling.
                     E   	                       [info] Compile success at Jan 16, 2019 6:36:30 PM [2.258s]
                     E   	                       
                     E   	FAILURE: Compilation failure: Failed jobs: metacp(jdk)
```

### Solution

- Introduce `JvmToolMixin.register_combined_jvm_tools()` as an API for accessing specific JVM tools as a combined classpath (a suggestion from @xeno-by) but different main classes, allowing the use of a single nailgun instance (resolving #7089).
- Introduce `NailgunTaskBase#do_for_execution_strategy_variant()` which allows specifying different actions to perform for different values of the `--execution-strategy` option in a structured way.
- Use the above to add a code path for a nailgun execution for `RscCompile`, and add testing for `--worker-count` > 1.
- Introduce `ZincCompile#get_zinc_compiler_classpath()` to allow `RscCompile` to override it with the combined classpath to persist the nailgun.
- Remove the mysterious `or self.cmd != self._distribution.java` in `nailgun_executor.py` -- this is necessary for this PR to work, but an option can be plumbed in if it breaks pantsd (this might motivate #6579).

### Result

`RscCompile` can be invoked with > 1 nailgun instance at a time by bundling all the tool jars into a single classpath, without affecting the performance for any of the other execution strategies.
  • Loading branch information
cosmicexplorer authored Jan 18, 2019
1 parent 0375b30 commit feed6e0
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 91 deletions.
204 changes: 122 additions & 82 deletions src/python/pants/backend/jvm/tasks/jvm_compile/rsc/rsc_compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from pants.util.contextutil import Timer
from pants.util.dirutil import (fast_relpath, fast_relpath_optional, maybe_read_file,
safe_file_dump, safe_mkdir)
from pants.util.memo import memoized_property


#
Expand Down Expand Up @@ -150,7 +151,8 @@ def register_options(cls, register):
],
custom_rules=[
Shader.exclude_package('rsc', recursive=True),
])
]
)
cls.register_jvm_tool(
register,
'metacp',
Expand All @@ -163,7 +165,8 @@ def register_options(cls, register):
],
custom_rules=[
Shader.exclude_package('scala', recursive=True),
])
]
)
cls.register_jvm_tool(
register,
'metai',
Expand All @@ -176,7 +179,31 @@ def register_options(cls, register):
],
custom_rules=[
Shader.exclude_package('scala', recursive=True),
])
]
)

# TODO: allow @memoized_method to convert lists into tuples so they can be hashed!
@memoized_property
def _nailgunnable_combined_classpath(self):
"""Register all of the component tools of the rsc compile task as a "combined" jvm tool.
This allows us to invoke their combined classpath in a single nailgun instance (see #7089 and
#7092). We still invoke their classpaths separately when not using nailgun, however.
"""
cp = []
for component_tool_name in ['rsc', 'metai', 'metacp']:
cp.extend(self.tool_classpath(component_tool_name))
# Add zinc's classpath so that it can be invoked from the same nailgun instance.
cp.extend(super(RscCompile, self).get_zinc_compiler_classpath())
return cp

# Overrides the normal zinc compiler classpath, which only contains zinc.
def get_zinc_compiler_classpath(self):
return self.do_for_execution_strategy_variant({
self.HERMETIC: lambda: super(RscCompile, self).get_zinc_compiler_classpath(),
self.SUBPROCESS: lambda: super(RscCompile, self).get_zinc_compiler_classpath(),
self.NAILGUN: lambda: self._nailgunnable_combined_classpath,
})

def register_extra_products_from_contexts(self, targets, compile_contexts):
super(RscCompile, self).register_extra_products_from_contexts(targets, compile_contexts)
Expand Down Expand Up @@ -747,87 +774,100 @@ def create_compile_context(self, target, target_workdir):
)
]

def _runtool(
self, main, tool_name, args, distribution, tgt=None, input_files=tuple(), input_digest=None, output_dir=None):
if self.execution_strategy == self.HERMETIC:
with self.context.new_workunit(tool_name) as wu:
tool_classpath_abs = self.tool_classpath(tool_name)
tool_classpath = fast_relpath_collection(tool_classpath_abs)
def _runtool_hermetic(self, main, tool_name, args, distribution, tgt=None, input_files=tuple(), input_digest=None, output_dir=None):
tool_classpath_abs = self.tool_classpath(tool_name)
tool_classpath = fast_relpath_collection(tool_classpath_abs)

classpath_for_cmd = os.pathsep.join(tool_classpath)
cmd = [
distribution.java,
]
cmd.extend(self.get_options().jvm_options)
cmd.extend(['-cp', classpath_for_cmd])
cmd.extend([main])
cmd.extend(args)

pathglobs = list(tool_classpath)
pathglobs.extend(f if os.path.isfile(f) else '{}/**'.format(f) for f in input_files)

if pathglobs:
root = PathGlobsAndRoot(
PathGlobs(tuple(pathglobs)),
text_type(get_buildroot()))
# dont capture snapshot, if pathglobs is empty
path_globs_input_digest = self.context._scheduler.capture_snapshots((root,))[0].directory_digest

if path_globs_input_digest and input_digest:
epr_input_files = self.context._scheduler.merge_directories(
(path_globs_input_digest, input_digest))
else:
epr_input_files = path_globs_input_digest or input_digest

epr = ExecuteProcessRequest(
argv=tuple(cmd),
input_files=epr_input_files,
output_files=tuple(),
output_directories=(output_dir,),
timeout_seconds=15*60,
description='run {} for {}'.format(tool_name, tgt),
# TODO: These should always be unicodes
# Since this is always hermetic, we need to use `underlying_dist`
jdk_home=text_type(self._zinc.underlying_dist.home),
)
res = self.context.execute_process_synchronously_without_raising(
epr,
self.name(),
[WorkUnitLabel.TOOL])

if res.exit_code != 0:
raise TaskError(res.stderr)

if output_dir:
dump_digest(output_dir, res.output_directory_digest)
self.context._scheduler.materialize_directories((
DirectoryToMaterialize(
# NB the first element here is the root to materialize into, not the dir to snapshot
text_type(get_buildroot()),
res.output_directory_digest),
))
# TODO drop a file containing the digest, named maybe output_dir.digest
return res
classpath_for_cmd = os.pathsep.join(tool_classpath)
cmd = [
distribution.java,
]
cmd.extend(self.get_options().jvm_options)
cmd.extend(['-cp', classpath_for_cmd])
cmd.extend([main])
cmd.extend(args)

pathglobs = list(tool_classpath)
pathglobs.extend(f if os.path.isfile(f) else '{}/**'.format(f) for f in input_files)

if pathglobs:
root = PathGlobsAndRoot(
PathGlobs(tuple(pathglobs)),
text_type(get_buildroot()))
# dont capture snapshot, if pathglobs is empty
path_globs_input_digest = self.context._scheduler.capture_snapshots((root,))[0].directory_digest

if path_globs_input_digest and input_digest:
epr_input_files = self.context._scheduler.merge_directories(
(path_globs_input_digest, input_digest))
else:
with self.context.new_workunit(tool_name) as wu:
result = self.runjava(classpath=self.tool_classpath(tool_name),
main=main,
jvm_options=self.get_options().jvm_options,
args=args,
workunit_name=tool_name,
workunit_labels=[WorkUnitLabel.TOOL],
dist=distribution
)
if result != 0:
raise TaskError('Running {} failed'.format(tool_name))
runjava_wu = None
for c in wu.children:
if c.name is tool_name:
runjava_wu = c
break
if runjava_wu is None:
raise Exception('couldnt find work unit for underlying execution')
return runjava_wu
epr_input_files = path_globs_input_digest or input_digest

epr = ExecuteProcessRequest(
argv=tuple(cmd),
input_files=epr_input_files,
output_files=tuple(),
output_directories=(output_dir,),
timeout_seconds=15*60,
description='run {} for {}'.format(tool_name, tgt),
# TODO: These should always be unicodes
# Since this is always hermetic, we need to use `underlying_dist`
jdk_home=text_type(self._zinc.underlying_dist.home),
)
res = self.context.execute_process_synchronously_without_raising(
epr,
self.name(),
[WorkUnitLabel.TOOL])

if res.exit_code != 0:
raise TaskError(res.stderr)

if output_dir:
dump_digest(output_dir, res.output_directory_digest)
self.context._scheduler.materialize_directories((
DirectoryToMaterialize(
# NB the first element here is the root to materialize into, not the dir to snapshot
text_type(get_buildroot()),
res.output_directory_digest),
))
# TODO drop a file containing the digest, named maybe output_dir.digest
return res

# The classpath is parameterized so that we can have a single nailgun instance serving all of our
# execution requests.
def _runtool_nonhermetic(self, parent_workunit, classpath, main, tool_name, args, distribution):
result = self.runjava(classpath=classpath,
main=main,
jvm_options=self.get_options().jvm_options,
args=args,
workunit_name=tool_name,
workunit_labels=[WorkUnitLabel.TOOL],
dist=distribution
)
if result != 0:
raise TaskError('Running {} failed'.format(tool_name))
runjava_workunit = None
for c in parent_workunit.children:
if c.name is tool_name:
runjava_workunit = c
break
# TODO: figure out and document when would this happen.
if runjava_workunit is None:
raise Exception('couldnt find work unit for underlying execution')
return runjava_workunit

def _runtool(self, main, tool_name, args, distribution,
tgt=None, input_files=tuple(), input_digest=None, output_dir=None):
with self.context.new_workunit(tool_name) as wu:
return self.do_for_execution_strategy_variant({
self.HERMETIC: lambda: self._runtool_hermetic(
main, tool_name, args, distribution,
tgt=tgt, input_files=input_files, input_digest=input_digest, output_dir=output_dir),
self.SUBPROCESS: lambda: self._runtool_nonhermetic(
wu, self.tool_classpath(tool_name), main, tool_name, args, distribution),
self.NAILGUN: lambda: self._runtool_nonhermetic(
wu, self._nailgunnable_combined_classpath, main, tool_name, args, distribution),
})

def _run_metai_tool(self,
distribution,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ def relative_to_exec_root(path):
# TODO: This should probably return a ClasspathEntry rather than a Digest
return res.output_directory_digest
else:
if self.runjava(classpath=[self._zinc.zinc],
if self.runjava(classpath=self.get_zinc_compiler_classpath(),
main=Zinc.ZINC_COMPILE_MAIN,
jvm_options=jvm_options,
args=zinc_args,
Expand All @@ -452,6 +452,16 @@ def relative_to_exec_root(path):
dist=self._zinc.dist):
raise TaskError('Zinc compile failed.')

def get_zinc_compiler_classpath(self):
"""Get the classpath for the zinc compiler JVM tool.
This will just be the zinc compiler tool classpath normally, but tasks which invoke zinc along
with other JVM tools with nailgun (such as RscCompile) require zinc to be invoked with this
method to ensure a single classpath is used for all the tools they need to invoke so that the
nailgun instance (which is keyed by classpath and JVM options) isn't invalidated.
"""
return [self._zinc.zinc]

def _verify_zinc_classpath(self, classpath, allow_dist=True):
def is_outside(path, putative_parent):
return os.path.relpath(path, putative_parent).startswith(os.pardir)
Expand Down
18 changes: 18 additions & 0 deletions src/python/pants/backend/jvm/tasks/nailgun_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ class NailgunTaskBase(JvmToolTaskMixin, TaskBase):
SUBPROCESS = 'subprocess'
HERMETIC = 'hermetic'

class InvalidExecutionStrategyMapping(Exception): pass

_all_execution_strategies = frozenset([NAILGUN, SUBPROCESS, HERMETIC])

def do_for_execution_strategy_variant(self, mapping):
"""Invoke the method in `mapping` with the key corresponding to the execution strategy.
`mapping` is a dict mapping execution strategy -> zero-argument lambda.
"""
variants = frozenset(mapping.keys())
if variants != self._all_execution_strategies:
raise self.InvalidExecutionStrategyMapping(
'Must specify a mapping with exactly the keys {} (was: {})'
.format(self._all_execution_strategies, variants))
method_for_variant = mapping[self.execution_strategy]
# The methods need not return a value, but we pass it along if they do.
return method_for_variant()

@classmethod
def register_options(cls, register):
super(NailgunTaskBase, cls).register_options(register)
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/java/nailgun_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def run(this, stdout=None, stderr=None, stdin=None, cwd=None):

def _check_nailgun_state(self, new_fingerprint):
running = self.is_alive()
updated = self.needs_restart(new_fingerprint) or self.cmd != self._distribution.java
updated = self.needs_restart(new_fingerprint)
logging.debug('Nailgun {nailgun} state: updated={up!s} running={run!s} fingerprint={old_fp} '
'new_fingerprint={new_fp} distribution={old_dist} new_distribution={new_dist}'
.format(nailgun=self._identity, up=updated, run=running,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,34 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import os
from functools import wraps

from pants.util.contextutil import temporary_dir
from pants_test.backend.jvm.tasks.jvm_compile.base_compile_integration_test import BaseCompileIT


def _execution_strategies(strategies, workers_range=[1]):
def wrapper(func):
@wraps(func)
def wrapper_self(*args, **kwargs):
for worker_count in workers_range:
for strategy in strategies:
func(*args, execution_strategy=strategy, worker_count=worker_count, **kwargs)
return wrapper_self
return wrapper


class RscCompileIntegration(BaseCompileIT):
def test_basic_binary(self):
@_execution_strategies(['nailgun', 'subprocess'])
def test_basic_binary_nonhermetic(self, execution_strategy, worker_count):
with temporary_dir() as cache_dir:
config = {
'cache.compile.rsc': {'write_to': [cache_dir]},
'jvm-platform': {'compiler': 'rsc'},
'compile.rsc': {'execution_strategy': 'subprocess'},
'compile.rsc': {
'execution_strategy': execution_strategy,
'worker_count': worker_count,
},
}

pants_run = self.run_pants(
Expand Down Expand Up @@ -59,12 +75,16 @@ def test_basic_binary_hermetic(self):
'compile/rsc/current/.scala-library-synthetic/current/rsc/index/scala-library-synthetics.jar')
self.assertTrue(os.path.exists(path))

def test_executing_multi_target_binary(self):
@_execution_strategies(['nailgun', 'subprocess'], [2])
def test_executing_multi_target_binary_nonhermetic(self, execution_strategy, worker_count):
with temporary_dir() as cache_dir:
config = {
'cache.compile.rsc': {'write_to': [cache_dir]},
'jvm-platform': {'compiler': 'rsc'},
'compile.rsc': {'execution_strategy': 'subprocess'}
'compile.rsc': {
'execution_strategy': execution_strategy,
'worker_count': worker_count,
}
}
with self.temporary_workdir() as workdir:
pants_run = self.run_pants_with_workdir(
Expand Down Expand Up @@ -97,14 +117,16 @@ def test_executing_multi_target_binary_hermetic(self):
self.assert_success(pants_run)
self.assertIn('Hello, Resource World!', pants_run.stdout_data)

def test_java_with_transitive_exported_scala_dep(self):
@_execution_strategies(['nailgun', 'subprocess'], [2])
def test_java_with_transitive_exported_scala_dep_nonhermetic(self, execution_strategy, worker_count):
with temporary_dir() as cache_dir:
config = {
'cache.compile.rsc': {'write_to': [cache_dir]},
'jvm-platform': {'compiler': 'rsc'},
'compile.rsc': {
'execution_strategy': 'subprocess',
'worker_count': 1}
'execution_strategy': execution_strategy,
'worker_count': worker_count,
},
}
with self.temporary_workdir() as workdir:
pants_run = self.run_pants_with_workdir(
Expand Down

0 comments on commit feed6e0

Please sign in to comment.