diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java index 7d25580ff92bb..ffb38f770a9cc 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java @@ -20,12 +20,15 @@ import java.io.Closeable; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.client.admin.GrantTopicPermissionOptions; +import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; @@ -223,6 +226,16 @@ CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName namespac CompletableFuture grantPermissionAsync(TopicName topicName, Set actions, String role, String authDataJson); + default CompletableFuture grantPermissionAsync(List options) { + return FutureUtil.failedFuture(new IllegalStateException( + String.format("grantPermissionAsync is not supported by the Authorization"))); + } + + default CompletableFuture revokePermissionAsync(List options) { + return FutureUtil.failedFuture(new IllegalStateException( + String.format("revokePermissionAsync is not supported by the Authorization"))); + } + /** * Revoke authorization-action permission on a topic to the given client. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index c121d93b9b750..2951eb1f2973f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -20,6 +20,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import java.net.SocketAddress; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -32,6 +33,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationParameters; import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.client.admin.GrantTopicPermissionOptions; +import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; @@ -181,6 +184,14 @@ public CompletableFuture grantPermissionAsync(TopicName topicName, Set grantPermissionAsync(List options) { + return provider.grantPermissionAsync(options); + } + + public CompletableFuture revokePermissionAsync(List options) { + return provider.revokePermissionAsync(options); + } + /** * Revoke authorization-action permission on a topic to the given client. * diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index a39c3d0560760..0af63724cc812 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -24,14 +24,18 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; import javax.ws.rs.core.Response; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.client.admin.GrantTopicPermissionOptions; +import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; @@ -251,6 +255,80 @@ public CompletableFuture grantPermissionAsync(TopicName topicName, Set grantPermissionAsync(List options) { + return checkNamespace(options.stream().map(o -> TopicName.get(o.getTopic()).getNamespace())) + .thenCompose(__ -> getPoliciesReadOnlyAsync()) + .thenCompose(readonly -> { + if (readonly) { + if (log.isDebugEnabled()) { + log.debug("Policies are read-only. Broker cannot do read-write operations"); + } + throw new IllegalStateException("policies are in readonly mode"); + } + TopicName topicName = TopicName.get(options.get(0).getTopic()); + return pulsarResources.getNamespaceResources() + .setPoliciesAsync(topicName.getNamespaceObject(), policies -> { + options.stream().forEach(o -> { + final String topicUri = TopicName.get(o.getTopic()).toString(); + policies.auth_policies.getTopicAuthentication() + .computeIfAbsent(topicUri, __ -> new HashMap<>()) + .put(o.getRole(), o.getActions()); + }); + return policies; + }).whenComplete((__, ex) -> { + if (ex != null) { + log.error("Failed to grant permissions for {}", options); + } else { + log.info("Successfully granted access for {}", options); + } + }); + }); + } + + @Override + public CompletableFuture revokePermissionAsync(List options) { + return checkNamespace(options.stream().map(o -> TopicName.get(o.getTopic()).getNamespace())) + .thenCompose(__ -> getPoliciesReadOnlyAsync()) + .thenCompose(readonly -> { + if (readonly) { + if (log.isDebugEnabled()) { + log.debug("Policies are read-only. Broker cannot do read-write operations"); + } + throw new IllegalStateException("policies are in readonly mode"); + } + TopicName topicName = TopicName.get(options.get(0).getTopic()); + return pulsarResources.getNamespaceResources() + .setPoliciesAsync(topicName.getNamespaceObject(), policies -> { + options.stream().forEach(o -> { + final String topicUri = TopicName.get(o.getTopic()).toString(); + policies.auth_policies.getTopicAuthentication() + .computeIfPresent(topicUri, (topicNameUri, roles) -> { + roles.remove(o.getRole()); + if (roles.isEmpty()) { + return null; + } + return roles; + }); + }); + return policies; + }).whenComplete((__, ex) -> { + if (ex != null) { + log.error("Failed to revoke permissions for {}", options, ex); + } else { + log.info("Successfully revoke permissions for {}", options); + } + }); + }); + } + + private CompletableFuture checkNamespace(Stream namespaces) { + boolean sameNamespace = namespaces.distinct().count() == 1; + if (!sameNamespace) { + throw new IllegalArgumentException("The namespace should be the same"); + } + return CompletableFuture.completedFuture(null); + } + @Override public CompletableFuture revokePermissionAsync(TopicName topicName, String role) { return getPoliciesReadOnlyAsync().thenCompose(readonly -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 3268f07b13d88..45772dc279bab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -924,4 +924,15 @@ protected void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) { "The bucket must be specified for namespace offload."); } } + + protected CompletableFuture internalCheckTopicExists(TopicName topicName) { + return pulsar().getNamespaceService().checkTopicExists(topicName) + .thenAccept(info -> { + boolean exists = info.isExists(); + info.recycle(); + if (!exists) { + throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString())); + } + }); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 18c80d6bef4bf..d80e2487b4f1c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; @@ -65,8 +66,10 @@ import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.GrantTopicPermissionOptions; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.NamedEntity; @@ -613,6 +616,78 @@ protected CompletableFuture internalGrantPermissionOnNamespaceAsync(String }); } + protected CompletableFuture internalGrantPermissionOnTopicsAsync(List options) { + return checkNamespace(options.stream().map(o -> TopicName.get(o.getTopic()).getNamespace())) + .thenCompose(__ -> validateAdminAccessForTenantAsync( + TopicName.get(options.get(0).getTopic()).getTenant()) + ).thenCompose(__ -> internalCheckTopicExists(options.stream().map(o -> TopicName.get(o.getTopic())))) + .thenCompose(__ -> getAuthorizationService().grantPermissionAsync(options)) + .thenAccept(unused -> log.info("[{}] Successfully granted access for {}", clientAppId(), options)) + .exceptionally(ex -> { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + //The IllegalArgumentException and the IllegalStateException were historically thrown by the + // grantPermissionAsync method, so we catch them here to ensure backwards compatibility. + if (realCause instanceof MetadataStoreException.NotFoundException + || realCause instanceof IllegalArgumentException) { + log.warn("[{}] Failed to grant permissions for namespace {}: does not exist", clientAppId(), + namespaceName, ex); + throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist"); + } else if (realCause instanceof MetadataStoreException.BadVersionException + || realCause instanceof IllegalStateException) { + log.warn("[{}] Failed to grant permissions for namespace {}: {}", + clientAppId(), namespaceName, ex.getCause().getMessage(), ex); + throw new RestException(Status.CONFLICT, "Concurrent modification"); + } else { + log.error("[{}] Failed to grant permissions for namespace {}", + clientAppId(), namespaceName, ex); + throw new RestException(realCause); + } + }); + } + + protected CompletableFuture internalRevokePermissionOnTopicsAsync( + List options) { + return checkNamespace(options.stream().map(o -> TopicName.get(o.getTopic()).getNamespace())) + .thenCompose(__ -> validateAdminAccessForTenantAsync( + TopicName.get(options.get(0).getTopic()).getTenant())) + .thenCompose(__ -> internalCheckTopicExists(options.stream().map(o -> TopicName.get(o.getTopic())))) + .thenCompose(__ -> getAuthorizationService().revokePermissionAsync(options)) + .thenAccept(unused -> log.info("[{}] Successfully revoke access for {}", clientAppId(), options)) + .exceptionally(ex -> { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + //The IllegalArgumentException and the IllegalStateException were historically thrown by the + // grantPermissionAsync method, so we catch them here to ensure backwards compatibility. + if (realCause instanceof MetadataStoreException.NotFoundException + || realCause instanceof IllegalArgumentException) { + log.warn("[{}] Failed to revoke permissions for namespace {}: does not exist", clientAppId(), + namespaceName, ex); + throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist"); + } else if (realCause instanceof MetadataStoreException.BadVersionException + || realCause instanceof IllegalStateException) { + log.warn("[{}] Failed to revoke permissions for namespace {}: {}", + clientAppId(), namespaceName, ex.getCause().getMessage(), ex); + throw new RestException(Status.CONFLICT, "Concurrent modification"); + } else { + log.error("[{}] Failed to revoke permissions for namespace {}", + clientAppId(), namespaceName, ex); + throw new RestException(realCause); + } + }); + } + + private CompletableFuture checkNamespace(Stream namespaces) { + boolean sameNamespace = namespaces.distinct().count() == 1; + if (!sameNamespace) { + throw new RestException(Status.BAD_REQUEST, "The namespace should be the same"); + } + return CompletableFuture.completedFuture(null); + } + + private CompletableFuture internalCheckTopicExists(Stream topicNameStream) { + List topicNames = topicNameStream.collect(Collectors.toList()); + return CompletableFuture.allOf(topicNames.stream().map(topic -> internalCheckTopicExists(topic)) + .toArray(CompletableFuture[]::new)); + } protected CompletableFuture internalGrantPermissionOnSubscriptionAsync(String subscription, Set roles) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 54cceaf09e9fe..36150ee21b32c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -50,6 +50,8 @@ import org.apache.pulsar.broker.admin.impl.NamespacesBase; import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.GrantTopicPermissionOptions; +import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.naming.NamespaceName; @@ -314,6 +316,48 @@ public void grantPermissionOnNamespace(@Suspended AsyncResponse asyncResponse, }); } + @POST + @Path("/grantPermissionsOnTopics") + @ApiOperation(value = "Grant new permissions to a role on multi-topics.") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Operation successful"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"), + @ApiResponse(code = 500, message = "Internal server error") }) + public void grantPermissionsOnTopics(@Suspended final AsyncResponse asyncResponse, + List options) { + internalGrantPermissionOnTopicsAsync(options) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to grant permissions {}", + clientAppId(), options, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @POST + @Path("/revokePermissionsOnTopics") + @ApiOperation(value = "Revoke new permissions to a role on multi-topics.") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Operation successful"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"), + @ApiResponse(code = 500, message = "Internal server error") }) + public void revokePermissionsOnTopics(@Suspended final AsyncResponse asyncResponse, + List options) { + internalRevokePermissionOnTopicsAsync(options) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to revoke permissions {}", + clientAppId(), options, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + @POST @Path("/{property}/{namespace}/permissions/subscription/{subscription}") @ApiOperation(hidden = true, value = "Grant a new permission to roles for a subscription." diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 900babbecf4ad..3f5ee721a7e6b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -87,11 +87,13 @@ import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; import org.apache.pulsar.broker.testcontext.MockEntryFilterProvider; import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.admin.GrantTopicPermissionOptions; import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions; import org.apache.pulsar.client.admin.Mode; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; +import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions; import org.apache.pulsar.client.admin.Topics.QueryParam; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -3685,4 +3687,61 @@ public void testIsolationPolicyUnloadsNSWithPrimaryChanged(final String topicTyp List.of(".*", "broker.*") ); } + + @Test + public void testGrantAndRevokePermissions() throws Exception { + + String namespace = newUniqueName(defaultTenant + "/") + "-unload-test-"; + String namespace2 = newUniqueName(defaultTenant + "/") + "-unload-test-"; + admin.namespaces().createNamespace(namespace, Set.of("test")); + admin.namespaces().createNamespace(namespace2, Set.of("test")); + // + final String topic1 = "persistent://" + namespace + "/test1"; + final String topic2 = "persistent://" + namespace + "/test2"; + final String topic3 = "non-persistent://" + namespace + "/test3"; + final String topic4 = "persistent://" + namespace2 + "/test4";; + + admin.topics().createPartitionedTopic(topic1, 3); + admin.topics().createPartitionedTopic(topic2, 3); + admin.topics().createPartitionedTopic(topic3, 3); + admin.topics().createPartitionedTopic(topic4, 3); + pulsarClient.newProducer().topic(topic1).create().close(); + pulsarClient.newProducer().topic(topic2).create().close(); + pulsarClient.newProducer().topic(topic3).create().close(); + pulsarClient.newProducer().topic(topic4).create().close(); + + List grantPermissionOptions = new ArrayList<>(); + grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic1).role("role1").actions(Set.of(AuthAction.produce)).build()); + grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic4).role("role4").actions(Set.of(AuthAction.produce)).build()); + try { + admin.namespaces().grantPermissionOnTopics(grantPermissionOptions); + fail("Should go here, because there are two namespaces"); + } catch (Exception ex) { + Assert.assertTrue(ex != null); + } + grantPermissionOptions.clear(); + grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic1).role("role1").actions(Set.of(AuthAction.produce)).build()); + grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic2).role("role2").actions(Set.of(AuthAction.consume)).build()); + grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic3).role("role3").actions(Set.of(AuthAction.produce, AuthAction.consume)).build()); + admin.namespaces().grantPermissionOnTopics(grantPermissionOptions); + + final Map> permissions1 = admin.topics().getPermissions(topic1); + final Map> permissions2 = admin.topics().getPermissions(topic2); + final Map> permissions3 = admin.topics().getPermissions(topic3); + + Assert.assertEquals(permissions1.get("role1"), Set.of(AuthAction.produce)); + Assert.assertEquals(permissions2.get("role2"), Set.of(AuthAction.consume)); + Assert.assertEquals(permissions3.get("role3"), Set.of(AuthAction.produce, AuthAction.consume)); + // + List revokePermissionOptions = new ArrayList<>(); + revokePermissionOptions.add(RevokeTopicPermissionOptions.builder().topic(topic1).role("role1").build()); + revokePermissionOptions.add(RevokeTopicPermissionOptions.builder().topic(topic2).role("role2").build()); + admin.namespaces().revokePermissionOnTopics(revokePermissionOptions); + + final Map> permissions11 = admin.topics().getPermissions(topic1); + final Map> permissions22 = admin.topics().getPermissions(topic2); + + Assert.assertTrue(permissions11.isEmpty()); + Assert.assertTrue(permissions22.isEmpty()); + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GrantTopicPermissionOptions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GrantTopicPermissionOptions.java new file mode 100644 index 0000000000000..e365a086a771f --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GrantTopicPermissionOptions.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin; + +import java.util.Set; +import lombok.Builder; +import lombok.Data; +import org.apache.pulsar.common.policies.data.AuthAction; + +@Data +@Builder +public class GrantTopicPermissionOptions { + + private final String topic; + + private final String role; + + private final Set actions; + +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 65124a6a76a8f..28ad852064b4f 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -703,6 +703,34 @@ public interface Namespaces { */ CompletableFuture grantPermissionOnNamespaceAsync(String namespace, String role, Set actions); + /** + * Grant permissions on topics asynchronously. + * @param options + * @return + */ + CompletableFuture grantPermissionOnTopicsAsync(List options); + + /** + * Grant permissions on topics. + * @param options + * @throws PulsarAdminException + */ + void grantPermissionOnTopics(List options) throws PulsarAdminException; + + /** + * Revoke permissions on topics asynchronously. + * @param options + * @return + */ + CompletableFuture revokePermissionOnTopicsAsync(List options); + + /** + * Revoke permissions on topics. + * @param options + * @throws PulsarAdminException + */ + void revokePermissionOnTopics(List options) throws PulsarAdminException; + /** * Revoke permissions on a namespace. *

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/RevokeTopicPermissionOptions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/RevokeTopicPermissionOptions.java new file mode 100644 index 0000000000000..38e33c966b284 --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/RevokeTopicPermissionOptions.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class RevokeTopicPermissionOptions { + + private final String topic; + + private final String role; + +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 7d41c7203d2c7..7695abdd4809b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -28,9 +28,11 @@ import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.admin.GrantTopicPermissionOptions; import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceName; @@ -288,6 +290,30 @@ public CompletableFuture grantPermissionOnNamespaceAsync( return asyncPostRequest(path, Entity.entity(actions, MediaType.APPLICATION_JSON)); } + @Override + public CompletableFuture grantPermissionOnTopicsAsync(List options) { + final WebTarget base = adminV2Namespaces; + WebTarget path = base.path("/grantPermissionsOnTopics"); + return asyncPostRequest(path, Entity.entity(options, MediaType.APPLICATION_JSON)); + } + + @Override + public void grantPermissionOnTopics(List options) throws PulsarAdminException { + sync(() -> grantPermissionOnTopicsAsync(options)); + } + + @Override + public CompletableFuture revokePermissionOnTopicsAsync(List options) { + final WebTarget base = adminV2Namespaces; + WebTarget path = base.path("/revokePermissionsOnTopics"); + return asyncPostRequest(path, Entity.entity(options, MediaType.APPLICATION_JSON)); + } + + @Override + public void revokePermissionOnTopics(List options) throws PulsarAdminException { + sync(() -> revokePermissionOnTopicsAsync(options)); + } + @Override public void revokePermissionsOnNamespace(String namespace, String role) throws PulsarAdminException { sync(() -> revokePermissionsOnNamespaceAsync(namespace, role));