Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement GrpcStorageImpl ObjectAccessControl operations #1818

Merged
merged 1 commit into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ public URL signUrl(long duration, TimeUnit unit, SignUrlOption... options) {
*
* @throws StorageException upon failure
*/
@TransportCompatibility({Transport.HTTP})
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
public Acl getAcl(Entity entity) {
return storage.getAcl(getBlobId(), entity);
}
Expand All @@ -1085,7 +1085,7 @@ public Acl getAcl(Entity entity) {
* @return {@code true} if the ACL was deleted, {@code false} if it was not found
* @throws StorageException upon failure
*/
@TransportCompatibility({Transport.HTTP})
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
public boolean deleteAcl(Entity entity) {
return storage.deleteAcl(getBlobId(), entity);
}
Expand All @@ -1101,7 +1101,7 @@ public boolean deleteAcl(Entity entity) {
*
* @throws StorageException upon failure
*/
@TransportCompatibility({Transport.HTTP})
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
public Acl createAcl(Acl acl) {
return storage.createAcl(getBlobId(), acl);
}
Expand All @@ -1117,7 +1117,7 @@ public Acl createAcl(Acl acl) {
*
* @throws StorageException upon failure
*/
@TransportCompatibility({Transport.HTTP})
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
public Acl updateAcl(Acl acl) {
return storage.updateAcl(getBlobId(), acl);
}
Expand All @@ -1136,7 +1136,7 @@ public Acl updateAcl(Acl acl) {
*
* @throws StorageException upon failure
*/
@TransportCompatibility({Transport.HTTP})
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
public List<Acl> listAcls() {
return storage.listAcls(getBlobId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ public boolean deleteAcl(String bucket, Entity entity, BucketSourceOption... opt
}
long metageneration = resp.getMetageneration();

UpdateBucketRequest req = createUpdateAclRequest(bucket, newAcls, metageneration);
UpdateBucketRequest req = createUpdateBucketAclRequest(bucket, newAcls, metageneration);

com.google.storage.v2.Bucket updateResult = updateBucket(req);
// read the response to ensure there is no longer an acl for the specified entity
Expand Down Expand Up @@ -954,7 +954,7 @@ public Acl updateAcl(String bucket, Acl acl, BucketSourceOption... options) {
.collect(ImmutableList.toImmutableList());

UpdateBucketRequest req =
createUpdateAclRequest(bucket, newDefaultAcls, resp.getMetageneration());
createUpdateBucketAclRequest(bucket, newDefaultAcls, resp.getMetageneration());

com.google.storage.v2.Bucket updateResult = updateBucket(req);

Expand Down Expand Up @@ -1109,27 +1109,117 @@ public List<Acl> listDefaultAcls(String bucket) {

@Override
public Acl getAcl(BlobId blob, Entity entity) {
return throwNotYetImplemented(fmtMethodName("getAcl", BlobId.class, Entity.class));
try {
Object req = codecs.blobId().encode(blob);
Object resp = getObjectWithAcls(req);

Predicate<ObjectAccessControl> entityPredicate =
objectAclEntityOrAltEq(codecs.entity().encode(entity));

Optional<ObjectAccessControl> first =
resp.getAclList().stream().filter(entityPredicate).findFirst();

// HttpStorageRpc defaults to null if Not Found
return first.map(codecs.objectAcl()::decode).orElse(null);
} catch (NotFoundException e) {
return null;
} catch (StorageException se) {
if (se.getCode() == 404) {
return null;
} else {
throw se;
}
}
}

@Override
public boolean deleteAcl(BlobId blob, Entity entity) {
return throwNotYetImplemented(fmtMethodName("deleteAcl", BlobId.class, Entity.class));
try {
Object obj = codecs.blobId().encode(blob);
Object resp = getObjectWithAcls(obj);
String encode = codecs.entity().encode(entity);

Predicate<ObjectAccessControl> entityPredicate = objectAclEntityOrAltEq(encode);

List<ObjectAccessControl> currentDefaultAcls = resp.getAclList();
ImmutableList<ObjectAccessControl> newDefaultAcls =
currentDefaultAcls.stream()
.filter(entityPredicate.negate())
.collect(ImmutableList.toImmutableList());
if (newDefaultAcls.equals(currentDefaultAcls)) {
// we didn't actually filter anything out, no need to send an RPC, simply return false
return false;
}
long metageneration = resp.getMetageneration();

UpdateObjectRequest req = createUpdateObjectAclRequest(obj, newDefaultAcls, metageneration);

Object updateResult = updateObject(req);
// read the response to ensure there is no longer an acl for the specified entity
Optional<ObjectAccessControl> first =
updateResult.getAclList().stream().filter(entityPredicate).findFirst();
return !first.isPresent();
} catch (NotFoundException e) {
// HttpStorageRpc returns false if the bucket doesn't exist :(
return false;
} catch (StorageException se) {
if (se.getCode() == 404) {
return false;
} else {
throw se;
}
}
}

@Override
public Acl createAcl(BlobId blob, Acl acl) {
return throwNotYetImplemented(fmtMethodName("createAcl", BlobId.class, Acl.class));
return updateAcl(blob, acl);
}

@Override
public Acl updateAcl(BlobId blob, Acl acl) {
return throwNotYetImplemented(fmtMethodName("updateAcl", BlobId.class, Acl.class));
try {
Object obj = codecs.blobId().encode(blob);
Object resp = getObjectWithAcls(obj);
ObjectAccessControl encode = codecs.objectAcl().encode(acl);
String entity = encode.getEntity();

Predicate<ObjectAccessControl> entityPredicate = objectAclEntityOrAltEq(entity);

ImmutableList<ObjectAccessControl> newDefaultAcls =
Streams.concat(
resp.getAclList().stream().filter(entityPredicate.negate()), Stream.of(encode))
.collect(ImmutableList.toImmutableList());

UpdateObjectRequest req =
createUpdateObjectAclRequest(obj, newDefaultAcls, resp.getMetageneration());

Object updateResult = updateObject(req);

Optional<Acl> first =
updateResult.getAclList().stream()
.filter(entityPredicate)
.findFirst()
.map(codecs.objectAcl()::decode);

return first.orElseThrow(
() -> new StorageException(0, "Acl update call success, but not in response"));
} catch (NotFoundException e) {
throw StorageException.coalesce(e);
}
}

@Override
public List<Acl> listAcls(BlobId blob) {
return throwNotYetImplemented(fmtMethodName("listAcls", BlobId.class));
try {
Object req = codecs.blobId().encode(blob);
Object resp = getObjectWithAcls(req);
return resp.getAclList().stream()
.map(codecs.objectAcl()::decode)
.collect(ImmutableList.toImmutableList());
} catch (NotFoundException e) {
throw StorageException.coalesce(e);
}
}

@Override
Expand Down Expand Up @@ -1637,7 +1727,7 @@ private static UpdateBucketRequest createUpdateDefaultAclRequest(
.build();
}

private static UpdateBucketRequest createUpdateAclRequest(
private static UpdateBucketRequest createUpdateBucketAclRequest(
String bucket, ImmutableList<BucketAccessControl> newDefaultAcls, long metageneration) {
com.google.storage.v2.Bucket update =
com.google.storage.v2.Bucket.newBuilder()
Expand All @@ -1653,4 +1743,50 @@ private static UpdateBucketRequest createUpdateAclRequest(
.setBucket(update)
.build();
}

private Object getObjectWithAcls(Object obj) {
Fields fields =
UnifiedOpts.fields(ImmutableSet.of(BucketField.ACL, BucketField.METAGENERATION));
GrpcCallContext grpcCallContext = GrpcCallContext.createDefault();
GetObjectRequest req =
fields
.getObject()
.apply(GetObjectRequest.newBuilder())
.setBucket(obj.getBucket())
.setObject(obj.getName())
.build();

return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> storageClient.getObjectCallable().call(req, grpcCallContext),
Decoder.identity());
}

private static UpdateObjectRequest createUpdateObjectAclRequest(
Object obj, ImmutableList<ObjectAccessControl> newAcls, long metageneration) {
Object update =
Object.newBuilder()
.setBucket(obj.getBucket())
.setName(obj.getName())
.addAllAcl(newAcls)
.build();
Opts<BucketTargetOpt> opts =
Opts.from(
UnifiedOpts.fields(ImmutableSet.of(BlobField.ACL)),
UnifiedOpts.metagenerationMatch(metageneration));
return opts.updateObjectsRequest()
.apply(UpdateObjectRequest.newBuilder())
.setObject(update)
.build();
}

private Object updateObject(UpdateObjectRequest req) {
GrpcCallContext grpcCallContext = GrpcCallContext.createDefault();
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> storageClient.updateObjectCallable().call(req, grpcCallContext),
Decoder.identity());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3599,7 +3599,7 @@ PostPolicyV4 generateSignedPostPolicyV4(
*
* @throws StorageException upon failure
*/
@TransportCompatibility({Transport.HTTP})
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
Acl getAcl(BlobId blob, Entity entity);

/**
Expand All @@ -3623,7 +3623,7 @@ PostPolicyV4 generateSignedPostPolicyV4(
* @return {@code true} if the ACL was deleted, {@code false} if it was not found
* @throws StorageException upon failure
*/
@TransportCompatibility({Transport.HTTP})
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
boolean deleteAcl(BlobId blob, Entity entity);

/**
Expand Down Expand Up @@ -3651,7 +3651,7 @@ PostPolicyV4 generateSignedPostPolicyV4(
*
* @throws StorageException upon failure
*/
@TransportCompatibility({Transport.HTTP})
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
Acl createAcl(BlobId blob, Acl acl);

/**
Expand All @@ -3669,7 +3669,7 @@ PostPolicyV4 generateSignedPostPolicyV4(
*
* @throws StorageException upon failure
*/
@TransportCompatibility({Transport.HTTP})
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
Acl updateAcl(BlobId blob, Acl acl);

/**
Expand All @@ -3690,7 +3690,7 @@ PostPolicyV4 generateSignedPostPolicyV4(
*
* @throws StorageException upon failure
*/
@TransportCompatibility({Transport.HTTP})
@TransportCompatibility({Transport.HTTP, Transport.GRPC})
List<Acl> listAcls(BlobId blob);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,12 @@ public Mapper<ListObjectsRequest.Builder> listObjects() {
}

static final class Fields extends RpcOptVal<ImmutableSet<NamedField>>
implements ObjectSourceOpt, ObjectListOpt, BucketSourceOpt, BucketTargetOpt, BucketListOpt {
implements ObjectSourceOpt,
ObjectListOpt,
ObjectTargetOpt,
BucketSourceOpt,
BucketTargetOpt,
BucketListOpt {

/**
* Apiary and gRPC have differing handling of where the field selector is evaluated relative to
Expand Down Expand Up @@ -772,6 +777,16 @@ public Mapper<ReadObjectRequest.Builder> readObject() {
return b -> b.setReadMask(FieldMask.newBuilder().addAllPaths(getPaths()).build());
}

@Override
public Mapper<UpdateObjectRequest.Builder> updateObject() {
return b -> b.setUpdateMask(FieldMask.newBuilder().addAllPaths(getPaths()).build());
}

@Override
public Mapper<RewriteObjectRequest.Builder> rewriteObject() {
return Mapper.identity();
}

/**
* Define a decoder which can clear out any fields which may have not been selected.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1125,10 +1125,14 @@ public void testBlobAcl() {

static ImmutableList<Acl> dropEtags(List<Acl> defaultAcls) {
return defaultAcls.stream()
.map(acl -> Acl.of(acl.getEntity(), acl.getRole()))
.map(ITAccessTest::dropEtag)
.collect(ImmutableList.toImmutableList());
}

static Acl dropEtag(Acl acl) {
return Acl.of(acl.getEntity(), acl.getRole());
}

static Predicate<Acl> hasRole(Acl.Role expected) {
return acl -> acl.getRole().equals(expected);
}
Expand Down
Loading