diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index 1b9ad98c1fd..2b99edb3459 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -27,14 +27,15 @@ func init() { } const ( - rabbitQueueLengthMetricName = "queueLength" - rabbitModeTriggerConfigName = "mode" - rabbitValueTriggerConfigName = "value" - rabbitModeQueueLength = "QueueLength" - rabbitModeMessageRate = "MessageRate" - defaultRabbitMQQueueLength = 20 - rabbitMetricType = "External" - rabbitRootVhostPath = "/%2F" + rabbitQueueLengthMetricName = "queueLength" + rabbitModeTriggerConfigName = "mode" + rabbitValueTriggerConfigName = "value" + rabbitActivationValueTriggerConfigName = "activationValue" + rabbitModeQueueLength = "QueueLength" + rabbitModeMessageRate = "MessageRate" + defaultRabbitMQQueueLength = 20 + rabbitMetricType = "External" + rabbitRootVhostPath = "/%2F" ) const ( @@ -64,6 +65,7 @@ type rabbitMQMetadata struct { queueName string mode string // QueueLength or MessageRate value float64 // trigger value (queue length or publish/sec. rate) + activationValue float64 // activation value host string // connection string for either HTTP or AMQP protocol protocol string // either http or amqp protocol vhostName *string // override the vhost from the connection info @@ -287,6 +289,7 @@ func parseTrigger(meta *rabbitMQMetadata, config *ScalerConfig) (*rabbitMQMetada deprecatedQueueLengthValue, deprecatedQueueLengthPresent := config.TriggerMetadata[rabbitQueueLengthMetricName] mode, modePresent := config.TriggerMetadata[rabbitModeTriggerConfigName] value, valuePresent := config.TriggerMetadata[rabbitValueTriggerConfigName] + activationValue, activationValuePresent := config.TriggerMetadata[rabbitActivationValueTriggerConfigName] // Initialize to default trigger settings meta.mode = rabbitModeQueueLength @@ -302,6 +305,15 @@ func parseTrigger(meta *rabbitMQMetadata, config *ScalerConfig) (*rabbitMQMetada return nil, fmt.Errorf("queueLength is deprecated; configure only %s and %s", rabbitModeTriggerConfigName, rabbitValueTriggerConfigName) } + // Parse activation value + if activationValuePresent { + activation, err := strconv.ParseFloat(activationValue, 64) + if err != nil { + return nil, fmt.Errorf("can't parse %s: %s", rabbitActivationValueTriggerConfigName, err) + } + meta.activationValue = activation + } + // Parse deprecated `queueLength` value if deprecatedQueueLengthPresent { queueLength, err := strconv.ParseFloat(deprecatedQueueLengthValue, 64) @@ -377,9 +389,9 @@ func (s *rabbitMQScaler) IsActive(ctx context.Context) (bool, error) { } if s.metadata.mode == rabbitModeQueueLength { - return messages > 0, nil + return float64(messages) > s.metadata.activationValue, nil } - return publishRate > 0 || messages > 0, nil + return publishRate > s.metadata.activationValue || float64(messages) > s.metadata.activationValue, nil } func (s *rabbitMQScaler) getQueueStatus() (int64, float64, error) { diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go index 2693210bf40..958ca130f30 100644 --- a/pkg/scalers/rabbitmq_scaler_test.go +++ b/pkg/scalers/rabbitmq_scaler_test.go @@ -44,7 +44,7 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ // missing queueName {map[string]string{"queueLength": "10", "hostFromEnv": host}, true, map[string]string{}}, // host defined in authParams - {map[string]string{"queueLength": "10"}, true, map[string]string{"host": host}}, + {map[string]string{"queueLength": "10", "hostFromEnv": host}, true, map[string]string{"host": host}}, // properly formed metadata with http protocol {map[string]string{"queueLength": "10", "queueName": "sample", "host": host, "protocol": "http"}, false, map[string]string{}}, // queue name with slashes @@ -111,6 +111,10 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ {map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "-1"}, true, map[string]string{}}, // invalid pageSize {map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "a"}, true, map[string]string{}}, + // activationValue passed + {map[string]string{"activationValue": "10", "queueLength": "20", "queueName": "sample", "hostFromEnv": host}, false, map[string]string{}}, + // malformed activationValue + {map[string]string{"activationValue": "AA", "queueLength": "10", "queueName": "sample", "hostFromEnv": host}, true, map[string]string{}}, // http and excludeUnacknowledged {map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "excludeUnacknowledged": "true"}, false, map[string]string{}}, // amqp and excludeUnacknowledged diff --git a/tests/scalers_go/rabbitmq/rabbitmq_helper.go b/tests/scalers_go/rabbitmq/rabbitmq_helper.go index c50b2a39e3b..1f94e636b94 100644 --- a/tests/scalers_go/rabbitmq/rabbitmq_helper.go +++ b/tests/scalers_go/rabbitmq/rabbitmq_helper.go @@ -4,6 +4,7 @@ package rabbitmq import ( + "fmt" "testing" "k8s.io/client-go/kubernetes" @@ -198,6 +199,9 @@ func RMQPublishMessages(t *testing.T, namespace, connectionString, queueName str MessageCount: messageCount, } + // Before push messages remove previous jobs if any + _, _ = helper.ExecuteCommand(fmt.Sprintf("kubectl delete jobs/rabbitmq-publish-%s --namespace %s", queueName, namespace)) + helper.KubectlApplyWithTemplate(t, data, "rmqPublishTemplate", publishTemplate) } diff --git a/tests/scalers_go/rabbitmq/rabbitmq_queue_amqp/rabbitmq_queue_amqp_test.go b/tests/scalers_go/rabbitmq/rabbitmq_queue_amqp/rabbitmq_queue_amqp_test.go index 9cf224e9951..227e0d16fe8 100644 --- a/tests/scalers_go/rabbitmq/rabbitmq_queue_amqp/rabbitmq_queue_amqp_test.go +++ b/tests/scalers_go/rabbitmq/rabbitmq_queue_amqp/rabbitmq_queue_amqp_test.go @@ -58,6 +58,7 @@ spec: hostFromEnv: RabbitApiHost mode: QueueLength value: '10' + activationValue: '5' ` ) @@ -87,6 +88,8 @@ func TestScaler(t *testing.T) { testScaling(t, kc) + testActivationValue(t, kc) + // cleanup t.Log("--- cleaning up ---") DeleteKubernetesResources(t, kc, testNamespace, data, templates) @@ -117,3 +120,11 @@ func testScaling(t *testing.T, kc *kubernetes.Clientset) { assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), "replica count should be 0 after 1 minute") } + +func testActivationValue(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing activation value ---") + messagesToQueue := 3 + RMQPublishMessages(t, rmqNamespace, connectionString, queueName, messagesToQueue) + + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) +}