Skip to content

Commit

Permalink
[Broker]make revokePermissionsOnTopic method async (#14149)
Browse files Browse the repository at this point in the history
(cherry picked from commit d7ddda8)
  • Loading branch information
liudezhi2098 authored and codelipenghui committed Jun 12, 2022
1 parent 4a82c5b commit a7f3efa
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
Expand Down Expand Up @@ -320,49 +321,54 @@ protected void internalDeleteTopicForcefully(boolean authoritative, boolean dele
}
}

private void revokePermissions(String topicUri, String role) {
Policies policies;
try {
policies = namespaceResources().getPolicies(namespaceName)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
} catch (Exception e) {
log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
throw new RestException(e);
}
if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
|| !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", clientAppId(),
role, topicUri);
throw new RestException(Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
}
try {
// Write the new policies to metadata store
namespaceResources().setPolicies(namespaceName, p -> {
p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
return p;
});
log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role, topicUri);
} catch (Exception e) {
log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
throw new RestException(e);
}

private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String role) {
return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
policiesOptional -> {
Policies policies = policiesOptional.orElseThrow(() ->
new RestException(Status.NOT_FOUND, "Namespace does not exist"));
if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
|| !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}",
clientAppId(), role, topicUri);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Permissions are not set at the topic level"));
}
// Write the new policies to metadata store
return namespaceResources().setPoliciesAsync(namespaceName, p -> {
p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
return p;
}).thenAccept(__ ->
log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
topicUri)
);
}
);
}

protected void internalRevokePermissionsOnTopic(String role) {
protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();

PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
int numPartitions = meta.partitions;
if (numPartitions > 0) {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
revokePermissions(topicNamePartition.toString(), role);
}
}
revokePermissions(topicName.toString(), role);
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
getPartitionedTopicMetadataAsync(topicName, true, false)
.thenCompose(metadata -> {
int numPartitions = metadata.partitions;
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if (numPartitions > 0) {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
future = future.thenComposeAsync(unused ->
revokePermissionsAsync(topicNamePartition.toString(), role));
}
}
return future.thenComposeAsync(unused -> revokePermissionsAsync(topicName.toString(), role))
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
}))
).exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicName, realCause);
resumeAsyncResponseExceptionally(asyncResponse, realCause);
return null;
});
}

protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<String, String> properties) {
Expand Down Expand Up @@ -3999,7 +4005,8 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
} catch (RestException e) {
try {
validateAdminAccessForTenant(pulsar,
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData,
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (RestException authException) {
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,18 @@ public void grantPermissionsOnTopic(@PathParam("property") String property,
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 412, message = "Permissions are not set at the topic level")})
public void revokePermissionsOnTopic(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic, @PathParam("role") String role) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalRevokePermissionsOnTopic(role);
public void revokePermissionsOnTopic(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@PathParam("role") String role) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalRevokePermissionsOnTopic(asyncResponse, role);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@PUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public void grantPermissionsOnTopic(
@ApiResponse(code = 412, message = "Permissions are not set at the topic level"),
@ApiResponse(code = 500, message = "Internal server error")})
public void revokePermissionsOnTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -200,8 +201,14 @@ public void revokePermissionsOnTopic(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Client role to which grant permissions", required = true)
@PathParam("role") String role) {
validateTopicName(tenant, namespace, encodedTopic);
internalRevokePermissionsOnTopic(role);
try {
validateTopicName(tenant, namespace, encodedTopic);
internalRevokePermissionsOnTopic(asyncResponse, role);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@PUT
Expand Down
Loading

0 comments on commit a7f3efa

Please sign in to comment.