Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for GSSAPI in Kafka scaler #4851

Merged
merged 9 commits into from
Oct 10, 2023
Merged
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ New deprecation(s):
### Breaking Changes

- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))

- **Kafka**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836))
novicr marked this conversation as resolved.
Show resolved Hide resolved
### Other

- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
Expand Down
184 changes: 151 additions & 33 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -70,6 +71,11 @@ type kafkaMetadata struct {
username string
password string

// GSSAPI
keytabPath string
realm string
kerberosConfigPath string

// OAUTHBEARER
scopes []string
oauthTokenEndpointURI string
Expand Down Expand Up @@ -102,6 +108,7 @@ const (
KafkaSASLTypeSCRAMSHA256 kafkaSaslType = "scram_sha256"
KafkaSASLTypeSCRAMSHA512 kafkaSaslType = "scram_sha512"
KafkaSASLTypeOAuthbearer kafkaSaslType = "oauthbearer"
KafkaSASLTypeGSSAPI kafkaSaslType = "gssapi"
)

const (
Expand Down Expand Up @@ -165,39 +172,18 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
saslAuthType = strings.TrimSpace(saslAuthType)
mode := kafkaSaslType(saslAuthType)

if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if config.AuthParams["password"] == "" {
return errors.New("no password given")
switch {
case mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 || mode == KafkaSASLTypeOAuthbearer:
err := parseSaslParams(config, meta, mode)
if err != nil {
return err
}
meta.password = strings.TrimSpace(config.AuthParams["password"])
meta.saslType = mode

if mode == KafkaSASLTypeOAuthbearer {
meta.scopes = strings.Split(config.AuthParams["scopes"], ",")

if config.AuthParams["oauthTokenEndpointUri"] == "" {
return errors.New("no oauth token endpoint uri given")
}
meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"])

meta.oauthExtensions = make(map[string]string)
oauthExtensionsRaw := config.AuthParams["oauthExtensions"]
if oauthExtensionsRaw != "" {
for _, extension := range strings.Split(oauthExtensionsRaw, ",") {
splittedExtension := strings.Split(extension, "=")
if len(splittedExtension) != 2 {
return errors.New("invalid OAuthBearer extension, must be of format key=value")
}
meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1]
}
}
case mode == KafkaSASLTypeGSSAPI:
err := parseKerberosParams(config, meta, mode)
if err != nil {
return err
}
} else {
default:
return fmt.Errorf("err SASL mode %s given", mode)
}
}
Expand Down Expand Up @@ -265,10 +251,109 @@ func parseTLS(config *ScalerConfig, meta *kafkaMetadata) error {
meta.keyPassword = ""
}
meta.enableTLS = true
return nil
}

func parseKerberosParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if (config.AuthParams["password"] == "" && config.AuthParams["keytab"] == "") ||
(config.AuthParams["password"] != "" && config.AuthParams["keytab"] != "") {
return errors.New("exactly one of 'password' or 'keytab' must be provided for GSSAPI authentication")
}
if config.AuthParams["password"] != "" {
meta.password = strings.TrimSpace(config.AuthParams["password"])
} else {
path, err := saveToFile(config.AuthParams["keytab"])
if err != nil {
return fmt.Errorf("error saving keytab to file: %w", err)
}
meta.keytabPath = path
}

if config.AuthParams["realm"] == "" {
return errors.New("no realm given")
}
meta.realm = strings.TrimSpace(config.AuthParams["realm"])

if config.AuthParams["kerberosConfig"] == "" {
return errors.New("no Kerberos configuration file (kerberosConfig) given")
}
path, err := saveToFile(config.AuthParams["kerberosConfig"])
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("error saving kerberosConfig to file: %w", err)
}
meta.kerberosConfigPath = path

meta.saslType = mode
return nil
}

func parseSaslParams(config *ScalerConfig, meta *kafkaMetadata, mode kafkaSaslType) error {
if config.AuthParams["username"] == "" {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if config.AuthParams["password"] == "" {
return errors.New("no password given")
}
meta.password = strings.TrimSpace(config.AuthParams["password"])
meta.saslType = mode

if mode == KafkaSASLTypeOAuthbearer {
meta.scopes = strings.Split(config.AuthParams["scopes"], ",")

if config.AuthParams["oauthTokenEndpointUri"] == "" {
return errors.New("no oauth token endpoint uri given")
}
meta.oauthTokenEndpointURI = strings.TrimSpace(config.AuthParams["oauthTokenEndpointUri"])

meta.oauthExtensions = make(map[string]string)
oauthExtensionsRaw := config.AuthParams["oauthExtensions"]
if oauthExtensionsRaw != "" {
for _, extension := range strings.Split(oauthExtensionsRaw, ",") {
splittedExtension := strings.Split(extension, "=")
if len(splittedExtension) != 2 {
return errors.New("invalid OAuthBearer extension, must be of format key=value")
}
meta.oauthExtensions[splittedExtension[0]] = splittedExtension[1]
}
}
}
return nil
}

func saveToFile(content string) (string, error) {
data := []byte(content)

tempKrbDir := fmt.Sprintf("%s%c%s", os.TempDir(), os.PathSeparator, "kerberos")
err := os.MkdirAll(tempKrbDir, 0700)
if err != nil {
return "", fmt.Errorf(`error creating temporary directory: %s. Error: %w
Note, when running in a container a writable /tmp/kerberos emptyDir must be mounted. Refer to documentation`, tempKrbDir, err)
}

tempFile, err := os.CreateTemp(tempKrbDir, "krb_*")
if err != nil {
return "", fmt.Errorf("error creating temporary file: %w", err)
}
defer tempFile.Close()

_, err = tempFile.Write(data)
if err != nil {
return "", fmt.Errorf("error writing to temporary file: %w", err)
}

// Get the temporary file's name
tempFilename := tempFile.Name()

return tempFilename, nil
}

func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata, error) {
meta := kafkaMetadata{}
switch {
Expand Down Expand Up @@ -400,7 +485,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
config := sarama.NewConfig()
config.Version = metadata.version

if metadata.saslType != KafkaSASLTypeNone {
if metadata.saslType != KafkaSASLTypeNone && metadata.saslType != KafkaSASLTypeGSSAPI {
config.Net.SASL.Enable = true
config.Net.SASL.User = metadata.username
config.Net.SASL.Password = metadata.password
Expand Down Expand Up @@ -434,6 +519,22 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin
config.Net.SASL.TokenProvider = OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes, metadata.oauthExtensions)
}

if metadata.saslType == KafkaSASLTypeGSSAPI {
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
config.Net.SASL.GSSAPI.ServiceName = "kafka"
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
config.Net.SASL.GSSAPI.Username = metadata.username
config.Net.SASL.GSSAPI.Realm = metadata.realm
config.Net.SASL.GSSAPI.KerberosConfigPath = metadata.kerberosConfigPath
if metadata.keytabPath != "" {
config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
config.Net.SASL.GSSAPI.KeyTabPath = metadata.keytabPath
} else {
config.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
config.Net.SASL.GSSAPI.Password = metadata.password
}
}

client, err := sarama.NewClient(metadata.bootstrapServers, config)
if err != nil {
return nil, nil, fmt.Errorf("error creating kafka client: %w", err)
Expand Down Expand Up @@ -594,7 +695,24 @@ func (s *kafkaScaler) Close(context.Context) error {
if s.admin == nil {
return nil
}
return s.admin.Close()

err := s.admin.Close()
if err != nil {
return err
}

// clean up any temporary files
if strings.TrimSpace(s.metadata.kerberosConfigPath) != "" {
if err := os.Remove(s.metadata.kerberosConfigPath); err != nil {
return err
}
}
if strings.TrimSpace(s.metadata.keytabPath) != "" {
if err := os.Remove(s.metadata.keytabPath); err != nil {
return err
}
}
return nil
}

func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
Expand Down
Loading