diff --git a/pkg/scalers/artemis_scaler.go b/pkg/scalers/artemis_scaler.go index c41ba4c4292..acadb3c4d32 100644 --- a/pkg/scalers/artemis_scaler.go +++ b/pkg/scalers/artemis_scaler.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "strconv" + "strings" "time" v2beta2 "k8s.io/api/autoscaling/v2beta2" @@ -28,6 +29,7 @@ type artemisMetadata struct { brokerAddress string username string password string + restApiTemplate string queueLength int } @@ -41,6 +43,7 @@ type artemisMonitoring struct { const ( artemisMetricType = "External" defaultArtemisQueueLength = 10 + defaultRestApiTemplate = "http://<>/console/jolokia/read/org.apache.activemq.artemis:broker=\"<>\",component=addresses,address=\"<>\",subcomponent=queues,routing-type=\"anycast\",queue=\"<>\"/MessageCount" ) var artemisLog = logf.Log.WithName("artemis_queue_scaler") @@ -63,6 +66,12 @@ func parseArtemisMetadata(resolvedEnv, metadata, authParams map[string]string) ( meta.queueLength = defaultArtemisQueueLength + if val, ok := metadata["restApiTemplate"]; ok && val != "" { + meta.restApiTemplate = metadata["restApiTemplate"] + } else { + meta.restApiTemplate = defaultRestApiTemplate + } + if metadata["managementEndpoint"] == "" { return nil, errors.New("no management endpoint given") } @@ -92,13 +101,15 @@ func parseArtemisMetadata(resolvedEnv, metadata, authParams map[string]string) ( meta.queueLength = queueLength } - if val, ok := authParams["username"]; ok { + if val, ok := authParams["username"]; ok && val != "" { meta.username = val - } else if val, ok := metadata["username"]; ok { + } else if val, ok := metadata["username"]; ok && val != "" { username := val - if val, ok := resolvedEnv[username]; ok { + if val, ok := resolvedEnv[username]; ok && val != "" { meta.username = val + } else { + meta.username = username } } @@ -106,13 +117,15 @@ func parseArtemisMetadata(resolvedEnv, metadata, authParams map[string]string) ( return nil, fmt.Errorf("username cannot be empty") } - if val, ok := authParams["password"]; ok { + if val, ok := authParams["password"]; ok && val != "" { meta.password = val - } else if val, ok := metadata["password"]; ok { + } else if val, ok := metadata["password"]; ok && val != "" { password := val - if val, ok := resolvedEnv[password]; ok { + if val, ok := resolvedEnv[password]; ok && val != "" { meta.password = val + } else { + meta.password = password } } @@ -133,13 +146,15 @@ func (s *artemisScaler) IsActive(ctx context.Context) (bool, error) { return messages > 0, nil } -func (s *artemisScaler) getArtemisManagementEndpoint() string { - return "http://" + s.metadata.managementEndpoint -} - func (s *artemisScaler) getMonitoringEndpoint() string { - monitoringEndpoint := fmt.Sprintf("%s/console/jolokia/read/org.apache.activemq.artemis:broker=\"%s\",component=addresses,address=\"%s\",subcomponent=queues,routing-type=\"anycast\",queue=\"%s\"/MessageCount", - s.getArtemisManagementEndpoint(), s.metadata.brokerName, s.metadata.brokerAddress, s.metadata.queueName) + + replacer := strings.NewReplacer("<>", s.metadata.managementEndpoint, + "<>", s.metadata.queueName, + "<>", s.metadata.brokerName, + "<>", s.metadata.brokerAddress) + + monitoringEndpoint := replacer.Replace(s.metadata.restApiTemplate) + return monitoringEndpoint } @@ -174,7 +189,7 @@ func (s *artemisScaler) getQueueMessageCount() (int, error) { return -1, fmt.Errorf("Artemis management endpoint response error code : %d", resp.StatusCode) } - artemisLog.V(1).Info("Artemis scaler: Providing metrics based on current queue length ", messageCount, "queue length limit", s.metadata.queueLength) + artemisLog.V(1).Info(fmt.Sprintf("Artemis scaler: Providing metrics based on current queue length %d queue length limit %d", messageCount, s.metadata.queueLength)) return messageCount, nil } diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 36f908c7c0c..a1279892bb9 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -364,6 +364,8 @@ func (h *scaleHandler) getPods(scalableObject interface{}) (*corev1.PodTemplateS func buildScaler(name, namespace, triggerType string, resolvedEnv, triggerMetadata, authParams map[string]string, podIdentity string) (scalers.Scaler, error) { switch triggerType { + case "artemis-queue": + return scalers.NewArtemisQueueScaler(resolvedEnv, triggerMetadata, authParams) case "azure-queue": return scalers.NewAzureQueueScaler(resolvedEnv, triggerMetadata, authParams, podIdentity) case "azure-servicebus":