Commit d299729
Creating separate instances of CAS and execution handlers for every action. This allows Bazel to talk to multiple instances of the server, if these exist, enabling server-side parallelism (due to using separate gRPC channels).

TESTED: internally and local server

--
PiperOrigin-RevId: 142262973
MOS_MIGRATED_REVID=142262973
Ola Rozenfeld authored and katre committed Dec 16, 2016
1 parent 77e01d8 commit d299729
Showing 5 changed files with 85 additions and 104 deletions.
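In outline, the change moves remote-cache construction out of one-time module setup and into each action's execution path. A minimal sketch of the pattern the diffs below implement, using class names from this commit but an assumed surrounding method (executeRemotely is a stand-in, not an actual Bazel signature):

    // Before: RemoteModule built one shared GrpcActionCache, so every action
    // reused a single gRPC channel.
    //
    // After: each action constructs its own handler, and therefore its own
    // ManagedChannel, letting a multi-instance server work in parallel.
    void executeRemotely(RemoteOptions options) throws InvalidConfigurationException {
      if (GrpcActionCache.isRemoteCacheOptions(options)) {
        RemoteActionCache cache = new GrpcActionCache(options); // fresh channel per action
        // ... query the CAS / execution cache through 'cache' for this action only ...
      }
    }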
────────────────────────────────────────
@@ -73,11 +73,7 @@ public final class GrpcActionCache implements RemoteActionCache {
   /** Channel over which to send gRPC CAS queries. */
   private final ManagedChannel channel;
 
-  // TODO(olaola): proper profiling to determine the best values for these.
-  private final int grpcTimeoutSeconds;
-  private final int maxBatchInputs;
-  private final int maxChunkSizeBytes;
-  private final int maxBatchSizeBytes;
+  private final RemoteOptions options;
 
   private static final int MAX_MEMORY_KBYTES = 512 * 1024;
 
@@ -138,7 +134,7 @@ public BlobChunk next() throws IOException {
       } else {
         chunk.setOffset(offset);
       }
-      int size = Math.min(currentBlob.length - offset, maxChunkSizeBytes);
+      int size = Math.min(currentBlob.length - offset, options.grpcMaxChunkSizeBytes);
       if (size > 0) {
         chunk.setData(ByteString.copyFrom(currentBlob, offset, size));
         offset += size;
@@ -203,7 +199,7 @@ public BlobChunk next() throws IOException {
         chunk.setOffset(offset);
       }
       if (bytesLeft > 0) {
-        byte[] blob = new byte[(int) Math.min(bytesLeft, (long) maxChunkSizeBytes)];
+        byte[] blob = new byte[(int) Math.min(bytesLeft, (long) options.grpcMaxChunkSizeBytes)];
         currentStream.read(blob);
         chunk.setData(ByteString.copyFrom(blob));
         bytesLeft -= blob.length;
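Both hunks above cap a chunk at options.grpcMaxChunkSizeBytes. The size arithmetic, pulled out into a self-contained helper (illustrative only; ChunkMath is not a Bazel class):

    import java.util.ArrayList;
    import java.util.List;

    class ChunkMath {
      /** Splits a blob into chunks of at most maxChunkSizeBytes, as above. */
      static List<byte[]> split(byte[] blob, int maxChunkSizeBytes) {
        List<byte[]> chunks = new ArrayList<>();
        int offset = 0;
        while (offset < blob.length) {
          int size = Math.min(blob.length - offset, maxChunkSizeBytes);
          byte[] chunk = new byte[size];
          System.arraycopy(blob, offset, chunk, 0, size);
          chunks.add(chunk);
          offset += size; // next chunk starts where this one ended
        }
        return chunks;
      }
    }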
@@ -218,11 +214,8 @@ public BlobChunk next() throws IOException {
 
   @VisibleForTesting
   public GrpcActionCache(ManagedChannel channel, RemoteOptions options) {
+    this.options = options;
     this.channel = channel;
-    maxBatchInputs = options.grpcMaxBatchInputs;
-    maxChunkSizeBytes = options.grpcMaxChunkSizeBytes;
-    maxBatchSizeBytes = options.grpcMaxBatchSizeBytes;
-    grpcTimeoutSeconds = options.grpcTimeoutSeconds;
   }
 
   public GrpcActionCache(RemoteOptions options) throws InvalidConfigurationException {
@@ -235,11 +228,12 @@ public static boolean isRemoteCacheOptions(RemoteOptions options) {
 
   private CasServiceBlockingStub getBlockingStub() {
     return CasServiceGrpc.newBlockingStub(channel)
-        .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+        .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
   }
 
   private CasServiceStub getStub() {
-    return CasServiceGrpc.newStub(channel).withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+    return CasServiceGrpc.newStub(channel).withDeadlineAfter(
+        options.grpcTimeoutSeconds, TimeUnit.SECONDS);
   }
 
   private ImmutableSet<ContentDigest> getMissingDigests(Iterable<ContentDigest> digests) {
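A detail worth noting: withDeadlineAfter starts the deadline clock when the stub is configured, not when each RPC is issued, which is why getBlockingStub() and getStub() build a fresh stub per use instead of caching one. A standalone sketch, with FooServiceGrpc standing in for any generated gRPC service and the address, port, and 60-second value as placeholders:

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import java.util.concurrent.TimeUnit;

    class DeadlineDemo {
      static ManagedChannel localChannel() {
        return ManagedChannelBuilder.forAddress("localhost", 8980)
            .usePlaintext() // assumption: plaintext transport to a local test server
            .build();
      }

      static FooServiceGrpc.FooServiceBlockingStub freshStub(ManagedChannel channel) {
        // Stubs are cheap to create; the 60-second deadline is measured from here.
        return FooServiceGrpc.newBlockingStub(channel)
            .withDeadlineAfter(60, TimeUnit.SECONDS);
      }
    }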
@@ -508,13 +502,15 @@ private void uploadChunks(int numItems, BlobChunkIterator blobs)
     int currentBatchBytes = 0;
     int batchedInputs = 0;
     int batches = 0;
+    CasServiceStub stub = getStub();
     try {
       while (blobs.hasNext()) {
         BlobChunk chunk = blobs.next();
         if (chunk.hasDigest()) {
           // Determine whether to start next batch.
-          if (batchedInputs % maxBatchInputs == 0
-              || chunk.getDigest().getSizeBytes() + currentBatchBytes > maxBatchSizeBytes) {
+          final long batchSize = chunk.getDigest().getSizeBytes() + currentBatchBytes;
+          if (batchedInputs % options.grpcMaxBatchInputs == 0
+              || batchSize > options.grpcMaxBatchSizeBytes) {
             // The batches execute simultaneously.
             if (requestObserver != null) {
               batchedInputs = 0;
@@ -523,7 +519,7 @@ private void uploadChunks(int numItems, BlobChunkIterator blobs)
             }
             batches++;
             responseObserver = new UploadBlobReplyStreamObserver(finishLatch, exception);
-            requestObserver = getStub().uploadBlob(responseObserver);
+            requestObserver = stub.uploadBlob(responseObserver);
           }
           batchedInputs++;
         }
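The loop above opens a new uploadBlob stream whenever the current batch fills up, either by input count (grpcMaxBatchInputs) or by byte budget (grpcMaxBatchSizeBytes). The same rule, isolated from the streaming machinery (illustrative names, not Bazel APIs):

    import java.util.ArrayList;
    import java.util.List;

    class BatchRule {
      /** Groups blob sizes into batches subject to count and byte limits. */
      static List<List<Long>> batch(List<Long> sizes, int maxBatchInputs, long maxBatchSizeBytes) {
        List<List<Long>> batches = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : sizes) {
          // Start a new batch when the count or byte limit would be exceeded.
          if (!current.isEmpty()
              && (current.size() >= maxBatchInputs || currentBytes + size > maxBatchSizeBytes)) {
            batches.add(current);
            current = new ArrayList<>();
            currentBytes = 0;
          }
          current.add(size);
          currentBytes += size;
        }
        if (!current.isEmpty()) {
          batches.add(current); // flush the final partial batch
        }
        return batches;
      }
    }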
@@ -549,7 +545,7 @@ private void uploadChunks(int numItems, BlobChunkIterator blobs)
       while (batches++ < numItems) {
         finishLatch.countDown(); // Non-sent batches.
       }
-      finishLatch.await(grpcTimeoutSeconds, TimeUnit.SECONDS);
+      finishLatch.await(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
       if (exception.get() != null) {
         throw exception.get(); // Re-throw the first encountered exception.
       }
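The CountDownLatch here is sized to numItems up front: completed batches count down from their stream observers, and the loop above burns the permits for batches that were never sent, so await() can return as soon as the real work finishes. A compact illustration of that accounting (LatchAccounting is a stand-in, with the async observers replaced by the caller's countdown):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    class LatchAccounting {
      static boolean awaitBatches(int numItems, int batchesSent, CountDownLatch finishLatch,
          int timeoutSeconds) throws InterruptedException {
        // Each sent batch is expected to call finishLatch.countDown() when its
        // observer completes; release the permits for batches never started.
        for (int batches = batchesSent; batches < numItems; batches++) {
          finishLatch.countDown();
        }
        // Returns false if the deadline elapses before the count reaches zero.
        return finishLatch.await(timeoutSeconds, TimeUnit.SECONDS);
      }
    }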
@@ -623,7 +619,7 @@ public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
   public ActionResult getCachedActionResult(ActionKey actionKey) {
     ExecutionCacheServiceBlockingStub stub =
         ExecutionCacheServiceGrpc.newBlockingStub(channel)
-            .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+            .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
     ExecutionCacheRequest request =
         ExecutionCacheRequest.newBuilder().setActionDigest(actionKey.getDigest()).build();
     ExecutionCacheReply reply = stub.getCachedResult(request);
@@ -641,7 +637,7 @@ public void setCachedActionResult(ActionKey actionKey, ActionResult result)
       throws InterruptedException {
     ExecutionCacheServiceBlockingStub stub =
         ExecutionCacheServiceGrpc.newBlockingStub(channel)
-            .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
+            .withDeadlineAfter(options.grpcTimeoutSeconds, TimeUnit.SECONDS);
     ExecutionCacheSetRequest request =
         ExecutionCacheSetRequest.newBuilder()
             .setActionDigest(actionKey.getDigest())
────────────────────────────────────────
@@ -30,9 +30,7 @@ final class RemoteActionContextProvider extends ActionContextProvider {
 
   RemoteActionContextProvider(
       CommandEnvironment env,
-      BuildRequest buildRequest,
-      RemoteActionCache actionCache,
-      RemoteWorkExecutor workExecutor) {
+      BuildRequest buildRequest) {
     boolean verboseFailures = buildRequest.getOptions(ExecutionOptions.class).verboseFailures;
     Builder<ActionContext> strategiesBuilder = ImmutableList.builder();
     strategiesBuilder.add(
@@ -41,8 +39,6 @@ final class RemoteActionContextProvider extends ActionContextProvider {
             env.getExecRoot(),
             buildRequest.getOptions(RemoteOptions.class),
             verboseFailures,
-            actionCache,
-            workExecutor,
             env.getRuntime().getProductName()));
     this.strategies = strategiesBuilder.build();
   }
────────────────────────────────────────
@@ -17,10 +17,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.eventbus.Subscribe;
 import com.google.devtools.build.lib.actions.ExecutorBuilder;
-import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
 import com.google.devtools.build.lib.buildtool.BuildRequest;
 import com.google.devtools.build.lib.buildtool.buildevent.BuildStartingEvent;
-import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.runtime.BlazeModule;
 import com.google.devtools.build.lib.runtime.Command;
 import com.google.devtools.build.lib.runtime.CommandEnvironment;
@@ -33,8 +31,6 @@
 /** RemoteModule provides distributed cache and remote execution for Bazel. */
 public final class RemoteModule extends BlazeModule {
   private CommandEnvironment env;
-  private RemoteActionCache actionCache;
-  private RemoteWorkExecutor workExecutor;
 
   @Override
   public void beforeCommand(Command command, CommandEnvironment env) {
@@ -49,41 +45,22 @@ public void afterCommand() {
 
   @Override
   public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorBuilder builder) {
-    builder.addActionContextProvider(
-        new RemoteActionContextProvider(env, request, actionCache, workExecutor));
+    builder.addActionContextProvider(new RemoteActionContextProvider(env, request));
   }
 
   @Subscribe
   public void buildStarting(BuildStartingEvent event) {
     RemoteOptions options = event.getRequest().getOptions(RemoteOptions.class);
 
-    try {
-      // Reinitialize the remote cache and worker from options every time, because the options
-      // may change from build to build.
-
-      // Don't provide the remote spawn unless at least action cache is initialized.
-      if (ConcurrentMapFactory.isRemoteCacheOptions(options)) {
-        actionCache = new ConcurrentMapActionCache(ConcurrentMapFactory.create(options));
-      }
-      if (GrpcActionCache.isRemoteCacheOptions(options)) {
-        actionCache = new GrpcActionCache(options);
-      }
-      // Otherwise actionCache remains null and remote caching/execution are disabled.
-
-      if (actionCache != null) {
-        HashFunction hf = FileSystem.getDigestFunction();
-        if (hf != HashFunction.SHA1) {
-          env.getBlazeModuleEnvironment().exit(new AbruptExitException(
-              "Remote cache/execution requires SHA1 digests, got " + hf
-                  + ", run with --host_jvm_args=-Dbazel.DigestFunction=SHA1",
-              ExitCode.COMMAND_LINE_ERROR));
-        }
-        if (RemoteWorkExecutor.isRemoteExecutionOptions(options)) {
-          workExecutor = new RemoteWorkExecutor(options);
-        }
-      }
-    } catch (InvalidConfigurationException e) {
-      env.getReporter().handle(Event.warn(e.toString()));
+    if (ConcurrentMapFactory.isRemoteCacheOptions(options)
+        || GrpcActionCache.isRemoteCacheOptions(options)) {
+      HashFunction hf = FileSystem.getDigestFunction();
+      if (hf != HashFunction.SHA1) {
+        env.getBlazeModuleEnvironment().exit(new AbruptExitException(
+            "Remote cache/execution requires SHA1 digests, got " + hf
+                + ", run with --host_jvm_args=-Dbazel.DigestFunction=SHA1",
+            ExitCode.COMMAND_LINE_ERROR));
+      }
     }
   }
 
2 more changed files not shown.
