Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -825,9 +825,6 @@ public void handle(SyncGroupResponse syncResponse,
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
"Sent generation was {}", sentGeneration);
// consumer didn't get assignment in this generation, so we need to reset generation
// to avoid joinGroup with out-of-data ownedPartitions in cooperative rebalance
resetStateOnResponseError(ApiKeys.SYNC_GROUP, error, false);
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
// for sync-group request, even if the generation has changed we would not expect the instance id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -504,54 +503,6 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() {
ensureActiveGroup(rejoinedGeneration, memberId);
}

@Test
public void testResetGenerationIdAfterSyncGroupFailedWithRebalanceInProgress() throws InterruptedException, ExecutionException {
setupCoordinator();

String memberId = "memberId";
int generation = 5;

// Rebalance once to initialize the generation and memberId
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
expectJoinGroup("", generation, memberId);
expectSyncGroup(generation, memberId);
ensureActiveGroup(generation, memberId);

// Force a rebalance
coordinator.requestRejoin("Manual test trigger");
assertTrue(coordinator.rejoinNeededOrPending());

ExecutorService executor = Executors.newFixedThreadPool(1);
try {
// Return RebalanceInProgress in syncGroup
int rejoinedGeneration = 10;
expectJoinGroup(memberId, rejoinedGeneration, memberId);
expectRebalanceInProgressForSyncGroup(rejoinedGeneration, memberId);
Future<Boolean> secondJoin = executor.submit(() ->
coordinator.ensureActiveGroup(mockTime.timer(Integer.MAX_VALUE)));

TestUtils.waitForCondition(() -> {
AbstractCoordinator.Generation currentGeneration = coordinator.generation();
return currentGeneration.generationId == AbstractCoordinator.Generation.NO_GENERATION.generationId &&
currentGeneration.memberId.equals(memberId);
}, 2000, "Generation should be reset");

rejoinedGeneration = 20;
expectSyncGroup(rejoinedGeneration, memberId);
mockClient.respond(joinGroupFollowerResponse(
rejoinedGeneration,
memberId,
"leaderId",
Errors.NONE,
PROTOCOL_TYPE
));
assertTrue(secondJoin.get());
} finally {
executor.shutdownNow();
executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
}
}

@Test
public void testRejoinReason() {
setupCoordinator();
Expand Down Expand Up @@ -639,22 +590,6 @@ private void expectDisconnectInSyncGroup(
}, null, true);
}

private void expectRebalanceInProgressForSyncGroup(
int expectedGeneration,
String expectedMemberId
) {
mockClient.prepareResponse(body -> {
if (!(body instanceof SyncGroupRequest)) {
return false;
}
SyncGroupRequestData syncGroupRequest = ((SyncGroupRequest) body).data();
return syncGroupRequest.generationId() == expectedGeneration
&& syncGroupRequest.memberId().equals(expectedMemberId)
&& syncGroupRequest.protocolType().equals(PROTOCOL_TYPE)
&& syncGroupRequest.protocolName().equals(PROTOCOL_NAME);
}, syncGroupResponse(Errors.REBALANCE_IN_PROGRESS, PROTOCOL_TYPE, PROTOCOL_NAME));
}

private void expectDisconnectInJoinGroup(
String expectedMemberId
) {
Expand Down