diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 64f0ed9129bcf..8e7d588e6809c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2142,7 +2142,6 @@ class ReplicaManager(val config: KafkaConfig,
   ): Unit = {
     stateChangeLogger.info(s"Transitioning ${localFollowers.size} partition(s) to " +
       "local followers.")
-    val shuttingDown = isShuttingDown.get()
     val partitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition]
     val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean]
     val followerTopicSet = new mutable.HashSet[String]
@@ -2151,28 +2150,24 @@ class ReplicaManager(val config: KafkaConfig,
       try {
         followerTopicSet.add(tp.topic)
 
-        if (shuttingDown) {
-          stateChangeLogger.trace(s"Unable to start fetching $tp with topic " +
-            s"ID ${info.topicId} because the replica manager is shutting down.")
-        } else {
-          // We always update the follower state.
-          // - This ensure that a replica with no leader can step down;
-          // - This also ensures that the local replica is created even if the leader
-          //   is unavailable. This is required to ensure that we include the partition's
-          //   high watermark in the checkpoint file (see KAFKA-1647).
-          val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-          val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))
-
-          if (isInControlledShutdown && (info.partition.leader == NO_LEADER ||
-              !info.partition.isr.contains(config.brokerId))) {
-            // During controlled shutdown, replica with no leaders and replica
-            // where this broker is not in the ISR are stopped.
-            partitionsToStopFetching.put(tp, false)
-          } else if (isNewLeaderEpoch) {
-            // Otherwise, fetcher is restarted if the leader epoch has changed.
-            partitionsToStartFetching.put(tp, partition)
-          }
+        // We always update the follower state.
+        // - This ensure that a replica with no leader can step down;
+        // - This also ensures that the local replica is created even if the leader
+        //   is unavailable. This is required to ensure that we include the partition's
+        //   high watermark in the checkpoint file (see KAFKA-1647).
+        val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+        val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))
+
+        if (isInControlledShutdown && (info.partition.leader == NO_LEADER ||
+            !info.partition.isr.contains(config.brokerId))) {
+          // During controlled shutdown, replica with no leaders and replica
+          // where this broker is not in the ISR are stopped.
+          partitionsToStopFetching.put(tp, false)
+        } else if (isNewLeaderEpoch) {
+          // Otherwise, fetcher is restarted if the leader epoch has changed.
+          partitionsToStartFetching.put(tp, partition)
         }
+
         changedPartitions.add(partition)
       } catch {
         case e: KafkaStorageException =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index fc601d35a723e..19199ff71b925 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.net.InetAddress
 import java.nio.file.Files
 import java.util
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.stream.IntStream
 import java.util.{Collections, Optional, Properties}
@@ -2210,7 +2210,8 @@ class ReplicaManagerTest {
     aliveBrokerIds: Seq[Int] = Seq(0, 1),
     propsModifier: Properties => Unit = _ => {},
     mockReplicaFetcherManager: Option[ReplicaFetcherManager] = None,
-    mockReplicaAlterLogDirsManager: Option[ReplicaAlterLogDirsManager] = None
+    mockReplicaAlterLogDirsManager: Option[ReplicaAlterLogDirsManager] = None,
+    isShuttingDown: AtomicBoolean = new AtomicBoolean(false)
   ): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
     props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
@@ -2245,6 +2246,7 @@ class ReplicaManagerTest {
       metadataCache = metadataCache,
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterPartitionManager,
+      isShuttingDown = isShuttingDown,
       delayedProducePurgatoryParam = Some(mockProducePurgatory),
       delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
       delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
@@ -3868,10 +3870,12 @@ class ReplicaManagerTest {
     val foo2 = new TopicPartition("foo", 2)
 
     val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+    val isShuttingDown = new AtomicBoolean(false)
    val replicaManager = setupReplicaManagerWithMockedPurgatories(
       timer = new MockTimer(time),
       brokerId = localId,
-      mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager),
+      isShuttingDown = isShuttingDown
     )
 
     try {
@@ -3940,6 +3944,10 @@ class ReplicaManagerTest {
 
       reset(mockReplicaFetcherManager)
 
+      // The broker transitions to SHUTTING_DOWN state. This should not have
+      // any impact in KRaft mode.
+      isShuttingDown.set(true)
+
       // The replica begins the controlled shutdown.
       replicaManager.beginControlledShutdown()