From 01c8c364980d55838d2e9f69ce3d02c83a07d11c Mon Sep 17 00:00:00 2001 From: Ming Meng <101287520+mingmcb@users.noreply.github.com> Date: Sun, 17 Sep 2023 11:02:10 -0400 Subject: [PATCH] support OAuth for pulsar scaler (#4709) Signed-off-by: anton.lysina --- CHANGELOG.md | 3 +- .../authentication/authentication_helpers.go | 42 ++++++- .../authentication/authentication_types.go | 9 ++ pkg/scalers/pulsar_scaler.go | 31 +++++- pkg/scalers/pulsar_scaler_test.go | 104 ++++++++++++++++-- 5 files changed, 178 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 74853d133e7..e144e353289 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **General**: Updated AWS SDK and updated all the aws scalers ([#4905](https://github.com/kedacore/keda/issues/4905)) - **Azure Pod Identity**: Introduce validation to prevent usage of empty identity ID for Azure identity providers ([#4528](https://github.com/kedacore/keda/issues/4528)) - **Prometheus Scaler**: Remove trailing whitespaces in customAuthHeader and customAuthValue ([#4960](https://github.com/kedacore/keda/issues/4960)) +- **Pulsar Scaler**: Add support for OAuth extensions ([#4700](https://github.com/kedacore/keda/issues/4700)) ### Fixes - **RabbitMQ Scaler**: Allow subpaths along with vhost in connection string ([#2634](https://github.com/kedacore/keda/issues/2634)) @@ -84,9 +85,9 @@ New deprecation(s): ### Other - **General**: Fixed a typo in the StatefulSet scaling resolver ([#4902](https://github.com/kedacore/keda/pull/4902)) +- **General**: In Metrics server show only logs with a severity level of ERROR or higher in the stderr ([#4049](https://github.com/kedacore/keda/issues/4049)) - **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781)) - **General**: Replace deprecated `set-output` command with environment file ([#4914](https://github.com/kedacore/keda/issues/4914)) -- **General**: In Metrics server show only logs with a severity level of ERROR or higher in the stderr ([#4049](https://github.com/kedacore/keda/issues/4049)) ## v2.11.2 diff --git a/pkg/scalers/authentication/authentication_helpers.go b/pkg/scalers/authentication/authentication_helpers.go index 4e5284f9c68..e56485e659c 100644 --- a/pkg/scalers/authentication/authentication_helpers.go +++ b/pkg/scalers/authentication/authentication_helpers.go @@ -41,8 +41,10 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet if out.EnableBasicAuth { return nil, errors.New("both bearer and basic authentication can not be set") } - - out.BearerToken = authParams["bearerToken"] + if out.EnableOAuth { + return nil, errors.New("both bearer and OAuth can not be set") + } + out.BearerToken = strings.TrimSuffix(authParams["bearerToken"], "\n") out.EnableBearerAuth = true case BasicAuthType: if len(authParams["username"]) == 0 { @@ -51,6 +53,9 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet if out.EnableBearerAuth { return nil, errors.New("both bearer and basic authentication can not be set") } + if out.EnableOAuth { + return nil, errors.New("both bearer and OAuth can not be set") + } out.Username = authParams["username"] // password is optional. For convenience, many application implement basic auth with @@ -80,6 +85,18 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet } out.CustomAuthValue = strings.TrimSuffix(authParams["customAuthValue"], "\n") out.EnableCustomAuth = true + case OAuthType: + if out.EnableBasicAuth { + return nil, errors.New("both oauth and basic authentication can not be set") + } + if out.EnableBearerAuth { + return nil, errors.New("both oauth and bearer authentication can not be set") + } + out.EnableOAuth = true + out.OauthTokenURI = authParams["oauthTokenURI"] + out.Scopes = ParseScope(authParams["scope"]) + out.ClientID = authParams["clientID"] + out.ClientSecret = authParams["clientSecret"] default: return nil, fmt.Errorf("incorrect value for authMode is given: %s", t) } @@ -92,6 +109,27 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet return out, err } +// ParseScope parse OAuth scopes from a comma separated string +// whitespace is trimmed +func ParseScope(inputStr string) []string { + scope := strings.TrimSpace(inputStr) + if scope != "" { + scopes := make([]string, 0) + list := strings.Split(scope, ",") + for _, sc := range list { + sc := strings.TrimSpace(sc) + if sc != "" { + scopes = append(scopes, sc) + } + } + if len(scopes) == 0 { + return nil + } + return scopes + } + return nil +} + func GetBearerToken(auth *AuthMeta) string { return fmt.Sprintf("Bearer %s", auth.BearerToken) } diff --git a/pkg/scalers/authentication/authentication_types.go b/pkg/scalers/authentication/authentication_types.go index 34d009995f0..66dc12b677c 100644 --- a/pkg/scalers/authentication/authentication_types.go +++ b/pkg/scalers/authentication/authentication_types.go @@ -16,6 +16,8 @@ const ( BearerAuthType Type = "bearer" // CustomAuthType is an auth type using a custom header CustomAuthType Type = "custom" + // OAuthType is an auth type using a oAuth2 + OAuthType Type = "oauth" ) // TransportType is type of http transport @@ -42,6 +44,13 @@ type AuthMeta struct { Key string CA string + // oAuth2 + EnableOAuth bool + OauthTokenURI string + Scopes []string + ClientID string + ClientSecret string + // custom auth header EnableCustomAuth bool CustomAuthHeader string diff --git a/pkg/scalers/pulsar_scaler.go b/pkg/scalers/pulsar_scaler.go index 511d61b6e1c..7e4442b84f6 100644 --- a/pkg/scalers/pulsar_scaler.go +++ b/pkg/scalers/pulsar_scaler.go @@ -9,8 +9,10 @@ import ( "net/http" "strconv" "strings" + "time" "github.com/go-logr/logr" + "golang.org/x/oauth2/clientcredentials" v2 "k8s.io/api/autoscaling/v2" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/metrics/pkg/apis/external_metrics" @@ -211,6 +213,23 @@ func parsePulsarMetadata(config *ScalerConfig, logger logr.Logger) (pulsarMetada if err != nil { return meta, fmt.Errorf("error parsing %s: %w", msgBacklogMetricName, err) } + + if auth != nil && auth.EnableOAuth { + if auth.OauthTokenURI == "" { + auth.OauthTokenURI = config.TriggerMetadata["oauthTokenURI"] + } + if auth.Scopes == nil { + auth.Scopes = authentication.ParseScope(config.TriggerMetadata["scope"]) + } + if auth.ClientID == "" { + auth.ClientID = config.TriggerMetadata["clientID"] + } + // client_secret is not required for mtls OAuth(RFC8705) + // set secret to random string to work around the Go OAuth lib + if auth.ClientSecret == "" { + auth.ClientSecret = time.Now().String() + } + } meta.pulsarAuth = auth meta.scalerIndex = config.ScalerIndex return meta, nil @@ -224,9 +243,19 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) { return nil, fmt.Errorf("error requesting stats from admin url: %w", err) } + client := s.client + if s.metadata.pulsarAuth.EnableOAuth { + config := clientcredentials.Config{ + ClientID: s.metadata.pulsarAuth.ClientID, + ClientSecret: s.metadata.pulsarAuth.ClientSecret, + TokenURL: s.metadata.pulsarAuth.OauthTokenURI, + Scopes: s.metadata.pulsarAuth.Scopes, + } + client = config.Client(context.Background()) + } addAuthHeaders(req, &s.metadata) - res, err := s.client.Do(req) + res, err := client.Do(req) if err != nil { return nil, fmt.Errorf("error requesting stats from admin url: %w", err) } diff --git a/pkg/scalers/pulsar_scaler_test.go b/pkg/scalers/pulsar_scaler_test.go index beb8c700c33..8fc953e5332 100644 --- a/pkg/scalers/pulsar_scaler_test.go +++ b/pkg/scalers/pulsar_scaler_test.go @@ -31,6 +31,11 @@ type parsePulsarAuthParamsTestData struct { bearerToken string username string password string + enableOAuth bool + oauthTokenURI string + scope string + clientID string + clientSecret string } type pulsarMetricIdentifier struct { @@ -74,18 +79,33 @@ var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{ var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{ // Passes, mutual TLS, no other auth (legacy "tls: enable") - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""}, // Passes, mutual TLS, no other auth (uses new way to enable tls) - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""}, // Fails, mutual TLS (legacy "tls: enable") without cert - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""}, // Fails, mutual TLS, (uses new way to enable tls) without cert - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""}, // Passes, server side TLS with bearer token. Note that EnableTLS is expected to be false because it is not mTLS. // The legacy behavior required tls: enable in order to configure a custom root ca. Now, all that is required is configuring a root ca. - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", "", "", ""}, // Passes, server side TLS with basic auth. Note that EnableTLS is expected to be false because it is not mTLS. - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", "", "", ""}, + + // Passes, server side TLS with oauth. Note that EnableTLS is expected to be false because it is not mTLS. + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https1", "scope1", "id1", "secret123"}, + // Passes, oauth config data is set from metadata only + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https2", "scope": "scope2", "clientID": "id2"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https2", "scope2", "id2", ""}, + // Passes, oauth config data is set from TriggerAuth if both provided + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123"}, + // Passes, with multiple scopes from metadata + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, \tsc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", ""}, + // Passes, with multiple scopes from TriggerAuth + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, \tsc:scope1 \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123"}, + // Passes, no scope provided + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"}, + // Passes, invalid scopes provided + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " , \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"}, } var pulsarMetricIdentifiers = []pulsarMetricIdentifier{ @@ -176,6 +196,20 @@ func TestParsePulsarMetadata(t *testing.T) { } } +func compareScope(scopes []string, scopeStr string) bool { + scopeMap := make(map[string]bool) + for _, scope := range scopes { + scopeMap[scope] = true + } + scopeList := strings.Fields(scopeStr) + for _, scope := range scopeList { + if !scopeMap[scope] { + return false + } + } + return true +} + func TestPulsarAuthParams(t *testing.T) { for _, testData := range parsePulsarMetadataTestAuthTLSDataset { logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, "test_pulsar_scaler") @@ -218,7 +252,12 @@ func TestPulsarAuthParams(t *testing.T) { } if meta.pulsarAuth.EnableBasicAuth != (testData.username != "" || testData.password != "") { - t.Errorf("Expected EnableBearerAuth to be true when bearerToken is %s\n", testData.bearerToken) + if testData.username != "" { + t.Errorf("Expected EnableBasicAuth to be true when username is %s\n", testData.username) + } + if testData.password != "" { + t.Errorf("Expected EnableBasicAuth to be true when password is %s\n", testData.password) + } } if meta.pulsarAuth.Username != testData.username { @@ -231,6 +270,57 @@ func TestPulsarAuthParams(t *testing.T) { } } +func TestPulsarOAuthParams(t *testing.T) { + for _, testData := range parsePulsarMetadataTestAuthTLSDataset { + logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, "test_pulsar_scaler") + meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, logger) + + if err != nil && !testData.isError { + t.Error("Expected success but got error", testData.authParams, err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + + if meta.pulsarAuth == nil { + t.Log("meta.pulsarAuth is nil, skipping rest of validation of", testData) + continue + } + + if meta.pulsarAuth.EnableOAuth != (testData.clientID != "" || testData.clientSecret != "") { + if testData.clientID != "" { + t.Errorf("Expected EnableOAuth to be true when clientID is %s\n", testData.clientID) + } + if testData.clientSecret != "" { + t.Errorf("Expected EnableOAuth to be true when clientSecret is %s\n", testData.clientSecret) + } + } + + if meta.pulsarAuth.OauthTokenURI != testData.oauthTokenURI { + t.Errorf("Expected oauthTokenURI to be set to %s but got %s\n", testData.oauthTokenURI, meta.pulsarAuth.OauthTokenURI) + } + + if testData.scope != "" && !compareScope(meta.pulsarAuth.Scopes, testData.scope) { + t.Errorf("Expected scopes %s but got %s\n", testData.scope, meta.pulsarAuth.Scopes) + } + if testData.scope == "" && meta.pulsarAuth.Scopes != nil { + t.Errorf("Expected scopes to be null but got %s\n", meta.pulsarAuth.Scopes) + } + + if meta.pulsarAuth.ClientID != testData.clientID { + t.Errorf("Expected clientID to be set to %s but got %s\n", testData.clientID, meta.pulsarAuth.ClientID) + } + + if meta.pulsarAuth.EnableOAuth && meta.pulsarAuth.ClientSecret == "" { + t.Errorf("Expected clientSecret not to be empty.\n") + } + + if testData.clientSecret != "" && strings.Compare(meta.pulsarAuth.ClientSecret, testData.clientSecret) != 0 { + t.Errorf("Expected clientSecret to be set to %s but got %s\n", testData.clientSecret, meta.pulsarAuth.ClientSecret) + } + } +} + func TestPulsarGetMetricSpecForScaling(t *testing.T) { for _, testData := range pulsarMetricIdentifiers { logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validWithAuthParams}, "test_pulsar_scaler")