From 12476eefd0b262318238599ef468d7cfa25b704d Mon Sep 17 00:00:00 2001 From: Roman Novichenok Date: Sat, 26 Aug 2023 15:59:45 -0400 Subject: [PATCH 1/8] add support for gssapi to kafka scaler Signed-off-by: Roman Novichenok --- CHANGELOG.md | 9 +- config/manager/manager.yaml | 6 + pkg/scalers/kafka_scaler.go | 187 +++++++++++++++++++++++++------ pkg/scalers/kafka_scaler_test.go | 84 +++++++++++++- 4 files changed, 246 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f5135f47371..1131c738516 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,13 +49,17 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **AWS SQS Scaler**: Support for scaling to include delayed messages. ([#4377](https://github.com/kedacore/keda/issues/4377)) +- **Governance**: KEDA transitioned to CNCF Graduated project ([#63](https://github.com/kedacore/governance/issues/63)) ### Improvements +- **General**: Add more events for user checking ([#796](https://github.com/kedacore/keda/issues/3764)) - **General**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796)) - **General**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726)) - **General**: Adding a changelog validating script to check for formatting and order ([#3190](https://github.com/kedacore/keda/issues/3190)) +- **General**: Update golangci-lint version documented in CONTRIBUTING.md since old version doesn't support go 1.20 (N/A) +- **Kafka**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836)) ### Fixes @@ -67,7 +71,7 @@ You can find all deprecations in [this overview](https://github.com/kedacore/ked New deprecation(s): -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **General**: Clean up previously deprecated code for 2.12 release ([#4899](https://github.com/kedacore/keda/issues/4899)) ### Breaking Changes @@ -75,6 +79,7 @@ New deprecation(s): ### Other +- **General**: Fixed a typo in the StatefulSet scaling resolver ([#4902](https://github.com/kedacore/keda/pull/4902)) - **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781)) ## v2.11.2 diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index 5cd7246d081..b3eca570b1d 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -77,6 +77,9 @@ spec: seccompProfile: type: RuntimeDefault volumeMounts: + - mountPath: /tmp/kerberos + name: temp-kerberos-vol + readOnly: false - mountPath: /certs name: certificates readOnly: true @@ -84,6 +87,9 @@ spec: nodeSelector: kubernetes.io/os: linux volumes: + - name: temp-kerberos-vol + emptyDir: + medium: Memory - name: certificates secret: defaultMode: 420 diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index c13670f8d7d..79b03864974 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "os" "strconv" "strings" "sync" @@ -51,6 +52,11 @@ type kafkaMetadata struct { username string password string + // GSSAPI + keytabPath string + realm string + kerberosConfigPath string + // OAUTHBEARER scopes []string oauthTokenEndpointURI string @@ -82,6 +88,7 @@ const ( KafkaSASLTypeSCRAMSHA256 kafkaSaslType = "scram_sha256" KafkaSASLTypeSCRAMSHA512 kafkaSaslType = "scram_sha512" KafkaSASLTypeOAuthbearer kafkaSaslType = "oauthbearer" + KafkaSASLTypeGSSAPI kafkaSaslType = "gssapi" ) const ( @@ -145,39 +152,18 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error { saslAuthType = strings.TrimSpace(saslAuthType) mode := kafkaSaslType(saslAuthType) - if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer { - if config.AuthParams["username"] == "" { - return errors.New("no username given") - } - meta.username = strings.TrimSpace(config.AuthParams["username"]) - - if config.AuthParams["password"] == "" { - return errors.New("no password given") + switch { + case mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer: + err := parseSaslParams(config, meta, mode) + if err != nil { + return err } - meta.password = strings.TrimSpace(config.AuthParams["password"]) - meta.saslType = mode - - if mode == KafkaSASLTypeOAuthbearer { - meta.scopes = strings.Split(config.AuthParams["scopes"], ",") - - if config.AuthParams["oauthTokenEndpointUri"] == "" { - return errors.New("no oauth token endpoint uri given") - } - meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"]) - - meta.oauthExtensions = make(map[string]string) - oauthExtensionsRaw := config.AuthParams["oauthExtensions"] - if oauthExtensionsRaw != "" { - for _, extension := range strings.Split(oauthExtensionsRaw, ",") { - splittedExtension := strings.Split(extension, "=") - if len(splittedExtension) != 2 { - return errors.New("invalid OAuthBearer extension, must be of format key=value") - } - meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1] - } - } + case mode == KafkaSASLTypeGSSAPI: + err := parseKerberosParams(config, meta, mode) + if err != nil { + return err } - } else { + default: return fmt.Errorf("err SASL mode %s given", mode) } } @@ -229,10 +215,112 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error { } meta.enableTLS = true } + return nil +} + +func parseKerberosParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error { + if config.AuthParams["username"] == "" { + return errors.New("no username given") + } + meta.username = strings.TrimSpace(config.AuthParams["username"]) + + if (config.AuthParams["password"] == "" && config.AuthParams["keytab"] == "") || + (config.AuthParams["password"] != "" && config.AuthParams["keytab"] != "") { + return errors.New("exactly one of 'password' or 'keytab' must be provided for GSSAPI authentication") + } + if config.AuthParams["password"] != "" { + meta.password = strings.TrimSpace(config.AuthParams["password"]) + } else { + path, err := saveToFile(config.AuthParams["keytab"]) + if err != nil { + return fmt.Errorf("error saving keytab to file: %w", err) + } + meta.keytabPath = path + } + + if config.AuthParams["realm"] == "" { + return errors.New("no realm given") + } + meta.realm = strings.TrimSpace(config.AuthParams["realm"]) + + if config.AuthParams["kerberosConfig"] == "" { + return errors.New("no Kerberos configuration file (kerberosConfig) given") + } + path, err := saveToFile(config.AuthParams["kerberosConfig"]) + if err != nil { + return fmt.Errorf("error saving kerberosConfig to file: %w", err) + } + meta.kerberosConfigPath = path + + meta.saslType = mode + return nil +} + +func parseSaslParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error { + if config.AuthParams["username"] == "" { + return errors.New("no username given") + } + meta.username = strings.TrimSpace(config.AuthParams["username"]) + + if config.AuthParams["password"] == "" { + return errors.New("no password given") + } + meta.password = strings.TrimSpace(config.AuthParams["password"]) + meta.saslType = mode + if mode == KafkaSASLTypeOAuthbearer { + meta.scopes = strings.Split(config.AuthParams["scopes"], ",") + + if config.AuthParams["oauthTokenEndpointUri"] == "" { + return errors.New("no oauth token endpoint uri given") + } + meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"]) + + meta.oauthExtensions = make(map[string]string) + oauthExtensionsRaw := config.AuthParams["oauthExtensions"] + if oauthExtensionsRaw != "" { + for _, extension := range strings.Split(oauthExtensionsRaw, ",") { + splittedExtension := strings.Split(extension, "=") + if len(splittedExtension) != 2 { + return errors.New("invalid OAuthBearer extension, must be of format key=value") + } + meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1] + } + } + } return nil } +func saveToFile(content string) (string, error) { + data := []byte(content) + + tempKrbDir := fmt.Sprintf("%s%c%s", os.TempDir(), os.PathSeparator, "kerberos") + err := os.MkdirAll(tempKrbDir, 0700) + if err != nil { + fmt.Printf("Error creating temporary directory: %s. Error: %s\n", tempKrbDir, err) + return "", err + } + + tempFile, err := os.CreateTemp(tempKrbDir, "krb_*") + if err != nil { + fmt.Println("Error creating temporary file:", err) + return "", err + } + defer tempFile.Close() + + _, err = tempFile.Write(data) + if err != nil { + fmt.Println("Error writing to temporary file:", err) + return "", err + } + + // Get the temporary file's name + tempFilename := tempFile.Name() + fmt.Println("Data has been successfully saved to temporary file:", tempFilename) + + return tempFilename, nil +} + func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata, error) { meta := kafkaMetadata{} switch { @@ -364,7 +452,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin config := sarama.NewConfig() config.Version = metadata.version - if metadata.saslType != KafkaSASLTypeNone { + if metadata.saslType != KafkaSASLTypeNone && metadata.saslType != KafkaSASLTypeGSSAPI { config.Net.SASL.Enable = true config.Net.SASL.User = metadata.username config.Net.SASL.Password = metadata.password @@ -398,6 +486,22 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin config.Net.SASL.TokenProvider = OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes, metadata.oauthExtensions) } + if metadata.saslType == KafkaSASLTypeGSSAPI { + config.Net.SASL.Enable = true + config.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI + config.Net.SASL.GSSAPI.ServiceName = "kafka" + config.Net.SASL.GSSAPI.Username = metadata.username + config.Net.SASL.GSSAPI.Realm = metadata.realm + config.Net.SASL.GSSAPI.KerberosConfigPath = metadata.kerberosConfigPath + if metadata.keytabPath != "" { + config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH + config.Net.SASL.GSSAPI.KeyTabPath = metadata.keytabPath + } else { + config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH + config.Net.SASL.GSSAPI.Password = metadata.password + } + } + client, err := sarama.NewClient(metadata.bootstrapServers, config) if err != nil { return nil, nil, fmt.Errorf("error creating kafka client: %w", err) @@ -558,7 +662,24 @@ func (s *kafkaScaler) Close(context.Context) error { if s.admin == nil { return nil } - return s.admin.Close() + + err := s.admin.Close() + if err != nil { + return err + } + + // clean up any temporary files + if strings.TrimSpace(s.metadata.kerberosConfigPath) != "" { + if err := os.Remove(s.metadata.kerberosConfigPath); err != nil { + return err + } + } + if strings.TrimSpace(s.metadata.keytabPath) != "" { + if err := os.Remove(s.metadata.keytabPath); err != nil { + return err + } + } + return nil } func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 32daa453830..9b616c29250 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -2,6 +2,8 @@ package scalers import ( "context" + "fmt" + "os" "reflect" "strings" "testing" @@ -135,6 +137,14 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{ {map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "tls": "disable"}, false, false}, // success, SASL OAUTHBEARER + TLS {map[string]string{"sasl": "oauthbearer", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable"}, false, false}, + // success, SASL GSSAPI/password + {map[string]string{"sasl": "gssapi", "username": "admin", "password": "admin", "kerberosConfig": "", "realm": "tst.com"}, false, false}, + // success, SASL GSSAPI/keytab + {map[string]string{"sasl": "gssapi", "username": "admin", "keytab": "/path/to/keytab", "kerberosConfig": "", "realm": "tst.com"}, false, false}, + // success, SASL GSSAPI/password + TLS + {map[string]string{"sasl": "gssapi", "username": "admin", "password": "admin", "kerberosConfig": "", "realm": "tst.com", "tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true}, + // success, SASL GSSAPI/keytab + TLS + {map[string]string{"sasl": "gssapi", "username": "admin", "keytab": "/path/to/keytab", "kerberosConfig": "", "realm": "tst.com", "tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true}, // failure, SASL OAUTHBEARER + TLS bad sasl type {map[string]string{"sasl": "foo", "username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com", "tls": "disable"}, true, false}, // success, SASL OAUTHBEARER + TLS missing scope @@ -165,6 +175,14 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{ {map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "tls": "enable", "ca": "caaa", "key": "keey"}, true, false}, // failure, SASL + TLS, missing key {map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "tls": "enable", "ca": "caaa", "cert": "ceert"}, true, false}, + // failure, SASL GSSAPI missing password and keytab + {map[string]string{"sasl": "gssapi", "username": "admin", "kerberosConfig": "", "realm": "tst.com"}, true, false}, + // failure, SASL GSSAPI provided both password and keytab + {map[string]string{"sasl": "gssapi", "username": "admin", "password": "admin", "keytab": "/path/to/keytab", "kerberosConfig": "", "realm": "tst.com"}, true, false}, + // failure, SASL GSSAPI/password + TLS missing realm + {map[string]string{"sasl": "gssapi", "username": "admin", "password": "admin", "kerberosConfig": "", "tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false}, + // failure, SASL GSSAPI/keytab + TLS missing username + {map[string]string{"sasl": "gssapi", "keytab": "/path/to/keytab", "kerberosConfig": "", "realm": "tst.com", "tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false}, } var parseAuthParamsTestDataset = []parseAuthParamsTestDataSecondAuthMethod{ // success, SASL plaintext @@ -187,6 +205,18 @@ var parseAuthParamsTestDataset = []parseAuthParamsTestDataSecondAuthMethod{ {map[string]string{"sasl": "plaintext", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin"}, false, false}, // success, SASL OAUTHBEARER + TLS explicitly disabled {map[string]string{"sasl": "oauthbearer", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com"}, false, false}, + // success, SASL GSSAPI/password + {map[string]string{"sasl": "gssapi", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "kerberosConfig": "", "realm": "test.com"}, false, false}, + // success, SASL GSSAPI/password + TLS explicitly disabled + {map[string]string{"sasl": "gssapi", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "kerberosConfig": "", "realm": "test.com"}, false, false}, + // success, SASL GSSAPI/password + TLS + {map[string]string{"sasl": "gssapi", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "kerberosConfig": "", "realm": "test.com", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, false}, + // success, SASL GSSAPI/keytab + {map[string]string{"sasl": "gssapi", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "keytab": "/path/to/keytab", "kerberosConfig": "", "realm": "test.com"}, false, false}, + // success, SASL GSSAPI/keytab + TLS explicitly disabled + {map[string]string{"sasl": "gssapi", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "keytab": "/path/to/keytab", "kerberosConfig": "", "realm": "test.com"}, false, false}, + // success, SASL GSSAPI/keytab + TLS + {map[string]string{"sasl": "gssapi", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "keytab": "/path/to/keytab", "kerberosConfig": "", "realm": "test.com", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, false}, // failure, SASL OAUTHBEARER + TLS explicitly disable + bad SASL type {map[string]string{"sasl": "foo", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "scopes": "scope", "oauthTokenEndpointUri": "https://website.com"}, true, false}, // success, SASL OAUTHBEARER + TLS missing scope @@ -216,8 +246,17 @@ var parseAuthParamsTestDataset = []parseAuthParamsTestDataSecondAuthMethod{ // failure, SASL + TLS, missing cert {map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "key": "keey"}, true, false}, // failure, SASL + TLS, missing key - {map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert"}, true, false}, - + {map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"sasl": "plaintext", "username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert"}, true, true}, + // failure, SASL GSSAPI missing keytab and password + {map[string]string{"sasl": "gssapi", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "realm": "test.com"}, true, false}, + // failure, SASL GSSAPI values in both keytab and password + {map[string]string{"sasl": "gssapi", "tls": "disable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "keytab": "/path/to/keytab", "realm": "test.com"}, true, false}, + // failure, SASL GSSAPI + TLS missing realm + {map[string]string{"sasl": "gssapi", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "keytab": "/path/to/keytab", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, true}, + // failure, SASL GSSAPI + TLS missing username + {map[string]string{"sasl": "gssapi", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"realm": "test.com", "keytab": "/path/to/keytab", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, true}, + // failure, SASL GSSAPI + TLS missing kerberosConfig + {map[string]string{"sasl": "gssapi", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "realm": "test.com", "keytab": "/path/to/keytab", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, true}, // failure, setting SASL values in both places {map[string]string{"sasl": "scram_sha512", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"sasl": "scram_sha512", "username": "admin", "password": "admin"}, true, false}, // failure, setting TLS values in both places @@ -325,8 +364,7 @@ func TestGetBrokers(t *testing.T) { } } -func TestKafkaAuthParams(t *testing.T) { - // Testing tls and sasl value in TriggerAuthentication +func TestKafkaAuthParamsInTriggerAuthentication(t *testing.T) { for _, testData := range parseKafkaAuthParamsTestDataset { meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: validKafkaMetadata, AuthParams: testData.authParams}, logr.Discard()) @@ -353,9 +391,24 @@ func TestKafkaAuthParams(t *testing.T) { t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.key) } } + if meta.saslType == KafkaSASLTypeGSSAPI && !testData.isError { + if testData.authParams["keytab"] != "" { + err := testFileContents(testData, meta, "keytab") + if err != nil { + t.Errorf(err.Error()) + } + } + if !testData.isError { + err := testFileContents(testData, meta, "kerberosConfig") + if err != nil { + t.Errorf(err.Error()) + } + } + } } +} - // Testing tls and sasl value in scaledObject +func TestKafkaAuthParamsInScaledObject(t *testing.T) { for id, testData := range parseAuthParamsTestDataset { meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}, logr.Discard()) @@ -387,6 +440,27 @@ func TestKafkaAuthParams(t *testing.T) { } } +func testFileContents(testData parseKafkaAuthParamsTestData, meta kafkaMetadata, prop string) error { + if testData.authParams[prop] != "" { + var path string + switch prop { + case "keytab": + path = meta.keytabPath + case "kerberosConfig": + path = meta.kerberosConfigPath + } + data, err := os.ReadFile(path) + if err != nil { + return fmt.Errorf("expected to find '%v' file at %v", prop, path) + } + contents := string(data) + if contents != testData.authParams[prop] { + return fmt.Errorf("expected keytab value: '%v' but got '%v'", testData.authParams[prop], contents) + } + } + return nil +} + func TestKafkaOAuthbrearerAuthParams(t *testing.T) { for _, testData := range parseKafkaOAuthbrearerAuthParamsTestDataset { meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: validKafkaMetadata, AuthParams: testData.authParams}, logr.Discard()) From f39f12ac11e47120ffe250b74caf42bed6883f47 Mon Sep 17 00:00:00 2001 From: Roman Novichenok Date: Sat, 30 Sep 2023 10:52:00 -0400 Subject: [PATCH 2/8] merged conflicts Signed-off-by: Roman Novichenok --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index adcd887e5b6..d754bb80157 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,7 +51,6 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New - **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) -- **Kafka**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836)) #### Experimental @@ -73,7 +72,7 @@ You can find all deprecations in [this overview](https://github.com/kedacore/ked New deprecation(s): -- **General**: Clean up previously deprecated code for 2.12 release ([#4899](https://github.com/kedacore/keda/issues/4899)) +- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) ### Breaking Changes From 3b5cc60f00cf443402caf264c33f76ee9237fd5b Mon Sep 17 00:00:00 2001 From: Roman Novichenok Date: Sat, 30 Sep 2023 11:08:38 -0400 Subject: [PATCH 3/8] remove dedicated kerberos mount - will be managed by adding docs Signed-off-by: Roman Novichenok --- config/manager/manager.yaml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index b3eca570b1d..9868885f1f6 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -77,19 +77,12 @@ spec: seccompProfile: type: RuntimeDefault volumeMounts: - - mountPath: /tmp/kerberos - name: temp-kerberos-vol - readOnly: false - mountPath: /certs name: certificates readOnly: true terminationGracePeriodSeconds: 10 nodeSelector: kubernetes.io/os: linux - volumes: - - name: temp-kerberos-vol - emptyDir: - medium: Memory - name: certificates secret: defaultMode: 420 From 81e0b547f90af88f46f2f74763ea63b0aebee018 Mon Sep 17 00:00:00 2001 From: Roman Novichenok Date: Sat, 30 Sep 2023 11:09:53 -0400 Subject: [PATCH 4/8] remove dedicated kerberos mount - will be managed by adding docs Signed-off-by: Roman Novichenok --- config/manager/manager.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index 9868885f1f6..5cd7246d081 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -83,6 +83,7 @@ spec: terminationGracePeriodSeconds: 10 nodeSelector: kubernetes.io/os: linux + volumes: - name: certificates secret: defaultMode: 420 From 1b14ae16e48a016420fe21012bbb3f65dc113d88 Mon Sep 17 00:00:00 2001 From: Roman Novichenok Date: Sat, 30 Sep 2023 12:27:53 -0400 Subject: [PATCH 5/8] update error messages Signed-off-by: Roman Novichenok --- pkg/scalers/kafka_scaler.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index d3f3ea13548..6d34b14c11a 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -333,26 +333,23 @@ func saveToFile(content string) (string, error) { tempKrbDir := fmt.Sprintf("%s%c%s", os.TempDir(), os.PathSeparator, "kerberos") err := os.MkdirAll(tempKrbDir, 0700) if err != nil { - fmt.Printf("Error creating temporary directory: %s. Error: %s\n", tempKrbDir, err) - return "", err + return "", fmt.Errorf(`error creating temporary directory: %s. Error: %s + Note, when running in a container a writable /tmp/kerberos emptyDir must be mounted. Refer to documentation`, tempKrbDir, err) } tempFile, err := os.CreateTemp(tempKrbDir, "krb_*") if err != nil { - fmt.Println("Error creating temporary file:", err) - return "", err + return "", fmt.Errorf("error creating temporary file: %s", err) } defer tempFile.Close() _, err = tempFile.Write(data) if err != nil { - fmt.Println("Error writing to temporary file:", err) - return "", err + return "", fmt.Errorf("error writing to temporary file: %s", err) } // Get the temporary file's name tempFilename := tempFile.Name() - fmt.Println("Data has been successfully saved to temporary file:", tempFilename) return tempFilename, nil } From e7665febe80e21d8f38f1eb11cdb8cbf7a41590e Mon Sep 17 00:00:00 2001 From: Roman Novichenok Date: Sat, 30 Sep 2023 12:33:25 -0400 Subject: [PATCH 6/8] update error messages Signed-off-by: Roman Novichenok --- pkg/scalers/kafka_scaler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 6d34b14c11a..5b1e92522d3 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -333,19 +333,19 @@ func saveToFile(content string) (string, error) { tempKrbDir := fmt.Sprintf("%s%c%s", os.TempDir(), os.PathSeparator, "kerberos") err := os.MkdirAll(tempKrbDir, 0700) if err != nil { - return "", fmt.Errorf(`error creating temporary directory: %s. Error: %s + return "", fmt.Errorf(`error creating temporary directory: %s. Error: %w Note, when running in a container a writable /tmp/kerberos emptyDir must be mounted. Refer to documentation`, tempKrbDir, err) } tempFile, err := os.CreateTemp(tempKrbDir, "krb_*") if err != nil { - return "", fmt.Errorf("error creating temporary file: %s", err) + return "", fmt.Errorf("error creating temporary file: %w", err) } defer tempFile.Close() _, err = tempFile.Write(data) if err != nil { - return "", fmt.Errorf("error writing to temporary file: %s", err) + return "", fmt.Errorf("error writing to temporary file: %w", err) } // Get the temporary file's name From 6b0ca7f25700156100bd258513fce1feb397f793 Mon Sep 17 00:00:00 2001 From: novicr <43680008+novicr@users.noreply.github.com> Date: Tue, 10 Oct 2023 09:49:59 -0400 Subject: [PATCH 7/8] Update CHANGELOG.md Co-authored-by: Zbynek Roubalik Signed-off-by: novicr <43680008+novicr@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d754bb80157..1d356996a27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,7 +77,7 @@ New deprecation(s): ### Breaking Changes - **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) -- **Kafka**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836)) +- **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836)) ### Other - **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) From 73d29a8c9feb76b44875c04a923fb1de7cd604f9 Mon Sep 17 00:00:00 2001 From: Roman Novichenok Date: Tue, 10 Oct 2023 09:52:52 -0400 Subject: [PATCH 8/8] move change to improvements Signed-off-by: Roman Novichenok --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d356996a27..bac51f189c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ Here is an overview of all new **experimental** features: ### Improvements - **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836)) ### Fixes @@ -77,7 +78,6 @@ New deprecation(s): ### Breaking Changes - **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) -- **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836)) ### Other - **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))