From 3a04e5b458cd7e3bf10116b3dbab0b3254e7e10f Mon Sep 17 00:00:00 2001 From: Sydney Munro <sydmunro@google.com> Date: Wed, 21 Jun 2023 11:06:12 -0700 Subject: [PATCH 1/5] fix: Fix bug where the Storage Object is accessed before WriteChannel Closes --- .../PackagePrivateMethodWorkarounds.java | 1 + .../transfermanager/ParallelUploadConfig.java | 13 ++++++- .../transfermanager/UploadCallable.java | 8 +++-- .../storage/it/ITTransferManagerTest.java | 34 +++++++++++++++++++ 4 files changed, 52 insertions(+), 4 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java index 6fa31d71c8..6d43c0a43b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java @@ -64,6 +64,7 @@ public static Function<WriteChannel, Optional<BlobInfo>> maybeGetBlobInfoFunctio return (w) -> { BlobWriteChannel blobWriteChannel; if (w instanceof BlobWriteChannel) { + blobWriteChannel = (BlobWriteChannel) w; return Optional.of(blobWriteChannel.getStorageObject()) .map(Conversions.apiary().blobInfo()::decode); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java index 88f0fe944c..237dd74257 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java @@ -47,7 +47,7 @@ private ParallelUploadConfig( this.prefix = prefix; this.bucketName = bucketName; this.targetOptsPerRequest = targetOptsPerRequest; - this.writeOptsPerRequest = writeOptsPerRequest; + this.writeOptsPerRequest = applySkipIfExists(skipIfExists, writeOptsPerRequest); } /** If a corresponding object already exists skip uploading the object */ @@ -115,6 +115,17 @@ public static Builder newBuilder() { return new Builder(); } + private static List<BlobWriteOption> applySkipIfExists( + boolean skipIfExists, List<BlobWriteOption> writeOptsPerRequest) { + if (skipIfExists) { + return writeOptsPerRequest.isEmpty() + ? ImmutableList.of(BlobWriteOption.generationMatch(0)) + : ImmutableList.copyOf( + BlobWriteOption.dedupe(writeOptsPerRequest, BlobWriteOption.generationMatch(0L))); + } + return writeOptsPerRequest; + } + @BetaApi public static final class Builder { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java index 7aca60342b..7d8a4e143d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java @@ -64,12 +64,14 @@ private UploadResult uploadWithoutChunking() { long bytesCopied = -1L; try { Optional<BlobInfo> newBlob; - try (FileChannel r = FileChannel.open(sourceFile, StandardOpenOption.READ); - WriteChannel w = storage.writer(originalBlob, opts)) { + WriteChannel w = storage.writer(originalBlob, opts); + try (FileChannel r = FileChannel.open(sourceFile, StandardOpenOption.READ)) { w.setChunkSize(transferManagerConfig.getPerWorkerBufferSize()); bytesCopied = ByteStreams.copy(r, w); - newBlob = PackagePrivateMethodWorkarounds.maybeGetBlobInfoFunction().apply(w); + } finally { + w.close(); } + newBlob = PackagePrivateMethodWorkarounds.maybeGetBlobInfoFunction().apply(w); return UploadResult.newBuilder(originalBlob, TransferStatus.SUCCESS) .setUploadedBlob(newBlob.get()) .build(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java index bbebd10bd0..b72725c158 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java @@ -131,6 +131,11 @@ public void uploadFiles() throws Exception { UploadJob job = transferManager.uploadFiles(files, parallelUploadConfig); List<UploadResult> uploadResults = job.getUploadResults(); assertThat(uploadResults).hasSize(3); + assertThat( + uploadResults.stream() + .filter(result -> result.getStatus() == TransferStatus.SUCCESS) + .collect(Collectors.toList())) + .hasSize(3); } } @@ -153,6 +158,11 @@ public void uploadFilesWithOpts() throws Exception { UploadJob job = transferManager.uploadFiles(files, parallelUploadConfig); List<UploadResult> uploadResults = job.getUploadResults(); assertThat(uploadResults).hasSize(3); + assertThat( + uploadResults.stream() + .filter(result -> result.getStatus() == TransferStatus.SUCCESS) + .collect(Collectors.toList())) + .hasSize(3); } } @@ -181,6 +191,11 @@ public void uploadFilesOneFailure() throws Exception { .filter(x -> x.getStatus() == TransferStatus.FAILED_TO_START) .collect(Collectors.toList())) .hasSize(1); + assertThat( + uploadResults.stream() + .filter(result -> result.getStatus() == TransferStatus.SUCCESS) + .collect(Collectors.toList())) + .hasSize(3); } } @@ -217,6 +232,25 @@ public void uploadNonexistentFile() throws Exception { } } + @Test + public void uploadFailsSkipIfExists() throws Exception { + TransferManagerConfig config = + TransferManagerConfigTestingInstances.defaults(storage.getOptions()).toBuilder().build(); + String bucketName = bucket.getName(); + try (TransferManager transferManager = config.getService(); + TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize)) { + ParallelUploadConfig parallelUploadConfig = + ParallelUploadConfig.newBuilder().setBucketName(bucketName).setSkipIfExists(true).build(); + UploadJob jobInitUpload = + transferManager.uploadFiles(ImmutableList.of(tmpFile.getPath()), parallelUploadConfig); + List<UploadResult> uploadResults = jobInitUpload.getUploadResults(); + assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.SUCCESS); + UploadJob failedSecondUpload = + transferManager.uploadFiles(ImmutableList.of(tmpFile.getPath()), parallelUploadConfig); + List<UploadResult> failedResult = failedSecondUpload.getUploadResults(); + } + } + @Test public void downloadBlobs() throws Exception { TransferManagerConfig config = From 15b3d3f6d861231e3c631eeadf978e8175d2981a Mon Sep 17 00:00:00 2001 From: Sydney Munro <sydmunro@google.com> Date: Wed, 21 Jun 2023 16:11:03 -0700 Subject: [PATCH 2/5] Finish writing skipIfExists test --- .../com/google/cloud/storage/it/ITTransferManagerTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java index b72725c158..e85fdf4b4c 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java @@ -248,6 +248,9 @@ public void uploadFailsSkipIfExists() throws Exception { UploadJob failedSecondUpload = transferManager.uploadFiles(ImmutableList.of(tmpFile.getPath()), parallelUploadConfig); List<UploadResult> failedResult = failedSecondUpload.getUploadResults(); + assertThat(failedResult.get(0).getStatus()).isEqualTo(TransferStatus.FAILED_TO_FINISH); + assertThat(failedResult.get(0).getException()).isInstanceOf(StorageException.class); + assertThat(failedResult.get(0).getException().getMessage()).contains("Precondition Failed"); } } From fb66e68dd191569b7d5ea6f2732c04ee170c3258 Mon Sep 17 00:00:00 2001 From: Sydney Munro <sydmunro@google.com> Date: Wed, 21 Jun 2023 16:21:52 -0700 Subject: [PATCH 3/5] remove accidental commit --- .../google/cloud/storage/PackagePrivateMethodWorkarounds.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java index 6d43c0a43b..6fa31d71c8 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/PackagePrivateMethodWorkarounds.java @@ -64,7 +64,6 @@ public static Function<WriteChannel, Optional<BlobInfo>> maybeGetBlobInfoFunctio return (w) -> { BlobWriteChannel blobWriteChannel; if (w instanceof BlobWriteChannel) { - blobWriteChannel = (BlobWriteChannel) w; return Optional.of(blobWriteChannel.getStorageObject()) .map(Conversions.apiary().blobInfo()::decode); From 76462df4649db6764043f1ca011e5d2d62a68ddd Mon Sep 17 00:00:00 2001 From: Sydney Munro <sydmunro@google.com> Date: Thu, 22 Jun 2023 11:04:20 -0700 Subject: [PATCH 4/5] Additional tests and applySkipIfExists simplification --- .../transfermanager/ParallelUploadConfig.java | 6 ++--- .../storage/it/ITTransferManagerTest.java | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java index 237dd74257..8c7289c803 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelUploadConfig.java @@ -118,10 +118,8 @@ public static Builder newBuilder() { private static List<BlobWriteOption> applySkipIfExists( boolean skipIfExists, List<BlobWriteOption> writeOptsPerRequest) { if (skipIfExists) { - return writeOptsPerRequest.isEmpty() - ? ImmutableList.of(BlobWriteOption.generationMatch(0)) - : ImmutableList.copyOf( - BlobWriteOption.dedupe(writeOptsPerRequest, BlobWriteOption.generationMatch(0L))); + return ImmutableList.copyOf( + BlobWriteOption.dedupe(writeOptsPerRequest, BlobWriteOption.doesNotExist())); } return writeOptsPerRequest; } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java index e85fdf4b4c..e9cf81718f 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java @@ -254,6 +254,33 @@ public void uploadFailsSkipIfExists() throws Exception { } } + @Test + public void uploadSkipIfExistsGenerationOverride() throws Exception { + TransferManagerConfig config = + TransferManagerConfigTestingInstances.defaults(storage.getOptions()).toBuilder().build(); + String bucketName = bucket.getName(); + try (TransferManager transferManager = config.getService(); + TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize)) { + ParallelUploadConfig parallelUploadConfig = + ParallelUploadConfig.newBuilder() + .setBucketName(bucketName) + .setSkipIfExists(true) + .setWriteOptsPerRequest(ImmutableList.of(BlobWriteOption.generationMatch(5L))) + .build(); + assertThat(parallelUploadConfig.getWriteOptsPerRequest()).hasSize(1); + UploadJob jobInitUpload = + transferManager.uploadFiles(ImmutableList.of(tmpFile.getPath()), parallelUploadConfig); + List<UploadResult> uploadResults = jobInitUpload.getUploadResults(); + assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.SUCCESS); + UploadJob failedSecondUpload = + transferManager.uploadFiles(ImmutableList.of(tmpFile.getPath()), parallelUploadConfig); + List<UploadResult> failedResult = failedSecondUpload.getUploadResults(); + assertThat(failedResult.get(0).getStatus()).isEqualTo(TransferStatus.FAILED_TO_FINISH); + assertThat(failedResult.get(0).getException()).isInstanceOf(StorageException.class); + assertThat(failedResult.get(0).getException().getMessage()).contains("Precondition Failed"); + } + } + @Test public void downloadBlobs() throws Exception { TransferManagerConfig config = From 976b9ba367d5eb5bcd4a2b50ef6f530b3a6dff94 Mon Sep 17 00:00:00 2001 From: Sydney Munro <sydmunro@google.com> Date: Thu, 22 Jun 2023 11:17:24 -0700 Subject: [PATCH 5/5] Use createFrom instead of managing the File/WriteChannels directly --- .../transfermanager/UploadCallable.java | 33 ++++++++----------- .../storage/it/ITTransferManagerTest.java | 14 +++----- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java index 7d8a4e143d..c2b2ff2169 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java @@ -16,16 +16,12 @@ package com.google.cloud.storage.transfermanager; -import com.google.cloud.WriteChannel; +import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobInfo; -import com.google.cloud.storage.PackagePrivateMethodWorkarounds; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobWriteOption; -import com.google.common.io.ByteStreams; -import java.nio.channels.FileChannel; +import com.google.cloud.storage.StorageException; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.Optional; import java.util.concurrent.Callable; final class UploadCallable implements Callable<UploadResult> { @@ -61,26 +57,23 @@ public UploadResult call() throws Exception { } private UploadResult uploadWithoutChunking() { - long bytesCopied = -1L; try { - Optional<BlobInfo> newBlob; - WriteChannel w = storage.writer(originalBlob, opts); - try (FileChannel r = FileChannel.open(sourceFile, StandardOpenOption.READ)) { - w.setChunkSize(transferManagerConfig.getPerWorkerBufferSize()); - bytesCopied = ByteStreams.copy(r, w); - } finally { - w.close(); - } - newBlob = PackagePrivateMethodWorkarounds.maybeGetBlobInfoFunction().apply(w); + Blob from = storage.createFrom(originalBlob, sourceFile, opts); return UploadResult.newBuilder(originalBlob, TransferStatus.SUCCESS) - .setUploadedBlob(newBlob.get()) + .setUploadedBlob(from.asBlobInfo()) .build(); - } catch (Exception e) { - if (bytesCopied == -1) { - return UploadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_START) + } catch (StorageException e) { + if (parallelUploadConfig.isSkipIfExists() && e.getCode() == 412) { + return UploadResult.newBuilder(originalBlob, TransferStatus.SKIPPED) + .setException(e) + .build(); + } else { + // TODO: check for FAILED_TO_START conditions + return UploadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH) .setException(e) .build(); } + } catch (Exception e) { return UploadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH) .setException(e) .build(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java index e9cf81718f..7416109be2 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java @@ -188,7 +188,7 @@ public void uploadFilesOneFailure() throws Exception { assertThat(uploadResults).hasSize(4); assertThat( uploadResults.stream() - .filter(x -> x.getStatus() == TransferStatus.FAILED_TO_START) + .filter(x -> x.getStatus() == TransferStatus.FAILED_TO_FINISH) .collect(Collectors.toList())) .hasSize(1); assertThat( @@ -211,7 +211,7 @@ public void uploadNonexistentBucket() throws Exception { ParallelUploadConfig.newBuilder().setBucketName(bucketName).build(); UploadJob job = transferManager.uploadFiles(files, parallelUploadConfig); List<UploadResult> uploadResults = job.getUploadResults(); - assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.FAILED_TO_START); + assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.FAILED_TO_FINISH); assertThat(uploadResults.get(0).getException()).isInstanceOf(StorageException.class); } } @@ -227,7 +227,7 @@ public void uploadNonexistentFile() throws Exception { ParallelUploadConfig.newBuilder().setBucketName(bucketName).build(); UploadJob job = transferManager.uploadFiles(files, parallelUploadConfig); List<UploadResult> uploadResults = job.getUploadResults(); - assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.FAILED_TO_START); + assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.FAILED_TO_FINISH); assertThat(uploadResults.get(0).getException()).isInstanceOf(NoSuchFileException.class); } } @@ -248,9 +248,7 @@ public void uploadFailsSkipIfExists() throws Exception { UploadJob failedSecondUpload = transferManager.uploadFiles(ImmutableList.of(tmpFile.getPath()), parallelUploadConfig); List<UploadResult> failedResult = failedSecondUpload.getUploadResults(); - assertThat(failedResult.get(0).getStatus()).isEqualTo(TransferStatus.FAILED_TO_FINISH); - assertThat(failedResult.get(0).getException()).isInstanceOf(StorageException.class); - assertThat(failedResult.get(0).getException().getMessage()).contains("Precondition Failed"); + assertThat(failedResult.get(0).getStatus()).isEqualTo(TransferStatus.SKIPPED); } } @@ -275,9 +273,7 @@ public void uploadSkipIfExistsGenerationOverride() throws Exception { UploadJob failedSecondUpload = transferManager.uploadFiles(ImmutableList.of(tmpFile.getPath()), parallelUploadConfig); List<UploadResult> failedResult = failedSecondUpload.getUploadResults(); - assertThat(failedResult.get(0).getStatus()).isEqualTo(TransferStatus.FAILED_TO_FINISH); - assertThat(failedResult.get(0).getException()).isInstanceOf(StorageException.class); - assertThat(failedResult.get(0).getException().getMessage()).contains("Precondition Failed"); + assertThat(failedResult.get(0).getStatus()).isEqualTo(TransferStatus.SKIPPED); } }