Skip to content

Commit

Permalink
fix: update Grpc Write implementation to allow specifying expected md5 (
Browse files Browse the repository at this point in the history
#1815)

Remove Hasher.Constant. StartResumableWriteRequest has been updated to allow specifying `object_checksums` when creating the session.

Add several new positive and negative integration test for md5 verification
  • Loading branch information
BenWhitehead authored Dec 16, 2022
1 parent 2eec791 commit 4662572
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 76 deletions.
5 changes: 5 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,9 @@
<!-- see https://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>

<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/storage/Hasher$ConstantConcatValueHasher</className>
</difference>

</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.storage.v2.StartResumableWriteResponse;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
import java.util.function.Function;

final class GapicUploadSessionBuilder {

Expand All @@ -49,8 +50,16 @@ ApiFuture<ResumableWrite> resumableWrite(
if (writeObjectRequest.hasCommonObjectRequestParams()) {
b.setCommonObjectRequestParams(writeObjectRequest.getCommonObjectRequestParams());
}
if (writeObjectRequest.hasObjectChecksums()) {
b.setObjectChecksums(writeObjectRequest.getObjectChecksums());
}
StartResumableWriteRequest req = b.build();
Function<String, WriteObjectRequest> f =
uploadId ->
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
return ApiFutures.transform(
x.futureCall(req), (resp) -> new ResumableWrite(req, resp), MoreExecutors.directExecutor());
x.futureCall(req),
(resp) -> new ResumableWrite(req, resp, f),
MoreExecutors.directExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import com.google.storage.v2.LockBucketRetentionPolicyRequest;
import com.google.storage.v2.Object;
import com.google.storage.v2.ObjectAccessControl;
import com.google.storage.v2.ObjectChecksums;
import com.google.storage.v2.ProjectName;
import com.google.storage.v2.ReadObjectRequest;
import com.google.storage.v2.RewriteObjectRequest;
Expand Down Expand Up @@ -221,6 +222,7 @@ public Blob create(
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
Hasher hasher = getHasherForRequest(req, Hasher.enabled());
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
Expand All @@ -231,7 +233,7 @@ public Blob create(
.byteChannel(
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
.setByteStringStrategy(ByteStringStrategy.noCopy())
.setHasher(Hasher.enabled())
.setHasher(hasher)
.direct()
.unbuffered()
.setRequest(req)
Expand Down Expand Up @@ -273,10 +275,7 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);

Hasher hasher = Hasher.enabled();
if (req.hasObjectChecksums() && req.getObjectChecksums().hasCrc32C()) {
hasher = Hasher.constant(req.getObjectChecksums().getCrc32C());
}
Hasher hasher = getHasherForRequest(req, Hasher.enabled());
GapicWritableByteChannelSessionBuilder channelSessionBuilder =
ResumableMedia.gapic()
.write()
Expand Down Expand Up @@ -346,10 +345,7 @@ public Blob createFrom(

ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req);

Hasher hasher = Hasher.enabled();
if (req.hasObjectChecksums() && req.getObjectChecksums().hasCrc32C()) {
hasher = Hasher.constant(req.getObjectChecksums().getCrc32C());
}
Hasher hasher = getHasherForRequest(req, Hasher.enabled());
BufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
Expand Down Expand Up @@ -736,10 +732,7 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
Hasher hasher = Hasher.noop();
if (req.hasObjectChecksums() && req.getObjectChecksums().hasCrc32C()) {
hasher = Hasher.constant(req.getObjectChecksums().getCrc32C());
}
Hasher hasher = getHasherForRequest(req, Hasher.enabled());
return new GrpcBlobWriteChannel(
storageClient.writeObjectCallable(),
getOptions(),
Expand Down Expand Up @@ -1789,4 +1782,17 @@ private Object updateObject(UpdateObjectRequest req) {
() -> storageClient.updateObjectCallable().call(req, grpcCallContext),
Decoder.identity());
}

private static Hasher getHasherForRequest(WriteObjectRequest req, Hasher defaultHasher) {
if (!req.hasObjectChecksums()) {
return defaultHasher;
} else {
ObjectChecksums checksums = req.getObjectChecksums();
if (!checksums.hasCrc32C() && checksums.getMd5Hash().isEmpty()) {
return defaultHasher;
} else {
return Hasher.noop();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,6 @@ static Hasher enabled() {
return GuavaHasher.INSTANCE;
}

/**
* Create a Hasher which will always yield the specified value when {@link
* #nullSafeConcat(Crc32cLengthKnown, Crc32cLengthKnown)} is invoked.
*/
// Not perfect, and not a great approach for a public API. However, this is the most pragmatic way
// right now to wire an externally defined value all the way down to the last write message of a
// resumable upload session.
static Hasher constant(int crc32c) {
return new ConstantConcatValueHasher(Crc32cValue.of(crc32c, -1));
}

@Immutable
class NoOpHasher implements Hasher {
private static final NoOpHasher INSTANCE = new NoOpHasher();
Expand Down Expand Up @@ -112,26 +101,4 @@ public Crc32cLengthKnown nullSafeConcat(Crc32cLengthKnown r1, Crc32cLengthKnown
}
}
}

@Immutable
class ConstantConcatValueHasher implements Hasher {
private final Crc32cLengthKnown value;

private ConstantConcatValueHasher(Crc32cLengthKnown value) {
this.value = value;
}

@Override
public @Nullable Crc32cLengthKnown hash(ByteBuffer b) {
return null;
}

@Override
public void validate(Crc32cValue<?> expected, Supplier<ByteBuffer> b) {}

@Override
public @Nullable Crc32cLengthKnown nullSafeConcat(Crc32cLengthKnown r1, Crc32cLengthKnown r2) {
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ final class ResumableWrite implements WriteObjectRequestBuilderFactory {

private final WriteObjectRequest writeRequest;

public ResumableWrite(StartResumableWriteRequest req, StartResumableWriteResponse res) {
public ResumableWrite(
StartResumableWriteRequest req,
StartResumableWriteResponse res,
Function<String, WriteObjectRequest> f) {
this.req = req;
this.res = res;
WriteObjectRequest.Builder b = WriteObjectRequest.newBuilder().setUploadId(res.getUploadId());
if (req.hasCommonObjectRequestParams()) {
b.setCommonObjectRequestParams(req.getCommonObjectRequestParams());
}
this.writeRequest = b.build();
this.writeRequest = f.apply(res.getUploadId());
}

public StartResumableWriteRequest getReq() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ public static BlobWriteOption metagenerationNotMatch() {
* @deprecated Please compute and use a crc32c checksum instead. {@link #crc32cMatch()}
*/
@Deprecated
@TransportCompatibility(Transport.HTTP)
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
public static BlobWriteOption md5Match() {
return new BlobWriteOption(UnifiedOpts.md5MatchExtractor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,15 @@ public boolean equals(Object o) {
return Objects.equals(val, md5Match.val);
}

@Override
public Mapper<WriteObjectRequest.Builder> writeObject() {
return b -> {
b.getObjectChecksumsBuilder()
.setMd5Hash(ByteString.copyFrom(BaseEncoding.base64().decode(val)));
return b;
};
}

@Override
public int hashCode() {
return Objects.hash(val);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,28 @@ private static GrpcCallContext contextWithBucketName(String bucketName) {
return ret;
}

/**
* Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs,
* this utility method centralizes the logic necessary to clear those fields for use by subsequent
* messages.
*/
private static WriteObjectRequest possiblyPairDownRequest(
WriteObjectRequest message, boolean firstMessageOfStream) {
if (firstMessageOfStream && message.getWriteOffset() == 0) {
return message;
}

WriteObjectRequest.Builder b = message.toBuilder();
if (!firstMessageOfStream) {
b.clearUploadId();
}

if (message.getWriteOffset() > 0) {
b.clearWriteObjectSpec().clearObjectChecksums();
}
return b.build();
}

@FunctionalInterface
interface FlusherFactory {
/**
Expand Down Expand Up @@ -144,9 +166,7 @@ public void flush(@NonNull List<WriteObjectRequest> segments) {

boolean first = true;
for (WriteObjectRequest message : segments) {
if (!first) {
message = message.toBuilder().clearUploadId().clearWriteObjectSpec().build();
}
message = possiblyPairDownRequest(message, first);

write.onNext(message);
first = false;
Expand Down Expand Up @@ -188,9 +208,7 @@ private FsyncOnClose(
public void flush(@NonNull List<WriteObjectRequest> segments) {
ensureOpen();
for (WriteObjectRequest message : segments) {
if (!first) {
message = message.toBuilder().clearUploadId().clearWriteObjectSpec().build();
}
message = possiblyPairDownRequest(message, first);

stream.onNext(message);
first = false;
Expand All @@ -201,9 +219,7 @@ public void flush(@NonNull List<WriteObjectRequest> segments) {
public void close(@Nullable WriteObjectRequest message) {
ensureOpen();
if (message != null) {
if (!first) {
message = message.toBuilder().clearUploadId().clearWriteObjectSpec().build();
}
message = possiblyPairDownRequest(message, first);
stream.onNext(message);
}
stream.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public final class GapicUnbufferedWritableByteChannelTest {
WriteObjectResponse.newBuilder().setResource(obj.toBuilder().setSize(40)).build();

private static final WriteObjectRequestBuilderFactory reqFactory =
new ResumableWrite(startReq, startResp);
new ResumableWrite(startReq, startResp, TestUtils.onlyUploadId());

@Test
public void directUpload() throws IOException, InterruptedException, ExecutionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.protobuf.ByteString;
import com.google.rpc.DebugInfo;
import com.google.storage.v2.ChecksummedData;
import com.google.storage.v2.WriteObjectRequest;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.ByteArrayOutputStream;
Expand All @@ -49,6 +50,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

public final class TestUtils {
Expand Down Expand Up @@ -179,4 +181,13 @@ public boolean shouldRetry(Throwable previousThrowable, Object previousResponse)
}
}
}

/**
* Return a function which when provided an {@code uploadId} will create a {@link
* WriteObjectRequest} with that {@code uploadId}
*/
@NonNull
public static Function<String, WriteObjectRequest> onlyUploadId() {
return uId -> WriteObjectRequest.newBuilder().setUploadId(uId).build();
}
}
Loading

0 comments on commit 4662572

Please sign in to comment.