Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker]make revokePermissionsOnTopic method async #14149

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
Expand Down Expand Up @@ -320,49 +321,54 @@ protected void internalDeleteTopicForcefully(boolean authoritative, boolean dele
}
}

private void revokePermissions(String topicUri, String role) {
Policies policies;
try {
policies = namespaceResources().getPolicies(namespaceName)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
} catch (Exception e) {
log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
throw new RestException(e);
}
if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
|| !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", clientAppId(),
role, topicUri);
throw new RestException(Status.PRECONDITION_FAILED, "Permissions are not set at the topic level");
}
try {
// Write the new policies to metadata store
namespaceResources().setPolicies(namespaceName, p -> {
p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
return p;
});
log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role, topicUri);
} catch (Exception e) {
log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicUri, e);
liudezhi2098 marked this conversation as resolved.
Show resolved Hide resolved
throw new RestException(e);
}

private CompletableFuture<Void> revokePermissions(String topicUri, String role) {
liudezhi2098 marked this conversation as resolved.
Show resolved Hide resolved
return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
policiesOptional -> {
Policies policies = policiesOptional.orElseThrow(() ->
new RestException(Status.NOT_FOUND, "Namespace does not exist"));
if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
|| !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}",
clientAppId(), role, topicUri);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Permissions are not set at the topic level"));
liudezhi2098 marked this conversation as resolved.
Show resolved Hide resolved
}
// Write the new policies to metadata store
return namespaceResources().setPoliciesAsync(namespaceName, p -> {
p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
return p;
}).thenAccept(__ ->
log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
topicUri)
);
}
);
}

protected void internalRevokePermissionsOnTopic(String role) {
protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();

PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, true, false);
int numPartitions = meta.partitions;
if (numPartitions > 0) {
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
revokePermissions(topicNamePartition.toString(), role);
}
}
revokePermissions(topicName.toString(), role);
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
getPartitionedTopicMetadataAsync(topicName, true, false)
.thenCompose(metadata -> {
int numPartitions = metadata.partitions;
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
if (numPartitions > 0) {
for (int i = 0; i < numPartitions; i++) {
liudezhi2098 marked this conversation as resolved.
Show resolved Hide resolved
TopicName topicNamePartition = topicName.getPartition(i);
future = future.thenComposeAsync(unused -> revokePermissions(topicNamePartition.toString(),
liudezhi2098 marked this conversation as resolved.
Show resolved Hide resolved
role));
}
}
return future.thenComposeAsync(unused -> revokePermissions(topicName.toString(), role))
liudezhi2098 marked this conversation as resolved.
Show resolved Hide resolved
.thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
}))
).exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to revoke permissions for topic {}", clientAppId(), topicName, realCause);
resumeAsyncResponseExceptionally(asyncResponse, realCause);
return null;
});
}

protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<String, String> properties) {
Expand Down Expand Up @@ -3866,7 +3872,8 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
} catch (RestException e) {
try {
validateAdminAccessForTenant(pulsar,
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData,
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (RestException authException) {
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,18 @@ public void grantPermissionsOnTopic(@PathParam("property") String property,
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 412, message = "Permissions are not set at the topic level")})
public void revokePermissionsOnTopic(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic, @PathParam("role") String role) {
validateTopicName(property, cluster, namespace, encodedTopic);
internalRevokePermissionsOnTopic(role);
public void revokePermissionsOnTopic(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@PathParam("role") String role) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
internalRevokePermissionsOnTopic(asyncResponse, role);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@PUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public void grantPermissionsOnTopic(
@ApiResponse(code = 412, message = "Permissions are not set at the topic level"),
@ApiResponse(code = 500, message = "Internal server error")})
public void revokePermissionsOnTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -200,8 +201,14 @@ public void revokePermissionsOnTopic(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Client role to which grant permissions", required = true)
@PathParam("role") String role) {
validateTopicName(tenant, namespace, encodedTopic);
internalRevokePermissionsOnTopic(role);
try {
validateTopicName(tenant, namespace, encodedTopic);
internalRevokePermissionsOnTopic(asyncResponse, role);
} catch (WebApplicationException wae) {
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@PUT
Expand Down
Loading