diff --git a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java index 20769c75c524a..ef31ab44b3460 100644 --- a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java +++ b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java @@ -995,7 +995,7 @@ protected CompletableFuture handleSaslAuthenticate(KafkaHead Map> permissions = getAdmin() .namespaces().getPermissions(saslAuth.getUsername()); if (!permissions.containsKey(authRole)) { - throw new AuthorizationException("Not allowed on this namespace"); + throw new AuthorizationException("Role: " + authRole + " Not allowed on this namespace"); } log.debug("successfully authenticate user " + authRole); diff --git a/src/test/java/io/streamnative/kop/SaslPlainTest.java b/src/test/java/io/streamnative/kop/SaslPlainTest.java index 93649b53e9ff0..2b0eab834071d 100644 --- a/src/test/java/io/streamnative/kop/SaslPlainTest.java +++ b/src/test/java/io/streamnative/kop/SaslPlainTest.java @@ -28,6 +28,7 @@ import java.util.Properties; import javax.crypto.SecretKey; import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerRecord; @@ -46,14 +47,16 @@ * Testing the SASL-PLAIN features on KoP. */ @Test +@Slf4j public class SaslPlainTest extends MockKafkaServiceBaseTest { private static final String SIMPLE_USER = "muggle_user"; + private static final String TENANT = "testTenant"; private static final String ANOTHER_USER = "death_eater_user"; private static final String ADMIN_USER = "admin_user"; private static final String NAMESPACE = "ns1"; private static final String KAFKA_TOPIC = "topic1"; - private static final String PULSAR_TOPIC_NAME = "persistent://" + SIMPLE_USER + private static final String PULSAR_TOPIC_NAME = "persistent://" + TENANT + "/" + NAMESPACE + "/" + KAFKA_TOPIC; private static final String CLUSTER_NAME = "c1"; private String adminToken; @@ -99,12 +102,12 @@ protected void setup() throws Exception { admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) .authentication(AuthenticationToken.class.getName(), "token:" + adminToken).build()); - getAdmin().tenants().createTenant(SIMPLE_USER, + getAdmin().tenants().createTenant(TENANT, new TenantInfo(Sets.newHashSet(ADMIN_USER), Sets.newHashSet(CLUSTER_NAME))); - getAdmin().namespaces().createNamespace(SIMPLE_USER + "/" + NAMESPACE); + getAdmin().namespaces().createNamespace(TENANT + "/" + NAMESPACE); getAdmin().topics().createPartitionedTopic(PULSAR_TOPIC_NAME, 1); getAdmin() - .namespaces().grantPermissionOnNamespace(SIMPLE_USER + "/" + NAMESPACE, SIMPLE_USER, + .namespaces().grantPermissionOnNamespace(TENANT + "/" + NAMESPACE, SIMPLE_USER, Sets.newHashSet(AuthAction.consume, AuthAction.produce)); } @@ -115,7 +118,7 @@ protected void cleanup() throws Exception { @Test(timeOut = 40000) void simpleProduceAndConsume() throws Exception { KProducer kProducer = new KProducer(KAFKA_TOPIC, false, "localhost", getKafkaBrokerPort(), - SIMPLE_USER + "/" + NAMESPACE, "token:" + userToken); + TENANT + "/" + NAMESPACE, "token:" + userToken); int totalMsgs = 10; String messageStrPrefix = PULSAR_TOPIC_NAME + "_message_"; @@ -125,7 +128,7 @@ void simpleProduceAndConsume() throws Exception { } KConsumer kConsumer = new KConsumer(KAFKA_TOPIC, "localhost", getKafkaBrokerPort(), false, - SIMPLE_USER + "/" + NAMESPACE, "token:" + userToken, "DemoKafkaOnPulsarConsumer"); + TENANT + "/" + NAMESPACE, "token:" + userToken, "DemoKafkaOnPulsarConsumer"); kConsumer.getConsumer().subscribe(Collections.singleton(KAFKA_TOPIC)); int i = 0; @@ -156,7 +159,7 @@ void badCredentialFail() throws Exception { try { @Cleanup KProducer kProducer = new KProducer(KAFKA_TOPIC, false, "localhost", getKafkaBrokerPort(), - SIMPLE_USER + "/" + NAMESPACE, "token:dsa"); + TENANT + "/" + NAMESPACE, "token:dsa"); kProducer.getProducer().send(new ProducerRecord<>(KAFKA_TOPIC, 0, "")).get(); fail("should have failed"); } catch (Exception e) { @@ -169,7 +172,7 @@ void badUserFail() throws Exception { try { @Cleanup KProducer kProducer = new KProducer(KAFKA_TOPIC, false, "localhost", getKafkaBrokerPort(), - SIMPLE_USER + "/" + NAMESPACE, "token:" + anotherToken); + TENANT + "/" + NAMESPACE, "token:" + anotherToken); kProducer.getProducer().send(new ProducerRecord<>(KAFKA_TOPIC, 0, "")).get(); fail("should have failed"); } catch (Exception e) { @@ -181,7 +184,7 @@ void badUserFail() throws Exception { void badNamespaceProvided() throws Exception { try { KProducer kProducer = new KProducer(KAFKA_TOPIC, false, "localhost", getKafkaBrokerPort(), - SIMPLE_USER + "/ns2", "token:" + userToken); + TENANT + "/ns2", "token:" + userToken); kProducer.getProducer().send(new ProducerRecord<>(KAFKA_TOPIC, 0, "")).get(); fail("should have failed"); } catch (Exception e) {