diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index aeeabcc008baa..295ad061211b8 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -67,6 +67,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import java.util.concurrent.atomic.AtomicInteger import scala.annotation.nowarn import scala.collection._ import scala.collection.mutable.ArrayBuffer @@ -352,8 +353,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } } - @Test // TODO KAFKA-14126 add KRaft support - def testKeyStoreAlter(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testKeyStoreAlter(quorum: String): Unit = { val topic2 = "testtopic2" TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers) @@ -421,8 +423,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup stopAndVerifyProduceConsume(producerThread, consumerThread) } - @Test // TODO KAFKA-14126 add KRaft support - def testTrustStoreAlter(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTrustStoreAlter(quorum: String): Unit = { val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL) // Producer with new keystore should fail to connect before truststore update @@ -469,9 +472,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup assertFalse(response.wasDisconnected(), "Request failed because broker is not available") } + val group_id = new AtomicInteger(1) + def next_group_name(): String = s"alter-truststore-${group_id.getAndIncrement()}" + // Produce/consume should work with old as well as new client keystore - verifySslProduceConsume(sslProperties1, "alter-truststore-1") - verifySslProduceConsume(sslProperties2, "alter-truststore-2") + verifySslProduceConsume(sslProperties1, next_group_name()) + verifySslProduceConsume(sslProperties2, next_group_name()) // Revert to old truststore with only one certificate and update. Clients should connect only with old keystore. val oldTruststoreProps = new Properties @@ -480,7 +486,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup reconfigureServers(oldTruststoreProps, perBrokerConfig = true, (s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG))) verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build()) - verifySslProduceConsume(sslProperties1, "alter-truststore-3") + verifySslProduceConsume(sslProperties1, next_group_name()) // Update same truststore file to contain both certificates without changing any configs. // Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes. @@ -488,8 +494,14 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)), StandardCopyOption.REPLACE_EXISTING) TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get() - verifySslProduceConsume(sslProperties1, "alter-truststore-4") - verifySslProduceConsume(sslProperties2, "alter-truststore-5") + TestUtils.retry(30000) { + try { + verifySslProduceConsume(sslProperties1, next_group_name()) + verifySslProduceConsume(sslProperties2, next_group_name()) + } catch { + case t: Throwable => throw new AssertionError(t) + } + } // Update internal keystore/truststore and validate new client connections from broker (e.g. controller). // Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection @@ -497,21 +509,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix) props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix) TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS) - verifySslProduceConsume(sslProperties2, "alter-truststore-6") + verifySslProduceConsume(sslProperties2, next_group_name()) props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix) TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS) - verifySslProduceConsume(sslProperties2, "alter-truststore-7") + verifySslProduceConsume(sslProperties2, next_group_name()) waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1)) - val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer] - val controllerChannelManager = controller.kafkaController.controllerChannelManager - val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] = - JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo") - brokerStateInfo(0).networkClient.disconnect("0") - TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers) - - // validate that the brokerToController request works fine - verifyBrokerToControllerCall(controller) + if (!isKRaftTest()) { + val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer] + val controllerChannelManager = controller.kafkaController.controllerChannelManager + val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] = + JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo") + brokerStateInfo(0).networkClient.disconnect("0") + TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers) + + // validate that the brokerToController request works fine + verifyBrokerToControllerCall(controller) + } } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 4b8561a4d9089..4d6736b878d5b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource.Type; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.utils.LogContext; @@ -228,7 +229,8 @@ private void incrementalAlterConfigResource(ConfigResource configResource, newValue = String.join(",", oldValueList); break; } - if (!Objects.equals(currentValue, newValue)) { + if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) { + // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). setResourceType(configResource.type().id()). setResourceName(configResource.name()). @@ -317,7 +319,8 @@ private void legacyAlterConfigResource(ConfigResource configResource, String key = entry.getKey(); String newValue = entry.getValue(); String currentValue = currentConfigs.get(key); - if (!Objects.equals(newValue, currentValue)) { + if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) { + // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). setResourceType(configResource.type().id()). setResourceName(configResource.name()). @@ -381,7 +384,11 @@ public void replay(ConfigRecord record) { if (configs.isEmpty()) { configData.remove(configResource); } - log.info("{}: set configuration {} to {}", configResource, record.name(), record.value()); + if (configSchema.isSensitive(record)) { + log.info("{}: set configuration {} to {}", configResource, record.name(), Password.HIDDEN); + } else { + log.info("{}: set configuration {} to {}", configResource, record.name(), record.value()); + } } // VisibleForTesting