Skip to content

Commit

Permalink
Merge branch 'release-6.4.0' into release-6.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
iancha1992 authored Sep 12, 2023
2 parents 6f16e50 + 484b305 commit 760dba8
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ protected ListenableFuture<Void> doDownloadFile(
downloadProgressReporter = new DownloadProgressReporter(NO_ACTION, "", 0);
}

return remoteCache.downloadFile(context, tempPath, digest, downloadProgressReporter);
return remoteCache.downloadFile(
context, execPath.getPathString(), tempPath, digest, downloadProgressReporter);
}

public static class DownloadProgress implements FetchProgress {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.concurrent.ThreadSafety;
import com.google.devtools.build.lib.exec.SpawnProgressEvent;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
import com.google.devtools.build.lib.remote.common.OutputDigestMismatchException;
import com.google.devtools.build.lib.remote.common.ProgressStatusListener;
Expand Down Expand Up @@ -201,21 +202,34 @@ protected ListenableFuture<Void> uploadBlob(
return RxFutures.toListenableFuture(upload);
}

public ListenableFuture<byte[]> downloadBlob(
RemoteActionExecutionContext context, Digest digest) {
return downloadBlob(context, /* blobName= */ "", digest);
}

/**
* Downloads a blob with content hash {@code digest} and stores its content in memory.
*
* @return a future that completes after the download completes (succeeds / fails). If successful,
* the content is stored in the future's {@code byte[]}.
*/
public ListenableFuture<byte[]> downloadBlob(
RemoteActionExecutionContext context, Digest digest) {
RemoteActionExecutionContext context, String blobName, Digest digest) {
if (digest.getSizeBytes() == 0) {
return EMPTY_BYTES;
}
ByteArrayOutputStream bOut = new ByteArrayOutputStream((int) digest.getSizeBytes());
var download = downloadBlob(context, blobName, digest, bOut);
SettableFuture<byte[]> outerF = SettableFuture.create();
outerF.addListener(
() -> {
if (outerF.isCancelled()) {
download.cancel(/* mayInterruptIfRunning= */ true);
}
},
directExecutor());
Futures.addCallback(
cacheProtocol.downloadBlob(context, digest, bOut),
download,
new FutureCallback<Void>() {
@Override
public void onSuccess(Void aVoid) {
Expand All @@ -237,12 +251,37 @@ public void onFailure(Throwable t) {
}

private ListenableFuture<Void> downloadBlob(
RemoteActionExecutionContext context, Digest digest, OutputStream out) {
RemoteActionExecutionContext context, String blobName, Digest digest, OutputStream out) {
if (digest.getSizeBytes() == 0) {
return COMPLETED_SUCCESS;
}
var download = cacheProtocol.downloadBlob(context, digest, out);
SettableFuture<Void> future = SettableFuture.create();
future.addListener(
() -> {
if (future.isCancelled()) {
download.cancel(/* mayInterruptIfRunning= */ true);
}
},
directExecutor());
Futures.addCallback(
download,
new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
future.set(result);
}

return cacheProtocol.downloadBlob(context, digest, out);
@Override
public void onFailure(Throwable t) {
if (t instanceof CacheNotFoundException) {
((CacheNotFoundException) t).setFilename(blobName);
}
future.setException(t);
}
},
directExecutor());
return future;
}

/** A reporter that reports download progresses. */
Expand Down Expand Up @@ -315,6 +354,13 @@ public ListenableFuture<Void> downloadFile(
throws IOException {
SettableFuture<Void> outerF = SettableFuture.create();
ListenableFuture<Void> f = downloadFile(context, localPath, digest, reporter);
outerF.addListener(
() -> {
if (outerF.isCancelled()) {
f.cancel(/* mayInterruptIfRunning= */ true);
}
},
directExecutor());
Futures.addCallback(
f,
new FutureCallback<Void>() {
Expand All @@ -325,7 +371,10 @@ public void onSuccess(Void unused) {

@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof OutputDigestMismatchException) {
if (throwable instanceof CacheNotFoundException) {
var cacheNotFoundException = (CacheNotFoundException) throwable;
cacheNotFoundException.setFilename(outputPath);
} else if (throwable instanceof OutputDigestMismatchException) {
OutputDigestMismatchException e = ((OutputDigestMismatchException) throwable);
e.setOutputPath(outputPath);
e.setLocalPath(localPath);
Expand All @@ -341,11 +390,16 @@ public void onFailure(Throwable throwable) {
/** Downloads a file (that is not a directory). The content is fetched from the digest. */
public ListenableFuture<Void> downloadFile(
RemoteActionExecutionContext context, Path path, Digest digest) throws IOException {
return downloadFile(context, path, digest, new DownloadProgressReporter(NO_ACTION, "", 0));
return downloadFile(
context,
path.getPathString(),
path,
digest,
new DownloadProgressReporter(NO_ACTION, "", 0));
}

/** Downloads a file (that is not a directory). The content is fetched from the digest. */
public ListenableFuture<Void> downloadFile(
private ListenableFuture<Void> downloadFile(
RemoteActionExecutionContext context,
Path path,
Digest digest,
Expand Down Expand Up @@ -409,7 +463,12 @@ public final List<ListenableFuture<Void>> downloadOutErr(
downloads.add(Futures.immediateFailedFuture(e));
}
} else if (result.hasStdoutDigest()) {
downloads.add(downloadBlob(context, result.getStdoutDigest(), outErr.getOutputStream()));
downloads.add(
downloadBlob(
context,
/* blobName= */ "<stdout>",
result.getStdoutDigest(),
outErr.getOutputStream()));
}
if (!result.getStderrRaw().isEmpty()) {
try {
Expand All @@ -419,7 +478,12 @@ public final List<ListenableFuture<Void>> downloadOutErr(
downloads.add(Futures.immediateFailedFuture(e));
}
} else if (result.hasStderrDigest()) {
downloads.add(downloadBlob(context, result.getStderrDigest(), outErr.getErrorStream()));
downloads.add(
downloadBlob(
context,
/* blobName= */ "<stderr>",
result.getStderrDigest(),
outErr.getErrorStream()));
}
return downloads;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1003,10 +1003,11 @@ ActionResultMetadata parseActionResultMetadata(
Map<Path, ListenableFuture<Tree>> dirMetadataDownloads =
Maps.newHashMapWithExpectedSize(result.getOutputDirectoriesCount());
for (OutputDirectory dir : result.getOutputDirectories()) {
var outputPath = encodeBytestringUtf8(dir.getPath());
dirMetadataDownloads.put(
remotePathResolver.outputPathToLocalPath(encodeBytestringUtf8(dir.getPath())),
remotePathResolver.outputPathToLocalPath(outputPath),
Futures.transformAsync(
remoteCache.downloadBlob(context, dir.getTreeDigest()),
remoteCache.downloadBlob(context, outputPath, dir.getTreeDigest()),
(treeBytes) ->
immediateFuture(Tree.parseFrom(treeBytes, ExtensionRegistry.getEmptyRegistry())),
directExecutor()));
Expand Down Expand Up @@ -1176,7 +1177,7 @@ public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult re
try (SilentCloseable c = Profiler.instance().profile("Remote.downloadInMemoryOutput")) {
if (inMemoryOutput != null) {
ListenableFuture<byte[]> inMemoryOutputDownload =
remoteCache.downloadBlob(context, inMemoryOutputDigest);
remoteCache.downloadBlob(context, inMemoryOutputPath.getPathString(), inMemoryOutputDigest);
waitForBulkTransfer(
ImmutableList.of(inMemoryOutputDownload), /* cancelRemainingOnInterrupt= */ true);
byte[] data = getFromFuture(inMemoryOutputDownload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,18 @@ private ExecutionResult downloadOutErr(RemoteActionExecutionContext context, Act
if (!result.getStdoutRaw().isEmpty()) {
stdout = result.getStdoutRaw().toByteArray();
} else if (result.hasStdoutDigest()) {
stdout = Utils.getFromFuture(remoteCache.downloadBlob(context, result.getStdoutDigest()));
stdout =
Utils.getFromFuture(
remoteCache.downloadBlob(context, "<stdout>", result.getStdoutDigest()));
}

byte[] stderr = new byte[0];
if (!result.getStderrRaw().isEmpty()) {
stderr = result.getStderrRaw().toByteArray();
} else if (result.hasStderrDigest()) {
stderr = Utils.getFromFuture(remoteCache.downloadBlob(context, result.getStderrDigest()));
stderr =
Utils.getFromFuture(
remoteCache.downloadBlob(context, "<stderr>", result.getStderrDigest()));
}

return new ExecutionResult(result.getExitCode(), stdout, stderr);
Expand Down Expand Up @@ -138,7 +142,7 @@ public ExecutionResult execute(
platform,
timeout,
acceptCached,
/*salt=*/ null);
/* salt= */ null);
Digest actionDigest = digestUtil.compute(action);
ActionKey actionKey = new ActionKey(actionDigest);
CachedActionResult cachedActionResult;
Expand All @@ -158,7 +162,7 @@ public ExecutionResult execute(
additionalInputs.put(actionDigest, action);
additionalInputs.put(commandHash, command);

remoteCache.ensureInputsPresent(context, merkleTree, additionalInputs, /*force=*/ true);
remoteCache.ensureInputsPresent(context, merkleTree, additionalInputs, /* force= */ true);
}

try (SilentCloseable c =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ java_library(
name = "cache_not_found_exception",
srcs = ["CacheNotFoundException.java"],
deps = [
"//third_party:guava",
"//third_party:jsr305",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,37 @@
package com.google.devtools.build.lib.remote.common;

import build.bazel.remote.execution.v2.Digest;
import com.google.common.base.Strings;
import java.io.IOException;
import javax.annotation.Nullable;

/**
* An exception to indicate cache misses. TODO(olaola): have a class of checked
* RemoteCacheExceptions.
*/
public final class CacheNotFoundException extends IOException {
private final Digest missingDigest;
@Nullable private String filename;

public CacheNotFoundException(Digest missingDigest) {
super("Missing digest: " + missingDigest.getHash() + "/" + missingDigest.getSizeBytes());
this.missingDigest = missingDigest;
}

public void setFilename(@Nullable String filename) {
this.filename = filename;
}

public Digest getMissingDigest() {
return missingDigest;
}

@Override
public String getMessage() {
String message =
"Missing digest: " + missingDigest.getHash() + "/" + missingDigest.getSizeBytes();
if (!Strings.isNullOrEmpty(filename)) {
message += " for " + filename;
}
return message;
}
}

0 comments on commit 760dba8

Please sign in to comment.