diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2214be13d0332..e6a8ffd24075c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1014,11 +1014,11 @@ object KafkaConfig {
val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " +
"Internal topic creation will fail until the cluster size meets this replication factor requirement."
val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)."
- val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads."
- val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out."
- val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to transactional.id.expiration.ms passing."
-
- val TransactionPartitionVerificationEnableDoc = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition."
+ val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"
+ val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out"
+ val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to transactional.id.expiration.ms passing"
+
+ val TransactionPartitionVerificationEnableDoc = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition"
val ProducerIdExpirationMsDoc = "The time in ms that a topic partition leader will wait before expiring producer IDs. Producer IDs will not expire while a transaction associated to them is still ongoing. " +
"Note that producer IDs may expire sooner if the last write from the producer ID is deleted due to the topic's retention settings. Setting this value the same or higher than " +
@@ -1957,10 +1957,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val transactionTopicSegmentBytes = getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
-
+
val transactionPartitionVerificationEnable = getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)
- val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
+ def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
val producerIdExpirationCheckIntervalMs = getInt(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp)
/** ********* Metric Configuration **************/
diff --git a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
index 96cc54a623cdc..407c396bd8a82 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
@@ -183,7 +183,11 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness {
)
// Update the expiration time to a low value again.
- admin.incrementalAlterConfigs(producerIdExpirationConfig("100"))
+ admin.incrementalAlterConfigs(producerIdExpirationConfig("100")).all().get()
+
+ // restart a broker to ensure that dynamic config changes are picked up on restart
+ killBroker(0)
+ restartDeadBrokers()
brokers.foreach(broker => TestUtils.waitUntilTrue(() => broker.logManager.producerStateManagerConfig.producerIdExpirationMs == 100, "Configuration was not updated."))