Conversation

@mimaison (Member) commented May 21, 2021

This PR implements KIP-699

It updates FindCoordinator request and response to support resolving multiple coordinators at a time. If a broker does not support the new FindCoordinator version, clients can revert to the previous behaviour and use a request for each coordinator.
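
For illustration, a minimal sketch of what the batched request looks like from the client side. This is not taken from the PR diff; it assumes the generated accessors for the new CoordinatorKeys field follow the usual naming (setCoordinatorKeys) alongside the existing setKey/setKeyType.

    // Illustrative sketch, not from the PR diff: assumes the generated accessor
    // for the new CoordinatorKeys field is named setCoordinatorKeys.
    import java.util.Arrays;

    import org.apache.kafka.common.message.FindCoordinatorRequestData;
    import org.apache.kafka.common.requests.FindCoordinatorRequest;
    import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;

    public class BatchedFindCoordinatorSketch {
        public static void main(String[] args) {
            // v4+ (KIP-699): several coordinator keys resolved in a single request.
            FindCoordinatorRequestData batched = new FindCoordinatorRequestData()
                .setKeyType(CoordinatorType.GROUP.id())
                .setCoordinatorKeys(Arrays.asList("group-a", "group-b", "group-c"));
            FindCoordinatorRequest.Builder batchedBuilder = new FindCoordinatorRequest.Builder(batched);

            // v3 and below: one key per request, so against an older broker the
            // client falls back to one FindCoordinator request per coordinator.
            FindCoordinatorRequestData single = new FindCoordinatorRequestData()
                .setKeyType(CoordinatorType.GROUP.id())
                .setKey("group-a");
            FindCoordinatorRequest.Builder singleBuilder = new FindCoordinatorRequest.Builder(single);
        }
    }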

All methods in Admin that require looking up coordinators have been updated to use the new AdminApiDriver logic.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@skaundinya15 (Contributor)

@mimaison It's worth noting that there's already a PR to simplify the ListOffsets API here: #10467, so if that gets merged soon, you may need to rebase your implementation on top of it.

@mimaison (Member Author)

@skaundinya15 Yeah, we may have to rebase, but that shouldn't be an issue. I did not touch ListOffsets in this PR; I'm only updating methods that interact with coordinators.

@mimaison mimaison force-pushed the kip-699 branch 3 times, most recently from f14f506 to fb65fd3 Compare June 4, 2021 17:30
@mimaison mimaison marked this pull request as ready for review June 4, 2021 17:33
@mimaison (Member Author) commented Jun 4, 2021

@rajinisivaram @tombentley @dajac Can you take a look? Thanks

@dajac (Member) commented Jun 5, 2021

@mimaison Could you briefly describe the core changes that you have made in the PR? That would be helpful to dive into it. Thanks!

@mimaison mimaison changed the title KIP-699: Work in progress KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time Jun 6, 2021
@mimaison (Member Author) commented Jun 6, 2021

@dajac Right, I've updated the description to give some context

@tombentley (Member) left a comment

Made a first pass with a few questions.

Member

Is there some reason why we can't pass the ApiVersions into the coordinator, so we can get the batching right without needing to retry like this?

Member Author

I've only taken a very brief look, and I think this approach would work well for Connect, Producer and Consumer; however, it's a bit more complicated with Admin.

In Admin, requests are built by lookup strategies. Lookups can be sent to any broker, so knowing the max version for a specific call is not completely trivial. That said, it's not impossible either, so if there's consensus that it would be preferable, I can give that a try.

Member

This AdminApiDriver abstraction is pretty new to me, so I might be wide of the mark, but it doesn't appear to handle this very well. The lookup strategy has to build a request without knowing either the broker or the API versions. It would be possible to pass the ApiVersions to the CoordinatorStrategy, which should let you do the right thing based on the minimum of the FindCoordinator API version supported in the whole cluster. (That's not completely perfect, since really you'd want to decide on a per-broker basis, but I think it would be good enough). Sadly it's not quite enough to pass just ApiVersions, since it doesn't really know about the nodes in the cluster, so you'd need to pass Metadata too, which is quite a lot of work. So I can understand why it makes sense to do it like this, since it lets you benefit from the existing logic for figuring out request versions.
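
To make the "minimum across the whole cluster" idea concrete, here is a hypothetical sketch of that version check. ApiVersions and NodeApiVersions are the real client-side classes, but the helper class and the idea of wiring it into the lookup strategy are invented for illustration; that wiring is exactly what this PR does not attempt.

    // Hypothetical helper, not part of this PR: checks whether every known node
    // supports the batched FindCoordinator version before building a batched lookup.
    import java.util.List;

    import org.apache.kafka.clients.ApiVersions;
    import org.apache.kafka.clients.NodeApiVersions;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.protocol.ApiKeys;

    final class FindCoordinatorVersionCheck {

        // Returns true only if every known node supports FindCoordinator v4+,
        // i.e. the batched form introduced by KIP-699.
        static boolean clusterSupportsBatching(ApiVersions apiVersions, List<Node> nodes) {
            for (Node node : nodes) {
                NodeApiVersions nodeVersions = apiVersions.get(node.idString());
                if (nodeVersions == null)
                    return false; // unknown node: assume the old, unbatched protocol
                if (nodeVersions.latestUsableVersion(ApiKeys.FIND_COORDINATOR) < 4)
                    return false;
            }
            return true;
        }
    }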

Contributor

@tombentley That's a really good point; perhaps we can file a JIRA to add support in the AdminApiDriver for passing in versions for different API calls. I'm currently working on KIP-709, where I'm trying to introduce batching to the fetch offsets API, and I'll need to do something similar to what Mickael has done here to be able to support all versions, but having native support for knowing the version beforehand would be much more desirable.

@skaundinya15 (Contributor) left a comment

Thanks for the PR @mimaison, fantastic job with the refactor - makes so much of the code much more readable! I made a pass through the non test files and left some comments. Will take another pass at the test files soon.

Comment on lines 138 to 152
Contributor

Seems like there's a copy-paste error; this should be AlterConsumerGroupOffsets. Also, when I compare it to handleError() in DescribeConsumerGroupsHandler, it seems to be slightly different:

    private void handleError(CoordinatorKey groupId, Errors error, Map<CoordinatorKey, Throwable> failed, List<CoordinatorKey> unmapped) {
        switch (error) {
            case GROUP_AUTHORIZATION_FAILED:
                log.error("Received authorization failure for group {} in `DescribeGroups` response", groupId,
                        error.exception());
                failed.put(groupId, error.exception());
                break;
            case COORDINATOR_LOAD_IN_PROGRESS:
            case COORDINATOR_NOT_AVAILABLE:
                break;
            case NOT_COORDINATOR:
                log.debug("DescribeGroups request for group {} returned error {}. Will retry",
                        groupId, error);
                unmapped.add(groupId);
                break;
            default:
                log.error("Received unexpected error for group {} in `DescribeGroups` response", 
                        groupId, error.exception());
                failed.put(groupId, error.exception(
                        "Unexpected error during DescribeGroups lookup for " + groupId));
        }
    }

In the AlterConsumerGroupOffsetsHandler we don't break on COORDINATOR_NOT_AVAILABLE, but we do in DescribeConsumerGroupsHandler - any reason for this? I'd imagine the error handling would be the same across all the consumer group handlers. Perhaps we could factor this out and put it in some generic ConsumerGroupHandler class that implements this and takes in a request name, so it can be used across all the consumer group handlers. What do you think?
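
A hypothetical sketch of what that factored-out handler could look like, adapted from the handleError() quoted above; the class and method names are invented for illustration and are not part of this PR.

    // Hypothetical shared error handling for the coordinator-based group handlers.
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.admin.internals.CoordinatorKey;
    import org.apache.kafka.common.protocol.Errors;
    import org.slf4j.Logger;

    final class ConsumerGroupErrorHandling {

        static void handleError(String requestName,
                                CoordinatorKey groupId,
                                Errors error,
                                Map<CoordinatorKey, Throwable> failed,
                                List<CoordinatorKey> unmapped,
                                Logger log) {
            switch (error) {
                case GROUP_AUTHORIZATION_FAILED:
                    log.error("Received authorization failure for group {} in `{}` response",
                            groupId, requestName, error.exception());
                    failed.put(groupId, error.exception());
                    break;
                case COORDINATOR_LOAD_IN_PROGRESS:
                case COORDINATOR_NOT_AVAILABLE:
                    // Transient errors: leaving the key out of both collections
                    // makes the driver retry the lookup.
                    break;
                case NOT_COORDINATOR:
                    log.debug("{} request for group {} returned error {}. Will retry",
                            requestName, groupId, error);
                    unmapped.add(groupId);
                    break;
                default:
                    log.error("Received unexpected error for group {} in `{}` response",
                            groupId, requestName, error.exception());
                    failed.put(groupId, error.exception(
                            "Unexpected error during " + requestName + " lookup for " + groupId));
            }
        }
    }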

Member Author

Yes, some of these handlers differ in pretty subtle ways. Unfortunately, if we remove this break statement, some of the existing tests stop working. In this PR, I've aimed at not touching the existing tests to ensure my refactoring doesn't change the current behaviour.

I agree, this is very confusing, and I think we should align the behaviour of all these similar calls and have generic error handling logic. But I would prefer not to do it as part of this PR as it's already pretty big.

Contributor

@mimaison Makes sense, it would be good to file a JIRA for this so we can address it in a future PR and ensure we have consistent error handling across all the consumer group related handlers.


Contributor

Would we consider something like COORDINATOR_NOT_AVAILABLE a transient exception that we should retry on? I'm a bit confused since we don't log anything or break on this case. Either way, I think we should log it and put it in the failedKeys map if we think that kind of exception should be considered a failure, or leave it alone if we think it's a more transient error.

@mimaison (Member Author) Jun 17, 2021

This is the current behaviour: when hitting these errors, we retry the FindCoordinator request. The way to signal that we want to retry is to omit the key from the return value of handleResponse(), see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java#L71-L74

So I think we want to keep it as it is.
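
A self-contained sketch of that retry mechanism; the real types live in AdminApiLookupStrategy (see the link above), so the simplified LookupResult and response interface below are stand-ins for illustration only.

    // Illustrative only: keys returned in neither map are retried by the driver.
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    final class LookupRetrySketch {

        static final class LookupResult<K> {
            final Map<K, Throwable> failedKeys; // completed exceptionally
            final Map<K, Integer> mappedKeys;   // mapped to a coordinator (broker id)

            LookupResult(Map<K, Throwable> failedKeys, Map<K, Integer> mappedKeys) {
                this.failedKeys = failedKeys;
                this.mappedKeys = mappedKeys;
            }
        }

        interface CoordinatorAnswer {
            boolean isError(String key);
            boolean isRetriable(String key); // e.g. COORDINATOR_NOT_AVAILABLE
            Throwable error(String key);
            int nodeId(String key);
        }

        static LookupResult<String> handleResponse(Set<String> keys, CoordinatorAnswer response) {
            Map<String, Throwable> failed = new HashMap<>();
            Map<String, Integer> mapped = new HashMap<>();
            for (String key : keys) {
                if (!response.isError(key)) {
                    mapped.put(key, response.nodeId(key));
                } else if (!response.isRetriable(key)) {
                    failed.put(key, response.error(key));
                }
                // Retriable errors: the key is omitted from both maps, so the
                // driver issues another FindCoordinator request for it.
            }
            return new LookupResult<>(failed, mapped);
        }
    }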

@skaundinya15 (Contributor) Jun 21, 2021

@mimaison Gotcha, makes sense. Can we still log at DEBUG level? That way, if we need to debug, we have evidence of this in the logs.

@mimaison (Member Author)

@skaundinya15 Thanks for the review. I believe I've addressed all your comments.

@tombentley (Member) left a comment

This is looking pretty good; a few more nits and questions.

Member

Is it necessarily a broker, or could it be a KRaft controller?

Member Author

Not entirely sure about the new naming in place now, but does that still count as a broker?

Member

Well, process.roles is documented like this in config/kraft/README.md:

  • If process.roles is set to broker, the server acts as a broker in KRaft mode.
  • If process.roles is set to controller, the server acts as a controller in KRaft mode.
  • If process.roles is set to broker,controller, the server acts as both a broker and a controller in KRaft mode.

which suggests to me that we're using the term "broker" to mean "a thing which handles Produce and Fetch etc". (However, "controller" is a bit confusing there, since while we might have several servers in the "controller" role only one will be the controller at any one time. I'm not aware of a good term for "server that is participating in the raft cluster, but might not be the current controller right now").

Member Author

Ok, thanks for the clarifications. I feel like broker is still fine here. Otherwise, maybe node?

@mimaison (Member Author)

@tombentley Thanks for the review! I've addressed your findings.

@tombentley (Member) left a comment

LGTM

@dajac (Member) left a comment

Thanks for the PR. I have made a pass over it. Overall, it looks good. A small suggestion: it would be great if we could unify the formatting style. For instance, the style and indentation of the method declarations are not consistent across the board.

@mimaison (Member Author)

Thanks @dajac for the review, I've addressed your comments.

@skaundinya15 (Contributor) left a comment

Thanks for the updates @mimaison! Just left a couple minor comments, other than that LGTM! Really excited for this patch to make it in :)

Contributor

Why do we call prepareOldResponse in getErrorResponse? Should we be doing a version check here?

@dajac (Member) left a comment

@mimaison Thanks for the updates. I left a few more comments/questions.

Comment on lines 61 to 65
Member

nit: I wonder if differentiating the two is required here. It seems that we don't use them anywhere else so we could use one common REQUEST_SCOPE for both cases.

Member Author

Right, I merged both as BATCH_REQUEST_SCOPE

Member

If I remember correctly, this new exception was not specified in the KIP. Should we update it and notify the thread in the mailing list?

Member Author

Right, I'll update the KIP and thread

Member

It seems like a mistake to add this to the public errors package when it should never propagate outside the client. An alternative could be to make it a static member class of FindCoordinatorRequest, or to put it in some other package.

Member

Yeah, I was also thinking about this. It would definitely be better if we could keep it private.

Member Author

I agree, I'll move it to an inner class of FindCoordinatorRequest and update the KIP accordingly

Member

We need to discuss this further.

Member Author

I pushed a change that moves NoBatchedFindCoordinatorsException into FindCoordinatorRequest. Do you have any further concerns?

Member

Nope, that's fine. Thanks.

Member

I wonder if we should catch UnsupportedVersionException here to handle the general case. Have you considered it? Also, the name uble looks weird. Is it intentional?

Member Author

Yes, I looked into it, but I think there's one test in TransactionManagerTest that expects UnsupportedVersionException to be thrown.
MockClient works in slightly different ways than the real client; it would be good to address this, but I'd rather defer it to a follow-up PR.

Member

Understood. Should we file a Jira about this?


@mimaison (Member Author)

@tombentley @dajac @skaundinya15 Thanks for the reviews! I believe I've addressed all your comments now.

@skaundinya15 (Contributor) left a comment

Left a few very minor nit comments, other than that this looks great to me!

@dajac (Member) left a comment

Thanks for the update. I left a few more questions/comments.

Member

It feels a bit weird to handle a special case like this one in the driver. It is probably OK for the time being, but I think we don't want to add more custom cases like this in the driver. I wonder if we could delegate the decision to the handler. We could add a handleUnsupportedVersionException method to the AdminApiHandler for this purpose. That method could basically return the keys to unmap and the keys to complete with the exception.

An alternative would be to rely on the handleUnsupportedVersionException method in Call. The driver could implement it and still delegate the decision to the handler. The advantage of using this method is that downgrades would not be counted as failures and thus would not count towards the retries.

Have you considered something like this?
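
A hypothetical sketch of what that delegation could look like; nothing below exists in this PR, it only illustrates the idea of returning the keys to unmap and the keys to complete with the exception.

    // Hypothetical interface, for illustration only: the handler decides which
    // keys to retry with a downgraded request (unmap) and which to fail.
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.common.errors.UnsupportedVersionException;

    interface HandlesUnsupportedVersion<K> {

        final class Decision<K> {
            final Set<K> keysToUnmap;           // retried with a downgraded request
            final Map<K, Throwable> keysToFail; // completed exceptionally

            Decision(Set<K> keysToUnmap, Map<K, Throwable> keysToFail) {
                this.keysToUnmap = keysToUnmap;
                this.keysToFail = keysToFail;
            }
        }

        // Called when a request fails with UnsupportedVersionException;
        // the default is to fail every key rather than retry.
        default Decision<K> handleUnsupportedVersionException(
                Set<K> keys, UnsupportedVersionException exception) {
            Map<K, Throwable> failed = new HashMap<>();
            for (K key : keys) {
                failed.put(key, exception);
            }
            return new Decision<>(Collections.emptySet(), failed);
        }
    }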

Member Author

AdminApiDriver was still evolving rapidly while I was implementing this KIP, so I went for the straightforward approach.

But I agree, it would be best to avoid this type of logic here. The goal would be to find a mechanism that works for all clients. @tombentley suggested an alternative option in #10743 (comment).

I've not had the time to look into better alternatives yet.

Member

Should we file a Jira to not forget about improving this?


Member

nit: Indentation seems off here.

Member

We need to discuss this further.

Member

nit: Could we add the KIP number here? I would also explain the change a bit more. The comment you have in the response is much better, for instance.

Member

nit: Could we add the KIP number here?

@dajac (Member) left a comment

LGTM. @mimaison Thanks for the PR! Could you file the follow-up JIRAs that have been discussed in the comments? I have spotted two remaining ones: 1) one to address the error handling in the driver; 2) one to harmonize the error handling in the handlers.

@mimaison (Member Author)

@dajac Thanks, I opened JIRAs for all the follow-up work items we identified.

@mimaison (Member Author) commented Jul 1, 2021

I've rebased on trunk to resolve conflicts and I've had to make small changes due to updates on trunk. All new changes are in ae09d74.

I'll let Jenkins run now and I'll merge later if there are no issues.

@mimaison mimaison merged commit f5d5f65 into apache:trunk Jul 1, 2021
@mimaison mimaison deleted the kip-699 branch July 1, 2021 21:05
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time (apache#10743)

This implements KIP-699: https://cwiki.apache.org/confluence/display/KAFKA/KIP-699%3A+Update+FindCoordinator+to+resolve+multiple+Coordinators+at+a+time

It updates FindCoordinator request and response to support resolving multiple coordinators at a time. If a broker does not support the new FindCoordinator version, clients can revert to the previous behaviour and use a request for each coordinator.

Reviewers: David Jacot <djacot@confluent.io>, Tom Bentley <tbentley@redhat.com>, Sanjana Kaundinya <skaundinya@gmail.com>