Skip to content

Commit

Permalink
Optimize prefetchInputs.
Browse files Browse the repository at this point in the history
Use a pre-allocated array to hold the intermediate transfers to avoid
allocations. Replace some of RxJava code with Futures to avoid RxJava
overheads.

This improves the perfromance of prefetchInputs on a large set of
inputs from ~400ms to ~16ms.

Fixes bazelbuild#20555.
  • Loading branch information
coeuvre committed Dec 15, 2023
1 parent 1533cd1 commit 2bb70fd
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture;
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static com.google.devtools.build.lib.remote.util.Utils.mergeBulkTransfer;

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.Action;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
Expand All @@ -53,7 +56,6 @@
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -291,30 +293,38 @@ public ListenableFuture<Void> prefetchFiles(
// it must still synchronize on the output permissions having been set.
Set<Path> dirsWithOutputPermissions = Sets.newConcurrentHashSet();

Completable prefetch =
mergeBulkTransfer(
Flowable.fromIterable(files)
.flatMapSingle(
input ->
prefetchFile(
action,
dirsWithOutputPermissions,
metadataSupplier,
input,
priority)))
.doOnComplete(
// Set output permissions on tree artifact subdirectories, matching the behavior of
// SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
() -> {
for (Path dir : dirsWithOutputPermissions) {
directoryTracker.setOutputPermissions(dir);
}
});

return toListenableFuture(prefetch);
List<ListenableFuture<Void>> transfers = new ArrayList<>(files.size());
try (var s = Profiler.instance().profile("compose prefetches")) {
for (var file : files) {
var result =
prefetchFile(action, dirsWithOutputPermissions, metadataSupplier, file, priority);
transfers.add(result);
}
}

ListenableFuture<Void> mergedTransfer;
try (var s = Profiler.instance().profile("mergeBulkTransfer")) {
mergedTransfer = mergeBulkTransfer(transfers);
}

return Futures.transformAsync(
mergedTransfer,
unused -> {
try {
// Set output permissions on tree artifact subdirectories, matching the behavior of
// SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
for (Path dir : dirsWithOutputPermissions) {
directoryTracker.setOutputPermissions(dir);
}
} catch (IOException e) {
return immediateFailedFuture(e);
}
return immediateVoidFuture();
},
directExecutor());
}

private Single<TransferResult> prefetchFile(
private ListenableFuture<Void> prefetchFile(
ActionExecutionMetadata action,
Set<Path> dirsWithOutputPermissions,
MetadataSupplier metadataSupplier,
Expand All @@ -323,14 +333,14 @@ private Single<TransferResult> prefetchFile(
try {
if (input instanceof VirtualActionInput) {
prefetchVirtualActionInput((VirtualActionInput) input);
return Single.just(TransferResult.ok());
return immediateVoidFuture();
}

PathFragment execPath = input.getExecPath();

FileArtifactValue metadata = metadataSupplier.getMetadata(input);
if (metadata == null || !canDownloadFile(execRoot.getRelative(execPath), metadata)) {
return Single.just(TransferResult.ok());
return immediateVoidFuture();
}

@Nullable Symlink symlink = maybeGetSymlink(action, input, metadata, metadataSupplier);
Expand All @@ -357,11 +367,9 @@ private Single<TransferResult> prefetchFile(
result = result.andThen(plantSymlink(symlink));
}

return RxUtils.toTransferResult(result);
} catch (IOException e) {
return Single.just(TransferResult.error(e));
} catch (InterruptedException e) {
return Single.just(TransferResult.interrupted());
return toListenableFuture(result);
} catch (IOException | InterruptedException e) {
return immediateFailedFuture(e);
}
}

Expand Down
68 changes: 59 additions & 9 deletions src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.util;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Throwables.getStackTraceAsString;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.stream.Collectors.joining;

import build.bazel.remote.execution.v2.Action;
Expand Down Expand Up @@ -44,6 +48,7 @@
import com.google.devtools.build.lib.remote.common.OutputDigestMismatchException;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.devtools.build.lib.server.FailureDetails;
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
import com.google.devtools.build.lib.vfs.PathFragment;
Expand Down Expand Up @@ -417,11 +422,11 @@ public static ListenableFuture<ActionResult> downloadAsActionResult(
try {
return Futures.immediateFuture(ActionResult.parseFrom(data.toByteArray()));
} catch (InvalidProtocolBufferException e) {
return Futures.immediateFailedFuture(e);
return immediateFailedFuture(e);
}
},
MoreExecutors.directExecutor())
.catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor());
directExecutor())
.catching(CacheNotFoundException.class, (e) -> null, directExecutor());
}

public static void verifyBlobContents(Digest expected, Digest actual) throws IOException {
Expand Down Expand Up @@ -483,15 +488,15 @@ public ByteString getContents() {
*/
public static <V> ListenableFuture<V> refreshIfUnauthenticatedAsync(
AsyncCallable<V> call, CallCredentialsProvider callCredentialsProvider) {
Preconditions.checkNotNull(call);
Preconditions.checkNotNull(callCredentialsProvider);
checkNotNull(call);
checkNotNull(callCredentialsProvider);

try {
return Futures.catchingAsync(
call.call(),
Throwable.class,
(e) -> refreshIfUnauthenticatedAsyncOnException(e, call, callCredentialsProvider),
MoreExecutors.directExecutor());
directExecutor());
} catch (Throwable t) {
return refreshIfUnauthenticatedAsyncOnException(t, call, callCredentialsProvider);
}
Expand All @@ -511,15 +516,15 @@ private static <V> ListenableFuture<V> refreshIfUnauthenticatedAsyncOnException(
}
}

return Futures.immediateFailedFuture(t);
return immediateFailedFuture(t);
}

/** Same as {@link #refreshIfUnauthenticatedAsync} but calling a synchronous code block. */
public static <V> V refreshIfUnauthenticated(
Callable<V> call, CallCredentialsProvider callCredentialsProvider)
throws IOException, InterruptedException {
Preconditions.checkNotNull(call);
Preconditions.checkNotNull(callCredentialsProvider);
checkNotNull(call);
checkNotNull(callCredentialsProvider);

try {
return call.call();
Expand Down Expand Up @@ -618,4 +623,49 @@ public static void waitForBulkTransfer(
throw bulkTransferException;
}
}

public static ListenableFuture<Void> mergeBulkTransfer(
Iterable<ListenableFuture<Void>> transfers) {
return Futures.whenAllComplete(transfers)
.callAsync(
() -> {
BulkTransferException bulkTransferException = null;

for (var transfer : transfers) {
IOException error = null;
try {
transfer.get();
} catch (CancellationException e) {
return immediateFailedFuture(new InterruptedException());
} catch (InterruptedException e) {
return immediateFailedFuture(e);
} catch (ExecutionException e) {
var cause = e.getCause();
if (cause instanceof InterruptedException) {
return immediateFailedFuture(cause);
} else if (cause instanceof IOException) {
error = (IOException) cause;
} else {
error = new IOException(cause);
}
}

if (error == null) {
continue;
}

if (bulkTransferException == null) {
bulkTransferException = new BulkTransferException();
}
bulkTransferException.add(error);
}

if (bulkTransferException != null) {
return immediateFailedFuture(bulkTransferException);
}

return immediateVoidFuture();
},
directExecutor());
}
}

0 comments on commit 2bb70fd

Please sign in to comment.