Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void reconfigure(Map<String, ?> configs) throws KafkaException {
SecurityStore keystore = newKeystore != null ? newKeystore : this.keystore;
SecurityStore truststore = newTruststore != null ? newTruststore : this.truststore;
this.sslContext = createSSLContext(keystore, truststore);
log.info("Created new SSL context with keystore {} truststore {}", keystore, truststore);
log.info("Created new {} SSL context with keystore {} truststore {}", mode, keystore, truststore);
this.keystore = keystore;
this.truststore = truststore;
} catch (Exception e) {
Expand Down
12 changes: 12 additions & 0 deletions clients/src/test/java/org/apache/kafka/test/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -464,4 +465,15 @@ public static <T> void assertOptional(Optional<T> optional, Consumer<T> assertio
fail("Missing value from Optional");
}
}

@SuppressWarnings("unchecked")
public static <T> T fieldValue(Object o, Class<?> clazz, String fieldName) {
try {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
return (T) field.get(o);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
import org.apache.kafka.common.{KafkaException, Node, Reconfigurable, TopicPartition}

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
Expand Down Expand Up @@ -111,7 +111,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
val brokerNode = broker.node(controllerToBrokerListenerName)
val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
val networkClient = {
val (networkClient, reconfigurableChannelBuilder) = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
controllerToBrokerSecurityProtocol,
JaasContext.Type.SERVER,
Expand All @@ -121,6 +121,12 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
time,
config.saslInterBrokerHandshakeRequestEnable
)
val reconfigurableChannelBuilder = channelBuilder match {
case reconfigurable: Reconfigurable =>
config.addReconfigurable(reconfigurable)
Some(reconfigurable)
case _ => None
}
val selector = new Selector(
NetworkReceive.UNLIMITED,
Selector.NO_IDLE_TIMEOUT_MS,
Expand All @@ -132,7 +138,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
channelBuilder,
logContext
)
new NetworkClient(
val networkClient = new NetworkClient(
selector,
new ManualMetadataUpdater(Seq(brokerNode).asJava),
config.brokerId.toString,
Expand All @@ -148,6 +154,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
new ApiVersions,
logContext
)
(networkClient, reconfigurableChannelBuilder)
}
val threadName = threadNamePrefix match {
case None => s"Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
Expand All @@ -171,7 +178,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
)

brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics))
requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder))
}

private def brokerMetricTags(brokerId: Int) = Map("broker-id" -> brokerId.toString)
Expand All @@ -182,6 +189,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
// non-threadsafe classes as described in KAFKA-4959.
// The call to shutdownLatch.await() in ShutdownableThread.shutdown() serves as a synchronization barrier that
// hands off the NetworkClient from the RequestSendThread to the ZkEventThread.
brokerState.reconfigurableChannelBuilder.foreach(config.removeReconfigurable)
brokerState.requestSendThread.shutdown()
brokerState.networkClient.close()
brokerState.messageQueue.clear()
Expand Down Expand Up @@ -531,7 +539,8 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
messageQueue: BlockingQueue[QueueItem],
requestSendThread: RequestSendThread,
queueSizeGauge: Gauge[Int],
requestRateAndTimeMetrics: Timer)
requestRateAndTimeMetrics: Timer,
reconfigurableChannelBuilder: Option[Reconfigurable])

case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit)

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.clients._
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network._
import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
Expand Down Expand Up @@ -54,6 +54,10 @@ object TransactionMarkerChannelManager {
time,
config.saslInterBrokerHandshakeRequestEnable
)
channelBuilder match {
case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
case _ =>
}
val selector = new Selector(
NetworkReceive.UNLIMITED,
config.connectionsMaxIdleMs,
Expand Down
14 changes: 11 additions & 3 deletions core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.Node
import org.apache.kafka.common.{Node, Reconfigurable}
import org.apache.kafka.common.requests.AbstractRequest.Builder

import scala.collection.JavaConverters._
Expand All @@ -51,7 +51,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs

private val networkClient = {
private val (networkClient, reconfigurableChannelBuilder) = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
brokerConfig.interBrokerSecurityProtocol,
JaasContext.Type.SERVER,
Expand All @@ -61,6 +61,12 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
time,
brokerConfig.saslInterBrokerHandshakeRequestEnable
)
val reconfigurableChannelBuilder = channelBuilder match {
case reconfigurable: Reconfigurable =>
brokerConfig.addReconfigurable(reconfigurable)
Some(reconfigurable)
case _ => None
}
val selector = new Selector(
NetworkReceive.UNLIMITED,
brokerConfig.connectionsMaxIdleMs,
Expand All @@ -72,7 +78,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
channelBuilder,
logContext
)
new NetworkClient(
val networkClient = new NetworkClient(
selector,
new ManualMetadataUpdater(),
clientId,
Expand All @@ -88,6 +94,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
new ApiVersions,
logContext
)
(networkClient, reconfigurableChannelBuilder)
}

override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
Expand All @@ -108,6 +115,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
}

override def initiateClose(): Unit = {
reconfigurableChannelBuilder.foreach(brokerConfig.removeReconfigurable)
networkClient.initiateClose()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import java.time.Duration
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent._

import javax.management.ObjectName

import com.yammer.metrics.Metrics
import com.yammer.metrics.core.MetricName
import kafka.admin.ConfigCommand
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
import kafka.log.LogConfig
import kafka.message.ProducerCompressionCodec
import kafka.network.{Processor, RequestChannel}
Expand All @@ -56,7 +57,7 @@ import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PR
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.test.TestSslUtils
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.junit.Assert._
import org.junit.{After, Before, Ignore, Test}
import org.scalatest.Assertions.intercept
Expand Down Expand Up @@ -334,6 +335,25 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
verifySslProduceConsume(sslProperties1, "alter-truststore-4")
verifySslProduceConsume(sslProperties2, "alter-truststore-5")

// Update internal keystore/truststore and validate new client connections from broker (e.g. controller).
// Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection
// and verify that metadata is propagated for new topic.
val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix)
props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
verifySslProduceConsume(sslProperties2, "alter-truststore-6")
props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
verifySslProduceConsume(sslProperties2, "alter-truststore-7")
waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))

val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
val controllerChannelManager = controller.kafkaController.controllerChannelManager
val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
brokerStateInfo(0).networkClient.disconnect("0")
TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
}

@Test
Expand Down Expand Up @@ -1078,6 +1098,17 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}

private def waitForAuthenticationFailure(producerBuilder: ProducerBuilder): Unit = {
TestUtils.waitUntilTrue(() => {
try {
verifyAuthenticationFailure(producerBuilder.build())
true
} catch {
case e: Error => false
}
}, "Did not fail authentication with invalid config")
}

private def describeConfig(adminClient: AdminClient, servers: Seq[KafkaServer] = this.servers): Config = {
val configResources = servers.map { server =>
new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
Expand Down