diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java index 7cc2f74e8..d997400de 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java @@ -63,23 +63,27 @@ final class JsonResumableSession { new JsonResumableSessionPutTask( context, resumableWrite.getUploadId(), content, contentRange); HttpRpcContext httpRpcContext = HttpRpcContext.getInstance(); - httpRpcContext.newInvocationId(); - AtomicBoolean dirty = new AtomicBoolean(false); - return Retrying.run( - deps, - alg, - () -> { - if (dirty.getAndSet(true)) { - ResumableOperationResult<@Nullable StorageObject> query = query(); - long persistedSize = query.getPersistedSize(); - if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) { - return query; - } else { - task.rewindTo(persistedSize); + try { + httpRpcContext.newInvocationId(); + AtomicBoolean dirty = new AtomicBoolean(false); + return Retrying.run( + deps, + alg, + () -> { + if (dirty.getAndSet(true)) { + ResumableOperationResult<@Nullable StorageObject> query = query(); + long persistedSize = query.getPersistedSize(); + if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) { + return query; + } else { + task.rewindTo(persistedSize); + } } - } - return task.call(); - }, - Decoder.identity()); + return task.call(); + }, + Decoder.identity()); + } finally { + httpRpcContext.clearInvocationId(); + } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java index c276c511a..3aa17cc2d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java @@ -44,9 +44,12 @@ import com.google.api.client.util.Data; import com.google.api.services.storage.Storage; import com.google.api.services.storage.Storage.Objects.Compose; +import com.google.api.services.storage.Storage.Objects.Delete; import com.google.api.services.storage.Storage.Objects.Get; import com.google.api.services.storage.Storage.Objects.Insert; import com.google.api.services.storage.Storage.Objects.Move; +import com.google.api.services.storage.Storage.Objects.Patch; +import com.google.api.services.storage.StorageRequest; import com.google.api.services.storage.model.Bucket; import com.google.api.services.storage.model.Bucket.RetentionPolicy; import com.google.api.services.storage.model.BucketAccessControl; @@ -109,6 +112,7 @@ public class HttpStorageRpc implements StorageRpc { // declare this HttpStatus code here as it's not included in java.net.HttpURLConnection private static final int SC_REQUESTED_RANGE_NOT_SATISFIABLE = 416; private static final boolean IS_RECORD_EVENTS = true; + private static final String X_GOOG_GCS_IDEMPOTENCY_TOKEN = "x-goog-gcs-idempotency-token"; private final StorageOptions options; private final Storage storage; @@ -208,7 +212,7 @@ public void intercept(HttpRequest request) throws IOException { .filter(java.util.Objects::nonNull) .collect(JOINER); headers.set("x-goog-api-client", newValue); - headers.set("x-goog-gcs-idempotency-token", invocationId); + headers.set(X_GOOG_GCS_IDEMPOTENCY_TOKEN, invocationId); String userAgent = headers.getUserAgent(); if ((userAgent == null @@ -247,7 +251,9 @@ public void addDelete( batches.add(storage.batch()); currentBatchSize = 0; } - deleteCall(storageObject, options).queue(batches.getLast(), toJsonCallback(callback)); + Delete call = deleteCall(storageObject, options); + addIdempotencyTokenToCall(call); + call.queue(batches.getLast(), toJsonCallback(callback)); currentBatchSize++; } catch (IOException ex) { throw translate(ex); @@ -264,7 +270,9 @@ public void addPatch( batches.add(storage.batch()); currentBatchSize = 0; } - patchCall(storageObject, options).queue(batches.getLast(), toJsonCallback(callback)); + Patch call = patchCall(storageObject, options); + addIdempotencyTokenToCall(call); + call.queue(batches.getLast(), toJsonCallback(callback)); currentBatchSize++; } catch (IOException ex) { throw translate(ex); @@ -281,7 +289,9 @@ public void addGet( batches.add(storage.batch()); currentBatchSize = 0; } - getCall(storageObject, options).queue(batches.getLast(), toJsonCallback(callback)); + Get call = getCall(storageObject, options); + addIdempotencyTokenToCall(call); + call.queue(batches.getLast(), toJsonCallback(callback)); currentBatchSize++; } catch (IOException ex) { throw translate(ex); @@ -310,6 +320,12 @@ public void submit() { span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS); } } + + private void addIdempotencyTokenToCall(StorageRequest call) { + HttpRpcContext instance = HttpRpcContext.getInstance(); + call.getRequestHeaders().set(X_GOOG_GCS_IDEMPOTENCY_TOKEN, instance.newInvocationId()); + instance.clearInvocationId(); + } } private static JsonBatchCallback toJsonCallback(final RpcBatch.Callback callback) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBatchTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBatchTest.java index d1d1c97a5..f5b775f36 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBatchTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBatchTest.java @@ -23,12 +23,15 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.cloud.WriteChannel; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.DataGenerator; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobTargetOption; +import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.StorageBatch; import com.google.cloud.storage.StorageBatchResult; import com.google.cloud.storage.StorageException; @@ -40,6 +43,14 @@ import com.google.cloud.storage.it.runner.annotations.StorageFixture; import com.google.cloud.storage.it.runner.registry.Generator; import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Clock; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -208,4 +219,46 @@ public void testBatchRequestFail() { assertThat(e.getMessage()).contains("Invalid argument"); } } + + @Test + public void batchSuccessiveUpdatesWork() { + byte[] bytes = DataGenerator.base64Characters().genBytes(137); + + List blobs = + IntStream.range(0, 2) + .mapToObj( + i -> { + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + try (WriteChannel writer = storage.writer(info, BlobWriteOption.doesNotExist())) { + writer.write(ByteBuffer.wrap(bytes)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return info.getBlobId(); + }) + .collect(Collectors.toList()); + + OffsetDateTime now1 = Clock.systemUTC().instant().atOffset(ZoneOffset.UTC); + + List update1 = + storage.update( + blobs.stream() + .map(id -> BlobInfo.newBuilder(id).setCustomTimeOffsetDateTime(now1).build()) + .collect(Collectors.toList())); + + OffsetDateTime now2 = Clock.systemUTC().instant().atOffset(ZoneOffset.UTC); + List update2 = + storage.update( + blobs.stream() + .map(id -> BlobInfo.newBuilder(id).setCustomTimeOffsetDateTime(now2).build()) + .collect(Collectors.toList())); + + assertThat( + update2.stream() + .filter(b -> !now2.equals(b.getCustomTimeOffsetDateTime())) + .map(BlobInfo::getBlobId) + .map(BlobId::toGsUtilUriWithGeneration) + .collect(Collectors.toList())) + .isEmpty(); + } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITHttpIdempotencyTokenTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITHttpIdempotencyTokenTest.java index 7030c5228..7eae1b2a8 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITHttpIdempotencyTokenTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITHttpIdempotencyTokenTest.java @@ -33,6 +33,8 @@ import com.google.cloud.storage.Storage.BlobTargetOption; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.Storage.BucketListOption; +import com.google.cloud.storage.StorageBatch; +import com.google.cloud.storage.StorageBatchResult; import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.it.runner.StorageITRunner; import com.google.cloud.storage.it.runner.annotations.Backend; @@ -42,6 +44,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.truth.IterableSubject; import java.nio.ByteBuffer; +import java.time.Clock; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -179,4 +184,34 @@ public void resumableUpload() throws Exception { // 4. Finalize session and put final 45B assertAll(() -> subject.hasSize(4), () -> assertThat(actualXxd).isEqualTo(expectedXxd)); } + + @Test + public void batch() throws Exception { + BlobInfo info1 = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + BlobInfo info2 = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + BlobInfo info3 = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + storage.create(info1, BlobTargetOption.doesNotExist()); + storage.create(info2, BlobTargetOption.doesNotExist()); + storage.create(info3, BlobTargetOption.doesNotExist()); + + requestAuditing.clear(); + OffsetDateTime now = Clock.systemUTC().instant().atOffset(ZoneOffset.UTC); + + StorageBatch batch = storage.batch(); + StorageBatchResult r1 = batch.get(info1.getBlobId()); + StorageBatchResult r2 = + batch.update(info2.toBuilder().setCustomTimeOffsetDateTime(now).build()); + StorageBatchResult r3 = batch.delete(info3.getBlobId()); + + batch.submit(); + assertAll( + () -> assertThat(r1).isNotNull(), + () -> assertThat(r2.get().getCustomTimeOffsetDateTime()).isEqualTo(now), + () -> assertThat(r3.get()).isTrue(), + () -> { + IterableSubject subject = + requestAuditing.assertRequestHeader(X_GOOG_GCS_IDEMPOTENCY_TOKEN); + subject.hasSize(3); + }); + } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/spi/v1/HttpRpcContextTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/spi/v1/HttpRpcContextTest.java index bb049137b..467e4e76e 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/spi/v1/HttpRpcContextTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/spi/v1/HttpRpcContextTest.java @@ -43,11 +43,15 @@ public void testNewInvocationId() { UUID uuid = UUID.fromString("28220dff-1e8b-4770-9e10-022c2a99d8f3"); HttpRpcContext testContext = new HttpRpcContext(() -> uuid); - assertThat(testContext.newInvocationId()).isEqualTo(uuid); - assertThat(testContext.getInvocationId()).isEqualTo(uuid); - // call again to ensure the id is consistent with our supplier - assertThat(testContext.newInvocationId()).isEqualTo(uuid); - assertThat(testContext.getInvocationId()).isEqualTo(uuid); + try { + assertThat(testContext.newInvocationId()).isEqualTo(uuid); + assertThat(testContext.getInvocationId()).isEqualTo(uuid); + // call again to ensure the id is consistent with our supplier + assertThat(testContext.newInvocationId()).isEqualTo(uuid); + assertThat(testContext.getInvocationId()).isEqualTo(uuid); + } finally { + testContext.clearInvocationId(); + } } @Test