From 65839ce0aafbc0849802d6c36b405aeda9813fd0 Mon Sep 17 00:00:00 2001 From: gaohj Date: Wed, 27 Nov 2019 15:11:23 +0800 Subject: [PATCH 1/5] add kafka scaler sasl --- go.mod | 1 + go.sum | 2 ++ pkg/scalers/kafka_scaler.go | 53 +++++++++++++++++++++++++++++++ pkg/scalers/kafka_scaler_test.go | 21 +++++++++++- pkg/scalers/kafka_scram_client.go | 36 +++++++++++++++++++++ 5 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 pkg/scalers/kafka_scram_client.go diff --git a/go.mod b/go.mod index 2a7c46fa501..483981f5a72 100644 --- a/go.mod +++ b/go.mod @@ -59,6 +59,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 github.com/stretchr/testify v1.4.0 + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c google.golang.org/api v0.10.0 google.golang.org/genproto v0.0.0-20191002211648-c459b9ce5143 google.golang.org/grpc v1.24.0 diff --git a/go.sum b/go.sum index d0fb2be9fd2..84d84c8cce0 100644 --- a/go.sum +++ b/go.sum @@ -599,7 +599,9 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8 h1:3SVOIvH7Ae1KRYy github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 2b90636b8af..d99af9810ca 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -28,8 +28,23 @@ type kafkaMetadata struct { group string topic string lagThreshold int64 + + // auth + authMode kafkaAuthMode + username string + passwd string } +type kafkaAuthMode string + +const ( + kafkaAuthModeForNone kafkaAuthMode = "none" + kafkaAuthModeForSaslPlaintext kafkaAuthMode = "sasl_plaintext" + kafkaAuthModeForSaslScramSha256 kafkaAuthMode = "sasl_scram_sha256" + kafkaAuthModeForSaslScramSha512 kafkaAuthMode = "sasl_scram_sha512" + kafkaAuthModeForSaslSSL kafkaAuthMode = "sasl_ssl" // sarama package not support sasl_ssl +) + const ( lagThresholdMetricName = "lagThreshold" kafkaMetricType = "External" @@ -86,6 +101,28 @@ func parseKafkaMetadata(metadata map[string]string) (kafkaMetadata, error) { meta.lagThreshold = t } + meta.authMode = kafkaAuthModeForNone + if val, ok := metadata["authMode"]; ok { + mode := kafkaAuthMode(val) + if mode != kafkaAuthModeForNone && mode != kafkaAuthModeForSaslPlaintext && mode != kafkaAuthModeForSaslScramSha256 && mode != kafkaAuthModeForSaslScramSha512 { + return meta, fmt.Errorf("err auth mode %s given", mode) + } + + meta.authMode = mode + } + + if meta.authMode != kafkaAuthModeForNone { + if metadata["username"] == "" { + return meta, errors.New("no username given") + } + meta.username = metadata["username"] + + if metadata["passwd"] == "" { + return meta, errors.New("no passwd given") + } + meta.passwd = metadata["passwd"] + } + return meta, nil } @@ -118,6 +155,22 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin config := sarama.NewConfig() config.Version = sarama.V1_0_0_0 + if ok := metadata.authMode == kafkaAuthModeForSaslPlaintext || metadata.authMode == kafkaAuthModeForSaslScramSha256 || metadata.authMode == kafkaAuthModeForSaslScramSha512; ok { + config.Net.SASL.Enable = true + config.Net.SASL.User = metadata.username + config.Net.SASL.Password = metadata.passwd + } + + if metadata.authMode == kafkaAuthModeForSaslScramSha256 { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256) + } + + if metadata.authMode == kafkaAuthModeForSaslScramSha512 { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) + } + client, err := sarama.NewClient(metadata.brokers, config) if err != nil { return nil, nil, fmt.Errorf("error creating kafka client: %s", err) diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 7e5de4c2444..49b283ebdf2 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -21,12 +21,31 @@ var validMetadata = map[string]string{ "topic": "my-topic", } +// A complete valid metadata example for sasl, with username and passwd +var validMetadataWithSasl = map[string]string{ + "brokerList": "broker1:9092,broker2:9092", + "consumerGroup": "my-group", + "topic": "my-topic", + "authMode": "sasl_plaintext", + "username": "admin", + "passwd": "admin", +} + +// A complete valid metadata example for sasl, without username and passwd +var validMetadataWithoutSasl = map[string]string{ + "brokerList": "broker1:9092,broker2:9092", + "consumerGroup": "my-group", + "topic": "my-topic", + "authMode": "sasl_plaintext", +} + var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ {map[string]string{}, true, 0, nil, "", ""}, {map[string]string{"brokerList": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", ""}, {map[string]string{"brokerList": "foo:9092,bar:9092"}, true, 2, []string{"foo:9092", "bar:9092"}, "", ""}, {map[string]string{"brokerList": "a", "consumerGroup": "my-group"}, true, 1, []string{"a"}, "my-group", ""}, - {validMetadata, false, 2, []string{"broker1:9092", "broker2:9092"}, "my-group", "my-topic"}, + {validMetadataWithSasl, false, 2, []string{"broker1:9092", "broker2:9092"}, "my-group", "my-topic"}, + {validMetadataWithoutSasl, true, 2, []string{"broker1:9092", "broker2:9092"}, "my-group", "my-topic"}, } func TestGetBrokers(t *testing.T) { diff --git a/pkg/scalers/kafka_scram_client.go b/pkg/scalers/kafka_scram_client.go new file mode 100644 index 00000000000..6bbed28ea1e --- /dev/null +++ b/pkg/scalers/kafka_scram_client.go @@ -0,0 +1,36 @@ +package scalers + +import ( + "crypto/sha256" + "crypto/sha512" + "hash" + + "github.com/xdg/scram" +) + +var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() } +var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() } + +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} From 0f5a45edb69e4b0d8d6dc2353900e264b166060a Mon Sep 17 00:00:00 2001 From: gaohj Date: Thu, 28 Nov 2019 10:29:12 +0800 Subject: [PATCH 2/5] update sasl auth info reference a secret --- pkg/handler/scale_handler.go | 2 +- pkg/scalers/kafka_scaler.go | 36 +++++++++++++++----------------- pkg/scalers/kafka_scaler_test.go | 25 ++++++---------------- 3 files changed, 24 insertions(+), 39 deletions(-) diff --git a/pkg/handler/scale_handler.go b/pkg/handler/scale_handler.go index e4ed3846e7e..c76272863d7 100644 --- a/pkg/handler/scale_handler.go +++ b/pkg/handler/scale_handler.go @@ -273,7 +273,7 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn case "aws-cloudwatch": return scalers.NewAwsCloudwatchScaler(resolvedEnv, triggerMetadata, authParams) case "kafka": - return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata) + return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata, authParams) case "rabbitmq": return scalers.NewRabbitMQScaler(resolvedEnv, triggerMetadata, authParams) case "azure-eventhub": diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index d99af9810ca..230a950c177 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -17,10 +17,9 @@ import ( ) type kafkaScaler struct { - resolvedSecrets map[string]string - metadata kafkaMetadata - client sarama.Client - admin sarama.ClusterAdmin + metadata kafkaMetadata + client sarama.Client + admin sarama.ClusterAdmin } type kafkaMetadata struct { @@ -32,7 +31,7 @@ type kafkaMetadata struct { // auth authMode kafkaAuthMode username string - passwd string + password string } type kafkaAuthMode string @@ -54,8 +53,8 @@ const ( var kafkaLog = logf.Log.WithName("kafka_scaler") // NewKafkaScaler creates a new kafkaScaler -func NewKafkaScaler(resolvedSecrets, metadata map[string]string) (Scaler, error) { - kafkaMetadata, err := parseKafkaMetadata(metadata) +func NewKafkaScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) { + kafkaMetadata, err := parseKafkaMetadata(resolvedEnv, metadata, authParams) if err != nil { return nil, fmt.Errorf("error parsing kafka metadata: %s", err) } @@ -66,14 +65,13 @@ func NewKafkaScaler(resolvedSecrets, metadata map[string]string) (Scaler, error) } return &kafkaScaler{ - client: client, - admin: admin, - metadata: kafkaMetadata, - resolvedSecrets: resolvedSecrets, + client: client, + admin: admin, + metadata: kafkaMetadata, }, nil } -func parseKafkaMetadata(metadata map[string]string) (kafkaMetadata, error) { +func parseKafkaMetadata(resolvedEnv, metadata, authParams map[string]string) (kafkaMetadata, error) { meta := kafkaMetadata{} if metadata["brokerList"] == "" { @@ -102,7 +100,7 @@ func parseKafkaMetadata(metadata map[string]string) (kafkaMetadata, error) { } meta.authMode = kafkaAuthModeForNone - if val, ok := metadata["authMode"]; ok { + if val, ok := authParams["authMode"]; ok { mode := kafkaAuthMode(val) if mode != kafkaAuthModeForNone && mode != kafkaAuthModeForSaslPlaintext && mode != kafkaAuthModeForSaslScramSha256 && mode != kafkaAuthModeForSaslScramSha512 { return meta, fmt.Errorf("err auth mode %s given", mode) @@ -112,15 +110,15 @@ func parseKafkaMetadata(metadata map[string]string) (kafkaMetadata, error) { } if meta.authMode != kafkaAuthModeForNone { - if metadata["username"] == "" { + if authParams["username"] == "" { return meta, errors.New("no username given") } - meta.username = metadata["username"] + meta.username = authParams["username"] - if metadata["passwd"] == "" { - return meta, errors.New("no passwd given") + if authParams["password"] == "" { + return meta, errors.New("no password given") } - meta.passwd = metadata["passwd"] + meta.password = authParams["password"] } return meta, nil @@ -158,7 +156,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin if ok := metadata.authMode == kafkaAuthModeForSaslPlaintext || metadata.authMode == kafkaAuthModeForSaslScramSha256 || metadata.authMode == kafkaAuthModeForSaslScramSha512; ok { config.Net.SASL.Enable = true config.Net.SASL.User = metadata.username - config.Net.SASL.Password = metadata.passwd + config.Net.SASL.Password = metadata.password } if metadata.authMode == kafkaAuthModeForSaslScramSha256 { diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 49b283ebdf2..17e16eaa7f6 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -21,22 +21,11 @@ var validMetadata = map[string]string{ "topic": "my-topic", } -// A complete valid metadata example for sasl, with username and passwd -var validMetadataWithSasl = map[string]string{ - "brokerList": "broker1:9092,broker2:9092", - "consumerGroup": "my-group", - "topic": "my-topic", - "authMode": "sasl_plaintext", - "username": "admin", - "passwd": "admin", -} - -// A complete valid metadata example for sasl, without username and passwd -var validMetadataWithoutSasl = map[string]string{ - "brokerList": "broker1:9092,broker2:9092", - "consumerGroup": "my-group", - "topic": "my-topic", - "authMode": "sasl_plaintext", +// A complete valid authParams example for sasl, with username and passwd +var validAuthParams = map[string]string{ + "authMode": "sasl_plaintext", + "username": "admin", + "passwd": "admin", } var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ @@ -44,13 +33,11 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ {map[string]string{"brokerList": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", ""}, {map[string]string{"brokerList": "foo:9092,bar:9092"}, true, 2, []string{"foo:9092", "bar:9092"}, "", ""}, {map[string]string{"brokerList": "a", "consumerGroup": "my-group"}, true, 1, []string{"a"}, "my-group", ""}, - {validMetadataWithSasl, false, 2, []string{"broker1:9092", "broker2:9092"}, "my-group", "my-topic"}, - {validMetadataWithoutSasl, true, 2, []string{"broker1:9092", "broker2:9092"}, "my-group", "my-topic"}, } func TestGetBrokers(t *testing.T) { for _, testData := range parseKafkaMetadataTestDataset { - meta, err := parseKafkaMetadata(testData.metadata) + meta, err := parseKafkaMetadata(nil, testData.metadata, validAuthParams) if err != nil && !testData.isError { t.Error("Expected success but got error", err) From c5c876bdee5c9023f8803ed3f568f17054a01f6b Mon Sep 17 00:00:00 2001 From: gaohj Date: Thu, 28 Nov 2019 11:49:10 +0800 Subject: [PATCH 3/5] add support sasl_ssl --- pkg/scalers/kafka_scaler.go | 46 +++++++++++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 230a950c177..dc0b4da5755 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -2,6 +2,8 @@ package scalers import ( "context" + "crypto/tls" + "crypto/x509" "errors" "fmt" "strconv" @@ -32,6 +34,11 @@ type kafkaMetadata struct { authMode kafkaAuthMode username string password string + + // ssl + cert string + key string + ca string } type kafkaAuthMode string @@ -41,7 +48,7 @@ const ( kafkaAuthModeForSaslPlaintext kafkaAuthMode = "sasl_plaintext" kafkaAuthModeForSaslScramSha256 kafkaAuthMode = "sasl_scram_sha256" kafkaAuthModeForSaslScramSha512 kafkaAuthMode = "sasl_scram_sha512" - kafkaAuthModeForSaslSSL kafkaAuthMode = "sasl_ssl" // sarama package not support sasl_ssl + kafkaAuthModeForSaslSSL kafkaAuthMode = "sasl_ssl" ) const ( @@ -121,6 +128,23 @@ func parseKafkaMetadata(resolvedEnv, metadata, authParams map[string]string) (ka meta.password = authParams["password"] } + if meta.authMode == kafkaAuthModeForSaslSSL { + if authParams["ca"] == "" { + return meta, errors.New("no ca given") + } + meta.ca = authParams["ca"] + + if authParams["cert"] == "" { + return meta, errors.New("no cert given") + } + meta.cert = authParams["cert"] + + if authParams["key"] == "" { + return meta, errors.New("no key given") + } + meta.key = authParams["key"] + } + return meta, nil } @@ -153,12 +177,30 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin config := sarama.NewConfig() config.Version = sarama.V1_0_0_0 - if ok := metadata.authMode == kafkaAuthModeForSaslPlaintext || metadata.authMode == kafkaAuthModeForSaslScramSha256 || metadata.authMode == kafkaAuthModeForSaslScramSha512; ok { + if ok := metadata.authMode == kafkaAuthModeForSaslPlaintext || metadata.authMode == kafkaAuthModeForSaslSSL || metadata.authMode == kafkaAuthModeForSaslScramSha256 || metadata.authMode == kafkaAuthModeForSaslScramSha512; ok { config.Net.SASL.Enable = true config.Net.SASL.User = metadata.username config.Net.SASL.Password = metadata.password } + if metadata.authMode == kafkaAuthModeForSaslSSL { + cert, err := tls.X509KeyPair([]byte(metadata.cert), []byte(metadata.key)) + if err != nil { + return nil, nil, fmt.Errorf("error parse X509KeyPair: %s", err) + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM([]byte(metadata.ca)) + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + + config.Net.TLS.Enable = true + config.Net.TLS.Config = tlsConfig + } + if metadata.authMode == kafkaAuthModeForSaslScramSha256 { config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256) From 1d43aa2472beb5f03a1124c370c242ed75e58ad1 Mon Sep 17 00:00:00 2001 From: gaohj Date: Thu, 28 Nov 2019 12:11:01 +0800 Subject: [PATCH 4/5] fix validate authmode --- pkg/scalers/kafka_scaler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index dc0b4da5755..83446fc0699 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -109,7 +109,7 @@ func parseKafkaMetadata(resolvedEnv, metadata, authParams map[string]string) (ka meta.authMode = kafkaAuthModeForNone if val, ok := authParams["authMode"]; ok { mode := kafkaAuthMode(val) - if mode != kafkaAuthModeForNone && mode != kafkaAuthModeForSaslPlaintext && mode != kafkaAuthModeForSaslScramSha256 && mode != kafkaAuthModeForSaslScramSha512 { + if mode != kafkaAuthModeForNone && mode != kafkaAuthModeForSaslPlaintext && mode != kafkaAuthModeForSaslSSL && mode != kafkaAuthModeForSaslScramSha256 && mode != kafkaAuthModeForSaslScramSha512 { return meta, fmt.Errorf("err auth mode %s given", mode) } From bdeb18f17526302acd5ba31fd584d7717be1e1f9 Mon Sep 17 00:00:00 2001 From: gaohj Date: Thu, 28 Nov 2019 12:51:55 +0800 Subject: [PATCH 5/5] add withAuthParams test --- pkg/scalers/kafka_scaler_test.go | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 17e16eaa7f6..5d06c48d448 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -22,12 +22,15 @@ var validMetadata = map[string]string{ } // A complete valid authParams example for sasl, with username and passwd -var validAuthParams = map[string]string{ +var validWithAuthParams = map[string]string{ "authMode": "sasl_plaintext", "username": "admin", "passwd": "admin", } +// A complete valid authParams example for sasl, without username and passwd +var validWithoutAuthParams = map[string]string{} + var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ {map[string]string{}, true, 0, nil, "", ""}, {map[string]string{"brokerList": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", ""}, @@ -37,7 +40,28 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ func TestGetBrokers(t *testing.T) { for _, testData := range parseKafkaMetadataTestDataset { - meta, err := parseKafkaMetadata(nil, testData.metadata, validAuthParams) + meta, err := parseKafkaMetadata(nil, testData.metadata, validWithAuthParams) + + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + if len(meta.brokers) != testData.numBrokers { + t.Errorf("Expected %d brokers but got %d\n", testData.numBrokers, len(meta.brokers)) + } + if !reflect.DeepEqual(testData.brokers, meta.brokers) { + t.Errorf("Expected %v but got %v\n", testData.brokers, meta.brokers) + } + if meta.group != testData.group { + t.Errorf("Expected group %s but got %s\n", testData.group, meta.group) + } + if meta.topic != testData.topic { + t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic) + } + + meta, err = parseKafkaMetadata(nil, testData.metadata, validWithoutAuthParams) if err != nil && !testData.isError { t.Error("Expected success but got error", err)