Parkerhiphop (Contributor) commented Jan 4, 2026

Refer to
#20082 (comment).

Refactored the test to fix a race condition caused by dynamic Mockito
stubbing during test execution.

The previous implementation used `doReturn(false)` and `reset()` on a
spy object while a background thread was running, causing a
`ClassCastException`.

This patch replaces that logic with a thread-safe `AtomicBoolean` and
`doAnswer` approach to toggle the mock's behavior safely.
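
For illustration only, here is a minimal plain-Java sketch of the toggle, assuming the shape settled on in review: a flag initialized to `true` and a `compareAndSet(true, false)` inside the answer. It is not the test code itself and does not use Mockito. `BlockFirstCall` and `blockFirst` are hypothetical names, and the `() -> true` lambda stands in for the real `maybeReplaceCurrentWithFutureReplica()` call.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

class BlockFirstCall {
    // Hypothetical helper: the first call flips the flag and reports "not promoted";
    // every later call delegates to the real action.
    static BooleanSupplier blockFirst(AtomicBoolean block, BooleanSupplier real) {
        return () -> {
            if (block.compareAndSet(true, false)) {
                return false; // promotion blocked exactly once
            }
            return real.getAsBoolean();
        };
    }

    public static void main(String[] args) {
        AtomicBoolean blockPromotion = new AtomicBoolean(true);
        // Stand-in for the real method; assumed to succeed when actually invoked.
        BooleanSupplier gated = blockFirst(blockPromotion, () -> true);
        System.out.println(gated.getAsBoolean()); // false: first attempt is blocked
        System.out.println(gated.getAsBoolean()); // true: later attempts go through
    }
}
```

Because the stub is installed once and only the flag changes afterwards, nothing is re-stubbed while the background thread is running.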

Test Command

N=100; I=0; while [ $I -lt $N ] && ./gradlew cleanTest core:test --tests ReplicaManagerTest -PmaxParallelForks=4 \
; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done

Test Result

BUILD SUCCESSFUL in 12s
151 actionable tasks: 2 executed, 149 up-to-date
Consider enabling configuration cache to speed up this build:
https://docs.gradle.org/9.2.1/userguide/configuration_cache_enabling.html
Completed run: 100

Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Chia-Ping Tsai
<chia7712@gmail.com>, PoAn Yang <payang@apache.org>

github-actions bot added the triage, core, tests, and small labels Jan 4, 2026
    try {
      val spiedPartition = spy(Partition(tpId, time, replicaManager))

      val blockPromotion = new AtomicBoolean(false)

Member

Will the following changes be sufficient?

      val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == firstLogDir).head
      // Prevent promotion of future replica
      doReturn(false).when(spiedPartition).maybeReplaceCurrentWithFutureReplica()

      replicaManager.alterReplicaLogDirs(Map(tp -> newReplicaFolder.getAbsolutePath))

Contributor

I think the spied call might still not be visible to the ReplicaAlterLogDirsThread. IIUC, the visibility across threads can only be enforced through some synchronization.
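
A small plain-Java sketch of the visibility point, offered as an illustration rather than as anything taken from the test: a write to an `AtomicBoolean` has volatile semantics, so a thread polling the flag is guaranteed to observe it eventually. `VisibilityDemo` and `readerSeesUpdate` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class VisibilityDemo {
    // Returns true if the reader thread observed the writer's update within timeoutMs.
    static boolean readerSeesUpdate(long timeoutMs) {
        AtomicBoolean flag = new AtomicBoolean(false);
        AtomicBoolean seen = new AtomicBoolean(false);

        Thread reader = new Thread(() -> {
            // Volatile read: guaranteed to see the writer's set(true) eventually.
            while (!flag.get()) {
                Thread.onSpinWait();
            }
            seen.set(true);
        });
        reader.setDaemon(true); // do not keep the JVM alive if the reader never exits
        reader.start();

        flag.set(true); // volatile write, published to the reader thread

        try {
            reader.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("reader saw update: " + readerSeesUpdate(5000));
    }
}
```

A plain non-volatile field in place of `flag` would carry no such guarantee, which is the concern raised about re-stubbing the spy from the test thread.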

Contributor Author

Thanks @gaurav-narula for the explanation on visibility!

Contributor

Thanks for the change @Parkerhiphop. You might want to initialise this as new AtomicBoolean(true) and avoid invoking blockPromotion.set(true) explicitly

Contributor Author

@gaurav-narula Thanks! That's a great refinement! It makes the code much cleaner.

I have updated the code to new AtomicBoolean(true) and removed the explicit set(true) call.

The logic remains the same (blocking the first attempt via CAS), and I verified it with the loop test again (same result as before).

gaurav-narula (Contributor) left a comment

Thanks for the PR. Left some comments


// Prevent promotion of future replica
doReturn(false).when(spiedPartition).maybeReplaceCurrentWithFutureReplica()
blockPromotion.set(true)

Contributor

I think this is racy. What if the test thread is interrupted and ReplicaAlterLogDirsThread invokes spiedPartition.maybeReplaceCurrentWithFutureReplica() before this line executes?

Perhaps consider a CAS within doAnswer instead?

Member

+1 to using CAS: if (blockPromotion.compareAndSet(true, false))
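
To illustrate why a CAS closes this kind of race, here is a hedged plain-Java sketch with no Mockito and no Kafka classes. Several threads hit the same flag at once, and exactly one of them wins `compareAndSet(true, false)`. `CasRaceDemo` and `countBlocked` are hypothetical names, and the worker threads merely stand in for whatever thread calls the stubbed method.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class CasRaceDemo {
    // Starts `threads` callers against one flag and returns how many were blocked.
    static int countBlocked(int threads) {
        AtomicBoolean blockPromotion = new AtomicBoolean(true);
        AtomicInteger blocked = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all callers up so they race on the flag
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Check and update happen as one atomic step: only one caller succeeds.
                if (blockPromotion.compareAndSet(true, false)) {
                    blocked.incrementAndGet();
                }
            });
            workers[i].start();
        }

        start.countDown();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining workers", e);
            }
        }
        return blocked.get();
    }

    public static void main(String[] args) {
        System.out.println("blocked callers: " + countBlocked(8)); // blocked callers: 1
    }
}
```

A separate `get()` followed by `set(false)` would leave a window in which two callers could both see `true`; the single atomic step removes that window.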

Contributor Author

Thanks for the CAS suggestion.

I have addressed the race condition and simplified the logic as suggested:

  1. Execution Order: Moved blockPromotion.set(true) before alterReplicaLogDirs to ensure the flag is set before the background thread starts.
  2. CAS: Adopted compareAndSet(true, false) within doAnswer to automatically unblock the promotion after the first attempt.

To verify the stability, I ran the following test command with no failures (also updated in the PR description):

Test Command

N=100; I=0; while [ $I -lt $N ] && ./gradlew cleanTest core:test --tests ReplicaManagerTest -PmaxParallelForks=4 \
; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done

Test Result

BUILD SUCCESSFUL in 12s
151 actionable tasks: 2 executed, 149 up-to-date
Consider enabling configuration cache to speed up this build: https://docs.gradle.org/9.2.1/userguide/configuration_cache_enabling.html
Completed run: 100

github-actions bot removed the triage label Jan 5, 2026
github-actions bot added the streams, consumer, tools, connect, performance, kraft, storage, KIP-932, transactions, clients, and group-coordinator labels and removed the small label Jan 6, 2026
Parkerhiphop force-pushed the minor-fix-replica-manager-test branch from 8351ce6 to 99dc779 January 6, 2026 14:22
github-actions bot added the small label Jan 6, 2026
Parkerhiphop (Contributor Author) commented Jan 6, 2026

Apologies for the force push.
I messed up the git merge locally which caused unrelated commits from trunk to appear in this PR.
I have cleaned up the commit history and re-synced with trunk.
The actual code changes remain the same.

chia7712 (Member) commented Jan 6, 2026

The fatal error is caused by ReconfigurableQuorumIntegrationTest.testRemoveAndAddSameController. I opened https://issues.apache.org/jira/browse/KAFKA-20044 for it.

chia7712 merged commit 04e3acb into apache:trunk Jan 6, 2026
26 of 30 checks passed
Labels

ci-approved, clients, connect, consumer, core, group-coordinator, KIP-932, kraft, performance, small, storage, streams, tests, tools, transactions
