From 320229591a532ab7500d654165dd46748c925cf4 Mon Sep 17 00:00:00 2001 From: chandra deepak kasiraju Date: Thu, 15 Feb 2018 19:07:26 -0800 Subject: [PATCH] support of admin operations in sarama --- admin.go | 375 +++++++++++++++++++++++++++++++++++ admin_test.go | 501 +++++++++++++++++++++++++++++++++++++++++++++++ broker.go | 12 +- mockresponses.go | 187 ++++++++++++++++++ 4 files changed, 1069 insertions(+), 6 deletions(-) create mode 100644 admin.go create mode 100644 admin_test.go diff --git a/admin.go b/admin.go new file mode 100644 index 000000000..68284641c --- /dev/null +++ b/admin.go @@ -0,0 +1,375 @@ +package sarama + +import "errors" + +// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics, +// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0. +// Methods with stricter requirements will specify the minimum broker version required. +// You MUST call Close() on a client to avoid leaks +type ClusterAdmin interface { + // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher. + // It may take several seconds after CreateTopic returns success for all the brokers + // to become aware that the topic has been created. During this time, listTopics + // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0. + CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error + + // Delete a topic. It may take several seconds after the DeleteTopic to returns success + // and for all the brokers to become aware that the topics are gone. + // During this time, listTopics may continue to return information about the deleted topic. + // If delete.topic.enable is false on the brokers, deleteTopic will mark + // the topic for deletion, but not actually delete them. + // This operation is supported by brokers with version 0.10.1.0 or higher. + DeleteTopic(topic string) error + + // Increase the number of partitions of the topics according to the corresponding values. + // If partitions are increased for a topic that has a key, the partition logic or ordering of + // the messages will be affected. It may take several seconds after this method returns + // success for all the brokers to become aware that the partitions have been created. + // During this time, ClusterAdmin#describeTopics may not return information about the + // new partitions. This operation is supported by brokers with version 1.0.0 or higher. + CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error + + // Delete records whose offset is smaller than the given offset of the corresponding partition. + // This operation is supported by brokers with version 0.11.0.0 or higher. + DeleteRecords(topic string, partitionOffsets map[int32]int64) error + + // Get the configuration for the specified resources. + // The returned configuration includes default values and the Default is true + // can be used to distinguish them from user supplied values. + // Config entries where ReadOnly is true cannot be updated. + // The value of config entries where Sensitive is true is always nil so + // sensitive information is not disclosed. + // This operation is supported by brokers with version 0.11.0.0 or higher. + DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) + + // Update the configuration for the specified resources with the default options. + // This operation is supported by brokers with version 0.11.0.0 or higher. + // The resources with their configs (topic is the only resource type with configs + // that can be updated currently Updates are not transactional so they may succeed + // for some resources while fail for others. The configs for a particular resource are updated automatically. + AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error + + // Creates access control lists (ACLs) which are bound to specific resources. + // This operation is not transactional so it may succeed for some ACLs while fail for others. + // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but + // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher. + CreateACL(resource Resource, acl Acl) error + + // Lists access control lists (ACLs) according to the supplied filter. + // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls + // This operation is supported by brokers with version 0.11.0.0 or higher. + ListAcls(filter AclFilter) ([]ResourceAcls, error) + + // Deletes access control lists (ACLs) according to the supplied filters. + // This operation is not transactional so it may succeed for some ACLs while fail for others. + // This operation is supported by brokers with version 0.11.0.0 or higher. + DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) + + // Close shuts down the admin and closes underlying client. + Close() error +} + +type clusterAdmin struct { + client Client + conf *Config +} + +// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration. +func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) { + client, err := NewClient(addrs, conf) + if err != nil { + return nil, err + } + + //make sure we can retrieve the controller + _, err = client.Controller() + if err != nil { + return nil, err + } + + ca := &clusterAdmin{ + client: client, + conf: client.Config(), + } + return ca, nil +} + +func (ca *clusterAdmin) Close() error { + return ca.client.Close() +} + +func (ca *clusterAdmin) Controller() (*Broker, error) { + return ca.client.Controller() +} + +func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error { + + if topic == "" { + return ErrInvalidTopic + } + + if detail == nil { + return errors.New("You must specify topic details") + } + + topicDetails := make(map[string]*TopicDetail) + topicDetails[topic] = detail + + request := &CreateTopicsRequest{ + TopicDetails: topicDetails, + ValidateOnly: validateOnly, + } + + if ca.conf.Version.IsAtLeast(V0_11_0_0) { + request.Version = 1 + } + if ca.conf.Version.IsAtLeast(V1_0_0_0) { + request.Version = 2 + } + + b, err := ca.Controller() + if err != nil { + return err + } + + rsp, err := b.CreateTopics(request) + if err != nil { + return err + } + + topicErr, ok := rsp.TopicErrors[topic] + if !ok { + return ErrIncompleteResponse + } + + if topicErr.Err != ErrNoError { + return topicErr.Err + } + + return nil +} + +func (ca *clusterAdmin) DeleteTopic(topic string) error { + + if topic == "" { + return ErrInvalidTopic + } + + request := &DeleteTopicsRequest{Topics: []string{topic}} + + if ca.conf.Version.IsAtLeast(V0_11_0_0) { + request.Version = 1 + } + + b, err := ca.Controller() + if err != nil { + return err + } + + rsp, err := b.DeleteTopics(request) + if err != nil { + return err + } + + topicErr, ok := rsp.TopicErrorCodes[topic] + if !ok { + return ErrIncompleteResponse + } + + if topicErr != ErrNoError { + return topicErr + } + return nil +} + +func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error { + if topic == "" { + return ErrInvalidTopic + } + + topicPartitions := make(map[string]*TopicPartition) + topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment} + + request := &CreatePartitionsRequest{ + TopicPartitions: topicPartitions, + } + + b, err := ca.Controller() + if err != nil { + return err + } + + rsp, err := b.CreatePartitions(request) + if err != nil { + return err + } + + topicErr, ok := rsp.TopicPartitionErrors[topic] + if !ok { + return ErrIncompleteResponse + } + + if topicErr.Err != ErrNoError { + return topicErr.Err + } + + return nil +} + +func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error { + + if topic == "" { + return ErrInvalidTopic + } + + topics := make(map[string]*DeleteRecordsRequestTopic) + topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets} + request := &DeleteRecordsRequest{ + Topics: topics} + + b, err := ca.Controller() + if err != nil { + return err + } + + rsp, err := b.DeleteRecords(request) + if err != nil { + return err + } + + _, ok := rsp.Topics[topic] + if !ok { + return ErrIncompleteResponse + } + + //todo since we are dealing with couple of partitions it would be good if we return slice of errors + //for each partition instead of one error + return nil +} + +func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) { + + var entries []ConfigEntry + var resources []*ConfigResource + resources = append(resources, &resource) + + request := &DescribeConfigsRequest{ + Resources: resources, + } + + b, err := ca.Controller() + if err != nil { + return nil, err + } + + rsp, err := b.DescribeConfigs(request) + if err != nil { + return nil, err + } + + for _, rspResource := range rsp.Resources { + if rspResource.Name == resource.Name { + if rspResource.ErrorMsg != "" { + return nil, errors.New(rspResource.ErrorMsg) + } + for _, cfgEntry := range rspResource.Configs { + entries = append(entries, *cfgEntry) + } + } + } + return entries, nil +} + +func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error { + + var resources []*AlterConfigsResource + resources = append(resources, &AlterConfigsResource{ + Type: resourceType, + Name: name, + ConfigEntries: entries, + }) + + request := &AlterConfigsRequest{ + Resources: resources, + ValidateOnly: validateOnly, + } + + b, err := ca.Controller() + if err != nil { + return err + } + + rsp, err := b.AlterConfigs(request) + if err != nil { + return err + } + + for _, rspResource := range rsp.Resources { + if rspResource.Name == name { + if rspResource.ErrorMsg != "" { + return errors.New(rspResource.ErrorMsg) + } + } + } + return nil +} + +func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error { + var acls []*AclCreation + acls = append(acls, &AclCreation{resource, acl}) + request := &CreateAclsRequest{AclCreations: acls} + + b, err := ca.Controller() + if err != nil { + return err + } + + _, err = b.CreateAcls(request) + return err +} + +func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) { + + request := &DescribeAclsRequest{AclFilter: filter} + + b, err := ca.Controller() + if err != nil { + return nil, err + } + + rsp, err := b.DescribeAcls(request) + if err != nil { + return nil, err + } + + var lAcls []ResourceAcls + for _, rAcl := range rsp.ResourceAcls { + lAcls = append(lAcls, *rAcl) + } + return lAcls, nil +} + +func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) { + var filters []*AclFilter + filters = append(filters, &filter) + request := &DeleteAclsRequest{Filters: filters} + + b, err := ca.Controller() + if err != nil { + return nil, err + } + + rsp, err := b.DeleteAcls(request) + if err != nil { + return nil, err + } + + var mAcls []MatchingAcl + for _, fr := range rsp.FilterResponses { + for _, mACL := range fr.MatchingAcls { + mAcls = append(mAcls, *mACL) + } + + } + return mAcls, nil +} diff --git a/admin_test.go b/admin_test.go new file mode 100644 index 000000000..9d3cc3170 --- /dev/null +++ b/admin_test.go @@ -0,0 +1,501 @@ +package sarama + +import ( + "errors" + "testing" +) + +func TestClusterAdmin(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminInvalidController(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + _, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err == nil { + t.Fatal(errors.New("controller not set still cluster admin was created")) + } + + if err != ErrControllerNotAvailable { + t.Fatal(err) + } +} + +func TestClusterAdminCreateTopic(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "CreateTopicsRequest": NewMockCreateTopicsResponse(t), + }) + + config := NewConfig() + config.Version = V0_10_2_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false) + if err != nil { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminCreateTopicWithInvalidTopicDetail(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "CreateTopicsRequest": NewMockCreateTopicsResponse(t), + }) + + config := NewConfig() + config.Version = V0_10_2_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + err = admin.CreateTopic("my_topic", nil, false) + if err.Error() != "You must specify topic details" { + t.Fatal(err) + } + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "CreateTopicsRequest": NewMockCreateTopicsResponse(t), + }) + + config := NewConfig() + config.Version = V0_11_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + err = admin.CreateTopic("my_topic", &TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false) + if err != ErrInsufficientData { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminDeleteTopic(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t), + }) + + config := NewConfig() + config.Version = V0_10_2_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + err = admin.DeleteTopic("my_topic") + if err != nil { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminDeleteEmptyTopic(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "DeleteTopicsRequest": NewMockDeleteTopicsResponse(t), + }) + + config := NewConfig() + config.Version = V0_10_2_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + err = admin.DeleteTopic("") + if err != ErrInvalidTopic { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminCreatePartitions(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + err = admin.CreatePartitions("my_topic", 3, nil, false) + if err != nil { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminCreatePartitionsWithDiffVersion(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "CreatePartitionsRequest": NewMockCreatePartitionsResponse(t), + }) + + config := NewConfig() + config.Version = V0_10_2_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + err = admin.CreatePartitions("my_topic", 3, nil, false) + if err != ErrUnsupportedVersion { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminDeleteRecords(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + partitionOffset := make(map[int32]int64) + partitionOffset[1] = 1000 + partitionOffset[2] = 1000 + partitionOffset[3] = 1000 + + err = admin.DeleteRecords("my_topic", partitionOffset) + if err != nil { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "DeleteRecordsRequest": NewMockDeleteRecordsResponse(t), + }) + + config := NewConfig() + config.Version = V0_10_2_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + partitionOffset := make(map[int32]int64) + partitionOffset[1] = 1000 + partitionOffset[2] = 1000 + partitionOffset[3] = 1000 + + err = admin.DeleteRecords("my_topic", partitionOffset) + if err != ErrUnsupportedVersion { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminDescribeConfig(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}} + entries, err := admin.DescribeConfig(resource) + if err != nil { + t.Fatal(err) + } + + if len(entries) <= 0 { + t.Fatal(errors.New("no resource present")) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminAlterConfig(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "AlterConfigsRequest": NewMockAlterConfigsResponse(t), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + var value string + entries := make(map[string]*string) + value = "3" + entries["ReplicationFactor"] = &value + err = admin.AlterConfig(TopicResource, "my_topic", entries, false) + if err != nil { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminCreateAcl(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "CreateAclsRequest": NewMockCreateAclsResponse(t), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"} + a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny} + + err = admin.CreateACL(r, a) + if err != nil { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminListAcls(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "DescribeAclsRequest": NewMockListAclsResponse(t), + "CreateAclsRequest": NewMockCreateAclsResponse(t), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + r := Resource{ResourceType: AclResourceTopic, ResourceName: "my_topic"} + a := Acl{Host: "localhost", Operation: AclOperationAlter, PermissionType: AclPermissionAny} + + err = admin.CreateACL(r, a) + if err != nil { + t.Fatal(err) + } + resourceName := "my_topic" + filter := AclFilter{ + ResourceType: AclResourceTopic, + Operation: AclOperationRead, + ResourceName: &resourceName, + } + + rAcls, err := admin.ListAcls(filter) + if err != nil { + t.Fatal(err) + } + if len(rAcls) <= 0 { + t.Fatal("no acls present") + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminDeleteAcl(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "DeleteAclsRequest": NewMockDeleteAclsResponse(t), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + resourceName := "my_topic" + filter := AclFilter{ + ResourceType: AclResourceTopic, + Operation: AclOperationAlter, + ResourceName: &resourceName, + } + + _, err = admin.DeleteACL(filter, false) + if err != nil { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} diff --git a/broker.go b/broker.go index 9755a7d7c..81f64494b 100644 --- a/broker.go +++ b/broker.go @@ -386,8 +386,8 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, return response, nil } -func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) { - response := new(CreatePartitionsResponse) +func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) { + response := new(CreateTopicsResponse) err := b.sendAndReceive(request, response) if err != nil { @@ -397,8 +397,8 @@ func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePart return response, nil } -func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) { - response := new(CreateTopicsResponse) +func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) { + response := new(DeleteTopicsResponse) err := b.sendAndReceive(request, response) if err != nil { @@ -408,8 +408,8 @@ func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsRespon return response, nil } -func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) { - response := new(DeleteTopicsResponse) +func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) { + response := new(CreatePartitionsResponse) err := b.sendAndReceive(request, response) if err != nil { diff --git a/mockresponses.go b/mockresponses.go index 5541d32ec..172044199 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -538,3 +538,190 @@ func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder { } return res } + +type MockCreateTopicsResponse struct { + t TestReporter +} + +func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse { + return &MockCreateTopicsResponse{t: t} +} + +func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*CreateTopicsRequest) + res := &CreateTopicsResponse{} + res.TopicErrors = make(map[string]*TopicError) + + for topic, _ := range req.TopicDetails { + res.TopicErrors[topic] = &TopicError{Err: ErrNoError} + } + return res +} + +type MockDeleteTopicsResponse struct { + t TestReporter +} + +func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse { + return &MockDeleteTopicsResponse{t: t} +} + +func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*DeleteTopicsRequest) + res := &DeleteTopicsResponse{} + res.TopicErrorCodes = make(map[string]KError) + + for _, topic := range req.Topics { + res.TopicErrorCodes[topic] = ErrNoError + } + return res +} + +type MockCreatePartitionsResponse struct { + t TestReporter +} + +func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse { + return &MockCreatePartitionsResponse{t: t} +} + +func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*CreatePartitionsRequest) + res := &CreatePartitionsResponse{} + res.TopicPartitionErrors = make(map[string]*TopicPartitionError) + + for topic, _ := range req.TopicPartitions { + res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError} + } + return res +} + +type MockDeleteRecordsResponse struct { + t TestReporter +} + +func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse { + return &MockDeleteRecordsResponse{t: t} +} + +func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*DeleteRecordsRequest) + res := &DeleteRecordsResponse{} + res.Topics = make(map[string]*DeleteRecordsResponseTopic) + + for topic, deleteRecordRequestTopic := range req.Topics { + partitions := make(map[int32]*DeleteRecordsResponsePartition) + for partition, _ := range deleteRecordRequestTopic.PartitionOffsets { + partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError} + } + res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions} + } + return res +} + +type MockDescribeConfigsResponse struct { + t TestReporter +} + +func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse { + return &MockDescribeConfigsResponse{t: t} +} + +func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*DescribeConfigsRequest) + res := &DescribeConfigsResponse{} + + var configEntries []*ConfigEntry + configEntries = append(configEntries, &ConfigEntry{Name: "my_topic", + Value: "my_topic", + ReadOnly: true, + Default: true, + Sensitive: false, + }) + + for _, r := range req.Resources { + res.Resources = append(res.Resources, &ResourceResponse{Name: r.Name, Configs: configEntries}) + } + return res +} + +type MockAlterConfigsResponse struct { + t TestReporter +} + +func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse { + return &MockAlterConfigsResponse{t: t} +} + +func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*AlterConfigsRequest) + res := &AlterConfigsResponse{} + + for _, r := range req.Resources { + res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name, + Type: TopicResource, + ErrorMsg: "", + }) + } + return res +} + +type MockCreateAclsResponse struct { + t TestReporter +} + +func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse { + return &MockCreateAclsResponse{t: t} +} + +func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*CreateAclsRequest) + res := &CreateAclsResponse{} + + for range req.AclCreations { + res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError}) + } + return res +} + +type MockListAclsResponse struct { + t TestReporter +} + +func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse { + return &MockListAclsResponse{t: t} +} + +func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*DescribeAclsRequest) + res := &DescribeAclsResponse{} + + res.Err = ErrNoError + acl := &ResourceAcls{} + acl.Resource.ResourceName = *req.ResourceName + acl.Resource.ResourceType = req.ResourceType + acl.Acls = append(acl.Acls, &Acl{}) + res.ResourceAcls = append(res.ResourceAcls, acl) + + return res +} + +type MockDeleteAclsResponse struct { + t TestReporter +} + +func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse { + return &MockDeleteAclsResponse{t: t} +} + +func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*DeleteAclsRequest) + res := &DeleteAclsResponse{} + + for range req.Filters { + response := &FilterResponse{Err: ErrNoError} + response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError}) + res.FilterResponses = append(res.FilterResponses, response) + } + return res +}