Skip to content

Commit

Permalink
Enhance Authorization by adding TenantAdmin interface (apache#6487)
Browse files Browse the repository at this point in the history
* Enhance Authorization by adding TenantAdmin interface

* Remove debugging comment

Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
  • Loading branch information
2 people authored and huangdx0726 committed Aug 24, 2020
1 parent cd5dfd8 commit 4febe4a
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;

/**
* Provider of authorization mechanism
Expand All @@ -46,6 +47,18 @@ default CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration
return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role) ? true : false);
}

/**
* Check if specified role is an admin of the tenant
* @param tenant the tenant to check
* @param role the role to check
* @return a CompletableFuture containing a boolean in which true means the role is an admin user
* and false if it is not
*/
default CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
AuthenticationDataSource authenticationData) {
return CompletableFuture.completedFuture(role != null && tenantInfo.getAdminRoles() != null && tenantInfo.getAdminRoles().contains(role) ? true : false);
}

/**
* Perform initialization for the authorization provider
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,6 +77,14 @@ public CompletableFuture<Boolean> isSuperUser(String user) {
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
}

public CompletableFuture<Boolean> isTenantAdmin(String tenant, String role, TenantInfo tenantInfo,
AuthenticationDataSource authenticationData) {
if (provider != null) {
return provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData);
}
return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
}

/**
*
* Grant authorization-action permission on a namespace to the given client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1797,7 +1797,7 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
} catch (RestException e) {
try {
validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant());
validateAdminAccessForTenant(pulsar, clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
} catch (RestException authException) {
log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString());
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -224,7 +226,7 @@ protected void validateSuperUserAccess() {
*/
protected void validateAdminAccessForTenant(String tenant) {
try {
validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant);
validateAdminAccessForTenant(pulsar(), clientAppId(), originalPrincipal(), tenant, clientAuthData());
} catch (RestException e) {
throw e;
} catch (Exception e) {
Expand All @@ -234,7 +236,8 @@ protected void validateAdminAccessForTenant(String tenant) {
}

protected static void validateAdminAccessForTenant(PulsarService pulsar, String clientAppId,
String originalPrincipal, String tenant)
String originalPrincipal, String tenant,
AuthenticationDataSource authenticationData)
throws RestException, Exception {
if (log.isDebugEnabled()) {
log.debug("check admin access on tenant: {} - Authenticated: {} -- role: {}", tenant,
Expand All @@ -259,22 +262,17 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), clientAppId, originalPrincipal);

if (pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {

CompletableFuture<Boolean> isProxySuperUserFuture;
CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
try {
isProxySuperUserFuture = pulsar.getBrokerService()
.getAuthorizationService()
.isSuperUser(clientAppId);
AuthorizationService authorizationService = pulsar.getBrokerService().getAuthorizationService();
isProxySuperUserFuture = authorizationService.isSuperUser(clientAppId);

isOriginalPrincipalSuperUserFuture = pulsar.getBrokerService()
.getAuthorizationService()
.isSuperUser(originalPrincipal);
isOriginalPrincipalSuperUserFuture = authorizationService.isSuperUser(originalPrincipal);

Set<String> adminRoles = tenantInfo.getAdminRoles();
boolean proxyAuthorized = isProxySuperUserFuture.get() || adminRoles.contains(clientAppId);
boolean originalPrincipalAuthorized
= isOriginalPrincipalSuperUserFuture.get() || adminRoles.contains(originalPrincipal);
boolean proxyAuthorized = isProxySuperUserFuture.get() || authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get();
boolean originalPrincipalAuthorized
= isOriginalPrincipalSuperUserFuture.get() || authorizationService.isTenantAdmin(tenant, originalPrincipal, tenantInfo, authenticationData).get();
if (!proxyAuthorized || !originalPrincipalAuthorized) {
throw new RestException(Status.UNAUTHORIZED,
String.format("Proxy not authorized to access resource (proxy:%s,original:%s)",
Expand All @@ -290,7 +288,7 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String
.getAuthorizationService()
.isSuperUser(clientAppId)
.join()) {
if (!tenantInfo.getAdminRoles().contains(clientAppId)) {
if (!pulsar.getBrokerService().getAuthorizationService().isTenantAdmin(tenant, clientAppId, tenantInfo, authenticationData).get()) {
throw new RestException(Status.UNAUTHORIZED,
"Don't have permission to administrate resources on this tenant");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public void setup() throws Exception {
doReturn(false).when(namespaces).isRequestHttps();
doReturn("test").when(namespaces).clientAppId();
doReturn(null).when(namespaces).originalPrincipal();
doReturn(null).when(namespaces).clientAuthData();
doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters();
doNothing().when(namespaces).validateAdminAccessForTenant(this.testTenant);
doNothing().when(namespaces).validateAdminAccessForTenant("non-existing-tenant");
Expand Down Expand Up @@ -987,6 +988,7 @@ public void testValidateTopicOwnership() throws Exception {
doReturn(false).when(topics).isRequestHttps();
doReturn("test").when(topics).clientAppId();
doReturn(null).when(topics).originalPrincipal();
doReturn(null).when(topics).clientAuthData();
mockWebUrl(localWebServiceUrl, testNs);
doReturn("persistent").when(topics).domain();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -109,13 +111,16 @@ void setup(Method method) throws Exception {
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
config.setAuthenticationEnabled(true);
config.setAuthorizationEnabled(true);
config.setAuthenticationProviders(providers);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(true);
functionsWorkerService = spy(createPulsarFunctionWorker(config));
AuthenticationService authenticationService = new AuthenticationService(config);
AuthorizationService authorizationService = new AuthorizationService(config, mock(ConfigurationCacheService.class));
when(functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService);
when(functionsWorkerService.getAuthorizationService()).thenReturn(authorizationService);
when(functionsWorkerService.isInitialized()).thenReturn(true);

PulsarAdmin admin = mock(PulsarAdmin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected static void checkAuthorization(DiscoveryService service, TopicName top
throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s",
topicName.getTenant(), e.getMessage()));
}
if (!tenantInfo.getAdminRoles().contains(role)) {
if (!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
throw new IllegalAccessException("Don't have permission to administrate resources on this property");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1449,10 +1449,10 @@ public boolean isAuthorizedRole(String tenant, String namespace, String clientRo
if (clientRole != null) {
try {
TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
if (tenantInfo != null && tenantInfo.getAdminRoles() != null && tenantInfo.getAdminRoles().contains(clientRole)) {
if (tenantInfo != null && worker().getAuthorizationService().isTenantAdmin(tenant, clientRole, tenantInfo, authenticationData).get()) {
return true;
}
} catch (PulsarAdminException.NotFoundException e) {
} catch (PulsarAdminException.NotFoundException | InterruptedException | ExecutionException e) {

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -60,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
Expand Down Expand Up @@ -230,51 +232,56 @@ public void testMetricsEmpty() {
}

@Test
public void testIsAuthorizedRole() throws PulsarAdminException {

public void testIsAuthorizedRole() throws PulsarAdminException, InterruptedException, ExecutionException {

TenantInfo tenantInfo = new TenantInfo();
AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class);
FunctionsImpl functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
AuthorizationService authorizationService = mock(AuthorizationService.class);
doReturn(authorizationService).when(mockedWorkerService).getAuthorizationService();
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setAuthorizationEnabled(true);
workerConfig.setSuperUserRoles(Collections.singleton(superUser));
doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();

// test super user
AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class);
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", superUser, authenticationDataSource));

// test normal user
functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), any());
Tenants tenants = mock(Tenants.class);
when(tenants.getTenantInfo(any())).thenReturn(new TenantInfo());
when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
PulsarAdmin admin = mock(PulsarAdmin.class);
when(admin.tenants()).thenReturn(tenants);
when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));

// if user is tenant admin
functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), any());
tenants = mock(Tenants.class);
TenantInfo tenantInfo = new TenantInfo();
tenantInfo.setAdminRoles(Collections.singleton("test-user"));
when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);

admin = mock(PulsarAdmin.class);
when(admin.tenants()).thenReturn(tenants);
when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true));
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));

// test user allow function action
functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
doReturn(true).when(functionImpl).allowFunctionOps(any(), any(), any());
tenants = mock(Tenants.class);
when(tenants.getTenantInfo(any())).thenReturn(new TenantInfo());
tenantInfo.setAdminRoles(Collections.emptySet());
when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);

admin = mock(PulsarAdmin.class);
when(admin.tenants()).thenReturn(tenants);
when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true));
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource));

// test role is null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ protected static void checkAuthorization(ProxyService service, TopicName topicNa
throw new IllegalAccessException(String.format("Failed to get property %s admin data due to %s",
topicName.getTenant(), e.getMessage()));
}
if (!tenantInfo.getAdminRoles().contains(role)) {
if (!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, tenantInfo, authenticationData).get()) {
throw new IllegalAccessException("Don't have permission to administrate resources on this tenant");
}
}
Expand Down

0 comments on commit 4febe4a

Please sign in to comment.