diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index 1288f17a933..ea0ebce4fa2 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -23,8 +23,12 @@ const ( rabbitQueueLengthMetricName = "queueLength" defaultRabbitMQQueueLength = 20 rabbitMetricType = "External" - rabbitIncludeUnacked = "includeUnacked" - defaultIncludeUnacked = false +) + +const ( + httpProtocol = "http" + amqpProtocol = "amqp" + defaultProtocol = amqpProtocol ) type rabbitMQScaler struct { @@ -34,11 +38,10 @@ type rabbitMQScaler struct { } type rabbitMQMetadata struct { - queueName string - host string // connection string for AMQP protocol - apiHost string // connection string for management API requests - queueLength int - includeUnacked bool // if true uses HTTP API and requires apiHost, if false uses AMQP and requires host + queueName string + host string // connection string for either HTTP or AMQP protocol + queueLength int + protocol string // either http or amqp protocol } type queueInfo struct { @@ -56,7 +59,7 @@ func NewRabbitMQScaler(resolvedEnv, metadata, authParams map[string]string) (Sca return nil, fmt.Errorf("error parsing rabbitmq metadata: %s", err) } - if meta.includeUnacked { + if meta.protocol == httpProtocol { return &rabbitMQScaler{metadata: meta}, nil } @@ -75,47 +78,35 @@ func NewRabbitMQScaler(resolvedEnv, metadata, authParams map[string]string) (Sca func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string) (*rabbitMQMetadata, error) { meta := rabbitMQMetadata{} - meta.includeUnacked = defaultIncludeUnacked - if val, ok := metadata[rabbitIncludeUnacked]; ok { - includeUnacked, err := strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("includeUnacked parsing error %s", err.Error()) + // Resolve protocol type + meta.protocol = defaultProtocol + if val, ok := metadata["protocol"]; ok { + if val == amqpProtocol || val == httpProtocol { + meta.protocol = val + } else { + return nil, fmt.Errorf("the protocol has to be either `%s` or `%s` but is `%s`", amqpProtocol, httpProtocol, val) } - meta.includeUnacked = includeUnacked } - if meta.includeUnacked { - if authParams["apiHost"] != "" { - meta.apiHost = authParams["apiHost"] - } else if metadata["apiHost"] != "" { - meta.apiHost = metadata["apiHost"] - } else if metadata["apiHostFromEnv"] != "" { - meta.apiHost = resolvedEnv[metadata["apiHostFromEnv"]] - } - - if meta.apiHost == "" { - return nil, fmt.Errorf("no apiHost setting given") - } + // Resolve host value + if authParams["host"] != "" { + meta.host = authParams["host"] + } else if metadata["host"] != "" { + meta.host = metadata["host"] + } else if metadata["hostFromEnv"] != "" { + meta.host = resolvedEnv[metadata["hostFromEnv"]] } else { - if authParams["host"] != "" { - meta.host = authParams["host"] - } else if metadata["host"] != "" { - meta.host = metadata["host"] - } else if metadata["hostFromEnv"] != "" { - meta.host = resolvedEnv[metadata["hostFromEnv"]] - } - - if meta.host == "" { - return nil, fmt.Errorf("no host setting given") - } + return nil, fmt.Errorf("no host setting given") } + // Resolve queueName if val, ok := metadata["queueName"]; ok { meta.queueName = val } else { return nil, fmt.Errorf("no queue name given") } + // Resolve queueLength if val, ok := metadata[rabbitQueueLengthMetricName]; ok { queueLength, err := strconv.Atoi(val) if err != nil { @@ -167,7 +158,7 @@ func (s *rabbitMQScaler) IsActive(ctx context.Context) (bool, error) { } func (s *rabbitMQScaler) getQueueMessages() (int, error) { - if s.metadata.includeUnacked { + if s.metadata.protocol == httpProtocol { info, err := s.getQueueInfoViaHTTP() if err != nil { return -1, err @@ -202,7 +193,7 @@ func getJSON(url string, target interface{}) error { } func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) { - parsedURL, err := url.Parse(s.metadata.apiHost) + parsedURL, err := url.Parse(s.metadata.host) if err != nil { return nil, err