diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index c78caaba0596d..80b515d601b2a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -825,9 +825,6 @@ public void handle(SyncGroupResponse syncResponse, } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " + "Sent generation was {}", sentGeneration); - // consumer didn't get assignment in this generation, so we need to reset generation - // to avoid joinGroup with out-of-data ownedPartitions in cooperative rebalance - resetStateOnResponseError(ApiKeys.SYNC_GROUP, error, false); future.raise(error); } else if (error == Errors.FENCED_INSTANCE_ID) { // for sync-group request, even if the generation has changed we would not expect the instance id diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 4471b6f88cf2e..0745b99749f51 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -67,7 +67,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -504,54 +503,6 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() { ensureActiveGroup(rejoinedGeneration, memberId); } - @Test - public void testResetGenerationIdAfterSyncGroupFailedWithRebalanceInProgress() throws InterruptedException, ExecutionException { - setupCoordinator(); - - String memberId = "memberId"; - int generation = 5; - - // Rebalance once to initialize the generation and memberId - mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - expectJoinGroup("", generation, memberId); - expectSyncGroup(generation, memberId); - ensureActiveGroup(generation, memberId); - - // Force a rebalance - coordinator.requestRejoin("Manual test trigger"); - assertTrue(coordinator.rejoinNeededOrPending()); - - ExecutorService executor = Executors.newFixedThreadPool(1); - try { - // Return RebalanceInProgress in syncGroup - int rejoinedGeneration = 10; - expectJoinGroup(memberId, rejoinedGeneration, memberId); - expectRebalanceInProgressForSyncGroup(rejoinedGeneration, memberId); - Future secondJoin = executor.submit(() -> - coordinator.ensureActiveGroup(mockTime.timer(Integer.MAX_VALUE))); - - TestUtils.waitForCondition(() -> { - AbstractCoordinator.Generation currentGeneration = coordinator.generation(); - return currentGeneration.generationId == AbstractCoordinator.Generation.NO_GENERATION.generationId && - currentGeneration.memberId.equals(memberId); - }, 2000, "Generation should be reset"); - - rejoinedGeneration = 20; - expectSyncGroup(rejoinedGeneration, memberId); - mockClient.respond(joinGroupFollowerResponse( - rejoinedGeneration, - memberId, - "leaderId", - Errors.NONE, - PROTOCOL_TYPE - )); - assertTrue(secondJoin.get()); - } finally { - executor.shutdownNow(); - executor.awaitTermination(1000, TimeUnit.MILLISECONDS); - } - } - @Test public void testRejoinReason() { setupCoordinator(); @@ -639,22 +590,6 @@ private void expectDisconnectInSyncGroup( }, null, true); } - private void expectRebalanceInProgressForSyncGroup( - int expectedGeneration, - String expectedMemberId - ) { - mockClient.prepareResponse(body -> { - if (!(body instanceof SyncGroupRequest)) { - return false; - } - SyncGroupRequestData syncGroupRequest = ((SyncGroupRequest) body).data(); - return syncGroupRequest.generationId() == expectedGeneration - && syncGroupRequest.memberId().equals(expectedMemberId) - && syncGroupRequest.protocolType().equals(PROTOCOL_TYPE) - && syncGroupRequest.protocolName().equals(PROTOCOL_NAME); - }, syncGroupResponse(Errors.REBALANCE_IN_PROGRESS, PROTOCOL_TYPE, PROTOCOL_NAME)); - } - private void expectDisconnectInJoinGroup( String expectedMemberId ) {