Skip to content

Commit

Permalink
Add AWS MSK IAM authentication to Kafka scaler (#5692)
Browse files Browse the repository at this point in the history
Signed-off-by: Adrien Fillon <adrien.fillon@manomano.com>
  • Loading branch information
adrien-f authored Apr 25, 2024
1 parent da3da65 commit 6681d5e
Show file tree
Hide file tree
Showing 12 changed files with 843 additions and 94 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ New deprecation(s):
### New

- **General**: Provide capability to filter CloudEvents ([#3533](https://github.com/kedacore/keda/issues/3533))
- **Kafka**: Support Kafka SASL MSK IAM authentication ([#5540](https://github.com/kedacore/keda/issues/5540))
- **NATS Scaler**: Add TLS authentication ([#2296](https://github.com/kedacore/keda/issues/2296))
- **ScaledObject**: Ability to specify `initialCooldownPeriod` ([#5008](https://github.com/kedacore/keda/issues/5008))

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/Huawei/gophercloud v1.0.21
github.com/IBM/sarama v1.43.1
github.com/arangodb/go-driver v1.6.2
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0
github.com/aws/aws-sdk-go-v2 v1.26.1
github.com/aws/aws-sdk-go-v2/config v1.27.11
github.com/aws/aws-sdk-go-v2/credentials v1.17.11
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1448,6 +1448,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 h1:UyjtGmO0Uwl/K+zpzPwLoXzMhcN9xmnR2nrqJoBrg3c=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0/go.mod h1:TJAXuFs2HcMib3sN5L0gUC+Q01Qvy3DemvA55WuC+iA=
github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts=
github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA=
github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
Expand Down
67 changes: 61 additions & 6 deletions pkg/scalers/kafka/kafka_scaler_oauth_token_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,91 @@ package kafka

import (
"context"
"sync"
"time"

"github.com/IBM/sarama"
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
"github.com/aws/aws-sdk-go-v2/aws"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)

type TokenProvider struct {
type TokenProvider interface {
sarama.AccessTokenProvider
String() string
}

type oauthBearerTokenProvider struct {
tokenSource oauth2.TokenSource
extensions map[string]string
}

func OAuthBearerTokenProvider(clientID, clientSecret, tokenURL string, scopes []string, extensions map[string]string) sarama.AccessTokenProvider {
func OAuthBearerTokenProvider(clientID, clientSecret, tokenURL string, scopes []string, extensions map[string]string) TokenProvider {
cfg := clientcredentials.Config{
ClientID: clientID,
ClientSecret: clientSecret,
TokenURL: tokenURL,
Scopes: scopes,
}

return &TokenProvider{
return &oauthBearerTokenProvider{
tokenSource: cfg.TokenSource(context.Background()),
extensions: extensions,
}
}

func (t *TokenProvider) Token() (*sarama.AccessToken, error) {
token, err := t.tokenSource.Token()
func (o *oauthBearerTokenProvider) Token() (*sarama.AccessToken, error) {
token, err := o.tokenSource.Token()
if err != nil {
return nil, err
}

return &sarama.AccessToken{Token: token.AccessToken, Extensions: o.extensions}, nil
}

func (o *oauthBearerTokenProvider) String() string {
return "OAuthBearer"
}

type mskTokenProvider struct {
sync.Mutex
expireAt *time.Time
token string
region string
credentialsProvider aws.CredentialsProvider
}

func OAuthMSKTokenProvider(cfg *aws.Config) TokenProvider {
return &mskTokenProvider{
region: cfg.Region,
credentialsProvider: cfg.Credentials,
}
}

func (m *mskTokenProvider) Token() (*sarama.AccessToken, error) {
m.Lock()
defer m.Unlock()

if m.expireAt != nil && time.Now().Before(*m.expireAt) {
return &sarama.AccessToken{Token: m.token}, nil
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

token, expirationMs, err := signer.GenerateAuthTokenFromCredentialsProvider(ctx, m.region, m.credentialsProvider)
if err != nil {
return nil, err
}

return &sarama.AccessToken{Token: token.AccessToken, Extensions: t.extensions}, nil
expirationTime := time.UnixMilli(expirationMs)
m.expireAt = &expirationTime
m.token = token

return &sarama.AccessToken{Token: token}, err
}

func (m *mskTokenProvider) String() string {
return "MSK"
}
Loading

0 comments on commit 6681d5e

Please sign in to comment.