Error when describing consumer groups #2244
Comments
@danielli-ziprecruiter hmm, I'm unable to reproduce this using v1.34.0 against either Kafka 2.4.1 or Kafka 3.1.0 backends with V2_4_1_0 for config.Version. Can you confirm what config.Version you were using, and whether there's anything special about your consumer groups: are they all Sarama-based, are they a mixture of Java+Sarama, etc.? I was using the following code that I happened to have lying around to describe consumer groups and output the details. Can you compare it to your code to see if you are doing anything differently?

	// admin is a sarama.ClusterAdmin; groups is a []string of group IDs.
	desc, err := admin.DescribeConsumerGroups(groups)
	if err != nil {
		log.Fatal(err)
	}
	// Sort by group ID so the output is stable between runs.
	sort.Slice(desc, func(i, j int) bool {
		return desc[i].GroupId < desc[j].GroupId
	})
	for _, details := range desc {
		// Skip groups that have no active members.
		if len(details.Members) == 0 {
			continue
		}
		fmt.Println("########################################################################")
		fmt.Printf("#### Group: %s\n", details.GroupId)
		if !strings.EqualFold(details.State, "stable") {
			fmt.Printf("⚠️⚠️⚠️⚠️ State: %s\n", details.State)
		}
		for memberID, desc := range details.Members {
			// Decode the member's subscription metadata, if any was returned.
			var metadata *sarama.ConsumerGroupMemberMetadata
			if len(desc.MemberMetadata) > 0 {
				metadata, err = desc.GetMemberMetadata()
				if err != nil {
					log.Fatalf("failed to get member metadata - got %v", err)
				}
			}
			// Decode the member's partition assignment, if any was returned.
			var assignment *sarama.ConsumerGroupMemberAssignment
			if len(desc.MemberAssignment) > 0 {
				assignment, err = desc.GetMemberAssignment()
				if err != nil {
					log.Fatalf("failed to get member assignment - got %v", err)
				}
			}
			fmt.Printf("\tMember: %s @ %s (%s)\n", desc.ClientId, desc.ClientHost, memberID)
			if assignment == nil || metadata == nil {
				fmt.Printf("\t\t <no assigned topic partitions>\n")
				continue
			}
			// Print each subscribed topic with its assigned partitions.
			sort.Strings(metadata.Topics)
			for _, topic := range metadata.Topics {
				if strings.TrimSpace(topic) == "" {
					continue
				}
				partitions, ok := assignment.Topics[topic]
				if !ok {
					fmt.Printf("\t\tTopic: %s\tPartitions: []\n", topic)
					continue
				}
				sort.Slice(partitions, func(i, j int) bool {
					return partitions[i] < partitions[j]
				})
				fmt.Printf("\t\tTopic: %s\tPartitions: %s\n", topic, strings.Join(strings.Fields(fmt.Sprint(partitions)), ","))
			}
		}
		fmt.Println()
		fmt.Println("########################################################################")
		fmt.Println()
	}
@dnwe We have a mix of consumer groups (Sarama, Kafka Connect, AWS Canary). I have done some more testing and believe this occurs when attempting to describe multiple consumer groups. We are passing in a slice of about 10 consumer groups. I can describe each consumer group individually, but get this error when attempting to describe 2 or 3 groups (depending on the combination of consumer groups).
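For anyone trying to narrow down which combinations trigger the error, here is a minimal sketch of the "describe in pairs" approach described above. It does not use Sarama directly: `describeFunc`, `failingPairs`, `fakeDescribe`, `errSimulated`, and the group names are all hypothetical, made up for illustration. The fake backend only simulates the reported behaviour (every group works alone, one particular pair fails together).

```go
package main

import (
	"errors"
	"fmt"
)

// describeFunc is a hypothetical stand-in for a batch describe call.
// It returns a non-nil error if describing the given groups together fails.
type describeFunc func(groups []string) error

// errSimulated is a made-up error for this sketch, not a real Sarama error.
var errSimulated = errors.New("simulated batch describe failure")

// failingPairs describes every pair of groups and returns the pairs
// whose combined request returned an error.
func failingPairs(describe describeFunc, groups []string) [][2]string {
	var failed [][2]string
	for i := 0; i < len(groups); i++ {
		for j := i + 1; j < len(groups); j++ {
			if err := describe([]string{groups[i], groups[j]}); err != nil {
				failed = append(failed, [2]string{groups[i], groups[j]})
			}
		}
	}
	return failed
}

// fakeDescribe simulates a backend where each group can be described alone,
// but the hypothetical groups "group-a" and "group-c" fail when requested together.
func fakeDescribe(groups []string) error {
	seen := make(map[string]bool, len(groups))
	for _, g := range groups {
		seen[g] = true
	}
	if seen["group-a"] && seen["group-c"] {
		return errSimulated
	}
	return nil
}

func main() {
	groups := []string{"group-a", "group-b", "group-c"}
	for _, p := range failingPairs(fakeDescribe, groups) {
		fmt.Printf("describe failed for %s + %s\n", p[0], p[1])
	}
}
```

In a real test, the function passed in would presumably wrap admin.DescribeConsumerGroups and return only its error; that wiring is left out here so the sketch runs without a broker.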
Eh, seems to be a bug. I can work on a fix for it.
@danielli-ziprecruiter please can you test 1.34.1?
It's working now for me with 1.34.1. Thank you!
Thanks for reporting!
Versions
Logs
kafka: insufficient data to decode packet, more bytes expected
Problem Description
Upgraded from v1.33.0 to v1.34.0. Now when calling admin.DescribeConsumerGroups, I get the following error: insufficient data to decode packet, more bytes expected. I believe this issue was introduced in 59a3d39.