From e2ddc4bace5ad3d2f1725ed38c1e6708a64407d9 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Thu, 8 Sep 2016 09:43:25 +0200 Subject: [PATCH 1/2] Add support for Pub/Sub IAM methods and tests --- .../google/cloud/pubsub/PolicyMarshaller.java | 36 +++ .../java/com/google/cloud/pubsub/PubSub.java | 179 +++++++++++- .../com/google/cloud/pubsub/PubSubImpl.java | 110 +++++++ .../com/google/cloud/pubsub/Subscription.java | 101 +++++++ .../java/com/google/cloud/pubsub/Topic.java | 101 +++++++ .../cloud/pubsub/spi/DefaultPubSubRpc.java | 22 ++ .../google/cloud/pubsub/spi/PubSubRpc.java | 25 ++ .../google/cloud/pubsub/PubSubImplTest.java | 273 ++++++++++++++++++ .../google/cloud/pubsub/SubscriptionTest.java | 86 ++++++ .../com/google/cloud/pubsub/TopicTest.java | 85 ++++++ .../google/cloud/pubsub/it/ITPubSubTest.java | 99 +++++++ 11 files changed, 1114 insertions(+), 3 deletions(-) create mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PolicyMarshaller.java diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PolicyMarshaller.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PolicyMarshaller.java new file mode 100644 index 000000000000..5e910db00dd6 --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PolicyMarshaller.java @@ -0,0 +1,36 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import com.google.cloud.Policy; + +final class PolicyMarshaller extends Policy.DefaultMarshaller { + + static final PolicyMarshaller INSTANCE = new PolicyMarshaller(); + + private PolicyMarshaller() {} + + @Override + protected com.google.iam.v1.Policy toPb(Policy policy) { + return super.toPb(policy); + } + + @Override + protected Policy fromPb(com.google.iam.v1.Policy policyPb) { + return super.fromPb(policyPb); + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index a548b3850d15..0833f691ff25 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -19,6 +19,7 @@ import com.google.cloud.AsyncPage; import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.Page; +import com.google.cloud.Policy; import com.google.cloud.Service; import java.util.Iterator; @@ -655,7 +656,179 @@ Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit, Iterable ackIds); - // IAM Policy operations: getPolicy, replacePolicy, testPermissions - // Not sure if ready (docs is not up-to-date) - // Looks like policy is per resource (topic or subscription) but not per service? + /** + * Returns the IAM access control policy for the specified topic. Returns {@code null} if the + * topic was not found. + * + * @throws PubSubException upon failure + */ + Policy getTopicPolicy(String topic); + + /** + * Sends a request for getting the IAM access control policy for the specified topic. This method + * returns a {@code Future} object to consume the result. {@link Future#get()} returns the + * requested policy or {@code null} if the topic was not found. + * + * @throws PubSubException upon failure + */ + Future getTopicPolicyAsync(String topic); + + /** + * Sets the IAM access control policy for the specified topic. Replaces any existing policy. This + * method returns the new policy. + * + *

It is recommended that you use the read-modify-write pattern. This pattern entails reading + * the project's current policy, updating it locally, and then sending the modified policy for + * writing. Cloud IAM solves the problem of conflicting processes simultaneously attempting to + * modify a policy by using the {@link Policy#etag etag} property. This property is used to + * verify whether the policy has changed since the last request. When you make a request with an + * etag value, the value in the request is compared with the existing etag value associated with + * the policy. The policy is written only if the etag values match. If the etags don't match, a + * {@code PubSubException} is thrown, denoting that the server aborted update. If an etag is not + * provided, the policy is overwritten blindly. + * + * @throws PubSubException upon failure + */ + Policy replaceTopicPolicy(String topic, Policy newPolicy); + + /** + * Sends a request to set the IAM access control policy for the specified topic. Replaces any + * existing policy. This method returns a {@code Future} object to consume the result. + * {@link Future#get()} returns the new policy. + * + *

It is recommended that you use the read-modify-write pattern. This pattern entails reading + * the project's current policy, updating it locally, and then sending the modified policy for + * writing. Cloud IAM solves the problem of conflicting processes simultaneously attempting to + * modify a policy by using the {@link Policy#etag etag} property. This property is used to + * verify whether the policy has changed since the last request. When you make a request with an + * etag value, the value in the request is compared with the existing etag value associated with + * the policy. The policy is written only if the etag values match. If the etags don't match, + * {@link Future#get()} will throw a {@link java.util.concurrent.ExecutionException} caused by a + * {@code PubSubException}, denoting that the server aborted update. If an etag is not provided, + * the policy is overwritten blindly. + * + * @throws PubSubException upon failure + */ + Future replaceTopicPolicyAsync(String topic, Policy newPolicy); + + /** + * Returns the permissions that a caller has on the specified topic. + * + *

You typically don't call this method if you're using Google Cloud Platform directly to + * manage permissions. This method is intended for integration with your proprietary software, + * such as a customized graphical user interface. For example, the Cloud Platform Console tests + * IAM permissions internally to determine which UI should be available to the logged-in user. + * + * @return A list of booleans representing whether the caller has the permissions specified (in + * the order of the given permissions) + * @throws PubSubException upon failure + * @see + * Permissions and Roles + */ + List testTopicPermissions(String topic, List permissions); + + /** + * Sends a request to get the permissions that a caller has on the specified topic. + * + *

You typically don't call this method if you're using Google Cloud Platform directly to + * manage permissions. This method is intended for integration with your proprietary software, + * such as a customized graphical user interface. For example, the Cloud Platform Console tests + * IAM permissions internally to determine which UI should be available to the logged-in user. + * + * @return A {@code Future} object to consume the result. {@link Future#get()} returns a list of + * booleans representing whether the caller has the permissions specified (in the order of the + * given permissions) + * @throws PubSubException upon failure + * @see + * Permissions and Roles + */ + Future> testTopicPermissionsAsync(String topic, List permissions); + + /** + * Returns the IAM access control policy for the specified subscription. Returns {@code null} if + * the subscription was not found. + * + * @throws PubSubException upon failure + */ + Policy getSubscriptionPolicy(String subscription); + + /** + * Sends a request for getting the IAM access control policy for the specified subscription. This + * method returns a {@code Future} object to consume the result. {@link Future#get()} returns the + * requested policy or {@code null} if the subscription was not found. + * + * @throws PubSubException upon failure + */ + Future getSubscriptionPolicyAsync(String subscription); + + /** + * Sets the IAM access control policy for the specified subscription. Replaces any existing + * policy. This method returns the new policy. + * + *

It is recommended that you use the read-modify-write pattern. This pattern entails reading + * the project's current policy, updating it locally, and then sending the modified policy for + * writing. Cloud IAM solves the problem of conflicting processes simultaneously attempting to + * modify a policy by using the {@link Policy#etag etag} property. This property is used to + * verify whether the policy has changed since the last request. When you make a request with an + * etag value, the value in the request is compared with the existing etag value associated with + * the policy. The policy is written only if the etag values match. If the etags don't match, a + * {@code PubSubException} is thrown, denoting that the server aborted update. If an etag is not + * provided, the policy is overwritten blindly. + * + * @throws PubSubException upon failure + */ + Policy replaceSubscriptionPolicy(String subscription, Policy newPolicy); + + /** + * Sends a request to set the IAM access control policy for the specified subscription. Replaces + * any existing policy. This method returns a {@code Future} object to consume the result. + * {@link Future#get()} returns the new policy. + * + *

It is recommended that you use the read-modify-write pattern. This pattern entails reading + * the project's current policy, updating it locally, and then sending the modified policy for + * writing. Cloud IAM solves the problem of conflicting processes simultaneously attempting to + * modify a policy by using the {@link Policy#etag etag} property. This property is used to + * verify whether the policy has changed since the last request. When you make a request with an + * etag value, the value in the request is compared with the existing etag value associated with + * the policy. The policy is written only if the etag values match. If the etags don't match, + * {@link Future#get()} will throw a {@link java.util.concurrent.ExecutionException} caused by a + * {@code PubSubException}, denoting that the server aborted update. If an etag is not provided, + * the policy is overwritten blindly. + * + * @throws PubSubException upon failure + */ + Future replaceSubscriptionPolicyAsync(String subscription, Policy newPolicy); + + /** + * Returns the permissions that a caller has on the specified subscription. You typically don't + * call this method if you're using Google Cloud Platform directly to manage permissions. This + * method is intended for integration with your proprietary software, such as a customized + * graphical user interface. For example, the Cloud Platform Console tests IAM permissions + * internally to determine which UI should be available to the logged-in user. + * + * @return A list of booleans representing whether the caller has the permissions specified (in + * the order of the given permissions) + * @throws PubSubException upon failure + * @see + * Permissions and Roles + */ + List testSubscriptionPermissions(String subscription, List permissions); + + /** + * Sends a request to get the permissions that a caller has on the specified subscription. + * + *

You typically don't call this method if you're using Google Cloud Platform directly to + * manage permissions. This method is intended for integration with your proprietary software, + * such as a customized graphical user interface. For example, the Cloud Platform Console tests + * IAM permissions internally to determine which UI should be available to the logged-in user. + * + * @return A {@code Future} object to consume the result. {@link Future#get()} returns a list of + * booleans representing whether the caller has the permissions specified (in the order of the + * given permissions) + * @throws PubSubException upon failure + * @see + * Permissions and Roles + */ + Future> testSubscriptionPermissionsAsync(String subscription, + List permissions); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 88c87b56b0f7..d830290eb732 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -20,6 +20,7 @@ import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN; import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR_FACTORY; import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS; +import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; import com.google.cloud.AsyncPage; @@ -27,6 +28,7 @@ import com.google.cloud.BaseService; import com.google.cloud.Page; import com.google.cloud.PageImpl; +import com.google.cloud.Policy; import com.google.cloud.pubsub.spi.PubSubRpc; import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture; import com.google.cloud.pubsub.spi.v1.PublisherApi; @@ -35,6 +37,7 @@ import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -42,6 +45,9 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; +import com.google.iam.v1.SetIamPolicyRequest; +import com.google.iam.v1.TestIamPermissionsRequest; +import com.google.iam.v1.TestIamPermissionsResponse; import com.google.protobuf.Empty; import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.DeleteSubscriptionRequest; @@ -65,6 +71,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -95,6 +102,13 @@ public String apply(com.google.pubsub.v1.ReceivedMessage message) { return message.getAckId(); } }; + private static final Function POLICY_TO_PB_FUNCTION = + new Function() { + @Override + public Policy apply(com.google.iam.v1.Policy policyPb) { + return policyPb == null ? null : PolicyMarshaller.INSTANCE.fromPb(policyPb); + } + }; PubSubImpl(PubSubOptions options) { super(options); @@ -601,6 +615,102 @@ public Future modifyAckDeadlineAsync(String subscription, int deadline, Ti return transform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION); } + @Override + public Policy getTopicPolicy(String topic) { + return get(getTopicPolicyAsync(topic)); + } + + @Override + public Future getTopicPolicyAsync(String topic) { + return transform(rpc.getIamPolicy(PublisherApi.formatTopicName(options().projectId(), topic)), + POLICY_TO_PB_FUNCTION); + } + + @Override + public Policy replaceTopicPolicy(String topic, Policy newPolicy) { + return get(replaceTopicPolicyAsync(topic, newPolicy)); + } + + @Override + public Future replaceTopicPolicyAsync(String topic, Policy newPolicy) { + SetIamPolicyRequest request = SetIamPolicyRequest.newBuilder() + .setPolicy(PolicyMarshaller.INSTANCE.toPb(newPolicy)) + .setResource(PublisherApi.formatTopicName(options().projectId(), topic)) + .build(); + return transform(rpc.setIamPolicy(request), POLICY_TO_PB_FUNCTION); + } + + @Override + public List testTopicPermissions(String topic, final List permissions) { + return get(testTopicPermissionsAsync(topic, permissions)); + } + + @Override + public Future> testTopicPermissionsAsync(String topic, List permissions) { + TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() + .setResource(PublisherApi.formatTopicName(options().projectId(), topic)) + .addAllPermissions(permissions) + .build(); + return transform(rpc.testIamPermissions(request), permissionsFromPbFunction(permissions)); + } + + @Override + public Policy getSubscriptionPolicy(String subscription) { + return get(getSubscriptionPolicyAsync(subscription)); + } + + @Override + public Future getSubscriptionPolicyAsync(String subscription) { + return transform( + rpc.getIamPolicy(SubscriberApi.formatSubscriptionName(options().projectId(), subscription)), + POLICY_TO_PB_FUNCTION); + } + + @Override + public Policy replaceSubscriptionPolicy(String subscription, Policy newPolicy) { + return get(replaceSubscriptionPolicyAsync(subscription, newPolicy)); + } + + @Override + public Future replaceSubscriptionPolicyAsync(String subscription, Policy newPolicy) { + SetIamPolicyRequest request = SetIamPolicyRequest.newBuilder() + .setPolicy(PolicyMarshaller.INSTANCE.toPb(newPolicy)) + .setResource(SubscriberApi.formatSubscriptionName(options().projectId(), subscription)) + .build(); + return transform(rpc.setIamPolicy(request), POLICY_TO_PB_FUNCTION); + } + + @Override + public List testSubscriptionPermissions(String subscription, List permissions) { + return get(testSubscriptionPermissionsAsync(subscription, permissions)); + } + + @Override + public Future> testSubscriptionPermissionsAsync(String subscription, + List permissions) { + TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() + .setResource(SubscriberApi.formatSubscriptionName(options().projectId(), subscription)) + .addAllPermissions(permissions) + .build(); + return transform(rpc.testIamPermissions(request), permissionsFromPbFunction(permissions)); + } + + private static Function> permissionsFromPbFunction( + final List permissions) { + return new Function>() { + @Override + public List apply(TestIamPermissionsResponse response) { + Set permissionsOwned = ImmutableSet.copyOf( + firstNonNull(response.getPermissionsList(), ImmutableList.of())); + ImmutableList.Builder answer = ImmutableList.builder(); + for (String permission : permissions) { + answer.add(permissionsOwned.contains(permission)); + } + return answer.build(); + } + }; + } + static Map optionMap(Option... options) { Map optionMap = Maps.newHashMap(); for (Option option : options) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java index d96d51015fd3..afb064c734f9 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.cloud.GrpcServiceOptions; +import com.google.cloud.Policy; import com.google.cloud.pubsub.PubSub.MessageConsumer; import com.google.cloud.pubsub.PubSub.MessageProcessor; import com.google.cloud.pubsub.PubSub.PullOption; @@ -27,6 +28,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.util.Iterator; +import java.util.List; import java.util.Objects; import java.util.concurrent.Future; @@ -301,6 +303,105 @@ public MessageConsumer pullAsync(MessageProcessor callback, PullOption... option return pubsub.pullAsync(name(), callback, options); } + /** + * Returns the IAM access control policy for this subscription. Returns {@code null} if the + * subscription was not found. + * + * @throws PubSubException upon failure + */ + public Policy getPolicy() { + return pubsub.getSubscriptionPolicy(this.name()); + } + + /** + * Sends a request for getting the IAM access control policy for this subscription. This method + * returns a {@code Future} object to consume the result. {@link Future#get()} returns the + * requested policy or {@code null} if the subscription was not found. + * + * @throws PubSubException upon failure + */ + public Future getPolicyAsync() { + return pubsub.getSubscriptionPolicyAsync(this.name()); + } + + /** + * Sets the IAM access control policy for this subscription. Replaces any existing policy. This + * method returns the new policy. + * + *

It is recommended that you use the read-modify-write pattern. This pattern entails reading + * the project's current policy, updating it locally, and then sending the modified policy for + * writing. Cloud IAM solves the problem of conflicting processes simultaneously attempting to + * modify a policy by using the {@link Policy#etag etag} property. This property is used to + * verify whether the policy has changed since the last request. When you make a request with an + * etag value, the value in the request is compared with the existing etag value associated with + * the policy. The policy is written only if the etag values match. If the etags don't match, a + * {@code PubSubException} is thrown, denoting that the server aborted update. If an etag is not + * provided, the policy is overwritten blindly. + * + * @throws PubSubException upon failure + */ + public Policy replacePolicy(Policy newPolicy) { + return pubsub.replaceSubscriptionPolicy(this.name(), newPolicy); + } + + /** + * Sends a request to set the IAM access control policy for this subscription. Replaces any + * existing policy. This method returns a {@code Future} object to consume the result. + * {@link Future#get()} returns the new policy. + * + *

It is recommended that you use the read-modify-write pattern. This pattern entails reading + * the project's current policy, updating it locally, and then sending the modified policy for + * writing. Cloud IAM solves the problem of conflicting processes simultaneously attempting to + * modify a policy by using the {@link Policy#etag etag} property. This property is used to + * verify whether the policy has changed since the last request. When you make a request with an + * etag value, the value in the request is compared with the existing etag value associated with + * the policy. The policy is written only if the etag values match. If the etags don't match, + * {@link Future#get()} will throw a {@link java.util.concurrent.ExecutionException} caused by a + * {@code PubSubException}, denoting that the server aborted update. If an etag is not provided, + * the policy is overwritten blindly. + * + * @throws PubSubException upon failure + */ + public Future replacePolicyAsync(Policy newPolicy) { + return pubsub.replaceSubscriptionPolicyAsync(this.name(), newPolicy); + } + + /** + * Returns the permissions that a caller has on this subscription. You typically don't call this + * method if you're using Google Cloud Platform directly to manage permissions. This method is + * intended for integration with your proprietary software, such as a customized graphical user + * interface. For example, the Cloud Platform Console tests IAM permissions internally to + * determine which UI should be available to the logged-in user. + * + * @return A list of booleans representing whether the caller has the permissions specified (in + * the order of the given permissions) + * @throws PubSubException upon failure + * @see + * Permissions and Roles + */ + public List testPermissions(List permissions) { + return pubsub.testSubscriptionPermissions(this.name(), permissions); + } + + /** + * Sends a request to get the permissions that a caller has on this subscription. + * + *

You typically don't call this method if you're using Google Cloud Platform directly to + * manage permissions. This method is intended for integration with your proprietary software, + * such as a customized graphical user interface. For example, the Cloud Platform Console tests + * IAM permissions internally to determine which UI should be available to the logged-in user. + * + * @return A {@code Future} object to consume the result. {@link Future#get()} returns a list of + * booleans representing whether the caller has the permissions specified (in the order of the + * given permissions) + * @throws PubSubException upon failure + * @see + * Permissions and Roles + */ + public Future> testPermissionsAsync(List permissions) { + return pubsub.testSubscriptionPermissionsAsync(this.name(), permissions); + } + private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { input.defaultReadObject(); this.pubsub = options.service(); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java index a2b348f4e91d..e0c3cbf30f0f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java @@ -20,6 +20,7 @@ import com.google.cloud.AsyncPage; import com.google.cloud.Page; +import com.google.cloud.Policy; import com.google.cloud.pubsub.PubSub.ListOption; import com.google.common.base.Function; @@ -253,6 +254,106 @@ public Future> listSubscriptionsAsync(ListOption... op return pubsub.listSubscriptionsAsync(name(), options); } + /** + * Returns the IAM access control policy for this topic. Returns {@code null} if the topic was not + * found. + * + * @throws PubSubException upon failure + */ + public Policy getPolicy() { + return pubsub.getTopicPolicy(this.name()); + } + + /** + * Sends a request for getting the IAM access control policy for this topic. This method returns a + * {@code Future} object to consume the result. {@link Future#get()} returns the requested policy + * or {@code null} if the topic was not found. + * + * @throws PubSubException upon failure + */ + public Future getPolicyAsync() { + return pubsub.getTopicPolicyAsync(this.name()); + } + + /** + * Sets the IAM access control policy for this topic. Replaces any existing policy. This method + * returns the new policy. + * + *

It is recommended that you use the read-modify-write pattern. This pattern entails reading + * the project's current policy, updating it locally, and then sending the modified policy for + * writing. Cloud IAM solves the problem of conflicting processes simultaneously attempting to + * modify a policy by using the {@link Policy#etag etag} property. This property is used to + * verify whether the policy has changed since the last request. When you make a request with an + * etag value, the value in the request is compared with the existing etag value associated with + * the policy. The policy is written only if the etag values match. If the etags don't match, a + * {@code PubSubException} is thrown, denoting that the server aborted update. If an etag is not + * provided, the policy is overwritten blindly. + * + * @throws PubSubException upon failure + */ + public Policy replacePolicy(Policy newPolicy) { + return pubsub.replaceTopicPolicy(this.name(), newPolicy); + } + + /** + * Sends a request to set the IAM access control policy for this topic. Replaces any existing + * policy. This method returns a {@code Future} object to consume the result. {@link Future#get()} + * returns the new policy. + * + *

It is recommended that you use the read-modify-write pattern. This pattern entails reading + * the project's current policy, updating it locally, and then sending the modified policy for + * writing. Cloud IAM solves the problem of conflicting processes simultaneously attempting to + * modify a policy by using the {@link Policy#etag etag} property. This property is used to + * verify whether the policy has changed since the last request. When you make a request with an + * etag value, the value in the request is compared with the existing etag value associated with + * the policy. The policy is written only if the etag values match. If the etags don't match, + * {@link Future#get()} will throw a {@link java.util.concurrent.ExecutionException} caused by a + * {@code PubSubException}, denoting that the server aborted update. If an etag is not provided, + * the policy is overwritten blindly. + * + * @throws PubSubException upon failure + */ + public Future replacePolicyAsync(Policy newPolicy) { + return pubsub.replaceTopicPolicyAsync(this.name(), newPolicy); + } + + /** + * Returns the permissions that a caller has on this topic. + * + *

You typically don't call this method if you're using Google Cloud Platform directly to + * manage permissions. This method is intended for integration with your proprietary software, + * such as a customized graphical user interface. For example, the Cloud Platform Console tests + * IAM permissions internally to determine which UI should be available to the logged-in user. + * + * @return A list of booleans representing whether the caller has the permissions specified (in + * the order of the given permissions) + * @throws PubSubException upon failure + * @see + * Permissions and Roles + */ + public List testPermissions(List permissions) { + return pubsub.testTopicPermissions(this.name(), permissions); + } + + /** + * Sends a request to get the permissions that a caller has on this topic. + * + *

You typically don't call this method if you're using Google Cloud Platform directly to + * manage permissions. This method is intended for integration with your proprietary software, + * such as a customized graphical user interface. For example, the Cloud Platform Console tests + * IAM permissions internally to determine which UI should be available to the logged-in user. + * + * @return A {@code Future} object to consume the result. {@link Future#get()} returns a list of + * booleans representing whether the caller has the permissions specified (in the order of the + * given permissions) + * @throws PubSubException upon failure + * @see + * Permissions and Roles + */ + public Future> testPermissionsAsync(List permissions) { + return pubsub.testTopicPermissionsAsync(this.name(), permissions); + } + private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { input.defaultReadObject(); this.pubsub = options.service(); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index d952a773a4b7..ed59a2b90a72 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -35,6 +35,11 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.iam.v1.GetIamPolicyRequest; +import com.google.iam.v1.Policy; +import com.google.iam.v1.SetIamPolicyRequest; +import com.google.iam.v1.TestIamPermissionsRequest; +import com.google.iam.v1.TestIamPermissionsResponse; import com.google.protobuf.Empty; import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.DeleteSubscriptionRequest; @@ -256,6 +261,23 @@ public Future modify(ModifyPushConfigRequest request) { return translate(subscriberApi.modifyPushConfigCallable().futureCall(request), false); } + @Override + public Future getIamPolicy(String resource) { + GetIamPolicyRequest request = GetIamPolicyRequest.newBuilder().setResource(resource).build(); + return translate(subscriberApi.getIamPolicyCallable().futureCall(request), true, + Code.NOT_FOUND.value()); + } + + @Override + public Future setIamPolicy(SetIamPolicyRequest request) { + return translate(subscriberApi.setIamPolicyCallable().futureCall(request), false); + } + + @Override + public Future testIamPermissions(TestIamPermissionsRequest request) { + return translate(subscriberApi.testIamPermissionsCallable().futureCall(request), true); + } + @Override public void close() throws Exception { if (closed) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java index 43ff2fa6223c..172f2e8dcc5f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java @@ -16,6 +16,10 @@ package com.google.cloud.pubsub.spi; +import com.google.iam.v1.Policy; +import com.google.iam.v1.SetIamPolicyRequest; +import com.google.iam.v1.TestIamPermissionsRequest; +import com.google.iam.v1.TestIamPermissionsResponse; import com.google.protobuf.Empty; import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.DeleteSubscriptionRequest; @@ -201,4 +205,25 @@ interface PullFuture extends Future { * @param request the request object containing all of the parameters for the API call */ Future modify(ModifyPushConfigRequest request); + + /** + * Sends a request to get the IAM policy for the provided resource. + * + * @param resource the resource for which to get the IAM policy + */ + Future getIamPolicy(String resource); + + /** + * Sends a request to set the IAM policy for a resource. + * + * @param request the request object containing all of the parameters for the API call + */ + Future setIamPolicy(SetIamPolicyRequest request); + + /** + * Sends a request to test the permissions that the caller has on a provided resource. + * + * @param request the request object containing all of the parameters for the API call + */ + Future testIamPermissions(TestIamPermissionsRequest request); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index 7d6dc928d883..b46379cc402a 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -27,8 +27,11 @@ import com.google.cloud.AsyncPage; import com.google.cloud.GrpcServiceOptions.ExecutorFactory; +import com.google.cloud.Identity; import com.google.cloud.Page; +import com.google.cloud.Policy; import com.google.cloud.RetryParams; +import com.google.cloud.Role; import com.google.cloud.pubsub.MessageConsumerImplTest.TestPullFuture; import com.google.cloud.pubsub.PubSub.ListOption; import com.google.cloud.pubsub.PubSub.MessageConsumer; @@ -44,6 +47,9 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; +import com.google.iam.v1.SetIamPolicyRequest; +import com.google.iam.v1.TestIamPermissionsRequest; +import com.google.iam.v1.TestIamPermissionsResponse; import com.google.protobuf.Empty; import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.DeleteSubscriptionRequest; @@ -121,6 +127,10 @@ public com.google.pubsub.v1.Topic apply(TopicInfo topicInfo) { .setMessage(MESSAGE2.toPb()) .setAckId("ackId2") .build(); + private static final Policy POLICY = Policy.builder() + .addIdentity(Role.viewer(), Identity.allAuthenticatedUsers()) + .build(); + private static final com.google.iam.v1.Policy POLICY_PB = PolicyMarshaller.INSTANCE.toPb(POLICY); private static final Function SUBSCRIPTION_TO_PB_FUNCTION = new Function() { @@ -1698,6 +1708,269 @@ public void testModifyAckDeadlineMessageListAsync() assertNull(future.get()); } + @Test + public void testGetTopicPolicy() { + Future response = Futures.immediateFuture(POLICY_PB); + EasyMock.expect(pubsubRpcMock.getIamPolicy(TOPIC_NAME_PB)).andReturn(response); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + Policy policy = pubsub.getTopicPolicy(TOPIC); + assertEquals(POLICY, policy); + } + + @Test + public void testGetTopicPolicy_Null() { + Future response = Futures.immediateFuture(null); + EasyMock.expect(pubsubRpcMock.getIamPolicy(TOPIC_NAME_PB)).andReturn(response); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + assertNull(pubsub.getTopicPolicy(TOPIC)); + } + + @Test + public void testGetTopicPolicyAsync() throws ExecutionException, InterruptedException { + Future response = Futures.immediateFuture(POLICY_PB); + EasyMock.expect(pubsubRpcMock.getIamPolicy(TOPIC_NAME_PB)).andReturn(response); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + Future future = pubsub.getTopicPolicyAsync(TOPIC); + assertEquals(POLICY, future.get()); + } + + @Test + public void testGetTopicPolicyAsync_Null() throws ExecutionException, InterruptedException { + Future response = Futures.immediateFuture(null); + EasyMock.expect(pubsubRpcMock.getIamPolicy(TOPIC_NAME_PB)).andReturn(response); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + assertNull(pubsub.getTopicPolicyAsync(TOPIC).get()); + } + + @Test + public void testReplaceTopicPolicy() { + SetIamPolicyRequest request = SetIamPolicyRequest.newBuilder() + .setResource(TOPIC_NAME_PB) + .setPolicy(PolicyMarshaller.INSTANCE.toPb(POLICY)) + .build(); + Future response = Futures.immediateFuture(POLICY_PB); + EasyMock.expect(pubsubRpcMock.setIamPolicy(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + Policy policy = pubsub.replaceTopicPolicy(TOPIC, POLICY); + assertEquals(POLICY, policy); + } + + @Test + public void testReplaceTopicPolicyAsync() throws ExecutionException, InterruptedException { + SetIamPolicyRequest request = SetIamPolicyRequest.newBuilder() + .setResource(TOPIC_NAME_PB) + .setPolicy(PolicyMarshaller.INSTANCE.toPb(POLICY)) + .build(); + Future response = Futures.immediateFuture(POLICY_PB); + EasyMock.expect(pubsubRpcMock.setIamPolicy(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + Future future = pubsub.replaceTopicPolicyAsync(TOPIC, POLICY); + assertEquals(POLICY, future.get()); + } + + @Test + public void testTestTopicPermissions() { + List permissions = ImmutableList.of("pubsub.topics.get"); + TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() + .setResource(TOPIC_NAME_PB) + .addAllPermissions(permissions) + .build(); + TestIamPermissionsResponse response = TestIamPermissionsResponse.newBuilder() + .addAllPermissions(permissions) + .build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.testIamPermissions(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + List permissionBooleans = pubsub.testTopicPermissions(TOPIC, permissions); + assertEquals(ImmutableList.of(true), permissionBooleans); + } + + @Test + public void testTestTopicNoPermissions() { + List permissions = ImmutableList.of("pubsub.topics.get"); + TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() + .setResource(TOPIC_NAME_PB) + .addAllPermissions(permissions) + .build(); + TestIamPermissionsResponse response = TestIamPermissionsResponse.newBuilder() + .addAllPermissions(ImmutableList.of()) + .build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.testIamPermissions(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + List permissionBooleans = pubsub.testTopicPermissions(TOPIC, permissions); + assertEquals(ImmutableList.of(false), permissionBooleans); + } + + @Test + public void testTestTopicPermissionsAsync() throws ExecutionException, InterruptedException { + List permissions = ImmutableList.of("pubsub.topics.get"); + TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() + .setResource(TOPIC_NAME_PB) + .addAllPermissions(permissions) + .build(); + TestIamPermissionsResponse response = TestIamPermissionsResponse.newBuilder() + .addAllPermissions(permissions) + .build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.testIamPermissions(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + Future> future = pubsub.testTopicPermissionsAsync(TOPIC, permissions); + assertEquals(ImmutableList.of(true), future.get()); + } + + @Test + public void testTestTopicNoPermissionsAsync() throws ExecutionException, InterruptedException { + List permissions = ImmutableList.of("pubsub.topics.get"); + TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() + .setResource(TOPIC_NAME_PB) + .addAllPermissions(permissions) + .build(); + TestIamPermissionsResponse response = TestIamPermissionsResponse.newBuilder() + .addAllPermissions(ImmutableList.of()) + .build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.testIamPermissions(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + Future> future = pubsub.testTopicPermissionsAsync(TOPIC, permissions); + assertEquals(ImmutableList.of(false), future.get()); + } + + @Test + public void testGetSubscriptionPolicy() { + Future response = Futures.immediateFuture(POLICY_PB); + EasyMock.expect(pubsubRpcMock.getIamPolicy(SUBSCRIPTION_NAME_PB)).andReturn(response); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + Policy policy = pubsub.getSubscriptionPolicy(SUBSCRIPTION); + assertEquals(POLICY, policy); + } + + @Test + public void testGetSubscriptionPolicy_Null() { + Future response = Futures.immediateFuture(null); + EasyMock.expect(pubsubRpcMock.getIamPolicy(SUBSCRIPTION_NAME_PB)).andReturn(response); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + assertNull(pubsub.getSubscriptionPolicy(SUBSCRIPTION)); + } + + @Test + public void testReplaceSubscriptionPolicy() { + SetIamPolicyRequest request = SetIamPolicyRequest.newBuilder() + .setResource(SUBSCRIPTION_NAME_PB) + .setPolicy(PolicyMarshaller.INSTANCE.toPb(POLICY)) + .build(); + Future response = Futures.immediateFuture(POLICY_PB); + EasyMock.expect(pubsubRpcMock.setIamPolicy(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + Policy policy = pubsub.replaceSubscriptionPolicy(SUBSCRIPTION, POLICY); + assertEquals(POLICY, policy); + } + + @Test + public void testReplaceSubscriptionPolicyAsync() throws ExecutionException, InterruptedException { + SetIamPolicyRequest request = SetIamPolicyRequest.newBuilder() + .setResource(SUBSCRIPTION_NAME_PB) + .setPolicy(PolicyMarshaller.INSTANCE.toPb(POLICY)) + .build(); + Future response = Futures.immediateFuture(POLICY_PB); + EasyMock.expect(pubsubRpcMock.setIamPolicy(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + Future future = pubsub.replaceSubscriptionPolicyAsync(SUBSCRIPTION, POLICY); + assertEquals(POLICY, future.get()); + } + + @Test + public void testTestSubscriptionPermissions() { + List permissions = ImmutableList.of("pubsub.subscriptions.get"); + TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() + .setResource(SUBSCRIPTION_NAME_PB) + .addAllPermissions(permissions) + .build(); + TestIamPermissionsResponse response = TestIamPermissionsResponse.newBuilder() + .addAllPermissions(permissions) + .build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.testIamPermissions(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + List permissionBooleans = + pubsub.testSubscriptionPermissions(SUBSCRIPTION, permissions); + assertEquals(ImmutableList.of(true), permissionBooleans); + } + + @Test + public void testTestSubscriptionNoPermissions() { + List permissions = ImmutableList.of("pubsub.subscriptions.get"); + TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() + .setResource(SUBSCRIPTION_NAME_PB) + .addAllPermissions(permissions) + .build(); + TestIamPermissionsResponse response = TestIamPermissionsResponse.newBuilder() + .addAllPermissions(ImmutableList.of()) + .build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.testIamPermissions(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + List permissionBooleans = + pubsub.testSubscriptionPermissions(SUBSCRIPTION, permissions); + assertEquals(ImmutableList.of(false), permissionBooleans); + } + + @Test + public void testTestSubscriptionPermissionsAsync() + throws ExecutionException, InterruptedException { + List permissions = ImmutableList.of("pubsub.subscriptions.get"); + TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() + .setResource(SUBSCRIPTION_NAME_PB) + .addAllPermissions(permissions) + .build(); + TestIamPermissionsResponse response = TestIamPermissionsResponse.newBuilder() + .addAllPermissions(permissions) + .build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.testIamPermissions(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + Future> future = + pubsub.testSubscriptionPermissionsAsync(SUBSCRIPTION, permissions); + assertEquals(ImmutableList.of(true), future.get()); + } + + @Test + public void testTestSubscriptionNoPermissionsAsync() + throws ExecutionException, InterruptedException { + List permissions = ImmutableList.of("pubsub.subscriptions.get"); + TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder() + .setResource(SUBSCRIPTION_NAME_PB) + .addAllPermissions(permissions) + .build(); + TestIamPermissionsResponse response = TestIamPermissionsResponse.newBuilder() + .addAllPermissions(ImmutableList.of()) + .build(); + Future responseFuture = Futures.immediateFuture(response); + EasyMock.expect(pubsubRpcMock.testIamPermissions(request)).andReturn(responseFuture); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); + Future> future = + pubsub.testSubscriptionPermissionsAsync(SUBSCRIPTION, permissions); + assertEquals(ImmutableList.of(false), future.get()); + } + @Test public void testClose() throws Exception { pubsub = new PubSubImpl(options, renewerMock); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionTest.java index b04fd800759f..7d0dbb1529c2 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriptionTest.java @@ -27,6 +27,9 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import com.google.cloud.Identity; +import com.google.cloud.Policy; +import com.google.cloud.Role; import com.google.cloud.pubsub.PubSub.MessageConsumer; import com.google.cloud.pubsub.PubSub.MessageProcessor; import com.google.cloud.pubsub.PubSub.PullOption; @@ -64,6 +67,9 @@ public class SubscriptionTest { .setMessage(MESSAGE2.toPb()) .setAckId("ackId2") .build(); + private static final Policy POLICY = Policy.builder() + .addIdentity(Role.viewer(), Identity.allAuthenticatedUsers()) + .build(); private final PubSub serviceMockReturnsOptions = createStrictMock(PubSub.class); private final PubSubOptions mockOptions = createStrictMock(PubSubOptions.class); @@ -316,6 +322,86 @@ public void testMessageConsumerWithOptions() throws ExecutionException, Interrup verify(messageConsumer, messageProcessor); } + @Test + public void testGetPolicy() { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getSubscriptionPolicy(NAME)).andReturn(POLICY); + replay(pubsub); + initializeSubscription(); + Policy policy = subscription.getPolicy(); + assertEquals(POLICY, policy); + } + + @Test + public void testGetPolicyNull() { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getSubscriptionPolicy(NAME)).andReturn(null); + replay(pubsub); + initializeSubscription(); + assertNull(subscription.getPolicy()); + } + + @Test + public void testGetPolicyAsync() throws ExecutionException, InterruptedException { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getSubscriptionPolicyAsync(NAME)).andReturn(Futures.immediateFuture(POLICY)); + replay(pubsub); + initializeSubscription(); + Policy policy = subscription.getPolicyAsync().get(); + assertEquals(POLICY, policy); + } + + @Test + public void testReplacePolicy() { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.replaceSubscriptionPolicy(NAME, POLICY)).andReturn(POLICY); + replay(pubsub); + initializeSubscription(); + Policy policy = subscription.replacePolicy(POLICY); + assertEquals(POLICY, policy); + } + + @Test + public void testReplacePolicyAsync() throws ExecutionException, InterruptedException { + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.replaceSubscriptionPolicyAsync(NAME, POLICY)) + .andReturn(Futures.immediateFuture(POLICY)); + replay(pubsub); + initializeSubscription(); + Policy policy = subscription.replacePolicyAsync(POLICY).get(); + assertEquals(POLICY, policy); + } + + @Test + public void testTestPermissions() { + List permissions = ImmutableList.of("pubsub.subscriptions.get"); + List permissionsResult = ImmutableList.of(true); + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.testSubscriptionPermissions(NAME, permissions)).andReturn(permissionsResult); + replay(pubsub); + initializeSubscription(); + assertEquals(permissionsResult, subscription.testPermissions(permissions)); + } + + @Test + public void testTestPermissionsAsync() throws ExecutionException, InterruptedException { + List permissions = ImmutableList.of("pubsub.subscriptions.get"); + List permissionsResult = ImmutableList.of(true); + initializeExpectedSubscription(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.testSubscriptionPermissionsAsync(NAME, permissions)) + .andReturn(Futures.immediateFuture(permissionsResult)); + replay(pubsub); + initializeSubscription(); + assertEquals(permissionsResult, subscription.testPermissionsAsync(permissions).get()); + } + private void compareSubscription(Subscription expected, Subscription value) { assertEquals(expected, value); assertEquals(expected.topic(), value.topic()); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java index 911c6ec0d627..be9587a9f98c 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java @@ -28,8 +28,11 @@ import com.google.cloud.AsyncPage; import com.google.cloud.AsyncPageImpl; +import com.google.cloud.Identity; import com.google.cloud.Page; import com.google.cloud.PageImpl; +import com.google.cloud.Policy; +import com.google.cloud.Role; import com.google.cloud.pubsub.PubSub.ListOption; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; @@ -44,6 +47,9 @@ public class TopicTest { private static final String NAME = "topic"; private static final TopicInfo TOPIC_INFO = TopicInfo.of(NAME); + private static final Policy POLICY = Policy.builder() + .addIdentity(Role.viewer(), Identity.allAuthenticatedUsers()) + .build(); private final PubSub serviceMockReturnsOptions = createStrictMock(PubSub.class); private final PubSubOptions mockOptions = createMock(PubSubOptions.class); @@ -312,6 +318,85 @@ public void testListSubscriptionsAsyncWithOptions() topic.listSubscriptionsAsync(ListOption.pageSize(42)).get().values()); } + @Test + public void testGetPolicy() { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getTopicPolicy(NAME)).andReturn(POLICY); + replay(pubsub); + initializeTopic(); + Policy policy = topic.getPolicy(); + assertEquals(POLICY, policy); + } + + @Test + public void testGetPolicyNull() { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getTopicPolicy(NAME)).andReturn(null); + replay(pubsub); + initializeTopic(); + assertNull(topic.getPolicy()); + } + + @Test + public void testGetPolicyAsync() throws ExecutionException, InterruptedException { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.getTopicPolicyAsync(NAME)).andReturn(Futures.immediateFuture(POLICY)); + replay(pubsub); + initializeTopic(); + Policy policy = topic.getPolicyAsync().get(); + assertEquals(POLICY, policy); + } + + @Test + public void testReplacePolicy() { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.replaceTopicPolicy(NAME, POLICY)).andReturn(POLICY); + replay(pubsub); + initializeTopic(); + Policy policy = topic.replacePolicy(POLICY); + assertEquals(POLICY, policy); + } + + @Test + public void testReplacePolicyAsync() throws ExecutionException, InterruptedException { + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.replaceTopicPolicyAsync(NAME, POLICY)).andReturn(Futures.immediateFuture(POLICY)); + replay(pubsub); + initializeTopic(); + Policy policy = topic.replacePolicyAsync(POLICY).get(); + assertEquals(POLICY, policy); + } + + @Test + public void testTestPermissions() { + List permissions = ImmutableList.of("pubsub.topics.get"); + List permissionsResult = ImmutableList.of(true); + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.testTopicPermissions(NAME, permissions)).andReturn(permissionsResult); + replay(pubsub); + initializeTopic(); + assertEquals(permissionsResult, topic.testPermissions(permissions)); + } + + @Test + public void testTestPermissionsAsync() throws ExecutionException, InterruptedException { + List permissions = ImmutableList.of("pubsub.topics.get"); + List permissionsResult = ImmutableList.of(true); + initializeExpectedTopic(1); + expect(pubsub.options()).andReturn(mockOptions); + expect(pubsub.testTopicPermissionsAsync(NAME, permissions)) + .andReturn(Futures.immediateFuture(permissionsResult)); + replay(pubsub); + initializeTopic(); + assertEquals(permissionsResult, topic.testPermissionsAsync(permissions).get()); + } + private void compareTopic(Topic expected, Topic value) { assertEquals(expected, value); assertEquals(expected.name(), value.name()); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index cf8fcc2668b5..249942b705cf 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -16,14 +16,28 @@ package com.google.cloud.pubsub.it; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.Identity; +import com.google.cloud.Policy; +import com.google.cloud.Role; import com.google.cloud.pubsub.BaseSystemTest; import com.google.cloud.pubsub.PubSub; import com.google.cloud.pubsub.PubSubOptions; +import com.google.cloud.pubsub.Subscription; +import com.google.cloud.pubsub.SubscriptionInfo; +import com.google.cloud.pubsub.Topic; +import com.google.cloud.pubsub.TopicInfo; +import com.google.common.collect.ImmutableList; import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; +import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutionException; public class ITPubSubTest extends BaseSystemTest { @@ -42,4 +56,89 @@ protected PubSub pubsub() { protected String formatForTest(String resourceName) { return resourceName + "-" + NAME_SUFFIX; } + + // Policy tests are defined here and not in BaseSystemTest because Pub/Sub emulator does not + // support IAM yet + + @Test + public void testTopicPolicy() { + String topicName = formatForTest("test-topic-policy"); + Topic topic = pubsub().create(TopicInfo.of(topicName)); + Policy policy = pubsub().getTopicPolicy(topicName); + policy = pubsub().replaceTopicPolicy(topicName, policy.toBuilder() + .addIdentity(Role.viewer(), Identity.allAuthenticatedUsers()) + .build()); + assertTrue(policy.bindings().containsKey(Role.viewer())); + assertTrue(policy.bindings().get(Role.viewer()).contains(Identity.allAuthenticatedUsers())); + List permissions = + pubsub().testTopicPermissions(topicName, ImmutableList.of("pubsub.topics.get")); + assertTrue(permissions.get(0)); + topic.delete(); + } + + @Test + public void testNonExistingTopicPolicy() { + String topicName = formatForTest("test-non-existing-topic-policy"); + assertNull(pubsub().getTopicPolicy(topicName)); + } + + @Test + public void testTopicPolicyAsync() throws ExecutionException, InterruptedException { + String topicName = formatForTest("test-topic-policy-async"); + Topic topic = pubsub().create(TopicInfo.of(topicName)); + Policy policy = pubsub().getTopicPolicyAsync(topicName).get(); + policy = pubsub().replaceTopicPolicyAsync(topicName, policy.toBuilder() + .addIdentity(Role.viewer(), Identity.allAuthenticatedUsers()) + .build()).get(); + assertTrue(policy.bindings().containsKey(Role.viewer())); + assertTrue(policy.bindings().get(Role.viewer()).contains(Identity.allAuthenticatedUsers())); + List permissions = + pubsub().testTopicPermissionsAsync(topicName, ImmutableList.of("pubsub.topics.get")).get(); + assertTrue(permissions.get(0)); + topic.delete(); + } + + @Test + public void testSubscriptionPolicy() { + String topicName = formatForTest("test-subscription-policy"); + Topic topic = pubsub().create(TopicInfo.of(topicName)); + String subscriptionName = formatForTest("test-subscription-policy"); + Subscription subscription = pubsub().create(SubscriptionInfo.of(topicName, subscriptionName)); + Policy policy = pubsub().getSubscriptionPolicy(subscriptionName); + policy = pubsub().replaceSubscriptionPolicy(subscriptionName, policy.toBuilder() + .addIdentity(Role.viewer(), Identity.allAuthenticatedUsers()) + .build()); + assertTrue(policy.bindings().containsKey(Role.viewer())); + assertTrue(policy.bindings().get(Role.viewer()).contains(Identity.allAuthenticatedUsers())); + List permissions = pubsub().testSubscriptionPermissions(subscriptionName, + ImmutableList.of("pubsub.subscriptions.get")); + assertTrue(permissions.get(0)); + topic.delete(); + subscription.delete(); + } + + @Test + public void testSubscriptionPolicyAsync() throws ExecutionException, InterruptedException { + String topicName = formatForTest("test-subscription-policy-async"); + Topic topic = pubsub().create(TopicInfo.of(topicName)); + String subscriptionName = formatForTest("test-subscription-policy-async"); + Subscription subscription = pubsub().create(SubscriptionInfo.of(topicName, subscriptionName)); + Policy policy = pubsub().getSubscriptionPolicyAsync(subscriptionName).get(); + policy = pubsub().replaceSubscriptionPolicyAsync(subscriptionName, policy.toBuilder() + .addIdentity(Role.viewer(), Identity.allAuthenticatedUsers()) + .build()).get(); + assertTrue(policy.bindings().containsKey(Role.viewer())); + assertTrue(policy.bindings().get(Role.viewer()).contains(Identity.allAuthenticatedUsers())); + List permissions = pubsub().testSubscriptionPermissionsAsync(subscriptionName, + ImmutableList.of("pubsub.subscriptions.get")).get(); + assertTrue(permissions.get(0)); + topic.delete(); + subscription.delete(); + } + + @Test + public void testNonExistingSubscriptionPolicy() { + String subscriptionName = formatForTest("test-non-existing-subscription-policy"); + assertNull(pubsub().getSubscriptionPolicy(subscriptionName)); + } } From c2a70279217cc40e5eaf9c0d5fd3c8564bea1bf0 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Thu, 8 Sep 2016 09:49:53 +0200 Subject: [PATCH 2/2] Add IAM methods to PubSubExample --- .../cloud/examples/pubsub/PubSubExample.java | 192 +++++++++++++++++- 1 file changed, 191 insertions(+), 1 deletion(-) diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java index b3f4405e1950..7396de7d1df5 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java @@ -16,6 +16,9 @@ package com.google.cloud.examples.pubsub; +import com.google.cloud.Identity; +import com.google.cloud.Policy; +import com.google.cloud.Role; import com.google.cloud.pubsub.Message; import com.google.cloud.pubsub.PubSub; import com.google.cloud.pubsub.PubSub.MessageProcessor; @@ -61,7 +64,13 @@ * delete topic * delete subscription * info topic - * info subscription "} + * info subscription + * get-policy topic + * get-policy subscription + * add-identity topic + * add-identity subscription + * test-permissions topic + + * test-permissions subscription +"} * *

The first parameter is an optional {@code project_id} (logged-in project will be used if not * supplied). Second parameter is a Pub/Sub operation and can be used to demonstrate its usage. For @@ -76,6 +85,9 @@ public class PubSubExample { private static final Map LIST_ACTIONS = new HashMap<>(); private static final Map DELETE_ACTIONS = new HashMap<>(); private static final Map PULL_ACTIONS = new HashMap<>(); + private static final Map GET_IAM_ACTIONS = new HashMap<>(); + private static final Map REPLACE_IAM_ACTIONS = new HashMap<>(); + private static final Map TEST_IAM_ACTIONS = new HashMap<>(); private static final Map ACTIONS = new HashMap<>(); private abstract static class PubSubAction { @@ -627,6 +639,175 @@ public String params() { } } + private abstract static class GetPolicyAction extends PubSubAction { + @Override + String parse(String... args) throws Exception { + String message; + if (args.length == 1) { + return args[0]; + } else if (args.length > 1) { + message = "Too many arguments."; + } else { + message = "Missing required resource name"; + } + throw new IllegalArgumentException(message); + } + + @Override + public String params() { + return ""; + } + } + + /** + * This class demonstrates how to get the IAM policy of a topic. + * + * @see Access Control + */ + private static class GetTopicPolicyAction extends GetPolicyAction { + @Override + public void run(PubSub pubsub, String topic) throws Exception { + Policy policy = pubsub.getTopicPolicy(topic); + System.out.printf("Policy for topic %s%n", topic); + System.out.println(policy); + } + } + + /** + * This class demonstrates how to get the IAM policy of a subscription. + * + * @see Access Control + */ + private static class GetSubscriptionPolicyAction extends GetPolicyAction { + @Override + public void run(PubSub pubsub, String subscription) throws Exception { + Policy policy = pubsub.getSubscriptionPolicy(subscription); + System.out.printf("Policy for subscription %s%n", subscription); + System.out.println(policy); + } + } + + private abstract static class AddIdentityAction + extends PubSubAction>> { + @Override + Tuple> parse(String... args) throws Exception { + String message; + if (args.length == 3) { + String resourceName = args[0]; + Role role = Role.of(args[1]); + Identity identity = Identity.valueOf(args[2]); + return Tuple.of(resourceName, Tuple.of(role, identity)); + } else if (args.length > 2) { + message = "Too many arguments."; + } else { + message = "Missing required resource name, role and identity"; + } + throw new IllegalArgumentException(message); + } + + @Override + public String params() { + return " "; + } + } + + /** + * This class demonstrates how to add an identity to a certain role in a topic's IAM policy. + * + * @see Access Control + */ + private static class AddIdentityTopicAction extends AddIdentityAction { + @Override + public void run(PubSub pubsub, Tuple> param) throws Exception { + String topic = param.x(); + Tuple roleAndIdentity = param.y(); + Role role = roleAndIdentity.x(); + Identity identity = roleAndIdentity.y(); + Policy policy = pubsub.getTopicPolicy(topic); + policy = pubsub.replaceTopicPolicy(topic, + policy.toBuilder().addIdentity(role, identity).build()); + System.out.printf("Added role %s to identity %s for topic %s%n", role, identity, topic); + System.out.println(policy); + } + } + + /** + * This class demonstrates how to add an identity to a certain role in a subscription's IAM + * policy. + * + * @see Access Control + */ + private static class AddIdentitySubscriptionAction extends AddIdentityAction { + @Override + public void run(PubSub pubsub, Tuple> param) throws Exception { + String subscription = param.x(); + Tuple roleAndIdentity = param.y(); + Role role = roleAndIdentity.x(); + Identity identity = roleAndIdentity.y(); + Policy policy = pubsub.getSubscriptionPolicy(subscription); + policy = pubsub.replaceSubscriptionPolicy(subscription, + policy.toBuilder().addIdentity(role, identity).build()); + System.out.printf("Added role %s to identity %s for subscription %s%n", role, identity, + subscription); + System.out.println(policy); + } + } + + private abstract static class TestPermissionsAction + extends PubSubAction>> { + @Override + Tuple> parse(String... args) throws Exception { + if (args.length >= 2) { + String resourceName = args[0]; + return Tuple.of(resourceName, Arrays.asList(Arrays.copyOfRange(args, 1, args.length))); + } + throw new IllegalArgumentException("Missing required resource name and permissions"); + } + + @Override + public String params() { + return " +"; + } + } + + /** + * This class demonstrates how to test whether the caller has the provided permissions on a topic. + * + * @see Access Control + */ + private static class TestTopicPermissionsAction extends TestPermissionsAction { + @Override + public void run(PubSub pubsub, Tuple> param) throws Exception { + String topic = param.x(); + List permissions = param.y(); + List booleanPermissions = pubsub.testTopicPermissions(topic, permissions); + System.out.printf("Caller permissions on topic %s%n", topic); + for (int i = 0; i < permissions.size(); i++) { + System.out.printf("%s: %b%n", permissions.get(i), booleanPermissions.get(i)); + } + } + } + + /** + * This class demonstrates how to test whether the caller has the provided permissions on a + * subscription. + * + * @see Access Control + */ + private static class TestSubscriptionPermissionsAction extends TestPermissionsAction { + @Override + public void run(PubSub pubsub, Tuple> param) throws Exception { + String subscription = param.x(); + List permissions = param.y(); + List booleanPermissions = + pubsub.testSubscriptionPermissions(subscription, permissions); + System.out.printf("Caller permissions on subscription %s%n", subscription); + for (int i = 0; i < permissions.size(); i++) { + System.out.printf("%s: %b%n", permissions.get(i), booleanPermissions.get(i)); + } + } + } + static { CREATE_ACTIONS.put("topic", new CreateTopicAction()); CREATE_ACTIONS.put("subscription", new CreateSubscriptionAction()); @@ -638,11 +819,20 @@ public String params() { DELETE_ACTIONS.put("subscription", new DeleteSubscriptionAction()); PULL_ACTIONS.put("async", new PullAsyncAction()); PULL_ACTIONS.put("sync", new PullSyncAction()); + GET_IAM_ACTIONS.put("topic", new GetTopicPolicyAction()); + GET_IAM_ACTIONS.put("subscription", new GetSubscriptionPolicyAction()); + REPLACE_IAM_ACTIONS.put("topic", new AddIdentityTopicAction()); + REPLACE_IAM_ACTIONS.put("subscription", new AddIdentitySubscriptionAction()); + TEST_IAM_ACTIONS.put("topic", new TestTopicPermissionsAction()); + TEST_IAM_ACTIONS.put("subscription", new TestSubscriptionPermissionsAction()); ACTIONS.put("create", new ParentAction(CREATE_ACTIONS)); ACTIONS.put("info", new ParentAction(INFO_ACTIONS)); ACTIONS.put("list", new ParentAction(LIST_ACTIONS)); ACTIONS.put("delete", new ParentAction(DELETE_ACTIONS)); ACTIONS.put("pull", new ParentAction(PULL_ACTIONS)); + ACTIONS.put("get-policy", new ParentAction(GET_IAM_ACTIONS)); + ACTIONS.put("add-identity", new ParentAction(REPLACE_IAM_ACTIONS)); + ACTIONS.put("test-permissions", new ParentAction(TEST_IAM_ACTIONS)); ACTIONS.put("publish", new PublishMessagesAction()); ACTIONS.put("replace-push-config", new ReplacePushConfigAction()); ACTIONS.put("ack", new AckMessagesAction());