Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ class KafkaServer(
def kafkaController: KafkaController = _kafkaController

var lifecycleManager: BrokerLifecycleManager = _
private var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _

@volatile var brokerEpochManager: ZkBrokerEpochManager = _

Expand Down Expand Up @@ -413,7 +414,7 @@ class KafkaServer(
// If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
RaftConfig.parseVoterConnections(config.quorumVoters))
val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaPropsEnsemble.clusterId().get(),
config,
new MetadataRecordSerde,
Expand Down Expand Up @@ -1008,6 +1009,9 @@ class KafkaServer(
// Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear()

if (raftManager != null)
CoreUtils.swallow(raftManager.shutdown(), this)

if (lifecycleManager != null) {
lifecycleManager.close()
}
Expand Down