diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index b2f944ad5d2e5..9965c27ff6e86 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -807,6 +807,9 @@ public void handle(SyncGroupResponse syncResponse, } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " + "Sent generation was {}", sentGeneration); + // consumer didn't get assignment in this generation, so we need to reset generation + // to avoid joinGroup with out-of-data ownedPartitions in cooperative rebalance + resetStateOnResponseError(ApiKeys.SYNC_GROUP, error, false); future.raise(error); } else if (error == Errors.FENCED_INSTANCE_ID) { // for sync-group request, even if the generation has changed we would not expect the instance id diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 48ed136ebc4cb..45094ee030354 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -67,6 +67,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -488,6 +489,54 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() { ensureActiveGroup(rejoinedGeneration, memberId); } + @Test + public void testResetGenerationIdAfterSyncGroupFailedWithRebalanceInProgress() throws InterruptedException, ExecutionException { + setupCoordinator(); + + String memberId = "memberId"; + int generation = 5; + + // Rebalance once to initialize the generation and memberId + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + expectJoinGroup("", generation, memberId); + expectSyncGroup(generation, memberId); + ensureActiveGroup(generation, memberId); + + // Force a rebalance + coordinator.requestRejoin("Manual test trigger"); + assertTrue(coordinator.rejoinNeededOrPending()); + + ExecutorService executor = Executors.newFixedThreadPool(1); + try { + // Return RebalanceInProgress in syncGroup + int rejoinedGeneration = 10; + expectJoinGroup(memberId, rejoinedGeneration, memberId); + expectRebalanceInProgressForSyncGroup(rejoinedGeneration, memberId); + Future secondJoin = executor.submit(() -> + coordinator.ensureActiveGroup(mockTime.timer(Integer.MAX_VALUE))); + + TestUtils.waitForCondition(() -> { + AbstractCoordinator.Generation currentGeneration = coordinator.generation(); + return currentGeneration.generationId == AbstractCoordinator.Generation.NO_GENERATION.generationId && + currentGeneration.memberId.equals(memberId); + }, 2000, "Generation should be reset"); + + rejoinedGeneration = 20; + expectSyncGroup(rejoinedGeneration, memberId); + mockClient.respond(joinGroupFollowerResponse( + rejoinedGeneration, + memberId, + "leaderId", + Errors.NONE, + PROTOCOL_TYPE + )); + assertTrue(secondJoin.get()); + } finally { + executor.shutdownNow(); + executor.awaitTermination(1000, TimeUnit.MILLISECONDS); + } + } + @Test public void testRejoinReason() { setupCoordinator(); @@ -566,6 +615,22 @@ private void expectDisconnectInSyncGroup( }, null, true); } + private void expectRebalanceInProgressForSyncGroup( + int expectedGeneration, + String expectedMemberId + ) { + mockClient.prepareResponse(body -> { + if (!(body instanceof SyncGroupRequest)) { + return false; + } + SyncGroupRequestData syncGroupRequest = ((SyncGroupRequest) body).data(); + return syncGroupRequest.generationId() == expectedGeneration + && syncGroupRequest.memberId().equals(expectedMemberId) + && syncGroupRequest.protocolType().equals(PROTOCOL_TYPE) + && syncGroupRequest.protocolName().equals(PROTOCOL_NAME); + }, syncGroupResponse(Errors.REBALANCE_IN_PROGRESS, PROTOCOL_TYPE, PROTOCOL_NAME)); + } + private void expectDisconnectInJoinGroup( String expectedMemberId ) {