KAFKA-16816: Remove unneeded FencedInstanceId support on commit path for new consumer (apache#17559)

Reviewers: Lianet Magrans <lmagrans@confluent.io>
TaiJuWu authored Nov 5, 2024
1 parent c91243a commit ee3cea0
Showing 5 changed files with 14 additions and 150 deletions.
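Editor's note: this change only affects the new consumer implementation (AsyncKafkaConsumer), which backs KafkaConsumer when the group.protocol config is set to "consumer" (the KIP-848 protocol, introduced as early access in Kafka 3.7). Under that protocol the broker fences members through member epochs, so the commit path is not expected to see FENCED_INSTANCE_ID; the classic protocol keeps the documented FencedInstanceIdException behavior. A minimal sketch of selecting the protocol follows; the broker address and group id are illustrative assumptions, not part of this commit:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupProtocolSelectionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // assumption: any group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // "consumer" selects the new (KIP-848) protocol backed by AsyncKafkaConsumer;
        // "classic" (the default) keeps the legacy protocol, where commitSync/commitAsync
        // can still throw FencedInstanceIdException for static members.
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // use the consumer...
        }
    }
}
```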
@@ -873,7 +873,8 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
* is too large or if the topic does not exist).
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout specified by {@code default.api.timeout.ms} expires
* before successful completion of the offset commit
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     *         and this instance gets fenced by broker.
*/
@Override
public void commitSync() {
@@ -916,7 +917,8 @@ public void commitSync() {
* is too large or if the topic does not exist).
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
* of the offset commit
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     *         and this instance gets fenced by broker.
*/
@Override
public void commitSync(Duration timeout) {
@@ -964,7 +966,8 @@ public void commitSync(Duration timeout) {
* is too large or if the topic does not exist).
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
* of the offset commit
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     *         and this instance gets fenced by broker.
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
@@ -1012,7 +1015,8 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
* is too large or if the topic does not exist).
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
* of the offset commit
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     *         and this instance gets fenced by broker.
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
@@ -1022,7 +1026,8 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, fin
/**
* Commit offsets returned on the last {@link #poll(Duration)} for all the subscribed list of topics and partition.
* Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     *         and this instance gets fenced by broker.
*/
@Override
public void commitAsync() {
@@ -1045,7 +1050,8 @@ public void commitAsync() {
* (and variants) returns.
*
* @param callback Callback to invoke when the commit completes
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     *         and this instance gets fenced by broker.
*/
@Override
public void commitAsync(OffsetCommitCallback callback) {
@@ -1072,7 +1078,8 @@ public void commitAsync(OffsetCommitCallback callback) {
* @param offsets A map of offsets by partition with associate metadata. This map will be copied internally, so it
* is safe to mutate the map after returning.
* @param callback Callback to invoke when the commit completes
-     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
+     *         and this instance gets fenced by broker.
*/
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
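Editor's note: the Javadoc edits above narrow the documented contract: the commit methods throw FencedInstanceIdException only when the consumer is using the classic group protocol with a group.instance.id set. A minimal usage sketch under that contract, assuming the classic protocol and placeholder broker, group, and topic names (the exception fires only if another instance claims the same group.instance.id):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.FencedInstanceIdException;

public class CommitFencingExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // assumption: any group id
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1");     // static membership
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // assumption: topic name
            while (true) {
                consumer.poll(Duration.ofMillis(500));
                try {
                    consumer.commitSync();
                } catch (FencedInstanceIdException e) {
                    // Another consumer with the same group.instance.id has joined;
                    // per the updated Javadoc this is only thrown on the classic
                    // protocol. This instance is now a zombie and must shut down.
                    break;
                }
            }
        }
    }
}
```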
@@ -75,7 +75,6 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
- import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
@@ -117,7 +116,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -249,7 +247,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
private boolean cachedSubscriptionHasAllFetchPositions;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
- private final AtomicBoolean asyncCommitFenced;
// Last triggered async commit future. Used to wait until all previous async commits are completed.
// We only need to keep track of the last one, since they are guaranteed to complete in order.
private CompletableFuture<Void> lastPendingAsyncCommit = null;
@@ -336,7 +333,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
backgroundEventHandler);
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
- this.asyncCommitFenced = new AtomicBoolean(false);
this.groupMetadata.set(initializeGroupMetadata(config, groupRebalanceConfig));
final Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time,
logContext,
@@ -448,7 +444,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
this.clientTelemetryReporter = Optional.empty();
this.autoCommitEnabled = autoCommitEnabled;
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
- this.asyncCommitFenced = new AtomicBoolean(false);
}

AsyncKafkaConsumer(LogContext logContext,
@@ -511,7 +506,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
backgroundEventHandler
);
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
- this.asyncCommitFenced = new AtomicBoolean(false);
Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(
time,
logContext,
@@ -766,10 +760,6 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo
offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets);
}

-             if (t instanceof FencedInstanceIdException) {
-                 asyncCommitFenced.set(true);
-             }
-
if (callback == null) {
if (t != null) {
log.error("Offset commit with offsets {} failed", offsets, t);
@@ -786,7 +776,6 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo

private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
maybeThrowInvalidGroupIdException();
-         maybeThrowFencedInstanceException();
offsetCommitCallbackInvoker.executeCallbacks();

Map<TopicPartition, OffsetAndMetadata> offsets = commitEvent.offsets();
@@ -1657,7 +1646,6 @@ private void sendPrefetches(Timer timer) {

@Override
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
-         maybeThrowFencedInstanceException();
offsetCommitCallbackInvoker.executeCallbacks();
try {
applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
@@ -1940,20 +1928,6 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() {
return kafkaConsumerMetrics;
}

-     private void maybeThrowFencedInstanceException() {
-         if (asyncCommitFenced.get()) {
-             String groupInstanceId = "unknown";
-             if (!groupMetadata.get().isPresent()) {
-                 log.error("No group metadata found although a group ID was provided. This is a bug!");
-             } else if (!groupMetadata.get().get().groupInstanceId().isPresent()) {
-                 log.error("No group instance ID found although the consumer is fenced. This is a bug!");
-             } else {
-                 groupInstanceId = groupMetadata.get().get().groupInstanceId().get();
-             }
-             throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + groupInstanceId);
-         }
-     }
-
// Visible for testing
SubscriptionState subscriptions() {
return subscriptions;
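Editor's note: the deletions above remove a deferred-exception mechanism from AsyncKafkaConsumer: an async-commit fencing failure observed on the background thread set an AtomicBoolean, and the next application-thread call into poll(), commitSync()/commitAsync(), or updateAssignmentMetadataIfNeeded() rethrew it via maybeThrowFencedInstanceException(). A condensed sketch of that removed pattern (hypothetical class name, not the actual consumer wiring):

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.common.errors.FencedInstanceIdException;

// Condensed sketch of the pattern removed above, for illustration only.
class DeferredFencingSketch {
    private final AtomicBoolean asyncCommitFenced = new AtomicBoolean(false);

    // Ran on the background thread when an async commit completed exceptionally:
    // record the fatal failure so the application thread can see it later.
    void onAsyncCommitFailure(Throwable t) {
        if (t instanceof FencedInstanceIdException) {
            asyncCommitFenced.set(true);
        }
    }

    // Ran at the top of poll()/commitSync()/commitAsync() on the application
    // thread: surface the stored failure on the next API call.
    void maybeThrowFencedInstanceException() {
        if (asyncCommitFenced.get()) {
            throw new FencedInstanceIdException("Get fenced exception for group.instance.id");
        }
    }
}
```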
@@ -735,11 +735,6 @@ public void onResponse(final ClientResponse response) {
coordinatorRequestManager.markCoordinatorUnknown(error.message(), currentTimeMs);
future.completeExceptionally(error.exception());
return;
-             } else if (error == Errors.FENCED_INSTANCE_ID) {
-                 String fencedError = "OffsetCommit failed due to group instance id fenced: " + groupInstanceId;
-                 log.error(fencedError);
-                 future.completeExceptionally(new CommitFailedException(fencedError));
-                 return;
} else if (error == Errors.OFFSET_METADATA_TOO_LARGE ||
error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
future.completeExceptionally(error.exception());
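Editor's note: the removed branch above intercepted Errors.FENCED_INSTANCE_ID in the OffsetCommit response handler and wrapped it in a CommitFailedException. Without the branch, such a response would fall through to the generic error handling and complete the in-flight future with error.exception(), i.e. a FencedInstanceIdException. A before/after sketch (hypothetical helper method, not the actual CommitRequestManager code):

```java
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.common.protocol.Errors;

class CommitErrorDispatchSketch {
    // Before: FENCED_INSTANCE_ID was intercepted and wrapped.
    void completeBefore(Errors error, String groupInstanceId, CompletableFuture<Void> future) {
        if (error == Errors.FENCED_INSTANCE_ID) {
            future.completeExceptionally(new CommitFailedException(
                "OffsetCommit failed due to group instance id fenced: " + groupInstanceId));
            return;
        }
        future.completeExceptionally(error.exception());
    }

    // After: no special case; the error surfaces as error.exception(),
    // a FencedInstanceIdException, via the generic fatal-error path.
    void completeAfter(Errors error, CompletableFuture<Void> future) {
        future.completeExceptionally(error.exception());
    }
}
```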
@@ -57,7 +57,6 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
- import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
@@ -334,23 +333,6 @@ private static Stream<Exception> commitExceptionSupplier() {
new GroupAuthorizationException("Group authorization exception"));
}

-     @Test
-     public void testCommitAsyncWithFencedException() {
-         consumer = newConsumer();
-         completeCommitSyncApplicationEventSuccessfully();
-         final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
-         MockCommitCallback callback = new MockCommitCallback();
-
-         assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
-
-         final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
-         verify(applicationEventHandler).add(commitEventCaptor.capture());
-         final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
-         commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-
-         assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> consumer.commitAsync());
-     }
-
@Test
public void testCommitted() {
time = new MockTime(1);
@@ -610,52 +592,6 @@ public void testCommitAsyncLeaderEpochUpdate() {
}
}

-     @Test
-     public void testCommitAsyncTriggersFencedExceptionFromCommitAsync() {
-         final String groupId = "consumerGroupA";
-         final String groupInstanceId = "groupInstanceId1";
-         final Properties props = requiredConsumerConfigAndGroupId(groupId);
-         props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
-         final ConsumerConfig config = new ConsumerConfig(props);
-         consumer = newConsumer(config);
-         completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
-         final TopicPartition tp = new TopicPartition("foo", 0);
-         completeAssignmentChangeEventSuccessfully();
-         consumer.assign(Collections.singleton(tp));
-         completeSeekUnvalidatedEventSuccessfully();
-         consumer.seek(tp, 20);
-
-         assertDoesNotThrow(() -> consumer.commitAsync());
-
-         Exception e = assertThrows(FencedInstanceIdException.class, () -> consumer.commitAsync());
-         assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage());
-     }
-
-     @Test
-     public void testCommitSyncTriggersFencedExceptionFromCommitAsync() {
-         final String groupId = "consumerGroupA";
-         final String groupInstanceId = "groupInstanceId1";
-         final Properties props = requiredConsumerConfigAndGroupId(groupId);
-         props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
-         final ConsumerConfig config = new ConsumerConfig(props);
-         consumer = newConsumer(config);
-         completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
-         final TopicPartition tp = new TopicPartition("foo", 0);
-         completeAssignmentChangeEventSuccessfully();
-         consumer.assign(Collections.singleton(tp));
-         completeSeekUnvalidatedEventSuccessfully();
-         consumer.seek(tp, 20);
-
-         assertDoesNotThrow(() -> consumer.commitAsync());
-
-         Exception e = assertThrows(FencedInstanceIdException.class, () -> consumer.commitSync());
-         assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage());
-     }
-
@Test
public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
final TopicPartition tp = new TopicPartition("foo", 0);
@@ -739,29 +675,6 @@ private <T> CompletableApplicationEvent<T> addAndGetLastEnqueuedEvent() {
return allValues.get(allValues.size() - 1);
}

-     @Test
-     public void testPollTriggersFencedExceptionFromCommitAsync() {
-         final String groupId = "consumerGroupA";
-         final String groupInstanceId = "groupInstanceId1";
-         final Properties props = requiredConsumerConfigAndGroupId(groupId);
-         props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
-         final ConsumerConfig config = new ConsumerConfig(props);
-         consumer = newConsumer(config);
-         completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
-         final TopicPartition tp = new TopicPartition("foo", 0);
-         completeAssignmentChangeEventSuccessfully();
-         consumer.assign(Collections.singleton(tp));
-         completeSeekUnvalidatedEventSuccessfully();
-         consumer.seek(tp, 20);
-
-         assertDoesNotThrow(() -> consumer.commitAsync());
-
-         Exception e = assertThrows(FencedInstanceIdException.class, () -> consumer.poll(Duration.ZERO));
-         assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage());
-     }
-
@Test
public void testEnsurePollExecutedCommitAsyncCallbacks() {
consumer = newConsumer();
@@ -340,27 +340,8 @@ public void testCommitSyncRetriedAfterExpectedRetriableException(Errors error) {
assertExceptionHandling(commitRequestManager, error, true);
}

-     @ParameterizedTest
-     @MethodSource("commitSyncExpectedExceptions")
-     public void testCommitSyncFailsWithExpectedException(Errors commitError,
-                                                          Class<? extends Exception> expectedException) {
-         CommitRequestManager commitRequestManager = create(false, 100);
-         when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
-
-         Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
-             new TopicPartition("topic", 1),
-             new OffsetAndMetadata(0));
-
-         // Send sync offset commit that fails and verify it propagates the expected exception.
-         long deadlineMs = time.milliseconds() + retryBackoffMs;
-         CompletableFuture<Void> commitResult = commitRequestManager.commitSync(offsets, deadlineMs);
-         completeOffsetCommitRequestWithError(commitRequestManager, commitError);
-         assertFutureThrows(commitResult, expectedException);
-     }
-
private static Stream<Arguments> commitSyncExpectedExceptions() {
return Stream.of(
-             Arguments.of(Errors.FENCED_INSTANCE_ID, CommitFailedException.class),
Arguments.of(Errors.UNKNOWN_MEMBER_ID, CommitFailedException.class),
Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, Errors.OFFSET_METADATA_TOO_LARGE.exception().getClass()),
Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE.exception().getClass()),
@@ -985,10 +966,6 @@ private void assertExceptionHandling(CommitRequestManager commitRequestManager,
case INVALID_COMMIT_OFFSET_SIZE:
assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
break;
-             case FENCED_INSTANCE_ID:
-                 // This is a fatal failure, so we should not retry
-                 assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
-                 break;
default:
if (errors.exception() instanceof RetriableException && requestShouldBeRetried) {
assertRetryBackOff(commitRequestManager, remainBackoffMs);
@@ -1279,7 +1256,6 @@ private static Stream<Arguments> offsetCommitExceptionSupplier() {
Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
Arguments.of(Errors.REQUEST_TIMED_OUT),
-             Arguments.of(Errors.FENCED_INSTANCE_ID),
Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
Arguments.of(Errors.STALE_MEMBER_EPOCH),
Arguments.of(Errors.UNKNOWN_MEMBER_ID));
@@ -1299,7 +1275,6 @@ private static Stream<Arguments> offsetFetchExceptionSupplier() {
Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
Arguments.of(Errors.REQUEST_TIMED_OUT),
-             Arguments.of(Errors.FENCED_INSTANCE_ID),
Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
Arguments.of(Errors.UNKNOWN_MEMBER_ID),
// Adding STALE_MEMBER_EPOCH as non-retriable here because it is only retried if a new
