Skip to content

Commit

Permalink
feat: First pass downloadMany (#1945)
Browse files Browse the repository at this point in the history
* feat: First pass downloadMany

* removing redundant setAllowChunking false flag
sydney-munro authored Mar 23, 2023
1 parent 01d7de9 commit fedbd9c
Showing 5 changed files with 127 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -16,4 +16,64 @@

package com.google.cloud.storage.transfermanager;

public class DownloadCallable {}
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;

public class DownloadCallable implements Callable<DownloadResult> {
private final TransferManagerConfig transferManagerConfig;
private final BlobInfo originalBlob;

private final ParallelDownloadConfig parallelDownloadConfig;

public DownloadCallable(
TransferManagerConfig transferManagerConfig,
BlobInfo originalBlob,
ParallelDownloadConfig parallelDownloadConfig) {
this.transferManagerConfig = transferManagerConfig;
this.originalBlob = originalBlob;
this.parallelDownloadConfig = parallelDownloadConfig;
}

@Override
public DownloadResult call() throws Exception {
// TODO: Check for chunking
return downloadWithoutChunking();
}

private DownloadResult downloadWithoutChunking() {
try (ReadChannel rc =
transferManagerConfig
.getStorageOptions()
.getService()
.reader(
originalBlob.getBlobId(),
parallelDownloadConfig
.getOptionsPerRequest()
.toArray(new Storage.BlobSourceOption[0]))) {
FileChannel destFile =
FileChannel.open(Paths.get(createDestPath()), StandardOpenOption.WRITE);
ByteStreams.copy(rc, destFile);
} catch (IOException e) {
throw new StorageException(e);
}
DownloadResult result =
DownloadResult.newBuilder(originalBlob, TransferStatus.SUCCESS)
.setOutputDestination(Paths.get(createDestPath()))
.build();
return result;
}

private String createDestPath() {
return originalBlob
.getName()
.replaceFirst(parallelDownloadConfig.getStripPrefix(), parallelDownloadConfig.getPrefix());
}
}
Original file line number Diff line number Diff line change
@@ -22,37 +22,25 @@
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Future;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;

public final class DownloadJob {

@NonNull private final List<DownloadResult> successResponses;
@NonNull private final List<DownloadResult> failedResponses;
@NonNull private final List<Future<DownloadResult>> downloadResults;

@NonNull private final ParallelDownloadConfig parallelDownloadConfig;
private final boolean anyFailed;

private DownloadJob(
@NonNull List<DownloadResult> successResponses,
@NonNull List<DownloadResult> failedResponses,
@NonNull List<Future<DownloadResult>> successResponses,
@NonNull ParallelDownloadConfig parallelDownloadConfig) {
this.successResponses = successResponses;
this.failedResponses = failedResponses;
this.anyFailed = !failedResponses.isEmpty();
this.downloadResults = successResponses;
this.parallelDownloadConfig = parallelDownloadConfig;
}

public List<DownloadResult> getSuccessResponses() {
return successResponses;
}

public List<DownloadResult> getFailedResponses() {
return failedResponses;
}

public boolean isAnyFailed() {
return anyFailed;
public List<Future<DownloadResult>> getDownloadResults() {
return downloadResults;
}

public ParallelDownloadConfig getParallelDownloadConfig() {
@@ -68,24 +56,20 @@ public boolean equals(Object o) {
return false;
}
DownloadJob that = (DownloadJob) o;
return anyFailed == that.anyFailed
&& successResponses.equals(that.successResponses)
&& failedResponses.equals(that.failedResponses)
return downloadResults.equals(that.downloadResults)
&& parallelDownloadConfig.equals(that.parallelDownloadConfig);
}

@Override
public int hashCode() {
return Objects.hash(successResponses, failedResponses, parallelDownloadConfig, anyFailed);
return Objects.hash(downloadResults, parallelDownloadConfig);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("successResponses", successResponses)
.add("failedResponses", failedResponses)
.add("successResponses", downloadResults)
.add("parallelDownloadConfig", parallelDownloadConfig)
.add("anyFailed", anyFailed)
.toString();
}

@@ -95,22 +79,15 @@ public static Builder newBuilder() {

public static final class Builder {

private @NonNull List<DownloadResult> successResponses;
private @NonNull List<DownloadResult> failedResponses;
private @NonNull List<Future<DownloadResult>> downloadResults;
private @MonotonicNonNull ParallelDownloadConfig parallelDownloadConfig;

private Builder() {
this.successResponses = ImmutableList.of();
this.failedResponses = ImmutableList.of();
}

public Builder setSuccessResponses(@NonNull List<DownloadResult> successResponses) {
this.successResponses = ImmutableList.copyOf(successResponses);
return this;
this.downloadResults = ImmutableList.of();
}

public Builder setFailedResponses(@NonNull List<DownloadResult> failedResponses) {
this.failedResponses = ImmutableList.copyOf(failedResponses);
public Builder setDownloadResults(@NonNull List<Future<DownloadResult>> downloadResults) {
this.downloadResults = ImmutableList.copyOf(downloadResults);
return this;
}

@@ -121,10 +98,9 @@ public Builder setParallelDownloadConfig(
}

public DownloadJob build() {
checkNotNull(successResponses);
checkNotNull(failedResponses);
checkNotNull(downloadResults);
checkNotNull(parallelDownloadConfig);
return new DownloadJob(successResponses, failedResponses, parallelDownloadConfig);
return new DownloadJob(downloadResults, parallelDownloadConfig);
}
}
}
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ public TransferManagerImpl(TransferManagerConfig transferManagerConfig) {
String blobName = TransferManagerUtils.createBlobName(opts, file);
BlobInfo blobInfo = BlobInfo.newBuilder(opts.getBucketName(), blobName).build();
// TODO: Apply opts per request
UploadCallable callable = new UploadCallable(executor, transferManagerConfig, blobInfo, file);
UploadCallable callable = new UploadCallable(transferManagerConfig, blobInfo, file);
uploadTasks.add(executor.submit(callable));
}
return UploadJob.newBuilder()
@@ -56,6 +56,14 @@ public TransferManagerImpl(TransferManagerConfig transferManagerConfig) {

@Override
public @NonNull DownloadJob downloadBlobs(List<BlobInfo> blobs, ParallelDownloadConfig opts) {
return null;
List<Future<DownloadResult>> downloadTasks = new ArrayList<>();
for (BlobInfo blob : blobs) {
DownloadCallable callable = new DownloadCallable(transferManagerConfig, blob, opts);
downloadTasks.add(executor.submit(callable));
}
return DownloadJob.newBuilder()
.setDownloadResults(downloadTasks)
.setParallelDownloadConfig(opts)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -29,22 +29,16 @@
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

public class UploadCallable implements Callable<UploadResult> {
private final ExecutorService executor;
private final TransferManagerConfig transferManagerConfig;

private final BlobInfo originalBlob;

private final Path sourceFile;

public UploadCallable(
ExecutorService executor,
TransferManagerConfig transferManagerConfig,
BlobInfo originalBlob,
Path sourceFile) {
this.executor = executor;
TransferManagerConfig transferManagerConfig, BlobInfo originalBlob, Path sourceFile) {
this.transferManagerConfig = transferManagerConfig;
this.originalBlob = originalBlob;
this.sourceFile = sourceFile;
Original file line number Diff line number Diff line change
@@ -18,23 +18,32 @@

import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.DataGenerator;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.TmpFile;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.it.runner.StorageITRunner;
import com.google.cloud.storage.it.runner.annotations.Backend;
import com.google.cloud.storage.it.runner.annotations.CrossRun;
import com.google.cloud.storage.it.runner.annotations.Inject;
import com.google.cloud.storage.it.runner.registry.Generator;
import com.google.cloud.storage.transfermanager.DownloadJob;
import com.google.cloud.storage.transfermanager.ParallelDownloadConfig;
import com.google.cloud.storage.transfermanager.ParallelUploadConfig;
import com.google.cloud.storage.transfermanager.TransferManager;
import com.google.cloud.storage.transfermanager.TransferManagerConfig;
import com.google.cloud.storage.transfermanager.TransferManagerImpl;
import com.google.cloud.storage.transfermanager.UploadJob;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.junit.Before;
import org.junit.Rule;
@@ -47,23 +56,43 @@
transports = {Transport.HTTP},
backends = {Backend.PROD})
public class ITTransferManagerTest {
@Inject public Storage storage;
@Inject public BucketInfo bucket;
@Inject public Generator generator;

@Rule public final TemporaryFolder tmpDir = new TemporaryFolder();

private Path baseDir;
private static final int objectContentSize = 64;
private List<BlobInfo> blobs = new ArrayList<>();

@Before
public void setUp() throws Exception {
baseDir = tmpDir.getRoot().toPath();
BlobInfo blobInfo1 =
BlobInfo.newBuilder(
BlobId.of(bucket.getName(), String.format("%s/src", generator.randomObjectName())))
.build();
BlobInfo blobInfo2 =
BlobInfo.newBuilder(
BlobId.of(bucket.getName(), String.format("%s/src", generator.randomObjectName())))
.build();
BlobInfo blobInfo3 =
BlobInfo.newBuilder(
BlobId.of(bucket.getName(), String.format("%s/src", generator.randomObjectName())))
.build();
Collections.addAll(blobs, blobInfo1, blobInfo2, blobInfo3);
ByteBuffer content = DataGenerator.base64Characters().genByteBuffer(108);
for (BlobInfo blob : blobs) {
try (WriteChannel writeChannel = storage.writer(blob)) {
writeChannel.write(content);
}
}
}

@Test
public void uploadFiles() throws IOException {
TransferManagerConfig config =
TransferManagerConfig.newBuilder().setAllowChunking(false).setMaxWorkers(1).build();
TransferManagerConfig config = TransferManagerConfig.newBuilder().setMaxWorkers(1).build();
TransferManager transferManager = new TransferManagerImpl(config);
try (TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize);
TmpFile tmpFile1 = DataGenerator.base64Characters().tempFile(baseDir, objectContentSize);
@@ -79,5 +108,13 @@ public void uploadFiles() throws IOException {
}

@Test
public void downloadBlobs() {}
public void downloadBlobs() throws IOException {
TransferManagerConfig config = TransferManagerConfig.newBuilder().setMaxWorkers(1).build();
TransferManager transferManager = new TransferManagerImpl(config);
String bucketName = bucket.getName();
ParallelDownloadConfig parallelDownloadConfig =
ParallelDownloadConfig.newBuilder().setBucketName(bucketName).build();
DownloadJob job = transferManager.downloadBlobs(blobs, parallelDownloadConfig);
assertThat(job.getDownloadResults()).hasSize(3);
}
}

0 comments on commit fedbd9c

Please sign in to comment.