Skip to content

Commit

Permalink
Adds support to use regex with HTTP protocol in RabbitMQ Scaler (#1957)
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Turrado <jorge.turrado@docplanner.com>
  • Loading branch information
JorTurFer authored Jul 27, 2021
1 parent bd79c04 commit 50cffa1
Show file tree
Hide file tree
Showing 4 changed files with 359 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- Add Solace PubSub+ Event Broker Scaler ([#1945](https://github.com/kedacore/keda/pull/1945))
- Add fallback functionality ([#1872](https://github.com/kedacore/keda/issues/1872))
- Introduce Idle Replica Mode ([#1958](https://github.com/kedacore/keda/pull/1958))
- Support using regex to select the queues in RabbitMQ Scaler ([#1957](https://github.com/kedacore/keda/pull/1957))

### Improvements

Expand Down
121 changes: 112 additions & 9 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ const (
defaultProtocol = autoProtocol
)

const (
sumOperation = "sum"
avgOperation = "avg"
maxOperation = "max"
defaultOperation = sumOperation
)

type rabbitMQScaler struct {
metadata *rabbitMQMetadata
connection *amqp.Connection
Expand All @@ -51,6 +58,8 @@ type rabbitMQMetadata struct {
host string // connection string for either HTTP or AMQP protocol
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
useRegex bool // specify if the queueName contains a rexeg
operation string // specify the operation to apply in case of multiples queues
}

type queueInfo struct {
Expand Down Expand Up @@ -164,6 +173,25 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {
meta.vhostName = &val
}

// Resolve useRegex
if val, ok := config.TriggerMetadata["useRegex"]; ok {
useRegex, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("useRegex has invalid value")
}
meta.useRegex = useRegex
}

// Resolve operation
meta.operation = defaultOperation
if val, ok := config.TriggerMetadata["operation"]; ok {
meta.operation = val
}

if meta.useRegex && meta.protocol == amqpProtocol {
return nil, fmt.Errorf("configure only useRegex with http protocol")
}

_, err := parseTrigger(&meta, config)
if err != nil {
return nil, fmt.Errorf("unable to parse trigger: %s", err)
Expand Down Expand Up @@ -290,19 +318,31 @@ func (s *rabbitMQScaler) getQueueStatus() (int, float64, error) {
return items.Messages, 0, nil
}

func getJSON(httpClient *http.Client, url string, target interface{}) error {
r, err := httpClient.Get(url)
func getJSON(s *rabbitMQScaler, url string) (queueInfo, error) {
var result queueInfo
r, err := s.httpClient.Get(url)
if err != nil {
return err
return result, err
}
defer r.Body.Close()

if r.StatusCode == 200 {
return json.NewDecoder(r.Body).Decode(target)
if s.metadata.useRegex {
var results []queueInfo
err = json.NewDecoder(r.Body).Decode(&results)
if err != nil {
return result, err
}
result, err := getComposedQueue(s, results)
return result, err
}

err = json.NewDecoder(r.Body).Decode(&result)
return result, err
}

body, _ := ioutil.ReadAll(r.Body)
return fmt.Errorf("error requesting rabbitMQ API status: %s, response: %s, from: %s", r.Status, body, url)
return result, fmt.Errorf("error requesting rabbitMQ API status: %s, response: %s, from: %s", r.Status, body, url)
}

func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
Expand All @@ -324,11 +364,15 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
}

parsedURL.Path = ""
var getQueueInfoManagementURI string
if s.metadata.useRegex {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?use_regex=true&pagination=false&name=", s.metadata.queueName)
} else {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, s.metadata.queueName)
}

getQueueInfoManagementURI := fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, s.metadata.queueName)

info := queueInfo{}
err = getJSON(s.httpClient, getQueueInfoManagementURI, &info)
var info queueInfo
info, err = getJSON(s, getQueueInfoManagementURI)

if err != nil {
return nil, err
Expand Down Expand Up @@ -386,3 +430,62 @@ func (s *rabbitMQScaler) GetMetrics(ctx context.Context, metricName string, metr

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func getComposedQueue(s *rabbitMQScaler, q []queueInfo) (queueInfo, error) {
var queue = queueInfo{}
queue.Name = "composed-queue"
queue.MessagesUnacknowledged = 0
if len(q) > 0 {
switch s.metadata.operation {
case sumOperation:
sumMessages, sumRate := getSum(q)
queue.Messages = sumMessages
queue.MessageStat.PublishDetail.Rate = sumRate
case avgOperation:
avgMessages, avgRate := getAverage(q)
queue.Messages = avgMessages
queue.MessageStat.PublishDetail.Rate = avgRate
case maxOperation:
maxMessages, maxRate := getMaximum(q)
queue.Messages = maxMessages
queue.MessageStat.PublishDetail.Rate = maxRate
default:
return queue, fmt.Errorf("operation mode %s must be one of %s, %s, %s", s.metadata.operation, sumOperation, avgOperation, maxOperation)
}
} else {
queue.Messages = 0
queue.MessageStat.PublishDetail.Rate = 0
}

return queue, nil
}

func getSum(q []queueInfo) (int, float64) {
var sumMessages int
var sumRate float64
for _, value := range q {
sumMessages += value.Messages
sumRate += value.MessageStat.PublishDetail.Rate
}
return sumMessages, sumRate
}

func getAverage(q []queueInfo) (int, float64) {
sumMessages, sumRate := getSum(q)
len := len(q)
return sumMessages / len, sumRate / float64(len)
}

func getMaximum(q []queueInfo) (int, float64) {
var maxMessages int
var maxRate float64
for _, value := range q {
if value.Messages > maxMessages {
maxMessages = value.Messages
}
if value.MessageStat.PublishDetail.Rate > maxRate {
maxRate = value.MessageStat.PublishDetail.Rate
}
}
return maxMessages, maxRate
}
113 changes: 113 additions & 0 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://"}, false, map[string]string{}},
// message rate amqp
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "https://"}, false, map[string]string{}},
// amqp host and useRegex
{map[string]string{"queueName": "sample", "host": "amqps://", "useRegex": "true"}, true, map[string]string{}},
// http host and useRegex
{map[string]string{"queueName": "sample", "host": "http://", "useRegex": "true"}, false, map[string]string{}},
// message rate and useRegex
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true"}, false, map[string]string{}},
// queue length and useRegex
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true"}, false, map[string]string{}},
}

var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{
Expand Down Expand Up @@ -252,6 +260,111 @@ func TestGetQueueInfo(t *testing.T) {
}
}

var testRegexQueueInfoTestData = []getQueueInfoTestData{
// sum queue length
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, false, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "sum"}, ""},
{`[]`, http.StatusOK, false, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "sum"}, ""},
// max queue length
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, false, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "max"}, ""},
{`[]`, http.StatusOK, false, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "max"}, ""},
// avg queue length
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, false, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "avg"}, ""},
{`[]`, http.StatusOK, false, map[string]string{"queueLength": "10", "useRegex": "true", "operation": "avg"}, ""},
// sum message rate
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "sum"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "sum"}, ""},
{`[]`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "sum"}, ""},
// max message rate
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "max"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "max"}, ""},
{`[]`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "max"}, ""},
// avg message rate
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 4}}, "name": "evaluate_trial2"}]`, http.StatusOK, true, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
{`[{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"},{"messages": 0, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trial2"}]`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
{`[]`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
}

func TestGetQueueInfoWithRegex(t *testing.T) {
for _, testData := range testRegexQueueInfoTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues?use_regex=true&pagination=false&name=evaluate_trials"
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}

w.WriteHeader(testData.responseStatus)
_, err := w.Write([]byte(testData.response))
if err != nil {
t.Error("Expect request path to =", testData.response, "but it is", err)
}
}))

resolvedEnv := map[string]string{host: fmt.Sprintf("%s%s", apiStub.URL, testData.vhostPath), "plainHost": apiStub.URL}

metadata := map[string]string{
"queueName": "evaluate_trials",
"hostFromEnv": host,
"protocol": "http",
}
for k, v := range testData.extraMetadata {
metadata[k] = v
}

s, err := NewRabbitMQScaler(
&ScalerConfig{
ResolvedEnv: resolvedEnv,
TriggerMetadata: metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 1000 * time.Millisecond,
},
)

if err != nil {
t.Error("Expect success", err)
}

ctx := context.TODO()
active, err := s.IsActive(ctx)

if testData.responseStatus == http.StatusOK {
if err != nil {
t.Error("Expect success", err)
}

if active != testData.isActive {
if testData.isActive {
t.Error("Expect to be active")
} else {
t.Error("Expect to not be active")
}
}
} else if !strings.Contains(err.Error(), testData.response) {
t.Error("Expect error to be like '", testData.response, "' but it's '", err, "'")
}
}
}

func TestRabbitMQGetMetricSpecForScaling(t *testing.T) {
for _, testData := range rabbitMQMetricIdentifiers {
meta, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil})
Expand Down
Loading

0 comments on commit 50cffa1

Please sign in to comment.