Skip to content

Commit

Permalink
Optimize prefetchInputs.
Browse files Browse the repository at this point in the history
Use a pre-allocated array to hold the intermediate transfers to avoid allocations. Replace some of RxJava code with Futures to avoid RxJava overheads.

This improves the perfromance of prefetchInputs on a large set of inputs from ~400ms to ~16ms.

Fixes #20555.

Closes #20557.

PiperOrigin-RevId: 595226013
Change-Id: If5296fa6b3c0166b95cfca4281255e523724a41f
  • Loading branch information
coeuvre authored and copybara-github committed Jan 2, 2024
1 parent 0bb3493 commit 915fb3e
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture;
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static com.google.devtools.build.lib.remote.util.Utils.mergeBulkTransfer;

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.Action;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
Expand All @@ -44,17 +47,13 @@
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
import com.google.devtools.build.lib.remote.util.RxUtils;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
import com.google.devtools.build.lib.vfs.FileSymlinkLoopException;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.OutputPermissions;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -283,6 +282,10 @@ public ListenableFuture<Void> prefetchFiles(
files.add(input);
}

if (files.isEmpty()) {
return immediateVoidFuture();
}

// Collect the set of directories whose output permissions must be set at the end of this call.
// This responsibility cannot lie with the downloading of an individual file, because multiple
// files may be concurrently downloaded into the same directory within a single call to
Expand All @@ -291,30 +294,38 @@ public ListenableFuture<Void> prefetchFiles(
// it must still synchronize on the output permissions having been set.
Set<Path> dirsWithOutputPermissions = Sets.newConcurrentHashSet();

Completable prefetch =
mergeBulkTransfer(
Flowable.fromIterable(files)
.flatMapSingle(
input ->
prefetchFile(
action,
dirsWithOutputPermissions,
metadataSupplier,
input,
priority)))
.doOnComplete(
// Set output permissions on tree artifact subdirectories, matching the behavior of
// SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
() -> {
for (Path dir : dirsWithOutputPermissions) {
directoryTracker.setOutputPermissions(dir);
}
});

return toListenableFuture(prefetch);
// Using plain futures to avoid RxJava overheads.
List<ListenableFuture<Void>> transfers = new ArrayList<>(files.size());
try (var s = Profiler.instance().profile("compose prefetches")) {
for (var file : files) {
transfers.add(
prefetchFile(action, dirsWithOutputPermissions, metadataSupplier, file, priority));
}
}

ListenableFuture<Void> mergedTransfer;
try (var s = Profiler.instance().profile("mergeBulkTransfer")) {
mergedTransfer = mergeBulkTransfer(transfers);
}

return Futures.transformAsync(
mergedTransfer,
unused -> {
try {
// Set output permissions on tree artifact subdirectories, matching the behavior of
// SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
for (Path dir : dirsWithOutputPermissions) {
directoryTracker.setOutputPermissions(dir);
}
} catch (IOException e) {
return immediateFailedFuture(e);
}
return immediateVoidFuture();
},
directExecutor());
}

private Single<TransferResult> prefetchFile(
private ListenableFuture<Void> prefetchFile(
ActionExecutionMetadata action,
Set<Path> dirsWithOutputPermissions,
MetadataSupplier metadataSupplier,
Expand All @@ -323,14 +334,14 @@ private Single<TransferResult> prefetchFile(
try {
if (input instanceof VirtualActionInput) {
prefetchVirtualActionInput((VirtualActionInput) input);
return Single.just(TransferResult.ok());
return immediateVoidFuture();
}

PathFragment execPath = input.getExecPath();

FileArtifactValue metadata = metadataSupplier.getMetadata(input);
if (metadata == null || !canDownloadFile(execRoot.getRelative(execPath), metadata)) {
return Single.just(TransferResult.ok());
return immediateVoidFuture();
}

@Nullable Symlink symlink = maybeGetSymlink(action, input, metadata, metadataSupplier);
Expand All @@ -357,11 +368,9 @@ private Single<TransferResult> prefetchFile(
result = result.andThen(plantSymlink(symlink));
}

return RxUtils.toTransferResult(result);
} catch (IOException e) {
return Single.just(TransferResult.error(e));
} catch (InterruptedException e) {
return Single.just(TransferResult.interrupted());
return toListenableFuture(result);
} catch (IOException | InterruptedException e) {
return immediateFailedFuture(e);
}
}

Expand Down
68 changes: 58 additions & 10 deletions src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.util;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Throwables.getStackTraceAsString;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.stream.Collectors.joining;

import build.bazel.remote.execution.v2.Action;
Expand All @@ -29,7 +33,6 @@
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecutionRequirements;
import com.google.devtools.build.lib.actions.Spawn;
Expand Down Expand Up @@ -417,11 +420,11 @@ public static ListenableFuture<ActionResult> downloadAsActionResult(
try {
return Futures.immediateFuture(ActionResult.parseFrom(data.toByteArray()));
} catch (InvalidProtocolBufferException e) {
return Futures.immediateFailedFuture(e);
return immediateFailedFuture(e);
}
},
MoreExecutors.directExecutor())
.catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor());
directExecutor())
.catching(CacheNotFoundException.class, (e) -> null, directExecutor());
}

public static void verifyBlobContents(Digest expected, Digest actual) throws IOException {
Expand Down Expand Up @@ -483,15 +486,15 @@ public ByteString getContents() {
*/
public static <V> ListenableFuture<V> refreshIfUnauthenticatedAsync(
AsyncCallable<V> call, CallCredentialsProvider callCredentialsProvider) {
Preconditions.checkNotNull(call);
Preconditions.checkNotNull(callCredentialsProvider);
checkNotNull(call);
checkNotNull(callCredentialsProvider);

try {
return Futures.catchingAsync(
call.call(),
Throwable.class,
(e) -> refreshIfUnauthenticatedAsyncOnException(e, call, callCredentialsProvider),
MoreExecutors.directExecutor());
directExecutor());
} catch (Throwable t) {
return refreshIfUnauthenticatedAsyncOnException(t, call, callCredentialsProvider);
}
Expand All @@ -511,15 +514,15 @@ private static <V> ListenableFuture<V> refreshIfUnauthenticatedAsyncOnException(
}
}

return Futures.immediateFailedFuture(t);
return immediateFailedFuture(t);
}

/** Same as {@link #refreshIfUnauthenticatedAsync} but calling a synchronous code block. */
public static <V> V refreshIfUnauthenticated(
Callable<V> call, CallCredentialsProvider callCredentialsProvider)
throws IOException, InterruptedException {
Preconditions.checkNotNull(call);
Preconditions.checkNotNull(callCredentialsProvider);
checkNotNull(call);
checkNotNull(callCredentialsProvider);

try {
return call.call();
Expand Down Expand Up @@ -618,4 +621,49 @@ public static void waitForBulkTransfer(
throw bulkTransferException;
}
}

public static ListenableFuture<Void> mergeBulkTransfer(
Iterable<ListenableFuture<Void>> transfers) {
return Futures.whenAllComplete(transfers)
.callAsync(
() -> {
BulkTransferException bulkTransferException = null;

for (var transfer : transfers) {
IOException error = null;
try {
transfer.get();
} catch (CancellationException e) {
return immediateFailedFuture(new InterruptedException());
} catch (InterruptedException e) {
return immediateFailedFuture(e);
} catch (ExecutionException e) {
var cause = e.getCause();
if (cause instanceof InterruptedException) {
return immediateFailedFuture(cause);
} else if (cause instanceof IOException) {
error = (IOException) cause;
} else {
error = new IOException(cause);
}
}

if (error == null) {
continue;
}

if (bulkTransferException == null) {
bulkTransferException = new BulkTransferException();
}
bulkTransferException.add(error);
}

if (bulkTransferException != null) {
return immediateFailedFuture(bulkTransferException);
}

return immediateVoidFuture();
},
directExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -760,7 +759,7 @@ public void prefetchFile_interruptingMetadataSupplier_interruptsDownload() throw
prefetcher.prefetchFiles(
action, ImmutableList.of(a1), interruptedMetadataSupplier, Priority.MEDIUM);

assertThrows(CancellationException.class, future::get);
assertThrows(InterruptedException.class, () -> getFromFuture(future));
}

@Test
Expand Down

0 comments on commit 915fb3e

Please sign in to comment.