-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17750: Extend kafka-consumer-groups command line tool to support new consumer group (part 1) #17958
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 Thanks for the patch. Could you also please look into whether we can extend ConsumerGroupDescribeRequestTest to validate this change?
@@ -18,7 +18,7 @@ | |||
"type": "request", | |||
"listeners": ["broker"], | |||
"name": "ConsumerGroupDescribeRequest", | |||
"validVersions": "0", | |||
"validVersions": "0-1", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add a note about version 1 here? Take a look at other requests/responses to see how we usually do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the PR. Thanks for the reminder.
a98d474
to
be89eba
Compare
@FrankYang0529 Could you please extend tests in |
be89eba
to
cae1ca5
Compare
Sorry, I missed this comment yesterday. I update the PR and CI result looks good. Could you review when you have time? Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 Thanks for the update. I left a few more minor comments for consideration.
subscribedTopicNames = List(topicName), | ||
topicPartitions = List.empty, | ||
expectedError = Errors.NONE | ||
).memberId() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: It seems that we could remove memberId()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also add isClassic
test for member 2. We need to get the memberId
, so I leave it here. Thanks.
assertEquals(1, actual.size) | ||
val group = actual.head | ||
val member1 = group.members().asScala.find(_.memberId() == memberId1) | ||
assertNotNull(member1) | ||
assertFalse(member1.get.isClassic) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We usually validate the full response like we did in the other tests. Would it be possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't compare full response for two reasons:
-
It looks like we don't guarantee member order if there are multiple members. If we do assertion for group, it may fail cause of different member order.
Lines 954 to 961 in 9d23f89
members.entrySet(committedOffset).forEach( entry -> describedGroup.members().add( entry.getValue().asConsumerGroupDescribeMember( targetAssignment.get(entry.getValue().memberId(), committedOffset), topicsImage ) ) ); -
For uniform assignor, the target assignment result may change.
So I think we can focus on isClassic
field.
// starting from version 1, there is isClassic field in ConsumerGroupDescribeResponse | ||
for (version <- 1.toShort to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) { | ||
val actual = consumerGroupDescribe( | ||
groupIds = List(groupId), | ||
includeAuthorizedOperations = true, | ||
version = version.toShort, | ||
) | ||
assertEquals(1, actual.size) | ||
val group = actual.head | ||
val member1 = group.members().asScala.find(_.memberId() == memberId1) | ||
assertNotNull(member1) | ||
assertTrue(member1.get.isClassic) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we could merge this with the previous call for version 0. Then we could use define the expected value for isClassic based on the version. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, updated it. Thanks for the suggestion.
expectedError = Errors.NONE | ||
) | ||
|
||
// there is no classic member in the group |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I have noticed that the style of your comments are a bit inconsistent. It would be great if they could all start with a capital letter and finish with a dot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated it. Thanks for the reminder.
cae1ca5
to
6ab3c02
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 Thanks for the update. I left a few nits and one question.
{ "name": "IsClassic", "type": "bool", "versions": "1+", "ignorable": true, | ||
"about": "True for classic member." } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that there is one case which is not clear in the KIP. What value will the admin client return when the version 1 is not available? If the new version is not available, we don't know whether the member is classic or not. Hence, we cannot use true or false by default. What's your take on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the RPC layer, everything should function correctly since the new broker does not return the IsClassic
field for version 0.
The primary issue lies in how the Java client distinguishes between "false" and "unknown" values, as it always defaults to "false." To address this, we could add a ConsumerGroupDescribeResponse#version
method that returns the response version. This would allow the caller to differentiate between "false" and "unknown" based on the version.
Additionally, the MemberDescription#isClassic
field also needs to differentiate between "false" and "unknown." One possible solution is to use Optional<Boolean>
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @dajac / @chia7712, thanks for the review. It looks like we can implement it in Admin side. If ConsumerGroupDescribeResponse#version
is 0
, set MemberDescription#isClassic
as Optional.empty()
. If ConsumerGroupDescribeResponse#version
is 1
, set MemberDescription#isClassic
as Optional.of(value)
. I change the KIP to align this discussion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using an Optional makes total sense. For the RPC, I was wondering whether we want to make it more explicit in the schema. For instance, we could use a int8 with three values: -1, 0, 1. -1 would be used by default and would mean no provided. Relying on the version also works but it may be easy to miss for people implementing other clients.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For instance, we could use a int8 with three values: -1, 0, 1. -1 would be used by default and would mean no provided.
While this approach benefits the Java client, it introduces some drawbacks for the Java server and RPC layer:
-
Server Implementation Complexity: The server must pass the correct integer value to
setIsClassic(int)
, which can be easily overlooked by developers implementing the server. -
Documentation Requirements: We need to update the documentation to specify that only 0 and 1 are valid values in RPC version 1 and above. In contrast, using a boolean with version control would be more readable and straightforward for this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @dajac / @chia7712, I updated the PR to use int8
. Although this approach is more complicated, it provides more information than boolean. For both types, we need to update document to clarify that isClassic
only works for RPC version 1 and above. I think we can focus on whether server complexity is acceptable or not. For me, it's ok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally prefer to have a bit more complexity on the server rather than relying on all the developers of clients to do the right thing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally prefer to have a bit more complexity on the server rather than relying on all the developers of clients to do the right thing.
I tried to imagine the code for both schemas.
int8
client code
switch (data.isClassic()) {
case 0: return Optional.of(false);
case 1: return Optional.of(true);
default: return Optional.empty();
}
server code
data.setIsClassic(useClassicProtocol ? 1 : 0);
bool
client code
if (version < 1) return Optional.empty();
return Optional.of(data.isClassic());
server
data.setIsClassic(useClassicProtocol);
if I'm imagining correctly, it seems that using a bool could simplify the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chia7712 It is not about simplifying the code. It is about ensuring that we have a well defined protocol and that folks implementing the protocol on the client side do the right thing. With the byte, is is explicit in the protocol that the value maybe undefined. With the boolean, they have to think about taking into account the version in that decision. It is really easy to miss it in my opinion. We use sentinels as default values in many places in the protocol for this purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that the response data may be propagated without version information, using the data itself to distinguish "undefined" values is safer. Additionally, I support using int8
.
subscribedTopicNames = List(topicName), | ||
topicPartitions = List.empty, | ||
expectedError = Errors.NONE | ||
).memberId() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You can omit the ()
.
) | ||
assertEquals(1, actual.size) | ||
val group = actual.head | ||
val member1 = group.members().asScala.find(_.memberId() == memberId1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You can omit the ()
after members
.
// After version 1, there is isClassic field and it should be true for member 1. | ||
assertEquals(version > 0.toShort, member1.get.isClassic) | ||
|
||
val member2 = group.members().asScala.find(_.memberId() == memberId2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
) | ||
assertEquals(1, actual.size) | ||
val group = actual.head | ||
val member1 = group.members().asScala.find(_.memberId() == memberId1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
assertFalse(member1.isEmpty) | ||
assertFalse(member1.get.isClassic) | ||
|
||
val member2 = group.members().asScala.find(_.memberId() == memberId2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
486e3e4
to
26ff139
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 Thanks. Overall, LGTM. I left a few nits. @chia7712 Are you OK with the change that we discussed? If so, @FrankYang0529 could you please update the KIP and notify the thread about the change?
) | ||
assertEquals(1, actual.size) | ||
val group = actual.head | ||
val member1 = group.members().asScala.find(_.memberId == memberId1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You can omit the ()
after members
.
// After version 1, there is isClassic field and it should be true for member 1. | ||
assertEquals(if (version == 0) -1.toByte else 1.toByte, member1.get.isClassic) | ||
|
||
val member2 = group.members().asScala.find(_.memberId == memberId2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You can omit the ()
after members
.
) | ||
assertEquals(1, actual.size) | ||
val group = actual.head | ||
val member1 = group.members().asScala.find(_.memberId == memberId1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You can omit the ()
after members
.
assertFalse(member1.isEmpty) | ||
assertEquals(if (version == 0) -1.toByte else 0.toByte, member1.get.isClassic) | ||
|
||
val member2 = group.members().asScala.find(_.memberId == memberId2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You can omit the ()
after members
.
26ff139
to
9780477
Compare
@@ -69,7 +70,9 @@ | |||
{ "name": "Assignment", "type": "Assignment", "versions": "0+", | |||
"about": "The current assignment." }, | |||
{ "name": "TargetAssignment", "type": "Assignment", "versions": "0+", | |||
"about": "The target assignment." } | |||
"about": "The target assignment." }, | |||
{ "name": "IsClassic", "type": "int8", "versions": "1+", "default": "-1", "ignorable": true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we consider renaming it back to "memberType" after the type is changed to int8
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @chia7712, I prefer to use IsClassic
because it's more Intuitive. Also, we don't plan to expand the field as protocol
, so I think we can keep it as IsClassic
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to use IsClassic because it's more Intuitive.
I agree that using IsClassic=false
and IsClassic=true
would be more intuitive. However, currently, IsClassic
can take the values -1
, 0
, and 1
. These "magic numbers" are unclear without referring to the documentation.
Additionally, the codebase uses these magic numbers inconsistently across different classes. For example, several classes such as ScramMechanism
, EndpointType
, ResourceType
, and AclOperation
use 0
to represent "unknown" or "none". MemberState
use 127
for unknown.
In summary, it seems to me the name IsClassic
is no longer intuitive after adopting the int8 type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @chia7712, thanks for the suggestion. Yes, it's good to follow naming convention. I update both PR and KIP.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @chia7712. Small nit. I wonder if we should now use 0 for classic member and 1 for consumer member as classic is the older one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should now use 0 for classic member and 1 for consumer member as classic is the older one.
+1, that order is intuitive to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion. Updated both PR and KIP.
Hi @dajac, I just sent a mail to vote thread and updated KIP. Thanks. |
9780477
to
8cccd91
Compare
…t new consumer group (part 1) Signed-off-by: PoAn Yang <payang@apache.org>
8cccd91
to
9cab234
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
@FrankYang0529 Could you please update the description?
Yes, updated it. Thanks. |
… new consumer group (part 1) (apache#17958) 1) Bump validVersions of ConsumerGroupDescribeRequest.json and ConsumerGroupDescribeResponse.json to "0-1". 2) Add MemberType field to ConsumerGroupDescribeResponse.json. Default value is -1 (unknown). 0 is for classic member and 1 is for consumer member. 3) When ConsumerGroupMember#useClassicProtocol is true, return MemberType field as 0. Otherwise, return 1. Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
validVersions
ofConsumerGroupDescribeRequest.json
andConsumerGroupDescribeResponse.json
to"0-1"
.MemberType
field toConsumerGroupDescribeResponse.json
. Default value is -1 (unknown). 0 is for classic member and 1 is for consumer member.ConsumerGroupMember#useClassicProtocol
is true, returnMemberType
field as0
. Otherwise, return1
.Committer Checklist (excluded from commit message)