From b952e635d4f1b688854c1097cd7ce6eeaf50b273 Mon Sep 17 00:00:00 2001 From: Guillaume Fillon <648780+zirkome@users.noreply.github.com> Date: Fri, 25 Feb 2022 17:19:53 +0100 Subject: [PATCH] Add CreateAcls Admin API support (#839) This adds a CreateACLs method to the kafka Client to supports the CreateAcls Admin API. Signed-off-by: Guillaume Fillon --- .circleci/config.yml | 35 ++++++++-- createacl_test.go | 49 ++++++++++++++ createacls.go | 108 ++++++++++++++++++++++++++++++ describeconfigs.go | 10 --- docker-compose.yml | 2 + protocol/createacls/createacls.go | 49 ++++++++++++++ resource.go | 37 ++++++++++ 7 files changed, 273 insertions(+), 17 deletions(-) create mode 100644 createacl_test.go create mode 100644 createacls.go create mode 100644 protocol/createacls/createacls.go create mode 100644 resource.go diff --git a/.circleci/config.yml b/.circleci/config.yml index e0e9d9d6e..5afd09010 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -129,7 +129,10 @@ jobs: ports: - 9092:9092 - 9093:9093 - environment: *environment + environment: + <<: *environment + KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer' + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' steps: *steps kafka-211: @@ -145,7 +148,10 @@ jobs: ports: - 9092:9092 - 9093:9093 - environment: *environment + environment: + <<: *environment + KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer' + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' steps: *steps kafka-222: @@ -161,7 +167,10 @@ jobs: ports: - 9092:9092 - 9093:9093 - environment: *environment + environment: + <<: *environment + KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer' + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' steps: *steps kafka-231: @@ -177,7 +186,10 @@ jobs: ports: - 9092:9092 - 9093:9093 - environment: *environment + environment: + <<: *environment + KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer' + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' steps: *steps kafka-241: @@ -204,7 +216,10 @@ jobs: ports: - 9092:9092 - 9093:9093 - environment: *environment + environment: + <<: *environment + KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer' + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' steps: *steps kafka-260: @@ -231,7 +246,10 @@ jobs: ports: - 9092:9092 - 9093:9093 - environment: *environment + environment: + <<: *environment + KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer' + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' steps: *steps kafka-271: @@ -258,7 +276,10 @@ jobs: ports: - 9092:9092 - 9093:9093 - environment: *environment + environment: + <<: *environment + KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer' + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' steps: *steps workflows: diff --git a/createacl_test.go b/createacl_test.go new file mode 100644 index 000000000..4f4b15380 --- /dev/null +++ b/createacl_test.go @@ -0,0 +1,49 @@ +package kafka + +import ( + "context" + "testing" + + ktesting "github.com/segmentio/kafka-go/testing" +) + +func TestClientCreateACLs(t *testing.T) { + if !ktesting.KafkaIsAtLeast("2.0.1") { + return + } + + client, shutdown := newLocalClient() + defer shutdown() + + res, err := client.CreateACLs(context.Background(), &CreateACLsRequest{ + ACLs: []ACLEntry{ + { + Principal: "User:alice", + PermissionType: ACLPermissionTypeAllow, + Operation: ACLOperationTypeRead, + ResourceType: ResourceTypeTopic, + ResourcePatternType: PatternTypeLiteral, + ResourceName: "fake-topic-for-alice", + Host: "*", + }, + { + Principal: "User:bob", + PermissionType: ACLPermissionTypeAllow, + Operation: ACLOperationTypeRead, + ResourceType: ResourceTypeGroup, + ResourcePatternType: PatternTypeLiteral, + ResourceName: "fake-group-for-bob", + Host: "*", + }, + }, + }) + if err != nil { + t.Fatal(err) + } + + for _, err := range res.Errors { + if err != nil { + t.Error(err) + } + } +} diff --git a/createacls.go b/createacls.go new file mode 100644 index 000000000..672f6fdce --- /dev/null +++ b/createacls.go @@ -0,0 +1,108 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/createacls" +) + +// CreateACLsRequest represents a request sent to a kafka broker to add +// new ACLs. +type CreateACLsRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // List of ACL to create. + ACLs []ACLEntry +} + +// CreateACLsResponse represents a response from a kafka broker to an ACL +// creation request. +type CreateACLsResponse struct { + // The amount of time that the broker throttled the request. + Throttle time.Duration + + // List of errors that occurred while attempting to create + // the ACLs. + // + // The errors contain the kafka error code. Programs may use the standard + // errors.Is function to test the error against kafka error codes. + Errors []error +} + +type ACLPermissionType int8 + +const ( + ACLPermissionTypeUnknown ACLPermissionType = 0 + ACLPermissionTypeAny ACLPermissionType = 1 + ACLPermissionTypeDeny ACLPermissionType = 2 + ACLPermissionTypeAllow ACLPermissionType = 3 +) + +type ACLOperationType int8 + +const ( + ACLOperationTypeUnknown ACLOperationType = 0 + ACLOperationTypeAny ACLOperationType = 1 + ACLOperationTypeAll ACLOperationType = 2 + ACLOperationTypeRead ACLOperationType = 3 + ACLOperationTypeWrite ACLOperationType = 4 + ACLOperationTypeCreate ACLOperationType = 5 + ACLOperationTypeDelete ACLOperationType = 6 + ACLOperationTypeAlter ACLOperationType = 7 + ACLOperationTypeDescribe ACLOperationType = 8 + ACLOperationTypeClusterAction ACLOperationType = 9 + ACLOperationTypeDescribeConfigs ACLOperationType = 10 + ACLOperationTypeAlterConfigs ACLOperationType = 11 + ACLOperationTypeIdempotentWrite ACLOperationType = 12 +) + +type ACLEntry struct { + ResourceType ResourceType + ResourceName string + ResourcePatternType PatternType + Principal string + Host string + Operation ACLOperationType + PermissionType ACLPermissionType +} + +// CreateACLs sends ACLs creation request to a kafka broker and returns the +// response. +func (c *Client) CreateACLs(ctx context.Context, req *CreateACLsRequest) (*CreateACLsResponse, error) { + acls := make([]createacls.RequestACLs, 0, len(req.ACLs)) + + for _, acl := range req.ACLs { + acls = append(acls, createacls.RequestACLs{ + ResourceType: int8(acl.ResourceType), + ResourceName: acl.ResourceName, + ResourcePatternType: int8(acl.ResourcePatternType), + Principal: acl.Principal, + Host: acl.Host, + Operation: int8(acl.Operation), + PermissionType: int8(acl.PermissionType), + }) + } + + m, err := c.roundTrip(ctx, req.Addr, &createacls.Request{ + Creations: acls, + }) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).CreateACLs: %w", err) + } + + res := m.(*createacls.Response) + ret := &CreateACLsResponse{ + Throttle: makeDuration(res.ThrottleTimeMs), + Errors: make([]error, 0, len(res.Results)), + } + + for _, t := range res.Results { + ret.Errors = append(ret.Errors, makeError(t.ErrorCode, t.ErrorMessage)) + } + + return ret, nil +} diff --git a/describeconfigs.go b/describeconfigs.go index 0b48e8ae4..3b4d28996 100644 --- a/describeconfigs.go +++ b/describeconfigs.go @@ -24,15 +24,6 @@ type DescribeConfigsRequest struct { IncludeDocumentation bool } -type ResourceType int8 - -const ( - // See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L36 - ResourceTypeUnknown ResourceType = 0 - ResourceTypeTopic ResourceType = 2 - ResourceTypeBroker ResourceType = 4 -) - type DescribeConfigRequestResource struct { // Resource Type ResourceType ResourceType @@ -122,7 +113,6 @@ func (c *Client) DescribeConfigs(ctx context.Context, req *DescribeConfigsReques IncludeSynonyms: req.IncludeSynonyms, IncludeDocumentation: req.IncludeDocumentation, }) - if err != nil { return nil, fmt.Errorf("kafka.(*Client).DescribeConfigs: %w", err) } diff --git a/docker-compose.yml b/docker-compose.yml index 72e7b94b1..dc0c2e85e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,6 +21,8 @@ services: KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' + KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer' + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" CUSTOM_INIT_SCRIPT: |- echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf; diff --git a/protocol/createacls/createacls.go b/protocol/createacls/createacls.go new file mode 100644 index 000000000..893be44dd --- /dev/null +++ b/protocol/createacls/createacls.go @@ -0,0 +1,49 @@ +package createacls + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +type Request struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v2,tag"` + + Creations []RequestACLs `kafka:"min=v0,max=v2"` +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.CreateAcls } + +func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + return cluster.Brokers[cluster.Controller], nil +} + +type RequestACLs struct { + ResourceType int8 `kafka:"min=v0,max=v2"` + ResourceName string `kafka:"min=v0,max=v2"` + ResourcePatternType int8 `kafka:"min=v0,max=v2"` + Principal string `kafka:"min=v0,max=v2"` + Host string `kafka:"min=v0,max=v2"` + Operation int8 `kafka:"min=v0,max=v2"` + PermissionType int8 `kafka:"min=v0,max=v2"` +} + +type Response struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v2,tag"` + + ThrottleTimeMs int32 `kafka:"min=v0,max=v2"` + Results []ResponseACLs `kafka:"min=v0,max=v2"` +} + +func (r *Response) ApiKey() protocol.ApiKey { return protocol.CreateAcls } + +type ResponseACLs struct { + ErrorCode int16 `kafka:"min=v0,max=v2"` + ErrorMessage string `kafka:"min=v0,max=v2,nullable"` +} + +var _ protocol.BrokerMessage = (*Request)(nil) diff --git a/resource.go b/resource.go new file mode 100644 index 000000000..f5c2e73a5 --- /dev/null +++ b/resource.go @@ -0,0 +1,37 @@ +package kafka + +// https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java +type ResourceType int8 + +const ( + ResourceTypeUnknown ResourceType = 0 + ResourceTypeAny ResourceType = 1 + ResourceTypeTopic ResourceType = 2 + ResourceTypeGroup ResourceType = 3 + // See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L36 + ResourceTypeBroker ResourceType = 4 + ResourceTypeCluster ResourceType = 4 + ResourceTypeTransactionalID ResourceType = 5 + ResourceTypeDelegationToken ResourceType = 6 +) + +// https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java +type PatternType int8 + +const ( + // PatternTypeUnknown represents any PatternType which this client cannot + // understand. + PatternTypeUnknown PatternType = 0 + // PatternTypeAny matches any resource pattern type. + PatternTypeAny PatternType = 1 + // PatternTypeMatch perform pattern matching. + PatternTypeMatch PatternType = 2 + // PatternTypeLiteral represents a literal name. + // A literal name defines the full name of a resource, e.g. topic with name + // 'foo', or group with name 'bob'. + PatternTypeLiteral PatternType = 3 + // PatternTypePrefixed represents a prefixed name. + // A prefixed name defines a prefix for a resource, e.g. topics with names + // that start with 'foo'. + PatternTypePrefixed PatternType = 4 +)