39 changes: 17 additions & 22 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2142,7 +2142,6 @@ class ReplicaManager(val config: KafkaConfig,
): Unit = {
stateChangeLogger.info(s"Transitioning ${localFollowers.size} partition(s) to " +
"local followers.")
val shuttingDown = isShuttingDown.get()
Member

This code is a bit brittle; can we have something like the following as the field instead?

private Supplier<BrokerState> brokerState = null;

Then we don't have to make sure to update isShuttingDown when the logic changes in KafkaServer.
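
A minimal, self-contained sketch of this suggestion; the enum and class names below are stand-ins rather than the actual Kafka types:

```scala
import java.util.function.Supplier

// Stand-in for org.apache.kafka.metadata.BrokerState; the real enum has more states.
object BrokerState extends Enumeration {
  val Running, PendingControlledShutdown, ShuttingDown = Value
}

// Hypothetical wiring: the broker passes a supplier of its current state, and
// "shutting down" is derived on demand rather than cached in a separate flag.
class ReplicaManagerSketch(brokerState: Supplier[BrokerState.Value]) {
  def isShuttingDown: Boolean =
    brokerState.get() == BrokerState.ShuttingDown
}

// e.g. new ReplicaManagerSketch(() => BrokerState.Running).isShuttingDown returns false
```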

Member Author

I suppose that you are saying that relying on isShuttingDown is brittle, right? If that is the case, I do agree that relying on the broker state is better. I think that we can refactor the remaining usages as a follow-up.

val partitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition]
val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean]
val followerTopicSet = new mutable.HashSet[String]
@@ -2151,28 +2150,24 @@ class ReplicaManager(val config: KafkaConfig,
try {
followerTopicSet.add(tp.topic)

if (shuttingDown) {
stateChangeLogger.trace(s"Unable to start fetching $tp with topic " +
s"ID ${info.topicId} because the replica manager is shutting down.")
} else {
// We always update the follower state.
// - This ensures that a replica with no leader can step down;
// - This also ensures that the local replica is created even if the leader
// is unavailable. This is required to ensure that we include the partition's
// high watermark in the checkpoint file (see KAFKA-1647).
val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))

if (isInControlledShutdown && (info.partition.leader == NO_LEADER ||
Member

@ijuma (Oct 13, 2022)

Similarly, why do we need this special isInControlledShutdown field versus relying on the broker state as I mentioned above? This kind of approach is extremely brittle in my opinion.

Member Author

We only want to do this when we are in controlled shutdown, and controlled shutdown can be disabled. If you look at the caller side, we aligned this with how we do it for the lifecycleManager, as the two go together. We could revise this, though.

!info.partition.isr.contains(config.brokerId))) {
// During controlled shutdown, replicas with no leader and replicas
// where this broker is not in the ISR are stopped.
partitionsToStopFetching.put(tp, false)
} else if (isNewLeaderEpoch) {
// Otherwise, fetcher is restarted if the leader epoch has changed.
partitionsToStartFetching.put(tp, partition)
}
// We always update the follower state.
// - This ensures that a replica with no leader can step down;
// - This also ensures that the local replica is created even if the leader
// is unavailable. This is required to ensure that we include the partition's
// high watermark in the checkpoint file (see KAFKA-1647).
val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))

if (isInControlledShutdown && (info.partition.leader == NO_LEADER ||
!info.partition.isr.contains(config.brokerId))) {
// During controlled shutdown, replicas with no leader and replicas
// where this broker is not in the ISR are stopped.
partitionsToStopFetching.put(tp, false)
} else if (isNewLeaderEpoch) {
// Otherwise, fetcher is restarted if the leader epoch has changed.
partitionsToStartFetching.put(tp, partition)
}

changedPartitions.add(partition)
} catch {
case e: KafkaStorageException =>
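For readers following the thread above, the decision this hunk makes for each follower partition can be distilled as follows. This is an illustrative sketch with simplified names and types, not the actual ReplicaManager code:

```scala
object FollowerDeltaSketch {
  sealed trait FollowerAction
  case object StopFetcher  extends FollowerAction // remove the partition from the fetcher threads
  case object StartFetcher extends FollowerAction // (re)start fetching from the leader
  case object NoChange     extends FollowerAction

  def followerAction(
    isInControlledShutdown: Boolean,
    leaderId: Int,            // NO_LEADER is modeled as -1 here
    isr: Set[Int],
    localBrokerId: Int,
    isNewLeaderEpoch: Boolean
  ): FollowerAction = {
    val noLeader = -1
    if (isInControlledShutdown && (leaderId == noLeader || !isr.contains(localBrokerId)))
      StopFetcher   // controlled shutdown: leaderless replicas and replicas outside the ISR stop fetching
    else if (isNewLeaderEpoch)
      StartFetcher  // a bumped leader epoch restarts the fetcher
    else
      NoChange      // the follower state was still updated, but the fetcher is left alone
  }
}
```

In all three cases the follower state has already been updated via makeFollower; only the fetcher handling differs.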
14 changes: 11 additions & 3 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.net.InetAddress
import java.nio.file.Files
import java.util
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.stream.IntStream
import java.util.{Collections, Optional, Properties}
@@ -2210,7 +2210,8 @@ class ReplicaManagerTest {
aliveBrokerIds: Seq[Int] = Seq(0, 1),
propsModifier: Properties => Unit = _ => {},
mockReplicaFetcherManager: Option[ReplicaFetcherManager] = None,
mockReplicaAlterLogDirsManager: Option[ReplicaAlterLogDirsManager] = None
mockReplicaAlterLogDirsManager: Option[ReplicaAlterLogDirsManager] = None,
isShuttingDown: AtomicBoolean = new AtomicBoolean(false)
Contributor

I found the use of the field a bit confusing. We don't necessarily have to fix it here, but what do you think about letting ReplicaManager own its own shutdown? Basically create a private field which serves the same purpose and then expose a method to explicitly toggle it. It is not obvious when looking at BrokerServer that altering the local field has this remote effect.

): ReplicaManager = {
val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
@@ -2245,6 +2246,7 @@ class ReplicaManagerTest {
metadataCache = metadataCache,
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager,
isShuttingDown = isShuttingDown,
delayedProducePurgatoryParam = Some(mockProducePurgatory),
delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
@@ -3868,10 +3870,12 @@ class ReplicaManagerTest {
val foo2 = new TopicPartition("foo", 2)

val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
val isShuttingDown = new AtomicBoolean(false)
val replicaManager = setupReplicaManagerWithMockedPurgatories(
timer = new MockTimer(time),
brokerId = localId,
mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
mockReplicaFetcherManager = Some(mockReplicaFetcherManager),
isShuttingDown = isShuttingDown
)

try {
@@ -3940,6 +3944,10 @@ class ReplicaManagerTest {

reset(mockReplicaFetcherManager)

// The broker transitions to SHUTTING_DOWN state. This should not have
// any impact in KRaft mode.
isShuttingDown.set(true)

// The replica begins the controlled shutdown.
replicaManager.beginControlledShutdown()

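
A minimal sketch of the alternative raised in the test-file comment above, where ReplicaManager owns its shutdown state and exposes a method to toggle it; all names here are illustrative, not the actual Kafka API:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical alternative (not the actual Kafka code): the flag lives inside the
// replica manager and is only flipped through an explicit method, so BrokerServer
// signals the transition instead of mutating a shared AtomicBoolean.
class ShutdownOwningReplicaManagerSketch {
  private val shuttingDown = new AtomicBoolean(false)

  // Called by the broker when it starts shutting down.
  def beginShutdown(): Unit = shuttingDown.set(true)

  // Queried by the replica manager's own code paths.
  def isShuttingDown: Boolean = shuttingDown.get()
}
```

A test could then call beginShutdown() on the instance it built rather than toggling an AtomicBoolean it passed into the constructor, which keeps the coupling visible at the call site.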