Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[FEATURE] Add authorization to handle topic metadata request (#662)
Browse files Browse the repository at this point in the history
Add authorization to handleTopicMetadataRequest(#236 ).

Fix #415 and #571 

## Motivation
When client fetch metadata need check topic permission, so we need add authorization in handleTopicMetadataRequest, and do not perform role verification in authentication.

## Modifications
Add a common method in `KafkaRequestHandler#authorize` , this method use `authorizer` to authorization.
Modify the authentication behavior, and do not verify the role during authentication, verify the role in fetch metadata(#571  )
  • Loading branch information
Demogorgon314 committed Aug 21, 2021
1 parent 12f7808 commit 26dc138
Show file tree
Hide file tree
Showing 13 changed files with 993 additions and 156 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import javax.naming.AuthenticationException;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.policies.data.AuthAction;

/**
* The SaslServer implementation for SASL/PLAIN.
Expand Down Expand Up @@ -72,19 +69,15 @@ public byte[] evaluateResponse(byte[] response) throws SaslException {
try {
final AuthenticationState authState = authenticationProvider.newAuthState(
AuthData.of(saslAuth.getAuthData().getBytes(StandardCharsets.UTF_8)), null, null);
// TODO: we need to let KafkaRequestHandler do the authorization works. Here we just check the permissions
// of the namespace, which is the namespace. See https://github.com/streamnative/kop/issues/236
final String namespace = saslAuth.getUsername();
Map<String, Set<AuthAction>> permissions = admin.namespaces().getPermissions(namespace);
final String role = authState.getAuthRole();
if (!permissions.containsKey(role)) {
throw new AuthenticationException("Role: " + role + " is not allowed on namespace " + namespace);
if (StringUtils.isEmpty(role)) {
throw new AuthenticationException("Role cannot be empty.");
}

authorizationId = role;
authorizationId = authState.getAuthRole();
complete = true;
return null;
} catch (AuthenticationException | PulsarAdminException e) {
} catch (AuthenticationException e) {
throw new SaslException(e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package io.streamnative.pulsar.handlers.kop.security.auth;


import lombok.EqualsAndHashCode;
import lombok.Getter;

/**
* The Authorization resource.
*/
@Getter
@EqualsAndHashCode
public class Resource {

private final ResourceType resourceType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum ResourceType {
* A Pulsar topic.
*/
TOPIC((byte) 2),

;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,36 @@
package io.streamnative.pulsar.handlers.kop.security.auth;


import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.base.Joiner;
import io.streamnative.pulsar.handlers.kop.security.KafkaPrincipal;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;

/**
* Simple acl authorizer.
*/
@Slf4j
public class SimpleAclAuthorizer implements Authorizer {

private static final String POLICY_ROOT = "/admin/policies";
private static final String POLICY_ROOT = "/admin/policies/";

private final PulsarService pulsarService;

private final ServiceConfiguration conf;

public SimpleAclAuthorizer(PulsarService pulsarService) {
this.pulsarService = pulsarService;
this.conf = pulsarService.getConfiguration();
}

protected PulsarService getPulsarService() {
Expand All @@ -44,66 +53,158 @@ protected PulsarService getPulsarService() {
private CompletableFuture<Boolean> authorize(KafkaPrincipal principal, AuthAction action, Resource resource) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
TopicName topicName = TopicName.get(resource.getName());
isSuperUser(principal.getName()).whenComplete((isSuper, exception) -> {
NamespaceName namespace = topicName.getNamespaceObject();
if (namespace == null) {
permissionFuture.completeExceptionally(
new IllegalArgumentException("Resource name must contains namespace."));
return permissionFuture;
}
String policiesPath = path(namespace.toString());
String tenantName = namespace.getTenant();
isSuperUserOrTenantAdmin(tenantName, principal.getName()).whenComplete((isSuperUserOrAdmin, exception) -> {
if (exception != null) {
log.error("Check super user error: {}", exception.getMessage());
return;
if (log.isDebugEnabled()) {
log.debug("Verify if role {} is allowed to {} to resource {}: isSuperUserOrAdmin={}",
principal.getName(), action, resource.getName(), isSuperUserOrAdmin);
}
isSuperUserOrAdmin = false;
}
if (isSuper) {
if (isSuperUserOrAdmin) {
permissionFuture.complete(true);
} else {
getPulsarService()
.getPulsarResources()
.getNamespaceResources()
.getAsync(String.format("%s/%s", POLICY_ROOT, topicName.getNamespace()))
.thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for namespace : {}", principal);
}
} else {
String role = principal.getName();
Map<String, Set<AuthAction>> topicRoles = policies.get()
.auth_policies
.getTopicAuthentication()
.get(topicName.toString());
if (topicRoles != null && role != null) {
// Topic has custom policy
Set<AuthAction> topicActions = topicRoles.get(role);
if (topicActions != null && topicActions.contains(action)) {
permissionFuture.complete(true);
return;
}
}
return;
}
getPulsarService()
.getPulsarResources()
.getNamespaceResources()
.getAsync(policiesPath)
.thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for namespace : {}", principal);
}
} else {
String role = principal.getName();

Map<String, Set<AuthAction>> namespaceRoles = policies.get().auth_policies
.getNamespaceAuthentication();
Set<AuthAction> namespaceActions = namespaceRoles.get(role);
if (namespaceActions != null && namespaceActions.contains(action)) {
// Check Topic level policies
Map<String, Set<AuthAction>> topicRoles = policies.get()
.auth_policies
.getTopicAuthentication()
.get(topicName.toString());
if (topicRoles != null && role != null) {
// Topic has custom policy
Set<AuthAction> topicActions = topicRoles.get(role);
if (topicActions != null && topicActions.contains(action)) {
permissionFuture.complete(true);
return;
}
}
permissionFuture.complete(false);
}).exceptionally(ex -> {
log.warn("Client with Principal - {} failed to get permissions for resource - {}. {}",
principal, topicName, ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
}

// Check Namespace level policies
Map<String, Set<AuthAction>> namespaceRoles = policies.get().auth_policies
.getNamespaceAuthentication();
Set<AuthAction> namespaceActions = namespaceRoles.get(role);
if (namespaceActions != null && namespaceActions.contains(action)) {
permissionFuture.complete(true);
return;
}

// Check wildcard policies
if (conf.isAuthorizationAllowWildcardsMatching()
&& checkWildcardPermission(role, action, namespaceRoles)) {
// The role has namespace level permission by wildcard match
permissionFuture.complete(true);
return;
}
}
permissionFuture.complete(false);
}).exceptionally(ex -> {
if (log.isDebugEnabled()) {
log.debug("Client with Principal - {} failed to get permissions for resource - {}. {}",
principal, resource, ex.getMessage());
}
permissionFuture.completeExceptionally(ex);
return null;
});

});

return permissionFuture;
}

private boolean checkWildcardPermission(String checkedRole, AuthAction checkedAction,
Map<String, Set<AuthAction>> permissionMap) {
for (Map.Entry<String, Set<AuthAction>> permissionData : permissionMap.entrySet()) {
String permittedRole = permissionData.getKey();
Set<AuthAction> permittedActions = permissionData.getValue();

// Prefix match
if (checkedRole != null) {
if (permittedRole.charAt(permittedRole.length() - 1) == '*'
&& checkedRole.startsWith(permittedRole.substring(0, permittedRole.length() - 1))
&& permittedActions.contains(checkedAction)) {
return true;
}

// Suffix match
if (permittedRole.charAt(0) == '*' && checkedRole.endsWith(permittedRole.substring(1))
&& permittedActions.contains(checkedAction)) {
return true;
}
}
}
return false;
}

private CompletableFuture<Boolean> isSuperUser(String role) {
Set<String> superUserRoles = getPulsarService().getConfiguration().getSuperUserRoles();
Set<String> superUserRoles = conf.getSuperUserRoles();
return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role));
}

/**
* Check if specified role is an admin of the tenant or superuser.
*
* @param tenant the tenant to check
* @param role the role to check
* @return a CompletableFuture containing a boolean in which true means the role is an admin user
* and false if it is not
*/
private CompletableFuture<Boolean> isSuperUserOrTenantAdmin(String tenant, String role) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
isSuperUser(role).whenComplete((isSuperUser, ex) -> {
if (ex != null || !isSuperUser) {
pulsarService.getPulsarResources()
.getTenantResources()
.getAsync(path(tenant))
.thenAccept(tenantInfo -> {
if (!tenantInfo.isPresent()) {
future.complete(false);
return;
}
TenantInfo info = tenantInfo.get();
future.complete(role != null
&& info.getAdminRoles() != null
&& info.getAdminRoles().contains(role));
});
return;
}
future.complete(true);
});
return future;
}

private static String path(String... parts) {
StringBuilder sb = new StringBuilder();
sb.append(POLICY_ROOT);
Joiner.on('/').appendTo(sb, parts);
return sb.toString();
}


@Override
public CompletableFuture<Boolean> canLookupAsync(KafkaPrincipal principal, Resource resource) {
checkArgument(resource.getResourceType() == ResourceType.TOPIC,
String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType()));

CompletableFuture<Boolean> canLookupFuture = new CompletableFuture<>();
authorize(principal, AuthAction.produce, resource).whenComplete((hasProducePermission, ex) -> {
if (ex != null) {
Expand All @@ -113,7 +214,7 @@ public CompletableFuture<Boolean> canLookupAsync(KafkaPrincipal principal, Resou
+ "check Produce permissions. {}",
resource, principal, ex.getMessage());
}
return;
hasProducePermission = false;
}
if (hasProducePermission) {
canLookupFuture.complete(true);
Expand All @@ -138,11 +239,15 @@ public CompletableFuture<Boolean> canLookupAsync(KafkaPrincipal principal, Resou

@Override
public CompletableFuture<Boolean> canProduceAsync(KafkaPrincipal principal, Resource resource) {
checkArgument(resource.getResourceType() == ResourceType.TOPIC,
String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType()));
return authorize(principal, AuthAction.produce, resource);
}

@Override
public CompletableFuture<Boolean> canConsumeAsync(KafkaPrincipal principal, Resource resource) {
checkArgument(resource.getResourceType() == ResourceType.TOPIC,
String.format("Expected resource type is TOPIC, but have [%s]", resource.getResourceType()));
return authorize(principal, AuthAction.consume, resource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -52,6 +53,7 @@
public class CustomOAuthBearerCallbackHandlerTest extends KopProtocolHandlerTestBase {

private static final String ADMIN_USER = "admin_user";
private static final String USER = "user";

public CustomOAuthBearerCallbackHandlerTest() {
super("kafka");
Expand All @@ -78,6 +80,11 @@ protected void setup() throws Exception {
conf.setKopOauth2AuthenticateCallbackHandler(MockedAuthenticateCallbackHandler.class.getName());

super.internalSetup();
admin.namespaces().grantPermissionOnNamespace(
tenant + "/" + namespace,
USER,
Sets.newHashSet(AuthAction.consume, AuthAction.produce)
);
}

@AfterClass
Expand All @@ -99,21 +106,20 @@ protected void createAdmin() throws Exception {
@Test(timeOut = 10000)
public void testNumAuthenticateSuccess() throws Exception {
final String topic = "testNumAuthenticateSuccess";
final String user = "user";

final Properties props = newKafkaProducerProperties();
final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"
+ " required unsecuredLoginStringClaim_sub=\"%s\";";
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "OAUTHBEARER");
props.setProperty("sasl.jaas.config", String.format(jaasTemplate, user));
props.setProperty("sasl.jaas.config", String.format(jaasTemplate, USER));

@Cleanup
final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

assertTrue(MockedAuthenticateCallbackHandler.getAuthenticatedUsers().isEmpty());
producer.send(new ProducerRecord<>(topic, "hello")).get();
assertEquals(MockedAuthenticateCallbackHandler.getAuthenticatedUsers(), Sets.newHashSet(user));
assertEquals(MockedAuthenticateCallbackHandler.getAuthenticatedUsers(), Sets.newHashSet(USER));
}

/**
Expand Down
Loading

0 comments on commit 26dc138

Please sign in to comment.