Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apache Pulsar: add support for bearer token and basic auth #3845

Merged
merged 4 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Metrics API Scaler:** Add unsafeSsl paramater to skip certificate validation when connecting over HTTPS ([#3728](https://github.com/kedacore/keda/discussions/3728))
- **NATS Scalers:** Support HTTPS protocol in NATS Scalers ([#3805](https://github.com/kedacore/keda/issues/3805))
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))
- **Pulsar Scaler:** Add support for bearer token and basic auth ([#3844](https://github.com/kedacore/keda/issues/3844))
- **Pulsar Scaler:** Add support for partitioned topics ([#3833](https://github.com/kedacore/keda/issues/3833))

### Fixes
Expand Down
22 changes: 15 additions & 7 deletions pkg/scalers/authentication/authentication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
)

const (
authModesKey = "authModes"
AuthModesKey = "authModes"
)

func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMeta, err error) {
out = &AuthMeta{}

authModes, ok := triggerMetadata[authModesKey]
authModes, ok := triggerMetadata[AuthModesKey]
// no authMode specified
if !ok {
return nil, nil
Expand Down Expand Up @@ -81,14 +81,22 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
return out, err
}

func GetBearerToken(auth *AuthMeta) string {
return fmt.Sprintf("Bearer %s", auth.BearerToken)
}

func NewTLSConfig(auth *AuthMeta) (*tls.Config, error) {
return kedautil.NewTLSConfig(
auth.Cert,
auth.Key,
auth.CA,
)
}

func CreateHTTPRoundTripper(roundTripperType TransportType, auth *AuthMeta, conf ...*HTTPTransport) (rt http.RoundTripper, err error) {
tlsConfig := &tls.Config{InsecureSkipVerify: false}
if auth != nil && (auth.CA != "" || auth.EnableTLS) {
tlsConfig, err = kedautil.NewTLSConfig(
auth.Cert,
auth.Key,
auth.CA,
)
tlsConfig, err = NewTLSConfig(auth)
if err != nil || tlsConfig == nil {
return nil, fmt.Errorf("error creating the TLS config: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/loki_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (s *lokiScaler) ExecuteLokiQuery(ctx context.Context) (float64, error) {
}

if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBearerAuth {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.metadata.lokiAuth.BearerToken))
req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.lokiAuth))
} else if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBasicAuth {
req.SetBasicAuth(s.metadata.lokiAuth.Username, s.metadata.lokiAuth.Password)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/prometheus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error
}

if s.metadata.prometheusAuth != nil && s.metadata.prometheusAuth.EnableBearerAuth {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.metadata.prometheusAuth.BearerToken))
req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.prometheusAuth))
} else if s.metadata.prometheusAuth != nil && s.metadata.prometheusAuth.EnableBasicAuth {
req.SetBasicAuth(s.metadata.prometheusAuth.Username, s.metadata.prometheusAuth.Password)
}
Expand Down
79 changes: 53 additions & 26 deletions pkg/scalers/pulsar_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/pkg/scalers/authentication"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

Expand All @@ -32,11 +33,7 @@ type pulsarMetadata struct {
msgBacklogThreshold int64
activationMsgBacklogThreshold int64

// TLS
enableTLS bool
cert string
key string
ca string
pulsarAuth *authentication.AuthMeta

statsURL string
metricName string
Expand All @@ -49,6 +46,7 @@ const (
defaultMsgBacklogThreshold = 10
enable = "enable"
stringTrue = "true"
pulsarAuthModeHeader = "X-Pulsar-Auth-Method-Name"
)

type pulsarSubscription struct {
Expand Down Expand Up @@ -105,12 +103,24 @@ func NewPulsarScaler(config *ScalerConfig) (Scaler, error) {

client := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false)

if pulsarMetadata.enableTLS {
config, err := kedautil.NewTLSConfig(pulsarMetadata.cert, pulsarMetadata.key, pulsarMetadata.ca)
if err != nil {
return nil, err
if pulsarMetadata.pulsarAuth != nil {
if pulsarMetadata.pulsarAuth.CA != "" || pulsarMetadata.pulsarAuth.EnableTLS {
config, err := authentication.NewTLSConfig(pulsarMetadata.pulsarAuth)
if err != nil {
return nil, err
}
client.Transport = &http.Transport{TLSClientConfig: config}
}

if pulsarMetadata.pulsarAuth.EnableBearerAuth || pulsarMetadata.pulsarAuth.EnableBasicAuth {
// The pulsar broker redirects HTTP calls to other brokers and expects the Authorization header
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
if len(via) != 0 && via[0].Response.StatusCode == http.StatusTemporaryRedirect {
addAuthHeaders(req, &pulsarMetadata)
}
return nil
}
}
client.Transport = &http.Transport{TLSClientConfig: config}
}

return &pulsarScaler{
Expand Down Expand Up @@ -176,25 +186,21 @@ func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) {
}
meta.msgBacklogThreshold = t
}

meta.enableTLS = false
if val, ok := config.TriggerMetadata["tls"]; ok {
val = strings.TrimSpace(val)

if val == enable {
cert := config.AuthParams["cert"]
key := config.AuthParams["key"]
if key == "" || cert == "" {
return meta, errors.New("must be provided cert and key")
// For backwards compatibility, we need to map "tls: enable" to
if tls, ok := config.TriggerMetadata["tls"]; ok {
if tls == enable && (config.AuthParams["cert"] != "" || config.AuthParams["key"] != "") {
if authModes, authModesOk := config.TriggerMetadata[authentication.AuthModesKey]; authModesOk {
config.TriggerMetadata[authentication.AuthModesKey] = fmt.Sprintf("%s,%s", authModes, authentication.TLSAuthType)
} else {
config.TriggerMetadata[authentication.AuthModesKey] = string(authentication.TLSAuthType)
}
meta.ca = config.AuthParams["ca"]
meta.cert = cert
meta.key = key
meta.enableTLS = true
} else {
return meta, fmt.Errorf("err incorrect value for TLS given: %s", val)
}
}
auth, err := authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams)
if err != nil {
return meta, fmt.Errorf("error parsing %s: %s", msgBacklogMetricName, err)
}
meta.pulsarAuth = auth
meta.scalerIndex = config.ScalerIndex
return meta, nil
}
Expand All @@ -207,6 +213,8 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) {
return nil, fmt.Errorf("error requesting stats from url: %s", err)
}

addAuthHeaders(req, &s.metadata)

res, err := s.client.Do(req)
if res == nil || err != nil {
return nil, fmt.Errorf("error requesting stats from url: %s", err)
Expand Down Expand Up @@ -298,3 +306,22 @@ func (s *pulsarScaler) Close(context.Context) error {
s.client = nil
return nil
}

// addAuthHeaders add the relevant headers used by Pulsar to authenticate and authorize http requests
func addAuthHeaders(req *http.Request, metadata *pulsarMetadata) {
if metadata.pulsarAuth == nil {
return
}
switch {
case metadata.pulsarAuth.EnableBearerAuth:
req.Header.Add("Authorization", authentication.GetBearerToken(metadata.pulsarAuth))
req.Header.Add(pulsarAuthModeHeader, "token")
case metadata.pulsarAuth.EnableBasicAuth:
req.SetBasicAuth(metadata.pulsarAuth.Username, metadata.pulsarAuth.Password)
req.Header.Add(pulsarAuthModeHeader, "basic")
case metadata.pulsarAuth.EnableTLS:
// When BearerAuth or BasicAuth are also configured, let them take precedence for the purposes of
// the authMode header.
req.Header.Add(pulsarAuthModeHeader, "tls")
}
}
85 changes: 59 additions & 26 deletions pkg/scalers/pulsar_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,23 @@ type parsePulsarMetadataTestData struct {
}

type parsePulsarAuthParamsTestData struct {
authParams map[string]string
isError bool
enableTLS bool
cert string
key string
ca string
triggerMetadata map[string]string
authParams map[string]string
isError bool
enableTLS bool
cert string
key string
ca string
bearerToken string
username string
password string
}

type pulsarMetricIdentifier struct {
metadataTestData *parsePulsarMetadataTestData
name string
}

// A complete valid metadata example for reference
var validPulsarMetadata = map[string]string{
"adminURL": "http://172.20.0.151:80",
"topic": "persistent://public/default/my-topic",
"subscription": "sub1",
"tls": "enable",
}

// A complete valid authParams example for sasl, with username and passwd
var validPulsarWithAuthParams = map[string]string{
"cert": "certdata",
Expand Down Expand Up @@ -70,8 +66,19 @@ var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{
}

var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{
// failure, no adminURL
{map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata"},
// Passes, mutual TLS, no other auth (legacy "tls: enable")
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""},
// Passes, mutual TLS, no other auth (uses new way to enable tls)
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""},
// Fails, mutual TLS (legacy "tls: enable") without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""},
// Fails, mutual TLS, (uses new way to enable tls) without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""},
// Passes, server side TLS with bearer token. Note that EnableTLS is expected to be false because it is not mTLS.
// The legacy behavior required tls: enable in order to configure a custom root ca. Now, all that is required is configuring a root ca.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", ""},
// Passes, server side TLS with basic auth. Note that EnableTLS is expected to be false because it is not mTLS.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123"},
}

var pulsarMetricIdentifiers = []pulsarMetricIdentifier{
Expand Down Expand Up @@ -143,28 +150,54 @@ func TestParsePulsarMetadata(t *testing.T) {

func TestPulsarAuthParams(t *testing.T) {
for _, testData := range parsePulsarMetadataTestAuthTLSDataset {
meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: validPulsarMetadata, AuthParams: testData.authParams})
meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams})

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
t.Error("Expected success but got error", testData.authParams, err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if meta.enableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS)

if meta.pulsarAuth == nil {
t.Log("meta.pulsarAuth is nil, skipping rest of validation of", testData)
continue
}

if meta.pulsarAuth.EnableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.pulsarAuth.EnableTLS)
}

if meta.pulsarAuth.CA != testData.ca {
t.Errorf("Expected ca to be set to %s but got %s\n", testData.ca, meta.pulsarAuth.CA)
}

if meta.pulsarAuth.Cert != testData.cert {
t.Errorf("Expected cert to be set to %s but got %s\n", testData.cert, meta.pulsarAuth.Cert)
}

if meta.pulsarAuth.Key != testData.key {
t.Errorf("Expected key to be set to %s but got %s\n", testData.key, meta.pulsarAuth.Key)
}

if meta.pulsarAuth.EnableBearerAuth != (testData.bearerToken != "") {
t.Errorf("Expected EnableBearerAuth to be true when bearerToken is %s\n", testData.bearerToken)
}

if meta.pulsarAuth.BearerToken != testData.bearerToken {
t.Errorf("Expected bearer token to be set to %s but got %s\n", testData.bearerToken, meta.pulsarAuth.BearerToken)
}

if meta.ca != testData.ca {
t.Errorf("Expected ca to be set to %s but got %s\n", testData.ca, meta.ca)
if meta.pulsarAuth.EnableBasicAuth != (testData.username != "" || testData.password != "") {
t.Errorf("Expected EnableBearerAuth to be true when bearerToken is %s\n", testData.bearerToken)
}

if meta.cert != testData.cert {
t.Errorf("Expected cert to be set to %s but got %s\n", testData.cert, meta.cert)
if meta.pulsarAuth.Username != testData.username {
t.Errorf("Expected username to be set to %s but got %s\n", testData.username, meta.pulsarAuth.Username)
}

if meta.key != testData.key {
t.Errorf("Expected key to be set to %s but got %s\n", testData.key, meta.key)
if meta.pulsarAuth.Password != testData.password {
t.Errorf("Expected password to be set to %s but got %s\n", testData.password, meta.pulsarAuth.Password)
}
}
}
Expand Down
Loading