Skip to content

Commit

Permalink
Asynchronously verify ServerCapabilities.
Browse files Browse the repository at this point in the history
Currently, we get and verify ServerCapabilities within `RemoteModule#beforeCommand` which blocks Bazel. However, the result, if not an error, is not needed until execution phrase when Bazel starts to execute actions remotely or query remote cache.

This CL changes Bazel to not wait within `beforeCommand`, but to get/wait the ServerCapabilities when it needs them. This is achieved by moving the process to be a part of channel creation and by using `AsyncTaskCache` to ensure it's only executed once. `beforeCommand` still triggers the start of the process.

Error generated during that process is no longer an `AbruptExitException` but normal `IOException` (and is treated as remote error) which means local fallback works automatically. And in case of building with only remote cache, it also automatically falls back to local execution (instead of exiting with error). As a result, some tests must be updated/removed.

Working towards bazelbuild#18607.

PiperOrigin-RevId: 563399076
Change-Id: I8d4130e81a059b4939a7e745e25abd3916f39f9e
  • Loading branch information
coeuvre authored and copybara-github committed Sep 7, 2023
1 parent 3040f50 commit 0d36c6b
Show file tree
Hide file tree
Showing 25 changed files with 460 additions and 476 deletions.
5 changes: 3 additions & 2 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ java_library(
exclude = [
"ExecutionStatusException.java",
"ReferenceCountedChannel.java",
"ReferenceCountedChannelPool.java",
"ChannelConnectionWithServerCapabilitiesFactory.java",
"RemoteRetrier.java",
"RemoteRetrierUtils.java",
"Retrier.java",
Expand Down Expand Up @@ -160,13 +160,14 @@ java_library(
java_library(
name = "ReferenceCountedChannel",
srcs = [
"ChannelConnectionWithServerCapabilitiesFactory.java",
"ReferenceCountedChannel.java",
],
deps = [
"//src/main/java/com/google/devtools/build/lib/profiler",
"//src/main/java/com/google/devtools/build/lib/remote/grpc",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//third_party:guava",
"//third_party:jsr305",
"//third_party:netty",
"//third_party:rxjava3",
"//third_party/grpc-java:grpc-jar",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2023 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;

import build.bazel.remote.execution.v2.ServerCapabilities;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory;
import io.grpc.ManagedChannel;
import io.reactivex.rxjava3.core.Single;

/**
* A {@link ChannelConnectionFactory} that create {@link ChannelConnectionWithServerCapabilities}.
*/
public interface ChannelConnectionWithServerCapabilitiesFactory extends ChannelConnectionFactory {

@Override
Single<? extends ChannelConnectionWithServerCapabilities> create();

/** A {@link ChannelConnection} that provides {@link ServerCapabilities}. */
class ChannelConnectionWithServerCapabilities extends ChannelConnection {
private final ServerCapabilities serverCapabilities;

public ChannelConnectionWithServerCapabilities(
ManagedChannel channel, ServerCapabilities serverCapabilities) {
super(channel);
this.serverCapabilities = serverCapabilities;
}

public ServerCapabilities getServerCapabilities() {
return serverCapabilities;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ static ExecuteResponse extractResponseOrThrowIfError(Operation operation) throws
}

@Override
public ServerCapabilities getServerCapabilities() {
public ServerCapabilities getServerCapabilities() throws IOException {
return channel.getServerCapabilities();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,148 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import build.bazel.remote.execution.v2.DigestFunction;
import build.bazel.remote.execution.v2.DigestFunction.Value;
import build.bazel.remote.execution.v2.ServerCapabilities;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.RxFutures;
import com.google.devtools.build.lib.remote.util.Utils;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A {@link ChannelConnectionFactory} which creates {@link ChannelConnection} using {@link
* ChannelFactory}.
*/
public class GoogleChannelConnectionFactory implements ChannelConnectionFactory {
public class GoogleChannelConnectionFactory
implements ChannelConnectionWithServerCapabilitiesFactory {
private final AtomicBoolean getAndVerifyServerCapabilitiesStarted = new AtomicBoolean(false);
private final SettableFuture<ServerCapabilities> serverCapabilities = SettableFuture.create();

private final ChannelFactory channelFactory;
private final String target;
private final String proxy;
private final AuthAndTLSOptions options;
private final List<ClientInterceptor> interceptors;
private final int maxConcurrency;
private final boolean verboseFailures;
private final Reporter reporter;
private final RemoteServerCapabilities remoteServerCapabilities;
private final RemoteOptions remoteOptions;
private final DigestFunction.Value digestFunction;
private final ServerCapabilitiesRequirement requirement;

public GoogleChannelConnectionFactory(
ChannelFactory channelFactory,
String target,
String proxy,
RemoteOptions remoteOptions,
AuthAndTLSOptions options,
List<ClientInterceptor> interceptors,
int maxConcurrency) {
int maxConcurrency,
boolean verboseFailures,
Reporter reporter,
RemoteServerCapabilities remoteServerCapabilities,
Value digestFunction,
ServerCapabilitiesRequirement requirement) {
this.channelFactory = channelFactory;
this.target = target;
this.proxy = proxy;
this.options = options;
this.interceptors = interceptors;
this.maxConcurrency = maxConcurrency;
this.verboseFailures = verboseFailures;
this.reporter = reporter;
this.remoteServerCapabilities = remoteServerCapabilities;
this.remoteOptions = remoteOptions;
this.digestFunction = digestFunction;
this.requirement = requirement;
}

@Override
public Single<ChannelConnection> create() {
public Single<ChannelConnectionWithServerCapabilities> create() {
return Single.fromCallable(
() ->
new ChannelConnection(channelFactory.newChannel(target, proxy, options, interceptors)));
() -> channelFactory.newChannel(target, proxy, options, interceptors))
.flatMap(
channel ->
RxFutures.toSingle(() -> getAndVerifyServerCapabilities(channel), directExecutor())
.map(
serverCapabilities ->
new ChannelConnectionWithServerCapabilities(
channel, serverCapabilities)));
}

private ListenableFuture<ServerCapabilities> getAndVerifyServerCapabilities(
ManagedChannel channel) {
if (getAndVerifyServerCapabilitiesStarted.compareAndSet(false, true)) {
var s = Profiler.instance().profile("getAndVerifyServerCapabilities");
var future =
Futures.transformAsync(
remoteServerCapabilities.get(channel),
serverCapabilities -> {
var result =
RemoteServerCapabilities.checkClientServerCompatibility(
serverCapabilities, remoteOptions, digestFunction, requirement);
for (String warning : result.getWarnings()) {
reporter.handle(Event.warn(warning));
}
List<String> errors = result.getErrors();
for (int i = 0; i < errors.size() - 1; ++i) {
reporter.handle(Event.error(errors.get(i)));
}
if (!errors.isEmpty()) {
String lastErrorMessage = errors.get(errors.size() - 1);
return immediateFailedFuture(new IOException(lastErrorMessage));
}
return immediateFuture(serverCapabilities);
},
directExecutor());
future.addListener(s::close, directExecutor());
Futures.addCallback(
future,
new FutureCallback<ServerCapabilities>() {
@Override
public void onSuccess(ServerCapabilities result) {
serverCapabilities.set(result);
}

@Override
public void onFailure(Throwable error) {
String message =
"Failed to query remote execution capabilities: "
+ Utils.grpcAwareErrorMessage(error, verboseFailures);
reporter.handle(Event.error(message));

IOException exception;
if (error instanceof IOException) {
exception = (IOException) error;
} else {
exception = new IOException(error);
}
serverCapabilities.setException(exception);
}
},
directExecutor());
}
return serverCapabilities;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ private ListenableFuture<CachedActionResult> handleStatus(
}

@Override
public CacheCapabilities getCacheCapabilities() {
public CacheCapabilities getCacheCapabilities() throws IOException {
return channel.getServerCapabilities().getCacheCapabilities();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private ExecuteResponse getOperationResponse(Operation op) throws IOException {
}

@Override
public ServerCapabilities getServerCapabilities() {
public ServerCapabilities getServerCapabilities() throws IOException {
return channel.getServerCapabilities();
}

Expand Down
Loading

0 comments on commit 0d36c6b

Please sign in to comment.