Skip to content

Commit

Permalink
[improve][admin] Add authorization test for schema and align auth for…
Browse files Browse the repository at this point in the history
… transaction (apache#22399)
  • Loading branch information
liangyepianzhou authored Apr 9, 2024
1 parent 6de711d commit 9555504
Show file tree
Hide file tree
Showing 2 changed files with 252 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,27 @@
package org.apache.pulsar.broker.admin;

import io.jsonwebtoken.Jwts;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.security.MockedPulsarStandalone;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -62,7 +70,9 @@ public class TopicAuthZTest extends MockedPulsarStandalone {
public void before() {
configureTokenAuthentication();
configureDefaultAuthorization();
enableTransaction();
start();
createTransactionCoordinatorAssign(16);
this.superUserAdmin =PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(SUPER_USER_TOKEN))
Expand All @@ -74,8 +84,18 @@ public void before() {
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
.build();

superUserAdmin.tenants().createTenant("pulsar", tenantInfo);
superUserAdmin.namespaces().createNamespace("pulsar/system");
}

protected void createTransactionCoordinatorAssign(int numPartitionsOfTC) throws MetadataStoreException {
getPulsarService().getPulsarResources()
.getNamespaceResources()
.getPartitionedTopicResources()
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
new PartitionedTopicMetadata(numPartitionsOfTC));
}

@SneakyThrows
@AfterClass(alwaysRun = true)
Expand Down Expand Up @@ -1086,6 +1106,235 @@ public void testExpireMessageByPosition() {
deleteTopic(topic, false);
}

public enum OperationAuthType {
Lookup,
Produce,
Consume,
AdminOrSuperUser,
NOAuth
}

private final String testTopic = "persistent://public/default/" + UUID.randomUUID().toString();
@FunctionalInterface
public interface ThrowingBiConsumer<T> {
void accept(T t) throws PulsarAdminException;
}

@DataProvider(name = "authFunction")
public Object[][] authFunction () throws Exception {
String sub = "my-sub";
createTopic(testTopic, false);
@Cleanup final PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarService().getBrokerServiceUrl())
.authentication(new AuthenticationToken(SUPER_USER_TOKEN))
.enableTransaction(true)
.build();
@Cleanup final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(testTopic).create();

@Cleanup final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(testTopic)
.subscriptionName(sub)
.subscribe();

Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES)
.build().get();
MessageIdImpl messageId = (MessageIdImpl) producer.newMessage().value("test message").send();

consumer.acknowledgeAsync(messageId, transaction).get();

return new Object[][]{
// SCHEMA
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().getSchemaInfo(testTopic),
OperationAuthType.Lookup
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().getSchemaInfo(
testTopic, 0),
OperationAuthType.Lookup
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().getAllSchemas(
testTopic),
OperationAuthType.Lookup
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().createSchema(testTopic,
SchemaInfo.builder().type(SchemaType.STRING).build()),
OperationAuthType.Produce
},
// TODO: improve the authorization check for testCompatibility and deleteSchema
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().testCompatibility(
testTopic, SchemaInfo.builder().type(SchemaType.STRING).build()),
OperationAuthType.AdminOrSuperUser
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.schemas().deleteSchema(
testTopic),
OperationAuthType.AdminOrSuperUser
},

// TRANSACTION

// Modify transaction coordinator
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.abortTransaction(transaction.getTxnID()),
OperationAuthType.AdminOrSuperUser
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.scaleTransactionCoordinators(17),
OperationAuthType.AdminOrSuperUser
},
// TODO: fix authorization check of check transaction coordinator stats.
// Check transaction coordinator stats
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getCoordinatorInternalStats(1, false),
OperationAuthType.NOAuth
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getCoordinatorStats(),
OperationAuthType.AdminOrSuperUser
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getSlowTransactionsByCoordinatorId(1, 5, TimeUnit.SECONDS),
OperationAuthType.NOAuth
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getTransactionMetadata(transaction.getTxnID()),
OperationAuthType.NOAuth
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.listTransactionCoordinators(),
OperationAuthType.NOAuth
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getSlowTransactions(5, TimeUnit.SECONDS),
OperationAuthType.AdminOrSuperUser
},

// TODO: Check the authorization of the topic when get stats of TB or TP
// Check stats related to transaction buffer and transaction pending ack
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getPendingAckInternalStats(testTopic, sub, false),
OperationAuthType.NOAuth
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getPendingAckStats(testTopic, sub, false),
OperationAuthType.NOAuth
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getPositionStatsInPendingAck(testTopic, sub, messageId.getLedgerId(),
messageId.getEntryId(), null),
OperationAuthType.NOAuth
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getTransactionBufferInternalStats(testTopic, false),
OperationAuthType.NOAuth
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getTransactionBufferStats(testTopic, false),
OperationAuthType.NOAuth
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getTransactionBufferStats(testTopic, false),
OperationAuthType.NOAuth
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getTransactionInBufferStats(transaction.getTxnID(), testTopic),
OperationAuthType.NOAuth
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getTransactionInBufferStats(transaction.getTxnID(), testTopic),
OperationAuthType.NOAuth
},
new Object[] {
(ThrowingBiConsumer<PulsarAdmin>) (admin) -> admin.transactions()
.getTransactionInPendingAckStats(transaction.getTxnID(), testTopic, sub),
OperationAuthType.NOAuth
},
};
}

@Test(dataProvider = "authFunction")
public void testSchemaAndTransactionAuthorization(ThrowingBiConsumer<PulsarAdmin> adminConsumer, OperationAuthType topicOpType)
throws Exception {
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();

@Cleanup
final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
// test tenant manager
if (topicOpType != OperationAuthType.AdminOrSuperUser) {
adminConsumer.accept(tenantManagerAdmin);
}

if (topicOpType != OperationAuthType.NOAuth) {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> adminConsumer.accept(subAdmin));
}

for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(testTopic, subject, Set.of(action));

if (authActionMatchOperation(topicOpType, action)) {
adminConsumer.accept(subAdmin);
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> adminConsumer.accept(subAdmin));
}
superUserAdmin.topics().revokePermissions(testTopic, subject);
}
}


private boolean authActionMatchOperation(OperationAuthType operationAuthType, AuthAction action) {
switch (operationAuthType) {
case Lookup -> {
if (AuthAction.consume == action || AuthAction.produce == action) {
return true;
}
}
case Consume -> {
if (AuthAction.consume == action) {
return true;
}
}
case Produce -> {
if (AuthAction.produce == action) {
return true;
}
}
case AdminOrSuperUser -> {
return false;
}
case NOAuth -> {
return true;
}
}
return false;
}

private void createTopic(String topic, boolean partitioned) throws Exception {
if (partitioned) {
superUserAdmin.topics().createPartitionedTopic(topic, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ protected void configureTokenAuthentication() {

}


protected void enableTransaction() {
serviceConfiguration.setTransactionCoordinatorEnabled(true);
}

protected void configureDefaultAuthorization() {
serviceConfiguration.setAuthorizationEnabled(true);
Expand Down

0 comments on commit 9555504

Please sign in to comment.