diff --git a/CHANGELOG.md b/CHANGELOG.md index 857c2e43845..fa32419fa71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ - Add Bearer auth for Metrics API scaler ([#2028](https://github.com/kedacore/keda/pull/2028)) - Anonymize the host in case of HTTP failure (RabbitMQ Scaler) ([#2041](https://github.com/kedacore/keda/pull/2041)) - Escape `queueName` and `vhostName` in RabbitMQ Scaler before use them in query string (bug fix) ([#2055](https://github.com/kedacore/keda/pull/2055)) +- Add custom http timeout in RabbitMQ Scaler ([#2086](https://github.com/kedacore/keda/pull/2086)) ### Breaking Changes diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index c819718e5c3..376fbd8a48e 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -9,6 +9,7 @@ import ( "net/url" "regexp" "strconv" + "time" "github.com/streadway/amqp" v2beta2 "k8s.io/api/autoscaling/v2beta2" @@ -60,14 +61,15 @@ type rabbitMQScaler struct { type rabbitMQMetadata struct { queueName string - mode string // QueueLength or MessageRate - value int // trigger value (queue length or publish/sec. rate) - host string // connection string for either HTTP or AMQP protocol - protocol string // either http or amqp protocol - vhostName *string // override the vhost from the connection info - useRegex bool // specify if the queueName contains a rexeg - operation string // specify the operation to apply in case of multiples queues - metricName string // Custom metric name for trigger + mode string // QueueLength or MessageRate + value int // trigger value (queue length or publish/sec. rate) + host string // connection string for either HTTP or AMQP protocol + protocol string // either http or amqp protocol + vhostName *string // override the vhost from the connection info + useRegex bool // specify if the queueName contains a rexeg + operation string // specify the operation to apply in case of multiples queues + metricName string // custom metric name for trigger + timeout time.Duration // custom http timeout for a specific trigger } type queueInfo struct { @@ -93,11 +95,11 @@ var rabbitmqLog = logf.Log.WithName("rabbitmq_scaler") // NewRabbitMQScaler creates a new rabbitMQ scaler func NewRabbitMQScaler(config *ScalerConfig) (Scaler, error) { - httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout) meta, err := parseRabbitMQMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing rabbitmq metadata: %s", err) } + httpClient := kedautil.CreateHTTPClient(meta.timeout) if meta.protocol == httpProtocol { return &rabbitMQScaler{ @@ -220,6 +222,23 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) { } } + // Resolve timeout + if val, ok := config.TriggerMetadata["timeout"]; ok { + timeoutMS, err := strconv.Atoi(val) + if err != nil { + return nil, fmt.Errorf("unable to parse timeout: %s", err) + } + if meta.protocol == amqpProtocol { + return nil, fmt.Errorf("amqp protocol doesn't support custom timeouts: %s", err) + } + if timeoutMS <= 0 { + return nil, fmt.Errorf("timeout must be greater than 0: %s", err) + } + meta.timeout = time.Duration(timeoutMS) * time.Millisecond + } else { + meta.timeout = config.GlobalHTTPTimeout + } + return &meta, nil } diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go index a6a5973858f..e15f8bdb598 100644 --- a/pkg/scalers/rabbitmq_scaler_test.go +++ b/pkg/scalers/rabbitmq_scaler_test.go @@ -96,6 +96,14 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ {map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true"}, false, map[string]string{}}, // custom metric name {map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "metricName": "host1-sample"}, false, map[string]string{}}, + // http valid timeout + {map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "timeout": "1000"}, false, map[string]string{}}, + // http invalid timeout + {map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "timeout": "-10"}, true, map[string]string{}}, + // http wrong timeout + {map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "timeout": "error"}, true, map[string]string{}}, + // amqp timeout + {map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "amqp://", "timeout": "10"}, true, map[string]string{}}, } var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{