KAFKA-19458: resume cleaning on future replica dir change #20082
A label of 'needs-attention' was automatically added to this PR in order to raise the |
junrao left a comment
@gaurav-narula : Thanks for the PR. Nice catch! Left a couple of comments.
    if (partition.futureReplicaDirChanged(destinationDir)) {
      replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
      partition.removeFutureLocalReplica()
      logManager.resumeCleaning(topicPartition)
In theory, it's possible that immediately after the `partition.futureReplicaDirChanged` check, the future partition catches up and replaces the current partition. During that process, `logManager.resumeCleaning()` has already been called, so calling it a second time here will lead to an `IllegalStateException`. We could potentially check for the future replica again after `replicaAlterLogDirsManager.removeFetcherForPartitions()`. At that point, if the future replica still exists, it's guaranteed to be there afterward.
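To make the suggested re-check concrete, here is a minimal sketch in plain Java. It is not the Kafka code: `CleaningState`, `PartitionModel`, and `cancelMove` are hypothetical stand-ins, and the only behavior taken from the comment above is that resuming cleaning twice fails with an `IllegalStateException`. The sketch assumes the fetcher has already been stopped before `cancelMove` runs, so the promotion path can no longer start once the re-check has passed.

```java
import java.util.HashSet;
import java.util.Set;

class ResumeCleaningSketch {

    // Hypothetical stand-in for the cleaner's bookkeeping of paused partitions.
    static final class CleaningState {
        private final Set<String> paused = new HashSet<>();

        synchronized void pause(String tp) {
            paused.add(tp);
        }

        // Resuming a partition that is not paused is treated as a bug.
        synchronized void resume(String tp) {
            if (!paused.remove(tp)) {
                throw new IllegalStateException("Cleaning is not paused for " + tp);
            }
        }

        synchronized boolean isPaused(String tp) {
            return paused.contains(tp);
        }
    }

    // Hypothetical stand-in for a partition that is being moved to another directory.
    static final class PartitionModel {
        private boolean hasFutureReplica = true;

        // Promotion path: the future replica replaces the current one and
        // cleaning is resumed as part of that step.
        synchronized void promote(CleaningState state, String tp) {
            if (hasFutureReplica) {
                hasFutureReplica = false;
                state.resume(tp);
            }
        }

        // Returns true only if a future replica was actually removed.
        synchronized boolean removeFutureReplica() {
            boolean existed = hasFutureReplica;
            hasFutureReplica = false;
            return existed;
        }
    }

    // Cancel path: assumed to run after the fetcher has been stopped.
    // Cleaning is resumed only if the future replica was still present.
    static boolean cancelMove(PartitionModel partition, CleaningState state, String tp) {
        if (partition.removeFutureReplica()) {
            state.resume(tp);
            return true;
        }
        return false; // already promoted, so cleaning was resumed on that path
    }

    public static void main(String[] args) {
        CleaningState state = new CleaningState();
        PartitionModel partition = new PartitionModel();
        state.pause("topic-0");
        partition.promote(state, "topic-0"); // promotion wins the race
        boolean resumedHere = cancelMove(partition, state, "topic-0");
        System.out.println("resumed by cancel path: " + resumedHere);
    }
}
```

The point of the guard is that exactly one of the two paths resumes cleaning, whichever one observes the future replica last.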
      remoteStorageSystemEnable: Boolean,
    - val initialTaskDelayMs: Long) extends Logging {
    + val initialTaskDelayMs: Long,
    + cleanerFactory: (CleanerConfig, util.List[File], ConcurrentMap[TopicPartition, UnifiedLog], LogDirFailureChannel, Time) => LogCleaner = (cleanerConfig, files, map, logDirFailureChannel, time) => new LogCleaner(cleanerConfig, files, map, logDirFailureChannel, time)) extends Logging {
Quite a long line. Could we format it better?
`ReplicaManager#alterReplicaLogDirs` does not resume the log cleaner when it handles an `AlterReplicaLogDirs` request for a topic partition that already has an `AlterReplicaLogDirs` in progress. This leads to a resource leak: cleaning for the topic partition remains paused even after the log directory has been altered. This change ensures we invoke `LogManager#resumeCleaning` if the future replica directory has changed.
junrao left a comment
@gaurav-narula : Thanks for the updated PR. LGTM
`ReplicaManager#alterReplicaLogDirs` does not resume log cleaner while handling an `AlterReplicaLogDirs` request for a topic partition which already has an `AlterReplicaLogDirs` in progress, leading to a resource leak where the cleaning for topic partitions remains paused even after the log directory has been altered. This change ensures we invoke `LogManager#resumeCleaning` if the future replica directory has changed.

Reviewers: Jun Rao <junrao@gmail.com>
    replicaManager.alterReplicaLogDirs(Map(tp -> newReplicaFolder.getAbsolutePath))

    // Prevent promotion of future replica
    doReturn(false).when(spiedPartition).maybeReplaceCurrentWithFutureReplica()
hi all,
java.lang.ClassCastException: class java.lang.Boolean cannot be cast to class org.apache.kafka.storage.internals.log.UnifiedLog (java.lang.Boolean is in module java.base of loader 'bootstrap'; org.apache.kafka.storage.internals.log.UnifiedLog is in unnamed module of loader 'app')
    at kafka.cluster.Partition.futureLocalLogOrException(Partition.scala:408)
    at kafka.server.ReplicaManager.futureLocalLogOrException(ReplicaManager.scala:587)
    at kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsMultipleReassignmentDoesNotBlockLogCleaner(ReplicaManagerTest.scala:5534)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
The replica thread interfered with the `doReturn(false)` stubbing, causing Mockito to apply the stubbed return value to the wrong method.
You can reproduce the error with the following command:
N=100; I=0; while [ $I -lt $N ] && ./gradlew cleanTest core:test --tests ReplicaManagerTest -PmaxParallelForks=4 \
; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
Will file a patch for it 😄
Hi, I've opened #21244 to fix this flaky test.
Thanks for the detailed explanation here, it was very helpful!
Refer to #20082 (comment).

Refactored the test to fix a race condition caused by dynamic Mockito stubbing during test execution. The previous implementation used `doReturn(false)` and `reset()` on a spy object while a background thread was running, causing a `ClassCastException`. This patch replaces that logic with a thread-safe `AtomicBoolean` and `doAnswer` approach to toggle the mock's behavior safely.

## Test Command

```
N=100; I=0; while [ $I -lt $N ] && ./gradlew cleanTest core:test --tests ReplicaManagerTest -PmaxParallelForks=4 \
; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
```

## Test Result

```
BUILD SUCCESSFUL in 12s
151 actionable tasks: 2 executed, 149 up-to-date
Consider enabling configuration cache to speed up this build: https://docs.gradle.org/9.2.1/userguide/configuration_cache_enabling.html
Completed run: 100
```

Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang <payang@apache.org>
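For readers unfamiliar with the pattern, the idea behind the `AtomicBoolean` toggle can be sketched without Mockito. This is an illustration only, not the test code from the patch: a plain `BooleanSupplier` stands in for the `doAnswer` stub, and `ToggleStubSketch`, `allowPromotion`, and `maybePromote` are hypothetical names. The behavior is installed once, before the background thread starts, and the test thread afterwards changes only the flag.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

class ToggleStubSketch {

    // Runs the scenario and returns {first result, last result} as seen
    // by the background thread.
    static boolean[] run() {
        // Flag toggled by the test thread; safe to read from any thread.
        AtomicBoolean allowPromotion = new AtomicBoolean(false);

        // Stand-in for the stubbed method. Its behavior is fixed here,
        // before the background thread exists, and is never re-stubbed.
        BooleanSupplier maybePromote = allowPromotion::get;

        boolean[] observed = new boolean[2];
        CountDownLatch firstCallDone = new CountDownLatch(1);

        Thread background = new Thread(() -> {
            boolean result = maybePromote.getAsBoolean();
            observed[0] = result;
            firstCallDone.countDown();
            // Keep polling, as a replica thread would, until promotion is allowed.
            while (!result) {
                Thread.onSpinWait();
                result = maybePromote.getAsBoolean();
            }
            observed[1] = result;
        });
        background.start();

        try {
            firstCallDone.await();    // background has called the "stub" at least once
            allowPromotion.set(true); // toggle behavior without touching the stub
            background.join();        // makes the writes to observed visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed;
    }

    public static void main(String[] args) {
        boolean[] observed = run();
        System.out.println("first=" + observed[0] + ", last=" + observed[1]);
    }
}
```

Because the only shared mutable state is the `AtomicBoolean`, there is no window in which the background thread can observe a half-configured stub.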