Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Add new test unit
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Aug 19, 2021
1 parent 34e12bc commit 3cdac8e
Show file tree
Hide file tree
Showing 8 changed files with 432 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
? new SaslAuthenticator(pulsarService, kafkaConfig.getSaslAllowedMechanisms(), kafkaConfig)
: null;
final boolean authorizationEnabled = pulsarService.getBrokerService().isAuthorizationEnabled();
this.authorizer = authorizationEnabled && authenticationEnabled
this.authorizer = authorizationEnabled && authenticationEnabled
? new SimpleAclAuthorizer(pulsarService)
: null;
this.adminManager = adminManager;
Expand Down Expand Up @@ -561,8 +561,7 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
if (kafkaConfig.isAllowAutoTopicCreation()
&& metadataRequest.allowAutoTopicCreation()) {
authorize(AclOperation.CREATE,
Resource.of(ResourceType.NAMESPACE,
TopicName.get(fullTopicName).getNamespace())
Resource.of(ResourceType.TOPIC, fullTopicName)
).whenComplete((canCreateTopic, authEx) -> {
if (authEx != null) {
completeOneAuthFailedTopic.accept(topic, fullTopicName);
Expand Down Expand Up @@ -2270,7 +2269,8 @@ private void updateProducerStats(final TopicPartition topicPartition, final int
RequestStats.BATCH_COUNT_PER_MEMORY_RECORDS_INSTANCE.set(numMessages);
}

private CompletableFuture<Boolean> authorize(AclOperation operation, Resource resource) {
@VisibleForTesting
protected CompletableFuture<Boolean> authorize(AclOperation operation, Resource resource) {
if (authorizer == null) {
return CompletableFuture.completedFuture(true);
}
Expand All @@ -2292,9 +2292,9 @@ private CompletableFuture<Boolean> authorize(AclOperation operation, Resource re
isAuthorizedFuture = authorizer.canLookupAsync(session.getPrincipal(), resource);
break;
case CREATE:
case DELETE:
isAuthorizedFuture = authorizer.canManageTopicAsync(session.getPrincipal(), resource);
isAuthorizedFuture = authorizer.canCreateTopicAsync(session.getPrincipal(), resource);
break;
case DELETE:
case CLUSTER_ACTION:
case DESCRIBE_CONFIGS:
case ALTER_CONFIGS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ public interface Authorizer {
CompletableFuture<Boolean> canConsumeAsync(KafkaPrincipal principal, Resource resource);

/**
* Check whether the specified role can manage topics.
* Check whether the specified role can create topics.
*
* For that the caller needs to have created or delete topic permission.
* For that the caller needs to have created topic permission.
*
* @param principal login info
* @param resource resources to be authorized
* @return a boolean to determine whether authorized or not
*/
CompletableFuture<Boolean> canManageTopicAsync(KafkaPrincipal principal, Resource resource);
CompletableFuture<Boolean> canCreateTopicAsync(KafkaPrincipal principal, Resource resource);

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package io.streamnative.pulsar.handlers.kop.security.auth;


import lombok.EqualsAndHashCode;
import lombok.Getter;

/**
* The Authorization resource.
*/
@Getter
@EqualsAndHashCode
public class Resource {

private final ResourceType resourceType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.base.Joiner;
import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal;
import java.util.Map;
import java.util.Set;
Expand All @@ -26,14 +27,15 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;

/**
* Simple acl authorizer.
*/
@Slf4j
public class SimpleAclAuthorizer implements Authorizer {

private static final String POLICY_ROOT = "/admin/policies";
private static final String POLICY_ROOT = "/admin/policies/";

private final PulsarService pulsarService;

Expand All @@ -50,85 +52,90 @@ protected PulsarService getPulsarService() {

private CompletableFuture<Boolean> authorize(KafkaPrincipal principal, AuthAction action, Resource resource) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
String namespaceName = "";
NamespaceName namespace = null;
if (resource.getResourceType() == ResourceType.NAMESPACE) {
namespaceName = NamespaceName.get(resource.getName()).toString();
namespace = NamespaceName.get(resource.getName());
} else if (resource.getResourceType() == ResourceType.TOPIC) {
namespaceName = TopicName.get(resource.getName()).getNamespace();
namespace = TopicName.get(resource.getName()).getNamespaceObject();
}
String policiesPath = String.format("%s/%s", POLICY_ROOT, namespaceName);
isSuperUser(principal.getName()).whenComplete((isSuper, exception) -> {
if (namespace == null) {
permissionFuture.completeExceptionally(
new IllegalArgumentException("Resource name must contains namespace."));
return permissionFuture;
}
String policiesPath = path(namespace.toString());
String tenantName = namespace.getTenant();
isSuperUserOrTenantAdmin(tenantName, principal.getName()).whenComplete((isSuperUserOrAdmin, exception) -> {
if (exception != null) {
log.error("Check super user error: {}", exception.getMessage());
return;
if (log.isDebugEnabled()) {
log.debug("Verify if role {} is allowed to {} to resource {}: isSuperUserOrAdmin={}",
principal.getName(), action, resource.getName(), isSuperUserOrAdmin);
}
isSuperUserOrAdmin = false;
}
if (isSuper) {
if (isSuperUserOrAdmin) {
permissionFuture.complete(true);
} else {
getPulsarService()
.getPulsarResources()
.getNamespaceResources()
.getAsync(policiesPath)
.thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for namespace : {}", principal);
}
} else {
String role = principal.getName();

// Check Topic level policies
if (resource.getResourceType() == ResourceType.TOPIC) {
TopicName topicName = TopicName.get(resource.getName());
Map<String, Set<AuthAction>> topicRoles = policies.get()
.auth_policies
.getTopicAuthentication()
.get(topicName.toString());
if (topicRoles != null && role != null) {
// Topic has custom policy
Set<AuthAction> topicActions = topicRoles.get(role);
if (topicActions != null && topicActions.contains(action)) {
permissionFuture.complete(true);
return;
}
return;
}
getPulsarService()
.getPulsarResources()
.getNamespaceResources()
.getAsync(policiesPath)
.thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for namespace : {}", principal);
}
} else {
String role = principal.getName();

// Check Topic level policies
if (resource.getResourceType() == ResourceType.TOPIC) {
TopicName topicName = TopicName.get(resource.getName());
Map<String, Set<AuthAction>> topicRoles = policies.get()
.auth_policies
.getTopicAuthentication()
.get(topicName.toString());
if (topicRoles != null && role != null) {
// Topic has custom policy
Set<AuthAction> topicActions = topicRoles.get(role);
if (topicActions != null && topicActions.contains(action)) {
permissionFuture.complete(true);
return;
}
}
}

// Check Namespace level policies
Map<String, Set<AuthAction>> namespaceRoles = policies.get().auth_policies
.getNamespaceAuthentication();
Set<AuthAction> namespaceActions = namespaceRoles.get(role);
if (namespaceActions != null && namespaceActions.contains(action)) {
permissionFuture.complete(true);
return;
}
// Check Namespace level policies
Map<String, Set<AuthAction>> namespaceRoles = policies.get().auth_policies
.getNamespaceAuthentication();
Set<AuthAction> namespaceActions = namespaceRoles.get(role);
if (namespaceActions != null && namespaceActions.contains(action)) {
permissionFuture.complete(true);
return;
}

// Check wildcard policies
if (conf.isAuthorizationAllowWildcardsMatching()
&& checkWildcardPermission(role, action, namespaceRoles)) {
// The role has namespace level permission by wildcard match
permissionFuture.complete(true);
return;
}
// Check wildcard policies
if (conf.isAuthorizationAllowWildcardsMatching()
&& checkWildcardPermission(role, action, namespaceRoles)) {
// The role has namespace level permission by wildcard match
permissionFuture.complete(true);
return;
}
permissionFuture.complete(false);
}).exceptionally(ex -> {
log.warn("Client with Principal - {} failed to get permissions for resource - {}. {}",
principal, resource, ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
}
}
permissionFuture.complete(false);
}).exceptionally(ex -> {
log.warn("Client with Principal - {} failed to get permissions for resource - {}. {}",
principal, resource, ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});

});

return permissionFuture;
}

private CompletableFuture<Boolean> isSuperUser(String role) {
Set<String> superUserRoles = conf.getSuperUserRoles();
return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role));
}

private boolean checkWildcardPermission(String checkedRole, AuthAction checkedAction,
Map<String, Set<AuthAction>> permissionMap) {
for (Map.Entry<String, Set<AuthAction>> permissionData : permissionMap.entrySet()) {
Expand All @@ -153,6 +160,51 @@ private boolean checkWildcardPermission(String checkedRole, AuthAction checkedAc
return false;
}

private CompletableFuture<Boolean> isSuperUser(String role) {
Set<String> superUserRoles = conf.getSuperUserRoles();
return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role));
}

/**
* Check if specified role is an admin of the tenant or superuser.
*
* @param tenant the tenant to check
* @param role the role to check
* @return a CompletableFuture containing a boolean in which true means the role is an admin user
* and false if it is not
*/
private CompletableFuture<Boolean> isSuperUserOrTenantAdmin(String tenant, String role) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
isSuperUser(role).whenComplete((isSuperUser, ex) -> {
if (ex != null || !isSuperUser) {
pulsarService.getPulsarResources()
.getTenantResources()
.getAsync(path(tenant))
.thenAccept(tenantInfo -> {
if (!tenantInfo.isPresent()) {
future.complete(false);
return;
}
TenantInfo info = tenantInfo.get();
future.complete(role != null
&& info.getAdminRoles() != null
&& info.getAdminRoles().contains(role));
});
return;
}
future.complete(true);
});
return future;
}

private static String path(String... parts) {
StringBuilder sb = new StringBuilder();
sb.append(POLICY_ROOT);
Joiner.on('/').appendTo(sb, parts);
return sb.toString();
}


@Override
public CompletableFuture<Boolean> canLookupAsync(KafkaPrincipal principal, Resource resource) {
CompletableFuture<Boolean> canLookupFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -202,9 +254,11 @@ public CompletableFuture<Boolean> canConsumeAsync(KafkaPrincipal principal, Reso
}

@Override
public CompletableFuture<Boolean> canManageTopicAsync(KafkaPrincipal principal, Resource resource) {
checkArgument(resource.getResourceType() == ResourceType.NAMESPACE,
String.format("Expected resource type is NAMESPACE, but have [%s]", resource.getResourceType()));
public CompletableFuture<Boolean> canCreateTopicAsync(KafkaPrincipal principal, Resource resource) {
checkArgument(resource.getResourceType() == ResourceType.TOPIC
|| resource.getResourceType() == ResourceType.NAMESPACE,
String.format("Expected resource type is TOPIC or NAMESPACE, but have [%s]",
resource.getResourceType()));
return authorize(principal, AuthAction.packages, resource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/**
* Unit test for Authorization with `entryFormat=kafka`.
*/
public class KafkaAuthorizationKafkaTest extends KafkaAuthorizationTestBase{
public class KafkaAuthorizationKafkaTest extends KafkaAuthorizationTestBase {
public KafkaAuthorizationKafkaTest() {
super("kafka");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/**
* Unit test for Authorization with `entryFormat=pulsar`.
*/
public class KafkaAuthorizationPulsarTest extends KafkaAuthorizationTestBase{
public class KafkaAuthorizationPulsarTest extends KafkaAuthorizationTestBase {
public KafkaAuthorizationPulsarTest() {
super("pulsar");
}
Expand Down
Loading

0 comments on commit 3cdac8e

Please sign in to comment.