Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest|GroupMetadataManagerTest).java"/>
files="(RecordHelpersTest|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
<suppress checks="JavaNCSS"
files="GroupMetadataManagerTest.java"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
Expand Down Expand Up @@ -369,9 +370,31 @@ public CompletableFuture<HeartbeatResponseData> heartbeat(
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}

return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
"This API is not implemented yet."
));
if (!isGroupIdNotEmpty(request.groupId())) {
return CompletableFuture.completedFuture(new HeartbeatResponseData()
.setErrorCode(Errors.INVALID_GROUP_ID.code()));
}

// Using a read operation is okay here as we ignore the last committed offset in the snapshot registry.
// This means we will read whatever is in the latest snapshot, which is how the old coordinator behaves.
return runtime.scheduleReadOperation("generic-group-heartbeat",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also add a comment explaining why using a read operation is fine here?

topicPartitionFor(request.groupId()),
(coordinator, __) -> coordinator.genericGroupHeartbeat(context, request)
).exceptionally(exception -> {
if (!(exception instanceof KafkaException)) {
log.error("Heartbeat request {} hit an unexpected exception: {}",
request, exception.getMessage());
}

if (exception instanceof CoordinatorLoadInProgressException) {
// The group is still loading, so blindly respond
return new HeartbeatResponseData()
.setErrorCode(Errors.NONE.code());
}

return new HeartbeatResponseData()
.setErrorCode(Errors.forException(exception).code());
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import org.apache.kafka.common.message.JoinGroupRequestData;
Expand Down Expand Up @@ -90,6 +94,7 @@
import java.util.stream.Collectors;

import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
Expand Down Expand Up @@ -2835,6 +2840,77 @@ private void removePendingSyncMember(
}
}

/**
 * Handle a generic group HeartbeatRequest.
 *
 * @param context The request context.
 * @param request The actual Heartbeat request.
 *
 * @return The Heartbeat response.
 */
public HeartbeatResponseData genericGroupHeartbeat(
    RequestContext context,
    HeartbeatRequestData request
) {
    GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false);

    // Fails fast if the group is dead, the member is unknown, or the
    // generation id in the request does not match the group's.
    validateGenericGroupHeartbeat(
        request.memberId(),
        group,
        request.groupInstanceId(),
        request.generationId()
    );

    if (group.isInState(EMPTY)) {
        // An empty group has no members, so the sender cannot be one of them.
        return new HeartbeatResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
    }

    if (group.isInState(PREPARING_REBALANCE)) {
        // Keep the member alive but signal that it must rejoin.
        rescheduleGenericGroupMemberHeartbeat(group, group.member(request.memberId()));
        return new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code());
    }

    if (group.isInState(COMPLETING_REBALANCE) || group.isInState(STABLE)) {
        // Consumers may start sending heartbeats after join-group response, while the group
        // is in CompletingRebalance state. In this case, we should treat them as
        // normal heartbeat requests and reset the timer
        rescheduleGenericGroupMemberHeartbeat(group, group.member(request.memberId()));
        return new HeartbeatResponseData();
    }

    throw new IllegalStateException("Reached unexpected state " +
        group.currentState() + " for group " + group.groupId());
}

/**
 * Validates a generic group heartbeat request. Kept private as it is only
 * meaningful in the context of {@link #genericGroupHeartbeat}.
 *
 * @param memberId        The member id from the request.
 * @param group           The group.
 * @param groupInstanceId The group instance id from the request.
 * @param generationId    The generation id from the request.
 *
 * @throws CoordinatorNotAvailableException If group is Dead.
 * @throws IllegalGenerationException If the generation id in the request and the generation id of the
 *                                    group does not match.
 */
private void validateGenericGroupHeartbeat(
    String memberId,
    GenericGroup group,
    String groupInstanceId,
    int generationId
) throws CoordinatorNotAvailableException, IllegalGenerationException {
    // A dead group means this coordinator no longer owns it.
    if (group.isInState(DEAD)) {
        throw COORDINATOR_NOT_AVAILABLE.exception();
    }

    // Throws if the member (or static instance) is not known to the group.
    group.validateMember(memberId, groupInstanceId, "heartbeat");

    if (generationId != group.generationId()) {
        throw ILLEGAL_GENERATION.exception();
    }
}

/**
 * Validates a generic group heartbeat request.
 *
 * @param group The group.
 * @param memberId The member id.
 * @param groupInstanceId The group instance id.
 * @param generationId The generation id.
 *
 * @throws CoordinatorNotAvailableException If group is Dead.
 * @throws IllegalGenerationException If the generation id in the request and the generation id of the
 *                                    group does not match.
 */
private void validateGenericGroupHeartbeat(
    GenericGroup group,
    String memberId,
    String groupInstanceId,
    int generationId
) throws CoordinatorNotAvailableException, IllegalGenerationException {
    // A dead group means this coordinator no longer owns it; callers must
    // rediscover the coordinator.
    if (group.isInState(DEAD)) {
        throw COORDINATOR_NOT_AVAILABLE.exception();
    }

    // Throws if the member (or its static instance id) is unknown to the group.
    group.validateMember(memberId, groupInstanceId, "heartbeat");

    // The heartbeat must carry the group's current generation.
    if (generationId != group.generationId()) {
        throw ILLEGAL_GENERATION.exception();
    }
}

/**
* Checks whether the given protocol type or name in the request is inconsistent with the group's.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
Expand Down Expand Up @@ -249,6 +251,24 @@ public CoordinatorResult<Void, Record> genericGroupSync(
);
}

/**
 * Handles a generic group HeartbeatRequest by delegating to the group
 * metadata manager; all validation and state handling happens there.
 *
 * @param context The request context.
 * @param request The actual Heartbeat request.
 *
 * @return The HeartbeatResponse.
 */
public HeartbeatResponseData genericGroupHeartbeat(
    RequestContext context,
    HeartbeatRequestData request
) {
    // Pure delegation: this coordinator facade adds no behavior of its own.
    return groupMetadataManager.genericGroupHeartbeat(context, request);
}

/**
* Handles a OffsetCommit request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
Expand Down Expand Up @@ -495,4 +499,100 @@ public void testSyncGroupInvalidGroupId() throws Exception {

assertEquals(expectedResponse, response.get());
}

@Test
public void testHeartbeat() throws Exception {
    // A heartbeat on a started coordinator is served via a scheduled read
    // operation and the coordinator's response is surfaced unchanged.
    CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
    GroupCoordinatorService service = new GroupCoordinatorService(
        new LogContext(),
        createConfig(),
        runtime
    );
    service.startup(() -> 1);

    when(runtime.scheduleReadOperation(
        ArgumentMatchers.eq("generic-group-heartbeat"),
        ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
        ArgumentMatchers.any()
    )).thenReturn(CompletableFuture.completedFuture(
        new HeartbeatResponseData()
    ));

    CompletableFuture<HeartbeatResponseData> future = service.heartbeat(
        requestContext(ApiKeys.HEARTBEAT),
        new HeartbeatRequestData().setGroupId("foo")
    );

    // The stubbed read completes immediately, so the future is already done.
    assertTrue(future.isDone());
    assertEquals(new HeartbeatResponseData(), future.get());
}

@Test
public void testHeartbeatCoordinatorLoadInProgressException() throws Exception {
    // While the group coordinator partition is still loading, heartbeats are
    // blindly accepted: CoordinatorLoadInProgressException must be translated
    // into a successful (NONE error code) response, not an error.
    // NOTE(review): this test was previously misnamed
    // "testHeartbeatCoordinatorNotAvailableException" even though it exercises
    // the load-in-progress path.
    CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
    GroupCoordinatorService service = new GroupCoordinatorService(
        new LogContext(),
        createConfig(),
        runtime
    );

    HeartbeatRequestData request = new HeartbeatRequestData()
        .setGroupId("foo");

    service.startup(() -> 1);

    when(runtime.scheduleReadOperation(
        ArgumentMatchers.eq("generic-group-heartbeat"),
        ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
        ArgumentMatchers.any()
    )).thenReturn(FutureUtils.failedFuture(
        new CoordinatorLoadInProgressException(null)
    ));

    CompletableFuture<HeartbeatResponseData> future = service.heartbeat(
        requestContext(ApiKeys.HEARTBEAT),
        request
    );

    assertTrue(future.isDone());
    // A default HeartbeatResponseData carries Errors.NONE.
    assertEquals(new HeartbeatResponseData(), future.get());
}

@Test
public void testHeartbeatCoordinatorException() throws Exception {
    // Any other coordinator exception is mapped to its corresponding error
    // code via Errors.forException and returned in the response.
    CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
    GroupCoordinatorService service = new GroupCoordinatorService(
        new LogContext(),
        createConfig(),
        runtime
    );
    service.startup(() -> 1);

    when(runtime.scheduleReadOperation(
        ArgumentMatchers.eq("generic-group-heartbeat"),
        ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
        ArgumentMatchers.any()
    )).thenReturn(FutureUtils.failedFuture(
        new RebalanceInProgressException()
    ));

    CompletableFuture<HeartbeatResponseData> future = service.heartbeat(
        requestContext(ApiKeys.HEARTBEAT),
        new HeartbeatRequestData().setGroupId("foo")
    );

    assertTrue(future.isDone());
    HeartbeatResponseData expected = new HeartbeatResponseData()
        .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code());
    assertEquals(expected, future.get());
}
}
Loading