diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java index 234326fa2f..3bbba1d703 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java @@ -67,7 +67,7 @@ private ReadableByteChannelSessionBuilder( this.read = read; this.retrier = retrier; this.resultRetryAlgorithm = resultRetryAlgorithm; - this.hasher = Hasher.noop(); + this.hasher = Hasher.defaultHasher(); this.autoGzipDecompression = false; } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index ce41620d33..cef751213c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -30,6 +30,7 @@ import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; import com.google.cloud.storage.GrpcUtils.ZeroCopyServerStreamingCallable; +import com.google.cloud.storage.Hasher.UncheckedChecksumMismatchException; import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef; import com.google.cloud.storage.Retrying.Retrier; import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel; @@ -104,7 +105,11 @@ public boolean shouldRetry( boolean isWatchdogTimeout = previousThrowable instanceof StorageException && previousThrowable.getCause() instanceof WatchdogTimeoutException; - boolean shouldRetry = isWatchdogTimeout || alg.shouldRetry(previousThrowable, null); + boolean isChecksumMismatch = + previousThrowable instanceof StorageException + && previousThrowable.getCause() instanceof UncheckedChecksumMismatchException; + boolean shouldRetry = + isWatchdogTimeout || isChecksumMismatch || alg.shouldRetry(previousThrowable, null); if (previousThrowable != null && !shouldRetry) { result.setException(previousThrowable); } @@ -146,6 +151,16 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { Thread.currentThread().interrupt(); throw new InterruptedIOException(); } + if (take instanceof IOException) { + IOException ioe = (IOException) take; + if (alg.shouldRetry(ioe, null)) { + readObjectObserver = null; + continue; + } else { + ioe.addSuppressed(new AsyncStorageTaskException()); + throw ioe; + } + } if (take instanceof Throwable) { Throwable throwable = (Throwable) take; BaseServiceException coalesce = StorageException.coalesce(throwable); @@ -153,6 +168,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { readObjectObserver = null; continue; } else { + close(); throw new IOException(coalesce); } } @@ -160,45 +176,13 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { complete = true; break; } - readObjectObserver.request(); - ReadObjectResponse resp = (ReadObjectResponse) take; - try (ResponseContentLifecycleHandle handle = - read.getResponseContentLifecycleManager().get(resp)) { - ReadObjectResponseChildRef ref = ReadObjectResponseChildRef.from(handle); - if (resp.hasMetadata()) { - Object respMetadata = resp.getMetadata(); - if (metadata == null) { - metadata = respMetadata; - } else if (metadata.getGeneration() != respMetadata.getGeneration()) { - throw closeWithError( - String.format( - Locale.US, - "Mismatch Generation between subsequent reads. Expected %d but received %d", - metadata.getGeneration(), - respMetadata.getGeneration())); - } - } - ChecksummedData checksummedData = resp.getChecksummedData(); - ByteString content = checksummedData.getContent(); - int contentSize = content.size(); - // Very important to know whether a crc32c value is set. Without checking, protobuf will - // happily return 0, which is a valid crc32c value. - if (checksummedData.hasCrc32C()) { - Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize); - try { - hasher.validate(expected, content); - } catch (IOException e) { - close(); - throw e; - } - } - ref.copy(c, dsts, offset, length); - if (ref.hasRemaining()) { - leftovers = ref; - } else { - ref.close(); - } + ReadObjectResponseChildRef ref = (ReadObjectResponseChildRef) take; + ref.copy(c, dsts, offset, length); + if (ref.hasRemaining()) { + leftovers = ref; + } else { + ref.close(); } } long read = c.read(); @@ -321,11 +305,10 @@ private void ensureStreamOpen() { } } - private IOException closeWithError(String message) throws IOException { - close(); + private IOException createError(String message) throws IOException { StorageException cause = new StorageException(HttpStatusCodes.STATUS_CODE_PRECONDITION_FAILED, message); - throw new IOException(message, cause); + return new IOException(message, cause); } private final class ReadObjectObserver extends StateCheckingResponseObserver { @@ -335,10 +318,6 @@ private final class ReadObjectObserver extends StateCheckingResponseObserver handle = + read.getResponseContentLifecycleManager().get(response)) { + ChecksummedData checksummedData = response.getChecksummedData(); + ByteString content = checksummedData.getContent(); + int contentSize = content.size(); + // Very important to know whether a crc32c value is set. Without checking, protobuf will + // happily return 0, which is a valid crc32c value. + if (checksummedData.hasCrc32C()) { + Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize); + try { + hasher.validateUnchecked(expected, content); + } catch (UncheckedChecksumMismatchException e) { + queue.offer(e); + return; + } + } + if (response.hasMetadata()) { + Object respMetadata = response.getMetadata(); + if (metadata == null) { + metadata = respMetadata; + } else if (metadata.getGeneration() != respMetadata.getGeneration()) { + IOException exception = + createError( + String.format( + Locale.US, + "Mismatch Generation between subsequent reads. Expected %d but received %d", + metadata.getGeneration(), + respMetadata.getGeneration())); + queue.offer(exception); + return; + } + } + queue.offer(ReadObjectResponseChildRef.from(handle)); + fetchOffset.addAndGet(contentSize); if (response.hasMetadata() && !result.isDone()) { result.set(response.getMetadata()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw Code.ABORTED.toStatus().withCause(e).asRuntimeException(); + } catch (IOException e) { + throw new RuntimeException(e); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java index 5c31cb5339..9113af1e0c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java @@ -62,7 +62,7 @@ protected LazyReadChannel newLazyReadChannel() { ResumableMedia.gapic() .read() .byteChannel(read, retrier, resultRetryAlgorithm) - .setHasher(Hasher.noop()) + .setHasher(Hasher.defaultHasher()) .setAutoGzipDecompression(autoGzipDecompression); BufferHandle bufferHandle = getBufferHandle(); // because we're erasing the specific type of channel, we need to declare it here. diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java index 8006d2b534..1e1c05915a 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.storage.TestUtils.apiException; import static com.google.cloud.storage.TestUtils.getChecksummedData; +import static com.google.cloud.storage.TestUtils.xxd; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; @@ -236,7 +237,7 @@ public void readObject( } @Test - public void ioException_if_crc32c_mismatch_individual_message() + public void ifCrc32cMismatchIndividualMessage_restartFromCorrectOffset() throws IOException, InterruptedException { StorageGrpc.StorageImplBase fakeStorage = new StorageGrpc.StorageImplBase() { @@ -245,10 +246,12 @@ public void readObject( ReadObjectRequest request, StreamObserver responseObserver) { if (request.equals(req1)) { responseObserver.onNext(resp1); - ReadObjectResponse.Builder b = resp2.toBuilder(); + responseObserver.onNext(resp2); + ReadObjectResponse.Builder b = resp3.toBuilder(); // set a bad checksum value b.getChecksummedDataBuilder().setCrc32C(1); responseObserver.onNext(b.build()); + } else if (request.equals(req2)) { responseObserver.onNext(resp3); responseObserver.onNext(resp4); responseObserver.onCompleted(); @@ -276,10 +279,10 @@ public void readObject( retryOnly(DataLossException.class))); byte[] actualBytes = new byte[40]; try (UnbufferedReadableByteChannel c = session.open()) { - IOException ioException = - assertThrows(IOException.class, () -> c.read(ByteBuffer.wrap(actualBytes))); + int read = c.read(ByteBuffer.wrap(actualBytes)); - assertThat(ioException).hasMessageThat().contains("Mismatch checksum"); + assertThat(read).isEqualTo(40); + assertThat(xxd(actualBytes)).isEqualTo(xxd(bytes)); } } }