core/src/main/scala/kafka/server/KafkaConfig.scala (7 additions, 7 deletions)

@@ -1014,11 +1014,11 @@ object KafkaConfig {
  val TransactionsTopicReplicationFactorDoc = "The replication factor for the transaction topic (set higher to ensure availability). " +
    "Internal topic creation will fail until the cluster size meets this replication factor requirement."
  val TransactionsTopicPartitionsDoc = "The number of partitions for the transaction topic (should not change after deployment)."
- val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads."
- val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out."
- val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to <code>transactional.id.expiration.ms</code> passing."
- val TransactionPartitionVerificationEnableDoc = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition."
+ val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"
+ val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out"
+ val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to <code>transactional.id.expiration.ms</code> passing"
+
+ val TransactionPartitionVerificationEnableDoc = "Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition"

  val ProducerIdExpirationMsDoc = "The time in ms that a topic partition leader will wait before expiring producer IDs. Producer IDs will not expire while a transaction associated to them is still ongoing. " +
    "Note that producer IDs may expire sooner if the last write from the producer ID is deleted due to the topic's retention settings. Setting this value the same or higher than " +
@@ -1957,10 +1957,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
  val transactionTopicSegmentBytes = getInt(KafkaConfig.TransactionsTopicSegmentBytesProp)
  val transactionAbortTimedOutTransactionCleanupIntervalMs = getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
  val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)

  val transactionPartitionVerificationEnable = getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)

- val producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
+ def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
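The `val` to `def` change above is the heart of the fix. A minimal sketch of why it matters, using hypothetical names rather than Kafka's actual classes: the broker's config getters are routed through whatever config is currently active, so a `def` re-reads the live value on every call, while a `val` freezes whatever value was present when the config object was constructed.

```scala
// Toy model (hypothetical names) of a config whose backing properties can be
// swapped by a dynamic update after construction.
class Conf(initial: Int) {
  // Points at the most recent properties; replaced on dynamic reconfiguration.
  private var current: Map[String, Int] = Map("producer.id.expiration.ms" -> initial)

  def update(newValue: Int): Unit =
    current = Map("producer.id.expiration.ms" -> newValue)

  private def getInt(key: String): Int = current(key)

  val producerIdExpirationMsVal = getInt("producer.id.expiration.ms") // frozen at construction
  def producerIdExpirationMsDef = getInt("producer.id.expiration.ms") // tracks updates
}
```

After `update(100)`, `producerIdExpirationMsDef` returns the new value while `producerIdExpirationMsVal` still returns the construction-time value, which is exactly the staleness the PR fixes.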
Member: Are the other dynamic configs defs here? Just trying to figure out how this was missed.

Contributor Author: This is a good question. It is probably worth making sure we have similar coverage for other dynamic broker configs. Looking at some other dynamic configs, e.g. LogCleanerThreadsProp, I would expect them to run into similar issues.

Contributor Author: The dynamic broker config code is quite hairy. I took a look, and my high-level understanding of what it does is the following:

DynamicBrokerConfig.updateCurrentConfig: try to generate a new KafkaConfig from the set of properties persisted to ZooKeeper. If the current config is equal to the new config, no-op. Otherwise, determine the set of reconfigurables that need to be updated based on the currently registered set of reconfigurables, apply those updates, and then update the current config.

I added some logging, and it looks like what is happening is the following:

  1. During KafkaServer.startup() we call config.dynamicConfig.initialize(Some(zkClient)). At this point, the set of reconfigurables is empty.
  2. Many lines of code later, we call:
        /* Add all reconfigurables for config change notification before starting config handlers */
        config.dynamicConfig.addReconfigurables(this)
        Option(logManager.cleaner).foreach(config.dynamicConfig.addBrokerReconfigurable)
  3. Eventually we start up the config manager, which tries to reload props from ZooKeeper.

Step #1 loads broker overrides from ZooKeeper, but doesn't apply any changes since we have not added the reconfigurables yet. This means that the props just get applied to the current KafkaConfig, and the reconfiguration hooks defined in DynamicBrokerConfig don't fire. However, we do update the current KafkaConfig to include the updated props.
Step #2 adds the reconfigurables so that post-startup configuration changes alter components.
Step #3 tries to load the base props from ZooKeeper, but because #1 has already updated the current KafkaConfig to match the existing ZK state, we no-op again.
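The three steps above can be sketched as a toy model of the no-op path (hypothetical names, not the real DynamicBrokerConfig): the equality check swallows the second reload because step #1 already folded the ZooKeeper overrides into the current config before any reconfigurables were registered.

```scala
// A config snapshot; two snapshots with equal props compare equal.
case class Cfg(props: Map[String, String])

class DynamicConfig(var current: Cfg) {
  var reconfigurables: List[Cfg => Unit] = Nil
  var hooksFired = 0

  // Rebuild a config from the persisted props; only notify reconfigurables
  // when the rebuilt config actually differs from the current one.
  def updateCurrentConfig(persisted: Map[String, String]): Unit = {
    val newConfig = Cfg(persisted)
    if (newConfig != current) {
      reconfigurables.foreach { r => r(newConfig); hooksFired += 1 }
      current = newConfig
    } // else: no-op, hooks never fire
  }
}

val dyn = new DynamicConfig(Cfg(Map("k" -> "static")))
dyn.updateCurrentConfig(Map("k" -> "override"))                    // step 1: no reconfigurables yet
dyn.reconfigurables = ((_: Cfg) => ()) :: dyn.reconfigurables      // step 2: register
dyn.updateCurrentConfig(Map("k" -> "override"))                    // step 3: equal config, no-op
```

After this sequence the current config holds the override, yet no reconfiguration hook ever fired, which is why a `val` getter in a component would still see the stale value.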

val producerIdExpirationCheckIntervalMs = getInt(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp)

/** ********* Metric Configuration **************/
@@ -183,7 +183,11 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness {
)

  // Update the expiration time to a low value again.
- admin.incrementalAlterConfigs(producerIdExpirationConfig("100"))
+ admin.incrementalAlterConfigs(producerIdExpirationConfig("100")).all().get()
Member: Does the all + get ensure the call actually completes?

Contributor Author: Yes.
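A sketch of why that holds, using java.util.concurrent.CompletableFuture as a stand-in for KafkaFuture (the all()/get() semantics are analogous): the combined future completes only once every per-resource future has completed, so get() blocks the test until the broker has acknowledged the alteration.

```scala
import java.util.concurrent.CompletableFuture

// Stand-ins for the per-resource futures inside an AlterConfigsResult.
val perResource = Seq(new CompletableFuture[Void], new CompletableFuture[Void])

// Analogue of AlterConfigsResult.all(): completes when all inputs complete.
val all = CompletableFuture.allOf(perResource: _*)

perResource.foreach(_.complete(null)) // the broker acks each resource
all.get() // returns only once every ack has arrived; rethrows if any failed
```

Without the trailing `.all().get()`, the test would race ahead while the alteration was still in flight, which is exactly what the diff above guards against before killing the broker.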


// restart a broker to ensure that dynamic config changes are picked up on restart
killBroker(0)
restartDeadBrokers()

brokers.foreach(broker => TestUtils.waitUntilTrue(() => broker.logManager.producerStateManagerConfig.producerIdExpirationMs == 100, "Configuration was not updated."))
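The waitUntilTrue call above polls rather than asserting once, because the dynamic value propagates asynchronously after the restart. A minimal sketch of that polling pattern (hypothetical signature; the real TestUtils helper has more options):

```scala
// Poll `condition` until it holds, failing with `msg` after `timeoutMs`.
def waitUntilTrue(condition: () => Boolean, msg: String,
                  timeoutMs: Long = 15000L, pauseMs: Long = 100L): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (!condition()) {
    if (System.currentTimeMillis() > deadline)
      throw new AssertionError(msg)
    Thread.sleep(pauseMs)
  }
}
```

The timeout turns a hang into a diagnosable failure message, which is why the test passes "Configuration was not updated." rather than asserting the value directly.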
