Skip to content

Commit

Permalink
Merge pull request #294 from mziccard/add-support-for-rewrite
Browse files Browse the repository at this point in the history
Add support for blob rewrite
  • Loading branch information
aozarov committed Oct 29, 2015
2 parents ff9f60c + 23215a1 commit d9c6b85
Show file tree
Hide file tree
Showing 11 changed files with 763 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.BlobInfo;
import com.google.gcloud.storage.BlobReadChannel;
import com.google.gcloud.storage.CopyWriter;
import com.google.gcloud.storage.BlobWriteChannel;
import com.google.gcloud.storage.Bucket;
import com.google.gcloud.storage.BucketInfo;
Expand Down Expand Up @@ -366,8 +367,8 @@ public String params() {
private static class CopyAction extends StorageAction<CopyRequest> {
@Override
public void run(Storage storage, CopyRequest request) {
BlobInfo copiedBlobInfo = storage.copy(request);
System.out.println("Copied " + copiedBlobInfo);
CopyWriter copyWriter = storage.copy(request);
System.out.println("Copied " + copyWriter.result());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class DefaultStorageRpc implements StorageRpc {

// see: https://cloud.google.com/storage/docs/concepts-techniques#practices
private static final Set<Integer> RETRYABLE_CODES = ImmutableSet.of(504, 503, 502, 500, 429, 408);
private static final long MEGABYTE = 1024L * 1024L;

public DefaultStorageRpc(StorageOptions options) {
HttpTransport transport = options.httpTransportFactory().create();
Expand Down Expand Up @@ -320,30 +321,6 @@ public StorageObject compose(Iterable<StorageObject> sources, StorageObject targ
}
}

@Override
public StorageObject copy(StorageObject source, Map<Option, ?> sourceOptions,
StorageObject target, Map<Option, ?> targetOptions) throws StorageException {
try {
return storage
.objects()
.copy(source.getBucket(), source.getName(), target.getBucket(), target.getName(),
target.getContentType() != null ? target : null)
.setProjection(DEFAULT_PROJECTION)
.setIfSourceMetagenerationMatch(IF_SOURCE_METAGENERATION_MATCH.getLong(sourceOptions))
.setIfSourceMetagenerationNotMatch(
IF_SOURCE_METAGENERATION_NOT_MATCH.getLong(sourceOptions))
.setIfSourceGenerationMatch(IF_SOURCE_GENERATION_MATCH.getLong(sourceOptions))
.setIfSourceGenerationNotMatch(IF_SOURCE_GENERATION_NOT_MATCH.getLong(sourceOptions))
.setIfMetagenerationMatch(IF_METAGENERATION_MATCH.getLong(targetOptions))
.setIfMetagenerationNotMatch(IF_METAGENERATION_NOT_MATCH.getLong(targetOptions))
.setIfGenerationMatch(IF_GENERATION_MATCH.getLong(targetOptions))
.setIfGenerationNotMatch(IF_GENERATION_NOT_MATCH.getLong(targetOptions))
.execute();
} catch (IOException ex) {
throw translate(ex);
}
}

@Override
public byte[] load(StorageObject from, Map<Option, ?> options)
throws StorageException {
Expand Down Expand Up @@ -521,4 +498,46 @@ public String open(StorageObject object, Map<Option, ?> options)
throw translate(ex);
}
}

@Override
public RewriteResponse openRewrite(RewriteRequest rewriteRequest) throws StorageException {
return rewrite(rewriteRequest, null);
}

@Override
public RewriteResponse continueRewrite(RewriteResponse previousResponse) throws StorageException {
return rewrite(previousResponse.rewriteRequest, previousResponse.rewriteToken);
}

private RewriteResponse rewrite(RewriteRequest req, String token) throws StorageException {
try {
Long maxBytesRewrittenPerCall = req.megabytesRewrittenPerCall != null
? req.megabytesRewrittenPerCall * MEGABYTE : null;
com.google.api.services.storage.model.RewriteResponse rewriteReponse = storage.objects()
.rewrite(req.source.getBucket(), req.source.getName(), req.target.getBucket(),
req.target.getName(), req.target.getContentType() != null ? req.target : null)
.setRewriteToken(token)
.setMaxBytesRewrittenPerCall(maxBytesRewrittenPerCall)
.setProjection(DEFAULT_PROJECTION)
.setIfSourceMetagenerationMatch(IF_SOURCE_METAGENERATION_MATCH.getLong(req.sourceOptions))
.setIfSourceMetagenerationNotMatch(
IF_SOURCE_METAGENERATION_NOT_MATCH.getLong(req.sourceOptions))
.setIfSourceGenerationMatch(IF_SOURCE_GENERATION_MATCH.getLong(req.sourceOptions))
.setIfSourceGenerationNotMatch(IF_SOURCE_GENERATION_NOT_MATCH.getLong(req.sourceOptions))
.setIfMetagenerationMatch(IF_METAGENERATION_MATCH.getLong(req.targetOptions))
.setIfMetagenerationNotMatch(IF_METAGENERATION_NOT_MATCH.getLong(req.targetOptions))
.setIfGenerationMatch(IF_GENERATION_MATCH.getLong(req.targetOptions))
.setIfGenerationNotMatch(IF_GENERATION_NOT_MATCH.getLong(req.targetOptions))
.execute();
return new RewriteResponse(
req,
rewriteReponse.getResource(),
rewriteReponse.getObjectSize().longValue(),
rewriteReponse.getDone(),
rewriteReponse.getRewriteToken(),
rewriteReponse.getTotalBytesRewritten().longValue());
} catch (IOException ex) {
throw translate(ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public interface StorageRpc {

Expand Down Expand Up @@ -132,6 +133,89 @@ public BatchResponse(Map<StorageObject, Tuple<Boolean, StorageException>> delete
}
}

class RewriteRequest {

public final StorageObject source;
public final Map<StorageRpc.Option, ?> sourceOptions;
public final StorageObject target;
public final Map<StorageRpc.Option, ?> targetOptions;
public final Long megabytesRewrittenPerCall;

public RewriteRequest(StorageObject source, Map<StorageRpc.Option, ?> sourceOptions,
StorageObject target, Map<StorageRpc.Option, ?> targetOptions,
Long megabytesRewrittenPerCall) {
this.source = source;
this.sourceOptions = sourceOptions;
this.target = target;
this.targetOptions = targetOptions;
this.megabytesRewrittenPerCall = megabytesRewrittenPerCall;
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (!(obj instanceof RewriteRequest)) {
return false;
}
final RewriteRequest other = (RewriteRequest) obj;
return Objects.equals(this.source, other.source)
&& Objects.equals(this.sourceOptions, other.sourceOptions)
&& Objects.equals(this.target, other.target)
&& Objects.equals(this.targetOptions, other.targetOptions)
&& Objects.equals(this.megabytesRewrittenPerCall, other.megabytesRewrittenPerCall);
}

@Override
public int hashCode() {
return Objects.hash(source, sourceOptions, target, targetOptions, megabytesRewrittenPerCall);
}
}

class RewriteResponse {

public final RewriteRequest rewriteRequest;
public final StorageObject result;
public final long blobSize;
public final boolean isDone;
public final String rewriteToken;
public final long totalBytesRewritten;

public RewriteResponse(RewriteRequest rewriteRequest, StorageObject result, long blobSize,
boolean isDone, String rewriteToken, long totalBytesRewritten) {
this.rewriteRequest = rewriteRequest;
this.result = result;
this.blobSize = blobSize;
this.isDone = isDone;
this.rewriteToken = rewriteToken;
this.totalBytesRewritten = totalBytesRewritten;
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (!(obj instanceof RewriteResponse)) {
return false;
}
final RewriteResponse other = (RewriteResponse) obj;
return Objects.equals(this.rewriteRequest, other.rewriteRequest)
&& Objects.equals(this.result, other.result)
&& Objects.equals(this.rewriteToken, other.rewriteToken)
&& this.blobSize == other.blobSize
&& Objects.equals(this.isDone, other.isDone)
&& this.totalBytesRewritten == other.totalBytesRewritten;
}

@Override
public int hashCode() {
return Objects.hash(rewriteRequest, result, blobSize, isDone, rewriteToken,
totalBytesRewritten);
}
}

Bucket create(Bucket bucket, Map<Option, ?> options) throws StorageException;

StorageObject create(StorageObject object, InputStream content, Map<Option, ?> options)
Expand Down Expand Up @@ -161,9 +245,6 @@ StorageObject patch(StorageObject storageObject, Map<Option, ?> options)
StorageObject compose(Iterable<StorageObject> sources, StorageObject target,
Map<Option, ?> targetOptions) throws StorageException;

StorageObject copy(StorageObject source, Map<Option, ?> sourceOptions,
StorageObject target, Map<Option, ?> targetOptions) throws StorageException;

byte[] load(StorageObject storageObject, Map<Option, ?> options)
throws StorageException;

Expand All @@ -174,4 +255,8 @@ byte[] read(StorageObject from, Map<Option, ?> options, long position, int bytes

void write(String uploadId, byte[] toWrite, int toWriteOffset, StorageObject dest,
long destOffset, int length, boolean last) throws StorageException;

RewriteResponse openRewrite(RewriteRequest rewriteRequest) throws StorageException;

RewriteResponse continueRewrite(RewriteResponse previousResponse) throws StorageException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,20 @@ public Blob update(BlobInfo blobInfo, BlobTargetOption... options) {
}

/**
* Copies this blob to the specified target. Possibly copying also some of the metadata
* (e.g. content-type).
* Sends a copy request for the current blob to the target blob. Possibly also some of the
* metadata are copied (e.g. content-type).
*
* @param targetBlob target blob's id
* @param options source blob options
* @return the copied blob
* @return a {@link CopyWriter} object that can be used to get information on the newly created
* blob or to complete the copy if more than one RPC request is needed
* @throws StorageException upon failure
*/
public Blob copyTo(BlobId targetBlob, BlobSourceOption... options) {
BlobInfo updatedInfo = info.toBuilder().blobId(targetBlob).build();
public CopyWriter copyTo(BlobId targetBlob, BlobSourceOption... options) {
BlobInfo updatedInfo = BlobInfo.builder(targetBlob).build();
CopyRequest copyRequest = CopyRequest.builder().source(info.bucket(), info.name())
.sourceOptions(convert(info, options)).target(updatedInfo).build();
return new Blob(storage, storage.copy(copyRequest));
return storage.copy(copyRequest);
}

/**
Expand All @@ -240,33 +241,35 @@ public boolean delete(BlobSourceOption... options) {
}

/**
* Copies this blob to the target bucket, preserving its name. Possibly copying also some of the
* metadata (e.g. content-type).
* Sends a copy request for the current blob to the target bucket, preserving its name. Possibly
* copying also some of the metadata (e.g. content-type).
*
* @param targetBucket target bucket's name
* @param options source blob options
* @return the copied blob
* @return a {@link CopyWriter} object that can be used to get information on the newly created
* blob or to complete the copy if more than one RPC request is needed
* @throws StorageException upon failure
*/
public Blob copyTo(String targetBucket, BlobSourceOption... options) {
public CopyWriter copyTo(String targetBucket, BlobSourceOption... options) {
return copyTo(targetBucket, info.name(), options);
}

/**
* Copies this blob to the target bucket with a new name. Possibly copying also some of the
* metadata (e.g. content-type).
* Sends a copy request for the current blob to the target blob. Possibly also some of the
* metadata are copied (e.g. content-type).
*
* @param targetBucket target bucket's name
* @param targetBlob target blob's name
* @param options source blob options
* @return the copied blob
* @return a {@link CopyWriter} object that can be used to get information on the newly created
* blob or to complete the copy if more than one RPC request is needed
* @throws StorageException upon failure
*/
public Blob copyTo(String targetBucket, String targetBlob, BlobSourceOption... options) {
BlobInfo updatedInfo = info.toBuilder().blobId(BlobId.of(targetBucket, targetBlob)).build();
public CopyWriter copyTo(String targetBucket, String targetBlob, BlobSourceOption... options) {
BlobInfo updatedInfo = BlobInfo.builder(targetBucket, targetBlob).build();
CopyRequest copyRequest = CopyRequest.builder().source(info.bucket(), info.name())
.sourceOptions(convert(info, options)).target(updatedInfo).build();
return new Blob(storage, storage.copy(copyRequest));
return storage.copy(copyRequest);
}

/**
Expand Down
Loading

0 comments on commit d9c6b85

Please sign in to comment.