From d2997290078e9e5114d604a6cedefda0f0c6dec4 Mon Sep 17 00:00:00 2001
From: Ola Rozenfeld
Date: Fri, 16 Dec 2016 16:37:22 +0000
Subject: [PATCH] Creating separate instances of CAS and execution handlers for every action.

This allows Bazel to talk to multiple instances of the server, if they exist,
enabling server-side parallelism (due to using separate gRPC channels).

TESTED: internally and with a local server
--
PiperOrigin-RevId: 142262973
MOS_MIGRATED_REVID=142262973
---
 .../build/lib/remote/GrpcActionCache.java     |  34 +++---
 .../remote/RemoteActionContextProvider.java   |   6 +-
 .../build/lib/remote/RemoteModule.java        |  41 ++-----
 .../build/lib/remote/RemoteSpawnStrategy.java | 102 ++++++++++--------
 .../build/lib/remote/RemoteWorkExecutor.java  |   6 +-
 5 files changed, 85 insertions(+), 104 deletions(-)

diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
index 0563c6848b6578..08eef889a05981 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
@@ -73,11 +73,7 @@ public final class GrpcActionCache implements RemoteActionCache {
   /** Channel over which to send gRPC CAS queries. */
   private final ManagedChannel channel;
 
-  // TODO(olaola): proper profiling to determine the best values for these.
-  private final int grpcTimeoutSeconds;
-  private final int maxBatchInputs;
-  private final int maxChunkSizeBytes;
-  private final int maxBatchSizeBytes;
+  private final RemoteOptions options;
 
   private static final int MAX_MEMORY_KBYTES = 512 * 1024;
 
@@ -138,7 +134,7 @@ public BlobChunk next() throws IOException {
       } else {
         chunk.setOffset(offset);
       }
-      int size = Math.min(currentBlob.length - offset, maxChunkSizeBytes);
+      int size = Math.min(currentBlob.length - offset, options.grpcMaxChunkSizeBytes);
       if (size > 0) {
         chunk.setData(ByteString.copyFrom(currentBlob, offset, size));
         offset += size;
@@ -203,7 +199,7 @@ public BlobChunk next() throws IOException {
         chunk.setOffset(offset);
       }
       if (bytesLeft > 0) {
-        byte[] blob = new byte[(int) Math.min(bytesLeft, (long) maxChunkSizeBytes)];
+        byte[] blob = new byte[(int) Math.min(bytesLeft, (long) options.grpcMaxChunkSizeBytes)];
         currentStream.read(blob);
         chunk.setData(ByteString.copyFrom(blob));
         bytesLeft -= blob.length;
@@ -218,11 +214,8 @@ public BlobChunk next() throws IOException {
 
   @VisibleForTesting
   public GrpcActionCache(ManagedChannel channel, RemoteOptions options) {
+    this.options = options;
     this.channel = channel;
-    maxBatchInputs = options.grpcMaxBatchInputs;
-    maxChunkSizeBytes = options.grpcMaxChunkSizeBytes;
-    maxBatchSizeBytes = options.grpcMaxBatchSizeBytes;
-    grpcTimeoutSeconds = options.grpcTimeoutSeconds;
   }
 
   public GrpcActionCache(RemoteOptions options) throws InvalidConfigurationException {
@@ -235,11 +228,12 @@ public static boolean isRemoteCacheOptions(RemoteOptions options) {
 
   private CasServiceBlockingStub getBlockingStub() {
     return CasServiceGrpc.newBlockingStub(channel)
-        .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+        .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
   }
 
   private CasServiceStub getStub() {
-    return CasServiceGrpc.newStub(channel).withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+    return CasServiceGrpc.newStub(channel).withDeadlineAfter(
+        options.grpcTimeoutSeconds, TimeUnit.SECONDS);
   }
 
   private ImmutableSet getMissingDigests(Iterable digests) {
@@ -508,13 +502,15 @@ private void uploadChunks(int numItems, BlobChunkIterator blobs)
     int currentBatchBytes = 0;
     int batchedInputs = 0;
     int batches = 0;
+    CasServiceStub stub = getStub();
     try {
       while (blobs.hasNext()) {
         BlobChunk chunk = blobs.next();
         if (chunk.hasDigest()) {
           // Determine whether to start next batch.
-          if (batchedInputs % maxBatchInputs == 0
-              || chunk.getDigest().getSizeBytes() + currentBatchBytes > maxBatchSizeBytes) {
+          final long batchSize = chunk.getDigest().getSizeBytes() + currentBatchBytes;
+          if (batchedInputs % options.grpcMaxBatchInputs == 0
+              || batchSize > options.grpcMaxBatchSizeBytes) {
             // The batches execute simultaneously.
             if (requestObserver != null) {
               batchedInputs = 0;
@@ -523,7 +519,7 @@ private void uploadChunks(int numItems, BlobChunkIterator blobs)
             }
             batches++;
             responseObserver = new UploadBlobReplyStreamObserver(finishLatch, exception);
-            requestObserver = getStub().uploadBlob(responseObserver);
+            requestObserver = stub.uploadBlob(responseObserver);
           }
           batchedInputs++;
         }
@@ -549,7 +545,7 @@ private void uploadChunks(int numItems, BlobChunkIterator blobs)
       while (batches++ < numItems) {
        finishLatch.countDown(); // Non-sent batches.
       }
-      finishLatch.await(grpcTimeoutSeconds, TimeUnit.SECONDS);
+      finishLatch.await(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
       if (exception.get() != null) {
        throw exception.get(); // Re-throw the first encountered exception.
      }
@@ -623,7 +619,7 @@ public ImmutableList downloadBlobs(Iterable digests)
   public ActionResult getCachedActionResult(ActionKey actionKey) {
     ExecutionCacheServiceBlockingStub stub =
         ExecutionCacheServiceGrpc.newBlockingStub(channel)
-            .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+            .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
     ExecutionCacheRequest request =
         ExecutionCacheRequest.newBuilder().setActionDigest(actionKey.getDigest()).build();
     ExecutionCacheReply reply = stub.getCachedResult(request);
@@ -641,7 +637,7 @@ public void setCachedActionResult(ActionKey actionKey, ActionResult result)
       throws InterruptedException {
     ExecutionCacheServiceBlockingStub stub =
         ExecutionCacheServiceGrpc.newBlockingStub(channel)
-            .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+            .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
     ExecutionCacheSetRequest request =
         ExecutionCacheSetRequest.newBuilder()
             .setActionDigest(actionKey.getDigest())
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
index 578a76383109ea..d4c160d685b929 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
@@ -30,9 +30,7 @@ final class RemoteActionContextProvider extends ActionContextProvider {
 
   RemoteActionContextProvider(
       CommandEnvironment env,
-      BuildRequest buildRequest,
-      RemoteActionCache actionCache,
-      RemoteWorkExecutor workExecutor) {
+      BuildRequest buildRequest) {
     boolean verboseFailures = buildRequest.getOptions(ExecutionOptions.class).verboseFailures;
     Builder strategiesBuilder = ImmutableList.builder();
     strategiesBuilder.add(
@@ -41,8 +39,6 @@ final class RemoteActionContextProvider extends ActionContextProvider {
             env.getExecRoot(),
             buildRequest.getOptions(RemoteOptions.class),
             verboseFailures,
-            actionCache,
-            workExecutor,
             env.getRuntime().getProductName()));
     this.strategies = strategiesBuilder.build();
   }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 7d2845c6ffdcbf..dc9e351a5c22f5 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -17,10 +17,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.eventbus.Subscribe;
 import com.google.devtools.build.lib.actions.ExecutorBuilder;
-import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
 import com.google.devtools.build.lib.buildtool.BuildRequest;
 import com.google.devtools.build.lib.buildtool.buildevent.BuildStartingEvent;
-import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.runtime.BlazeModule;
 import com.google.devtools.build.lib.runtime.Command;
 import com.google.devtools.build.lib.runtime.CommandEnvironment;
@@ -33,8 +31,6 @@
 /** RemoteModule provides distributed cache and remote execution for Bazel. */
 public final class RemoteModule extends BlazeModule {
   private CommandEnvironment env;
-  private RemoteActionCache actionCache;
-  private RemoteWorkExecutor workExecutor;
 
   @Override
   public void beforeCommand(Command command, CommandEnvironment env) {
@@ -49,41 +45,22 @@ public void afterCommand() {
 
   @Override
   public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorBuilder builder) {
-    builder.addActionContextProvider(
-        new RemoteActionContextProvider(env, request, actionCache, workExecutor));
+    builder.addActionContextProvider(new RemoteActionContextProvider(env, request));
   }
 
   @Subscribe
   public void buildStarting(BuildStartingEvent event) {
     RemoteOptions options = event.getRequest().getOptions(RemoteOptions.class);
-    try {
-      // Reinitialize the remote cache and worker from options every time, because the options
-      // may change from build to build.
-
-      // Don't provide the remote spawn unless at least action cache is initialized.
-      if (ConcurrentMapFactory.isRemoteCacheOptions(options)) {
-        actionCache = new ConcurrentMapActionCache(ConcurrentMapFactory.create(options));
-      }
-      if (GrpcActionCache.isRemoteCacheOptions(options)) {
-        actionCache = new GrpcActionCache(options);
-      }
-      // Otherwise actionCache remains null and remote caching/execution are disabled.
-
-      if (actionCache != null) {
-        HashFunction hf = FileSystem.getDigestFunction();
-        if (hf != HashFunction.SHA1) {
-          env.getBlazeModuleEnvironment().exit(new AbruptExitException(
-              "Remote cache/execution requires SHA1 digests, got " + hf
-              + ", run with --host_jvm_args=-Dbazel.DigestFunction=SHA1",
-              ExitCode.COMMAND_LINE_ERROR));
-        }
-        if (RemoteWorkExecutor.isRemoteExecutionOptions(options)) {
-          workExecutor = new RemoteWorkExecutor(options);
-        }
+    if (ConcurrentMapFactory.isRemoteCacheOptions(options)
+        || GrpcActionCache.isRemoteCacheOptions(options)) {
+      HashFunction hf = FileSystem.getDigestFunction();
+      if (hf != HashFunction.SHA1) {
+        env.getBlazeModuleEnvironment().exit(new AbruptExitException(
+            "Remote cache/execution requires SHA1 digests, got " + hf
+            + ", run with --host_jvm_args=-Dbazel.DigestFunction=SHA1",
+            ExitCode.COMMAND_LINE_ERROR));
       }
-    } catch (InvalidConfigurationException e) {
-      env.getReporter().handle(Event.warn(e.toString()));
     }
   }
 
 }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
index 14a275e8197c09..c7bbdda939835f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
@@ -30,6 +30,7 @@
 import com.google.devtools.build.lib.actions.SpawnActionContext;
 import com.google.devtools.build.lib.actions.Spawns;
 import com.google.devtools.build.lib.actions.UserExecException;
+import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.events.EventHandler;
 import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
@@ -63,27 +64,19 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
 
   private final Path execRoot;
   private final StandaloneSpawnStrategy standaloneStrategy;
-  private final RemoteActionCache remoteActionCache;
-  private final RemoteWorkExecutor remoteWorkExecutor;
   private final boolean verboseFailures;
-  private final boolean remoteAcceptCached;
-  private final boolean remoteAllowLocalFallback;
+  private final RemoteOptions options;
 
   RemoteSpawnStrategy(
       Map clientEnv,
       Path execRoot,
       RemoteOptions options,
       boolean verboseFailures,
-      RemoteActionCache actionCache,
-      RemoteWorkExecutor workExecutor,
       String productName) {
     this.execRoot = execRoot;
     this.standaloneStrategy = new StandaloneSpawnStrategy(execRoot, verboseFailures, productName);
     this.verboseFailures = verboseFailures;
-    this.remoteActionCache = actionCache;
-    this.remoteWorkExecutor = workExecutor;
-    this.remoteAcceptCached = options.remoteAcceptCached;
-    this.remoteAllowLocalFallback = options.remoteAllowLocalFallback;
+    this.options = options;
   }
 
   private Action buildAction(
@@ -115,18 +108,18 @@ private Command buildCommand(List arguments, ImmutableMap
       outputFiles = new ArrayList<>();
       for (ActionInput output : spawn.getOutputFiles()) {
         outputFiles.add(execRoot.getRelative(output.getExecPathString()));
       }
       try {
         ActionResult.Builder result = ActionResult.newBuilder();
-        remoteActionCache.uploadAllResults(execRoot, outputFiles, result);
-        remoteActionCache.setCachedActionResult(actionKey, result.build());
+        actionCache.uploadAllResults(execRoot, outputFiles, result);
+        actionCache.setCachedActionResult(actionKey, result.build());
         // Handle all cache errors here.
       } catch (IOException e) {
         throw new UserExecException("Unexpected IO error.", e);
@@ -146,14 +139,11 @@ private void execLocally(
     }
   }
 
-  private void passRemoteOutErr(ActionResult result, FileOutErr outErr) {
-    if (remoteActionCache == null) {
-      return;
-    }
+  private static void passRemoteOutErr(
+      RemoteActionCache cache, ActionResult result, FileOutErr outErr) {
     try {
-      ImmutableList streams =
-          remoteActionCache.downloadBlobs(
-              ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
+      ImmutableList streams = cache.downloadBlobs(
+          ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
       outErr.printOut(new String(streams.get(0), UTF_8));
       outErr.printErr(new String(streams.get(1), UTF_8));
     } catch (CacheNotFoundException e) {
@@ -170,11 +160,6 @@ public String toString() {
   @Override
   public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext)
       throws ExecException, InterruptedException {
-    if (!spawn.isRemotable() || remoteActionCache == null) {
-      standaloneStrategy.exec(spawn, actionExecutionContext);
-      return;
-    }
-
     ActionKey actionKey = null;
     String mnemonic = spawn.getMnemonic();
     Executor executor = actionExecutionContext.getExecutor();
@@ -182,6 +167,32 @@ public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext)
     executor.getEventBus().post(
         ActionStatusMessage.runningStrategy(spawn.getResourceOwner(), "remote"));
 
+    RemoteActionCache actionCache = null;
+    RemoteWorkExecutor workExecutor = null;
+    if (spawn.isRemotable()) {
+      // Initialize remote cache and execution handlers. We use separate handlers for every
+      // action to enable server-side parallelism (need a different gRPC channel per action).
+      try {
+        if (ConcurrentMapFactory.isRemoteCacheOptions(options)) {
+          actionCache = new ConcurrentMapActionCache(ConcurrentMapFactory.create(options));
+        }
+        if (GrpcActionCache.isRemoteCacheOptions(options)) {
+          actionCache = new GrpcActionCache(options);
+        }
+        // Otherwise actionCache remains null and remote caching/execution are disabled.
+
+        if (actionCache != null && RemoteWorkExecutor.isRemoteExecutionOptions(options)) {
+          workExecutor = new RemoteWorkExecutor(options);
+        }
+      } catch (InvalidConfigurationException e) {
+        eventHandler.handle(Event.warn(e.toString()));
+      }
+    }
+    if (!spawn.isRemotable() || actionCache == null) {
+      standaloneStrategy.exec(spawn, actionExecutionContext);
+      return;
+    }
+
     try {
       // Temporary hack: the TreeNodeRepository should be created and maintained upstream!
       TreeNodeRepository repository = new TreeNodeRepository(execRoot);
@@ -199,30 +210,30 @@ public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext)
       // Look up action cache, and reuse the action output if it is found.
       actionKey = ContentDigests.computeActionKey(action);
-      ActionResult result = this.remoteAcceptCached
-          ? remoteActionCache.getCachedActionResult(actionKey) : null;
-      boolean acceptCachedResult = this.remoteAcceptCached;
+      ActionResult result = this.options.remoteAcceptCached
+          ? actionCache.getCachedActionResult(actionKey) : null;
+      boolean acceptCachedResult = this.options.remoteAcceptCached;
       if (result != null) {
         // We don't cache failed actions, so we know the outputs exist.
         // For now, download all outputs locally; in the future, we can reuse the digests to
         // just update the TreeNodeRepository and continue the build.
         try {
-          remoteActionCache.downloadAllResults(result, execRoot);
+          actionCache.downloadAllResults(result, execRoot);
           return;
         } catch (CacheNotFoundException e) {
           acceptCachedResult = false; // Retry the action remotely and invalidate the results.
         }
       }
 
-      if (remoteWorkExecutor == null) {
-        execLocally(spawn, actionExecutionContext, actionKey);
+      if (workExecutor == null) {
+        execLocally(spawn, actionExecutionContext, actionCache, actionKey);
         return;
       }
 
       // Upload the command and all the inputs into the remote cache.
-      remoteActionCache.uploadBlob(command.toByteArray());
+      actionCache.uploadBlob(command.toByteArray());
       // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests!
-      remoteActionCache.uploadTree(repository, execRoot, inputRoot);
+      actionCache.uploadTree(repository, execRoot, inputRoot);
       // TODO(olaola): set BuildInfo and input total bytes as well.
       ExecuteRequest.Builder request =
           ExecuteRequest.newBuilder()
@@ -231,23 +242,24 @@ public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext)
               .setTotalInputFileCount(inputs.size())
               .setTimeoutMillis(1000 * Spawns.getTimeoutSeconds(spawn, 120));
       // TODO(olaola): set sensible local and remote timouts.
-      ExecuteReply reply = remoteWorkExecutor.executeRemotely(request.build());
+      ExecuteReply reply = workExecutor.executeRemotely(request.build());
       ExecutionStatus status = reply.getStatus();
       result = reply.getResult();
       // We do not want to pass on the remote stdout and strerr if we are going to retry the
       // action.
       if (status.getSucceeded()) {
-        passRemoteOutErr(result, actionExecutionContext.getFileOutErr());
-        remoteActionCache.downloadAllResults(result, execRoot);
+        passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
+        actionCache.downloadAllResults(result, execRoot);
         return;
       }
-      if (status.getError() == ExecutionStatus.ErrorCode.EXEC_FAILED || !remoteAllowLocalFallback) {
-        passRemoteOutErr(result, actionExecutionContext.getFileOutErr());
+      if (status.getError() == ExecutionStatus.ErrorCode.EXEC_FAILED
+          || !options.remoteAllowLocalFallback) {
+        passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
         throw new UserExecException(status.getErrorDetail());
       }
       // For now, we retry locally on all other remote errors.
       // TODO(olaola): add remote retries on cache miss errors.
-      execLocally(spawn, actionExecutionContext, actionKey);
+      execLocally(spawn, actionExecutionContext, actionCache, actionKey);
     } catch (IOException e) {
       throw new UserExecException("Unexpected IO error.", e);
     } catch (InterruptedException e) {
@@ -260,15 +272,15 @@ public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext)
         stackTrace = "\n" + Throwables.getStackTraceAsString(e);
       }
       eventHandler.handle(Event.warn(mnemonic + " remote work failed (" + e + ")" + stackTrace));
-      if (remoteAllowLocalFallback) {
-        execLocally(spawn, actionExecutionContext, actionKey);
+      if (options.remoteAllowLocalFallback) {
+        execLocally(spawn, actionExecutionContext, actionCache, actionKey);
       } else {
         throw new UserExecException(e);
       }
     } catch (CacheNotFoundException e) {
       eventHandler.handle(Event.warn(mnemonic + " remote work results cache miss (" + e + ")"));
-      if (remoteAllowLocalFallback) {
-        execLocally(spawn, actionExecutionContext, actionKey);
+      if (options.remoteAllowLocalFallback) {
+        execLocally(spawn, actionExecutionContext, actionCache, actionKey);
       } else {
         throw new UserExecException(e);
       }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java
index 9cb080b4021935..3635ab7def9962 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java
@@ -29,11 +29,11 @@ public class RemoteWorkExecutor {
 
   /** Channel over which to send work to run remotely. */
   private final ManagedChannel channel;
-  private final int grpcTimeoutSeconds;
+  private final RemoteOptions options;
 
   public RemoteWorkExecutor(RemoteOptions options) throws InvalidConfigurationException {
+    this.options = options;
     channel = RemoteUtils.createChannel(options.remoteWorker);
-    grpcTimeoutSeconds = options.grpcTimeoutSeconds;
   }
 
   public static boolean isRemoteExecutionOptions(RemoteOptions options) {
@@ -44,7 +44,7 @@ public ExecuteReply executeRemotely(ExecuteRequest request) {
     ExecuteServiceBlockingStub stub =
         ExecuteServiceGrpc.newBlockingStub(channel)
             .withDeadlineAfter(
-                grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS);
+                options.grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS);
     Iterator replies = stub.execute(request);
     ExecuteReply reply = null;
     while (replies.hasNext()) {
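
A minimal, self-contained sketch of the per-action handler pattern this change introduces, for readers outside the Bazel tree. The class and method names (ActionRunner, runOneAction) and the target address are hypothetical illustrations, not part of the patch; only the io.grpc and java.util.concurrent calls are real APIs. The idea mirrors RemoteSpawnStrategy.exec() above: each action builds its own channel (and would build its CAS and execution stubs from it), then shuts the channel down, instead of sharing one long-lived channel across all actions.

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: one gRPC channel per action, so concurrent actions
// are not multiplexed onto a single shared connection and can be spread across
// several server instances by ordinary load balancing.
final class ActionRunner {
  private final String remoteTarget; // e.g. "remote.example.com:8080" (illustrative)

  ActionRunner(String remoteTarget) {
    this.remoteTarget = remoteTarget;
  }

  void runOneAction(Runnable actionBody) throws InterruptedException {
    // Fresh channel for this action, mirroring the new per-spawn
    // GrpcActionCache / RemoteWorkExecutor construction in the patch.
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget(remoteTarget).usePlaintext().build();
    try {
      // In the real code, stubs created from this channel would upload inputs,
      // request execution, and download outputs here.
      actionBody.run();
    } finally {
      channel.shutdown();
      channel.awaitTermination(5, TimeUnit.SECONDS);
    }
  }
}

The trade-off is one extra connection setup per action; the benefit, as the commit message notes, is server-side parallelism when multiple server instances are available, since each action's traffic travels over its own channel.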