diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index cc989ccf83a497..7e137c4f2fab21 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -144,17 +144,17 @@ public void reset() throws IOException { } /** - * Seek to an offset, if necessary resetting or initializing + * Seek to an offset in the source stream. * - *

May close open resources in order to seek to an earlier offset. + *

May close and reopen resources in order to seek to an earlier offset. */ public void seek(long toOffset) throws IOException { - if (toOffset < offset) { + if (initialized && toOffset >= offset && !compressed) { + ByteStreams.skipFully(data, toOffset - offset); + } else { reset(); + initialize(toOffset); } - maybeInitialize(); - ByteStreams.skipFully(data, toOffset - offset); - offset = toOffset; if (data.finished()) { close(); } @@ -247,18 +247,26 @@ private void maybeInitialize() throws IOException { if (initialized) { return; } + initialize(0); + } + + private void initialize(long srcPos) throws IOException { + checkState(!initialized); checkState(data == null); checkState(offset == 0); checkState(chunkCache == null); try { + var src = dataSupplier.get(); + ByteStreams.skipFully(src, srcPos); data = compressed - ? new ChunkerInputStream(new ZstdCompressingInputStream(dataSupplier.get())) - : new ChunkerInputStream(dataSupplier.get()); + ? new ChunkerInputStream(new ZstdCompressingInputStream(src)) + : new ChunkerInputStream(src); } catch (RuntimeException e) { Throwables.propagateIfPossible(e.getCause(), IOException.class); throw e; } + offset = srcPos; initialized = true; } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index de2ff4d1ab44c4..6cf353459b3b7c 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -68,6 +68,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -373,23 +374,18 @@ public void progressiveCompressedUploadShouldWork() throws Exception { retrier, /*maximumOpenFiles=*/ -1); - byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; + int chunkSize = 1024; + int skipSize = chunkSize + 1; + byte[] blob = new byte[chunkSize * 2 + 1]; new Random().nextBytes(blob); Chunker chunker = - Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build(); + Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(chunkSize).build(); Digest digest = DIGEST_UTIL.compute(blob); - while (chunker.hasNext()) { - chunker.next(); - } - long expectedSize = chunker.getOffset(); - chunker.reset(); - + ByteArrayOutputStream output = new ByteArrayOutputStream(); serviceRegistry.addService( new ByteStreamImplBase() { - - byte[] receivedData = new byte[(int) expectedSize]; String receivedResourceName = null; boolean receivedComplete = false; long nextOffset = 0; @@ -414,21 +410,21 @@ public void onNext(WriteRequest writeRequest) { assertThat(resourceName).isEmpty(); } - assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset); - - ByteString data = writeRequest.getData(); - - System.arraycopy( - data.toByteArray(), 0, receivedData, (int) nextOffset, data.size()); - - nextOffset += data.size(); - receivedComplete = expectedSize == nextOffset; - assertThat(writeRequest.getFinishWrite()).isEqualTo(receivedComplete); - if (initialOffset == 0) { streamObserver.onError(Status.DEADLINE_EXCEEDED.asException()); mustQueryWriteStatus = true; - initialOffset = nextOffset; + initialOffset = skipSize; + nextOffset = initialOffset; + } else { + ByteString data = writeRequest.getData(); + try { + data.writeTo(output); + } catch (IOException e) { + streamObserver.onError(e); + return; + } + nextOffset += data.size(); + receivedComplete = writeRequest.getFinishWrite(); } } @@ -439,10 +435,6 @@ public void onError(Throwable throwable) { @Override public void onCompleted() { - assertThat(nextOffset).isEqualTo(expectedSize); - byte[] decompressed = Zstd.decompress(receivedData, blob.length); - assertThat(decompressed).isEqualTo(blob); - WriteResponse response = WriteResponse.newBuilder().setCommittedSize(nextOffset).build(); streamObserver.onNext(response); @@ -460,7 +452,7 @@ public void queryWriteStatus( if (receivedResourceName != null && receivedResourceName.equals(resourceName)) { assertThat(mustQueryWriteStatus).isTrue(); mustQueryWriteStatus = false; - committedSize = nextOffset; + committedSize = receivedComplete ? blob.length : skipSize; complete = receivedComplete; } else { committedSize = 0; @@ -476,6 +468,9 @@ public void queryWriteStatus( }); uploader.uploadBlob(context, digest, chunker); + byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize); + assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length)) + .isTrue(); // This test triggers one retry. Mockito.verify(mockBackoff, Mockito.times(1)).nextDelayMillis(any(Exception.class));