Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
adminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
val privilegedClientLoginContext = JaasTestUtils.tokenClientLoginModule(privilegedToken.tokenInfo().tokenId(), privilegedToken.hmacAsBase64String())
privilegedAdminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, privilegedClientLoginContext)
superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, privilegedClientLoginContext)
}

@Test
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
val producerConfig = new Properties
val consumerConfig = new Properties
val adminClientConfig = new Properties
val superuserClientConfig = new Properties
val serverConfig = new Properties

private val consumers = mutable.Buffer[KafkaConsumer[_, _]]()
Expand Down Expand Up @@ -102,12 +103,22 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
doSetup(testInfo, createOffsetsTopic = true)
}

/*
* The superuser by default is set up the same as the admin.
* Some tests need a separate principal for superuser operations.
* These tests may need to override the config before creating the offset topic.
*/
protected def doSuperuserSetup(testInfo: TestInfo): Unit = {
superuserClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
}

def doSetup(testInfo: TestInfo,
createOffsetsTopic: Boolean): Unit = {
// Generate client security properties before starting the brokers in case certs are needed
producerConfig ++= clientSecurityProps("producer")
consumerConfig ++= clientSecurityProps("consumer")
adminClientConfig ++= clientSecurityProps("adminClient")
superuserClientConfig ++= superuserSecurityProps("superuserClient")

super.setUp(testInfo)

Expand All @@ -124,8 +135,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {

adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())

doSuperuserSetup(testInfo)

if (createOffsetsTopic) {
super.createOffsetsTopic(listenerName, adminClientConfig)
super.createOffsetsTopic(listenerName, superuserClientConfig)
}
}

Expand All @@ -134,6 +147,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
clientSaslProperties)
}

def superuserSecurityProps(certAlias: String): Properties = {
clientSecurityProps(certAlias)
}

def createProducer[K, V](keySerializer: Serializer[K] = new ByteArraySerializer,
valueSerializer: Serializer[V] = new ByteArraySerializer,
configOverrides: Properties = new Properties): KafkaProducer[K, V] = {
Expand Down Expand Up @@ -170,6 +187,18 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
admin
}

def createSuperuserAdminClient(
listenerName: ListenerName = listenerName,
configOverrides: Properties = new Properties
): Admin = {
val props = new Properties
props ++= superuserClientConfig
props ++= configOverrides
val admin = TestUtils.createAdminClient(brokers, listenerName, props)
adminClients += admin
admin
}

@AfterEach
override def tearDown(): Unit = {
producers.foreach(_.close(Duration.ZERO))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth._
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.clients.admin.AdminClientConfig
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.Assertions._
import org.apache.kafka.common.errors.TopicAuthorizationException
Expand Down Expand Up @@ -76,6 +77,15 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
super.setUp(testInfo)
}

/*
* The principal used for all authenticated connections to listenerName is always clientPrincipal.
* The super user runs as kafkaPrincipal so we set the superuser admin client to connect directly to
* the interBrokerListenerName for superuser operations.
*/
override def doSuperuserSetup(testInfo: TestInfo): Unit = {
superuserClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(interBrokerListenerName))
}

@Test
def testListenerName(): Unit = {
// To check the client listener name, establish a session on the server by sending any request eg sendRecords
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
adminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)

val superuserLoginContext = jaasAdminLoginModule(kafkaClientSaslMechanism)
superuserClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, superuserLoginContext)
super.setUp(testInfo)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@ class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTe
assertNull(producerConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertNull(consumerConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertNull(adminClientConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertNull(superuserClientConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))

}
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,7 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
this.producerConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName)
this.consumerConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName)
this.adminClientConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName)
private val plainLogin = s"org.apache.kafka.common.security.plain.PlainLoginModule username=$KafkaPlainUser required;"
this.producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, plainLogin)
this.consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, plainLogin)
this.adminClientConfig.put(SaslConfigs.SASL_JAAS_CONFIG, plainLogin)
this.superuserClientConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName)

override protected def kafkaClientSaslMechanism = "PLAIN"
override protected def kafkaServerSaslMechanisms = List("PLAIN")
Expand Down
7 changes: 7 additions & 0 deletions core/src/test/scala/integration/kafka/api/SaslSetup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ trait SaslSetup {
JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile)
}

def jaasAdminLoginModule(clientSaslMechanism: String, serviceName: Option[String] = None): String = {
if (serviceName.isDefined)
JaasTestUtils.adminLoginModule(clientSaslMechanism, serverKeytabFile, serviceName.get)
else
JaasTestUtils.adminLoginModule(clientSaslMechanism, serverKeytabFile)
}

def jaasScramClientLoginModule(clientSaslScramMechanism: String, scramUser: String, scramPassword: String): String = {
JaasTestUtils.scramClientLoginModule(clientSaslScramMechanism, scramUser, scramPassword)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import org.apache.kafka.common.utils.Java
import org.junit.jupiter.api.{BeforeEach, TestInfo}

object SslEndToEndAuthorizationTest {
val superuserCn = "super-user"

class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
private val Pattern = "O=A (.*?),CN=(.*?)".r

Expand All @@ -37,8 +39,8 @@ object SslEndToEndAuthorizationTest {
override def build(context: AuthenticationContext): KafkaPrincipal = {
val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName
peerPrincipal match {
case Pattern(name, _) =>
val principal = if (name == "server") name else peerPrincipal
case Pattern(name, cn) =>
val principal = if ((name == "server") || (cn == superuserCn)) "server" else peerPrincipal
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal)
case _ =>
KafkaPrincipal.ANONYMOUS
Expand All @@ -49,7 +51,7 @@ object SslEndToEndAuthorizationTest {

class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {

import kafka.api.SslEndToEndAuthorizationTest.TestPrincipalBuilder
import kafka.api.SslEndToEndAuthorizationTest.{TestPrincipalBuilder,superuserCn}

override protected def securityProtocol = SecurityProtocol.SSL
// Since there are other E2E tests that enable SSL, running this test with TLSv1.3 if supported
Expand Down Expand Up @@ -81,4 +83,12 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
props
}
// This test doesn't really care about matching the SSL certificate to a particular principal
// We can override the CN and create a principal based on it or on the server SSL
override def superuserSecurityProps(certAlias: String): Properties = {
val props = TestUtils.securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile,
certAlias, superuserCn, clientSaslProperties, tlsProtocol)
props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
props
}
}
5 changes: 5 additions & 0 deletions core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ object JaasTestUtils {
def clientLoginModule(mechanism: String, keytabLocation: Option[File], serviceName: String = serviceName): String =
kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword, KafkaOAuthBearerUser, serviceName).toString

// Returns the dynamic configuration, using credentials for admin
def adminLoginModule(mechanism: String, keytabLocation: Option[File], serviceName: String = serviceName): String =
kafkaClientModule(mechanism, keytabLocation, KafkaServerPrincipal, KafkaPlainAdmin, KafkaPlainAdminPassword,
KafkaScramAdmin, KafkaScramAdminPassword, KafkaOAuthBearerAdmin, serviceName).toString

def tokenClientLoginModule(tokenId: String, password: String): String = {
ScramLoginModule(
tokenId,
Expand Down