KAFKA-12434; Admin support for DescribeProducers API
#10275
chia7712
left a comment
@hachikuji thanks for this patch. I will take another look tomorrow as I need more time to study it :)
BTW, do you plan to apply the new mechanism to other requests?
clients/src/main/java/org/apache/kafka/clients/admin/DescribeProducersOptions.java
Could we change the modifier from public to package-private? Exposing this constructor to users is useless.
I need it public because the handler is in admin.internals. This might be a pattern that has to give way if we want to externalize some of the admin logic rather than packing it all into KafkaAdminClient. I'll try to consider some other options.
I am not sure I would call it "useless" by the way. It is helpful when you need to mock the admin client. Of course users can always mock PartitionProducerState as well, though it gets tedious for property classes.
I do agree it is useful for testing, but public constructors need release cycles to deprecate. If we want to add more data in the future, the (exposed) public constructor can cause trouble for us.
> I need it public because the handler is in admin.internals. This might be a pattern that has to give way if we want to externalize some of the admin logic rather than packing it all into KafkaAdminClient. I'll try to consider some other options.
I would like to use the annotation @InterfaceStability.Evolving if we all agree that it gives us the "right" to break APIs in a minor release. For example, add @InterfaceStability.Evolving to the public constructor.
There is actually a KIP debating this tradeoff: https://cwiki.apache.org/confluence/display/KAFKA/KIP-692%3A+Make+AdminClient+value+object+constructors+public. Personally, I am a bit conservative on this one for the reason mentioned by @chia7712.
I just noticed that there is a class AdminClientTestUtils which is used to create those results for testing. Maybe we can change the modifier from public to package-private and then add a helper method to AdminClientTestUtils to create PartitionProducerState/DescribeProducersResult if test code needs it.
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
@chia7712 Thanks for the comments. My initial goal is just the KIP-664 APIs since they cover most (maybe even all) of the complex admin workflows. After that, if we like the new pattern, I think it would be good to convert at least all the APIs which have similarly complicated logic (such as all of the group coordinator APIs). Even the simpler APIs could probably benefit by externalizing the logic out of
chia7712
left a comment
@hachikuji thanks for updating the code. I like this approach to handling complex workflows :)
It groups values by the hash code of the key. Hence, callers must implement hashCode for the key type. Otherwise, it can't group values, as the default hash is identity-based. Is it error-prone?
I am not sure I follow the point. My expectation is that both key and value types have reasonable hashCode/equals implementations. Would it help if I document that expectation?
> My expectation is that both key and value types have reasonable hashCode/equals implementations. Would it help if I document that expectation?
I think documenting it is good, as the requirement is not obvious. BTW, what will happen if a custom key/value type does not offer hashCode/equals implementations? A performance regression or a quick failure?
I think that it is reasonable to assume that both key and value types have reasonable hashCode/equals implementations in this case.
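To make the question above concrete, here is a minimal sketch of what hash-based grouping does with a key type that lacks value-based hashCode/equals. It does not use the PR's actual classes: `GroupingSketch`, `IdentityKey`, `ValueKey` and `group` are hypothetical names, and a plain `HashMap` stands in for whatever structure the driver uses. Under those assumptions, the outcome is neither a failure nor a slowdown; the values are simply never grouped.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class GroupingSketch {
    // Hypothetical key that inherits identity-based equals/hashCode from Object.
    static final class IdentityKey {
        final String topic;
        final int partition;

        IdentityKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Hypothetical key with value-based equals/hashCode.
    static final class ValueKey {
        final String topic;
        final int partition;

        ValueKey(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ValueKey)) return false;
            ValueKey that = (ValueKey) o;
            return partition == that.partition && topic.equals(that.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Collects values under their key, the way a hash-based multimap would.
    static <K> Map<K, List<String>> group(List<Map.Entry<K, String>> entries) {
        Map<K, List<String>> grouped = new HashMap<>();
        for (Map.Entry<K, String> entry : entries) {
            grouped.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
        }
        return grouped;
    }

    // Two logically equal keys without equals/hashCode end up in separate groups.
    public static int identityGroups() {
        List<Map.Entry<IdentityKey, String>> entries = new ArrayList<>();
        entries.add(new AbstractMap.SimpleImmutableEntry<>(new IdentityKey("foo", 0), "a"));
        entries.add(new AbstractMap.SimpleImmutableEntry<>(new IdentityKey("foo", 0), "b"));
        return group(entries).size();
    }

    // The same data with value-based keys collapses into a single group.
    public static int valueGroups() {
        List<Map.Entry<ValueKey, String>> entries = new ArrayList<>();
        entries.add(new AbstractMap.SimpleImmutableEntry<>(new ValueKey("foo", 0), "a"));
        entries.add(new AbstractMap.SimpleImmutableEntry<>(new ValueKey("foo", 0), "b"));
        return group(entries).size();
    }

    public static void main(String[] args) {
        System.out.println("groups with identity keys: " + identityGroups());
        System.out.println("groups with value keys: " + valueGroups());
    }
}
```

In this sketch the identity-keyed variant silently produces one group per key instance, which is why documenting the hashCode/equals expectation seems worthwhile.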
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
dajac
left a comment
@hachikuji Thanks for the PR! I just made a first pass over it. My intent was mainly to understand the mechanics of the pattern that you propose. Overall, I like it. The major downside is that it brings a bit of complexity. I have been thinking about alternatives and I haven't really found any that would be as complete and simpler.
I have left a few comments while reading the code. I will make another pass over it later this week (I need to sleep on it :) ).
clients/src/main/java/org/apache/kafka/clients/admin/DescribeProducersOptions.java
clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
@dajac Thanks for reviewing. I think you have to compare the complexity with (for example) the
dajac
left a comment
@hachikuji Yeah, I totally agree with your point. I really like your approach now that I understand it better. I don't think that we could handle re-coalescing of requests differently. I have left a few minor comments/questions.
I also looked at the tests and they are really good. Well done!
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
Did you decide to put this interface here because it is mainly used by the AdminApiLookupStrategy? Intuitively, I would have placed it in the driver because it is used by a few other classes as well.
Let me think about it. I wasn't entirely happy with this location either.
I decided to pull this up to the top level. Let me know if that seems ok.
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
chia7712
left a comment
@hachikuji thanks for this patch; it looks great to me. I have just left some trivial comments.
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
chia7712
left a comment
overall +1
     */
    private class RequestState {
        private Optional<RequestSpec<K>> inflightRequest = Optional.empty();
        private int tries = 0;
Why do we need this tries counter? We already track the tries of Call.
The lookup requests can often result in ping-pong behavior. For example, we use metadata to find the partition leader, then we send a request to the leader and get a NOT_LEADER error, which sends us back to the metadata request, and so on. I was not too concerned that the number of tries would be reflected perfectly (as I noted in the javadoc, the behavior is ambiguous for these cases), but I wanted to at least ensure that the count continued growing. So if a user does configure a retries limit, then eventually we'll hit it even in these ping-pong scenarios if we cannot satisfy the request.
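The ping-pong scenario described above can be sketched in a few lines. This is a simplified illustration rather than the driver's real bookkeeping: `PingPongRetrySketch`, `Stage` and `runUntilExhausted` are hypothetical names, and every fulfillment attempt is assumed to fail with a simulated NOT_LEADER error. The point is only that a single counter shared by both stages keeps growing, so a configured retries limit is eventually reached.

```java
public class PingPongRetrySketch {
    enum Stage { LOOKUP, FULFILLMENT }

    // Hypothetical per-key state: one counter shared by both stages.
    static final class RequestState {
        Stage stage = Stage.LOOKUP;
        int tries = 0;
    }

    /**
     * Simulates a key that bounces between lookup and fulfillment forever:
     * each lookup "finds" a leader, and each fulfillment attempt fails with
     * a simulated NOT_LEADER error. Returns the number of requests sent
     * before the retries limit stops the loop.
     */
    public static int runUntilExhausted(int maxRetries) {
        RequestState state = new RequestState();
        int requestsSent = 0;
        // One initial attempt plus at most maxRetries retries.
        while (state.tries <= maxRetries) {
            state.tries++; // counted regardless of which stage sends the request
            requestsSent++;
            if (state.stage == Stage.LOOKUP) {
                state.stage = Stage.FULFILLMENT; // leader found, try the real request
            } else {
                state.stage = Stage.LOOKUP; // NOT_LEADER: go back to the metadata lookup
            }
        }
        return requestsSent;
    }

    public static void main(String[] args) {
        System.out.println("requests sent with retries=5: " + runUntilExhausted(5));
    }
}
```

If the counter were instead reset on every stage transition, the loop in this sketch would never terminate, which is the situation the per-request counter is meant to rule out.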
clients/src/main/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategy.java
dajac
left a comment
@hachikuji Thanks for the PR. LGTM! I think that it is good to be merged. We will refine it, if necessary, as we use it. I just left one minor comment.
clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminApiDriverTest.java
+1 to this nice patch. It would be great to rewrite other requests with this new infra. @hachikuji Have you opened a JIRA as a follow-up? I would like to take over some tickets :)
Merging to trunk. @chia7712 Yeah, we should create some jiras. I was going to wait until the other use cases in KIP-664 had been fleshed out, but I guess there's no harm converting other APIs. The ListOffsets API would be a good place to start. I'll create a JIRA tomorrow if you don't get there first.
This patch adds the new `Admin` API to describe producer state as described by KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions.

The three new APIs added by KIP-664 require different lookup and request patterns:

- DescribeProducers: send to partition leaders
- DescribeTransactions: send to coordinators
- ListTransactions: send to all brokers

Our method of handling complex workflows such as these in `KafkaAdminClient` by chaining together `Call` instances has been clumsy and error-prone at best. I have attempted to introduce a new pattern which separates the lookup stage (e.g. finding partition leaders) from the fulfillment stage (e.g. sending `DescribeProducers`). The lookup stage is implemented by `AdminApiLookupStrategy` and the fulfillment stage is implemented by `AdminApiHandler`. There is a new class `AdminApiDriver` which manages the bookkeeping for these two stages. See the corresponding javadocs for more detail.

This PR provides an example of usage through `DescribeProducersHandler`, which is an implementation of `AdminApiHandler`. It relies on `PartitionLeaderStrategy` which implements `AdminApiLookupStrategy`. In addition to allowing for easier reuse of lookup strategies, this approach provides a more convenient way for testing since all of the logic is not crammed into `KafkaAdminClient`. Follow-up PRs for the rest of KIP-664 will flesh out additional lookup strategies such as for coordinator APIs.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
I created a JIRA and a PR #10467 since you haven't done this.😄
This patch adds the new `Admin` API to describe producer state as described by KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions.

The three new APIs added by KIP-664 require different lookup and request patterns:

- DescribeProducers: send to partition leaders
- DescribeTransactions: send to coordinators
- ListTransactions: send to all brokers

Our method of handling complex workflows such as these in `KafkaAdminClient` by chaining together `Call` instances has been clumsy and error-prone at best. I have attempted to introduce a new pattern which separates the lookup stage (e.g. finding partition leaders) from the fulfillment stage (e.g. sending `DescribeProducers`). The lookup stage is implemented by `AdminApiLookupStrategy` and the fulfillment stage is implemented by `AdminApiHandler`. There is a new class `AdminApiDriver` which manages the bookkeeping for these two stages. See the corresponding javadocs for more detail.

This PR provides an example of usage through `DescribeProducersHandler`, which is an implementation of `AdminApiHandler`. It relies on `PartitionLeaderStrategy` which implements `AdminApiLookupStrategy`. One of the benefits of this approach is that it provides a more convenient way for testing since all of the logic is not crammed into `KafkaAdminClient`.

In the following PRs to implement the other two APIs introduced by KIP-664, I will also provide a coordinator lookup strategy and an "all broker" lookup strategy. You can get an idea of the usage from the previously closed PR: #9268.
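As a rough illustration of the lookup/fulfillment split described above, the following sketch separates the two stages behind two small interfaces and lets a driver method coalesce keys per broker. It is a deliberately simplified, synchronous stand-in and not the PR's code: `TwoStageSketch`, `LookupStrategy`, `Handler`, `drive` and `demo` are hypothetical names, the signatures are assumptions, and retries, failures and asynchronous calls are left out entirely.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class TwoStageSketch {
    // Hypothetical lookup stage: maps each key to the broker that should serve it.
    interface LookupStrategy<K> {
        Map<K, Integer> lookup(Set<K> keys);
    }

    // Hypothetical fulfillment stage: one request per broker for all keys mapped to it.
    interface Handler<K, V> {
        Map<K, V> handle(int brokerId, Set<K> keys);
    }

    // Hypothetical driver: runs the lookup, groups keys by broker, then fulfills.
    static <K, V> Map<K, V> drive(Set<K> keys, LookupStrategy<K> strategy, Handler<K, V> handler) {
        Map<K, Integer> brokers = strategy.lookup(keys);

        // Coalesce keys that share a broker so each broker gets a single request.
        Map<Integer, Set<K>> keysByBroker = new TreeMap<>();
        for (Map.Entry<K, Integer> entry : brokers.entrySet()) {
            keysByBroker.computeIfAbsent(entry.getValue(), b -> new LinkedHashSet<>()).add(entry.getKey());
        }

        Map<K, V> results = new LinkedHashMap<>();
        for (Map.Entry<Integer, Set<K>> entry : keysByBroker.entrySet()) {
            results.putAll(handler.handle(entry.getKey(), entry.getValue()));
        }
        return results;
    }

    // Wires a fake "partition leader" lookup to a fake "describe" handler.
    public static Map<String, String> demo() {
        Map<String, Integer> leaders = new HashMap<>();
        leaders.put("topic-0", 1);
        leaders.put("topic-1", 2);
        leaders.put("topic-2", 1);

        LookupStrategy<String> strategy = keys -> {
            Map<String, Integer> found = new LinkedHashMap<>();
            for (String key : keys) {
                found.put(key, leaders.get(key));
            }
            return found;
        };

        Handler<String, String> handler = (brokerId, keys) -> {
            Map<String, String> out = new LinkedHashMap<>();
            for (String key : keys) {
                out.put(key, "described by broker " + brokerId);
            }
            return out;
        };

        Set<String> keys = new LinkedHashSet<>(Arrays.asList("topic-0", "topic-1", "topic-2"));
        return drive(keys, strategy, handler);
    }

    public static void main(String[] args) {
        demo().forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

Because the strategy and the handler are independent in this sketch, a different lookup (for example, a coordinator lookup) could be paired with another handler without touching the driver, and each piece can be tested on its own.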