Skip to content

Commit

Permalink
Resolve the schema/protocol autmatically
Browse files Browse the repository at this point in the history
Signed-off-by: Tomek Urbaszek <tomasz.urbaszek@polidea.com>
  • Loading branch information
turbaszek committed Sep 10, 2020
1 parent 43c28e3 commit d17cc8a
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 30 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
- Remove `New()` and `Close()` from the interface of `service ExternalScaler` in `externalscaler.proto` ([#865](https://github.com/kedacore/keda/pull/865))
- Removed deprecated brokerList for Kafka scaler ([#882](https://github.com/kedacore/keda/pull/882))
- All scalers metadata that is resolved from the scaleTarget environment have suffix `FromEnv` added. e.g: `connection` -> `connectionFromEnv` ([#1072](https://github.com/kedacore/keda/pull/1072))
- Use `host` instead of `apiHost` in `rabbitmq` scaler. Add `protocol` in trigger spec to specify which protocol should be used ([#1115](https://github.com/kedacore/keda/pull/1115))
- Use `host` instead of `apiHost` in `rabbitmq` scaler. Schema part of `host` defines which protocol should be used ([#1115](https://github.com/kedacore/keda/pull/1115))

### Other
- Update Operator SDK and k8s deps ([#1007](https://github.com/kedacore/keda/pull/1007),[#870](https://github.com/kedacore/keda/issues/870))
Expand Down
29 changes: 15 additions & 14 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ const (
)

const (
httpProtocol = "http"
amqpProtocol = "amqp"
defaultProtocol = amqpProtocol
httpProtocol = "http"
amqpProtocol = "amqp"
)

type rabbitMQScaler struct {
Expand All @@ -41,7 +40,7 @@ type rabbitMQMetadata struct {
queueName string
queueLength int
host string // connection string for either HTTP or AMQP protocol
protocol string // either http or amqp protocol
protocol string // either http or amqp protocol, resolved from host
}

type queueInfo struct {
Expand Down Expand Up @@ -78,16 +77,6 @@ func NewRabbitMQScaler(resolvedEnv, metadata, authParams map[string]string) (Sca
func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string) (*rabbitMQMetadata, error) {
meta := rabbitMQMetadata{}

// Resolve protocol type
meta.protocol = defaultProtocol
if val, ok := metadata["protocol"]; ok {
if val == amqpProtocol || val == httpProtocol {
meta.protocol = val
} else {
return nil, fmt.Errorf("the protocol has to be either `%s` or `%s` but is `%s`", amqpProtocol, httpProtocol, val)
}
}

// Resolve host value
if authParams["host"] != "" {
meta.host = authParams["host"]
Expand All @@ -99,6 +88,18 @@ func parseRabbitMQMetadata(resolvedEnv, metadata, authParams map[string]string)
return nil, fmt.Errorf("no host setting given")
}

// Resolve protocol type HTTP vs AMQP
parsedURL, err := url.Parse(meta.host)
if err != nil {
return nil, fmt.Errorf("failed parsing host: %s", err)
}
scheme := parsedURL.Scheme
if scheme == amqpProtocol || scheme == httpProtocol {
meta.protocol = scheme
} else {
return nil, fmt.Errorf("the host scheme has to be either `%s` or `%s` but is `%s`", amqpProtocol, httpProtocol, scheme)
}

// Resolve queueName
if val, ok := metadata["queueName"]; ok {
meta.queueName = val
Expand Down
34 changes: 20 additions & 14 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (
)

const (
host = "myHostSecret"
host = "myHostSecret"
dummyAMQPHost = "amqp://user:sercet@somehost.com:5236"
dummyHTTPHost = "http://user:sercet@somehost.com:5236"
)

type parseRabbitMQMetadataTestData struct {
metadata map[string]string
isError bool
authParams map[string]string
protocol string
}

type rabbitMQMetricIdentifier struct {
Expand All @@ -30,19 +33,19 @@ var sampleRabbitMqResolvedEnv = map[string]string{

var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
// nothing passed
{map[string]string{}, true, map[string]string{}},
{map[string]string{}, true, map[string]string{}, "amqp"},
// properly formed metadata
{map[string]string{"queueLength": "10", "queueName": "sample", "hostFromEnv": host}, false, map[string]string{}},
{map[string]string{"queueLength": "10", "queueName": "sample", "hostFromEnv": host}, false, map[string]string{}, "amqp"},
// malformed queueLength
{map[string]string{"queueLength": "AA", "queueName": "sample", "hostFromEnv": host}, true, map[string]string{}},
{map[string]string{"queueLength": "AA", "queueName": "sample", "hostFromEnv": host}, true, map[string]string{}, "amqp"},
// missing host
{map[string]string{"queueLength": "AA", "queueName": "sample"}, true, map[string]string{}},
{map[string]string{"queueLength": "AA", "queueName": "sample"}, true, map[string]string{}, "amqp"},
// missing queueName
{map[string]string{"queueLength": "10", "hostFromEnv": host}, true, map[string]string{}},
{map[string]string{"queueLength": "10", "hostFromEnv": host}, true, map[string]string{}, "amqp"},
// host defined in authParams
{map[string]string{"queueLength": "10"}, true, map[string]string{"host": host}},
{map[string]string{"queueLength": "10"}, true, map[string]string{"host": dummyAMQPHost}, "amqp"},
// properly formed metadata with http protocol
{map[string]string{"queueLength": "10", "queueName": "sample", "host": host, "protocol": "http"}, false, map[string]string{}},
{map[string]string{"queueLength": "10", "queueName": "sample", "host": dummyHTTPHost}, false, map[string]string{}, "http"},
}

var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{
Expand All @@ -51,21 +54,25 @@ var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{

func TestRabbitMQParseMetadata(t *testing.T) {
for _, testData := range testRabbitMQMetadata {
_, err := parseRabbitMQMetadata(sampleRabbitMqResolvedEnv, testData.metadata, testData.authParams)
meta, err := parseRabbitMQMetadata(sampleRabbitMqResolvedEnv, testData.metadata, testData.authParams)
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")

}
if meta != nil && meta.protocol != testData.protocol {
t.Errorf("Expected %s but got %s schema", testData.protocol, meta.protocol)
}
}
}

var testDefaultQueueLength = []parseRabbitMQMetadataTestData{
// use default queueLength
{map[string]string{"queueName": "sample", "host": host}, false, map[string]string{}},
// use default queueLength with includeUnacked
{map[string]string{"queueName": "sample", "host": host, "protocol": "http"}, false, map[string]string{}},
{map[string]string{"queueName": "sample", "host": dummyAMQPHost}, false, map[string]string{}, "amqp"},
// use default queueLength with http
{map[string]string{"queueName": "sample", "host": dummyHTTPHost}, false, map[string]string{}, "http"},
}

func TestParseDefaultQueueLength(t *testing.T) {
Expand Down Expand Up @@ -122,7 +129,6 @@ func TestGetQueueInfo(t *testing.T) {
"queueLength": "10",
"queueName": "evaluate_trials",
"hostFromEnv": host,
"protocol": "http",
}

s, err := NewRabbitMQScaler(resolvedEnv, metadata, map[string]string{})
Expand Down Expand Up @@ -157,7 +163,7 @@ func TestGetQueueInfo(t *testing.T) {

func TestRabbitMQGetMetricSpecForScaling(t *testing.T) {
for _, testData := range rabbitMQMetricIdentifiers {
meta, err := parseRabbitMQMetadata(map[string]string{"myHostSecret": "myHostSecret"}, testData.metadataTestData.metadata, nil)
meta, err := parseRabbitMQMetadata(map[string]string{"myHostSecret": dummyAMQPHost}, testData.metadataTestData.metadata, nil)
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand Down
1 change: 0 additions & 1 deletion tests/scalers/rabbitmq-queue-http.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,4 @@ spec:
metadata:
queueName: {{QUEUE_NAME}}
hostFromEnv: RabbitApiHost
protocol: http
queueLength: '50'`

0 comments on commit d17cc8a

Please sign in to comment.