Skip to content

Commit

Permalink
Activation threshold (activationValue) for Rabbitmq (#2831)
Browse files Browse the repository at this point in the history
Co-authored-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
  • Loading branch information
adborroto and JorTurFer committed Aug 3, 2022
1 parent c065f8e commit 98e937e
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 11 deletions.
32 changes: 22 additions & 10 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ func init() {
}

const (
rabbitQueueLengthMetricName = "queueLength"
rabbitModeTriggerConfigName = "mode"
rabbitValueTriggerConfigName = "value"
rabbitModeQueueLength = "QueueLength"
rabbitModeMessageRate = "MessageRate"
defaultRabbitMQQueueLength = 20
rabbitMetricType = "External"
rabbitRootVhostPath = "/%2F"
rabbitQueueLengthMetricName = "queueLength"
rabbitModeTriggerConfigName = "mode"
rabbitValueTriggerConfigName = "value"
rabbitActivationValueTriggerConfigName = "activationValue"
rabbitModeQueueLength = "QueueLength"
rabbitModeMessageRate = "MessageRate"
defaultRabbitMQQueueLength = 20
rabbitMetricType = "External"
rabbitRootVhostPath = "/%2F"
)

const (
Expand Down Expand Up @@ -64,6 +65,7 @@ type rabbitMQMetadata struct {
queueName string
mode string // QueueLength or MessageRate
value float64 // trigger value (queue length or publish/sec. rate)
activationValue float64 // activation value
host string // connection string for either HTTP or AMQP protocol
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
Expand Down Expand Up @@ -287,6 +289,7 @@ func parseTrigger(meta *rabbitMQMetadata, config *ScalerConfig) (*rabbitMQMetada
deprecatedQueueLengthValue, deprecatedQueueLengthPresent := config.TriggerMetadata[rabbitQueueLengthMetricName]
mode, modePresent := config.TriggerMetadata[rabbitModeTriggerConfigName]
value, valuePresent := config.TriggerMetadata[rabbitValueTriggerConfigName]
activationValue, activationValuePresent := config.TriggerMetadata[rabbitActivationValueTriggerConfigName]

// Initialize to default trigger settings
meta.mode = rabbitModeQueueLength
Expand All @@ -302,6 +305,15 @@ func parseTrigger(meta *rabbitMQMetadata, config *ScalerConfig) (*rabbitMQMetada
return nil, fmt.Errorf("queueLength is deprecated; configure only %s and %s", rabbitModeTriggerConfigName, rabbitValueTriggerConfigName)
}

// Parse activation value
if activationValuePresent {
activation, err := strconv.ParseFloat(activationValue, 64)
if err != nil {
return nil, fmt.Errorf("can't parse %s: %s", rabbitActivationValueTriggerConfigName, err)
}
meta.activationValue = activation
}

// Parse deprecated `queueLength` value
if deprecatedQueueLengthPresent {
queueLength, err := strconv.ParseFloat(deprecatedQueueLengthValue, 64)
Expand Down Expand Up @@ -377,9 +389,9 @@ func (s *rabbitMQScaler) IsActive(ctx context.Context) (bool, error) {
}

if s.metadata.mode == rabbitModeQueueLength {
return messages > 0, nil
return float64(messages) > s.metadata.activationValue, nil
}
return publishRate > 0 || messages > 0, nil
return publishRate > s.metadata.activationValue || float64(messages) > s.metadata.activationValue, nil
}

func (s *rabbitMQScaler) getQueueStatus() (int64, float64, error) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
// missing queueName
{map[string]string{"queueLength": "10", "hostFromEnv": host}, true, map[string]string{}},
// host defined in authParams
{map[string]string{"queueLength": "10"}, true, map[string]string{"host": host}},
{map[string]string{"queueLength": "10", "hostFromEnv": host}, true, map[string]string{"host": host}},
// properly formed metadata with http protocol
{map[string]string{"queueLength": "10", "queueName": "sample", "host": host, "protocol": "http"}, false, map[string]string{}},
// queue name with slashes
Expand Down Expand Up @@ -111,6 +111,10 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "-1"}, true, map[string]string{}},
// invalid pageSize
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "a"}, true, map[string]string{}},
// activationValue passed
{map[string]string{"activationValue": "10", "queueLength": "20", "queueName": "sample", "hostFromEnv": host}, false, map[string]string{}},
// malformed activationValue
{map[string]string{"activationValue": "AA", "queueLength": "10", "queueName": "sample", "hostFromEnv": host}, true, map[string]string{}},
// http and excludeUnacknowledged
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "excludeUnacknowledged": "true"}, false, map[string]string{}},
// amqp and excludeUnacknowledged
Expand Down
4 changes: 4 additions & 0 deletions tests/scalers_go/rabbitmq/rabbitmq_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package rabbitmq

import (
"fmt"
"testing"

"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -198,6 +199,9 @@ func RMQPublishMessages(t *testing.T, namespace, connectionString, queueName str
MessageCount: messageCount,
}

// Before push messages remove previous jobs if any
_, _ = helper.ExecuteCommand(fmt.Sprintf("kubectl delete jobs/rabbitmq-publish-%s --namespace %s", queueName, namespace))

helper.KubectlApplyWithTemplate(t, data, "rmqPublishTemplate", publishTemplate)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ spec:
hostFromEnv: RabbitApiHost
mode: QueueLength
value: '10'
activationValue: '5'
`
)

Expand Down Expand Up @@ -87,6 +88,8 @@ func TestScaler(t *testing.T) {

testScaling(t, kc)

testActivationValue(t, kc)

// cleanup
t.Log("--- cleaning up ---")
DeleteKubernetesResources(t, kc, testNamespace, data, templates)
Expand Down Expand Up @@ -117,3 +120,11 @@ func testScaling(t *testing.T, kc *kubernetes.Clientset) {
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1),
"replica count should be 0 after 1 minute")
}

func testActivationValue(t *testing.T, kc *kubernetes.Clientset) {
t.Log("--- testing activation value ---")
messagesToQueue := 3
RMQPublishMessages(t, rmqNamespace, connectionString, queueName, messagesToQueue)

AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60)
}

0 comments on commit 98e937e

Please sign in to comment.