Skip to content

Commit

Permalink
Apache Pulsar: add support for bearer token and basic auth (#3845)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Marshall <mmarshall@apache.org>
  • Loading branch information
michaeljmarshall authored Nov 15, 2022
1 parent 32348ae commit cd38e83
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 74 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Metrics API Scaler:** Add unsafeSsl paramater to skip certificate validation when connecting over HTTPS ([#3728](https://github.com/kedacore/keda/discussions/3728))
- **NATS Scalers:** Support HTTPS protocol in NATS Scalers ([#3805](https://github.com/kedacore/keda/issues/3805))
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))
- **Pulsar Scaler:** Add support for bearer token and basic auth ([#3844](https://github.com/kedacore/keda/issues/3844))
- **Pulsar Scaler:** Add support for partitioned topics ([#3833](https://github.com/kedacore/keda/issues/3833))

### Fixes
Expand Down
22 changes: 15 additions & 7 deletions pkg/scalers/authentication/authentication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
)

const (
authModesKey = "authModes"
AuthModesKey = "authModes"
)

func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMeta, err error) {
out = &AuthMeta{}

authModes, ok := triggerMetadata[authModesKey]
authModes, ok := triggerMetadata[AuthModesKey]
// no authMode specified
if !ok {
return nil, nil
Expand Down Expand Up @@ -81,14 +81,22 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
return out, err
}

func GetBearerToken(auth *AuthMeta) string {
return fmt.Sprintf("Bearer %s", auth.BearerToken)
}

func NewTLSConfig(auth *AuthMeta) (*tls.Config, error) {
return kedautil.NewTLSConfig(
auth.Cert,
auth.Key,
auth.CA,
)
}

func CreateHTTPRoundTripper(roundTripperType TransportType, auth *AuthMeta, conf ...*HTTPTransport) (rt http.RoundTripper, err error) {
tlsConfig := &tls.Config{InsecureSkipVerify: false}
if auth != nil && (auth.CA != "" || auth.EnableTLS) {
tlsConfig, err = kedautil.NewTLSConfig(
auth.Cert,
auth.Key,
auth.CA,
)
tlsConfig, err = NewTLSConfig(auth)
if err != nil || tlsConfig == nil {
return nil, fmt.Errorf("error creating the TLS config: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/loki_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (s *lokiScaler) ExecuteLokiQuery(ctx context.Context) (float64, error) {
}

if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBearerAuth {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.metadata.lokiAuth.BearerToken))
req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.lokiAuth))
} else if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBasicAuth {
req.SetBasicAuth(s.metadata.lokiAuth.Username, s.metadata.lokiAuth.Password)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/prometheus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error
}

if s.metadata.prometheusAuth != nil && s.metadata.prometheusAuth.EnableBearerAuth {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.metadata.prometheusAuth.BearerToken))
req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.prometheusAuth))
} else if s.metadata.prometheusAuth != nil && s.metadata.prometheusAuth.EnableBasicAuth {
req.SetBasicAuth(s.metadata.prometheusAuth.Username, s.metadata.prometheusAuth.Password)
}
Expand Down
79 changes: 53 additions & 26 deletions pkg/scalers/pulsar_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/pkg/scalers/authentication"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

Expand All @@ -32,11 +33,7 @@ type pulsarMetadata struct {
msgBacklogThreshold int64
activationMsgBacklogThreshold int64

// TLS
enableTLS bool
cert string
key string
ca string
pulsarAuth *authentication.AuthMeta

statsURL string
metricName string
Expand All @@ -49,6 +46,7 @@ const (
defaultMsgBacklogThreshold = 10
enable = "enable"
stringTrue = "true"
pulsarAuthModeHeader = "X-Pulsar-Auth-Method-Name"
)

type pulsarSubscription struct {
Expand Down Expand Up @@ -105,12 +103,24 @@ func NewPulsarScaler(config *ScalerConfig) (Scaler, error) {

client := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false)

if pulsarMetadata.enableTLS {
config, err := kedautil.NewTLSConfig(pulsarMetadata.cert, pulsarMetadata.key, pulsarMetadata.ca)
if err != nil {
return nil, err
if pulsarMetadata.pulsarAuth != nil {
if pulsarMetadata.pulsarAuth.CA != "" || pulsarMetadata.pulsarAuth.EnableTLS {
config, err := authentication.NewTLSConfig(pulsarMetadata.pulsarAuth)
if err != nil {
return nil, err
}
client.Transport = &http.Transport{TLSClientConfig: config}
}

if pulsarMetadata.pulsarAuth.EnableBearerAuth || pulsarMetadata.pulsarAuth.EnableBasicAuth {
// The pulsar broker redirects HTTP calls to other brokers and expects the Authorization header
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
if len(via) != 0 && via[0].Response.StatusCode == http.StatusTemporaryRedirect {
addAuthHeaders(req, &pulsarMetadata)
}
return nil
}
}
client.Transport = &http.Transport{TLSClientConfig: config}
}

return &pulsarScaler{
Expand Down Expand Up @@ -176,25 +186,21 @@ func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) {
}
meta.msgBacklogThreshold = t
}

meta.enableTLS = false
if val, ok := config.TriggerMetadata["tls"]; ok {
val = strings.TrimSpace(val)

if val == enable {
cert := config.AuthParams["cert"]
key := config.AuthParams["key"]
if key == "" || cert == "" {
return meta, errors.New("must be provided cert and key")
// For backwards compatibility, we need to map "tls: enable" to
if tls, ok := config.TriggerMetadata["tls"]; ok {
if tls == enable && (config.AuthParams["cert"] != "" || config.AuthParams["key"] != "") {
if authModes, authModesOk := config.TriggerMetadata[authentication.AuthModesKey]; authModesOk {
config.TriggerMetadata[authentication.AuthModesKey] = fmt.Sprintf("%s,%s", authModes, authentication.TLSAuthType)
} else {
config.TriggerMetadata[authentication.AuthModesKey] = string(authentication.TLSAuthType)
}
meta.ca = config.AuthParams["ca"]
meta.cert = cert
meta.key = key
meta.enableTLS = true
} else {
return meta, fmt.Errorf("err incorrect value for TLS given: %s", val)
}
}
auth, err := authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams)
if err != nil {
return meta, fmt.Errorf("error parsing %s: %s", msgBacklogMetricName, err)
}
meta.pulsarAuth = auth
meta.scalerIndex = config.ScalerIndex
return meta, nil
}
Expand All @@ -207,6 +213,8 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) {
return nil, fmt.Errorf("error requesting stats from url: %s", err)
}

addAuthHeaders(req, &s.metadata)

res, err := s.client.Do(req)
if res == nil || err != nil {
return nil, fmt.Errorf("error requesting stats from url: %s", err)
Expand Down Expand Up @@ -298,3 +306,22 @@ func (s *pulsarScaler) Close(context.Context) error {
s.client = nil
return nil
}

// addAuthHeaders add the relevant headers used by Pulsar to authenticate and authorize http requests
func addAuthHeaders(req *http.Request, metadata *pulsarMetadata) {
if metadata.pulsarAuth == nil {
return
}
switch {
case metadata.pulsarAuth.EnableBearerAuth:
req.Header.Add("Authorization", authentication.GetBearerToken(metadata.pulsarAuth))
req.Header.Add(pulsarAuthModeHeader, "token")
case metadata.pulsarAuth.EnableBasicAuth:
req.SetBasicAuth(metadata.pulsarAuth.Username, metadata.pulsarAuth.Password)
req.Header.Add(pulsarAuthModeHeader, "basic")
case metadata.pulsarAuth.EnableTLS:
// When BearerAuth or BasicAuth are also configured, let them take precedence for the purposes of
// the authMode header.
req.Header.Add(pulsarAuthModeHeader, "tls")
}
}
85 changes: 59 additions & 26 deletions pkg/scalers/pulsar_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,23 @@ type parsePulsarMetadataTestData struct {
}

type parsePulsarAuthParamsTestData struct {
authParams map[string]string
isError bool
enableTLS bool
cert string
key string
ca string
triggerMetadata map[string]string
authParams map[string]string
isError bool
enableTLS bool
cert string
key string
ca string
bearerToken string
username string
password string
}

type pulsarMetricIdentifier struct {
metadataTestData *parsePulsarMetadataTestData
name string
}

// A complete valid metadata example for reference
var validPulsarMetadata = map[string]string{
"adminURL": "http://172.20.0.151:80",
"topic": "persistent://public/default/my-topic",
"subscription": "sub1",
"tls": "enable",
}

// A complete valid authParams example for sasl, with username and passwd
var validPulsarWithAuthParams = map[string]string{
"cert": "certdata",
Expand Down Expand Up @@ -70,8 +66,19 @@ var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{
}

var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{
// failure, no adminURL
{map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata"},
// Passes, mutual TLS, no other auth (legacy "tls: enable")
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""},
// Passes, mutual TLS, no other auth (uses new way to enable tls)
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""},
// Fails, mutual TLS (legacy "tls: enable") without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""},
// Fails, mutual TLS, (uses new way to enable tls) without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""},
// Passes, server side TLS with bearer token. Note that EnableTLS is expected to be false because it is not mTLS.
// The legacy behavior required tls: enable in order to configure a custom root ca. Now, all that is required is configuring a root ca.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", ""},
// Passes, server side TLS with basic auth. Note that EnableTLS is expected to be false because it is not mTLS.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123"},
}

var pulsarMetricIdentifiers = []pulsarMetricIdentifier{
Expand Down Expand Up @@ -143,28 +150,54 @@ func TestParsePulsarMetadata(t *testing.T) {

func TestPulsarAuthParams(t *testing.T) {
for _, testData := range parsePulsarMetadataTestAuthTLSDataset {
meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: validPulsarMetadata, AuthParams: testData.authParams})
meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams})

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
t.Error("Expected success but got error", testData.authParams, err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if meta.enableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS)

if meta.pulsarAuth == nil {
t.Log("meta.pulsarAuth is nil, skipping rest of validation of", testData)
continue
}

if meta.pulsarAuth.EnableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.pulsarAuth.EnableTLS)
}

if meta.pulsarAuth.CA != testData.ca {
t.Errorf("Expected ca to be set to %s but got %s\n", testData.ca, meta.pulsarAuth.CA)
}

if meta.pulsarAuth.Cert != testData.cert {
t.Errorf("Expected cert to be set to %s but got %s\n", testData.cert, meta.pulsarAuth.Cert)
}

if meta.pulsarAuth.Key != testData.key {
t.Errorf("Expected key to be set to %s but got %s\n", testData.key, meta.pulsarAuth.Key)
}

if meta.pulsarAuth.EnableBearerAuth != (testData.bearerToken != "") {
t.Errorf("Expected EnableBearerAuth to be true when bearerToken is %s\n", testData.bearerToken)
}

if meta.pulsarAuth.BearerToken != testData.bearerToken {
t.Errorf("Expected bearer token to be set to %s but got %s\n", testData.bearerToken, meta.pulsarAuth.BearerToken)
}

if meta.ca != testData.ca {
t.Errorf("Expected ca to be set to %s but got %s\n", testData.ca, meta.ca)
if meta.pulsarAuth.EnableBasicAuth != (testData.username != "" || testData.password != "") {
t.Errorf("Expected EnableBearerAuth to be true when bearerToken is %s\n", testData.bearerToken)
}

if meta.cert != testData.cert {
t.Errorf("Expected cert to be set to %s but got %s\n", testData.cert, meta.cert)
if meta.pulsarAuth.Username != testData.username {
t.Errorf("Expected username to be set to %s but got %s\n", testData.username, meta.pulsarAuth.Username)
}

if meta.key != testData.key {
t.Errorf("Expected key to be set to %s but got %s\n", testData.key, meta.key)
if meta.pulsarAuth.Password != testData.password {
t.Errorf("Expected password to be set to %s but got %s\n", testData.password, meta.pulsarAuth.Password)
}
}
}
Expand Down
Loading

0 comments on commit cd38e83

Please sign in to comment.