Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RabbitMQ publishRate trigger #1653

Merged
merged 5 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
- Fixed keda clusterroles to give permissions for clustertriggerauthentications ([#1645](https://github.com/kedacore/keda/pull/1645))
- Make `swiftURL` parameter optional for the OpenStack Swift scaler ([#1652](https://github.com/kedacore/keda/pull/1652))
- Fix memory leak of `keda-metrics-apiserver` by setting a controller-runtime logger properly ([#1654](https://github.com/kedacore/keda/pull/1654))
- Add `publishRate` trigger to RabbitMQ scaler ([#1653](https://github.com/kedacore/keda/pull/1653))

### Breaking Changes

Expand Down
152 changes: 119 additions & 33 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ import (
)

const (
rabbitQueueLengthMetricName = "queueLength"
defaultRabbitMQQueueLength = 20
rabbitMetricType = "External"
rabbitQueueLengthMetricName = "queueLength"
rabbitModeTriggerConfigName = "mode"
rabbitValueTriggerConfigName = "value"
rabbitModeQueueLength = "QueueLength"
rabbitModeMessageRate = "MessageRate"
defaultRabbitMQQueueLength = 20
rabbitMetricType = "External"
)

const (
Expand All @@ -41,17 +45,27 @@ type rabbitMQScaler struct {
}

type rabbitMQMetadata struct {
queueName string
queueLength int
host string // connection string for either HTTP or AMQP protocol
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
queueName string
mode string // QueueLength or MessageRate
value int // trigger value (queue length or publish/sec. rate)
host string // connection string for either HTTP or AMQP protocol
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
}

type queueInfo struct {
Messages int `json:"messages"`
MessagesUnacknowledged int `json:"messages_unacknowledged"`
Name string `json:"name"`
Messages int `json:"messages"`
MessagesUnacknowledged int `json:"messages_unacknowledged"`
MessageStat messageStat `json:"message_stats"`
Name string `json:"name"`
}

type messageStat struct {
PublishDetail publishDetail `json:"publish_details"`
}

type publishDetail struct {
Rate float64 `json:"rate"`
}

var rabbitmqLog = logf.Log.WithName("rabbitmq_scaler")
Expand Down Expand Up @@ -145,24 +159,77 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {
return nil, fmt.Errorf("no queue name given")
}

// Resolve queueLength
if val, ok := config.TriggerMetadata[rabbitQueueLengthMetricName]; ok {
queueLength, err := strconv.Atoi(val)
// Resolve vhostName
if val, ok := config.TriggerMetadata["vhostName"]; ok {
meta.vhostName = &val
}

_, err := parseTrigger(&meta, config)
if err != nil {
return nil, fmt.Errorf("unable to parse trigger: %s", err)
}

return &meta, nil
}

func parseTrigger(meta *rabbitMQMetadata, config *ScalerConfig) (*rabbitMQMetadata, error) {
deprecatedQueueLengthValue, deprecatedQueueLengthPresent := config.TriggerMetadata[rabbitQueueLengthMetricName]
mode, modePresent := config.TriggerMetadata[rabbitModeTriggerConfigName]
value, valuePresent := config.TriggerMetadata[rabbitValueTriggerConfigName]

// Initialize to default trigger settings
meta.mode = rabbitModeQueueLength
meta.value = defaultRabbitMQQueueLength

// If nothing is specified for the trigger then return the default
if !deprecatedQueueLengthPresent && !modePresent && !valuePresent {
return meta, nil
}

// Only allow one of `queueLength` or `mode`/`value`
if deprecatedQueueLengthPresent && (modePresent || valuePresent) {
return nil, fmt.Errorf("queueLength is deprecated; configure only %s and %s", rabbitModeTriggerConfigName, rabbitValueTriggerConfigName)
}

// Parse deprecated `queueLength` value
if deprecatedQueueLengthPresent {
queueLength, err := strconv.Atoi(deprecatedQueueLengthValue)
if err != nil {
return nil, fmt.Errorf("can't parse %s: %s", rabbitQueueLengthMetricName, err)
}
meta.mode = rabbitModeQueueLength
meta.value = queueLength

meta.queueLength = queueLength
} else {
meta.queueLength = defaultRabbitMQQueueLength
return meta, nil
}

// Resolve vhostName
if val, ok := config.TriggerMetadata["vhostName"]; ok {
meta.vhostName = &val
if !modePresent {
return nil, fmt.Errorf("%s must be specified", rabbitModeTriggerConfigName)
}
if !valuePresent {
return nil, fmt.Errorf("%s must be specified", rabbitValueTriggerConfigName)
}

return &meta, nil
// Resolve trigger mode
switch mode {
case rabbitModeQueueLength:
meta.mode = rabbitModeQueueLength
case rabbitModeMessageRate:
meta.mode = rabbitModeMessageRate
default:
return nil, fmt.Errorf("trigger mode %s must be one of %s, %s", mode, rabbitModeQueueLength, rabbitModeMessageRate)
}
triggerValue, err := strconv.Atoi(value)
if err != nil {
return nil, fmt.Errorf("can't parse %s: %s", rabbitValueTriggerConfigName, err)
}
meta.value = triggerValue

if meta.mode == rabbitModeMessageRate && meta.protocol != httpProtocol {
return nil, fmt.Errorf("protocol %s not supported; must be http to use mode %s", meta.protocol, rabbitModeMessageRate)
}

return meta, nil
}

func getConnectionAndChannel(host string) (*amqp.Connection, *amqp.Channel, error) {
Expand Down Expand Up @@ -193,31 +260,34 @@ func (s *rabbitMQScaler) Close() error {

// IsActive returns true if there are pending messages to be processed
func (s *rabbitMQScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getQueueMessages()
messages, publishRate, err := s.getQueueStatus()
if err != nil {
return false, fmt.Errorf("error inspecting rabbitMQ: %s", err)
}

return messages > 0, nil
if s.metadata.mode == rabbitModeQueueLength {
return messages > 0, nil
}
return publishRate > 0 || messages > 0, nil
}

func (s *rabbitMQScaler) getQueueMessages() (int, error) {
func (s *rabbitMQScaler) getQueueStatus() (int, float64, error) {
if s.metadata.protocol == httpProtocol {
info, err := s.getQueueInfoViaHTTP()
if err != nil {
return -1, err
return -1, -1, err
}

// messages count includes count of ready and unack-ed
return info.Messages, nil
return info.Messages, info.MessageStat.PublishDetail.Rate, nil
}

items, err := s.channel.QueueInspect(s.metadata.queueName)
if err != nil {
return -1, err
return -1, -1, err
}

return items.Messages, nil
return items.Messages, 0, nil
}

func getJSON(httpClient *http.Client, url string, target interface{}) error {
Expand Down Expand Up @@ -269,32 +339,48 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *rabbitMQScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI)
var metricName string

if s.metadata.mode == rabbitModeQueueLength {
metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", s.metadata.queueName))
} else {
metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq-rate", s.metadata.queueName))
}
metricValue := resource.NewQuantity(int64(s.metadata.value), resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", s.metadata.queueName)),
Name: metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetMetricValue,
AverageValue: metricValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: rabbitMetricType,
}

return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *rabbitMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
messages, err := s.getQueueMessages()
messages, publishRate, err := s.getQueueStatus()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting rabbitMQ: %s", err)
}

var metricValue resource.Quantity
if s.metadata.mode == rabbitModeQueueLength {
metricValue = *resource.NewQuantity(int64(messages), resource.DecimalSI)
} else {
metricValue = *resource.NewMilliQuantity(int64(publishRate*1000), resource.DecimalSI)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(messages), resource.DecimalSI),
Value: metricValue,
Timestamp: metav1.Now(),
}

Expand Down
72 changes: 62 additions & 10 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,32 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
{map[string]string{"queueName": "sample", "host": "http://"}, false, map[string]string{}},
// auto protocol and an HTTPS URL
{map[string]string{"queueName": "sample", "host": "https://"}, false, map[string]string{}},
// queueLength and mode
{map[string]string{"queueLength": "10", "mode": "QueueLength", "queueName": "sample", "host": "https://"}, true, map[string]string{}},
// queueLength and value
{map[string]string{"queueLength": "10", "value": "20", "queueName": "sample", "host": "https://"}, true, map[string]string{}},
// queueLength and mode and value
{map[string]string{"queueLength": "10", "mode": "QueueLength", "value": "20", "queueName": "sample", "host": "https://"}, true, map[string]string{}},
// only mode
{map[string]string{"mode": "QueueLength", "queueName": "sample", "host": "https://"}, true, map[string]string{}},
// only value
{map[string]string{"value": "20", "queueName": "sample", "host": "https://"}, true, map[string]string{}},
// mode and value
{map[string]string{"mode": "QueueLength", "value": "20", "queueName": "sample", "host": "https://"}, false, map[string]string{}},
// invalid mode
{map[string]string{"mode": "Feelings", "value": "20", "queueName": "sample", "host": "https://"}, true, map[string]string{}},
// invalid value
{map[string]string{"mode": "QueueLength", "value": "lots", "queueName": "sample", "host": "https://"}, true, map[string]string{}},
// queue length amqp
{map[string]string{"mode": "QueueLength", "value": "20", "queueName": "sample", "host": "amqps://"}, false, map[string]string{}},
// message rate amqp
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "amqps://"}, true, map[string]string{}},
// message rate amqp
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "amqp://"}, true, map[string]string{}},
// message rate amqp
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://"}, false, map[string]string{}},
// message rate amqp
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "https://"}, false, map[string]string{}},
}

var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{
Expand Down Expand Up @@ -92,8 +118,8 @@ func TestParseDefaultQueueLength(t *testing.T) {
t.Error("Expected success but got error", err)
case testData.isError && err == nil:
t.Error("Expected error but got success")
case metadata.queueLength != defaultRabbitMQQueueLength:
t.Error("Expected default queueLength =", defaultRabbitMQQueueLength, "but got", metadata.queueLength)
case metadata.value != defaultRabbitMQQueueLength:
t.Error("Expected default queueLength =", defaultRabbitMQQueueLength, "but got", metadata.value)
}
}
}
Expand All @@ -107,19 +133,46 @@ type getQueueInfoTestData struct {
}

var testQueueInfoTestData = []getQueueInfoTestData{
{`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "name": "evaluate_trials"}`, http.StatusOK, true, nil, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "name": "evaluate_trials"}`, http.StatusOK, false, nil, ""},
// queueLength
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"queueLength": "10"}, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"queueLength": "10"}, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"queueLength": "10"}, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, false, map[string]string{"queueLength": "10"}, ""},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"queueLength": "10"}, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"queueLength": "10"}, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"queueLength": "10"}, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, false, map[string]string{"queueLength": "10"}, ""},
// mode QueueLength
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "QueueLength"}, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "QueueLength"}, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "QueueLength"}, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, false, map[string]string{"value": "100", "mode": "QueueLength"}, ""},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "QueueLength"}, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "QueueLength"}, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "QueueLength"}, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, false, map[string]string{"value": "100", "mode": "QueueLength"}, ""},
// mode MessageRate
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "MessageRate"}, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "MessageRate"}, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "MessageRate"}, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, false, map[string]string{"value": "100", "mode": "MessageRate"}, ""},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "MessageRate"}, ""},
{`{"messages": 1, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "MessageRate"}, ""},
{`{"messages": 1, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "MessageRate"}, ""},
{`{"messages": 0, "messages_unacknowledged": 0, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"value": "100", "mode": "MessageRate"}, ""},
// error response
{`Password is incorrect`, http.StatusUnauthorized, false, nil, ""},
}

var vhostPathes = []string{"/myhost", "", "/", "//", "/%2F"}

var testQueueInfoTestDataSingleVhost = []getQueueInfoTestData{
{`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
}

func TestGetQueueInfo(t *testing.T) {
Expand Down Expand Up @@ -157,7 +210,6 @@ func TestGetQueueInfo(t *testing.T) {
resolvedEnv := map[string]string{host: fmt.Sprintf("%s%s", apiStub.URL, testData.vhostPath), "plainHost": apiStub.URL}

metadata := map[string]string{
"queueLength": "10",
"queueName": "evaluate_trials",
"hostFromEnv": host,
"protocol": "http",
Expand Down