Skip to content

Commit

Permalink
Simplify RemoteExecutionCache
Browse files Browse the repository at this point in the history
This is in preparation for adding an additional cache to RemoteExecutionCache
to deduplicate file uploads.

Progress on #12113.

Change-Id: I3cffc5acc0bfb1b26e908f8edf44e30a4be0db86

Closes #12114.

Change-Id: I3cffc5acc0bfb1b26e908f8edf44e30a4be0db86
PiperOrigin-RevId: 332182515
  • Loading branch information
ulfjack authored and copybara-github committed Sep 17, 2020
1 parent e3285b8 commit 4f008c5
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,13 @@ private void uploadOutputs(
}
}

public static <T> void waitForBulkTransfer(
Iterable<ListenableFuture<T>> transfers, boolean cancelRemainingOnInterrupt)
public static void waitForBulkTransfer(
Iterable<? extends ListenableFuture<?>> transfers, boolean cancelRemainingOnInterrupt)
throws BulkTransferException, InterruptedException {
BulkTransferException bulkTransferException = null;
InterruptedException interruptedException = null;
boolean interrupted = Thread.currentThread().isInterrupted();
for (ListenableFuture<T> transfer : transfers) {
for (ListenableFuture<?> transfer : transfers) {
try {
if (interruptedException == null) {
// Wait for all transfers to finish.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@
import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -43,21 +40,6 @@ public RemoteExecutionCache(
super(protocolImpl, options, digestUtil);
}

private void uploadMissing(Map<Digest, Path> files, Map<Digest, ByteString> blobs)
throws IOException, InterruptedException {
List<ListenableFuture<Void>> uploads = new ArrayList<>();

for (Map.Entry<Digest, Path> entry : files.entrySet()) {
uploads.add(cacheProtocol.uploadFile(entry.getKey(), entry.getValue()));
}

for (Map.Entry<Digest, ByteString> entry : blobs.entrySet()) {
uploads.add(cacheProtocol.uploadBlob(entry.getKey(), entry.getValue()));
}

waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt=*/ false);
}

/**
* Ensures that the tree structure of the inputs, the input files themselves, and the command are
* available in the remote cache, such that the tree can be reassembled and executed on another
Expand All @@ -76,28 +58,27 @@ public void ensureInputsPresent(MerkleTree merkleTree, Map<Digest, Message> addi
Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
ImmutableSet<Digest> missingDigests =
getFromFuture(cacheProtocol.findMissingDigests(allDigests));
Map<Digest, Path> filesToUpload = new HashMap<>();
Map<Digest, ByteString> blobsToUpload = new HashMap<>();
List<ListenableFuture<?>> uploadFutures = new ArrayList<>();
for (Digest missingDigest : missingDigests) {
Directory node = merkleTree.getDirectoryByDigest(missingDigest);
if (node != null) {
blobsToUpload.put(missingDigest, node.toByteString());
uploadFutures.add(cacheProtocol.uploadBlob(missingDigest, node.toByteString()));
continue;
}

PathOrBytes file = merkleTree.getFileByDigest(missingDigest);
if (file != null) {
if (file.getBytes() != null) {
blobsToUpload.put(missingDigest, file.getBytes());
uploadFutures.add(cacheProtocol.uploadBlob(missingDigest, file.getBytes()));
continue;
}
filesToUpload.put(missingDigest, file.getPath());
uploadFutures.add(cacheProtocol.uploadFile(missingDigest, file.getPath()));
continue;
}

Message message = additionalInputs.get(missingDigest);
if (message != null) {
blobsToUpload.put(missingDigest, message.toByteString());
uploadFutures.add(cacheProtocol.uploadBlob(missingDigest, message.toByteString()));
continue;
}

Expand All @@ -107,6 +88,6 @@ public void ensureInputsPresent(MerkleTree merkleTree, Map<Digest, Message> addi
missingDigest));
}

uploadMissing(filesToUpload, blobsToUpload);
waitForBulkTransfer(uploadFutures, /* cancelRemainingOnInterrupt=*/ false);
}
}

0 comments on commit 4f008c5

Please sign in to comment.