From 97a91b4988ff77f93f9f74abd0bae00d960713b4 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 5 Aug 2022 12:04:13 +0800 Subject: [PATCH] [improve][test] Verify the authentication data in the authorization provider (#16945) Signed-off-by: Zixuan Liu --- .../auth/AuthorizationWithAuthDataTest.java | 291 ++++++++++++++++++ .../auth/MockedPulsarServiceBaseTest.java | 36 ++- 2 files changed, 326 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java new file mode 100644 index 0000000000000..0f90aff459723 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import javax.crypto.SecretKey; +import lombok.Cleanup; +import lombok.SneakyThrows; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.broker.authorization.AuthorizationProvider; +import org.apache.pulsar.broker.cache.ConfigurationCacheService; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TenantOperation; +import org.apache.pulsar.common.policies.data.TopicOperation; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import org.testng.collections.Sets; + +@Test(groups = "broker") +public class AuthorizationWithAuthDataTest extends MockedPulsarServiceBaseTest { + + private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + private static final String ADMIN_ROLE = "admin"; + private static final String ADMIN_TOKEN = Jwts.builder().setSubject(ADMIN_ROLE).signWith(SECRET_KEY).compact(); + + public static class MyAuthorizationProvider implements AuthorizationProvider { + + public MyAuthorizationProvider() { + } + + private void assertRoleAndAuthenticationData(String role, AuthenticationDataSource authenticationData) { + assertEquals(role, ADMIN_ROLE); + if (authenticationData.hasDataFromHttp()) { + String authorization = authenticationData.getHttpHeader("Authorization"); + assertEquals(authorization, "Bearer " + ADMIN_TOKEN); + } else { + assertEquals(authenticationData.getCommandData(), ADMIN_TOKEN); + } + } + + @Override + public CompletableFuture isSuperUser(String role, AuthenticationDataSource authenticationData, + ServiceConfiguration serviceConfiguration) { + assertRoleAndAuthenticationData(role, authenticationData); + return CompletableFuture.completedFuture(true); + } + + @Override + public CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, + AuthenticationDataSource authenticationData) { + assertRoleAndAuthenticationData(role, authenticationData); + return CompletableFuture.completedFuture(true); + } + + @Override + public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException { + // noop + } + + @Override + public CompletableFuture canProduceAsync(TopicName topicName, String role, + AuthenticationDataSource authenticationData) { + assertRoleAndAuthenticationData(role, authenticationData); + return CompletableFuture.completedFuture(true); + } + + @Override + public CompletableFuture canConsumeAsync(TopicName topicName, String role, + AuthenticationDataSource authenticationData, + String subscription) { + assertRoleAndAuthenticationData(role, authenticationData); + return CompletableFuture.completedFuture(true); + } + + @Override + public CompletableFuture canLookupAsync(TopicName topicName, String role, + AuthenticationDataSource authenticationData) { + assertRoleAndAuthenticationData(role, authenticationData); + return CompletableFuture.completedFuture(true); + } + + @Override + public CompletableFuture allowFunctionOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { + assertRoleAndAuthenticationData(role, authenticationData); + return CompletableFuture.completedFuture(true); + } + + @Override + public CompletableFuture allowSourceOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { + assertRoleAndAuthenticationData(role, authenticationData); + return CompletableFuture.completedFuture(true); + } + + @Override + public CompletableFuture allowSinkOpsAsync(NamespaceName namespaceName, String role, + AuthenticationDataSource authenticationData) { + assertRoleAndAuthenticationData(role, authenticationData); + return CompletableFuture.completedFuture(true); + } + + @Override + public CompletableFuture grantPermissionAsync(NamespaceName namespace, Set actions, + String role, String authDataJson) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture grantSubscriptionPermissionAsync(NamespaceName namespace, + String subscriptionName, Set roles, + String authDataJson) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture revokeSubscriptionPermissionAsync(NamespaceName namespace, + String subscriptionName, String role, + String authDataJson) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture grantPermissionAsync(TopicName topicName, Set actions, String role, + String authDataJson) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture allowTenantOperationAsync(String tenantName, String role, + TenantOperation operation, + AuthenticationDataSource authData) { + assertRoleAndAuthenticationData(role, authData); + return CompletableFuture.completedFuture(true); + } + + @Override + public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, String role, + NamespaceOperation operation, + AuthenticationDataSource authData) { + assertRoleAndAuthenticationData(role, authData); + return CompletableFuture.completedFuture(true); + } + + @Override + public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, + PolicyName policy, + PolicyOperation operation, String role, + AuthenticationDataSource authData) { + assertRoleAndAuthenticationData(role, authData); + return CompletableFuture.completedFuture(true); + } + + @Override + public CompletableFuture allowTopicOperationAsync(TopicName topic, String role, + TopicOperation operation, + AuthenticationDataSource authData) { + assertRoleAndAuthenticationData(role, authData); + return CompletableFuture.completedFuture(true); + } + + @Override + public void close() throws IOException { + // noop + } + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setAuthenticationEnabled(true); + conf.getProperties().setProperty("tokenSecretKey", "data:;base64," + + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded())); + Set providers = new HashSet<>(); + providers.add(AuthenticationProviderToken.class.getName()); + conf.setAuthenticationProviders(providers); + Set superUserRoles = new HashSet<>(); + superUserRoles.add("admin"); + conf.setSuperUserRoles(superUserRoles); + + conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName()); + conf.setBrokerClientAuthenticationParameters(ADMIN_TOKEN); + + conf.setAuthorizationEnabled(true); + conf.setAuthorizationProvider(MyAuthorizationProvider.class.getName()); + } + + @SneakyThrows + @Override + protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) { + super.customizeNewPulsarAdminBuilder(pulsarAdminBuilder); + pulsarAdminBuilder.authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN); + } + + @SneakyThrows + @Override + protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { + super.customizeNewPulsarClientBuilder(clientBuilder); + clientBuilder.authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN); + } + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + internalSetup(); + setupDefaultTenantAndNamespace(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + internalCleanup(); + } + + @Test + public void testAdmin() throws PulsarAdminException { + TenantInfo tenantInfo = new TenantInfo(); + tenantInfo.setAllowedClusters(Sets.newHashSet(configClusterName)); + admin.tenants().createTenant("test-tenant-1", tenantInfo); + admin.namespaces().createNamespace("test-tenant-1/test-namespace-1"); + String partitionedTopic = UUID.randomUUID().toString(); + admin.topics().createPartitionedTopic(partitionedTopic,3); + String nonPartitionedTopic = UUID.randomUUID().toString(); + admin.topics().createNonPartitionedTopic(nonPartitionedTopic); + admin.lookups().lookupPartitionedTopic(partitionedTopic); + admin.lookups().lookupTopic(nonPartitionedTopic); + } + + @Test + public void testClient() throws PulsarClientException { + String topic = UUID.randomUUID().toString(); + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topic).create(); + byte[] msg = "Hello".getBytes(StandardCharsets.UTF_8); + producer.send(msg); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test").subscribe(); + Message receive = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals(receive.getData(), msg); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index ed2753db76349..b0e769ded9cd0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -54,6 +54,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.policies.data.ClusterData; @@ -122,8 +123,15 @@ protected final void internalSetup(boolean isPreciseDispatcherFlowControl) throw pulsarClient = newPulsarClient(lookupUrl.toString(), 0); } + protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { + + } + protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException { - return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + ClientBuilder clientBuilder = + PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS); + customizeNewPulsarClientBuilder(clientBuilder); + return clientBuilder.build(); } protected final void internalSetupForStatsTest() throws Exception { @@ -256,9 +264,14 @@ protected void startBroker() throws Exception { PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()); + customizeNewPulsarAdminBuilder(pulsarAdminBuilder); admin = spy(pulsarAdminBuilder.build()); } + protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) { + + } + protected PulsarService startBroker(ServiceConfiguration conf) throws Exception { boolean isAuthorizationEnabled = conf.isAuthorizationEnabled(); @@ -423,5 +436,26 @@ protected static ServiceConfiguration getDefaultConf() { return configuration; } + protected void setupDefaultTenantAndNamespace() throws Exception { + final String tenant = "public"; + final String namespace = tenant + "/default"; + + if (!admin.clusters().getClusters().contains(configClusterName)) { + ClusterData clusterData = new ClusterData(); + clusterData.setServiceUrl(pulsar.getWebServiceAddress()); + admin.clusters().createCluster(configClusterName, clusterData); + } + + if (!admin.tenants().getTenants().contains(tenant)) { + TenantInfo tenantInfo = new TenantInfo(); + tenantInfo.setAllowedClusters(Sets.newHashSet(configClusterName)); + admin.tenants().createTenant(tenant, tenantInfo); + } + + if (!admin.namespaces().getNamespaces(tenant).contains(namespace)) { + admin.namespaces().createNamespace(namespace); + } + } + private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class); }