Skip to content

Commit

Permalink
BulkTransferException for remote up/download
Browse files Browse the repository at this point in the history
  • Loading branch information
George Gensure committed Apr 6, 2020
1 parent eea58d1 commit eca40ab
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 119 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2020 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;

import com.google.common.base.Preconditions;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import java.io.IOException;

/**
* Exception which represents a collection of IOExceptions for the purpose
* of distinguishing remote communication exceptions from those which occur
* on filesystems locally. This exception serves as a trace point for the actual
* transfer, so that the intended operation can be observed in a stack, with all
* constituent exceptions available for observation.
*/
class BulkTransferException extends IOException {
// true since no empty BulkTransferException is ever thrown
private boolean allCacheNotFoundException = true;

BulkTransferException() {
}

BulkTransferException(IOException e) {
add(e);
}

/**
* Add an IOException to the suppressed list.
*
* The Java standard addSuppressed is final and this method stands in
* its place to selectively filter and record whether all suppressed
* exceptions are CacheNotFoundExceptions
*/
void add(IOException e) {
allCacheNotFoundException &= e instanceof CacheNotFoundException;
super.addSuppressed(e);
}

boolean onlyCausedByCacheNotFoundException() {
return allCacheNotFoundException;
}
}
143 changes: 67 additions & 76 deletions src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;

import build.bazel.remote.execution.v2.Action;
import build.bazel.remote.execution.v2.ActionResult;
Expand All @@ -34,6 +35,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
Expand All @@ -57,7 +59,6 @@
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.remote.util.Utils.InMemoryOutput;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.util.io.OutErr;
Expand Down Expand Up @@ -116,7 +117,7 @@ public RemoteCache(

public ActionResult downloadActionResult(ActionKey actionKey)
throws IOException, InterruptedException {
return Utils.getFromFuture(cacheProtocol.downloadActionResult(actionKey));
return getFromFuture(cacheProtocol.downloadActionResult(actionKey));
}

/**
Expand Down Expand Up @@ -182,7 +183,7 @@ private void uploadOutputs(
digests.addAll(digestToBlobs.keySet());

ImmutableSet<Digest> digestsToUpload =
Utils.getFromFuture(cacheProtocol.findMissingDigests(digests));
getFromFuture(cacheProtocol.findMissingDigests(digests));
ImmutableList.Builder<ListenableFuture<Void>> uploads = ImmutableList.builder();
for (Digest digest : digestsToUpload) {
Path file = digestToFile.get(digest);
Expand All @@ -198,7 +199,7 @@ private void uploadOutputs(
}
}

waitForUploads(uploads.build());
waitForBulkTransfer(uploads.build(), /* cancelRemainingOnInterrupt=*/ false);

if (manifest.getStderrDigest() != null) {
result.setStderrDigest(manifest.getStderrDigest());
Expand All @@ -208,22 +209,44 @@ private void uploadOutputs(
}
}

private static void waitForUploads(List<ListenableFuture<Void>> uploads)
throws IOException, InterruptedException {
try {
for (ListenableFuture<Void> upload : uploads) {
upload.get();
protected static <T> void waitForBulkTransfer(Iterable<ListenableFuture<T>> transfers, boolean cancelRemainingOnInterrupt)
throws BulkTransferException, InterruptedException {
BulkTransferException bulkTransferException = null;
InterruptedException interruptedException = null;
boolean interrupted = Thread.currentThread().isInterrupted();
for (ListenableFuture<T> transfer : transfers) {
try {
if (interruptedException == null) {
// Wait for all downloads to finish.
getFromFuture(transfer);
} else {
transfer.cancel(true);
}
} catch (IOException e) {
if (bulkTransferException == null) {
bulkTransferException = new BulkTransferException();
}
bulkTransferException.add(e);
} catch (InterruptedException e) {
interrupted = Thread.interrupted() || interrupted;
interruptedException = e;
if (!cancelRemainingOnInterrupt) {
// leave the rest of the transfers alone
break;
}
}
} catch (ExecutionException e) {
// TODO(buchgr): Add support for cancellation and factor this method out to be shared
// between ByteStreamUploader as well.
Throwable cause = e.getCause();
Throwables.throwIfInstanceOf(cause, IOException.class);
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
if (cause != null) {
throw new IOException(cause);
}
if (interrupted) {
Thread.currentThread().interrupt();
}
if (interruptedException != null) {
if (bulkTransferException != null) {
interruptedException.addSuppressed(bulkTransferException);
}
throw new IOException(e);
throw interruptedException;
}
if (bulkTransferException != null) {
throw bulkTransferException;
}
}

Expand Down Expand Up @@ -299,40 +322,16 @@ public void download(
// Subsequently we need to wait for *every* download to finish, even if we already know that
// one failed. That's so that when exiting this method we can be sure that all downloads have
// finished and don't race with the cleanup routine.
// TODO(buchgr): Look into cancellation.

IOException downloadException = null;
InterruptedException interruptedException = null;
FileOutErr tmpOutErr = null;
try {
if (origOutErr != null) {
tmpOutErr = origOutErr.childOutErr();
}
downloads.addAll(downloadOutErr(result, tmpOutErr));
} catch (IOException e) {
downloadException = e;
}

for (ListenableFuture<FileMetadata> download : downloads) {
try {
// Wait for all downloads to finish.
getFromFuture(download);
} catch (IOException e) {
if (downloadException == null) {
downloadException = e;
} else if (e != downloadException) {
downloadException.addSuppressed(e);
}
} catch (InterruptedException e) {
if (interruptedException == null) {
interruptedException = e;
} else if (e != interruptedException) {
interruptedException.addSuppressed(e);
}
}
if (origOutErr != null) {
tmpOutErr = origOutErr.childOutErr();
}
downloads.addAll(downloadOutErr(result, tmpOutErr));

if (downloadException != null || interruptedException != null) {
try {
waitForBulkTransfer(downloads, /* cancelRemainingOnInterrupt=*/ true);
} catch (Exception e) {
try {
// Delete any (partially) downloaded output files.
for (OutputFile file : result.getOutputFilesList()) {
Expand All @@ -347,27 +346,17 @@ public void download(
tmpOutErr.clearOut();
tmpOutErr.clearErr();
}
} catch (IOException e) {
if (downloadException != null && e != downloadException) {
e.addSuppressed(downloadException);
}
if (interruptedException != null) {
e.addSuppressed(interruptedException);
}
} catch (IOException ioEx) {
ioEx.addSuppressed(e);

// If deleting of output files failed, we abort the build with a decent error message as
// any subsequent local execution failure would likely be incomprehensible.
throw new EnvironmentalExecException(
"Failed to delete output files after incomplete download", e);
ExecException execEx = new EnvironmentalExecException(
"Failed to delete output files after incomplete download", ioEx);
execEx.addSuppressed(e);
throw execEx;
}
}

if (interruptedException != null) {
throw interruptedException;
}

if (downloadException != null) {
throw downloadException;
throw e;
}

if (tmpOutErr != null) {
Expand Down Expand Up @@ -487,12 +476,15 @@ public void onFailure(Throwable t) {
return outerF;
}

private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result, OutErr outErr)
throws IOException {
private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result, OutErr outErr) {
List<ListenableFuture<FileMetadata>> downloads = new ArrayList<>();
if (!result.getStdoutRaw().isEmpty()) {
result.getStdoutRaw().writeTo(outErr.getOutputStream());
outErr.getOutputStream().flush();
try {
result.getStdoutRaw().writeTo(outErr.getOutputStream());
outErr.getOutputStream().flush();
} catch (IOException e) {
downloads.add(Futures.immediateFailedFuture(e));
}
} else if (result.hasStdoutDigest()) {
downloads.add(
Futures.transform(
Expand All @@ -501,8 +493,12 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
directExecutor()));
}
if (!result.getStderrRaw().isEmpty()) {
result.getStderrRaw().writeTo(outErr.getErrorStream());
outErr.getErrorStream().flush();
try {
result.getStderrRaw().writeTo(outErr.getErrorStream());
outErr.getErrorStream().flush();
} catch (IOException e) {
downloads.add(Futures.immediateFailedFuture(e));
}
} else if (result.hasStderrDigest()) {
downloads.add(
Futures.transform(
Expand Down Expand Up @@ -1115,9 +1111,4 @@ public Collection<SymlinkMetadata> symlinks() {
return symlinks.values();
}
}

@VisibleForTesting
protected <T> T getFromFuture(ListenableFuture<T> f) throws IOException, InterruptedException {
return Utils.getFromFuture(f);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.lang.String.format;

import build.bazel.remote.execution.v2.Digest;
Expand Down Expand Up @@ -57,21 +58,7 @@ private void uploadMissing(Map<Digest, Path> files, Map<Digest, ByteString> blob
uploads.add(cacheProtocol.uploadBlob(entry.getKey(), entry.getValue()));
}

try {
for (ListenableFuture<Void> upload : uploads) {
upload.get();
}
} catch (ExecutionException e) {
// Cancel remaining uploads.
for (ListenableFuture<Void> upload : uploads) {
upload.cancel(/* mayInterruptIfRunning= */ true);
}

Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause, IOException.class);
Throwables.propagateIfPossible(cause, InterruptedException.class);
throw new IOException(cause);
}
waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt=*/ false);
}

/**
Expand All @@ -91,7 +78,7 @@ public void ensureInputsPresent(MerkleTree merkleTree, Map<Digest, Message> addi
Iterable<Digest> allDigests =
Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
ImmutableSet<Digest> missingDigests =
Utils.getFromFuture(cacheProtocol.findMissingDigests(allDigests));
getFromFuture(cacheProtocol.findMissingDigests(allDigests));
Map<Digest, Path> filesToUpload = new HashMap<>();
Map<Digest, ByteString> blobsToUpload = new HashMap<>();
for (Digest missingDigest : missingDigests) {
Expand Down
Loading

0 comments on commit eca40ab

Please sign in to comment.