diff --git a/CHANGELOG.md b/CHANGELOG.md index fc84b27b825..34007df0fad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **Metrics API Scaler:** Add unsafeSsl paramater to skip certificate validation when connecting over HTTPS ([#3728](https://github.com/kedacore/keda/discussions/3728)) - **NATS Scalers:** Support HTTPS protocol in NATS Scalers ([#3805](https://github.com/kedacore/keda/issues/3805)) - **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310)) +- **Pulsar Scaler:** Add support for bearer token and basic auth ([#3844](https://github.com/kedacore/keda/issues/3844)) - **Pulsar Scaler:** Add support for partitioned topics ([#3833](https://github.com/kedacore/keda/issues/3833)) ### Fixes diff --git a/pkg/scalers/authentication/authentication_helpers.go b/pkg/scalers/authentication/authentication_helpers.go index a56d043edbe..537fe6175e2 100644 --- a/pkg/scalers/authentication/authentication_helpers.go +++ b/pkg/scalers/authentication/authentication_helpers.go @@ -17,13 +17,13 @@ import ( ) const ( - authModesKey = "authModes" + AuthModesKey = "authModes" ) func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMeta, err error) { out = &AuthMeta{} - authModes, ok := triggerMetadata[authModesKey] + authModes, ok := triggerMetadata[AuthModesKey] // no authMode specified if !ok { return nil, nil @@ -81,14 +81,22 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet return out, err } +func GetBearerToken(auth *AuthMeta) string { + return fmt.Sprintf("Bearer %s", auth.BearerToken) +} + +func NewTLSConfig(auth *AuthMeta) (*tls.Config, error) { + return kedautil.NewTLSConfig( + auth.Cert, + auth.Key, + auth.CA, + ) +} + func CreateHTTPRoundTripper(roundTripperType TransportType, auth *AuthMeta, conf ...*HTTPTransport) (rt http.RoundTripper, err error) { tlsConfig := &tls.Config{InsecureSkipVerify: false} if auth != nil && (auth.CA != "" || auth.EnableTLS) { - tlsConfig, err = kedautil.NewTLSConfig( - auth.Cert, - auth.Key, - auth.CA, - ) + tlsConfig, err = NewTLSConfig(auth) if err != nil || tlsConfig == nil { return nil, fmt.Errorf("error creating the TLS config: %s", err) } diff --git a/pkg/scalers/loki_scaler.go b/pkg/scalers/loki_scaler.go index 63ba9174a7c..0ad0fc41656 100644 --- a/pkg/scalers/loki_scaler.go +++ b/pkg/scalers/loki_scaler.go @@ -207,7 +207,7 @@ func (s *lokiScaler) ExecuteLokiQuery(ctx context.Context) (float64, error) { } if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBearerAuth { - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.metadata.lokiAuth.BearerToken)) + req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.lokiAuth)) } else if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBasicAuth { req.SetBasicAuth(s.metadata.lokiAuth.Username, s.metadata.lokiAuth.Password) } diff --git a/pkg/scalers/prometheus_scaler.go b/pkg/scalers/prometheus_scaler.go index 4a64c27aca2..38cacc0fc1b 100644 --- a/pkg/scalers/prometheus_scaler.go +++ b/pkg/scalers/prometheus_scaler.go @@ -234,7 +234,7 @@ func (s *prometheusScaler) ExecutePromQuery(ctx context.Context) (float64, error } if s.metadata.prometheusAuth != nil && s.metadata.prometheusAuth.EnableBearerAuth { - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.metadata.prometheusAuth.BearerToken)) + req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.prometheusAuth)) } else if s.metadata.prometheusAuth != nil && s.metadata.prometheusAuth.EnableBasicAuth { req.SetBasicAuth(s.metadata.prometheusAuth.Username, s.metadata.prometheusAuth.Password) } diff --git a/pkg/scalers/pulsar_scaler.go b/pkg/scalers/pulsar_scaler.go index 4101b29536f..a0286bdd91b 100644 --- a/pkg/scalers/pulsar_scaler.go +++ b/pkg/scalers/pulsar_scaler.go @@ -16,6 +16,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" + "github.com/kedacore/keda/v2/pkg/scalers/authentication" kedautil "github.com/kedacore/keda/v2/pkg/util" ) @@ -32,11 +33,7 @@ type pulsarMetadata struct { msgBacklogThreshold int64 activationMsgBacklogThreshold int64 - // TLS - enableTLS bool - cert string - key string - ca string + pulsarAuth *authentication.AuthMeta statsURL string metricName string @@ -49,6 +46,7 @@ const ( defaultMsgBacklogThreshold = 10 enable = "enable" stringTrue = "true" + pulsarAuthModeHeader = "X-Pulsar-Auth-Method-Name" ) type pulsarSubscription struct { @@ -105,12 +103,24 @@ func NewPulsarScaler(config *ScalerConfig) (Scaler, error) { client := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) - if pulsarMetadata.enableTLS { - config, err := kedautil.NewTLSConfig(pulsarMetadata.cert, pulsarMetadata.key, pulsarMetadata.ca) - if err != nil { - return nil, err + if pulsarMetadata.pulsarAuth != nil { + if pulsarMetadata.pulsarAuth.CA != "" || pulsarMetadata.pulsarAuth.EnableTLS { + config, err := authentication.NewTLSConfig(pulsarMetadata.pulsarAuth) + if err != nil { + return nil, err + } + client.Transport = &http.Transport{TLSClientConfig: config} + } + + if pulsarMetadata.pulsarAuth.EnableBearerAuth || pulsarMetadata.pulsarAuth.EnableBasicAuth { + // The pulsar broker redirects HTTP calls to other brokers and expects the Authorization header + client.CheckRedirect = func(req *http.Request, via []*http.Request) error { + if len(via) != 0 && via[0].Response.StatusCode == http.StatusTemporaryRedirect { + addAuthHeaders(req, &pulsarMetadata) + } + return nil + } } - client.Transport = &http.Transport{TLSClientConfig: config} } return &pulsarScaler{ @@ -176,25 +186,21 @@ func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) { } meta.msgBacklogThreshold = t } - - meta.enableTLS = false - if val, ok := config.TriggerMetadata["tls"]; ok { - val = strings.TrimSpace(val) - - if val == enable { - cert := config.AuthParams["cert"] - key := config.AuthParams["key"] - if key == "" || cert == "" { - return meta, errors.New("must be provided cert and key") + // For backwards compatibility, we need to map "tls: enable" to + if tls, ok := config.TriggerMetadata["tls"]; ok { + if tls == enable && (config.AuthParams["cert"] != "" || config.AuthParams["key"] != "") { + if authModes, authModesOk := config.TriggerMetadata[authentication.AuthModesKey]; authModesOk { + config.TriggerMetadata[authentication.AuthModesKey] = fmt.Sprintf("%s,%s", authModes, authentication.TLSAuthType) + } else { + config.TriggerMetadata[authentication.AuthModesKey] = string(authentication.TLSAuthType) } - meta.ca = config.AuthParams["ca"] - meta.cert = cert - meta.key = key - meta.enableTLS = true - } else { - return meta, fmt.Errorf("err incorrect value for TLS given: %s", val) } } + auth, err := authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams) + if err != nil { + return meta, fmt.Errorf("error parsing %s: %s", msgBacklogMetricName, err) + } + meta.pulsarAuth = auth meta.scalerIndex = config.ScalerIndex return meta, nil } @@ -207,6 +213,8 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) { return nil, fmt.Errorf("error requesting stats from url: %s", err) } + addAuthHeaders(req, &s.metadata) + res, err := s.client.Do(req) if res == nil || err != nil { return nil, fmt.Errorf("error requesting stats from url: %s", err) @@ -298,3 +306,22 @@ func (s *pulsarScaler) Close(context.Context) error { s.client = nil return nil } + +// addAuthHeaders add the relevant headers used by Pulsar to authenticate and authorize http requests +func addAuthHeaders(req *http.Request, metadata *pulsarMetadata) { + if metadata.pulsarAuth == nil { + return + } + switch { + case metadata.pulsarAuth.EnableBearerAuth: + req.Header.Add("Authorization", authentication.GetBearerToken(metadata.pulsarAuth)) + req.Header.Add(pulsarAuthModeHeader, "token") + case metadata.pulsarAuth.EnableBasicAuth: + req.SetBasicAuth(metadata.pulsarAuth.Username, metadata.pulsarAuth.Password) + req.Header.Add(pulsarAuthModeHeader, "basic") + case metadata.pulsarAuth.EnableTLS: + // When BearerAuth or BasicAuth are also configured, let them take precedence for the purposes of + // the authMode header. + req.Header.Add(pulsarAuthModeHeader, "tls") + } +} diff --git a/pkg/scalers/pulsar_scaler_test.go b/pkg/scalers/pulsar_scaler_test.go index 120011cb607..7e3f000f6a9 100644 --- a/pkg/scalers/pulsar_scaler_test.go +++ b/pkg/scalers/pulsar_scaler_test.go @@ -20,12 +20,16 @@ type parsePulsarMetadataTestData struct { } type parsePulsarAuthParamsTestData struct { - authParams map[string]string - isError bool - enableTLS bool - cert string - key string - ca string + triggerMetadata map[string]string + authParams map[string]string + isError bool + enableTLS bool + cert string + key string + ca string + bearerToken string + username string + password string } type pulsarMetricIdentifier struct { @@ -33,14 +37,6 @@ type pulsarMetricIdentifier struct { name string } -// A complete valid metadata example for reference -var validPulsarMetadata = map[string]string{ - "adminURL": "http://172.20.0.151:80", - "topic": "persistent://public/default/my-topic", - "subscription": "sub1", - "tls": "enable", -} - // A complete valid authParams example for sasl, with username and passwd var validPulsarWithAuthParams = map[string]string{ "cert": "certdata", @@ -70,8 +66,19 @@ var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{ } var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{ - // failure, no adminURL - {map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata"}, + // Passes, mutual TLS, no other auth (legacy "tls: enable") + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""}, + // Passes, mutual TLS, no other auth (uses new way to enable tls) + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""}, + // Fails, mutual TLS (legacy "tls: enable") without cert + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""}, + // Fails, mutual TLS, (uses new way to enable tls) without cert + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""}, + // Passes, server side TLS with bearer token. Note that EnableTLS is expected to be false because it is not mTLS. + // The legacy behavior required tls: enable in order to configure a custom root ca. Now, all that is required is configuring a root ca. + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", ""}, + // Passes, server side TLS with basic auth. Note that EnableTLS is expected to be false because it is not mTLS. + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123"}, } var pulsarMetricIdentifiers = []pulsarMetricIdentifier{ @@ -143,28 +150,54 @@ func TestParsePulsarMetadata(t *testing.T) { func TestPulsarAuthParams(t *testing.T) { for _, testData := range parsePulsarMetadataTestAuthTLSDataset { - meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: validPulsarMetadata, AuthParams: testData.authParams}) + meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}) if err != nil && !testData.isError { - t.Error("Expected success but got error", err) + t.Error("Expected success but got error", testData.authParams, err) } if testData.isError && err == nil { t.Error("Expected error but got success") } - if meta.enableTLS != testData.enableTLS { - t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS) + + if meta.pulsarAuth == nil { + t.Log("meta.pulsarAuth is nil, skipping rest of validation of", testData) + continue + } + + if meta.pulsarAuth.EnableTLS != testData.enableTLS { + t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.pulsarAuth.EnableTLS) + } + + if meta.pulsarAuth.CA != testData.ca { + t.Errorf("Expected ca to be set to %s but got %s\n", testData.ca, meta.pulsarAuth.CA) + } + + if meta.pulsarAuth.Cert != testData.cert { + t.Errorf("Expected cert to be set to %s but got %s\n", testData.cert, meta.pulsarAuth.Cert) + } + + if meta.pulsarAuth.Key != testData.key { + t.Errorf("Expected key to be set to %s but got %s\n", testData.key, meta.pulsarAuth.Key) + } + + if meta.pulsarAuth.EnableBearerAuth != (testData.bearerToken != "") { + t.Errorf("Expected EnableBearerAuth to be true when bearerToken is %s\n", testData.bearerToken) + } + + if meta.pulsarAuth.BearerToken != testData.bearerToken { + t.Errorf("Expected bearer token to be set to %s but got %s\n", testData.bearerToken, meta.pulsarAuth.BearerToken) } - if meta.ca != testData.ca { - t.Errorf("Expected ca to be set to %s but got %s\n", testData.ca, meta.ca) + if meta.pulsarAuth.EnableBasicAuth != (testData.username != "" || testData.password != "") { + t.Errorf("Expected EnableBearerAuth to be true when bearerToken is %s\n", testData.bearerToken) } - if meta.cert != testData.cert { - t.Errorf("Expected cert to be set to %s but got %s\n", testData.cert, meta.cert) + if meta.pulsarAuth.Username != testData.username { + t.Errorf("Expected username to be set to %s but got %s\n", testData.username, meta.pulsarAuth.Username) } - if meta.key != testData.key { - t.Errorf("Expected key to be set to %s but got %s\n", testData.key, meta.key) + if meta.pulsarAuth.Password != testData.password { + t.Errorf("Expected password to be set to %s but got %s\n", testData.password, meta.pulsarAuth.Password) } } } diff --git a/tests/scalers/pulsar/helper/helper.go b/tests/scalers/pulsar/helper/helper.go index 2e5b41714cd..ff123271be3 100644 --- a/tests/scalers/pulsar/helper/helper.go +++ b/tests/scalers/pulsar/helper/helper.go @@ -35,6 +35,17 @@ type templateData struct { MsgBacklog int } +const authSecretTemplate = ` +apiVersion: v1 +kind: Secret +metadata: + name: {{.TestName}} + namespace: {{.TestName}} +data: + key.pub: MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnkggprp2GTl/2oQgLvnspbH0Lxthhmw3O3qpcx1FVUcJeD1JlUsuK6rO8uexfY/3JuZffzEm5busJB/5zuXQqO52ph8xDRiEeHOuFY0RKv8DAfpss+oG8Ou/LdHPYCbbyjbJXK/iVE/rUhicp7n6udv2/AaqJj/9535Qo49Q+3S/fbWqhNR6r84+Q+KTHtfwuoLsE4AbZ+g7FRpnyH3iYDxC4ISr1zIJiv4o41cwglaho/cOqCpBFwRHYyZTgeEIf9+7bjTPbpPThFztxO6DOAw73ikU7iT3T0H6hgpQqKa79kw1R8PAfeTYvkeQ4juQwlYmyGePTb9F4LZ+0w7a8wIDAQAB + token.jwt: ZXlKaGJHY2lPaUpTVXpJMU5pSjkuZXlKemRXSWlPaUpoWkcxcGJpSjkubEg2TEVqcDU3Y2pFc2xhdWV2Z1ZKV1NTa19IaThFLVZGb29EZHVxUHRiQ1Q0U0NJQlluV0YtRlA5NzBMVUMxRzFWWnZFMmJFZGlkNGd3SzhKY3RnVHNMNGJTV2V5SW4yVVBNTnNnaDVGemhWQkQ4SXVaRnFLTXktLUZnUmtKWFZzWldrbUFwNW5yamU3MEZaRkJLME1uV0licWxSZ2Y2UUZKR2Vxd1FXbzlZV0RCOUh5cTRYR0oxUGx1SGR4T282eTJjVm1Ib3c2SFV3R0dfSDZfTmk0eTNBaU0zWEhvNlNvMkEtRGU5cGRBX3d6MHQzemFyXzhBNFJNeXdTYmtXYldNSVEwUnN5bEZhSk80SzYzT0lTRG5IQkp0TUNJTUNjNlo1WDFKYWt2eUdKek9FTVNQeDZRM1hXWG1MOFFDNjBrcG1xQkd0dXV4XzZlbWFSaHZTcDlB +` + const pulsarStatefulsetTemplate = ` apiVersion: apps/v1 kind: StatefulSet @@ -58,6 +69,10 @@ spec: - name: pulsar image: apachepulsar/pulsar:{{.ApachePulsarVersion}} imagePullPolicy: IfNotPresent + volumeMounts: + - name: auth-data + mountPath: "/bin/pulsar" + readOnly: true readinessProbe: tcpSocket: port: 8080 @@ -71,10 +86,26 @@ spec: env: - name: PULSAR_PREFIX_tlsRequireTrustedClientCertOnConnect value: "true" + - name: brokerDeleteInactiveTopicsEnabled + value: "false" + - name: authenticationEnabled + value: "true" + - name: authenticationProviders + value: "org.apache.pulsar.broker.authentication.AuthenticationProviderToken" + - name: PULSAR_PREFIX_tokenPublicKey + value: "/bin/pulsar/key.pub" + - name: brokerClientAuthenticationPlugin + value: "org.apache.pulsar.client.impl.auth.AuthenticationToken" + - name: brokerClientAuthenticationParameters + value: "file:///bin/pulsar/token.jwt" command: - sh - -c - args: ["bin/apply-config-from-env.py conf/client.conf && bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss"] + args: ["bin/apply-config-from-env.py conf/client.conf && bin/apply-config-from-env.py conf/standalone.conf && exec bin/pulsar standalone -nfw -nss"] + volumes: + - name: auth-data + secret: + secretName: {{.TestName}} ` const pulsarServiceTemplate = ` @@ -111,11 +142,19 @@ spec: - name: pulsar-topic-init image: apachepulsar/pulsar:{{.ApachePulsarVersion}} imagePullPolicy: IfNotPresent + volumeMounts: + - name: auth-data + mountPath: "/pulsar/auth" + readOnly: true command: - sh - -c - args: ["bin/pulsar-admin --admin-url http://{{.TestName}}.{{.TestName}}:8080 topics {{ if .NumPartitions }} create-partitioned-topic -p {{.NumPartitions}} {{ else }} create {{ end }} persistent://public/default/keda"] + args: ["bin/pulsar-admin --admin-url http://{{.TestName}}.{{.TestName}}:8080 --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken --auth-params file:///pulsar/auth/token.jwt topics {{ if .NumPartitions }} create-partitioned-topic -p {{.NumPartitions}} {{ else }} create {{ end }} persistent://public/default/keda"] restartPolicy: Never + volumes: + - name: auth-data + secret: + secretName: {{.TestName}} backoffLimit: 4 ` @@ -138,13 +177,20 @@ spec: spec: containers: - name: pulsar-consumer - image: ghcr.io/pulsar-sigs/pulsar-client:v0.3.1 + image: apachepulsar/pulsar:{{.ApachePulsarVersion}} imagePullPolicy: IfNotPresent - readinessProbe: - tcpSocket: - port: 9494 - args: ["consumer","--broker","pulsar://{{.TestName}}.{{.TestName}}:6650","--topic","persistent://public/default/keda","--subscription-name","keda","--consume-time","200"] - + volumeMounts: + - name: auth-data + mountPath: "/pulsar/auth" + readOnly: true + command: + - sh + - -c + args: ["bin/pulsar-perf consume --service-url pulsar://{{.TestName}}.{{.TestName}}:6650 --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken --auth-params file:///pulsar/auth/token.jwt --receiver-queue-size 1 --subscription-type Shared --rate 1 --subscriptions keda persistent://public/default/keda"] + volumes: + - name: auth-data + secret: + secretName: {{.TestName}} ` const scaledObjectTemplate = ` @@ -168,9 +214,25 @@ spec: adminURL: http://{{.TestName}}.{{.TestName}}:8080 topic: persistent://public/default/keda isPartitionedTopic: {{ if .NumPartitions }} "true" {{else}} "false" {{end}} + authModes: "bearer" subscription: keda + authenticationRef: + name: {{.TestName}} ` +const authenticationRefTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TestName}} + namespace: {{.TestName}} +spec: + secretTargetRef: + - parameter: bearerToken + name: {{.TestName}} + key: token.jwt +` + const topicPublishJobTemplate = ` apiVersion: batch/v1 kind: Job @@ -184,11 +246,19 @@ spec: - name: pulsar-producer image: apachepulsar/pulsar:{{.ApachePulsarVersion}} imagePullPolicy: IfNotPresent + volumeMounts: + - name: auth-data + mountPath: "/pulsar/auth" + readOnly: true command: - sh - -c - args: ["bin/pulsar-perf produce --admin-url http://{{.TestName}}.{{.TestName}}:8080 --service-url pulsar://{{.TestName}}.{{.TestName}}:6650 --num-messages {{.MessageCount}} {{ if .NumPartitions }} --partitions {{.NumPartitions}} {{ end }} persistent://public/default/keda"] + args: ["bin/pulsar-perf produce --admin-url http://{{.TestName}}.{{.TestName}}:8080 --service-url pulsar://{{.TestName}}.{{.TestName}}:6650 --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken --auth-params file:///pulsar/auth/token.jwt --num-messages {{.MessageCount}} {{ if .NumPartitions }} --partitions {{.NumPartitions}} {{ end }} --batch-max-messages 1 persistent://public/default/keda"] restartPolicy: Never + volumes: + - name: auth-data + secret: + secretName: {{.TestName}} backoffLimit: 4 ` @@ -203,7 +273,7 @@ func TestScalerWithConfig(t *testing.T, testName string, numPartitions int) { helper.CreateKubernetesResources(t, kc, testName, data, templates) assert.True(t, helper.WaitForStatefulsetReplicaReadyCount(t, kc, testName, testName, 1, 300, 1), - "replica count should be 1 after 5 minute") + "replica count should be 1 within 5 minutes") helper.KubectlApplyWithTemplate(t, data, "topicInitJobTemplate", topicInitJobTemplate) @@ -214,7 +284,7 @@ func TestScalerWithConfig(t *testing.T, testName string, numPartitions int) { // run consumer for create subscription assert.True(t, helper.WaitForDeploymentReplicaReadyCount(t, kc, getConsumerDeploymentName(testName), testName, 1, 300, 1), - "replica count should be 1 after 5 minute") + "replica count should be 1 within 5 minutes") helper.KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) @@ -247,6 +317,8 @@ func getTemplateData(testName string, numPartitions int) (templateData, []helper }, []helper.Template{ {Name: "statefulsetTemplate", Config: pulsarStatefulsetTemplate}, {Name: "serviceTemplate", Config: pulsarServiceTemplate}, + {Name: "authenticationRefTemplate", Config: authenticationRefTemplate}, + {Name: "secretTemplate", Config: authSecretTemplate}, } } @@ -262,14 +334,14 @@ func testScaleUp(t *testing.T, kc *kubernetes.Clientset, data templateData) { data.MessageCount = 100 helper.KubectlApplyWithTemplate(t, data, "publishJobTemplate", topicPublishJobTemplate) assert.True(t, helper.WaitForDeploymentReplicaReadyCount(t, kc, getConsumerDeploymentName(data.TestName), data.TestName, 5, 300, 1), - "replica count should be 5 after 5 minute") + "replica count should be 5 within 5 minute") } func testScaleDown(t *testing.T, kc *kubernetes.Clientset, testName string) { t.Log("--- testing scale down ---") // Check if deployment scale down to 0 after 5 minutes assert.True(t, helper.WaitForDeploymentReplicaReadyCount(t, kc, getConsumerDeploymentName(testName), testName, 0, 300, 1), - "Replica count should be 0 after 5 minutes") + "Replica count should be 0 within 5 minutes") } func getConsumerDeploymentName(testName string) string {