Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.nowarn
import scala.collection._
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -352,8 +353,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
}

@Test // TODO KAFKA-14126 add KRaft support
def testKeyStoreAlter(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testKeyStoreAlter(quorum: String): Unit = {
val topic2 = "testtopic2"
TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)

Expand Down Expand Up @@ -421,8 +423,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
stopAndVerifyProduceConsume(producerThread, consumerThread)
}

@Test // TODO KAFKA-14126 add KRaft support
def testTrustStoreAlter(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testTrustStoreAlter(quorum: String): Unit = {
val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL)

// Producer with new keystore should fail to connect before truststore update
Expand Down Expand Up @@ -469,9 +472,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
assertFalse(response.wasDisconnected(), "Request failed because broker is not available")
}

val group_id = new AtomicInteger(1)
def next_group_name(): String = s"alter-truststore-${group_id.getAndIncrement()}"

// Produce/consume should work with old as well as new client keystore
verifySslProduceConsume(sslProperties1, "alter-truststore-1")
verifySslProduceConsume(sslProperties2, "alter-truststore-2")
verifySslProduceConsume(sslProperties1, next_group_name())
verifySslProduceConsume(sslProperties2, next_group_name())

// Revert to old truststore with only one certificate and update. Clients should connect only with old keystore.
val oldTruststoreProps = new Properties
Expand All @@ -480,38 +486,46 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
reconfigureServers(oldTruststoreProps, perBrokerConfig = true,
(s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)))
verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build())
verifySslProduceConsume(sslProperties1, "alter-truststore-3")
verifySslProduceConsume(sslProperties1, next_group_name())

// Update same truststore file to contain both certificates without changing any configs.
// Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes.
Files.copy(Paths.get(combinedStoreProps.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
StandardCopyOption.REPLACE_EXISTING)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
verifySslProduceConsume(sslProperties1, "alter-truststore-4")
verifySslProduceConsume(sslProperties2, "alter-truststore-5")
TestUtils.retry(30000) {
try {
verifySslProduceConsume(sslProperties1, next_group_name())
verifySslProduceConsume(sslProperties2, next_group_name())
} catch {
case t: Throwable => throw new AssertionError(t)
}
}

// Update internal keystore/truststore and validate new client connections from broker (e.g. controller).
// Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection
// and verify that metadata is propagated for new topic.
val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix)
props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
verifySslProduceConsume(sslProperties2, "alter-truststore-6")
verifySslProduceConsume(sslProperties2, next_group_name())
props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
verifySslProduceConsume(sslProperties2, "alter-truststore-7")
verifySslProduceConsume(sslProperties2, next_group_name())
waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))

val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
val controllerChannelManager = controller.kafkaController.controllerChannelManager
val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
brokerStateInfo(0).networkClient.disconnect("0")
TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)

// validate that the brokerToController request works fine
verifyBrokerToControllerCall(controller)
if (!isKRaftTest()) {
val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
val controllerChannelManager = controller.kafkaController.controllerChannelManager
val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
brokerStateInfo(0).networkClient.disconnect("0")
TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)

// validate that the brokerToController request works fine
verifyBrokerToControllerCall(controller)
}
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -228,7 +229,8 @@ private void incrementalAlterConfigResource(ConfigResource configResource,
newValue = String.join(",", oldValueList);
break;
}
if (!Objects.equals(currentValue, newValue)) {
if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
// KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(configResource.type().id()).
setResourceName(configResource.name()).
Expand Down Expand Up @@ -317,7 +319,8 @@ private void legacyAlterConfigResource(ConfigResource configResource,
String key = entry.getKey();
String newValue = entry.getValue();
String currentValue = currentConfigs.get(key);
if (!Objects.equals(newValue, currentValue)) {
if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
// KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(configResource.type().id()).
setResourceName(configResource.name()).
Expand Down Expand Up @@ -381,7 +384,11 @@ public void replay(ConfigRecord record) {
if (configs.isEmpty()) {
configData.remove(configResource);
}
log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
if (configSchema.isSensitive(record)) {
log.info("{}: set configuration {} to {}", configResource, record.name(), Password.HIDDEN);
} else {
log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
}
}

// VisibleForTesting
Expand Down