diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index 73d92103f6a38..c31d3e5383d1e 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -175,7 +175,7 @@ public void reconfigure(Map configs) throws KafkaException { SecurityStore keystore = newKeystore != null ? newKeystore : this.keystore; SecurityStore truststore = newTruststore != null ? newTruststore : this.truststore; this.sslContext = createSSLContext(keystore, truststore); - log.info("Created new SSL context with keystore {} truststore {}", keystore, truststore); + log.info("Created new {} SSL context with keystore {} truststore {}", mode, keystore, truststore); this.keystore = keystore; this.truststore = truststore; } catch (Exception e) { diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 58d900dba5728..250ebc0da08ec 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -35,6 +35,7 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; @@ -464,4 +465,15 @@ public static void assertOptional(Optional optional, Consumer assertio fail("Missing value from Optional"); } } + + @SuppressWarnings("unchecked") + public static T fieldValue(Object o, Class clazz, String fieldName) { + try { + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + return (T) field.get(o); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index ca6c00a7f55e3..bba483db7e42b 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time} -import org.apache.kafka.common.{KafkaException, Node, TopicPartition} +import org.apache.kafka.common.{KafkaException, Node, Reconfigurable, TopicPartition} import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap @@ -111,7 +111,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) val brokerNode = broker.node(controllerToBrokerListenerName) val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ") - val networkClient = { + val (networkClient, reconfigurableChannelBuilder) = { val channelBuilder = ChannelBuilders.clientChannelBuilder( controllerToBrokerSecurityProtocol, JaasContext.Type.SERVER, @@ -121,6 +121,12 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf time, config.saslInterBrokerHandshakeRequestEnable ) + val reconfigurableChannelBuilder = channelBuilder match { + case reconfigurable: Reconfigurable => + config.addReconfigurable(reconfigurable) + Some(reconfigurable) + case _ => None + } val selector = new Selector( NetworkReceive.UNLIMITED, Selector.NO_IDLE_TIMEOUT_MS, @@ -132,7 +138,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf channelBuilder, logContext ) - new NetworkClient( + val networkClient = new NetworkClient( selector, new ManualMetadataUpdater(Seq(brokerNode).asJava), config.brokerId.toString, @@ -148,6 +154,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf new ApiVersions, logContext ) + (networkClient, reconfigurableChannelBuilder) } val threadName = threadNamePrefix match { case None => s"Controller-${config.brokerId}-to-broker-${broker.id}-send-thread" @@ -171,7 +178,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf ) brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, - requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics)) + requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder)) } private def brokerMetricTags(brokerId: Int) = Map("broker-id" -> brokerId.toString) @@ -182,6 +189,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf // non-threadsafe classes as described in KAFKA-4959. // The call to shutdownLatch.await() in ShutdownableThread.shutdown() serves as a synchronization barrier that // hands off the NetworkClient from the RequestSendThread to the ZkEventThread. + brokerState.reconfigurableChannelBuilder.foreach(config.removeReconfigurable) brokerState.requestSendThread.shutdown() brokerState.networkClient.close() brokerState.messageQueue.clear() @@ -531,7 +539,8 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient, messageQueue: BlockingQueue[QueueItem], requestSendThread: RequestSendThread, queueSizeGauge: Gauge[Int], - requestRateAndTimeMetrics: Timer) + requestRateAndTimeMetrics: Timer, + reconfigurableChannelBuilder: Option[Reconfigurable]) case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index f49fa7b2a34d9..436ea2ea115a2 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -22,7 +22,7 @@ import kafka.metrics.KafkaMetricsGroup import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache} import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.clients._ -import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network._ import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest} @@ -54,6 +54,10 @@ object TransactionMarkerChannelManager { time, config.saslInterBrokerHandshakeRequestEnable ) + channelBuilder match { + case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable) + case _ => + } val selector = new Selector( NetworkReceive.UNLIMITED, config.connectionsMaxIdleMs, diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala index 924111c4a862f..8e631fec00f7a 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.requests.AbstractRequest import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient} -import org.apache.kafka.common.Node +import org.apache.kafka.common.{Node, Reconfigurable} import org.apache.kafka.common.requests.AbstractRequest.Builder import scala.collection.JavaConverters._ @@ -51,7 +51,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint, private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port) private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs - private val networkClient = { + private val (networkClient, reconfigurableChannelBuilder) = { val channelBuilder = ChannelBuilders.clientChannelBuilder( brokerConfig.interBrokerSecurityProtocol, JaasContext.Type.SERVER, @@ -61,6 +61,12 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint, time, brokerConfig.saslInterBrokerHandshakeRequestEnable ) + val reconfigurableChannelBuilder = channelBuilder match { + case reconfigurable: Reconfigurable => + brokerConfig.addReconfigurable(reconfigurable) + Some(reconfigurable) + case _ => None + } val selector = new Selector( NetworkReceive.UNLIMITED, brokerConfig.connectionsMaxIdleMs, @@ -72,7 +78,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint, channelBuilder, logContext ) - new NetworkClient( + val networkClient = new NetworkClient( selector, new ManualMetadataUpdater(), clientId, @@ -88,6 +94,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint, new ApiVersions, logContext ) + (networkClient, reconfigurableChannelBuilder) } override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = { @@ -108,6 +115,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint, } override def initiateClose(): Unit = { + reconfigurableChannelBuilder.foreach(brokerConfig.removeReconfigurable) networkClient.initiateClose() } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index ed6638cd425a5..f08450a2807c0 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -26,12 +26,13 @@ import java.time.Duration import java.util import java.util.{Collections, Properties} import java.util.concurrent._ - import javax.management.ObjectName + import com.yammer.metrics.Metrics import com.yammer.metrics.core.MetricName import kafka.admin.ConfigCommand import kafka.api.{KafkaSasl, SaslSetup} +import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager} import kafka.log.LogConfig import kafka.message.ProducerCompressionCodec import kafka.network.{Processor, RequestChannel} @@ -56,7 +57,7 @@ import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PR import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} -import org.apache.kafka.test.TestSslUtils +import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils} import org.junit.Assert._ import org.junit.{After, Before, Ignore, Test} import org.scalatest.Assertions.intercept @@ -334,6 +335,25 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get() verifySslProduceConsume(sslProperties1, "alter-truststore-4") verifySslProduceConsume(sslProperties2, "alter-truststore-5") + + // Update internal keystore/truststore and validate new client connections from broker (e.g. controller). + // Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection + // and verify that metadata is propagated for new topic. + val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix) + props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix) + TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS) + verifySslProduceConsume(sslProperties2, "alter-truststore-6") + props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix) + TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS) + verifySslProduceConsume(sslProperties2, "alter-truststore-7") + waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1)) + + val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get + val controllerChannelManager = controller.kafkaController.controllerChannelManager + val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] = + JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo") + brokerStateInfo(0).networkClient.disconnect("0") + TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers) } @Test @@ -1078,6 +1098,17 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } } + private def waitForAuthenticationFailure(producerBuilder: ProducerBuilder): Unit = { + TestUtils.waitUntilTrue(() => { + try { + verifyAuthenticationFailure(producerBuilder.build()) + true + } catch { + case e: Error => false + } + }, "Did not fail authentication with invalid config") + } + private def describeConfig(adminClient: AdminClient, servers: Seq[KafkaServer] = this.servers): Config = { val configResources = servers.map { server => new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)