Skip to content

Commit

Permalink
Merge pull request #1055 from chandradeepak/master
Browse files Browse the repository at this point in the history
ClusterAdmin Support
  • Loading branch information
eapache authored Jun 15, 2018
2 parents 9f7c4d9 + 3202295 commit 46cf3e2
Show file tree
Hide file tree
Showing 4 changed files with 1,069 additions and 6 deletions.
375 changes: 375 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,375 @@
package sarama

import "errors"

// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
// Methods with stricter requirements will specify the minimum broker version required.
// You MUST call Close() on a client to avoid leaks
type ClusterAdmin interface {
// Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
// It may take several seconds after CreateTopic returns success for all the brokers
// to become aware that the topic has been created. During this time, listTopics
// may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error

// Delete a topic. It may take several seconds after the DeleteTopic to returns success
// and for all the brokers to become aware that the topics are gone.
// During this time, listTopics may continue to return information about the deleted topic.
// If delete.topic.enable is false on the brokers, deleteTopic will mark
// the topic for deletion, but not actually delete them.
// This operation is supported by brokers with version 0.10.1.0 or higher.
DeleteTopic(topic string) error

// Increase the number of partitions of the topics according to the corresponding values.
// If partitions are increased for a topic that has a key, the partition logic or ordering of
// the messages will be affected. It may take several seconds after this method returns
// success for all the brokers to become aware that the partitions have been created.
// During this time, ClusterAdmin#describeTopics may not return information about the
// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

// Delete records whose offset is smaller than the given offset of the corresponding partition.
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteRecords(topic string, partitionOffsets map[int32]int64) error

// Get the configuration for the specified resources.
// The returned configuration includes default values and the Default is true
// can be used to distinguish them from user supplied values.
// Config entries where ReadOnly is true cannot be updated.
// The value of config entries where Sensitive is true is always nil so
// sensitive information is not disclosed.
// This operation is supported by brokers with version 0.11.0.0 or higher.
DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)

// Update the configuration for the specified resources with the default options.
// This operation is supported by brokers with version 0.11.0.0 or higher.
// The resources with their configs (topic is the only resource type with configs
// that can be updated currently Updates are not transactional so they may succeed
// for some resources while fail for others. The configs for a particular resource are updated automatically.
AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error

// Creates access control lists (ACLs) which are bound to specific resources.
// This operation is not transactional so it may succeed for some ACLs while fail for others.
// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
CreateACL(resource Resource, acl Acl) error

// Lists access control lists (ACLs) according to the supplied filter.
// it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
// This operation is supported by brokers with version 0.11.0.0 or higher.
ListAcls(filter AclFilter) ([]ResourceAcls, error)

// Deletes access control lists (ACLs) according to the supplied filters.
// This operation is not transactional so it may succeed for some ACLs while fail for others.
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)

// Close shuts down the admin and closes underlying client.
Close() error
}

type clusterAdmin struct {
client Client
conf *Config
}

// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
client, err := NewClient(addrs, conf)
if err != nil {
return nil, err
}

//make sure we can retrieve the controller
_, err = client.Controller()
if err != nil {
return nil, err
}

ca := &clusterAdmin{
client: client,
conf: client.Config(),
}
return ca, nil
}

func (ca *clusterAdmin) Close() error {
return ca.client.Close()
}

func (ca *clusterAdmin) Controller() (*Broker, error) {
return ca.client.Controller()
}

func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {

if topic == "" {
return ErrInvalidTopic
}

if detail == nil {
return errors.New("You must specify topic details")
}

topicDetails := make(map[string]*TopicDetail)
topicDetails[topic] = detail

request := &CreateTopicsRequest{
TopicDetails: topicDetails,
ValidateOnly: validateOnly,
}

if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 1
}
if ca.conf.Version.IsAtLeast(V1_0_0_0) {
request.Version = 2
}

b, err := ca.Controller()
if err != nil {
return err
}

rsp, err := b.CreateTopics(request)
if err != nil {
return err
}

topicErr, ok := rsp.TopicErrors[topic]
if !ok {
return ErrIncompleteResponse
}

if topicErr.Err != ErrNoError {
return topicErr.Err
}

return nil
}

func (ca *clusterAdmin) DeleteTopic(topic string) error {

if topic == "" {
return ErrInvalidTopic
}

request := &DeleteTopicsRequest{Topics: []string{topic}}

if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 1
}

b, err := ca.Controller()
if err != nil {
return err
}

rsp, err := b.DeleteTopics(request)
if err != nil {
return err
}

topicErr, ok := rsp.TopicErrorCodes[topic]
if !ok {
return ErrIncompleteResponse
}

if topicErr != ErrNoError {
return topicErr
}
return nil
}

func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
if topic == "" {
return ErrInvalidTopic
}

topicPartitions := make(map[string]*TopicPartition)
topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}

request := &CreatePartitionsRequest{
TopicPartitions: topicPartitions,
}

b, err := ca.Controller()
if err != nil {
return err
}

rsp, err := b.CreatePartitions(request)
if err != nil {
return err
}

topicErr, ok := rsp.TopicPartitionErrors[topic]
if !ok {
return ErrIncompleteResponse
}

if topicErr.Err != ErrNoError {
return topicErr.Err
}

return nil
}

func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {

if topic == "" {
return ErrInvalidTopic
}

topics := make(map[string]*DeleteRecordsRequestTopic)
topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
request := &DeleteRecordsRequest{
Topics: topics}

b, err := ca.Controller()
if err != nil {
return err
}

rsp, err := b.DeleteRecords(request)
if err != nil {
return err
}

_, ok := rsp.Topics[topic]
if !ok {
return ErrIncompleteResponse
}

//todo since we are dealing with couple of partitions it would be good if we return slice of errors
//for each partition instead of one error
return nil
}

func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {

var entries []ConfigEntry
var resources []*ConfigResource
resources = append(resources, &resource)

request := &DescribeConfigsRequest{
Resources: resources,
}

b, err := ca.Controller()
if err != nil {
return nil, err
}

rsp, err := b.DescribeConfigs(request)
if err != nil {
return nil, err
}

for _, rspResource := range rsp.Resources {
if rspResource.Name == resource.Name {
if rspResource.ErrorMsg != "" {
return nil, errors.New(rspResource.ErrorMsg)
}
for _, cfgEntry := range rspResource.Configs {
entries = append(entries, *cfgEntry)
}
}
}
return entries, nil
}

func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {

var resources []*AlterConfigsResource
resources = append(resources, &AlterConfigsResource{
Type: resourceType,
Name: name,
ConfigEntries: entries,
})

request := &AlterConfigsRequest{
Resources: resources,
ValidateOnly: validateOnly,
}

b, err := ca.Controller()
if err != nil {
return err
}

rsp, err := b.AlterConfigs(request)
if err != nil {
return err
}

for _, rspResource := range rsp.Resources {
if rspResource.Name == name {
if rspResource.ErrorMsg != "" {
return errors.New(rspResource.ErrorMsg)
}
}
}
return nil
}

func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
var acls []*AclCreation
acls = append(acls, &AclCreation{resource, acl})
request := &CreateAclsRequest{AclCreations: acls}

b, err := ca.Controller()
if err != nil {
return err
}

_, err = b.CreateAcls(request)
return err
}

func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {

request := &DescribeAclsRequest{AclFilter: filter}

b, err := ca.Controller()
if err != nil {
return nil, err
}

rsp, err := b.DescribeAcls(request)
if err != nil {
return nil, err
}

var lAcls []ResourceAcls
for _, rAcl := range rsp.ResourceAcls {
lAcls = append(lAcls, *rAcl)
}
return lAcls, nil
}

func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
var filters []*AclFilter
filters = append(filters, &filter)
request := &DeleteAclsRequest{Filters: filters}

b, err := ca.Controller()
if err != nil {
return nil, err
}

rsp, err := b.DeleteAcls(request)
if err != nil {
return nil, err
}

var mAcls []MatchingAcl
for _, fr := range rsp.FilterResponses {
for _, mACL := range fr.MatchingAcls {
mAcls = append(mAcls, *mACL)
}

}
return mAcls, nil
}
Loading

0 comments on commit 46cf3e2

Please sign in to comment.