Skip to content

Commit

Permalink
add kafka scaler sasl (#486)
Browse files Browse the repository at this point in the history
* add kafka scaler sasl

* update sasl auth info reference a secret

* add support sasl_ssl

* fix validate authmode

* add withAuthParams test
  • Loading branch information
iyacontrol authored and jeffhollan committed Nov 28, 2019
1 parent ae71add commit cfea46c
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 14 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/stretchr/testify v1.4.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
google.golang.org/api v0.10.0
google.golang.org/genproto v0.0.0-20191002211648-c459b9ce5143
google.golang.org/grpc v1.24.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,9 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8 h1:3SVOIvH7Ae1KRYy
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
case "aws-cloudwatch":
return scalers.NewAwsCloudwatchScaler(resolvedEnv, triggerMetadata, authParams)
case "kafka":
return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata)
return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata, authParams)
case "rabbitmq":
return scalers.NewRabbitMQScaler(resolvedEnv, triggerMetadata, authParams)
case "azure-eventhub":
Expand Down
115 changes: 104 additions & 11 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package scalers

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"strconv"
Expand All @@ -17,19 +19,38 @@ import (
)

type kafkaScaler struct {
resolvedSecrets map[string]string
metadata kafkaMetadata
client sarama.Client
admin sarama.ClusterAdmin
metadata kafkaMetadata
client sarama.Client
admin sarama.ClusterAdmin
}

type kafkaMetadata struct {
brokers []string
group string
topic string
lagThreshold int64

// auth
authMode kafkaAuthMode
username string
password string

// ssl
cert string
key string
ca string
}

type kafkaAuthMode string

const (
kafkaAuthModeForNone kafkaAuthMode = "none"
kafkaAuthModeForSaslPlaintext kafkaAuthMode = "sasl_plaintext"
kafkaAuthModeForSaslScramSha256 kafkaAuthMode = "sasl_scram_sha256"
kafkaAuthModeForSaslScramSha512 kafkaAuthMode = "sasl_scram_sha512"
kafkaAuthModeForSaslSSL kafkaAuthMode = "sasl_ssl"
)

const (
lagThresholdMetricName = "lagThreshold"
kafkaMetricType = "External"
Expand All @@ -39,8 +60,8 @@ const (
var kafkaLog = logf.Log.WithName("kafka_scaler")

// NewKafkaScaler creates a new kafkaScaler
func NewKafkaScaler(resolvedSecrets, metadata map[string]string) (Scaler, error) {
kafkaMetadata, err := parseKafkaMetadata(metadata)
func NewKafkaScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) {
kafkaMetadata, err := parseKafkaMetadata(resolvedEnv, metadata, authParams)
if err != nil {
return nil, fmt.Errorf("error parsing kafka metadata: %s", err)
}
Expand All @@ -51,14 +72,13 @@ func NewKafkaScaler(resolvedSecrets, metadata map[string]string) (Scaler, error)
}

return &kafkaScaler{
client: client,
admin: admin,
metadata: kafkaMetadata,
resolvedSecrets: resolvedSecrets,
client: client,
admin: admin,
metadata: kafkaMetadata,
}, nil
}

func parseKafkaMetadata(metadata map[string]string) (kafkaMetadata, error) {
func parseKafkaMetadata(resolvedEnv, metadata, authParams map[string]string) (kafkaMetadata, error) {
meta := kafkaMetadata{}

if metadata["brokerList"] == "" {
Expand Down Expand Up @@ -86,6 +106,45 @@ func parseKafkaMetadata(metadata map[string]string) (kafkaMetadata, error) {
meta.lagThreshold = t
}

meta.authMode = kafkaAuthModeForNone
if val, ok := authParams["authMode"]; ok {
mode := kafkaAuthMode(val)
if mode != kafkaAuthModeForNone && mode != kafkaAuthModeForSaslPlaintext && mode != kafkaAuthModeForSaslSSL && mode != kafkaAuthModeForSaslScramSha256 && mode != kafkaAuthModeForSaslScramSha512 {
return meta, fmt.Errorf("err auth mode %s given", mode)
}

meta.authMode = mode
}

if meta.authMode != kafkaAuthModeForNone {
if authParams["username"] == "" {
return meta, errors.New("no username given")
}
meta.username = authParams["username"]

if authParams["password"] == "" {
return meta, errors.New("no password given")
}
meta.password = authParams["password"]
}

if meta.authMode == kafkaAuthModeForSaslSSL {
if authParams["ca"] == "" {
return meta, errors.New("no ca given")
}
meta.ca = authParams["ca"]

if authParams["cert"] == "" {
return meta, errors.New("no cert given")
}
meta.cert = authParams["cert"]

if authParams["key"] == "" {
return meta, errors.New("no key given")
}
meta.key = authParams["key"]
}

return meta, nil
}

Expand Down Expand Up @@ -118,6 +177,40 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0

if ok := metadata.authMode == kafkaAuthModeForSaslPlaintext || metadata.authMode == kafkaAuthModeForSaslSSL || metadata.authMode == kafkaAuthModeForSaslScramSha256 || metadata.authMode == kafkaAuthModeForSaslScramSha512; ok {
config.Net.SASL.Enable = true
config.Net.SASL.User = metadata.username
config.Net.SASL.Password = metadata.password
}

if metadata.authMode == kafkaAuthModeForSaslSSL {
cert, err := tls.X509KeyPair([]byte(metadata.cert), []byte(metadata.key))
if err != nil {
return nil, nil, fmt.Errorf("error parse X509KeyPair: %s", err)
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(metadata.ca))

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}

config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}

if metadata.authMode == kafkaAuthModeForSaslScramSha256 {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
}

if metadata.authMode == kafkaAuthModeForSaslScramSha512 {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
}

client, err := sarama.NewClient(metadata.brokers, config)
if err != nil {
return nil, nil, fmt.Errorf("error creating kafka client: %s", err)
Expand Down
34 changes: 32 additions & 2 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,47 @@ var validMetadata = map[string]string{
"topic": "my-topic",
}

// A complete valid authParams example for sasl, with username and passwd
var validWithAuthParams = map[string]string{
"authMode": "sasl_plaintext",
"username": "admin",
"passwd": "admin",
}

// A complete valid authParams example for sasl, without username and passwd
var validWithoutAuthParams = map[string]string{}

var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
{map[string]string{}, true, 0, nil, "", ""},
{map[string]string{"brokerList": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", ""},
{map[string]string{"brokerList": "foo:9092,bar:9092"}, true, 2, []string{"foo:9092", "bar:9092"}, "", ""},
{map[string]string{"brokerList": "a", "consumerGroup": "my-group"}, true, 1, []string{"a"}, "my-group", ""},
{validMetadata, false, 2, []string{"broker1:9092", "broker2:9092"}, "my-group", "my-topic"},
}

func TestGetBrokers(t *testing.T) {
for _, testData := range parseKafkaMetadataTestDataset {
meta, err := parseKafkaMetadata(testData.metadata)
meta, err := parseKafkaMetadata(nil, testData.metadata, validWithAuthParams)

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if len(meta.brokers) != testData.numBrokers {
t.Errorf("Expected %d brokers but got %d\n", testData.numBrokers, len(meta.brokers))
}
if !reflect.DeepEqual(testData.brokers, meta.brokers) {
t.Errorf("Expected %v but got %v\n", testData.brokers, meta.brokers)
}
if meta.group != testData.group {
t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
}
if meta.topic != testData.topic {
t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic)
}

meta, err = parseKafkaMetadata(nil, testData.metadata, validWithoutAuthParams)

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
Expand Down
36 changes: 36 additions & 0 deletions pkg/scalers/kafka_scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package scalers

import (
"crypto/sha256"
"crypto/sha512"
"hash"

"github.com/xdg/scram"
)

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}

0 comments on commit cfea46c

Please sign in to comment.