Skip to content

Commit

Permalink
feat: First Pass Implementation of UploadMany (#1922)
Browse files Browse the repository at this point in the history
* feat: First Pass Implementation of UploadMany
  • Loading branch information
sydney-munro authored Mar 15, 2023
1 parent fa57e3a commit 01d7de9
Show file tree
Hide file tree
Showing 10 changed files with 348 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Google LLC
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,6 +70,17 @@ public static Function<WriteChannel, Optional<StorageObject>> maybeGetStorageObj
};
}

public static Function<WriteChannel, Optional<BlobInfo>> maybeGetBlobInfoFunction() {
return writeChannel -> {
Optional<StorageObject> so = maybeGetStorageObjectFunction().apply(writeChannel);
if (so.isPresent()) {
return Optional.of(Conversions.apiary().blobInfo().decode(so.get()));
} else {
return Optional.empty();
}
};
}

public static ApiFuture<BlobInfo> getBlobInfoFromReadChannelFunction(ReadChannel c) {
if (c instanceof StorageReadChannel) {
StorageReadChannel src = (StorageReadChannel) c;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage.transfermanager;

public class DownloadCallable {}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import java.util.List;
Expand All @@ -30,17 +31,21 @@ public final class ParallelUploadConfig {
private final boolean skipIfExists;
@NonNull private final String prefix;
@NonNull private final String bucketName;
@NonNull private final List<BlobTargetOption> optionsPerRequest;
@NonNull private final List<BlobTargetOption> targetOptsPerRequest;

@NonNull private final List<BlobWriteOption> writeOptsPerRequest;

private ParallelUploadConfig(
boolean skipIfExists,
@NonNull String prefix,
@NonNull String bucketName,
@NonNull List<BlobTargetOption> optionsPerRequest) {
@NonNull List<BlobTargetOption> targetOptsPerRequest,
@NonNull List<BlobWriteOption> writeOptsPerRequest) {
this.skipIfExists = skipIfExists;
this.prefix = prefix;
this.bucketName = bucketName;
this.optionsPerRequest = optionsPerRequest;
this.targetOptsPerRequest = targetOptsPerRequest;
this.writeOptsPerRequest = writeOptsPerRequest;
}

public boolean isSkipIfExists() {
Expand All @@ -55,8 +60,12 @@ public boolean isSkipIfExists() {
return bucketName;
}

public @NonNull List<BlobTargetOption> getOptionsPerRequest() {
return optionsPerRequest;
public @NonNull List<BlobTargetOption> getTargetOptsPerRequest() {
return targetOptsPerRequest;
}

public @NonNull List<BlobWriteOption> getWriteOptsPerRequest() {
return writeOptsPerRequest;
}

@Override
Expand All @@ -71,12 +80,12 @@ public boolean equals(Object o) {
return skipIfExists == that.skipIfExists
&& prefix.equals(that.prefix)
&& bucketName.equals(that.bucketName)
&& optionsPerRequest.equals(that.optionsPerRequest);
&& targetOptsPerRequest.equals(that.targetOptsPerRequest);
}

@Override
public int hashCode() {
return Objects.hash(skipIfExists, prefix, bucketName, optionsPerRequest);
return Objects.hash(skipIfExists, prefix, bucketName, targetOptsPerRequest);
}

@Override
Expand All @@ -85,7 +94,7 @@ public String toString() {
.add("skipIfExists", skipIfExists)
.add("prefix", prefix)
.add("bucketName", bucketName)
.add("optionsPerRequest", optionsPerRequest)
.add("optionsPerRequest", targetOptsPerRequest)
.toString();
}

Expand All @@ -100,10 +109,13 @@ public static final class Builder {
private @NonNull String bucketName;
private @NonNull List<BlobTargetOption> optionsPerRequest;

private @NonNull List<BlobWriteOption> writeOptsPerRequest;

private Builder() {
this.prefix = "";
this.bucketName = "";
this.optionsPerRequest = ImmutableList.of();
this.writeOptsPerRequest = ImmutableList.of();
}

public Builder setSkipIfExists(boolean skipIfExists) {
Expand All @@ -126,11 +138,18 @@ public Builder setOptionsPerRequest(@NonNull List<BlobTargetOption> optionsPerRe
return this;
}

public Builder setWriteOptsPerRequest(@NonNull List<BlobWriteOption> writeOptsPerRequest) {
this.writeOptsPerRequest = writeOptsPerRequest;
return this;
}

public ParallelUploadConfig build() {
checkNotNull(prefix);
checkNotNull(bucketName);
checkNotNull(optionsPerRequest);
return new ParallelUploadConfig(skipIfExists, prefix, bucketName, optionsPerRequest);
checkNotNull(writeOptsPerRequest);
return new ParallelUploadConfig(
skipIfExists, prefix, bucketName, optionsPerRequest, writeOptsPerRequest);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
package com.google.cloud.storage.transfermanager;

import com.google.cloud.storage.BlobInfo;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import org.checkerframework.checker.nullness.qual.NonNull;

public interface TransferManager {

@NonNull
UploadJob uploadFiles(List<Path> files, ParallelUploadConfig opts);
UploadJob uploadFiles(List<Path> files, ParallelUploadConfig opts) throws IOException;

@NonNull
DownloadJob downloadBlobs(List<BlobInfo> blobs, ParallelDownloadConfig opts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,26 @@

package com.google.cloud.storage.transfermanager;

import com.google.cloud.storage.StorageOptions;
import com.google.common.base.MoreObjects;
import java.util.Objects;

public class TransferManagerConfig {
public final class TransferManagerConfig {
private final int maxWorkers;
private final int perWorkerBufferSize;
private final boolean allowChunking;

// Getting stuff in for implementation bits
// getService to get Storage instance
// private final StorageOptions storageOptions;
private final StorageOptions storageOptions;

private TransferManagerConfig(int maxWorkers, int perWorkerBufferSize, boolean allowChunking) {
TransferManagerConfig(
int maxWorkers,
int perWorkerBufferSize,
boolean allowChunking,
StorageOptions storageOptions) {
this.maxWorkers = maxWorkers;
this.perWorkerBufferSize = perWorkerBufferSize;
this.allowChunking = allowChunking;
this.storageOptions = storageOptions;
}

public int getMaxWorkers() {
Expand All @@ -46,23 +50,28 @@ public boolean isAllowChunking() {
return allowChunking;
}

public StorageOptions getStorageOptions() {
return storageOptions;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
if (!(o instanceof TransferManagerConfig)) {
return false;
}
TransferManagerConfig that = (TransferManagerConfig) o;
return maxWorkers == that.maxWorkers
&& perWorkerBufferSize == that.perWorkerBufferSize
&& allowChunking == that.allowChunking;
&& allowChunking == that.allowChunking
&& Objects.equals(storageOptions, that.storageOptions);
}

@Override
public int hashCode() {
return Objects.hash(maxWorkers, perWorkerBufferSize, allowChunking);
return Objects.hash(maxWorkers, perWorkerBufferSize, allowChunking, storageOptions);
}

@Override
Expand All @@ -71,25 +80,29 @@ public String toString() {
.add("maxWorkers", maxWorkers)
.add("perWorkerBufferSize", perWorkerBufferSize)
.add("allowChunking", allowChunking)
.add("storageOptions", storageOptions)
.toString();
}

public static Builder newBuilder() {
return new Builder();
}

static class Builder {
public static class Builder {

private int maxWorkers;
private int perWorkerBufferSize;
private boolean allowChunking;

private StorageOptions storageOptions;

private Builder() {
// TODO: add default values
// bufferSize tbd?
this.perWorkerBufferSize = 16 * 1024 * 1024;
this.maxWorkers = 2 * Runtime.getRuntime().availableProcessors();
this.allowChunking = false;
this.storageOptions = StorageOptions.getDefaultInstance();
}

public Builder setMaxWorkers(int maxWorkers) {
Expand All @@ -107,8 +120,13 @@ public Builder setAllowChunking(boolean allowChunking) {
return this;
}

public void setStorageOptions(StorageOptions storageOptions) {
this.storageOptions = storageOptions;
}

public TransferManagerConfig build() {
return new TransferManagerConfig(maxWorkers, perWorkerBufferSize, allowChunking);
return new TransferManagerConfig(
maxWorkers, perWorkerBufferSize, allowChunking, storageOptions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,46 @@

package com.google.cloud.storage.transfermanager;

public class TransferManagerImpl {}
import com.google.cloud.storage.BlobInfo;
import com.google.common.collect.ImmutableList;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.checkerframework.checker.nullness.qual.NonNull;

public final class TransferManagerImpl implements TransferManager {

private final TransferManagerConfig transferManagerConfig;
private final ExecutorService executor;

public TransferManagerImpl(TransferManagerConfig transferManagerConfig) {
this.transferManagerConfig = transferManagerConfig;
this.executor = Executors.newFixedThreadPool(transferManagerConfig.getMaxWorkers());
}

@Override
public @NonNull UploadJob uploadFiles(List<Path> files, ParallelUploadConfig opts) {
List<Future<UploadResult>> uploadTasks = new ArrayList<>();
for (Path file : files) {
if (Files.isDirectory(file)) throw new IllegalStateException("Directories are not supported");
String blobName = TransferManagerUtils.createBlobName(opts, file);
BlobInfo blobInfo = BlobInfo.newBuilder(opts.getBucketName(), blobName).build();
// TODO: Apply opts per request
UploadCallable callable = new UploadCallable(executor, transferManagerConfig, blobInfo, file);
uploadTasks.add(executor.submit(callable));
}
return UploadJob.newBuilder()
.setParallelUploadConfig(opts)
.setUploadResponses(ImmutableList.copyOf(uploadTasks))
.build();
}

@Override
public @NonNull DownloadJob downloadBlobs(List<BlobInfo> blobs, ParallelDownloadConfig opts) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage.transfermanager;

import java.nio.file.Path;

public class TransferManagerUtils {

public static String createBlobName(ParallelUploadConfig config, Path file) {
if (config.getPrefix().isEmpty()) {
return file.toString();
} else {
return config.getPrefix().concat(file.toString());
}
}
}
Loading

0 comments on commit 01d7de9

Please sign in to comment.