Skip to content

Conversation

@showuon
Copy link
Member

@showuon showuon commented Jul 12, 2021

Make DeleteConsumerGroupsHandler unmap for COORDINATOR_NOT_AVAILABLE error

old handlResponse logic:

void handleResponse(AbstractResponse abstractResponse) {
      final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;

      // If coordinator changed since we fetched it, retry
      if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) {
          Call call = getDeleteConsumerGroupsCall(context);
          rescheduleFindCoordinatorTask(context, () -> call, this);
          return;
      }

      final Errors groupError = response.get(context.groupId());
      if (handleGroupRequestError(groupError, context.future()))
          return;

      context.future().complete(null);
  }

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@showuon
Copy link
Member Author

showuon commented Jul 12, 2021

@dajac , please take a look. Thanks.

}
return new ApiResult<>(completed, failed, unmapped);

if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems incorrect to do this here. We were able to do so in the other because they were expecting only one group at the time. This one is different. The driver will retry if the group is not completed nor failed. It seems to me that we could keep the existing code, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right! Updated.

case INVALID_GROUP_ID:
case NON_EMPTY_GROUP:
case GROUP_ID_NOT_FOUND:
log.error("Received non retriable failure for group {} in `{}` response", groupId,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also try to uniformize the logs and would use debug all the time except for the unexpected errors.

}

}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we revert this?

Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@showuon Thanks for the update. I left few minor comments and one question.

In the end, the handling of COORDINATOR_NOT_AVAILABLE is the only main difference in this PR. Should we reflect this in the title perhaps?

case INVALID_GROUP_ID:
case NON_EMPTY_GROUP:
case GROUP_ID_NOT_FOUND:
log.debug("`DeleteConsumerGroups` request for group id {} failed due to error {}", groupId, error);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We should use groupId.idValue here and in the others.

case COORDINATOR_LOAD_IN_PROGRESS:
case COORDINATOR_NOT_AVAILABLE:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`DeleteConsumerGroups` request for group {} failed because the coordinator " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: group -> group id?

unmapped.add(groupId);
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`DeleteConsumerGroups` request for group {} returned error {}. " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: group -> group id?

Comment on lines -3189 to -3196
final DeletableGroupResultCollection errorResponse1 = new DeletableGroupResultCollection();
errorResponse1.add(new DeletableGroupResult()
.setGroupId("groupId")
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
);
env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
new DeleteGroupsResponseData()
.setResults(errorResponse1)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we moving this to later?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section is testing "retriable" errors should be retried. Before the change, COORDINATOR_NOT_AVAILABLE is considered as retriable error. But after this PR, it'll considered as unmapped error, so it is moved to later, to test when receiving the error, we should re-find coordinator, and then re-send request.

@showuon showuon changed the title KAFKA-13062: refactor DeleteConsumerGroupsHandler and tests KAFKA-13062: Make DeleteConsumerGroupsHandler unmap for COORDINATOR_NOT_AVAILABLE error Jul 14, 2021
Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@dajac
Copy link
Member

dajac commented Jul 15, 2021

Failures are not related:

Build / JDK 16 and Scala 2.13 / shouldBeAbleToQueryFilterState – org.apache.kafka.streams.integration.QueryableStateIntegrationTest
43s
Build / JDK 11 and Scala 2.13 / remoteCloseWithoutBufferedReceives() – kafka.network.SocketServerTest
<1s
Build / JDK 11 and Scala 2.13 / shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2] – org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest
36s
Build / JDK 8 and Scala 2.12 / shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() – kafka.server.epoch.LeaderEpochIntegrationTest
36s
Build / JDK 8 and Scala 2.12 / shouldInnerJoinMultiPartitionQueryable – org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest

@dajac dajac merged commit f7cf4a4 into apache:trunk Jul 15, 2021
dajac pushed a commit that referenced this pull request Jul 15, 2021
…OT_AVAILABLE error (#11021)

This patch improve the error handling in `DeleteConsumerGroupsHandler` and ensure that `COORDINATOR_NOT_AVAILABLE` is unmapped in order to look up the coordinator again.

Reviewers: David Jacot <djacot@confluent.io>
@dajac
Copy link
Member

dajac commented Jul 15, 2021

Merged to trunk and to 3.0. cc @kkonstantine

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…OT_AVAILABLE error (apache#11021)

This patch improve the error handling in `DeleteConsumerGroupsHandler` and ensure that `COORDINATOR_NOT_AVAILABLE` is unmapped in order to look up the coordinator again.

Reviewers: David Jacot <djacot@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants