diff --git a/CHANGELOG.md b/CHANGELOG.md index c8a3d1d1b8a..4a6856e059d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - Mask password in postgres scaler auto generated metricName. ([PR #1381](https://github.com/kedacore/keda/pull/1381)) - Bug fix for pending jobs in ScaledJob's accurateScalingStrategy . ([#1323](https://github.com/kedacore/keda/issues/1323)) - Fix memory leak because of unclosed scalers. ([#1413](https://github.com/kedacore/keda/issues/1413)) +- Automatically determine the RabbitMQ protocol when possible, and support setting the protocl via TriggerAuthentication ([#1459](https://github.com/kedacore/keda/pulls/1459)) ### Breaking Changes diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index 54007891a49..1af21efaeb8 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -29,7 +29,8 @@ const ( const ( httpProtocol = "http" amqpProtocol = "amqp" - defaultProtocol = amqpProtocol + autoProtocol = "auto" + defaultProtocol = autoProtocol ) type rabbitMQScaler struct { @@ -87,12 +88,14 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) { // Resolve protocol type meta.protocol = defaultProtocol + if val, ok := config.AuthParams["protocol"]; ok { + meta.protocol = val + } if val, ok := config.TriggerMetadata["protocol"]; ok { - if val == amqpProtocol || val == httpProtocol { - meta.protocol = val - } else { - return nil, fmt.Errorf("the protocol has to be either `%s` or `%s` but is `%s`", amqpProtocol, httpProtocol, val) - } + meta.protocol = val + } + if meta.protocol != amqpProtocol && meta.protocol != httpProtocol && meta.protocol != autoProtocol { + return nil, fmt.Errorf("the protocol has to be either `%s`, `%s`, or `%s` but is `%s`", amqpProtocol, httpProtocol, autoProtocol, meta.protocol) } // Resolve host value @@ -107,6 +110,22 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) { return nil, fmt.Errorf("no host setting given") } + // If the protocol is auto, check the host scheme. + if meta.protocol == autoProtocol { + parsedURL, err := url.Parse(meta.host) + if err != nil { + return nil, fmt.Errorf("can't parse host to find protocol: %s", err) + } + switch parsedURL.Scheme { + case "amqp", "amqps": + meta.protocol = amqpProtocol + case "http", "https": + meta.protocol = httpProtocol + default: + return nil, fmt.Errorf("unknown host URL scheme %s", parsedURL.Scheme) + } + } + // Resolve queueName if val, ok := config.TriggerMetadata["queueName"]; ok { meta.queueName = val diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go index dd5f5f86f60..befd5119511 100644 --- a/pkg/scalers/rabbitmq_scaler_test.go +++ b/pkg/scalers/rabbitmq_scaler_test.go @@ -46,6 +46,14 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ {map[string]string{"queueLength": "10", "queueName": "sample", "host": host, "protocol": "http"}, false, map[string]string{}}, // queue name with slashes {map[string]string{"queueLength": "10", "queueName": "namespace/name", "hostFromEnv": host}, false, map[string]string{}}, + // protocol defined in authParams + {map[string]string{"queueName": "sample", "hostFromEnv": host}, false, map[string]string{"protocol": "http"}}, + // auto protocol and a bad URL + {map[string]string{"queueName": "sample", "host": "something://"}, true, map[string]string{}}, + // auto protocol and an HTTP URL + {map[string]string{"queueName": "sample", "host": "http://"}, false, map[string]string{}}, + // auto protocol and an HTTPS URL + {map[string]string{"queueName": "sample", "host": "https://"}, false, map[string]string{}}, } var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{