diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadCallable.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadCallable.java index 0996bfabc6..7a73d99b51 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadCallable.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadCallable.java @@ -16,4 +16,64 @@ package com.google.cloud.storage.transfermanager; -public class DownloadCallable {} +import com.google.cloud.ReadChannel; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; +import com.google.common.io.ByteStreams; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.Callable; + +public class DownloadCallable implements Callable { + private final TransferManagerConfig transferManagerConfig; + private final BlobInfo originalBlob; + + private final ParallelDownloadConfig parallelDownloadConfig; + + public DownloadCallable( + TransferManagerConfig transferManagerConfig, + BlobInfo originalBlob, + ParallelDownloadConfig parallelDownloadConfig) { + this.transferManagerConfig = transferManagerConfig; + this.originalBlob = originalBlob; + this.parallelDownloadConfig = parallelDownloadConfig; + } + + @Override + public DownloadResult call() throws Exception { + // TODO: Check for chunking + return downloadWithoutChunking(); + } + + private DownloadResult downloadWithoutChunking() { + try (ReadChannel rc = + transferManagerConfig + .getStorageOptions() + .getService() + .reader( + originalBlob.getBlobId(), + parallelDownloadConfig + .getOptionsPerRequest() + .toArray(new Storage.BlobSourceOption[0]))) { + FileChannel destFile = + FileChannel.open(Paths.get(createDestPath()), StandardOpenOption.WRITE); + ByteStreams.copy(rc, destFile); + } catch (IOException e) { + throw new StorageException(e); + } + DownloadResult result = + DownloadResult.newBuilder(originalBlob, TransferStatus.SUCCESS) + .setOutputDestination(Paths.get(createDestPath())) + .build(); + return result; + } + + private String createDestPath() { + return originalBlob + .getName() + .replaceFirst(parallelDownloadConfig.getStripPrefix(), parallelDownloadConfig.getPrefix()); + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadJob.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadJob.java index 92b5b502c0..ca67eb8496 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadJob.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/DownloadJob.java @@ -22,37 +22,25 @@ import com.google.common.collect.ImmutableList; import java.util.List; import java.util.Objects; +import java.util.concurrent.Future; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.NonNull; public final class DownloadJob { - @NonNull private final List successResponses; - @NonNull private final List failedResponses; + @NonNull private final List> downloadResults; @NonNull private final ParallelDownloadConfig parallelDownloadConfig; - private final boolean anyFailed; private DownloadJob( - @NonNull List successResponses, - @NonNull List failedResponses, + @NonNull List> successResponses, @NonNull ParallelDownloadConfig parallelDownloadConfig) { - this.successResponses = successResponses; - this.failedResponses = failedResponses; - this.anyFailed = !failedResponses.isEmpty(); + this.downloadResults = successResponses; this.parallelDownloadConfig = parallelDownloadConfig; } - public List getSuccessResponses() { - return successResponses; - } - - public List getFailedResponses() { - return failedResponses; - } - - public boolean isAnyFailed() { - return anyFailed; + public List> getDownloadResults() { + return downloadResults; } public ParallelDownloadConfig getParallelDownloadConfig() { @@ -68,24 +56,20 @@ public boolean equals(Object o) { return false; } DownloadJob that = (DownloadJob) o; - return anyFailed == that.anyFailed - && successResponses.equals(that.successResponses) - && failedResponses.equals(that.failedResponses) + return downloadResults.equals(that.downloadResults) && parallelDownloadConfig.equals(that.parallelDownloadConfig); } @Override public int hashCode() { - return Objects.hash(successResponses, failedResponses, parallelDownloadConfig, anyFailed); + return Objects.hash(downloadResults, parallelDownloadConfig); } @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("successResponses", successResponses) - .add("failedResponses", failedResponses) + .add("successResponses", downloadResults) .add("parallelDownloadConfig", parallelDownloadConfig) - .add("anyFailed", anyFailed) .toString(); } @@ -95,22 +79,15 @@ public static Builder newBuilder() { public static final class Builder { - private @NonNull List successResponses; - private @NonNull List failedResponses; + private @NonNull List> downloadResults; private @MonotonicNonNull ParallelDownloadConfig parallelDownloadConfig; private Builder() { - this.successResponses = ImmutableList.of(); - this.failedResponses = ImmutableList.of(); - } - - public Builder setSuccessResponses(@NonNull List successResponses) { - this.successResponses = ImmutableList.copyOf(successResponses); - return this; + this.downloadResults = ImmutableList.of(); } - public Builder setFailedResponses(@NonNull List failedResponses) { - this.failedResponses = ImmutableList.copyOf(failedResponses); + public Builder setDownloadResults(@NonNull List> downloadResults) { + this.downloadResults = ImmutableList.copyOf(downloadResults); return this; } @@ -121,10 +98,9 @@ public Builder setParallelDownloadConfig( } public DownloadJob build() { - checkNotNull(successResponses); - checkNotNull(failedResponses); + checkNotNull(downloadResults); checkNotNull(parallelDownloadConfig); - return new DownloadJob(successResponses, failedResponses, parallelDownloadConfig); + return new DownloadJob(downloadResults, parallelDownloadConfig); } } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java index 849929dcbc..324d8c76eb 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java @@ -45,7 +45,7 @@ public TransferManagerImpl(TransferManagerConfig transferManagerConfig) { String blobName = TransferManagerUtils.createBlobName(opts, file); BlobInfo blobInfo = BlobInfo.newBuilder(opts.getBucketName(), blobName).build(); // TODO: Apply opts per request - UploadCallable callable = new UploadCallable(executor, transferManagerConfig, blobInfo, file); + UploadCallable callable = new UploadCallable(transferManagerConfig, blobInfo, file); uploadTasks.add(executor.submit(callable)); } return UploadJob.newBuilder() @@ -56,6 +56,14 @@ public TransferManagerImpl(TransferManagerConfig transferManagerConfig) { @Override public @NonNull DownloadJob downloadBlobs(List blobs, ParallelDownloadConfig opts) { - return null; + List> downloadTasks = new ArrayList<>(); + for (BlobInfo blob : blobs) { + DownloadCallable callable = new DownloadCallable(transferManagerConfig, blob, opts); + downloadTasks.add(executor.submit(callable)); + } + return DownloadJob.newBuilder() + .setDownloadResults(downloadTasks) + .setParallelDownloadConfig(opts) + .build(); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java index 0a61a9bc02..a462c6ff5c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java @@ -29,10 +29,8 @@ import java.nio.file.Path; import java.util.Optional; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; public class UploadCallable implements Callable { - private final ExecutorService executor; private final TransferManagerConfig transferManagerConfig; private final BlobInfo originalBlob; @@ -40,11 +38,7 @@ public class UploadCallable implements Callable { private final Path sourceFile; public UploadCallable( - ExecutorService executor, - TransferManagerConfig transferManagerConfig, - BlobInfo originalBlob, - Path sourceFile) { - this.executor = executor; + TransferManagerConfig transferManagerConfig, BlobInfo originalBlob, Path sourceFile) { this.transferManagerConfig = transferManagerConfig; this.originalBlob = originalBlob; this.sourceFile = sourceFile; diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java index 65436c2863..033babc219 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java @@ -18,8 +18,12 @@ import static com.google.common.truth.Truth.assertThat; +import com.google.cloud.WriteChannel; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.DataGenerator; +import com.google.cloud.storage.Storage; import com.google.cloud.storage.TmpFile; import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.it.runner.StorageITRunner; @@ -27,6 +31,8 @@ import com.google.cloud.storage.it.runner.annotations.CrossRun; import com.google.cloud.storage.it.runner.annotations.Inject; import com.google.cloud.storage.it.runner.registry.Generator; +import com.google.cloud.storage.transfermanager.DownloadJob; +import com.google.cloud.storage.transfermanager.ParallelDownloadConfig; import com.google.cloud.storage.transfermanager.ParallelUploadConfig; import com.google.cloud.storage.transfermanager.TransferManager; import com.google.cloud.storage.transfermanager.TransferManagerConfig; @@ -34,7 +40,10 @@ import com.google.cloud.storage.transfermanager.UploadJob; import com.google.common.collect.ImmutableList; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.junit.Before; import org.junit.Rule; @@ -47,6 +56,7 @@ transports = {Transport.HTTP}, backends = {Backend.PROD}) public class ITTransferManagerTest { + @Inject public Storage storage; @Inject public BucketInfo bucket; @Inject public Generator generator; @@ -54,16 +64,35 @@ public class ITTransferManagerTest { private Path baseDir; private static final int objectContentSize = 64; + private List blobs = new ArrayList<>(); @Before public void setUp() throws Exception { baseDir = tmpDir.getRoot().toPath(); + BlobInfo blobInfo1 = + BlobInfo.newBuilder( + BlobId.of(bucket.getName(), String.format("%s/src", generator.randomObjectName()))) + .build(); + BlobInfo blobInfo2 = + BlobInfo.newBuilder( + BlobId.of(bucket.getName(), String.format("%s/src", generator.randomObjectName()))) + .build(); + BlobInfo blobInfo3 = + BlobInfo.newBuilder( + BlobId.of(bucket.getName(), String.format("%s/src", generator.randomObjectName()))) + .build(); + Collections.addAll(blobs, blobInfo1, blobInfo2, blobInfo3); + ByteBuffer content = DataGenerator.base64Characters().genByteBuffer(108); + for (BlobInfo blob : blobs) { + try (WriteChannel writeChannel = storage.writer(blob)) { + writeChannel.write(content); + } + } } @Test public void uploadFiles() throws IOException { - TransferManagerConfig config = - TransferManagerConfig.newBuilder().setAllowChunking(false).setMaxWorkers(1).build(); + TransferManagerConfig config = TransferManagerConfig.newBuilder().setMaxWorkers(1).build(); TransferManager transferManager = new TransferManagerImpl(config); try (TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize); TmpFile tmpFile1 = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize); @@ -79,5 +108,13 @@ public void uploadFiles() throws IOException { } @Test - public void downloadBlobs() {} + public void downloadBlobs() throws IOException { + TransferManagerConfig config = TransferManagerConfig.newBuilder().setMaxWorkers(1).build(); + TransferManager transferManager = new TransferManagerImpl(config); + String bucketName = bucket.getName(); + ParallelDownloadConfig parallelDownloadConfig = + ParallelDownloadConfig.newBuilder().setBucketName(bucketName).build(); + DownloadJob job = transferManager.downloadBlobs(blobs, parallelDownloadConfig); + assertThat(job.getDownloadResults()).hasSize(3); + } }