From 33f5d40a07a81de5f67e6e18e24abd5593675a10 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 2 Aug 2022 16:07:31 -0400 Subject: [PATCH 1/3] KAFKA-14136 Generate ConfigRecord even if the value is unchanged Also enable keystore and truststore reconfiguration tests Also fix KAFKA-14115 --- .../DynamicBrokerReconfigurationTest.scala | 54 ++++++++++++------- .../ConfigurationControlManager.java | 33 ++++++------ 2 files changed, 50 insertions(+), 37 deletions(-) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index aeeabcc008baa..295ad061211b8 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -67,6 +67,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import java.util.concurrent.atomic.AtomicInteger import scala.annotation.nowarn import scala.collection._ import scala.collection.mutable.ArrayBuffer @@ -352,8 +353,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } } - @Test // TODO KAFKA-14126 add KRaft support - def testKeyStoreAlter(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testKeyStoreAlter(quorum: String): Unit = { val topic2 = "testtopic2" TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers) @@ -421,8 +423,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup stopAndVerifyProduceConsume(producerThread, consumerThread) } - @Test // TODO KAFKA-14126 add KRaft support - def testTrustStoreAlter(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testTrustStoreAlter(quorum: String): Unit = { val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL) // Producer with new keystore should fail to connect before truststore update @@ -469,9 +472,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup assertFalse(response.wasDisconnected(), "Request failed because broker is not available") } + val group_id = new AtomicInteger(1) + def next_group_name(): String = s"alter-truststore-${group_id.getAndIncrement()}" + // Produce/consume should work with old as well as new client keystore - verifySslProduceConsume(sslProperties1, "alter-truststore-1") - verifySslProduceConsume(sslProperties2, "alter-truststore-2") + verifySslProduceConsume(sslProperties1, next_group_name()) + verifySslProduceConsume(sslProperties2, next_group_name()) // Revert to old truststore with only one certificate and update. Clients should connect only with old keystore. val oldTruststoreProps = new Properties @@ -480,7 +486,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup reconfigureServers(oldTruststoreProps, perBrokerConfig = true, (s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG))) verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build()) - verifySslProduceConsume(sslProperties1, "alter-truststore-3") + verifySslProduceConsume(sslProperties1, next_group_name()) // Update same truststore file to contain both certificates without changing any configs. // Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes. @@ -488,8 +494,14 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)), StandardCopyOption.REPLACE_EXISTING) TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get() - verifySslProduceConsume(sslProperties1, "alter-truststore-4") - verifySslProduceConsume(sslProperties2, "alter-truststore-5") + TestUtils.retry(30000) { + try { + verifySslProduceConsume(sslProperties1, next_group_name()) + verifySslProduceConsume(sslProperties2, next_group_name()) + } catch { + case t: Throwable => throw new AssertionError(t) + } + } // Update internal keystore/truststore and validate new client connections from broker (e.g. controller). // Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection @@ -497,21 +509,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix) props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix) TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS) - verifySslProduceConsume(sslProperties2, "alter-truststore-6") + verifySslProduceConsume(sslProperties2, next_group_name()) props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix) TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS) - verifySslProduceConsume(sslProperties2, "alter-truststore-7") + verifySslProduceConsume(sslProperties2, next_group_name()) waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1)) - val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer] - val controllerChannelManager = controller.kafkaController.controllerChannelManager - val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] = - JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo") - brokerStateInfo(0).networkClient.disconnect("0") - TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers) - - // validate that the brokerToController request works fine - verifyBrokerToControllerCall(controller) + if (!isKRaftTest()) { + val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer] + val controllerChannelManager = controller.kafkaController.controllerChannelManager + val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] = + JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo") + brokerStateInfo(0).networkClient.disconnect("0") + TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers) + + // validate that the brokerToController request works fine + verifyBrokerToControllerCall(controller) + } } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 4b8561a4d9089..59fe8467d12ae 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource.Type; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.utils.LogContext; @@ -42,7 +43,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.Objects; import java.util.Optional; import java.util.function.Consumer; @@ -228,13 +228,11 @@ private void incrementalAlterConfigResource(ConfigResource configResource, newValue = String.join(",", oldValueList); break; } - if (!Objects.equals(currentValue, newValue)) { - newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). - setResourceType(configResource.type().id()). - setResourceName(configResource.name()). - setName(key). - setValue(newValue), CONFIG_RECORD.highestSupportedVersion())); - } + newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). + setResourceType(configResource.type().id()). + setResourceName(configResource.name()). + setName(key). + setValue(newValue), CONFIG_RECORD.highestSupportedVersion())); } ApiError error = validateAlterConfig(configResource, newRecords, newlyCreatedResource); if (error.isFailure()) { @@ -316,14 +314,11 @@ private void legacyAlterConfigResource(ConfigResource configResource, for (Entry entry : newConfigs.entrySet()) { String key = entry.getKey(); String newValue = entry.getValue(); - String currentValue = currentConfigs.get(key); - if (!Objects.equals(newValue, currentValue)) { - newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). - setResourceType(configResource.type().id()). - setResourceName(configResource.name()). - setName(key). - setValue(newValue), CONFIG_RECORD.highestSupportedVersion())); - } + newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). + setResourceType(configResource.type().id()). + setResourceName(configResource.name()). + setName(key). + setValue(newValue), CONFIG_RECORD.highestSupportedVersion())); } for (String key : currentConfigs.keySet()) { if (!newConfigs.containsKey(key)) { @@ -381,7 +376,11 @@ public void replay(ConfigRecord record) { if (configs.isEmpty()) { configData.remove(configResource); } - log.info("{}: set configuration {} to {}", configResource, record.name(), record.value()); + if (configSchema.isSensitive(record)) { + log.info("{}: set configuration {} to {}", configResource, record.name(), Password.HIDDEN); + } else { + log.info("{}: set configuration {} to {}", configResource, record.name(), record.value()); + } } // VisibleForTesting From b086f7dcf076b4e27b4eed8abcc8a01e44034d89 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 4 Aug 2022 09:59:29 -0400 Subject: [PATCH 2/3] Only generate redundant records for broker configs --- .../ConfigurationControlManager.java | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 59fe8467d12ae..5dcfc71496b25 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.function.Consumer; @@ -228,11 +229,14 @@ private void incrementalAlterConfigResource(ConfigResource configResource, newValue = String.join(",", oldValueList); break; } - newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). - setResourceType(configResource.type().id()). - setResourceName(configResource.name()). - setName(key). - setValue(newValue), CONFIG_RECORD.highestSupportedVersion())); + if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) { + // We need to generate records even if the value is unchanged to trigger reloads on the brokers + newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). + setResourceType(configResource.type().id()). + setResourceName(configResource.name()). + setName(key). + setValue(newValue), CONFIG_RECORD.highestSupportedVersion())); + } } ApiError error = validateAlterConfig(configResource, newRecords, newlyCreatedResource); if (error.isFailure()) { @@ -314,11 +318,15 @@ private void legacyAlterConfigResource(ConfigResource configResource, for (Entry entry : newConfigs.entrySet()) { String key = entry.getKey(); String newValue = entry.getValue(); - newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). - setResourceType(configResource.type().id()). - setResourceName(configResource.name()). - setName(key). - setValue(newValue), CONFIG_RECORD.highestSupportedVersion())); + String currentValue = currentConfigs.get(key); + if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) { + // We need to generate records even if the value is unchanged to trigger reloads on the brokers + newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). + setResourceType(configResource.type().id()). + setResourceName(configResource.name()). + setName(key). + setValue(newValue), CONFIG_RECORD.highestSupportedVersion())); + } } for (String key : currentConfigs.keySet()) { if (!newConfigs.containsKey(key)) { From bb2988ff6aa73a7065b287765bfdcde76d50c4fc Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 4 Aug 2022 15:08:14 -0400 Subject: [PATCH 3/3] Add reference to the JIRA --- .../apache/kafka/controller/ConfigurationControlManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index 5dcfc71496b25..4d6736b878d5b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -230,7 +230,7 @@ private void incrementalAlterConfigResource(ConfigResource configResource, break; } if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) { - // We need to generate records even if the value is unchanged to trigger reloads on the brokers + // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). setResourceType(configResource.type().id()). setResourceName(configResource.name()). @@ -320,7 +320,7 @@ private void legacyAlterConfigResource(ConfigResource configResource, String newValue = entry.getValue(); String currentValue = currentConfigs.get(key); if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) { - // We need to generate records even if the value is unchanged to trigger reloads on the brokers + // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers newRecords.add(new ApiMessageAndVersion(new ConfigRecord(). setResourceType(configResource.type().id()). setResourceName(configResource.name()).