Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update Grpc Write implementation to allow specifying expected md5 #1815

Merged
merged 1 commit into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,9 @@
<!-- see https://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>

<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/storage/Hasher$ConstantConcatValueHasher</className>
</difference>

</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.storage.v2.StartResumableWriteResponse;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
import java.util.function.Function;

final class GapicUploadSessionBuilder {

Expand All @@ -49,8 +50,16 @@ ApiFuture<ResumableWrite> resumableWrite(
if (writeObjectRequest.hasCommonObjectRequestParams()) {
b.setCommonObjectRequestParams(writeObjectRequest.getCommonObjectRequestParams());
}
if (writeObjectRequest.hasObjectChecksums()) {
b.setObjectChecksums(writeObjectRequest.getObjectChecksums());
}
StartResumableWriteRequest req = b.build();
Function<String, WriteObjectRequest> f =
uploadId ->
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
return ApiFutures.transform(
x.futureCall(req), (resp) -> new ResumableWrite(req, resp), MoreExecutors.directExecutor());
x.futureCall(req),
(resp) -> new ResumableWrite(req, resp, f),
MoreExecutors.directExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import com.google.storage.v2.LockBucketRetentionPolicyRequest;
import com.google.storage.v2.Object;
import com.google.storage.v2.ObjectAccessControl;
import com.google.storage.v2.ObjectChecksums;
import com.google.storage.v2.ProjectName;
import com.google.storage.v2.ReadObjectRequest;
import com.google.storage.v2.RewriteObjectRequest;
Expand Down Expand Up @@ -221,6 +222,7 @@ public Blob create(
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
Hasher hasher = getHasherForRequest(req, Hasher.enabled());
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
Expand All @@ -231,7 +233,7 @@ public Blob create(
.byteChannel(
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
.setByteStringStrategy(ByteStringStrategy.noCopy())
.setHasher(Hasher.enabled())
.setHasher(hasher)
.direct()
.unbuffered()
.setRequest(req)
Expand Down Expand Up @@ -273,10 +275,7 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);

Hasher hasher = Hasher.enabled();
if (req.hasObjectChecksums() && req.getObjectChecksums().hasCrc32C()) {
hasher = Hasher.constant(req.getObjectChecksums().getCrc32C());
}
Hasher hasher = getHasherForRequest(req, Hasher.enabled());
GapicWritableByteChannelSessionBuilder channelSessionBuilder =
ResumableMedia.gapic()
.write()
Expand Down Expand Up @@ -346,10 +345,7 @@ public Blob createFrom(

ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req);

Hasher hasher = Hasher.enabled();
if (req.hasObjectChecksums() && req.getObjectChecksums().hasCrc32C()) {
hasher = Hasher.constant(req.getObjectChecksums().getCrc32C());
}
Hasher hasher = getHasherForRequest(req, Hasher.enabled());
BufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
Expand Down Expand Up @@ -736,10 +732,7 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
Hasher hasher = Hasher.noop();
if (req.hasObjectChecksums() && req.getObjectChecksums().hasCrc32C()) {
hasher = Hasher.constant(req.getObjectChecksums().getCrc32C());
}
Hasher hasher = getHasherForRequest(req, Hasher.enabled());
return new GrpcBlobWriteChannel(
storageClient.writeObjectCallable(),
getOptions(),
Expand Down Expand Up @@ -1789,4 +1782,17 @@ private Object updateObject(UpdateObjectRequest req) {
() -> storageClient.updateObjectCallable().call(req, grpcCallContext),
Decoder.identity());
}

private static Hasher getHasherForRequest(WriteObjectRequest req, Hasher defaultHasher) {
if (!req.hasObjectChecksums()) {
return defaultHasher;
} else {
ObjectChecksums checksums = req.getObjectChecksums();
if (!checksums.hasCrc32C() && checksums.getMd5Hash().isEmpty()) {
return defaultHasher;
} else {
return Hasher.noop();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,6 @@ static Hasher enabled() {
return GuavaHasher.INSTANCE;
}

/**
* Create a Hasher which will always yield the specified value when {@link
* #nullSafeConcat(Crc32cLengthKnown, Crc32cLengthKnown)} is invoked.
*/
// Not perfect, and not a great approach for a public API. However, this is the most pragmatic way
// right now to wire an externally defined value all the way down to the last write message of a
// resumable upload session.
static Hasher constant(int crc32c) {
return new ConstantConcatValueHasher(Crc32cValue.of(crc32c, -1));
}

@Immutable
class NoOpHasher implements Hasher {
private static final NoOpHasher INSTANCE = new NoOpHasher();
Expand Down Expand Up @@ -112,26 +101,4 @@ public Crc32cLengthKnown nullSafeConcat(Crc32cLengthKnown r1, Crc32cLengthKnown
}
}
}

@Immutable
class ConstantConcatValueHasher implements Hasher {
private final Crc32cLengthKnown value;

private ConstantConcatValueHasher(Crc32cLengthKnown value) {
this.value = value;
}

@Override
public @Nullable Crc32cLengthKnown hash(ByteBuffer b) {
return null;
}

@Override
public void validate(Crc32cValue<?> expected, Supplier<ByteBuffer> b) {}

@Override
public @Nullable Crc32cLengthKnown nullSafeConcat(Crc32cLengthKnown r1, Crc32cLengthKnown r2) {
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ final class ResumableWrite implements WriteObjectRequestBuilderFactory {

private final WriteObjectRequest writeRequest;

public ResumableWrite(StartResumableWriteRequest req, StartResumableWriteResponse res) {
public ResumableWrite(
StartResumableWriteRequest req,
StartResumableWriteResponse res,
Function<String, WriteObjectRequest> f) {
this.req = req;
this.res = res;
WriteObjectRequest.Builder b = WriteObjectRequest.newBuilder().setUploadId(res.getUploadId());
if (req.hasCommonObjectRequestParams()) {
b.setCommonObjectRequestParams(req.getCommonObjectRequestParams());
}
this.writeRequest = b.build();
this.writeRequest = f.apply(res.getUploadId());
}

public StartResumableWriteRequest getReq() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ public static BlobWriteOption metagenerationNotMatch() {
* @deprecated Please compute and use a crc32c checksum instead. {@link #crc32cMatch()}
*/
@Deprecated
@TransportCompatibility(Transport.HTTP)
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
public static BlobWriteOption md5Match() {
return new BlobWriteOption(UnifiedOpts.md5MatchExtractor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,15 @@ public boolean equals(Object o) {
return Objects.equals(val, md5Match.val);
}

@Override
public Mapper<WriteObjectRequest.Builder> writeObject() {
return b -> {
b.getObjectChecksumsBuilder()
.setMd5Hash(ByteString.copyFrom(BaseEncoding.base64().decode(val)));
return b;
};
}

@Override
public int hashCode() {
return Objects.hash(val);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,28 @@ private static GrpcCallContext contextWithBucketName(String bucketName) {
return ret;
}

/**
* Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs,
* this utility method centralizes the logic necessary to clear those fields for use by subsequent
* messages.
*/
private static WriteObjectRequest possiblyPairDownRequest(
WriteObjectRequest message, boolean firstMessageOfStream) {
if (firstMessageOfStream && message.getWriteOffset() == 0) {
return message;
}

WriteObjectRequest.Builder b = message.toBuilder();
if (!firstMessageOfStream) {
b.clearUploadId();
}

if (message.getWriteOffset() > 0) {
b.clearWriteObjectSpec().clearObjectChecksums();
}
return b.build();
}

@FunctionalInterface
interface FlusherFactory {
/**
Expand Down Expand Up @@ -144,9 +166,7 @@ public void flush(@NonNull List<WriteObjectRequest> segments) {

boolean first = true;
for (WriteObjectRequest message : segments) {
if (!first) {
message = message.toBuilder().clearUploadId().clearWriteObjectSpec().build();
}
message = possiblyPairDownRequest(message, first);

write.onNext(message);
first = false;
Expand Down Expand Up @@ -188,9 +208,7 @@ private FsyncOnClose(
public void flush(@NonNull List<WriteObjectRequest> segments) {
ensureOpen();
for (WriteObjectRequest message : segments) {
if (!first) {
message = message.toBuilder().clearUploadId().clearWriteObjectSpec().build();
}
message = possiblyPairDownRequest(message, first);

stream.onNext(message);
first = false;
Expand All @@ -201,9 +219,7 @@ public void flush(@NonNull List<WriteObjectRequest> segments) {
public void close(@Nullable WriteObjectRequest message) {
ensureOpen();
if (message != null) {
if (!first) {
message = message.toBuilder().clearUploadId().clearWriteObjectSpec().build();
}
message = possiblyPairDownRequest(message, first);
stream.onNext(message);
}
stream.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public final class GapicUnbufferedWritableByteChannelTest {
WriteObjectResponse.newBuilder().setResource(obj.toBuilder().setSize(40)).build();

private static final WriteObjectRequestBuilderFactory reqFactory =
new ResumableWrite(startReq, startResp);
new ResumableWrite(startReq, startResp, TestUtils.onlyUploadId());

@Test
public void directUpload() throws IOException, InterruptedException, ExecutionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.protobuf.ByteString;
import com.google.rpc.DebugInfo;
import com.google.storage.v2.ChecksummedData;
import com.google.storage.v2.WriteObjectRequest;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.ByteArrayOutputStream;
Expand All @@ -49,6 +50,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

public final class TestUtils {
Expand Down Expand Up @@ -179,4 +181,13 @@ public boolean shouldRetry(Throwable previousThrowable, Object previousResponse)
}
}
}

/**
* Return a function which when provided an {@code uploadId} will create a {@link
* WriteObjectRequest} with that {@code uploadId}
*/
@NonNull
public static Function<String, WriteObjectRequest> onlyUploadId() {
return uId -> WriteObjectRequest.newBuilder().setUploadId(uId).build();
}
}
Loading