From ac04be05481e4a28f780e6ffeb00922136c4d181 Mon Sep 17 00:00:00 2001 From: David Mao Date: Thu, 11 May 2023 11:46:59 -0700 Subject: [PATCH 1/5] Dynamic producer ID expiration should be applied on a broker restart --- core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +++--- .../integration/kafka/api/ProducerIdExpirationTest.scala | 6 +++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 2214be13d0332..1e604c1c8608d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1017,7 +1017,7 @@ object KafkaConfig { val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads." val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out." val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to transactional.id.expiration.ms passing." - + val TransactionPartitionVerificationEnableDoc = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition." val ProducerIdExpirationMsDoc = "The time in ms that a topic partition leader will wait before expiring producer IDs. Producer IDs will not expire while a transaction associated to them is still ongoing. " + @@ -1957,10 +1957,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val transactionTopicSegmentBytes = getInt(KafkaConfig.TransactionsTopicSegmentBytesProp) val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp) val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp) - + val transactionPartitionVerificationEnable = getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp) - val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp) + def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp) val producerIdExpirationCheckIntervalMs = getInt(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp) /** ********* Metric Configuration **************/ diff --git a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala index 96cc54a623cdc..407c396bd8a82 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala @@ -183,7 +183,11 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness { ) // Update the expiration time to a low value again. - admin.incrementalAlterConfigs(producerIdExpirationConfig("100")) + admin.incrementalAlterConfigs(producerIdExpirationConfig("100")).all().get() + + // restart a broker to ensure that dynamic config changes are picked up on restart + killBroker(0) + restartDeadBrokers() brokers.foreach(broker => TestUtils.waitUntilTrue(() => broker.logManager.producerStateManagerConfig.producerIdExpirationMs == 100, "Configuration was not updated.")) From 1a127e148b538ff8e2a4309a39dd6ccf74665267 Mon Sep 17 00:00:00 2001 From: David Mao Date: Thu, 11 May 2023 12:39:15 -0700 Subject: [PATCH 2/5] Some debugging logs --- core/src/main/scala/kafka/server/DynamicBrokerConfig.scala | 6 +++++- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 7c6d5284d7121..7542b1668c4b7 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -224,6 +224,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = { currentConfig = new KafkaConfig(kafkaConfig.props, false, None) + info(s"Initializing dynamic configs for ${kafkaConfig.brokerId}") zkClientOpt.foreach { zkClient => val adminZkClient = new AdminZkClient(zkClient) updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default), false) @@ -337,6 +338,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private[server] def updateDefaultConfig(persistentProps: Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) { try { + info("Updating default config") val props = fromPersistentProps(persistentProps, perBrokerConfig = false) dynamicDefaultConfigs.clear() dynamicDefaultConfigs ++= props.asScala @@ -556,6 +558,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging val oldConfig = currentConfig val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false, doLog) if (newConfig ne currentConfig) { + info("Updating reconfigurables") currentConfig = newConfig kafkaConfig.updateCurrentConfig(newConfig) @@ -567,6 +570,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean, doLog: Boolean = false): (KafkaConfig, List[BrokerReconfigurable]) = { val newConfig = new KafkaConfig(newProps.asJava, doLog, None) val (changeMap, deletedKeySet) = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals) + info(s"Dynamic config updated created $changeMap") if (changeMap.nonEmpty || deletedKeySet.nonEmpty) { try { val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs @@ -1079,8 +1083,8 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { if (producerStateManagerConfig.producerIdExpirationMs() != newConfig.producerIdExpirationMs) { info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from ${producerStateManagerConfig.producerIdExpirationMs()} to ${newConfig.producerIdExpirationMs}") - producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs) } + producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs) } def validateReconfiguration(newConfig: KafkaConfig): Unit = { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1e604c1c8608d..2940bf7f108b5 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1960,7 +1960,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val transactionPartitionVerificationEnable = getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp) - def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp) + val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp) val producerIdExpirationCheckIntervalMs = getInt(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp) /** ********* Metric Configuration **************/ From 38a6800e7f7822beb24c004ebc15f59f3cf2e483 Mon Sep 17 00:00:00 2001 From: David Mao Date: Fri, 21 Jul 2023 14:31:33 -0700 Subject: [PATCH 3/5] Remove debugging log statements --- core/src/main/scala/kafka/server/DynamicBrokerConfig.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 7542b1668c4b7..de3ca2a1c7bf7 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -338,7 +338,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private[server] def updateDefaultConfig(persistentProps: Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) { try { - info("Updating default config") val props = fromPersistentProps(persistentProps, perBrokerConfig = false) dynamicDefaultConfigs.clear() dynamicDefaultConfigs ++= props.asScala @@ -558,7 +557,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging val oldConfig = currentConfig val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false, doLog) if (newConfig ne currentConfig) { - info("Updating reconfigurables") currentConfig = newConfig kafkaConfig.updateCurrentConfig(newConfig) @@ -570,7 +568,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean, doLog: Boolean = false): (KafkaConfig, List[BrokerReconfigurable]) = { val newConfig = new KafkaConfig(newProps.asJava, doLog, None) val (changeMap, deletedKeySet) = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals) - info(s"Dynamic config updated created $changeMap") if (changeMap.nonEmpty || deletedKeySet.nonEmpty) { try { val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs @@ -1081,9 +1078,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi class DynamicProducerStateManagerConfig(val producerStateManagerConfig: ProducerStateManagerConfig) extends BrokerReconfigurable with Logging { def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { - if (producerStateManagerConfig.producerIdExpirationMs() != newConfig.producerIdExpirationMs) { - info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from ${producerStateManagerConfig.producerIdExpirationMs()} to ${newConfig.producerIdExpirationMs}") - } + info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from ${producerStateManagerConfig.producerIdExpirationMs()} to ${newConfig.producerIdExpirationMs}") producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs) } From c5a1f1b7c9ac27e918833ab7c5ee2f8bcec2a20b Mon Sep 17 00:00:00 2001 From: David Mao Date: Fri, 21 Jul 2023 14:32:22 -0700 Subject: [PATCH 4/5] Dangling info log statement --- core/src/main/scala/kafka/server/DynamicBrokerConfig.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index de3ca2a1c7bf7..7c31854c1308b 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -224,7 +224,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = { currentConfig = new KafkaConfig(kafkaConfig.props, false, None) - info(s"Initializing dynamic configs for ${kafkaConfig.brokerId}") zkClientOpt.foreach { zkClient => val adminZkClient = new AdminZkClient(zkClient) updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default), false) From adbd2140f3a43846b1fdb4ee97d2ae9eb8112061 Mon Sep 17 00:00:00 2001 From: David Mao Date: Mon, 24 Jul 2023 10:16:29 -0700 Subject: [PATCH 5/5] Restore original PR --- .../main/scala/kafka/server/DynamicBrokerConfig.scala | 6 ++++-- core/src/main/scala/kafka/server/KafkaConfig.scala | 10 +++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 7c31854c1308b..7c6d5284d7121 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -1077,8 +1077,10 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi class DynamicProducerStateManagerConfig(val producerStateManagerConfig: ProducerStateManagerConfig) extends BrokerReconfigurable with Logging { def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { - info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from ${producerStateManagerConfig.producerIdExpirationMs()} to ${newConfig.producerIdExpirationMs}") - producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs) + if (producerStateManagerConfig.producerIdExpirationMs() != newConfig.producerIdExpirationMs) { + info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from ${producerStateManagerConfig.producerIdExpirationMs()} to ${newConfig.producerIdExpirationMs}") + producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs) + } } def validateReconfiguration(newConfig: KafkaConfig): Unit = { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 2940bf7f108b5..e6a8ffd24075c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1014,11 +1014,11 @@ object KafkaConfig { val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " + "Internal topic creation will fail until the cluster size meets this replication factor requirement." val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)." - val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads." - val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out." - val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to transactional.id.expiration.ms passing." + val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads" + val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out" + val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to transactional.id.expiration.ms passing" - val TransactionPartitionVerificationEnableDoc = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition." + val TransactionPartitionVerificationEnableDoc = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition" val ProducerIdExpirationMsDoc = "The time in ms that a topic partition leader will wait before expiring producer IDs. Producer IDs will not expire while a transaction associated to them is still ongoing. " + "Note that producer IDs may expire sooner if the last write from the producer ID is deleted due to the topic's retention settings. Setting this value the same or higher than " + @@ -1960,7 +1960,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val transactionPartitionVerificationEnable = getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp) - val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp) + def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp) val producerIdExpirationCheckIntervalMs = getInt(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp) /** ********* Metric Configuration **************/