Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

admin: Add some missing admin methods #1178

Merged
merged 4 commits into from
Feb 8, 2019
Merged

admin: Add some missing admin methods #1178

merged 4 commits into from
Feb 8, 2019

Conversation

birdayz
Copy link
Contributor

@birdayz birdayz commented Sep 27, 2018

This PR adds the following admin client methods:
DescribeTopic
DescribeConsumerGroup
ListConsumerGroups
ListConsumerGroupOffsets

For ListConsumerGroups, all brokers are queried (the java client does it the same way). Since this will be super slow with multiple brokers, i made an implementation with parallel requests.
The others methods are rather straight forward.
I'll be happy about comments on the API, is it fine?
I'm not 100% sure - for the groups i chose it to be 'singular' - each method call is for one consumer group. For topics it's a slice of topics as parameter - in my case (building a kafka cli tool) it's very often the case that multiple topics are requested, so i kept it this way.

In #1155 (another addition to the admin client) i wondered how the API should look like. I kept it simple here. and reused types from the response messages.

I also added mock tests for these methods.

@birdayz
Copy link
Contributor Author

birdayz commented Sep 28, 2018

i renamed it to DescribeTopics, so it's consistent with its parameters and return value.

@birdayz
Copy link
Contributor Author

birdayz commented Oct 14, 2018

@bai can you have a look?

@jeffwidman
Copy link
Contributor

jeffwidman commented Jan 4, 2019

I'm just checking out Sarama for the first time, so not sure how closely you try to follow the Java reference clients. However, I added these same methods to kafka-python, so had one design comment based on having already played with the kafka protocol here:

  • A common scenario in monitoring situations is to fetch the offsets/calculate consumer lag for many/all consumer groups in the cluster.
  • ListConsumerGroupOffsets is a very chatty call because it will first lookup the group's coordinator. So it requires two network round trips.
  • However, when listing consumer groups, if a broker returns a consumer group, you know it's the coordinator.
  • If you allow ListConsumerGroups to have an optional param of specifying a single broker and ListConsumerGroupOffsets to have an optional param of specifying a known group coordinator than you can leverage this to nearly halve the number of network calls required to fetch offsets for all consumer groups:
    1. Loop through all brokers, calling ListConsumerGroups against each one.
    2. For the consumer groups returned, call ListConsumerGroupOffsets, using the group coordinator identified above.

Java doesn't currently allow these overrides, but they were useful enough that we chose to include them in kafka-python, even though in general we try to follow the Java clients very closely: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html#kafka.admin.KafkaAdminClient.list_consumer_group_offsets

You can see how I then leveraged this when patching the Datadog check here:
https://github.com/DataDog/integrations-core/pull/2730/files#diff-deed983f73a04522ac621d7f6d0c8403R248

Note that I'm a total n00b to Go, so when I didn't find these overrides, it may be simply my misreading of the code and they may already be there...

@birdayz
Copy link
Contributor Author

birdayz commented Jan 20, 2019

@bai Sorry for being annoying, but how could we move this forward? Or is this feature not desirable in sarama?
@jeffwidman i'm happy to discuss the specifics of an implementation if this going anywhere, especially since some aspects of the java reference implementation of the admin client are rather unlucky, in my opinion (thus, it makes sense to do some things different)

@bai bai requested a review from varun06 January 21, 2019 03:21
admin.go Outdated
ListConsumerGroups() (map[string]string, error)

// Describe some group IDs in the cluster.
DescribeConsumerGroup(group string) (*GroupDescription, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please update the comment, it is not very clear here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

admin.go Outdated
@@ -380,3 +417,87 @@ func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]Matchi
}
return mAcls, nil
}

func (ca *clusterAdmin) DescribeConsumerGroup(group string) (*GroupDescription, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it works better as DescribeConsumerGroups(groups []string), coz controller takes a slice of string anyway, and that way if you only want it for one group, you can only pass that too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i agree. i now improved it to accept a slice of group names. in addition, the logic is now quite smart to first build a map of brokers -> []string groups, so each group coordinator is contacted only once. in a trivial test / cluster with just 8 groups this saves me a second (50% savings)..which will have much a bigger impact on larger clusters.

wg.Add(1)
go func(b *Broker, conf *Config) {
defer wg.Done()
_ = b.Open(conf) // Ensure that broker is opened
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can do

Suggested change
_ = b.Open(conf) // Ensure that broker is opened
b.Open(conf) // Ensure that broker is opened

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can, but then we don't document that there's an error (deliberately) ignored here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in fact errcheck will fail travis if i do what you suggest.. https://api.travis-ci.org/v3/job/490310704/log.txt i reverted that.

@ghost ghost added cla-needed and removed cla-needed labels Feb 7, 2019
@birdayz
Copy link
Contributor Author

birdayz commented Feb 7, 2019

@dnwe i added your commits of your PR #1155 ..since my work is based on that / related. since your PR is also pending and i require it as well, do you mind pushing this forward together? would be nice if we could get this finally merged :) it's becoming quite tedious to maintain my own fork for so long :(
if it's not OK, i can remove it from my PR.

@birdayz
Copy link
Contributor Author

birdayz commented Feb 8, 2019

Ci failed with a strange error:

# cd .; git clone https://go.googlesource.com/tools /home/travis/gopath/src/golang.org/x/tools
Cloning into '/home/travis/gopath/src/golang.org/x/tools'...
fatal: remote error: Access denied to IP 35.188.1.99
package golang.org/x/tools/go/packages: exit status 128

how can i trigger a re-run?

DescribeCluster
DescribeTopic
DescribeConsumerGroups
ListConsumerGroups
ListConsumerGroupOffsets
@varun06
Copy link
Contributor

varun06 commented Feb 8, 2019 via email

@birdayz
Copy link
Contributor Author

birdayz commented Feb 8, 2019

great. it's green now :)

@bai
Copy link
Contributor

bai commented Feb 8, 2019

Thanks!

@bai
Copy link
Contributor

bai commented Feb 8, 2019

This is lovely work 💯

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants