Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BlobWriteOption to support MD5 and CRC32C checks on create/write #271

Merged
merged 3 commits into from
Oct 20, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static com.google.gcloud.spi.StorageRpc.Option.IF_SOURCE_GENERATION_NOT_MATCH;
import static com.google.gcloud.spi.StorageRpc.Option.IF_SOURCE_METAGENERATION_MATCH;
import static com.google.gcloud.spi.StorageRpc.Option.IF_SOURCE_METAGENERATION_NOT_MATCH;
import static com.google.gcloud.spi.StorageRpc.Option.IF_MD5_MATCH;
import static com.google.gcloud.spi.StorageRpc.Option.IF_CRC32C_MATCH;
import static com.google.gcloud.spi.StorageRpc.Option.MAX_RESULTS;
import static com.google.gcloud.spi.StorageRpc.Option.PAGE_TOKEN;
import static com.google.gcloud.spi.StorageRpc.Option.PREDEFINED_ACL;
Expand Down Expand Up @@ -106,6 +108,15 @@ private static StorageException translate(GoogleJsonError exception) {
return new StorageException(exception.getCode(), exception.getMessage(), retryable);
}

private static void applyOptions(StorageObject storageObject, Map<Option, ?> options) {

This comment was marked as spam.

if (IF_MD5_MATCH.getBoolean(options) == null) {
storageObject.setMd5Hash(null);
}
if (IF_CRC32C_MATCH.getBoolean(options) == null) {
storageObject.setCrc32c(null);
}
}

@Override
public Bucket create(Bucket bucket, Map<Option, ?> options) throws StorageException {
try {
Expand All @@ -123,6 +134,7 @@ public Bucket create(Bucket bucket, Map<Option, ?> options) throws StorageExcept
@Override
public StorageObject create(StorageObject storageObject, final InputStream content,
Map<Option, ?> options) throws StorageException {
applyOptions(storageObject, options);
try {
Storage.Objects.Insert insert = storage.objects()
.insert(storageObject.getBucket(), storageObject,
Expand Down Expand Up @@ -491,6 +503,7 @@ public void write(String uploadId, byte[] toWrite, int toWriteOffset, StorageObj
@Override
public String open(StorageObject object, Map<Option, ?> options)
throws StorageException {
applyOptions(object, options);
try {
Insert req = storage.objects().insert(object.getBucket(), object);
GenericUrl url = req.buildHttpRequest().getUrl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ enum Option {
IF_SOURCE_METAGENERATION_NOT_MATCH("ifSourceMetagenerationNotMatch"),
IF_SOURCE_GENERATION_MATCH("ifSourceGenerationMatch"),
IF_SOURCE_GENERATION_NOT_MATCH("ifSourceGenerationNotMatch"),
IF_MD5_MATCH("md5Hash"),

This comment was marked as spam.

This comment was marked as spam.

IF_CRC32C_MATCH("crc32c"),
PREFIX("prefix"),
MAX_RESULTS("maxResults"),
PAGE_TOKEN("pageToken"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.Lists;
import com.google.gcloud.spi.StorageRpc;
import com.google.gcloud.storage.Storage.BlobTargetOption;
import com.google.gcloud.storage.Storage.BlobWriteOption;
import com.google.gcloud.storage.Storage.CopyRequest;
import com.google.gcloud.storage.Storage.SignUrlOption;

Expand Down Expand Up @@ -274,7 +275,7 @@ public BlobReadChannel reader(BlobSourceOption... options) {
* @param options target blob options
* @throws StorageException upon failure
*/
public BlobWriteChannel writer(BlobTargetOption... options) {
public BlobWriteChannel writer(BlobWriteOption... options) {
return storage.writer(info, options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,67 @@ public static BlobTargetOption metagenerationMatch() {
public static BlobTargetOption metagenerationNotMatch() {
return new BlobTargetOption(StorageRpc.Option.IF_METAGENERATION_NOT_MATCH);
}

/**
 * Wraps each target option as a write option and appends {@code optionsToAdd},
 * preserving the order of both input arrays.
 */
static BlobWriteOption[] convert(BlobTargetOption[] options, BlobWriteOption... optionsToAdd) {
  BlobWriteOption[] result = new BlobWriteOption[options.length + optionsToAdd.length];
  for (int i = 0; i < options.length; i++) {
    result[i] = new BlobWriteOption(options[i]);
  }
  System.arraycopy(optionsToAdd, 0, result, options.length, optionsToAdd.length);
  return result;
}
}

class BlobWriteOption extends Option {

This comment was marked as spam.


private static final long serialVersionUID = -3880421670966224580L;

BlobWriteOption(BlobTargetOption option) {
super(option.rpcOption(), option.value());
}

private BlobWriteOption(StorageRpc.Option rpcOption, Object value) {
super(rpcOption, value);
}

private BlobWriteOption(StorageRpc.Option rpcOption) {
this(rpcOption, null);
}

public static BlobWriteOption predefinedAcl(PredefinedAcl acl) {
return new BlobWriteOption(StorageRpc.Option.PREDEFINED_ACL, acl.entry());
}

public static BlobWriteOption doesNotExist() {
return new BlobWriteOption(StorageRpc.Option.IF_GENERATION_MATCH, 0L);
}

public static BlobWriteOption generationMatch() {
return new BlobWriteOption(StorageRpc.Option.IF_GENERATION_MATCH);
}

public static BlobWriteOption generationNotMatch() {
return new BlobWriteOption(StorageRpc.Option.IF_GENERATION_NOT_MATCH);
}

public static BlobWriteOption metagenerationMatch() {
return new BlobWriteOption(StorageRpc.Option.IF_METAGENERATION_MATCH);
}

public static BlobWriteOption metagenerationNotMatch() {
return new BlobWriteOption(StorageRpc.Option.IF_METAGENERATION_NOT_MATCH);
}

public static BlobWriteOption md5Match() {
return new BlobWriteOption(StorageRpc.Option.IF_MD5_MATCH, true);
}

public static BlobWriteOption crc32cMatch() {
return new BlobWriteOption(StorageRpc.Option.IF_CRC32C_MATCH, true);
}
}

class BlobSourceOption extends Option {
Expand Down Expand Up @@ -510,10 +571,12 @@ public static Builder builder() {

/**
 * Creates a new blob. Direct upload is used to upload {@code content}; for large content,
 * {@link #writer} is recommended as it uses resumable upload. MD5 and CRC32C hashes of
 * {@code content} are computed and used for validating transferred data.
 *
 * @return the created blob information
 * @throws StorageException upon failure
 * @see <a href="https://cloud.google.com/storage/docs/hashes-etags">Hashes and ETags</a>
 */
BlobInfo create(BlobInfo blobInfo, byte[] content, BlobTargetOption... options);

Expand All @@ -524,7 +587,7 @@ public static Builder builder() {
* @return the created blob information
* @throws StorageException upon failure
*/
BlobInfo create(BlobInfo blobInfo, InputStream content, BlobTargetOption... options);
BlobInfo create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options);

This comment was marked as spam.


/**
* Return the requested bucket or {@code null} if not found.
Expand Down Expand Up @@ -683,7 +746,7 @@ public static Builder builder() {
*
* @throws StorageException upon failure
*/
BlobWriteChannel writer(BlobInfo blobInfo, BlobTargetOption... options);
BlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options);

/**
* Generates a signed URL for a blob.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Ints;
import com.google.gcloud.AuthCredentials.ServiceAccountAuthCredentials;
Expand Down Expand Up @@ -93,13 +94,14 @@ public RetryResult beforeEval(Exception exception) {
static final ExceptionHandler EXCEPTION_HANDLER = ExceptionHandler.builder()
.abortOn(RuntimeException.class).interceptor(EXCEPTION_HANDLER_INTERCEPTOR).build();
private static final byte[] EMPTY_BYTE_ARRAY = {};
private static final String EMPTY_BYTE_ARRAY_MD5 = "1B2M2Y8AsgTpgAmY7PhCfg==";
private static final String EMPTY_BYTE_ARRAY_CRC32C = "AAAAAA==";

private final StorageRpc storageRpc;

StorageImpl(StorageOptions options) {
super(options);
storageRpc = options.storageRpc();
// todo: configure timeouts - https://developers.google.com/api-client-library/java/google-api-java-client/errors
// todo: provide rewrite - https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite
// todo: check if we need to expose https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert vs using bucket update/patch
}
Expand All @@ -123,18 +125,29 @@ public com.google.api.services.storage.model.Bucket call() {

@Override
public BlobInfo create(BlobInfo blobInfo, BlobTargetOption... options) {
  // An empty blob is uploaded with precomputed hashes of the empty byte array so the
  // service can still validate the (empty) payload.
  BlobInfo withHashes = blobInfo.toBuilder()
      .md5(EMPTY_BYTE_ARRAY_MD5)
      .crc32c(EMPTY_BYTE_ARRAY_CRC32C)
      .build();
  BlobWriteOption[] writeOptions = BlobTargetOption.convert(
      options, BlobWriteOption.md5Match(), BlobWriteOption.crc32cMatch());
  return create(withHashes, new ByteArrayInputStream(EMPTY_BYTE_ARRAY), writeOptions);
}

@Override
public BlobInfo create(BlobInfo blobInfo, final byte[] content, BlobTargetOption... options) {
return create(blobInfo,
new ByteArrayInputStream(firstNonNull(content, EMPTY_BYTE_ARRAY)), options);
public BlobInfo create(BlobInfo blobInfo, byte[] content, BlobTargetOption... options) {
content = firstNonNull(content, EMPTY_BYTE_ARRAY);
BlobInfo updatedInfo = blobInfo.toBuilder()
.md5(BaseEncoding.base64().encode(Hashing.md5().hashBytes(content).asBytes()))
.crc32c(BaseEncoding.base64().encode(
Ints.toByteArray(Hashing.crc32c().hashBytes(content).asInt())))

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

.build();
return create(updatedInfo, new ByteArrayInputStream(content), BlobTargetOption.convert(
options, BlobWriteOption.md5Match(), BlobWriteOption.crc32cMatch()));
}

@Override
public BlobInfo create(BlobInfo blobInfo, final InputStream content,
BlobTargetOption... options) {
public BlobInfo create(BlobInfo blobInfo, final InputStream content, BlobWriteOption... options) {

This comment was marked as spam.

final StorageObject blobPb = blobInfo.toPb();
final Map<StorageRpc.Option, ?> optionsMap = optionMap(blobInfo, options);
try {
Expand Down Expand Up @@ -544,7 +557,7 @@ public BlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
}

@Override
public BlobWriteChannel writer(BlobInfo blobInfo, BlobTargetOption... options) {
public BlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
final Map<StorageRpc.Option, ?> optionsMap = optionMap(blobInfo, options);
return new BlobWriteChannelImpl(options(), blobInfo, optionsMap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,22 @@ public void testCreateBlobFail() {
assertTrue(storage.delete(bucket, blobName));
}

@Test
public void testCreateBlobMd5Fail() throws UnsupportedEncodingException {
  // Create with a deliberately wrong MD5 while requesting md5Match(): the service
  // must reject the upload with a StorageException.
  BlobInfo blobInfo = BlobInfo.builder(bucket, "test-create-blob-md5-fail")
      .contentType(CONTENT_TYPE)
      .md5("O1R4G1HJSDUISJjoIYmVhQ==")
      .build();
  ByteArrayInputStream content =
      new ByteArrayInputStream(BLOB_STRING_CONTENT.getBytes(UTF_8));
  try {
    storage.create(blobInfo, content, Storage.BlobWriteOption.md5Match());
    fail("StorageException was expected");
  } catch (StorageException ex) {
    // expected: server-side MD5 validation rejects the mismatched hash
  }
}

@Test
public void testUpdateBlob() {
String blobName = "test-update-blob";
Expand Down Expand Up @@ -449,7 +465,7 @@ public void testWriteChannelFail() throws UnsupportedEncodingException, IOExcept
BlobInfo blob = BlobInfo.builder(bucket, blobName).generation(-1L).build();
try {
try (BlobWriteChannel writer =
storage.writer(blob, Storage.BlobTargetOption.generationMatch())) {
storage.writer(blob, Storage.BlobWriteOption.generationMatch())) {
writer.write(ByteBuffer.allocate(42));
}
fail("StorageException was expected");
Expand All @@ -458,6 +474,20 @@ public void testWriteChannelFail() throws UnsupportedEncodingException, IOExcept
}
}

@Test
public void testWriteChannelExistingBlob() throws UnsupportedEncodingException, IOException {
  // Writing through a channel to an already-existing blob overwrites its content.
  String blobName = "test-write-channel-existing-blob";
  BlobInfo blob = BlobInfo.builder(bucket, blobName).build();
  BlobInfo remoteBlob = storage.create(blob);
  byte[] payload = BLOB_STRING_CONTENT.getBytes(UTF_8);
  try (BlobWriteChannel writer = storage.writer(remoteBlob)) {
    writer.write(ByteBuffer.wrap(payload));
  }
  assertArrayEquals(payload, storage.readAllBytes(blob.blobId()));
  assertTrue(storage.delete(bucket, blobName));
}

@Test
public void testGetSignedUrl() throws IOException {
String blobName = "test-get-signed-url-blob";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class StorageImplTest {
private static final String BLOB_NAME2 = "n2";
private static final String BLOB_NAME3 = "n3";
private static final byte[] BLOB_CONTENT = {0xD, 0xE, 0xA, 0xD};
private static final String CONTENT_MD5 = "O1R4G1HJSDUISJjoIYmVhQ==";
private static final String CONTENT_CRC32C = "9N3EPQ==";
private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;

// BucketInfo objects
Expand Down Expand Up @@ -121,6 +123,27 @@ public class StorageImplTest {
StorageRpc.Option.IF_GENERATION_MATCH, BLOB_INFO1.generation(),
StorageRpc.Option.IF_METAGENERATION_MATCH, BLOB_INFO1.metageneration());

// Blob write options (create, writer)
private static final Storage.BlobWriteOption BLOB_WRITE_METAGENERATION =
Storage.BlobWriteOption.metagenerationMatch();
private static final Storage.BlobWriteOption BLOB_WRITE_NOT_EXIST =
Storage.BlobWriteOption.doesNotExist();
private static final Storage.BlobWriteOption BLOB_WRITE_PREDEFINED_ACL =
Storage.BlobWriteOption.predefinedAcl(Storage.PredefinedAcl.PRIVATE);
private static final Storage.BlobWriteOption BLOB_WRITE_MD5_HASH =
Storage.BlobWriteOption.md5Match();
private static final Storage.BlobWriteOption BLOB_WRITE_CRC2C =
Storage.BlobWriteOption.crc32cMatch();
private static final Map<StorageRpc.Option, ?> BLOB_WRITE_OPTIONS_SIMPLE = ImmutableMap.of(
StorageRpc.Option.IF_MD5_MATCH, true,
StorageRpc.Option.IF_CRC32C_MATCH, true);
private static final Map<StorageRpc.Option, ?> BLOB_WRITE_OPTIONS_COMPLEX = ImmutableMap.of(
StorageRpc.Option.IF_METAGENERATION_MATCH, BLOB_INFO1.metageneration(),
StorageRpc.Option.IF_GENERATION_MATCH, 0L,
StorageRpc.Option.PREDEFINED_ACL, BUCKET_TARGET_PREDEFINED_ACL.value(),
StorageRpc.Option.IF_MD5_MATCH, true,
StorageRpc.Option.IF_CRC32C_MATCH, true);

// Bucket source options
private static final Storage.BucketSourceOption BUCKET_SOURCE_METAGENERATION =
Storage.BucketSourceOption.metagenerationMatch(BUCKET_INFO1.metageneration());
Expand Down Expand Up @@ -250,10 +273,10 @@ public void testCreateBlob() throws IOException {
EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock);
EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries());
Capture<ByteArrayInputStream> capturedStream = Capture.newInstance();
EasyMock
.expect(
storageRpcMock.create(EasyMock.eq(BLOB_INFO1.toPb()), EasyMock.capture(capturedStream),
EasyMock.eq(EMPTY_RPC_OPTIONS)))
EasyMock.expect(storageRpcMock.create(
EasyMock.eq(BLOB_INFO1.toBuilder().md5(CONTENT_MD5).crc32c(CONTENT_CRC32C).build().toPb()),
EasyMock.capture(capturedStream),
EasyMock.eq(BLOB_WRITE_OPTIONS_SIMPLE)))
.andReturn(BLOB_INFO1.toPb());
EasyMock.replay(optionsMock, storageRpcMock);
storage = StorageFactory.instance().get(optionsMock);
Expand All @@ -271,10 +294,14 @@ public void testCreateEmptyBlob() throws IOException {
EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock);
EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries());
Capture<ByteArrayInputStream> capturedStream = Capture.newInstance();
EasyMock
.expect(
storageRpcMock.create(EasyMock.eq(BLOB_INFO1.toPb()), EasyMock.capture(capturedStream),
EasyMock.eq(EMPTY_RPC_OPTIONS)))
EasyMock.expect(storageRpcMock.create(
EasyMock.eq(BLOB_INFO1.toBuilder()
.md5("1B2M2Y8AsgTpgAmY7PhCfg==")
.crc32c("AAAAAA==")
.build()
.toPb()),
EasyMock.capture(capturedStream),
EasyMock.eq(BLOB_WRITE_OPTIONS_SIMPLE)))
.andReturn(BLOB_INFO1.toPb());
EasyMock.replay(optionsMock, storageRpcMock);
storage = StorageFactory.instance().get(optionsMock);
Expand All @@ -290,9 +317,14 @@ public void testCreateBlobWithOptions() throws IOException {
EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock);
EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries());
Capture<ByteArrayInputStream> capturedStream = Capture.newInstance();
EasyMock.expect(
storageRpcMock.create(EasyMock.eq(BLOB_INFO1.toPb()), EasyMock.capture(capturedStream),
EasyMock.eq(BLOB_TARGET_OPTIONS_CREATE)))
EasyMock.expect(storageRpcMock.create(
EasyMock.eq(BLOB_INFO1.toBuilder()
.md5(CONTENT_MD5)
.crc32c(CONTENT_CRC32C)
.build()
.toPb()),
EasyMock.capture(capturedStream),
EasyMock.eq(BLOB_WRITE_OPTIONS_COMPLEX)))
.andReturn(BLOB_INFO1.toPb());
EasyMock.replay(optionsMock, storageRpcMock);
storage = StorageFactory.instance().get(optionsMock);
Expand Down Expand Up @@ -787,12 +819,12 @@ public void testWriter() {
@Test
public void testWriterWithOptions() {
EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock).times(2);
EasyMock.expect(storageRpcMock.open(BLOB_INFO1.toPb(), BLOB_TARGET_OPTIONS_CREATE))
EasyMock.expect(storageRpcMock.open(BLOB_INFO1.toPb(), BLOB_WRITE_OPTIONS_COMPLEX))
.andReturn("upload-id");
EasyMock.replay(optionsMock, storageRpcMock);
storage = StorageFactory.instance().get(optionsMock);
BlobWriteChannel channel = storage.writer(BLOB_INFO1, BLOB_TARGET_METAGENERATION,
BLOB_TARGET_NOT_EXIST, BLOB_TARGET_PREDEFINED_ACL);
BlobWriteChannel channel = storage.writer(BLOB_INFO1, BLOB_WRITE_METAGENERATION,
BLOB_WRITE_NOT_EXIST, BLOB_WRITE_PREDEFINED_ACL, BLOB_WRITE_CRC2C, BLOB_WRITE_MD5_HASH);
assertNotNull(channel);
assertTrue(channel.isOpen());
}
Expand Down