Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-383: Support granting/revoking permissions for multiple topics #23372

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -223,6 +226,16 @@ CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespac
CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
String authDataJson);

default CompletableFuture<Void> grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
return FutureUtil.failedFuture(new IllegalStateException(
String.format("grantPermissionAsync is not supported by the Authorization")));
}

default CompletableFuture<Void> revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
return FutureUtil.failedFuture(new IllegalStateException(
String.format("revokePermissionAsync is not supported by the Authorization")));
}


/**
* Revoke authorization-action permission on a topic to the given client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.util.concurrent.TimeUnit.SECONDS;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,6 +33,8 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -181,6 +184,14 @@ public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<Aut
return provider.grantPermissionAsync(topicName, actions, role, authDataJson);
}

public CompletableFuture<Void> grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
return provider.grantPermissionAsync(options);
}

public CompletableFuture<Void> revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
return provider.revokePermissionAsync(options);
}

/**
* Revoke authorization-action permission on a topic to the given client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -251,6 +255,80 @@ public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<Aut
});
}

public CompletableFuture<Void> grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
return checkNamespace(options.stream().map(o -> TopicName.get(o.getTopic()).getNamespace()))
.thenCompose(__ -> getPoliciesReadOnlyAsync())
.thenCompose(readonly -> {
if (readonly) {
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
}
TopicName topicName = TopicName.get(options.get(0).getTopic());
return pulsarResources.getNamespaceResources()
.setPoliciesAsync(topicName.getNamespaceObject(), policies -> {
options.stream().forEach(o -> {
final String topicUri = TopicName.get(o.getTopic()).toString();
policies.auth_policies.getTopicAuthentication()
.computeIfAbsent(topicUri, __ -> new HashMap<>())
.put(o.getRole(), o.getActions());
});
return policies;
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to grant permissions for {}", options);
} else {
log.info("Successfully granted access for {}", options);
}
});
});
}

@Override
public CompletableFuture<Void> revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
return checkNamespace(options.stream().map(o -> TopicName.get(o.getTopic()).getNamespace()))
.thenCompose(__ -> getPoliciesReadOnlyAsync())
.thenCompose(readonly -> {
if (readonly) {
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
}
TopicName topicName = TopicName.get(options.get(0).getTopic());
return pulsarResources.getNamespaceResources()
.setPoliciesAsync(topicName.getNamespaceObject(), policies -> {
options.stream().forEach(o -> {
final String topicUri = TopicName.get(o.getTopic()).toString();
policies.auth_policies.getTopicAuthentication()
.computeIfPresent(topicUri, (topicNameUri, roles) -> {
roles.remove(o.getRole());
if (roles.isEmpty()) {
return null;
}
return roles;
});
});
return policies;
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to revoke permissions for {}", options, ex);
} else {
log.info("Successfully revoke permissions for {}", options);
}
});
});
}

private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) {
boolean sameNamespace = namespaces.distinct().count() == 1;
if (!sameNamespace) {
throw new IllegalArgumentException("The namespace should be the same");
}
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> revokePermissionAsync(TopicName topicName, String role) {
return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,4 +924,15 @@ protected void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
"The bucket must be specified for namespace offload.");
}
}

protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
return pulsar().getNamespaceService().checkTopicExists(topicName)
.thenAccept(info -> {
boolean exists = info.isExists();
info.recycle();
if (!exists) {
throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
Expand All @@ -65,8 +66,10 @@
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamedEntity;
Expand Down Expand Up @@ -613,6 +616,78 @@ protected CompletableFuture<Void> internalGrantPermissionOnNamespaceAsync(String
});
}

protected CompletableFuture<Void> internalGrantPermissionOnTopicsAsync(List<GrantTopicPermissionOptions> options) {
return checkNamespace(options.stream().map(o -> TopicName.get(o.getTopic()).getNamespace()))
.thenCompose(__ -> validateAdminAccessForTenantAsync(
TopicName.get(options.get(0).getTopic()).getTenant())
).thenCompose(__ -> internalCheckTopicExists(options.stream().map(o -> TopicName.get(o.getTopic()))))
.thenCompose(__ -> getAuthorizationService().grantPermissionAsync(options))
.thenAccept(unused -> log.info("[{}] Successfully granted access for {}", clientAppId(), options))
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
//The IllegalArgumentException and the IllegalStateException were historically thrown by the
// grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
if (realCause instanceof MetadataStoreException.NotFoundException
|| realCause instanceof IllegalArgumentException) {
log.warn("[{}] Failed to grant permissions for namespace {}: does not exist", clientAppId(),
namespaceName, ex);
throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
} else if (realCause instanceof MetadataStoreException.BadVersionException
|| realCause instanceof IllegalStateException) {
log.warn("[{}] Failed to grant permissions for namespace {}: {}",
clientAppId(), namespaceName, ex.getCause().getMessage(), ex);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} else {
log.error("[{}] Failed to grant permissions for namespace {}",
clientAppId(), namespaceName, ex);
throw new RestException(realCause);
}
});
}

protected CompletableFuture<Void> internalRevokePermissionOnTopicsAsync(
List<RevokeTopicPermissionOptions> options) {
return checkNamespace(options.stream().map(o -> TopicName.get(o.getTopic()).getNamespace()))
.thenCompose(__ -> validateAdminAccessForTenantAsync(
TopicName.get(options.get(0).getTopic()).getTenant()))
.thenCompose(__ -> internalCheckTopicExists(options.stream().map(o -> TopicName.get(o.getTopic()))))
.thenCompose(__ -> getAuthorizationService().revokePermissionAsync(options))
.thenAccept(unused -> log.info("[{}] Successfully revoke access for {}", clientAppId(), options))
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
//The IllegalArgumentException and the IllegalStateException were historically thrown by the
// grantPermissionAsync method, so we catch them here to ensure backwards compatibility.
if (realCause instanceof MetadataStoreException.NotFoundException
|| realCause instanceof IllegalArgumentException) {
log.warn("[{}] Failed to revoke permissions for namespace {}: does not exist", clientAppId(),
namespaceName, ex);
throw new RestException(Status.NOT_FOUND, "Topic's namespace does not exist");
} else if (realCause instanceof MetadataStoreException.BadVersionException
|| realCause instanceof IllegalStateException) {
log.warn("[{}] Failed to revoke permissions for namespace {}: {}",
clientAppId(), namespaceName, ex.getCause().getMessage(), ex);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} else {
log.error("[{}] Failed to revoke permissions for namespace {}",
clientAppId(), namespaceName, ex);
throw new RestException(realCause);
}
});
}

private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) {
boolean sameNamespace = namespaces.distinct().count() == 1;
if (!sameNamespace) {
throw new RestException(Status.BAD_REQUEST, "The namespace should be the same");
}
return CompletableFuture.completedFuture(null);
}

private CompletableFuture<Void> internalCheckTopicExists(Stream<TopicName> topicNameStream) {
List<TopicName> topicNames = topicNameStream.collect(Collectors.toList());
return CompletableFuture.allOf(topicNames.stream().map(topic -> internalCheckTopicExists(topic))
.toArray(CompletableFuture[]::new));
}

protected CompletableFuture<Void> internalGrantPermissionOnSubscriptionAsync(String subscription,
Set<String> roles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -314,6 +316,48 @@ public void grantPermissionOnNamespace(@Suspended AsyncResponse asyncResponse,
});
}

@POST
@Path("/grantPermissionsOnTopics")
@ApiOperation(value = "Grant new permissions to a role on multi-topics.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 500, message = "Internal server error") })
public void grantPermissionsOnTopics(@Suspended final AsyncResponse asyncResponse,
List<GrantTopicPermissionOptions> options) {
internalGrantPermissionOnTopicsAsync(options)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("[{}] Failed to grant permissions {}",
clientAppId(), options, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/revokePermissionsOnTopics")
@ApiOperation(value = "Revoke new permissions to a role on multi-topics.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 500, message = "Internal server error") })
public void revokePermissionsOnTopics(@Suspended final AsyncResponse asyncResponse,
List<RevokeTopicPermissionOptions> options) {
internalRevokePermissionOnTopicsAsync(options)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("[{}] Failed to revoke permissions {}",
clientAppId(), options, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/{property}/{namespace}/permissions/subscription/{subscription}")
@ApiOperation(hidden = true, value = "Grant a new permission to roles for a subscription."
Expand Down
Loading
Loading