diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java index 16c6ec0aeb..984c7bfd8f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java @@ -20,11 +20,13 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; +import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import com.google.storage.v2.ChecksummedData; import com.google.storage.v2.ObjectChecksums; @@ -33,11 +35,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.function.Consumer; -import java.util.function.LongConsumer; import org.checkerframework.checker.nullness.qual.NonNull; final class GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel @@ -55,13 +53,14 @@ final class GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel private boolean open = true; private boolean first = true; private boolean finished = false; + private volatile WriteObjectRequest lastWrittenRequest; GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( SettableApiFuture resultFuture, ChunkSegmenter chunkSegmenter, ClientStreamingCallable write, - ResumableWrite requestFactory) { - String bucketName = requestFactory.bucketName(); + WriteCtx writeCtx) { + String bucketName = writeCtx.getRequestFactory().bucketName(); this.resultFuture = resultFuture; this.chunkSegmenter = chunkSegmenter; @@ -69,8 +68,8 @@ final class GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel contextWithBucketName(bucketName, GrpcCallContext.createDefault()); this.write = write.withDefaultCallContext(internalContext); - this.writeCtx = new WriteCtx<>(requestFactory); - this.responseObserver = new Observer(writeCtx.getConfirmedBytes()::set, resultFuture::set); + this.writeCtx = writeCtx; + this.responseObserver = new Observer(internalContext); } @Override @@ -92,27 +91,24 @@ public boolean isOpen() { @Override public void close() throws IOException { + if (!open) { + return; + } + open = false; ApiStreamObserver openedStream = openedStream(); - if (!finished) { - WriteObjectRequest message = finishMessage(); - try { + try { + if (!finished) { + WriteObjectRequest message = finishMessage(); + lastWrittenRequest = message; openedStream.onNext(message); - openedStream.onCompleted(); finished = true; - } catch (RuntimeException e) { - resultFuture.setException(e); - throw e; - } - } else { - try { - openedStream.onCompleted(); - } catch (RuntimeException e) { - resultFuture.setException(e); - throw e; } + openedStream.onCompleted(); + responseObserver.await(); + } catch (RuntimeException e) { + resultFuture.setException(e); + throw e; } - open = false; - responseObserver.await(); } private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, boolean finalize) @@ -122,51 +118,54 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo } ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength); - - List messages = new ArrayList<>(); + if (data.length == 0) { + return 0; + } ApiStreamObserver openedStream = openedStream(); int bytesConsumed = 0; - for (ChunkSegment datum : data) { - Crc32cLengthKnown crc32c = datum.getCrc32c(); - ByteString b = datum.getB(); - int contentSize = b.size(); - long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize); - Crc32cLengthKnown cumulative = - writeCtx - .getCumulativeCrc32c() - .accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat); - ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b); - if (crc32c != null) { - checksummedData.setCrc32C(crc32c.getValue()); - } - WriteObjectRequest.Builder builder = - writeCtx - .newRequestBuilder() - .setWriteOffset(offset) - .setChecksummedData(checksummedData.build()); - if (!datum.isOnlyFullBlocks()) { - builder.setFinishWrite(true); - if (cumulative != null) { - builder.setObjectChecksums( - ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build()); + try { + for (int i = 0; i < data.length; i++) { + ChunkSegment datum = data[i]; + Crc32cLengthKnown crc32c = datum.getCrc32c(); + ByteString b = datum.getB(); + int contentSize = b.size(); + long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize); + Crc32cLengthKnown cumulative = + writeCtx + .getCumulativeCrc32c() + .accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat); + ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b); + if (crc32c != null) { + checksummedData.setCrc32C(crc32c.getValue()); + } + WriteObjectRequest.Builder builder = writeCtx.newRequestBuilder(); + if (!first) { + builder.clearUploadId(); + builder.clearWriteObjectSpec(); + builder.clearObjectChecksums(); + } + builder.setWriteOffset(offset).setChecksummedData(checksummedData.build()); + if (!datum.isOnlyFullBlocks() || (finalize && i + 1 == data.length)) { + builder.setFinishWrite(true); + if (cumulative != null) { + builder.setObjectChecksums( + ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build()); + } + finished = true; } - finished = true; - } - - WriteObjectRequest build = possiblyPairDownRequest(builder, first).build(); - first = false; - messages.add(build); - bytesConsumed += contentSize; - } - if (finalize && !finished) { - messages.add(finishMessage()); - finished = true; - } - try { - for (WriteObjectRequest message : messages) { - openedStream.onNext(message); + WriteObjectRequest build = builder.build(); + first = false; + lastWrittenRequest = build; + openedStream.onNext(build); + bytesConsumed += contentSize; + } + if (finalize && !finished) { + WriteObjectRequest finishMessage = finishMessage(); + lastWrittenRequest = finishMessage; + openedStream.onNext(finishMessage); + finished = true; } } catch (RuntimeException e) { resultFuture.setException(e); @@ -201,73 +200,104 @@ private ApiStreamObserver openedStream() { return stream; } - /** - * Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs, - * this utility method centralizes the logic necessary to clear those fields for use by subsequent - * messages. - */ - private static WriteObjectRequest.Builder possiblyPairDownRequest( - WriteObjectRequest.Builder b, boolean firstMessageOfStream) { - if (firstMessageOfStream && b.getWriteOffset() == 0) { - return b; - } - if (b.getWriteOffset() > 0) { - b.clearWriteObjectSpec(); - } - - if (b.getWriteOffset() > 0 && !b.getFinishWrite()) { - b.clearObjectChecksums(); - } - return b; - } - - static class Observer implements ApiStreamObserver { + class Observer implements ApiStreamObserver { - private final LongConsumer sizeCallback; - private final Consumer completeCallback; + private final GrpcCallContext context; private final SettableApiFuture invocationHandle; private volatile WriteObjectResponse last; - Observer(LongConsumer sizeCallback, Consumer completeCallback) { - this.sizeCallback = sizeCallback; - this.completeCallback = completeCallback; + Observer(GrpcCallContext context) { + this.context = context; this.invocationHandle = SettableApiFuture.create(); } @Override public void onNext(WriteObjectResponse value) { - // incremental update - if (value.hasPersistedSize()) { - sizeCallback.accept(value.getPersistedSize()); - } else if (value.hasResource()) { - sizeCallback.accept(value.getResource().getSize()); - } last = value; } - /** - * observed exceptions so far - * - *
    - *
  1. {@link com.google.api.gax.rpc.OutOfRangeException} - *
  2. {@link com.google.api.gax.rpc.AlreadyExistsException} - *
  3. {@link io.grpc.StatusRuntimeException} - *
- */ @Override public void onError(Throwable t) { - invocationHandle.setException(t); + if (t instanceof ApiException) { + // use StorageExceptions logic to translate from ApiException to our status codes ensuring + // things fall in line with our retry handlers. + // This is suboptimal, as it will initialize a second exception, however this is the + // unusual case, and it should not cause a significant overhead given its rarity. + StorageException tmp = StorageException.asStorageException((ApiException) t); + StorageException storageException = + ResumableSessionFailureScenario.toStorageException( + tmp.getCode(), + tmp.getMessage(), + tmp.getReason(), + ImmutableList.of(lastWrittenRequest), + null, + context, + t); + resultFuture.setException(storageException); + invocationHandle.setException(storageException); + } else { + resultFuture.setException(t); + invocationHandle.setException(t); + } } @Override public void onCompleted() { - if (last != null && last.hasResource()) { - completeCallback.accept(last); + boolean finalizing = lastWrittenRequest.getFinishWrite(); + if (last == null) { + clientDetectedError( + ResumableSessionFailureScenario.toStorageException( + 0, + "onComplete without preceding onNext, unable to determine success.", + "invalid", + ImmutableList.of(lastWrittenRequest), + null, + context, + null)); + } else if (last.hasResource() /* && finalizing*/) { + long totalSentBytes = writeCtx.getTotalSentBytes().get(); + long finalSize = last.getResource().getSize(); + if (totalSentBytes == finalSize) { + ok(finalSize); + } else if (finalSize < totalSentBytes) { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException( + ImmutableList.of(lastWrittenRequest), last, context, null)); + } else { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException( + ImmutableList.of(lastWrittenRequest), last, context, null)); + } + } else if (!finalizing || last.hasPersistedSize()) { // unexpected incremental response + clientDetectedError( + ResumableSessionFailureScenario.toStorageException( + 0, + "Unexpected incremental response for finalizing request.", + "invalid", + ImmutableList.of(lastWrittenRequest), + last, + context, + null)); + } else { + clientDetectedError( + ResumableSessionFailureScenario.SCENARIO_0.toStorageException( + ImmutableList.of(lastWrittenRequest), last, context, null)); } + } + + private void ok(long persistedSize) { + writeCtx.getConfirmedBytes().set(persistedSize); + resultFuture.set(last); invocationHandle.set(null); } + private void clientDetectedError(StorageException storageException) { + open = false; + resultFuture.setException(storageException); + invocationHandle.setException(storageException); + } + void await() { try { invocationHandle.get(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java index 8854053322..2a933b0925 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java @@ -305,7 +305,7 @@ UnbufferedWritableByteChannelSession build() { Retrying::newCallContext); } else { return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( - result, getChunkSegmenter(), write, start); + result, getChunkSegmenter(), write, new WriteCtx<>(start)); } }) .andThen(StorageByteChannels.writable()::createSynchronized)); @@ -346,7 +346,7 @@ BufferedWritableByteChannelSession build() { Retrying::newCallContext); } else { return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( - result, getChunkSegmenter(), write, start); + result, getChunkSegmenter(), write, new WriteCtx<>(start)); } }) .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest.java new file mode 100644 index 0000000000..8181bd2bc2 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest.java @@ -0,0 +1,329 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.ByteSizeConstants._256KiB; +import static com.google.cloud.storage.ByteSizeConstants._512KiB; +import static com.google.cloud.storage.ByteSizeConstants._768KiB; +import static com.google.cloud.storage.TestUtils.assertAll; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.api.core.SettableApiFuture; +import com.google.cloud.storage.ITGapicUnbufferedWritableByteChannelTest.DirectWriteService; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.storage.v2.Object; +import com.google.storage.v2.StartResumableWriteRequest; +import com.google.storage.v2.StartResumableWriteResponse; +import com.google.storage.v2.StorageClient; +import com.google.storage.v2.WriteObjectRequest; +import com.google.storage.v2.WriteObjectResponse; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.Test; + +public final class ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest { + + private static final ChunkSegmenter CHUNK_SEGMENTER = + new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _256KiB, _256KiB); + + @Test + public void incrementalResponseForFinalizingRequest() throws Exception { + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .build(); + WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_512KiB); + writeCtx.getConfirmedBytes().set(0); + + //noinspection resource + GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel channel = + new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( + done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx); + + StorageException se = assertThrows(StorageException.class, channel::close); + se.printStackTrace(System.out); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("invalid"), + () -> assertThat(writeCtx.getTotalSentBytes().get()).isEqualTo(_512KiB), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.4

+ * + * Attempting to finalize an already finalized session + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = {name = obj1, size = 262144}
+   *     
client state
+   * write_offset = 262144, finish = true
+   *     
request
+   * writeObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 262144, finish_write = true}
+   *     
response
+   * onNext(WriteObjectResponse{ resources = {name = obj, size = 262144 } })
+   *     
+ */ + @Test + public void scenario4() throws Exception { + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_256KiB) + .setFinishWrite(true) + .build(); + WriteObjectResponse resp1 = + WriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("name").setSize(_256KiB).build()) + .build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_256KiB); + writeCtx.getConfirmedBytes().set(0); + + GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel channel = + new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( + done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx); + + channel.close(); + + WriteObjectResponse writeObjectResponse = done.get(2, TimeUnit.SECONDS); + assertThat(writeObjectResponse).isEqualTo(resp1); + assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB); + } + } + + /** + * + * + *

S.4.1

+ * + * Attempting to finalize an already finalized session (ack < expected) + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = {name = obj1, size = 262144}
+   *     
client state
+   * write_offset = 524288, finish = true
+   *     
request
+   * writeObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 524288, finish_write = true}
+   *     
response
+   * onNext(WriteObjectResponse{ resources = {name = obj, size = 262144 } })
+   *     
+ */ + @Test + public void scenario4_1() throws Exception { + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .build(); + WriteObjectResponse resp1 = + WriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("name").setSize(_256KiB).build()) + .build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_512KiB); + writeCtx.getConfirmedBytes().set(0); + + //noinspection resource + GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel channel = + new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( + done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx); + + StorageException se = assertThrows(StorageException.class, channel::close); + se.printStackTrace(System.out); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.4.2

+ * + * Attempting to finalize an already finalized session (ack > expected) + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = {name = obj1, size = 786432}
+   *     
client state
+   * write_offset = 524288, finish = true
+   *     
request
+   * writeObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 524288, finish_write = true}
+   *     
response
+   * onNext(WriteObjectResponse{ resources = {name = obj, size = 786432 } })
+   *     
+ */ + @Test + public void scenario4_2() throws Exception { + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .build(); + WriteObjectResponse resp1 = + WriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("name").setSize(_768KiB).build()) + .build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_512KiB); + writeCtx.getConfirmedBytes().set(0); + + //noinspection resource + GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel channel = + new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( + done, CHUNK_SEGMENTER, storageClient.writeObjectCallable(), writeCtx); + + StorageException se = assertThrows(StorageException.class, channel::close); + se.printStackTrace(System.out); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + private static @NonNull ResumableWrite getResumableWrite(String uploadId) { + StartResumableWriteRequest req = StartResumableWriteRequest.getDefaultInstance(); + StartResumableWriteResponse resp = + StartResumableWriteResponse.newBuilder().setUploadId(uploadId).build(); + return new ResumableWrite( + req, resp, id -> WriteObjectRequest.newBuilder().setUploadId(id).build()); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java index e1eb11c981..5efbae9f60 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java @@ -58,6 +58,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.function.Function; import java.util.stream.IntStream; @@ -315,4 +316,12 @@ public static Storage.BucketField[] filterOutHttpOnlyBucketFields(Storage.Bucket .collect(ImmutableSet.toImmutableSet()) .toArray(new Storage.BucketField[0]); } + + public static Optional last(List l) { + if (l.isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(l.get(l.size() - 1)); + } + } }