From c378d9dc3900c1464f473350191f678e5857359d Mon Sep 17 00:00:00 2001 From: Googler Date: Thu, 28 Jan 2021 22:54:20 -0800 Subject: [PATCH] Remote: Use parameters instead of thread-local storage to provide tracing metadata. (Part 4) Change RemoteCacheClient#upload{File,Blob} to use RemoteActionExecutionContext. PiperOrigin-RevId: 354472775 --- .../ByteStreamBuildEventArtifactUploader.java | 29 +- ...reamBuildEventArtifactUploaderFactory.java | 13 +- .../build/lib/remote/ByteStreamUploader.java | 53 ++- .../build/lib/remote/GrpcCacheClient.java | 12 +- .../build/lib/remote/RemoteCache.java | 7 +- .../lib/remote/RemoteExecutionCache.java | 21 +- .../build/lib/remote/RemoteModule.java | 6 +- .../RemoteRepositoryRemoteExecutor.java | 3 +- .../build/lib/remote/RemoteSpawnRunner.java | 3 +- .../lib/remote/common/RemoteCacheClient.java | 7 +- .../remote/disk/DiskAndRemoteCacheClient.java | 14 +- .../lib/remote/disk/DiskCacheClient.java | 6 +- .../lib/remote/http/HttpCacheClient.java | 6 +- ...eStreamBuildEventArtifactUploaderTest.java | 7 +- .../lib/remote/ByteStreamUploaderTest.java | 312 +++++++++--------- .../build/lib/remote/GrpcCacheClientTest.java | 2 +- .../build/lib/remote/RemoteCacheTests.java | 69 ++-- .../RemoteRepositoryRemoteExecutorTest.java | 2 +- .../lib/remote/RemoteSpawnRunnerTest.java | 2 +- .../downloader/GrpcRemoteDownloaderTest.java | 22 +- .../lib/remote/http/HttpCacheClientTest.java | 23 +- .../lib/remote/util/InMemoryCacheClient.java | 6 +- .../build/remote/worker/ByteStreamServer.java | 6 +- .../build/remote/worker/CasServer.java | 7 +- .../remote/worker/OnDiskBlobStoreCache.java | 10 +- 25 files changed, 375 insertions(+), 273 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java index 1b2fd7c4a539c5..0ca33b6bd4c8ed 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java @@ -14,6 +14,7 @@ package com.google.devtools.build.lib.remote; import build.bazel.remote.execution.v2.Digest; +import build.bazel.remote.execution.v2.RequestMetadata; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -29,7 +30,11 @@ import com.google.devtools.build.lib.buildeventstream.PathConverter; import com.google.devtools.build.lib.collect.ImmutableIterable; import com.google.devtools.build.lib.remote.common.MissingDigestsFinder; +import com.google.devtools.build.lib.remote.common.NetworkTime; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl; import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.vfs.Path; import io.grpc.Context; import io.netty.util.AbstractReferenceCounted; @@ -50,7 +55,8 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted implements BuildEventArtifactUploader { private final ListeningExecutorService uploadExecutor; - private final Context ctx; + private final String buildRequestId; + private final String commandId; private final ByteStreamUploader uploader; private final String remoteServerInstanceName; private final MissingDigestsFinder missingDigestsFinder; @@ -61,7 +67,8 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted ByteStreamUploader uploader, MissingDigestsFinder missingDigestsFinder, String remoteServerName, - Context ctx, + String buildRequestId, + String commandId, @Nullable String remoteInstanceName, int maxUploadThreads) { this.uploader = Preconditions.checkNotNull(uploader); @@ -69,7 +76,8 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted if (!Strings.isNullOrEmpty(remoteInstanceName)) { remoteServerInstanceName += "/" + remoteInstanceName; } - this.ctx = ctx; + this.buildRequestId = buildRequestId; + this.commandId = commandId; this.remoteServerInstanceName = remoteServerInstanceName; // Limit the maximum threads number to 1000 (chosen arbitrarily) this.uploadExecutor = @@ -153,6 +161,8 @@ private static List processQueryResult( */ private ListenableFuture> queryRemoteCache( ImmutableList> allPaths) throws Exception { + Context ctx = TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, "bes-upload"); + List knownRemotePaths = new ArrayList<>(allPaths.size()); List filesToQuery = new ArrayList<>(); Set digestsToQuery = new HashSet<>(); @@ -185,6 +195,11 @@ private ListenableFuture> queryRemoteCache( */ private ListenableFuture> uploadLocalFiles( ImmutableIterable allPaths) { + RequestMetadata metadata = + TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload"); + RemoteActionExecutionContext context = + new RemoteActionExecutionContextImpl(metadata, new NetworkTime()); + ImmutableList.Builder> allPathsUploaded = ImmutableList.builder(); for (PathMetadata path : allPaths) { @@ -192,12 +207,8 @@ private ListenableFuture> uploadLocalFiles( Chunker chunker = Chunker.builder().setInput(path.getDigest().getSizeBytes(), path.getPath()).build(); final ListenableFuture upload; - Context prevCtx = ctx.attach(); - try { - upload = uploader.uploadBlobAsync(path.getDigest(), chunker, /* forceUpload=*/ false); - } finally { - ctx.detach(prevCtx); - } + upload = + uploader.uploadBlobAsync(context, path.getDigest(), chunker, /* forceUpload= */ false); allPathsUploaded.add(Futures.transform(upload, unused -> path, uploadExecutor)); } else { allPathsUploaded.add(Futures.immediateFuture(path)); diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java index 978c10f325e50e..1c27fbe703d0c7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java @@ -18,7 +18,6 @@ import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.runtime.BuildEventArtifactUploaderFactory; import com.google.devtools.build.lib.runtime.CommandEnvironment; -import io.grpc.Context; import javax.annotation.Nullable; /** @@ -29,7 +28,8 @@ class ByteStreamBuildEventArtifactUploaderFactory implements private final ByteStreamUploader uploader; private final String remoteServerName; - private final Context ctx; + private final String buildRequestId; + private final String commandId; private final MissingDigestsFinder missingDigestsFinder; @Nullable private final String remoteInstanceName; @@ -37,12 +37,14 @@ class ByteStreamBuildEventArtifactUploaderFactory implements ByteStreamUploader uploader, MissingDigestsFinder missingDigestsFinder, String remoteServerName, - Context ctx, + String buildRequestId, + String commandId, @Nullable String remoteInstanceName) { this.uploader = uploader; this.missingDigestsFinder = missingDigestsFinder; this.remoteServerName = remoteServerName; - this.ctx = ctx; + this.buildRequestId = buildRequestId; + this.commandId = commandId; this.remoteInstanceName = remoteInstanceName; } @@ -52,7 +54,8 @@ public BuildEventArtifactUploader create(CommandEnvironment env) { uploader.retain(), missingDigestsFinder, remoteServerName, - ctx, + buildRequestId, + commandId, remoteInstanceName, env.getOptions().getOptions(RemoteOptions.class).buildEventUploadMaxThreads); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index c1133796129b4b..bba8593ed91050 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -36,12 +36,12 @@ import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.authandtls.CallCredentialsProvider; import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.remote.util.Utils; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; -import io.grpc.Context; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; @@ -134,9 +134,10 @@ public ByteStreamUploader( * uploaded, if {@code true} the blob is uploaded. * @throws IOException when reading of the {@link Chunker}s input source fails */ - public void uploadBlob(HashCode hash, Chunker chunker, boolean forceUpload) + public void uploadBlob( + RemoteActionExecutionContext context, HashCode hash, Chunker chunker, boolean forceUpload) throws IOException, InterruptedException { - uploadBlobs(singletonMap(hash, chunker), forceUpload); + uploadBlobs(context, singletonMap(hash, chunker), forceUpload); } /** @@ -156,12 +157,14 @@ public void uploadBlob(HashCode hash, Chunker chunker, boolean forceUpload) * uploaded, if {@code true} the blob is uploaded. * @throws IOException when reading of the {@link Chunker}s input source or uploading fails */ - public void uploadBlobs(Map chunkers, boolean forceUpload) + public void uploadBlobs( + RemoteActionExecutionContext context, Map chunkers, boolean forceUpload) throws IOException, InterruptedException { List> uploads = new ArrayList<>(); for (Map.Entry chunkerEntry : chunkers.entrySet()) { - uploads.add(uploadBlobAsync(chunkerEntry.getKey(), chunkerEntry.getValue(), forceUpload)); + uploads.add( + uploadBlobAsync(context, chunkerEntry.getKey(), chunkerEntry.getValue(), forceUpload)); } try { @@ -200,14 +203,17 @@ void shutdown() { } } - /** @deprecated Use {@link #uploadBlobAsync(Digest, Chunker, boolean)} instead. */ + /** + * @deprecated Use {@link #uploadBlobAsync(RemoteActionExecutionContext, Digest, Chunker, + * boolean)} instead. + */ @Deprecated @VisibleForTesting public ListenableFuture uploadBlobAsync( - HashCode hash, Chunker chunker, boolean forceUpload) { + RemoteActionExecutionContext context, HashCode hash, Chunker chunker, boolean forceUpload) { Digest digest = Digest.newBuilder().setHash(hash.toString()).setSizeBytes(chunker.getSize()).build(); - return uploadBlobAsync(digest, chunker, forceUpload); + return uploadBlobAsync(context, digest, chunker, forceUpload); } /** @@ -227,7 +233,7 @@ public ListenableFuture uploadBlobAsync( * @throws IOException when reading of the {@link Chunker}s input source fails */ public ListenableFuture uploadBlobAsync( - Digest digest, Chunker chunker, boolean forceUpload) { + RemoteActionExecutionContext context, Digest digest, Chunker chunker, boolean forceUpload) { synchronized (lock) { checkState(!isShutdown, "Must not call uploadBlobs after shutdown."); @@ -242,7 +248,7 @@ public ListenableFuture uploadBlobAsync( ListenableFuture uploadResult = Futures.transform( - startAsyncUpload(digest, chunker), + startAsyncUpload(context, digest, chunker), (v) -> { synchronized (lock) { uploadedBlobs.add(HashCode.fromString(digest.getHash())); @@ -294,7 +300,8 @@ private static String buildUploadResourceName(String instanceName, UUID uuid, Di } /** Starts a file upload and returns a future representing the upload. */ - private ListenableFuture startAsyncUpload(Digest digest, Chunker chunker) { + private ListenableFuture startAsyncUpload( + RemoteActionExecutionContext context, Digest digest, Chunker chunker) { try { chunker.reset(); } catch (IOException e) { @@ -313,7 +320,13 @@ private ListenableFuture startAsyncUpload(Digest digest, Chunker chunker) String resourceName = buildUploadResourceName(instanceName, uploadId, digest); AsyncUpload newUpload = new AsyncUpload( - channel, callCredentialsProvider, callTimeoutSecs, retrier, resourceName, chunker); + context, + channel, + callCredentialsProvider, + callTimeoutSecs, + retrier, + resourceName, + chunker); ListenableFuture currUpload = newUpload.start(); currUpload.addListener( () -> { @@ -348,6 +361,7 @@ public ReferenceCounted touch(Object o) { private static class AsyncUpload { + private final RemoteActionExecutionContext context; private final Channel channel; private final CallCredentialsProvider callCredentialsProvider; private final long callTimeoutSecs; @@ -358,12 +372,14 @@ private static class AsyncUpload { private ClientCall call; AsyncUpload( + RemoteActionExecutionContext context, Channel channel, CallCredentialsProvider callCredentialsProvider, long callTimeoutSecs, Retrier retrier, String resourceName, Chunker chunker) { + this.context = context; this.channel = channel; this.callCredentialsProvider = callCredentialsProvider; this.callTimeoutSecs = callTimeoutSecs; @@ -373,7 +389,6 @@ private static class AsyncUpload { } ListenableFuture start() { - Context ctx = Context.current(); ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff); AtomicLong committedOffset = new AtomicLong(0); @@ -383,8 +398,7 @@ ListenableFuture start() { retrier.executeAsync( () -> { if (committedOffset.get() < chunker.getSize()) { - return ctx.call( - () -> callAndQueryOnFailure(committedOffset, progressiveBackoff)); + return callAndQueryOnFailure(committedOffset, progressiveBackoff); } return Futures.immediateFuture(null); }, @@ -409,7 +423,8 @@ ListenableFuture start() { private ByteStreamFutureStub bsFutureStub() { return ByteStreamGrpc.newFutureStub(channel) - .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor()) + .withInterceptors( + TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata())) .withCallCredentials(callCredentialsProvider.getCallCredentials()) .withDeadlineAfter(callTimeoutSecs, SECONDS); } @@ -420,7 +435,7 @@ private ListenableFuture callAndQueryOnFailure( call(committedOffset), Exception.class, (e) -> guardQueryWithSuppression(e, committedOffset, progressiveBackoff), - Context.current().fixedContextExecutor(MoreExecutors.directExecutor())); + MoreExecutors.directExecutor()); } private ListenableFuture guardQueryWithSuppression( @@ -584,7 +599,9 @@ public void onReady() { } } }; - call.start(callListener, TracingMetadataUtils.headersFromCurrentContext()); + call.start( + callListener, + TracingMetadataUtils.headersFromRequestMetadata(context.getRequestMetadata())); call.request(1); return uploadResult; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index ef031d7d83771a..b2aeb4a8b0ef3a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -393,16 +393,22 @@ public void onCompleted() { } @Override - public ListenableFuture uploadFile(Digest digest, Path path) { + public ListenableFuture uploadFile( + RemoteActionExecutionContext context, Digest digest, Path path) { return uploader.uploadBlobAsync( + context, digest, Chunker.builder().setInput(digest.getSizeBytes(), path).build(), /* forceUpload= */ true); } @Override - public ListenableFuture uploadBlob(Digest digest, ByteString data) { + public ListenableFuture uploadBlob( + RemoteActionExecutionContext context, Digest digest, ByteString data) { return uploader.uploadBlobAsync( - digest, Chunker.builder().setInput(data.toByteArray()).build(), /* forceUpload= */ true); + context, + digest, + Chunker.builder().setInput(data.toByteArray()).build(), + /* forceUpload= */ true); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java index 3950814d09f253..3496243044c913 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java @@ -139,7 +139,7 @@ public ActionResult upload( int exitCode) throws ExecException, IOException, InterruptedException { ActionResult.Builder resultBuilder = ActionResult.newBuilder(); - uploadOutputs(execRoot, actionKey, action, command, outputs, outErr, resultBuilder); + uploadOutputs(context, execRoot, actionKey, action, command, outputs, outErr, resultBuilder); resultBuilder.setExitCode(exitCode); ActionResult result = resultBuilder.build(); if (exitCode == 0 && !action.getDoNotCache()) { @@ -162,6 +162,7 @@ public ActionResult upload( } private void uploadOutputs( + RemoteActionExecutionContext context, Path execRoot, ActionKey actionKey, Action action, @@ -192,14 +193,14 @@ private void uploadOutputs( for (Digest digest : digestsToUpload) { Path file = digestToFile.get(digest); if (file != null) { - uploads.add(cacheProtocol.uploadFile(digest, file)); + uploads.add(cacheProtocol.uploadFile(context, digest, file)); } else { ByteString blob = digestToBlobs.get(digest); if (blob == null) { String message = "FindMissingBlobs call returned an unknown digest: " + digest; throw new IOException(message); } - uploads.add(cacheProtocol.uploadBlob(digest, blob)); + uploads.add(cacheProtocol.uploadBlob(context, digest, blob)); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index 88860253067e3e..439840492abddc 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -22,6 +22,7 @@ import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes; @@ -53,7 +54,10 @@ public RemoteExecutionCache( * However, remote execution uses a cache to store input files, and that may be a separate * end-point from the executor itself, so the functionality lives here. */ - public void ensureInputsPresent(MerkleTree merkleTree, Map additionalInputs) + public void ensureInputsPresent( + RemoteActionExecutionContext context, + MerkleTree merkleTree, + Map additionalInputs) throws IOException, InterruptedException { Iterable allDigests = Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet()); @@ -62,30 +66,33 @@ public void ensureInputsPresent(MerkleTree merkleTree, Map addi List> uploadFutures = new ArrayList<>(); for (Digest missingDigest : missingDigests) { - uploadFutures.add(uploadBlob(missingDigest, merkleTree, additionalInputs)); + uploadFutures.add(uploadBlob(context, missingDigest, merkleTree, additionalInputs)); } waitForBulkTransfer(uploadFutures, /* cancelRemainingOnInterrupt=*/ false); } private ListenableFuture uploadBlob( - Digest digest, MerkleTree merkleTree, Map additionalInputs) { + RemoteActionExecutionContext context, + Digest digest, + MerkleTree merkleTree, + Map additionalInputs) { Directory node = merkleTree.getDirectoryByDigest(digest); if (node != null) { - return cacheProtocol.uploadBlob(digest, node.toByteString()); + return cacheProtocol.uploadBlob(context, digest, node.toByteString()); } PathOrBytes file = merkleTree.getFileByDigest(digest); if (file != null) { if (file.getBytes() != null) { - return cacheProtocol.uploadBlob(digest, file.getBytes()); + return cacheProtocol.uploadBlob(context, digest, file.getBytes()); } - return cacheProtocol.uploadFile(digest, file.getPath()); + return cacheProtocol.uploadFile(context, digest, file.getPath()); } Message message = additionalInputs.get(digest); if (message != null) { - return cacheProtocol.uploadBlob(digest, message.toByteString()); + return cacheProtocol.uploadBlob(context, digest, message.toByteString()); } return Futures.immediateFailedFuture( diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index dfa368bb7768d8..443d4a8f95e26a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -93,7 +93,6 @@ import com.google.devtools.common.options.OptionsParsingResult; import io.grpc.CallCredentials; import io.grpc.ClientInterceptor; -import io.grpc.Context; import io.grpc.ManagedChannel; import java.io.IOException; import java.util.HashSet; @@ -501,14 +500,13 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { digestUtil, uploader.retain()); uploader.release(); - Context requestContext = - TracingMetadataUtils.contextWithMetadata(buildRequestId, invocationId, "bes-upload"); buildEventArtifactUploaderFactoryDelegate.init( new ByteStreamBuildEventArtifactUploaderFactory( uploader, cacheClient, cacheChannel.authority(), - requestContext, + buildRequestId, + invocationId, remoteOptions.remoteInstanceName)); if (enableRemoteExecution) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java index 5d89161722f913..f4a05dc30a831a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java @@ -144,7 +144,8 @@ public ExecutionResult execute( additionalInputs.put(actionDigest, action); additionalInputs.put(commandHash, command); - remoteCache.ensureInputsPresent(merkleTree, additionalInputs); + remoteCache.ensureInputsPresent( + remoteActionExecutionContext, merkleTree, additionalInputs); } try (SilentCloseable c = diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java index fa99c638b09359..3e9486c9b34907 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -337,7 +337,8 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionContext context) additionalInputs.put(commandHash, command); Duration networkTimeStart = networkTime.getDuration(); Stopwatch uploadTime = Stopwatch.createStarted(); - remoteCache.ensureInputsPresent(merkleTree, additionalInputs); + remoteCache.ensureInputsPresent( + remoteActionExecutionContext, merkleTree, additionalInputs); // subtract network time consumed here to ensure wall clock during upload is not // double // counted, and metrics time computation does not exceed total time diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java index caf497f9df693c..628d214bec973b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java @@ -104,20 +104,23 @@ ListenableFuture downloadBlob( /** * Uploads a {@code file} to the CAS. * + * @param context the context for the action. * @param digest The digest of the file. * @param file The file to upload. * @return A future representing pending completion of the upload. */ - ListenableFuture uploadFile(Digest digest, Path file); + ListenableFuture uploadFile(RemoteActionExecutionContext context, Digest digest, Path file); /** * Uploads a BLOB to the CAS. * + * @param context the context for the action. * @param digest The digest of the blob. * @param data The BLOB to upload. * @return A future representing pending completion of the upload. */ - ListenableFuture uploadBlob(Digest digest, ByteString data); + ListenableFuture uploadBlob( + RemoteActionExecutionContext context, Digest digest, ByteString data); /** Close resources associated with the remote cache. */ void close(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java index 040a7d447836c2..60c501677d0cd9 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java @@ -65,11 +65,12 @@ public void close() { } @Override - public ListenableFuture uploadFile(Digest digest, Path file) { + public ListenableFuture uploadFile( + RemoteActionExecutionContext context, Digest digest, Path file) { try { - diskCache.uploadFile(digest, file).get(); + diskCache.uploadFile(context, digest, file).get(); if (!options.incompatibleRemoteResultsIgnoreDisk || options.remoteUploadLocalResults) { - remoteCache.uploadFile(digest, file).get(); + remoteCache.uploadFile(context, digest, file).get(); } } catch (ExecutionException e) { return Futures.immediateFailedFuture(e.getCause()); @@ -80,11 +81,12 @@ public ListenableFuture uploadFile(Digest digest, Path file) { } @Override - public ListenableFuture uploadBlob(Digest digest, ByteString data) { + public ListenableFuture uploadBlob( + RemoteActionExecutionContext context, Digest digest, ByteString data) { try { - diskCache.uploadBlob(digest, data).get(); + diskCache.uploadBlob(context, digest, data).get(); if (!options.incompatibleRemoteResultsIgnoreDisk || options.remoteUploadLocalResults) { - remoteCache.uploadBlob(digest, data).get(); + remoteCache.uploadBlob(context, digest, data).get(); } } catch (ExecutionException e) { return Futures.immediateFailedFuture(e.getCause()); diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java index 027891353cfe4c..6778525ccca254 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java @@ -121,7 +121,8 @@ public void uploadActionResult( public void close() {} @Override - public ListenableFuture uploadFile(Digest digest, Path file) { + public ListenableFuture uploadFile( + RemoteActionExecutionContext context, Digest digest, Path file) { try (InputStream in = file.getInputStream()) { saveFile(digest.getHash(), in, /* actionResult= */ false); } catch (IOException e) { @@ -131,7 +132,8 @@ public ListenableFuture uploadFile(Digest digest, Path file) { } @Override - public ListenableFuture uploadBlob(Digest digest, ByteString data) { + public ListenableFuture uploadBlob( + RemoteActionExecutionContext context, Digest digest, ByteString data) { try (InputStream in = data.newInput()) { saveFile(digest.getHash(), in, /* actionResult= */ false); } catch (IOException e) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index 7a67bb5bdb696e..1d8a922299abab 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -671,7 +671,8 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture uploadFile(Digest digest, Path file) { + public ListenableFuture uploadFile( + RemoteActionExecutionContext context, Digest digest, Path file) { try { return uploadAsync( digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true); @@ -682,7 +683,8 @@ public ListenableFuture uploadFile(Digest digest, Path file) { } @Override - public ListenableFuture uploadBlob(Digest digest, ByteString data) { + public ListenableFuture uploadBlob( + RemoteActionExecutionContext context, Digest digest, ByteString data) { return uploadAsync( digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java index da6daeebd53f57..e633f2c701b05d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java @@ -333,7 +333,7 @@ public void remoteFileShouldNotBeUploaded_findMissingDigests() throws Exception StaticMissingDigestsFinder digestQuerier = Mockito.spy(new StaticMissingDigestsFinder(ImmutableSet.of(remoteDigest))); ByteStreamUploader uploader = Mockito.mock(ByteStreamUploader.class); - when(uploader.uploadBlobAsync(any(Digest.class), any(), anyBoolean())) + when(uploader.uploadBlobAsync(any(), any(Digest.class), any(), anyBoolean())) .thenReturn(Futures.immediateFuture(null)); ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader, digestQuerier); @@ -349,7 +349,7 @@ public void remoteFileShouldNotBeUploaded_findMissingDigests() throws Exception // assert verify(digestQuerier).findMissingDigests(any()); - verify(uploader).uploadBlobAsync(eq(localDigest), any(), anyBoolean()); + verify(uploader).uploadBlobAsync(any(), eq(localDigest), any(), anyBoolean()); assertThat(pathConverter.apply(remoteFile)).contains(remoteDigest.getHash()); assertThat(pathConverter.apply(localFile)).contains(localDigest.getHash()); } @@ -373,7 +373,8 @@ private ByteStreamBuildEventArtifactUploader newArtifactUploader( uploader, missingDigestsFinder, "localhost", - withEmptyMetadata, + "none", + "none", "instance", /* maxUploadThreads= */ 100); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index eec073a50a197b..29cd41eb4ef88f 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -36,6 +36,9 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.analysis.BlazeVersionInfo; import com.google.devtools.build.lib.authandtls.CallCredentialsProvider; +import com.google.devtools.build.lib.remote.common.NetworkTime; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TestUtils; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; @@ -86,9 +89,7 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -/** - * Tests for {@link ByteStreamUploader}. - */ +/** Tests for {@link ByteStreamUploader}. */ @RunWith(JUnit4.class) public class ByteStreamUploaderTest { @@ -102,6 +103,7 @@ public class ByteStreamUploaderTest { private Server server; private ManagedChannel channel; + private RemoteActionExecutionContext context; private Context withEmptyMetadata; private Context prevContext; @@ -112,12 +114,19 @@ public final void setUp() throws Exception { MockitoAnnotations.initMocks(this); String serverName = "Server for " + this.getClass(); - server = InProcessServerBuilder.forName(serverName).fallbackHandlerRegistry(serviceRegistry) - .build().start(); + server = + InProcessServerBuilder.forName(serverName) + .fallbackHandlerRegistry(serviceRegistry) + .build() + .start(); channel = InProcessChannelBuilder.forName(serverName).build(); - withEmptyMetadata = - TracingMetadataUtils.contextWithMetadata( - "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance())); + RequestMetadata metadata = + TracingMetadataUtils.buildMetadata( + "none", + "none", + DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()).getDigest().getHash()); + context = new RemoteActionExecutionContextImpl(metadata, new NetworkTime()); + withEmptyMetadata = TracingMetadataUtils.contextWithMetadata(metadata); retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); @@ -161,7 +170,8 @@ public void singleBlobUploadShouldWork() throws Exception { Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); - serviceRegistry.addService(new ByteStreamImplBase() { + serviceRegistry.addService( + new ByteStreamImplBase() { @Override public StreamObserver write(StreamObserver streamObserver) { return new StreamObserver() { @@ -183,8 +193,8 @@ public void onNext(WriteRequest writeRequest) { ByteString data = writeRequest.getData(); - System.arraycopy(data.toByteArray(), 0, receivedData, (int) nextOffset, - data.size()); + System.arraycopy( + data.toByteArray(), 0, receivedData, (int) nextOffset, data.size()); nextOffset += data.size(); boolean lastWrite = blob.length == nextOffset; @@ -210,7 +220,7 @@ public void onCompleted() { } }); - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); // This test should not have triggered any retries. Mockito.verifyZeroInteractions(mockBackoff); @@ -328,7 +338,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); // This test should not have triggered any retries. Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class)); @@ -392,7 +402,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); // This test should not have triggered any retries. assertThat(numWriteCalls.get()).isEqualTo(1); @@ -466,7 +476,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); // This test should have triggered a single retry, because it made // no progress. @@ -508,7 +518,7 @@ public StreamObserver write(StreamObserver streamOb } }); - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); // This test should not have triggered any retries. Mockito.verifyZeroInteractions(mockBackoff); @@ -549,7 +559,7 @@ public StreamObserver write(StreamObserver streamOb }); try { - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); fail("Should have thrown an exception."); } catch (IOException e) { // expected @@ -592,7 +602,7 @@ public void multipleBlobsUploadShouldWork() throws Exception { serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash)); - uploader.uploadBlobs(chunkers, true); + uploader.uploadBlobs(context, chunkers, true); blockUntilInternalStateConsistent(uploader); @@ -690,16 +700,19 @@ public void queryWriteStatus( for (Map.Entry chunkerEntry : chunkers.entrySet()) { Digest actionDigest = chunkerEntry.getKey(); - Context ctx = - TracingMetadataUtils.contextWithMetadata( - "build-req-id", "command-id", DIGEST_UTIL.asActionKey(actionDigest)); - ctx.run( - () -> - uploads.add( - uploader.uploadBlobAsync( - HashCode.fromString(actionDigest.getHash()), - chunkerEntry.getValue(), - /* forceUpload=*/ true))); + RequestMetadata metadata = + TracingMetadataUtils.buildMetadata( + "build-req-id", + "command-id", + DIGEST_UTIL.asActionKey(actionDigest).getDigest().getHash()); + RemoteActionExecutionContext remoteActionExecutionContext = + new RemoteActionExecutionContextImpl(metadata, new NetworkTime()); + uploads.add( + uploader.uploadBlobAsync( + remoteActionExecutionContext, + actionDigest, + chunkerEntry.getValue(), + /* forceUpload= */ true)); } for (ListenableFuture upload : uploads) { @@ -776,7 +789,7 @@ public ServerCall.Listener interceptCall( } })); - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); } @Test @@ -796,47 +809,48 @@ public void sameBlobShouldNotBeUploadedTwice() throws Exception { byte[] blob = new byte[CHUNK_SIZE * 10]; Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); - HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); + Digest digest = DIGEST_UTIL.compute(blob); AtomicInteger numWriteCalls = new AtomicInteger(); CountDownLatch blocker = new CountDownLatch(1); - serviceRegistry.addService(new ByteStreamImplBase() { - @Override - public StreamObserver write(StreamObserver response) { - numWriteCalls.incrementAndGet(); - try { - // Ensures that the first upload does not finish, before the second upload is started. - blocker.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver response) { + numWriteCalls.incrementAndGet(); + try { + // Ensures that the first upload does not finish, before the second upload is started. + blocker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } - return new StreamObserver() { + return new StreamObserver() { - private long bytesReceived; + private long bytesReceived; - @Override - public void onNext(WriteRequest writeRequest) { - bytesReceived += writeRequest.getData().size(); - } + @Override + public void onNext(WriteRequest writeRequest) { + bytesReceived += writeRequest.getData().size(); + } - @Override - public void onError(Throwable throwable) { - fail("onError should never be called."); - } + @Override + public void onError(Throwable throwable) { + fail("onError should never be called."); + } - @Override - public void onCompleted() { - response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build()); - response.onCompleted(); + @Override + public void onCompleted() { + response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build()); + response.onCompleted(); + } + }; } - }; - } - }); + }); - Future upload1 = uploader.uploadBlobAsync(hash, chunker, true); - Future upload2 = uploader.uploadBlobAsync(hash, chunker, true); + Future upload1 = uploader.uploadBlobAsync(context, digest, chunker, true); + Future upload2 = uploader.uploadBlobAsync(context, digest, chunker, true); blocker.countDown(); @@ -866,16 +880,17 @@ public void errorsShouldBeReported() throws IOException, InterruptedException { Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); - serviceRegistry.addService(new ByteStreamImplBase() { - @Override - public StreamObserver write(StreamObserver response) { - response.onError(Status.INTERNAL.asException()); - return new NoopStreamObserver(); - } - }); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver response) { + response.onError(Status.INTERNAL.asException()); + return new NoopStreamObserver(); + } + }); try { - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); fail("Should have thrown an exception."); } catch (IOException e) { assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue(); @@ -926,14 +941,14 @@ public void onCancel() { byte[] blob1 = new byte[CHUNK_SIZE]; Chunker chunker1 = Chunker.builder().setInput(blob1).setChunkSize(CHUNK_SIZE).build(); - HashCode hash1 = HashCode.fromString(DIGEST_UTIL.compute(blob1).getHash()); + Digest digest1 = DIGEST_UTIL.compute(blob1); byte[] blob2 = new byte[CHUNK_SIZE + 1]; Chunker chunker2 = Chunker.builder().setInput(blob2).setChunkSize(CHUNK_SIZE).build(); - HashCode hash2 = HashCode.fromString(DIGEST_UTIL.compute(blob2).getHash()); + Digest digest2 = DIGEST_UTIL.compute(blob2); - ListenableFuture f1 = uploader.uploadBlobAsync(hash1, chunker1, true); - ListenableFuture f2 = uploader.uploadBlobAsync(hash2, chunker2, true); + ListenableFuture f1 = uploader.uploadBlobAsync(context, digest1, chunker1, true); + ListenableFuture f2 = uploader.uploadBlobAsync(context, digest2, chunker2, true); assertThat(uploader.uploadsInProgress()).isTrue(); @@ -964,14 +979,15 @@ public void failureInRetryExecutorShouldBeHandled() throws Exception { /* callTimeoutSecs= */ 60, retrier); - serviceRegistry.addService(new ByteStreamImplBase() { - @Override - public StreamObserver write(StreamObserver response) { - // Immediately fail the call, so that it is retried. - response.onError(Status.ABORTED.asException()); - return new NoopStreamObserver(); - } - }); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver response) { + // Immediately fail the call, so that it is retried. + response.onError(Status.ABORTED.asException()); + return new NoopStreamObserver(); + } + }); retryService.shutdownNow(); // Random very high timeout, as the test will timeout by itself. @@ -982,7 +998,7 @@ public StreamObserver write(StreamObserver response Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); try { - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); fail("Should have thrown an exception."); } catch (IOException e) { assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class); @@ -1004,35 +1020,34 @@ public void resourceNameWithoutInstanceName() throws Exception { /* callTimeoutSecs= */ 60, retrier); - serviceRegistry.addService(new ByteStreamImplBase() { - @Override - public StreamObserver write(StreamObserver response) { - return new StreamObserver() { - @Override - public void onNext(WriteRequest writeRequest) { - // Test that the resource name doesn't start with an instance name. - assertThat(writeRequest.getResourceName()).startsWith("uploads/"); - } - + serviceRegistry.addService( + new ByteStreamImplBase() { @Override - public void onError(Throwable throwable) { + public StreamObserver write(StreamObserver response) { + return new StreamObserver() { + @Override + public void onNext(WriteRequest writeRequest) { + // Test that the resource name doesn't start with an instance name. + assertThat(writeRequest.getResourceName()).startsWith("uploads/"); + } - } + @Override + public void onError(Throwable throwable) {} - @Override - public void onCompleted() { - response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build()); - response.onCompleted(); + @Override + public void onCompleted() { + response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build()); + response.onCompleted(); + } + }; } - }; - } - }); + }); byte[] blob = new byte[1]; Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); withEmptyMetadata.detach(prevContext); } @@ -1053,21 +1068,22 @@ public void nonRetryableStatusShouldNotBeRetried() throws Exception { AtomicInteger numCalls = new AtomicInteger(); - serviceRegistry.addService(new ByteStreamImplBase() { - @Override - public StreamObserver write(StreamObserver response) { - numCalls.incrementAndGet(); - response.onError(Status.INTERNAL.asException()); - return new NoopStreamObserver(); - } - }); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver response) { + numCalls.incrementAndGet(); + response.onError(Status.INTERNAL.asException()); + return new NoopStreamObserver(); + } + }); byte[] blob = new byte[1]; Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); try { - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); fail("Should have thrown an exception."); } catch (IOException e) { assertThat(numCalls.get()).isEqualTo(1); @@ -1139,7 +1155,7 @@ public void onCompleted() { StatusRuntimeException expected = null; try { // This should fail - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); } catch (IOException e) { if (e.getCause() instanceof StatusRuntimeException) { expected = (StatusRuntimeException) e.getCause(); @@ -1148,7 +1164,7 @@ public void onCompleted() { assertThat(expected).isNotNull(); assertThat(Status.fromThrowable(expected).getCode()).isEqualTo(Code.UNKNOWN); // This should trigger an upload. - uploader.uploadBlob(hash, chunker, false); + uploader.uploadBlob(context, hash, chunker, false); assertThat(numUploads.get()).isEqualTo(2); @@ -1177,42 +1193,43 @@ public void deduplicationOfUploadsShouldWork() throws Exception { HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); AtomicInteger numUploads = new AtomicInteger(); - serviceRegistry.addService(new ByteStreamImplBase() { - @Override - public StreamObserver write(StreamObserver streamObserver) { - numUploads.incrementAndGet(); - return new StreamObserver() { + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver streamObserver) { + numUploads.incrementAndGet(); + return new StreamObserver() { - long nextOffset = 0; + long nextOffset = 0; - @Override - public void onNext(WriteRequest writeRequest) { - nextOffset += writeRequest.getData().size(); - boolean lastWrite = blob.length == nextOffset; - assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite); - } + @Override + public void onNext(WriteRequest writeRequest) { + nextOffset += writeRequest.getData().size(); + boolean lastWrite = blob.length == nextOffset; + assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite); + } - @Override - public void onError(Throwable throwable) { - fail("onError should never be called."); - } + @Override + public void onError(Throwable throwable) { + fail("onError should never be called."); + } - @Override - public void onCompleted() { - assertThat(nextOffset).isEqualTo(blob.length); + @Override + public void onCompleted() { + assertThat(nextOffset).isEqualTo(blob.length); - WriteResponse response = - WriteResponse.newBuilder().setCommittedSize(nextOffset).build(); - streamObserver.onNext(response); - streamObserver.onCompleted(); + WriteResponse response = + WriteResponse.newBuilder().setCommittedSize(nextOffset).build(); + streamObserver.onNext(response); + streamObserver.onCompleted(); + } + }; } - }; - } - }); + }); - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); // This should not trigger an upload. - uploader.uploadBlob(hash, chunker, false); + uploader.uploadBlob(context, hash, chunker, false); assertThat(numUploads.get()).isEqualTo(1); @@ -1271,11 +1288,7 @@ public StreamObserver write(StreamObserver streamOb } }); - assertThrows( - IOException.class, - () -> { - uploader.uploadBlob(hash, chunker, true); - }); + assertThrows(IOException.class, () -> uploader.uploadBlob(context, hash, chunker, true)); assertThat(refreshTimes.get()).isEqualTo(1); assertThat(numUploads.get()).isEqualTo(2); @@ -1363,7 +1376,7 @@ public void onCompleted() { } }); - uploader.uploadBlob(hash, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); assertThat(refreshTimes.get()).isEqualTo(1); assertThat(numUploads.get()).isEqualTo(2); @@ -1378,16 +1391,13 @@ public void onCompleted() { private static class NoopStreamObserver implements StreamObserver { @Override - public void onNext(WriteRequest writeRequest) { - } + public void onNext(WriteRequest writeRequest) {} @Override - public void onError(Throwable throwable) { - } + public void onError(Throwable throwable) {} @Override - public void onCompleted() { - } + public void onCompleted() {} } static class FixedBackoff implements Retrier.Backoff { diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java index d22e1ed9a5fa73..c1f010429a152b 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java @@ -317,7 +317,7 @@ public void onError(Throwable t) { }); // Upload all missing inputs (that is, the virtual action input from above) - client.ensureInputsPresent(merkleTree, ImmutableMap.of()); + client.ensureInputsPresent(remoteActionExecutionContext, merkleTree, ImmutableMap.of()); } @Test diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java index 2fe3d729dfed3f..cd9906bcc5761c 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java @@ -576,7 +576,7 @@ public void downloadRelativeSymlinkInDirectory() throws Exception { Directory.newBuilder() .addSymlinks(SymlinkNode.newBuilder().setName("link").setTarget("../foo"))) .build(); - Digest treeDigest = cache.addContents(tree.toByteArray()); + Digest treeDigest = cache.addContents(remoteActionExecutionContext, tree.toByteArray()); ActionResult.Builder result = ActionResult.newBuilder(); result.addOutputDirectoriesBuilder().setPath("dir").setTreeDigest(treeDigest); // Doesn't check for dangling links, hence download succeeds. @@ -636,7 +636,7 @@ public void downloadAbsoluteSymlinkInDirectoryError() throws Exception { Directory.newBuilder() .addSymlinks(SymlinkNode.newBuilder().setName("link").setTarget("/foo"))) .build(); - Digest treeDigest = cache.addContents(tree.toByteArray()); + Digest treeDigest = cache.addContents(remoteActionExecutionContext, tree.toByteArray()); ActionResult.Builder result = ActionResult.newBuilder(); result.addOutputDirectoriesBuilder().setPath("dir").setTreeDigest(treeDigest); IOException expected = @@ -660,10 +660,10 @@ public void downloadAbsoluteSymlinkInDirectoryError() throws Exception { public void downloadFailureMaintainsDirectories() throws Exception { InMemoryRemoteCache cache = newRemoteCache(); Tree tree = Tree.newBuilder().setRoot(Directory.newBuilder()).build(); - Digest treeDigest = cache.addContents(tree.toByteArray()); + Digest treeDigest = cache.addContents(remoteActionExecutionContext, tree.toByteArray()); Digest outputFileDigest = cache.addException("outputdir/outputfile", new IOException("download failed")); - Digest otherFileDigest = cache.addContents("otherfile"); + Digest otherFileDigest = cache.addContents(remoteActionExecutionContext, "otherfile"); ActionResult.Builder result = ActionResult.newBuilder(); result.addOutputDirectoriesBuilder().setPath("outputdir").setTreeDigest(treeDigest); @@ -692,9 +692,9 @@ public void onErrorWaitForRemainingDownloadsToComplete() throws Exception { Path stderr = fs.getPath("/execroot/stderr"); InMemoryRemoteCache cache = newRemoteCache(); - Digest digest1 = cache.addContents("file1"); + Digest digest1 = cache.addContents(remoteActionExecutionContext, "file1"); Digest digest2 = cache.addException("file2", new IOException("download failed")); - Digest digest3 = cache.addContents("file3"); + Digest digest3 = cache.addContents(remoteActionExecutionContext, "file3"); ActionResult result = ActionResult.newBuilder() @@ -728,7 +728,7 @@ public void downloadWithMultipleErrorsAddsThemAsSuppressed() throws Exception { Path stderr = fs.getPath("/execroot/stderr"); InMemoryRemoteCache cache = newRemoteCache(); - Digest digest1 = cache.addContents("file1"); + Digest digest1 = cache.addContents(remoteActionExecutionContext, "file1"); Digest digest2 = cache.addException("file2", new IOException("file2 failed")); Digest digest3 = cache.addException("file3", new IOException("file3 failed")); @@ -763,7 +763,7 @@ public void downloadWithDuplicateIOErrorsDoesNotSuppress() throws Exception { Path stderr = fs.getPath("/execroot/stderr"); InMemoryRemoteCache cache = newRemoteCache(); - Digest digest1 = cache.addContents("file1"); + Digest digest1 = cache.addContents(remoteActionExecutionContext, "file1"); IOException reusedException = new IOException("reused io exception"); Digest digest2 = cache.addException("file2", reusedException); Digest digest3 = cache.addException("file3", reusedException); @@ -799,7 +799,7 @@ public void downloadWithDuplicateInterruptionsDoesNotSuppress() throws Exception Path stderr = fs.getPath("/execroot/stderr"); InMemoryRemoteCache cache = newRemoteCache(); - Digest digest1 = cache.addContents("file1"); + Digest digest1 = cache.addContents(remoteActionExecutionContext, "file1"); InterruptedException reusedInterruption = new InterruptedException("reused interruption"); Digest digest2 = cache.addException("file2", reusedInterruption); Digest digest3 = cache.addException("file3", reusedInterruption); @@ -839,8 +839,8 @@ public void testDownloadWithStdoutStderrOnSuccess() throws Exception { when(spyOutErr.childOutErr()).thenReturn(spyChildOutErr); InMemoryRemoteCache cache = newRemoteCache(); - Digest digestStdout = cache.addContents("stdout"); - Digest digestStderr = cache.addContents("stderr"); + Digest digestStdout = cache.addContents(remoteActionExecutionContext, "stdout"); + Digest digestStderr = cache.addContents(remoteActionExecutionContext, "stderr"); ActionResult result = ActionResult.newBuilder() @@ -917,8 +917,8 @@ public void testDownloadClashes() throws Exception { // arrange InMemoryRemoteCache remoteCache = newRemoteCache(); - Digest d1 = remoteCache.addContents("content1"); - Digest d2 = remoteCache.addContents("content2"); + Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1"); + Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2"); ActionResult r = ActionResult.newBuilder() .setExitCode(0) @@ -949,8 +949,8 @@ public void testDownloadMinimalFiles() throws Exception { // arrange InMemoryRemoteCache remoteCache = newRemoteCache(); - Digest d1 = remoteCache.addContents("content1"); - Digest d2 = remoteCache.addContents("content2"); + Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1"); + Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2"); ActionResult r = ActionResult.newBuilder() .setExitCode(0) @@ -996,19 +996,19 @@ public void testDownloadMinimalDirectory() throws Exception { // Output Directory: // dir/file1 // dir/a/file2 - Digest d1 = remoteCache.addContents("content1"); - Digest d2 = remoteCache.addContents("content2"); + Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1"); + Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2"); FileNode file1 = FileNode.newBuilder().setName("file1").setDigest(d1).build(); FileNode file2 = FileNode.newBuilder().setName("file2").setDigest(d2).build(); Directory a = Directory.newBuilder().addFiles(file2).build(); - Digest da = remoteCache.addContents(a); + Digest da = remoteCache.addContents(remoteActionExecutionContext, a); Directory root = Directory.newBuilder() .addFiles(file1) .addDirectories(DirectoryNode.newBuilder().setName("a").setDigest(da)) .build(); Tree t = Tree.newBuilder().setRoot(root).addChildren(a).build(); - Digest dt = remoteCache.addContents(t); + Digest dt = remoteCache.addContents(remoteActionExecutionContext, t); ActionResult r = ActionResult.newBuilder() .setExitCode(0) @@ -1069,12 +1069,12 @@ public void testDownloadMinimalDirectoryFails() throws Exception { // Output Directory: // dir/file1 // dir/a/file2 - Digest d1 = remoteCache.addContents("content1"); - Digest d2 = remoteCache.addContents("content2"); + Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1"); + Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2"); FileNode file1 = FileNode.newBuilder().setName("file1").setDigest(d1).build(); FileNode file2 = FileNode.newBuilder().setName("file2").setDigest(d2).build(); Directory a = Directory.newBuilder().addFiles(file2).build(); - Digest da = remoteCache.addContents(a); + Digest da = remoteCache.addContents(remoteActionExecutionContext, a); Directory root = Directory.newBuilder() .addFiles(file1) @@ -1126,8 +1126,8 @@ public void testDownloadMinimalWithStdoutStderr() throws Exception { // arrange InMemoryRemoteCache remoteCache = newRemoteCache(); - Digest dOut = remoteCache.addContents("stdout"); - Digest dErr = remoteCache.addContents("stderr"); + Digest dOut = remoteCache.addContents(remoteActionExecutionContext, "stdout"); + Digest dErr = remoteCache.addContents(remoteActionExecutionContext, "stderr"); ActionResult r = ActionResult.newBuilder() .setExitCode(0) @@ -1167,8 +1167,8 @@ public void testDownloadMinimalWithInMemoryOutput() throws Exception { // arrange InMemoryRemoteCache remoteCache = newRemoteCache(); - Digest d1 = remoteCache.addContents("content1"); - Digest d2 = remoteCache.addContents("content2"); + Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "content1"); + Digest d2 = remoteCache.addContents(remoteActionExecutionContext, "content2"); ActionResult r = ActionResult.newBuilder() .setExitCode(0) @@ -1215,7 +1215,7 @@ public void testDownloadMinimalWithMissingInMemoryOutput() throws Exception { // arrange InMemoryRemoteCache remoteCache = newRemoteCache(); - Digest d1 = remoteCache.addContents("in-memory output"); + Digest d1 = remoteCache.addContents(remoteActionExecutionContext, "in-memory output"); ActionResult r = ActionResult.newBuilder().setExitCode(0).build(); Artifact a1 = ActionsTestUtil.createArtifact(artifactRoot, "file1"); MetadataInjector injector = mock(MetadataInjector.class); @@ -1643,18 +1643,21 @@ private static class InMemoryRemoteCache extends RemoteCache { super(new InMemoryCacheClient(), options, digestUtil); } - Digest addContents(String txt) throws IOException, InterruptedException { - return addContents(txt.getBytes(UTF_8)); + Digest addContents(RemoteActionExecutionContext context, String txt) + throws IOException, InterruptedException { + return addContents(context, txt.getBytes(UTF_8)); } - Digest addContents(byte[] bytes) throws IOException, InterruptedException { + Digest addContents(RemoteActionExecutionContext context, byte[] bytes) + throws IOException, InterruptedException { Digest digest = digestUtil.compute(bytes); - Utils.getFromFuture(cacheProtocol.uploadBlob(digest, ByteString.copyFrom(bytes))); + Utils.getFromFuture(cacheProtocol.uploadBlob(context, digest, ByteString.copyFrom(bytes))); return digest; } - Digest addContents(Message m) throws IOException, InterruptedException { - return addContents(m.toByteArray()); + Digest addContents(RemoteActionExecutionContext context, Message m) + throws IOException, InterruptedException { + return addContents(context, m.toByteArray()); } Digest addException(String txt, Exception e) { diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java index 7fde5bcd734049..f313ec91cf235d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java @@ -93,7 +93,7 @@ public void testZeroExitCodeFromCache() throws IOException, InterruptedException assertThat(executionResult.exitCode()).isEqualTo(0); } - + @Test public void testNoneZeroExitCodeFromCache() throws IOException, InterruptedException { // Test that an ActionResult with a none-zero exit code is not accepted as cached. diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java index 4e2ecd6229c7f7..d811da1244b80d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java @@ -245,7 +245,7 @@ public void nonCachableSpawnsShouldNotBeCached_localFallback() throws Exception runner.exec(spawn, policy); verify(localRunner).exec(spawn, policy); - verify(cache).ensureInputsPresent(any(), any()); + verify(cache).ensureInputsPresent(any(), any(), any()); verifyNoMoreInteractions(cache); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java index 41fdd7f7fa54fd..2e13a3717e9d34 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java @@ -25,6 +25,7 @@ import build.bazel.remote.asset.v1.FetchGrpc.FetchImplBase; import build.bazel.remote.asset.v1.Qualifier; import build.bazel.remote.execution.v2.Digest; +import build.bazel.remote.execution.v2.RequestMetadata; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; @@ -37,6 +38,9 @@ import com.google.devtools.build.lib.remote.ReferenceCountedChannel; import com.google.devtools.build.lib.remote.RemoteRetrier; import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff; +import com.google.devtools.build.lib.remote.common.NetworkTime; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; @@ -79,6 +83,7 @@ public class GrpcRemoteDownloaderTest { private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); private final String fakeServerName = "fake server for " + getClass(); private Server fakeServer; + private RemoteActionExecutionContext context; private Context withEmptyMetadata; private Context prevContext; private ListeningScheduledExecutorService retryService; @@ -92,9 +97,13 @@ public final void setUp() throws Exception { .directExecutor() .build() .start(); - withEmptyMetadata = - TracingMetadataUtils.contextWithMetadata( - "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance())); + RequestMetadata metadata = + TracingMetadataUtils.buildMetadata( + "none", + "none", + DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()).getDigest().getHash()); + context = new RemoteActionExecutionContextImpl(metadata, new NetworkTime()); + withEmptyMetadata = TracingMetadataUtils.contextWithMetadata(metadata); retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); @@ -189,7 +198,7 @@ public void fetchBlob( final RemoteCacheClient cacheClient = new InMemoryCacheClient(); final GrpcRemoteDownloader downloader = newDownloader(cacheClient); - getFromFuture(cacheClient.uploadBlob(contentDigest, ByteString.copyFrom(content))); + getFromFuture(cacheClient.uploadBlob(context, contentDigest, ByteString.copyFrom(content))); final byte[] downloaded = downloadBlob( downloader, new URL("http://example.com/content.txt"), Optional.empty()); @@ -225,7 +234,7 @@ public void fetchBlob( final RemoteCacheClient cacheClient = new InMemoryCacheClient(); final GrpcRemoteDownloader downloader = newDownloader(cacheClient); - getFromFuture(cacheClient.uploadBlob(contentDigest, ByteString.copyFrom(content))); + getFromFuture(cacheClient.uploadBlob(context, contentDigest, ByteString.copyFrom(content))); final byte[] downloaded = downloadBlob( downloader, @@ -263,7 +272,8 @@ public void fetchBlob( final RemoteCacheClient cacheClient = new InMemoryCacheClient(); final GrpcRemoteDownloader downloader = newDownloader(cacheClient); - getFromFuture(cacheClient.uploadBlob(contentDigest, ByteString.copyFromUtf8("wrong content"))); + getFromFuture( + cacheClient.uploadBlob(context, contentDigest, ByteString.copyFromUtf8("wrong content"))); IOException e = assertThrows( diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java index 7515f32834fe12..5c091c29f758a8 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java @@ -319,7 +319,7 @@ public void testUploadAtMostOnce() throws Exception { ByteString data = ByteString.copyFrom("foo bar", StandardCharsets.UTF_8); Digest digest = DIGEST_UTIL.compute(data.toByteArray()); - blobStore.uploadBlob(digest, data).get(); + blobStore.uploadBlob(remoteActionExecutionContext, digest, data).get(); assertThat(cacheContents).hasSize(1); String cacheKey = "/cas/" + digest.getHash(); @@ -329,7 +329,7 @@ public void testUploadAtMostOnce() throws Exception { // Clear the remote cache contents cacheContents.clear(); - blobStore.uploadBlob(digest, data).get(); + blobStore.uploadBlob(remoteActionExecutionContext, digest, data).get(); // Nothing should have been uploaded again. assertThat(cacheContents).isEmpty(); @@ -369,7 +369,9 @@ protected void channelRead0( Credentials credentials = newCredentials(); HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials); byte[] data = "File Contents".getBytes(Charsets.US_ASCII); - getFromFuture(blobStore.uploadBlob(DIGEST_UTIL.compute(data), ByteString.copyFrom(data))); + getFromFuture( + blobStore.uploadBlob( + remoteActionExecutionContext, DIGEST_UTIL.compute(data), ByteString.copyFrom(data))); fail("Exception expected"); } finally { testServer.stop(server); @@ -432,7 +434,10 @@ protected void channelRead0( IOException.class, () -> getFromFuture( - blobStore.uploadBlob(DIGEST_UTIL.compute(data.toByteArray()), data))); + blobStore.uploadBlob( + remoteActionExecutionContext, + DIGEST_UTIL.compute(data.toByteArray()), + data))); assertThat(e.getCause()).isInstanceOf(TooLongFrameException.class); } finally { testServer.stop(server); @@ -564,7 +569,10 @@ private void expiredAuthTokensShouldBeRetried_put( Credentials credentials = newCredentials(); HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials); byte[] data = "File Contents".getBytes(Charsets.US_ASCII); - blobStore.uploadBlob(DIGEST_UTIL.compute(data), ByteString.copyFrom(data)).get(); + blobStore + .uploadBlob( + remoteActionExecutionContext, DIGEST_UTIL.compute(data), ByteString.copyFrom(data)) + .get(); verify(credentials, times(1)).refresh(); verify(credentials, times(2)).getRequestMetadata(any(URI.class)); verify(credentials, times(2)).hasRequestMetadata(); @@ -621,7 +629,10 @@ private void errorCodeThatShouldNotBeRetried_put( HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials); byte[] oneByte = new byte[] {0}; getFromFuture( - blobStore.uploadBlob(DIGEST_UTIL.compute(oneByte), ByteString.copyFrom(oneByte))); + blobStore.uploadBlob( + remoteActionExecutionContext, + DIGEST_UTIL.compute(oneByte), + ByteString.copyFrom(oneByte))); fail("Exception expected."); } catch (Exception e) { assertThat(e).isInstanceOf(HttpException.class); diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java index 9c78ef0ffde548..9894f836eb8a43 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java +++ b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java @@ -107,7 +107,8 @@ public void uploadActionResult( } @Override - public ListenableFuture uploadFile(Digest digest, Path file) { + public ListenableFuture uploadFile( + RemoteActionExecutionContext context, Digest digest, Path file) { try (InputStream in = file.getInputStream()) { cas.put(digest, ByteStreams.toByteArray(in)); } catch (IOException e) { @@ -117,7 +118,8 @@ public ListenableFuture uploadFile(Digest digest, Path file) { } @Override - public ListenableFuture uploadBlob(Digest digest, ByteString data) { + public ListenableFuture uploadBlob( + RemoteActionExecutionContext context, Digest digest, ByteString data) { try (InputStream in = data.newInput()) { cas.put(digest, data.toByteArray()); } catch (IOException e) { diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java index 8bf39e88be4a2f..2ed32e5dcbc531 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java @@ -103,6 +103,10 @@ public void read(ReadRequest request, StreamObserver responseObser @Override public StreamObserver write(final StreamObserver responseObserver) { + RequestMetadata meta = TracingMetadataUtils.fromCurrentContext(); + RemoteActionExecutionContext context = + new RemoteActionExecutionContextImpl(meta, new NetworkTime()); + Path temp = workPath.getRelative("upload").getRelative(UUID.randomUUID().toString()); try { FileSystemUtils.createDirectoryAndParents(temp.getParentDirectory()); @@ -226,7 +230,7 @@ public void onCompleted() { try { Digest d = digestUtil.compute(temp); - getFromFuture(cache.uploadFile(d, temp)); + getFromFuture(cache.uploadFile(context, d, temp)); try { temp.delete(); } catch (IOException e) { diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java index 9b3bba571f5461..e862939094cf82 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java @@ -68,12 +68,16 @@ public void findMissingBlobs( @Override public void batchUpdateBlobs( BatchUpdateBlobsRequest request, StreamObserver responseObserver) { + RequestMetadata meta = TracingMetadataUtils.fromCurrentContext(); + RemoteActionExecutionContext context = + new RemoteActionExecutionContextImpl(meta, new NetworkTime()); + BatchUpdateBlobsResponse.Builder batchResponse = BatchUpdateBlobsResponse.newBuilder(); for (BatchUpdateBlobsRequest.Request r : request.getRequestsList()) { BatchUpdateBlobsResponse.Response.Builder resp = batchResponse.addResponsesBuilder(); try { Digest digest = cache.getDigestUtil().compute(r.getData().toByteArray()); - getFromFuture(cache.uploadBlob(digest, r.getData())); + getFromFuture(cache.uploadBlob(context, digest, r.getData())); if (!r.getDigest().equals(digest)) { String err = "Upload digest " + r.getDigest() + " did not match data digest: " + digest; @@ -94,6 +98,7 @@ public void getTree(GetTreeRequest request, StreamObserver resp RequestMetadata meta = TracingMetadataUtils.fromCurrentContext(); RemoteActionExecutionContext context = new RemoteActionExecutionContextImpl(meta, new NetworkTime()); + // Directories are returned in depth-first order. We store all previously-traversed digests so // identical subtrees having the same digest will only be traversed and returned once. Set seenDigests = new HashSet<>(); diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java index 804f739089a503..1f39c313dbf60b 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java @@ -61,12 +61,14 @@ public void downloadTree( } } - public ListenableFuture uploadFile(Digest digest, Path file) { - return cacheProtocol.uploadFile(digest, file); + public ListenableFuture uploadFile( + RemoteActionExecutionContext context, Digest digest, Path file) { + return cacheProtocol.uploadFile(context, digest, file); } - public ListenableFuture uploadBlob(Digest digest, ByteString data) { - return cacheProtocol.uploadBlob(digest, data); + public ListenableFuture uploadBlob( + RemoteActionExecutionContext context, Digest digest, ByteString data) { + return cacheProtocol.uploadBlob(context, digest, data); } public void uploadActionResult(