Skip to content

Commit

Permalink
support OAuth for pulsar scaler (kedacore#4709)
Browse files Browse the repository at this point in the history
Signed-off-by: anton.lysina <alysina@gmail.com>
  • Loading branch information
mingmcb authored and toniiiik committed Jan 15, 2024
1 parent c0eb9bf commit 01c8c36
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 11 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Updated AWS SDK and updated all the aws scalers ([#4905](https://github.com/kedacore/keda/issues/4905))
- **Azure Pod Identity**: Introduce validation to prevent usage of empty identity ID for Azure identity providers ([#4528](https://github.com/kedacore/keda/issues/4528))
- **Prometheus Scaler**: Remove trailing whitespaces in customAuthHeader and customAuthValue ([#4960](https://github.com/kedacore/keda/issues/4960))
- **Pulsar Scaler**: Add support for OAuth extensions ([#4700](https://github.com/kedacore/keda/issues/4700))

### Fixes
- **RabbitMQ Scaler**: Allow subpaths along with vhost in connection string ([#2634](https://github.com/kedacore/keda/issues/2634))
Expand All @@ -84,9 +85,9 @@ New deprecation(s):
### Other

- **General**: Fixed a typo in the StatefulSet scaling resolver ([#4902](https://github.com/kedacore/keda/pull/4902))
- **General**: In Metrics server show only logs with a severity level of ERROR or higher in the stderr ([#4049](https://github.com/kedacore/keda/issues/4049))
- **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781))
- **General**: Replace deprecated `set-output` command with environment file ([#4914](https://github.com/kedacore/keda/issues/4914))
- **General**: In Metrics server show only logs with a severity level of ERROR or higher in the stderr ([#4049](https://github.com/kedacore/keda/issues/4049))

## v2.11.2

Expand Down
42 changes: 40 additions & 2 deletions pkg/scalers/authentication/authentication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
if out.EnableBasicAuth {
return nil, errors.New("both bearer and basic authentication can not be set")
}

out.BearerToken = authParams["bearerToken"]
if out.EnableOAuth {
return nil, errors.New("both bearer and OAuth can not be set")
}
out.BearerToken = strings.TrimSuffix(authParams["bearerToken"], "\n")
out.EnableBearerAuth = true
case BasicAuthType:
if len(authParams["username"]) == 0 {
Expand All @@ -51,6 +53,9 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
if out.EnableBearerAuth {
return nil, errors.New("both bearer and basic authentication can not be set")
}
if out.EnableOAuth {
return nil, errors.New("both bearer and OAuth can not be set")
}

out.Username = authParams["username"]
// password is optional. For convenience, many application implement basic auth with
Expand Down Expand Up @@ -80,6 +85,18 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
}
out.CustomAuthValue = strings.TrimSuffix(authParams["customAuthValue"], "\n")
out.EnableCustomAuth = true
case OAuthType:
if out.EnableBasicAuth {
return nil, errors.New("both oauth and basic authentication can not be set")
}
if out.EnableBearerAuth {
return nil, errors.New("both oauth and bearer authentication can not be set")
}
out.EnableOAuth = true
out.OauthTokenURI = authParams["oauthTokenURI"]
out.Scopes = ParseScope(authParams["scope"])
out.ClientID = authParams["clientID"]
out.ClientSecret = authParams["clientSecret"]
default:
return nil, fmt.Errorf("incorrect value for authMode is given: %s", t)
}
Expand All @@ -92,6 +109,27 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
return out, err
}

// ParseScope parse OAuth scopes from a comma separated string
// whitespace is trimmed
func ParseScope(inputStr string) []string {
scope := strings.TrimSpace(inputStr)
if scope != "" {
scopes := make([]string, 0)
list := strings.Split(scope, ",")
for _, sc := range list {
sc := strings.TrimSpace(sc)
if sc != "" {
scopes = append(scopes, sc)
}
}
if len(scopes) == 0 {
return nil
}
return scopes
}
return nil
}

func GetBearerToken(auth *AuthMeta) string {
return fmt.Sprintf("Bearer %s", auth.BearerToken)
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/scalers/authentication/authentication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const (
BearerAuthType Type = "bearer"
// CustomAuthType is an auth type using a custom header
CustomAuthType Type = "custom"
// OAuthType is an auth type using a oAuth2
OAuthType Type = "oauth"
)

// TransportType is type of http transport
Expand All @@ -42,6 +44,13 @@ type AuthMeta struct {
Key string
CA string

// oAuth2
EnableOAuth bool
OauthTokenURI string
Scopes []string
ClientID string
ClientSecret string

// custom auth header
EnableCustomAuth bool
CustomAuthHeader string
Expand Down
31 changes: 30 additions & 1 deletion pkg/scalers/pulsar_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"net/http"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
"golang.org/x/oauth2/clientcredentials"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/metrics/pkg/apis/external_metrics"
Expand Down Expand Up @@ -211,6 +213,23 @@ func parsePulsarMetadata(config *ScalerConfig, logger logr.Logger) (pulsarMetada
if err != nil {
return meta, fmt.Errorf("error parsing %s: %w", msgBacklogMetricName, err)
}

if auth != nil && auth.EnableOAuth {
if auth.OauthTokenURI == "" {
auth.OauthTokenURI = config.TriggerMetadata["oauthTokenURI"]
}
if auth.Scopes == nil {
auth.Scopes = authentication.ParseScope(config.TriggerMetadata["scope"])
}
if auth.ClientID == "" {
auth.ClientID = config.TriggerMetadata["clientID"]
}
// client_secret is not required for mtls OAuth(RFC8705)
// set secret to random string to work around the Go OAuth lib
if auth.ClientSecret == "" {
auth.ClientSecret = time.Now().String()
}
}
meta.pulsarAuth = auth
meta.scalerIndex = config.ScalerIndex
return meta, nil
Expand All @@ -224,9 +243,19 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) {
return nil, fmt.Errorf("error requesting stats from admin url: %w", err)
}

client := s.client
if s.metadata.pulsarAuth.EnableOAuth {
config := clientcredentials.Config{
ClientID: s.metadata.pulsarAuth.ClientID,
ClientSecret: s.metadata.pulsarAuth.ClientSecret,
TokenURL: s.metadata.pulsarAuth.OauthTokenURI,
Scopes: s.metadata.pulsarAuth.Scopes,
}
client = config.Client(context.Background())
}
addAuthHeaders(req, &s.metadata)

res, err := s.client.Do(req)
res, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("error requesting stats from admin url: %w", err)
}
Expand Down
104 changes: 97 additions & 7 deletions pkg/scalers/pulsar_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type parsePulsarAuthParamsTestData struct {
bearerToken string
username string
password string
enableOAuth bool
oauthTokenURI string
scope string
clientID string
clientSecret string
}

type pulsarMetricIdentifier struct {
Expand Down Expand Up @@ -74,18 +79,33 @@ var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{

var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{
// Passes, mutual TLS, no other auth (legacy "tls: enable")
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
// Passes, mutual TLS, no other auth (uses new way to enable tls)
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
// Fails, mutual TLS (legacy "tls: enable") without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
// Fails, mutual TLS, (uses new way to enable tls) without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
// Passes, server side TLS with bearer token. Note that EnableTLS is expected to be false because it is not mTLS.
// The legacy behavior required tls: enable in order to configure a custom root ca. Now, all that is required is configuring a root ca.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", "", "", ""},
// Passes, server side TLS with basic auth. Note that EnableTLS is expected to be false because it is not mTLS.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", "", "", ""},

// Passes, server side TLS with oauth. Note that EnableTLS is expected to be false because it is not mTLS.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https1", "scope1", "id1", "secret123"},
// Passes, oauth config data is set from metadata only
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https2", "scope": "scope2", "clientID": "id2"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https2", "scope2", "id2", ""},
// Passes, oauth config data is set from TriggerAuth if both provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123"},
// Passes, with multiple scopes from metadata
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, \tsc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", ""},
// Passes, with multiple scopes from TriggerAuth
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, \tsc:scope1 \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123"},
// Passes, no scope provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"},
// Passes, invalid scopes provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " , \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"},
}

var pulsarMetricIdentifiers = []pulsarMetricIdentifier{
Expand Down Expand Up @@ -176,6 +196,20 @@ func TestParsePulsarMetadata(t *testing.T) {
}
}

func compareScope(scopes []string, scopeStr string) bool {
scopeMap := make(map[string]bool)
for _, scope := range scopes {
scopeMap[scope] = true
}
scopeList := strings.Fields(scopeStr)
for _, scope := range scopeList {
if !scopeMap[scope] {
return false
}
}
return true
}

func TestPulsarAuthParams(t *testing.T) {
for _, testData := range parsePulsarMetadataTestAuthTLSDataset {
logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, "test_pulsar_scaler")
Expand Down Expand Up @@ -218,7 +252,12 @@ func TestPulsarAuthParams(t *testing.T) {
}

if meta.pulsarAuth.EnableBasicAuth != (testData.username != "" || testData.password != "") {
t.Errorf("Expected EnableBearerAuth to be true when bearerToken is %s\n", testData.bearerToken)
if testData.username != "" {
t.Errorf("Expected EnableBasicAuth to be true when username is %s\n", testData.username)
}
if testData.password != "" {
t.Errorf("Expected EnableBasicAuth to be true when password is %s\n", testData.password)
}
}

if meta.pulsarAuth.Username != testData.username {
Expand All @@ -231,6 +270,57 @@ func TestPulsarAuthParams(t *testing.T) {
}
}

func TestPulsarOAuthParams(t *testing.T) {
for _, testData := range parsePulsarMetadataTestAuthTLSDataset {
logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, "test_pulsar_scaler")
meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, logger)

if err != nil && !testData.isError {
t.Error("Expected success but got error", testData.authParams, err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}

if meta.pulsarAuth == nil {
t.Log("meta.pulsarAuth is nil, skipping rest of validation of", testData)
continue
}

if meta.pulsarAuth.EnableOAuth != (testData.clientID != "" || testData.clientSecret != "") {
if testData.clientID != "" {
t.Errorf("Expected EnableOAuth to be true when clientID is %s\n", testData.clientID)
}
if testData.clientSecret != "" {
t.Errorf("Expected EnableOAuth to be true when clientSecret is %s\n", testData.clientSecret)
}
}

if meta.pulsarAuth.OauthTokenURI != testData.oauthTokenURI {
t.Errorf("Expected oauthTokenURI to be set to %s but got %s\n", testData.oauthTokenURI, meta.pulsarAuth.OauthTokenURI)
}

if testData.scope != "" && !compareScope(meta.pulsarAuth.Scopes, testData.scope) {
t.Errorf("Expected scopes %s but got %s\n", testData.scope, meta.pulsarAuth.Scopes)
}
if testData.scope == "" && meta.pulsarAuth.Scopes != nil {
t.Errorf("Expected scopes to be null but got %s\n", meta.pulsarAuth.Scopes)
}

if meta.pulsarAuth.ClientID != testData.clientID {
t.Errorf("Expected clientID to be set to %s but got %s\n", testData.clientID, meta.pulsarAuth.ClientID)
}

if meta.pulsarAuth.EnableOAuth && meta.pulsarAuth.ClientSecret == "" {
t.Errorf("Expected clientSecret not to be empty.\n")
}

if testData.clientSecret != "" && strings.Compare(meta.pulsarAuth.ClientSecret, testData.clientSecret) != 0 {
t.Errorf("Expected clientSecret to be set to %s but got %s\n", testData.clientSecret, meta.pulsarAuth.ClientSecret)
}
}
}

func TestPulsarGetMetricSpecForScaling(t *testing.T) {
for _, testData := range pulsarMetricIdentifiers {
logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validWithAuthParams}, "test_pulsar_scaler")
Expand Down

0 comments on commit 01c8c36

Please sign in to comment.