diff --git a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala index 2d63e77b119a5..40ad090b46ea0 100644 --- a/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala @@ -90,6 +90,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest adminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext) val privilegedClientLoginContext = JaasTestUtils.tokenClientLoginModule(privilegedToken.tokenInfo().tokenId(), privilegedToken.hmacAsBase64String()) privilegedAdminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, privilegedClientLoginContext) + superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, privilegedClientLoginContext) } @Test diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 1b3da8584441e..ad4b66e20a3f2 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -21,7 +21,6 @@ import com.yammer.metrics.core.Gauge import java.util.{Collections, Properties} import java.util.concurrent.ExecutionException -import kafka.admin.AclCommand import kafka.security.authorizer.AclAuthorizer import kafka.security.authorizer.AclEntry.WildcardHost import kafka.server._ @@ -40,7 +39,7 @@ import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -64,15 +63,10 @@ import scala.jdk.CollectionConverters._ * SaslTestHarness here directly because it extends QuorumTestHarness, and we * would end up with QuorumTestHarness twice. */ +@Timeout(60) abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { override val brokerCount = 3 - override def configureSecurityBeforeServersStart(): Unit = { - AclCommand.main(clusterActionArgs) - AclCommand.main(clusterAlterArgs) - AclCommand.main(topicBrokerReadAclArgs) - } - val numRecords = 1 val groupPrefix = "gr" val group = s"${groupPrefix}oup" @@ -96,96 +90,45 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas def clientPrincipal: KafkaPrincipal def kafkaPrincipal: KafkaPrincipal - // Arguments to AclCommand to set ACLs. - def clusterActionArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=ClusterAction", - s"--allow-principal=$kafkaPrincipal") - // necessary to create SCRAM credentials via the admin client using the broker's credentials - // without this we would need to create the SCRAM credentials via ZooKeeper - def clusterAlterArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=Alter", - s"--allow-principal=$kafkaPrincipal") - def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--operation=Read", - s"--allow-principal=$kafkaPrincipal") - def produceAclArgs(topic: String): Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--producer", - s"--allow-principal=$clientPrincipal") - def describeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--operation=Describe", - s"--allow-principal=$clientPrincipal") - def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--remove", - s"--force", - s"--topic=$topic", - s"--operation=Describe", - s"--allow-principal=$clientPrincipal") - def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--remove", - s"--force", - s"--topic=$topic", - s"--operation=Write", - s"--allow-principal=$clientPrincipal") - def consumeAclArgs(topic: String): Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--group=$group", - s"--consumer", - s"--allow-principal=$clientPrincipal") - def groupAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--group=$group", - s"--operation=Read", - s"--allow-principal=$clientPrincipal") - def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--group=$wildcard", - s"--consumer", - s"--producer", - s"--allow-principal=$clientPrincipal") - def produceConsumePrefixedAclsArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topicPrefix", - s"--group=$groupPrefix", - s"--resource-pattern-type=prefixed", - s"--consumer", - s"--producer", - s"--allow-principal=$clientPrincipal") - - def ClusterActionAndClusterAlterAcls = Set(new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, CLUSTER_ACTION, ALLOW), - new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, ALTER, ALLOW)) - def TopicBrokerReadAcl = Set(new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, READ, ALLOW)) def GroupReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, READ, ALLOW)) def TopicReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, READ, ALLOW)) def TopicWriteAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, WRITE, ALLOW)) def TopicDescribeAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, DESCRIBE, ALLOW)) def TopicCreateAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, CREATE, ALLOW)) - // The next two configuration parameters enable ZooKeeper secure ACLs - // and sets the Kafka authorizer, both necessary to enable security. - this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") - this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName) + + def AclTopicWrite(topicResource : ResourcePattern = topicResource) = new AclBinding(topicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW)) + def AclTopicCreate(topicResource : ResourcePattern = topicResource) = new AclBinding(topicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW)) + def AclTopicDescribe(topicResource : ResourcePattern = topicResource) = new AclBinding(topicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) + def AclTopicRead(topicResource : ResourcePattern = topicResource) = new AclBinding(topicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) + def AclGroupRead = new AclBinding(groupResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) + + def AclWildcardTopicWrite = new AclBinding(wildcardTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW)) + def AclWildcardTopicCreate = new AclBinding(wildcardTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW)) + def AclWildcardTopicDescribe = new AclBinding(wildcardTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) + def AclWildcardTopicRead = new AclBinding(wildcardTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) + def AclWildcardGroupRead = new AclBinding(wildcardGroupResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) + + def AclPrefixedTopicWrite = new AclBinding(prefixedTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.WRITE, AclPermissionType.ALLOW)) + def AclPrefixedTopicCreate = new AclBinding(prefixedTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.CREATE, AclPermissionType.ALLOW)) + def AclPrefixedTopicDescribe = new AclBinding(prefixedTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) + def AclPrefixedTopicRead = new AclBinding(prefixedTopicResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) + def AclPrefixedGroupRead = new AclBinding(prefixedGroupResource, + new AccessControlEntry(clientPrincipal.toString, "*", AclOperation.READ, AclPermissionType.ALLOW)) + // Some needed configuration for brokers, producers, and consumers this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") @@ -200,11 +143,17 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas */ @BeforeEach override def setUp(testInfo: TestInfo): Unit = { + + // The next two configuration parameters enable ZooKeeper secure ACLs + // and sets the Kafka authorizer, both necessary to enable security. + this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") + this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName) + + // Set the specific principal that can update ACLs. + this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString) + super.setUp(testInfo) - servers.foreach { s => - TestUtils.waitAndVerifyAcls(ClusterActionAndClusterAlterAcls, s.dataPlaneRequestProcessor.authorizer.get, clusterResource) - TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, new ResourcePattern(TOPIC, "*", LITERAL)) - } + // create the test topic with all the brokers as replicas createTopic(topic, 1, 3) } @@ -292,15 +241,21 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } private def setWildcardResourceAcls(): Unit = { - AclCommand.main(produceConsumeWildcardAclArgs) + val superuserAdminClient = createSuperuserAdminClient() + superuserAdminClient.createAcls(List(AclWildcardTopicWrite, AclWildcardTopicCreate, AclWildcardTopicDescribe, AclWildcardTopicRead).asJava).values + superuserAdminClient.createAcls(List(AclWildcardGroupRead).asJava).values + servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardGroupResource) } } private def setPrefixedResourceAcls(): Unit = { - AclCommand.main(produceConsumePrefixedAclsArgs) + val superuserAdminClient = createSuperuserAdminClient() + superuserAdminClient.createAcls(List(AclPrefixedTopicWrite, AclPrefixedTopicCreate, AclPrefixedTopicDescribe, AclPrefixedTopicRead).asJava).values + superuserAdminClient.createAcls(List(AclPrefixedGroupRead).asJava).values + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedTopicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedGroupResource) @@ -308,8 +263,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } private def setReadAndWriteAcls(tp: TopicPartition): Unit = { - AclCommand.main(produceAclArgs(tp.topic)) - AclCommand.main(consumeAclArgs(tp.topic)) + val topicResource = new ResourcePattern(TOPIC, tp.topic, LITERAL) + val superuserAdminClient = createSuperuserAdminClient() + + superuserAdminClient.createAcls(List(AclTopicWrite(topicResource), AclTopicCreate(topicResource), AclTopicDescribe(topicResource)).asJava).values + superuserAdminClient.createAcls(List(AclTopicRead(topicResource)).asJava).values + superuserAdminClient.createAcls(List(AclGroupRead).asJava).values + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, new ResourcePattern(TOPIC, tp.topic, LITERAL)) @@ -324,7 +284,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } private def setConsumerGroupAcls(): Unit = { - AclCommand.main(groupAclArgs) + val superuserAdminClient = createSuperuserAdminClient() + superuserAdminClient.createAcls(List(AclGroupRead).asJava).values servers.foreach { s => TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) } @@ -405,7 +366,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testNoProduceWithDescribeAcl(isIdempotenceEnabled: Boolean): Unit = { - AclCommand.main(describeAclArgs) + val superuserAdminClient = createSuperuserAdminClient() + superuserAdminClient.createAcls(List(AclTopicDescribe()).asJava).values + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) } @@ -457,8 +420,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } private def noConsumeWithoutDescribeAclSetup(): Unit = { - AclCommand.main(produceAclArgs(tp.topic)) - AclCommand.main(groupAclArgs) + val superuserAdminClient = createSuperuserAdminClient() + superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values + superuserAdminClient.createAcls(List(AclGroupRead).asJava).values + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) @@ -467,8 +432,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas val producer = createProducer() sendRecords(producer, numRecords, tp) - AclCommand.main(deleteDescribeAclArgs) - AclCommand.main(deleteWriteAclArgs) + superuserAdminClient.deleteAcls(List(AclTopicDescribe().toFilter).asJava).values + superuserAdminClient.deleteAcls(List(AclTopicWrite().toFilter).asJava).values + servers.foreach { s => TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) } @@ -497,8 +463,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } private def noConsumeWithDescribeAclSetup(): Unit = { - AclCommand.main(produceAclArgs(tp.topic)) - AclCommand.main(groupAclArgs) + val superuserAdminClient = createSuperuserAdminClient() + superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values + superuserAdminClient.createAcls(List(AclGroupRead).asJava).values + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) @@ -513,7 +481,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas */ @Test def testNoGroupAcl(): Unit = { - AclCommand.main(produceAclArgs(tp.topic)) + val superuserAdminClient = createSuperuserAdminClient() + superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) } diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 8c85956ea04b8..7ccce46665c46 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -45,6 +45,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { val producerConfig = new Properties val consumerConfig = new Properties val adminClientConfig = new Properties + val superuserClientConfig = new Properties val serverConfig = new Properties private val consumers = mutable.Buffer[KafkaConsumer[_, _]]() @@ -102,12 +103,22 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { doSetup(testInfo, createOffsetsTopic = true) } + /* + * The superuser by default is set up the same as the admin. + * Some tests need a separate principal for superuser operations. + * These tests may need to override the config before creating the offset topic. + */ + protected def doSuperuserSetup(testInfo: TestInfo): Unit = { + superuserClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + } + def doSetup(testInfo: TestInfo, createOffsetsTopic: Boolean): Unit = { // Generate client security properties before starting the brokers in case certs are needed producerConfig ++= clientSecurityProps("producer") consumerConfig ++= clientSecurityProps("consumer") adminClientConfig ++= clientSecurityProps("adminClient") + superuserClientConfig ++= superuserSecurityProps("superuserClient") super.setUp(testInfo) @@ -124,8 +135,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + doSuperuserSetup(testInfo) + if (createOffsetsTopic) { - super.createOffsetsTopic(listenerName, adminClientConfig) + super.createOffsetsTopic(listenerName, superuserClientConfig) } } @@ -134,6 +147,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { clientSaslProperties) } + def superuserSecurityProps(certAlias: String): Properties = { + clientSecurityProps(certAlias) + } + def createProducer[K, V](keySerializer: Serializer[K] = new ByteArraySerializer, valueSerializer: Serializer[V] = new ByteArraySerializer, configOverrides: Properties = new Properties): KafkaProducer[K, V] = { @@ -170,6 +187,18 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { admin } + def createSuperuserAdminClient( + listenerName: ListenerName = listenerName, + configOverrides: Properties = new Properties + ): Admin = { + val props = new Properties + props ++= superuserClientConfig + props ++= configOverrides + val admin = TestUtils.createAdminClient(brokers, listenerName, props) + adminClients += admin + admin + } + @AfterEach override def tearDown(): Unit = { producers.foreach(_.close(Duration.ZERO)) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala index 8e69a2de3152b..19fea48d6f087 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala @@ -20,6 +20,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth._ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder +import org.apache.kafka.clients.admin.AdminClientConfig import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} import org.junit.jupiter.api.Assertions._ import org.apache.kafka.common.errors.TopicAuthorizationException @@ -76,6 +77,15 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest { super.setUp(testInfo) } + /* + * The principal used for all authenticated connections to listenerName is always clientPrincipal. + * The super user runs as kafkaPrincipal so we set the superuser admin client to connect directly to + * the interBrokerListenerName for superuser operations. + */ + override def doSuperuserSetup(testInfo: TestInfo): Unit = { + superuserClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(interBrokerListenerName)) + } + @Test def testListenerName(): Unit = { // To check the client listener name, establish a session on the server by sending any request eg sendRecords diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala index e4064503eeb5c..cc67d01546a18 100644 --- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala @@ -43,6 +43,9 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext) consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext) adminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext) + + val superuserLoginContext = jaasAdminLoginModule(kafkaClientSaslMechanism) + superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, superuserLoginContext) super.setUp(testInfo) } diff --git a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala index 17e39f608a778..5311f4f63557a 100644 --- a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala @@ -42,5 +42,6 @@ class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTe assertNull(producerConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assertNull(consumerConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assertNull(adminClientConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) + assertNull(superuserClientConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) } diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala index b8cb83d133bf3..4d0936dcdd897 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala @@ -117,10 +117,7 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes this.producerConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName) this.consumerConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName) this.adminClientConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName) - private val plainLogin = s"org.apache.kafka.common.security.plain.PlainLoginModule username=$KafkaPlainUser required;" - this.producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, plainLogin) - this.consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, plainLogin) - this.adminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, plainLogin) + this.superuserClientConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName) override protected def kafkaClientSaslMechanism = "PLAIN" override protected def kafkaServerSaslMechanisms = List("PLAIN") diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 01bb6d8150488..3648e596b0b32 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -154,6 +154,13 @@ trait SaslSetup { JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile) } + def jaasAdminLoginModule(clientSaslMechanism: String, serviceName: Option[String] = None): String = { + if (serviceName.isDefined) + JaasTestUtils.adminLoginModule(clientSaslMechanism, serverKeytabFile, serviceName.get) + else + JaasTestUtils.adminLoginModule(clientSaslMechanism, serverKeytabFile) + } + def jaasScramClientLoginModule(clientSaslScramMechanism: String, scramUser: String, scramPassword: String): String = { JaasTestUtils.scramClientLoginModule(clientSaslScramMechanism, scramUser, scramPassword) } diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala index 850d99f8d64e6..272b0cfb3c0b2 100644 --- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala @@ -29,6 +29,8 @@ import org.apache.kafka.common.utils.Java import org.junit.jupiter.api.{BeforeEach, TestInfo} object SslEndToEndAuthorizationTest { + val superuserCn = "super-user" + class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) { private val Pattern = "O=A (.*?),CN=(.*?)".r @@ -37,8 +39,8 @@ object SslEndToEndAuthorizationTest { override def build(context: AuthenticationContext): KafkaPrincipal = { val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName peerPrincipal match { - case Pattern(name, _) => - val principal = if (name == "server") name else peerPrincipal + case Pattern(name, cn) => + val principal = if ((name == "server") || (cn == superuserCn)) "server" else peerPrincipal new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal) case _ => KafkaPrincipal.ANONYMOUS @@ -49,7 +51,7 @@ object SslEndToEndAuthorizationTest { class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { - import kafka.api.SslEndToEndAuthorizationTest.TestPrincipalBuilder + import kafka.api.SslEndToEndAuthorizationTest.{TestPrincipalBuilder,superuserCn} override protected def securityProtocol = SecurityProtocol.SSL // Since there are other E2E tests that enable SSL, running this test with TLSv1.3 if supported @@ -81,4 +83,12 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG) props } + // This test doesn't really care about matching the SSL certificate to a particular principal + // We can override the CN and create a principal based on it or on the server SSL + override def superuserSecurityProps(certAlias: String): Properties = { + val props = TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, + certAlias, superuserCn, clientSaslProperties, tlsProtocol) + props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG) + props + } } diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala index 3d257bae8e9cb..c0d9f7e493422 100644 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala @@ -184,6 +184,11 @@ object JaasTestUtils { def clientLoginModule(mechanism: String, keytabLocation: Option[File], serviceName: String = serviceName): String = kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword, KafkaOAuthBearerUser, serviceName).toString + // Returns the dynamic configuration, using credentials for admin + def adminLoginModule(mechanism: String, keytabLocation: Option[File], serviceName: String = serviceName): String = + kafkaClientModule(mechanism, keytabLocation, KafkaServerPrincipal, KafkaPlainAdmin, KafkaPlainAdminPassword, + KafkaScramAdmin, KafkaScramAdminPassword, KafkaOAuthBearerAdmin, serviceName).toString + def tokenClientLoginModule(tokenId: String, password: String): String = { ScramLoginModule( tokenId,