diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java index 0d1f79a09dc14..ff32e41977aaa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java @@ -48,6 +48,7 @@ import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,7 +63,7 @@ public class TenantsBase extends PulsarWebResource { @ApiResponse(code = 404, message = "Tenant doesn't exist")}) public void getTenants(@Suspended final AsyncResponse asyncResponse) { final String clientAppId = clientAppId(); - validateSuperUserAccessAsync() + validateBothSuperUserAndTenantOperation(null, TenantOperation.LIST_TENANTS) .thenCompose(__ -> tenantResources().listTenantsAsync()) .thenAccept(tenants -> { // deep copy the tenants to avoid concurrent sort exception @@ -84,7 +85,7 @@ public void getTenants(@Suspended final AsyncResponse asyncResponse) { public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant) { final String clientAppId = clientAppId(); - validateSuperUserAccessAsync() + validateBothSuperUserAndTenantOperation(tenant, TenantOperation.GET_TENANT) .thenCompose(__ -> tenantResources().getTenantAsync(tenant)) .thenApply(tenantInfo -> { if (!tenantInfo.isPresent()) { @@ -121,7 +122,7 @@ public void createTenant(@Suspended final AsyncResponse asyncResponse, asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid")); return; } - validateSuperUserAccessAsync() + validateBothSuperUserAndTenantOperation(tenant, TenantOperation.CREATE_TENANT) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> validateClustersAsync(tenantInfo)) .thenCompose(__ -> validateAdminRoleAsync(tenantInfo)) @@ -169,7 +170,7 @@ public void updateTenant(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant, @ApiParam(value = "TenantInfo") TenantInfoImpl newTenantAdmin) { final String clientAppId = clientAppId(); - validateSuperUserAccessAsync() + validateBothSuperUserAndTenantOperation(tenant, TenantOperation.UPDATE_TENANT) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> validateClustersAsync(newTenantAdmin)) .thenCompose(__ -> validateAdminRoleAsync(newTenantAdmin)) @@ -206,7 +207,7 @@ public void deleteTenant(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") @ApiParam(value = "The tenant name") String tenant, @QueryParam("force") @DefaultValue("false") boolean force) { final String clientAppId = clientAppId(); - validateSuperUserAccessAsync() + validateBothSuperUserAndTenantOperation(tenant, TenantOperation.DELETE_TENANT) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> internalDeleteTenant(tenant, force)) .thenAccept(__ -> { @@ -304,4 +305,41 @@ private CompletableFuture validateAdminRoleAsync(TenantInfoImpl info) { } return CompletableFuture.completedFuture(null); } + + private CompletableFuture validateBothSuperUserAndTenantOperation(String tenant, + TenantOperation operation) { + final var superUserValidationFuture = validateSuperUserAccessAsync(); + final var tenantOperationValidationFuture = validateTenantOperationAsync(tenant, operation); + return CompletableFuture.allOf(superUserValidationFuture, tenantOperationValidationFuture) + .handle((__, err) -> { + if (!superUserValidationFuture.isCompletedExceptionally() + || !tenantOperationValidationFuture.isCompletedExceptionally()) { + return true; + } + if (log.isDebugEnabled()) { + Throwable superUserValidationException = null; + try { + superUserValidationFuture.join(); + } catch (Throwable ex) { + superUserValidationException = FutureUtil.unwrapCompletionException(ex); + } + Throwable brokerOperationValidationException = null; + try { + tenantOperationValidationFuture.join(); + } catch (Throwable ex) { + brokerOperationValidationException = FutureUtil.unwrapCompletionException(ex); + } + log.debug("validateBothTenantOperationAndSuperUser failed." + + " originalPrincipal={} clientAppId={} operation={} " + + "superuserValidationError={} tenantOperationValidationError={}", + originalPrincipal(), clientAppId(), operation.toString(), + superUserValidationException, brokerOperationValidationException); + } + throw new RestException(Status.UNAUTHORIZED, + String.format("Unauthorized to validateBothTenantOperationAndSuperUser for" + + " originalPrincipal [%s] and clientAppId [%s] " + + "about operation [%s] ", + originalPrincipal(), clientAppId(), operation.toString())); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java new file mode 100644 index 0000000000000..2cf3ea374c33c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TenantEndpointsAuthorizationTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import lombok.SneakyThrows; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TenantOperation; +import org.apache.pulsar.security.MockedPulsarStandalone; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.util.Set; +import java.util.UUID; + +@Test(groups = "broker-admin") +public class TenantEndpointsAuthorizationTest extends MockedPulsarStandalone { + + private AuthorizationService orignalAuthorizationService; + private AuthorizationService spyAuthorizationService; + + private PulsarAdmin superUserAdmin; + private PulsarAdmin nobodyAdmin; + + @SneakyThrows + @BeforeClass(alwaysRun = true) + public void setup() { + configureTokenAuthentication(); + configureDefaultAuthorization(); + start(); + this.superUserAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) + .build(); + this.nobodyAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(NOBODY_TOKEN)) + .build(); + } + + @BeforeMethod(alwaysRun = true) + public void before() throws IllegalAccessException { + orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); + spyAuthorizationService = spy(orignalAuthorizationService); + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", + spyAuthorizationService, true); + } + + @AfterMethod(alwaysRun = true) + public void after() throws IllegalAccessException { + if (orignalAuthorizationService != null) { + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", orignalAuthorizationService, true); + } + } + + @SneakyThrows + @AfterClass(alwaysRun = true) + public void cleanup() { + if (superUserAdmin != null) { + superUserAdmin.close(); + superUserAdmin = null; + } + spyAuthorizationService = null; + orignalAuthorizationService = null; + super.close(); + } + + @Test + public void testListTenants() throws PulsarAdminException { + superUserAdmin.tenants().getTenants(); + // test allow broker operation + verify(spyAuthorizationService) + .allowTenantOperationAsync(isNull(), Mockito.eq(TenantOperation.LIST_TENANTS), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.tenants().getTenants()); + } + + + @Test + public void testGetTenant() throws PulsarAdminException { + String tenantName = "public"; + superUserAdmin.tenants().getTenantInfo(tenantName); + // test allow broker operation + verify(spyAuthorizationService) + .allowTenantOperationAsync(eq(tenantName), Mockito.eq(TenantOperation.GET_TENANT), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.tenants().getTenantInfo(tenantName)); + } + + @Test + public void testUpdateTenant() throws PulsarAdminException { + String tenantName = "public"; + superUserAdmin.tenants().updateTenant(tenantName, TenantInfo.builder() + .allowedClusters(Set.of(getPulsarService().getConfiguration().getClusterName())) + .adminRoles(Set.of("example")).build()); + // test allow broker operation + verify(spyAuthorizationService) + .allowTenantOperationAsync(eq(tenantName), Mockito.eq(TenantOperation.UPDATE_TENANT), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.tenants() + .updateTenant(tenantName, TenantInfo.builder().adminRoles(Set.of("example")).build())); + } + + @Test + public void testDeleteTenant() throws PulsarAdminException { + String tenantName = UUID.randomUUID().toString(); + superUserAdmin.tenants().createTenant(tenantName, TenantInfo.builder() + .allowedClusters(Set.of(getPulsarService().getConfiguration().getClusterName())) + .adminRoles(Set.of("example")).build()); + + Mockito.clearInvocations(spyAuthorizationService); + superUserAdmin.tenants().deleteTenant(tenantName); + // test allow broker operation + verify(spyAuthorizationService) + .allowTenantOperationAsync(eq(tenantName), Mockito.eq(TenantOperation.DELETE_TENANT), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, () -> nobodyAdmin.tenants().deleteTenant(tenantName)); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java index 1c52f69006403..e0518e510f9dc 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TenantOperation.java @@ -25,4 +25,10 @@ public enum TenantOperation { CREATE_NAMESPACE, DELETE_NAMESPACE, LIST_NAMESPACES, + + LIST_TENANTS, + GET_TENANT, + CREATE_TENANT, + UPDATE_TENANT, + DELETE_TENANT, }