Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RabbitMQ publishRate trigger #1653

Merged
merged 5 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
- Fixed keda clusterroles to give permissions for clustertriggerauthentications ([#1645](https://github.com/kedacore/keda/pull/1645))
- Make `swiftURL` parameter optional for the OpenStack Swift scaler ([#1652](https://github.com/kedacore/keda/pull/1652))
- Fix memory leak of `keda-metrics-apiserver` by setting a controller-runtime logger properly ([#1654](https://github.com/kedacore/keda/pull/1654))
- Add `publishRate` trigger to RabbitMQ scaler ([#1653](https://github.com/kedacore/keda/pull/1653))

### Breaking Changes

Expand Down
98 changes: 76 additions & 22 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
)

const (
rabbitQueueLengthMetricName = "queueLength"
defaultRabbitMQQueueLength = 20
rabbitMetricType = "External"
rabbitQueueLengthMetricName = "queueLength"
rabbitPublishedPerSecondMetricName = "publishRate"
defaultRabbitMQQueueLength = 20
defaultRabbitMQPublishRate = 0 // Default to zero to disable publish rate metering for back compat.
rabbitMetricType = "External"
)

const (
Expand All @@ -43,15 +45,25 @@ type rabbitMQScaler struct {
type rabbitMQMetadata struct {
queueName string
queueLength int
publishRate float64 // Publish/sec. rate on the queue, requires HTTP protocol
host string // connection string for either HTTP or AMQP protocol
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
}

type queueInfo struct {
Messages int `json:"messages"`
MessagesUnacknowledged int `json:"messages_unacknowledged"`
Name string `json:"name"`
Messages int `json:"messages"`
MessagesUnacknowledged int `json:"messages_unacknowledged"`
MessageStat messageStat `json:"message_stats"`
Name string `json:"name"`
}

type messageStat struct {
PublishDetail publishDetail `json:"publish_details"`
}

type publishDetail struct {
Rate float64 `json:"rate"`
}

var rabbitmqLog = logf.Log.WithName("rabbitmq_scaler")
Expand Down Expand Up @@ -145,18 +157,40 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {
return nil, fmt.Errorf("no queue name given")
}

// Resolve queueLength
if val, ok := config.TriggerMetadata[rabbitQueueLengthMetricName]; ok {
// Resolve publishRate
if val, ok := config.TriggerMetadata[rabbitPublishedPerSecondMetricName]; ok {
publishRate, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("can't parse %s: %s", rabbitPublishedPerSecondMetricName, err)
}

meta.publishRate = publishRate
} else {
meta.publishRate = defaultRabbitMQPublishRate
}

val, ok := config.TriggerMetadata[rabbitQueueLengthMetricName]
switch {
case ok:
queueLength, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("can't parse %s: %s", rabbitQueueLengthMetricName, err)
}

meta.queueLength = queueLength
} else {
case meta.publishRate > 0:
meta.queueLength = 0
default:
meta.queueLength = defaultRabbitMQQueueLength
}

if meta.publishRate > 0 && meta.queueLength > 0 {
return nil, fmt.Errorf("only one of queueLength or publishRate can be specified; use two separate triggers if both are desired")
}

if meta.publishRate > 0 && meta.protocol != httpProtocol {
return nil, fmt.Errorf("protocol %s not supported; must be http to use publishRate", meta.protocol)
}

// Resolve vhostName
if val, ok := config.TriggerMetadata["vhostName"]; ok {
meta.vhostName = &val
Expand Down Expand Up @@ -193,31 +227,34 @@ func (s *rabbitMQScaler) Close() error {

// IsActive returns true if there are pending messages to be processed
func (s *rabbitMQScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getQueueMessages()
messages, publishRate, err := s.getQueueStatus()
if err != nil {
return false, fmt.Errorf("error inspecting rabbitMQ: %s", err)
}

return messages > 0, nil
if s.metadata.queueLength > 0 {
return messages > 0, nil
}
return publishRate > 0, nil
}

func (s *rabbitMQScaler) getQueueMessages() (int, error) {
func (s *rabbitMQScaler) getQueueStatus() (int, float64, error) {
if s.metadata.protocol == httpProtocol {
info, err := s.getQueueInfoViaHTTP()
if err != nil {
return -1, err
return -1, -1, err
}

// messages count includes count of ready and unack-ed
return info.Messages, nil
return info.Messages, info.MessageStat.PublishDetail.Rate, nil
}

items, err := s.channel.QueueInspect(s.metadata.queueName)
if err != nil {
return -1, err
return -1, -1, err
}

return items.Messages, nil
return items.Messages, 0, nil
}

func getJSON(httpClient *http.Client, url string, target interface{}) error {
Expand Down Expand Up @@ -269,32 +306,49 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *rabbitMQScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI)
var metricName string
var metricValue *resource.Quantity
if s.metadata.queueLength > 0 {
metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", s.metadata.queueName))
metricValue = resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI)
} else {
metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq-rate", s.metadata.queueName))
metricValue = resource.NewMilliQuantity(int64(s.metadata.publishRate*1000), resource.DecimalSI)
}

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", s.metadata.queueName)),
Name: metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetMetricValue,
AverageValue: metricValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: rabbitMetricType,
}

return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *rabbitMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
messages, err := s.getQueueMessages()
messages, publishRate, err := s.getQueueStatus()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting rabbitMQ: %s", err)
}

var metricValue resource.Quantity
if s.metadata.queueLength > 0 {
metricValue = *resource.NewQuantity(int64(messages), resource.DecimalSI)
} else {
metricValue = *resource.NewMilliQuantity(int64(publishRate*1000), resource.DecimalSI)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(messages), resource.DecimalSI),
Value: metricValue,
Timestamp: metav1.Now(),
}

Expand Down
46 changes: 38 additions & 8 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
{map[string]string{"queueName": "sample", "host": "http://"}, false, map[string]string{}},
// auto protocol and an HTTPS URL
{map[string]string{"queueName": "sample", "host": "https://"}, false, map[string]string{}},
// publishRate number
{map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueName": "sample", "host": "https://"}, false, map[string]string{}},
// publishRate not number
{map[string]string{rabbitPublishedPerSecondMetricName: "AA", "queueName": "sample", "host": "https://"}, true, map[string]string{}},
// publishRate http
{map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueName": "sample", "host": "http://"}, false, map[string]string{}},
// publishRate amqp
{map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueName": "sample", "host": "amqp://"}, true, map[string]string{}},
// publishRate amqps
{map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueName": "sample", "host": "amqps://"}, true, map[string]string{}},
// publishRate and queueLength
{map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "10", "queueName": "sample", "host": "https://"}, true, map[string]string{}},
}

var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{
Expand All @@ -82,6 +94,8 @@ var testDefaultQueueLength = []parseRabbitMQMetadataTestData{
{map[string]string{"queueName": "sample", "hostFromEnv": host}, false, map[string]string{}},
// use default queueLength with includeUnacked
{map[string]string{"queueName": "sample", "hostFromEnv": host, "protocol": "http"}, false, map[string]string{}},
// use default queueLength with includeUnacked
{map[string]string{"queueName": "sample", rabbitPublishedPerSecondMetricName: "100", "hostFromEnv": host, "protocol": "http"}, false, map[string]string{}},
}

func TestParseDefaultQueueLength(t *testing.T) {
Expand All @@ -92,7 +106,9 @@ func TestParseDefaultQueueLength(t *testing.T) {
t.Error("Expected success but got error", err)
case testData.isError && err == nil:
t.Error("Expected error but got success")
case metadata.queueLength != defaultRabbitMQQueueLength:
case metadata.publishRate > 0 && metadata.queueLength != 0:
t.Error("Expected default queueLength = 0 when publishRate is specified")
case metadata.publishRate == 0 && metadata.queueLength != defaultRabbitMQQueueLength:
t.Error("Expected default queueLength =", defaultRabbitMQQueueLength, "but got", metadata.queueLength)
}
}
Expand All @@ -107,19 +123,33 @@ type getQueueInfoTestData struct {
}

var testQueueInfoTestData = []getQueueInfoTestData{
{`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "name": "evaluate_trials"}`, http.StatusOK, false, nil, ""},
// queueLength
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, false, nil, ""},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, false, nil, ""},
// publishRate
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "0"}, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, false, map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "0"}, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "0"}, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, false, map[string]string{rabbitPublishedPerSecondMetricName: "100", "queueLength": "0"}, ""},
// error response
{`Password is incorrect`, http.StatusUnauthorized, false, nil, ""},
}

var vhostPathes = []string{"/myhost", "", "/", "//", "/%2F"}

var testQueueInfoTestDataSingleVhost = []getQueueInfoTestData{
{`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
}

func TestGetQueueInfo(t *testing.T) {
Expand Down