Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private ReadableByteChannelSessionBuilder(
this.read = read;
this.retrier = retrier;
this.resultRetryAlgorithm = resultRetryAlgorithm;
this.hasher = Hasher.noop();
this.hasher = Hasher.defaultHasher();
this.autoGzipDecompression = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
import com.google.cloud.storage.GrpcUtils.ZeroCopyServerStreamingCallable;
import com.google.cloud.storage.Hasher.UncheckedChecksumMismatchException;
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
import com.google.cloud.storage.Retrying.Retrier;
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
Expand Down Expand Up @@ -104,7 +105,11 @@ public boolean shouldRetry(
boolean isWatchdogTimeout =
previousThrowable instanceof StorageException
&& previousThrowable.getCause() instanceof WatchdogTimeoutException;
boolean shouldRetry = isWatchdogTimeout || alg.shouldRetry(previousThrowable, null);
boolean isChecksumMismatch =
previousThrowable instanceof StorageException
&& previousThrowable.getCause() instanceof UncheckedChecksumMismatchException;
boolean shouldRetry =
isWatchdogTimeout || isChecksumMismatch || alg.shouldRetry(previousThrowable, null);
if (previousThrowable != null && !shouldRetry) {
result.setException(previousThrowable);
}
Expand Down Expand Up @@ -146,59 +151,38 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
Thread.currentThread().interrupt();
throw new InterruptedIOException();
}
if (take instanceof IOException) {
IOException ioe = (IOException) take;
if (alg.shouldRetry(ioe, null)) {
readObjectObserver = null;
continue;
} else {
ioe.addSuppressed(new AsyncStorageTaskException());
throw ioe;
}
}
if (take instanceof Throwable) {
Throwable throwable = (Throwable) take;
BaseServiceException coalesce = StorageException.coalesce(throwable);
if (alg.shouldRetry(coalesce, null)) {
readObjectObserver = null;
continue;
} else {
close();
throw new IOException(coalesce);
}
}
if (take == EOF_MARKER) {
complete = true;
break;
}
readObjectObserver.request();

ReadObjectResponse resp = (ReadObjectResponse) take;
try (ResponseContentLifecycleHandle<ReadObjectResponse> handle =
read.getResponseContentLifecycleManager().get(resp)) {
ReadObjectResponseChildRef ref = ReadObjectResponseChildRef.from(handle);
if (resp.hasMetadata()) {
Object respMetadata = resp.getMetadata();
if (metadata == null) {
metadata = respMetadata;
} else if (metadata.getGeneration() != respMetadata.getGeneration()) {
throw closeWithError(
String.format(
Locale.US,
"Mismatch Generation between subsequent reads. Expected %d but received %d",
metadata.getGeneration(),
respMetadata.getGeneration()));
}
}
ChecksummedData checksummedData = resp.getChecksummedData();
ByteString content = checksummedData.getContent();
int contentSize = content.size();
// Very important to know whether a crc32c value is set. Without checking, protobuf will
// happily return 0, which is a valid crc32c value.
if (checksummedData.hasCrc32C()) {
Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize);
try {
hasher.validate(expected, content);
} catch (IOException e) {
close();
throw e;
}
}
ref.copy(c, dsts, offset, length);
if (ref.hasRemaining()) {
leftovers = ref;
} else {
ref.close();
}
ReadObjectResponseChildRef ref = (ReadObjectResponseChildRef) take;
ref.copy(c, dsts, offset, length);
if (ref.hasRemaining()) {
leftovers = ref;
} else {
ref.close();
}
}
long read = c.read();
Expand Down Expand Up @@ -321,11 +305,10 @@ private void ensureStreamOpen() {
}
}

private IOException closeWithError(String message) throws IOException {
close();
private IOException createError(String message) throws IOException {
StorageException cause =
new StorageException(HttpStatusCodes.STATUS_CODE_PRECONDITION_FAILED, message);
throw new IOException(message, cause);
return new IOException(message, cause);
}

private final class ReadObjectObserver extends StateCheckingResponseObserver<ReadObjectResponse> {
Expand All @@ -335,10 +318,6 @@ private final class ReadObjectObserver extends StateCheckingResponseObserver<Rea

private volatile StreamController controller;

void request() {
controller.request(1);
}

void cancel() {
controller.cancel();
}
Expand All @@ -352,16 +331,50 @@ protected void onStartImpl(StreamController controller) {

@Override
protected void onResponseImpl(ReadObjectResponse response) {
try {
open.set(null);
queue.offer(response);
fetchOffset.addAndGet(response.getChecksummedData().getContent().size());
controller.request(1);
open.set(null);
try (ResponseContentLifecycleHandle<ReadObjectResponse> handle =
read.getResponseContentLifecycleManager().get(response)) {
ChecksummedData checksummedData = response.getChecksummedData();
ByteString content = checksummedData.getContent();
int contentSize = content.size();
// Very important to know whether a crc32c value is set. Without checking, protobuf will
// happily return 0, which is a valid crc32c value.
if (checksummedData.hasCrc32C()) {
Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize);
try {
hasher.validateUnchecked(expected, content);
} catch (UncheckedChecksumMismatchException e) {
queue.offer(e);
return;
}
}
if (response.hasMetadata()) {
Object respMetadata = response.getMetadata();
if (metadata == null) {
metadata = respMetadata;
} else if (metadata.getGeneration() != respMetadata.getGeneration()) {
IOException exception =
createError(
String.format(
Locale.US,
"Mismatch Generation between subsequent reads. Expected %d but received %d",
metadata.getGeneration(),
respMetadata.getGeneration()));
queue.offer(exception);
return;
}
}
queue.offer(ReadObjectResponseChildRef.from(handle));
fetchOffset.addAndGet(contentSize);
if (response.hasMetadata() && !result.isDone()) {
result.set(response.getMetadata());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Code.ABORTED.toStatus().withCause(e).asRuntimeException();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected LazyReadChannel<?, Object> newLazyReadChannel() {
ResumableMedia.gapic()
.read()
.byteChannel(read, retrier, resultRetryAlgorithm)
.setHasher(Hasher.noop())
.setHasher(Hasher.defaultHasher())
.setAutoGzipDecompression(autoGzipDecompression);
BufferHandle bufferHandle = getBufferHandle();
// because we're erasing the specific type of channel, we need to declare it here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.storage.TestUtils.apiException;
import static com.google.cloud.storage.TestUtils.getChecksummedData;
import static com.google.cloud.storage.TestUtils.xxd;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;

Expand Down Expand Up @@ -236,7 +237,7 @@ public void readObject(
}

@Test
public void ioException_if_crc32c_mismatch_individual_message()
public void ifCrc32cMismatchIndividualMessage_restartFromCorrectOffset()
throws IOException, InterruptedException {
StorageGrpc.StorageImplBase fakeStorage =
new StorageGrpc.StorageImplBase() {
Expand All @@ -245,10 +246,12 @@ public void readObject(
ReadObjectRequest request, StreamObserver<ReadObjectResponse> responseObserver) {
if (request.equals(req1)) {
responseObserver.onNext(resp1);
ReadObjectResponse.Builder b = resp2.toBuilder();
responseObserver.onNext(resp2);
ReadObjectResponse.Builder b = resp3.toBuilder();
// set a bad checksum value
b.getChecksummedDataBuilder().setCrc32C(1);
responseObserver.onNext(b.build());
} else if (request.equals(req2)) {
responseObserver.onNext(resp3);
responseObserver.onNext(resp4);
responseObserver.onCompleted();
Expand Down Expand Up @@ -276,10 +279,10 @@ public void readObject(
retryOnly(DataLossException.class)));
byte[] actualBytes = new byte[40];
try (UnbufferedReadableByteChannel c = session.open()) {
IOException ioException =
assertThrows(IOException.class, () -> c.read(ByteBuffer.wrap(actualBytes)));
int read = c.read(ByteBuffer.wrap(actualBytes));

assertThat(ioException).hasMessageThat().contains("Mismatch checksum");
assertThat(read).isEqualTo(40);
assertThat(xxd(actualBytes)).isEqualTo(xxd(bytes));
}
}
}
Expand Down