Skip to content

Commit

Permalink
Merge branch 'main' into influxdb-unsafeSsl-parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
dtsioumas authored Oct 7, 2021
2 parents 83c61c3 + 822e55a commit 3e75cd2
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 24 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@
- Escape `queueName` and `vhostName` in RabbitMQ Scaler before use them in query string (bug fix) ([#2055](https://github.com/kedacore/keda/pull/2055))
- TriggerAuthentication/Vault: add support for HashiCorp Vault namespace (Vault Enterprise) ([#2085](https://github.com/kedacore/keda/pull/2085))
- Add custom http timeout in RabbitMQ Scaler ([#2086](https://github.com/kedacore/keda/pull/2086))
- Artemis Scaler parses out broker config parameters in case `restAPITemplate` is given ([#2104](https://github.com/kedacore/keda/pull/2104))
- Add support to get connection data from Trigger Authorization in MongoDB Scaler ([#2115](https://github.com/kedacore/keda/pull/2115))
- Add support to get connection data from Trigger Authorization in MySQL Scaler ([#2113](https://github.com/kedacore/keda/pull/2113))
- Add support to get connection data from Trigger Authorization in MSSQL Scaler ([#2112](https://github.com/kedacore/keda/pull/2112))
- Add support to get connection data from Trigger Authorization in PostgreSQL Scaler ([#2114](https://github.com/kedacore/keda/pull/2114))
- Add support to provide the metric name in Azure Log Analytics Scaler ([#2106](https://github.com/kedacore/keda/pull/2106))
- Add unsafeSsl parameter in InfluxDB scaler ([#2157](https://github.com/kedacore/keda/pull/2157))
- Add `pageSize` (using regex) in RabbitMQ Scaler ([#2162](https://github.com/kedacore/keda/pull/2162))
- Add `unsafeSsl` parameter in InfluxDB scaler ([#2157](https://github.com/kedacore/keda/pull/2157))

### Breaking Changes

Expand Down
74 changes: 55 additions & 19 deletions pkg/scalers/artemis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"

Expand Down Expand Up @@ -79,36 +80,39 @@ func parseArtemisMetadata(config *ScalerConfig) (*artemisMetadata, error) {

if val, ok := config.TriggerMetadata["restApiTemplate"]; ok && val != "" {
meta.restAPITemplate = config.TriggerMetadata["restApiTemplate"]
var err error
if meta, err = getAPIParameters(meta); err != nil {
return nil, fmt.Errorf("can't parse restApiTemplate : %s ", err)
}
} else {
meta.restAPITemplate = defaultRestAPITemplate
}
if config.TriggerMetadata["managementEndpoint"] == "" {
return nil, errors.New("no management endpoint given")
}
meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"]

if config.TriggerMetadata["queueName"] == "" {
return nil, errors.New("no queue name given")
}
meta.queueName = config.TriggerMetadata["queueName"]

if config.TriggerMetadata["brokerName"] == "" {
return nil, errors.New("no broker name given")
}
meta.brokerName = config.TriggerMetadata["brokerName"]

if config.TriggerMetadata["managementEndpoint"] == "" {
return nil, errors.New("no management endpoint given")
if config.TriggerMetadata["brokerAddress"] == "" {
return nil, errors.New("no broker address given")
}
meta.brokerAddress = config.TriggerMetadata["brokerAddress"]
}
meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"]

if val, ok := config.TriggerMetadata["corsHeader"]; ok && val != "" {
meta.corsHeader = config.TriggerMetadata["corsHeader"]
} else {
meta.corsHeader = fmt.Sprintf(defaultCorsHeader, meta.managementEndpoint)
}

if config.TriggerMetadata["queueName"] == "" {
return nil, errors.New("no queue name given")
}
meta.queueName = config.TriggerMetadata["queueName"]

if config.TriggerMetadata["brokerName"] == "" {
return nil, errors.New("no broker name given")
}
meta.brokerName = config.TriggerMetadata["brokerName"]

if config.TriggerMetadata["brokerAddress"] == "" {
return nil, errors.New("no broker address given")
}
meta.brokerAddress = config.TriggerMetadata["brokerAddress"]

if val, ok := config.TriggerMetadata["queueLength"]; ok {
queueLength, err := strconv.Atoi(val)
if err != nil {
Expand Down Expand Up @@ -163,6 +167,38 @@ func (s *artemisScaler) IsActive(ctx context.Context) (bool, error) {
return messages > 0, nil
}

// getAPIParameters parse restAPITemplate to provide managementEndpoint , brokerName, brokerAddress, queueName
func getAPIParameters(meta artemisMetadata) (artemisMetadata, error) {
u, err := url.ParseRequestURI(meta.restAPITemplate)
if err != nil {
return meta, fmt.Errorf("unable to parse the artemis restAPITemplate: %s", err)
}
meta.managementEndpoint = u.Host
splitURL := strings.Split(strings.Split(u.RawPath, ":")[1], "/")[0] // This returns : broker="<<brokerName>>",component=addresses,address="<<brokerAddress>>",subcomponent=queues,routing-type="anycast",queue="<<queueName>>"
replacer := strings.NewReplacer(",", "&", "\"\"", "")
v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. : map[address:["<<brokerAddress>>"] broker:["<<brokerName>>"] component:[addresses] queue:["<<queueName>>"] routing-type:["anycast"] subcomponent:[queues]]
if err != nil {
return meta, fmt.Errorf("unable to parse the artemis restAPITemplate: %s", err)
}

if len(v["address"][0]) == 0 {
return meta, errors.New("no brokerAddress given")
}
meta.brokerAddress = v["address"][0]

if len(v["queue"][0]) == 0 {
return meta, errors.New("no queueName is given")
}
meta.queueName = v["queue"][0]

if len(v["broker"][0]) == 0 {
return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate)
}
meta.brokerName = v["broker"][0]

return meta, nil
}

func (s *artemisScaler) getMonitoringEndpoint() string {
replacer := strings.NewReplacer("<<managementEndpoint>>", s.metadata.managementEndpoint,
"<<queueName>>", s.metadata.queueName,
Expand Down
3 changes: 3 additions & 0 deletions pkg/scalers/artemis_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ var testArtemisMetadata = []parseArtemisMetadataTestData{
// Missing password, should fail
{map[string]string{"managementEndpoint": "localhost:8161", "queueName": "queue1", "brokerName": "broker-activemq", "brokerAddress": "test", "username": "myUserName", "password": ""}, true},
{map[string]string{"managementEndpoint": "localhost:8161", "queueName": "queue1", "brokerName": "broker-activemq", "brokerAddress": "test", "username": "myUserName", "password": "myPassword"}, false},
{map[string]string{"restApiTemplate": "http://localhost:8161/console/jolokia/read/org.apache.activemq.artemis:broker=\"broker-activemq\",component=addresses,address=\"test\",subcomponent=queues,routing-type=\"anycast\",queue=\"queue1\"/MessageCount", "username": "myUserName", "password": "myPassword"}, false},
// Missing brokername , should fail
{map[string]string{"restApiTemplate": "http://localhost:8161/console/jolokia/read/org.apache.activemq.artemis:broker=\"\",component=addresses,address=\"test\",subcomponent=queues,routing-type=\"anycast\",queue=\"queue1\"/MessageCount", "username": "myUserName", "password": "myPassword"}, true},
}

var artemisMetricIdentifiers = []artemisMetricIdentifier{
Expand Down
19 changes: 17 additions & 2 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type rabbitMQMetadata struct {
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
useRegex bool // specify if the queueName contains a rexeg
pageSize int // specify the page size if useRegex is enabled
operation string // specify the operation to apply in case of multiples queues
metricName string // custom metric name for trigger
timeout time.Duration // custom http timeout for a specific trigger
Expand Down Expand Up @@ -197,6 +198,20 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {
meta.useRegex = useRegex
}

// Resolve pageSize
if val, ok := config.TriggerMetadata["pageSize"]; ok {
pageSize, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("pageSize has invalid value")
}
meta.pageSize = int(pageSize)
if meta.pageSize < 1 {
return nil, fmt.Errorf("pageSize should be 1 or greater than 1")
}
} else {
meta.pageSize = 100
}

// Resolve operation
meta.operation = defaultOperation
if val, ok := config.TriggerMetadata["operation"]; ok {
Expand Down Expand Up @@ -412,9 +427,9 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
parsedURL.Path = ""
var getQueueInfoManagementURI string
if s.metadata.useRegex {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?page=1&use_regex=true&pagination=false&name=", url.QueryEscape(s.metadata.queueName))
getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues?page=1&use_regex=true&pagination=false&name=%s&page_size=%d", parsedURL.String(), url.QueryEscape(s.metadata.queueName), s.metadata.pageSize)
} else {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, url.QueryEscape(s.metadata.queueName))
getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues%s/%s", parsedURL.String(), vhost, url.QueryEscape(s.metadata.queueName))
}

var info queueInfo
Expand Down
72 changes: 70 additions & 2 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "timeout": "error"}, true, map[string]string{}},
// amqp timeout
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "amqp://", "timeout": "10"}, true, map[string]string{}},
// valid pageSize
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "100"}, false, map[string]string{}},
// pageSize less than 1
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "-1"}, true, map[string]string{}},
// invalid pageSize
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "a"}, true, map[string]string{}},
}

var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{
Expand Down Expand Up @@ -321,7 +327,7 @@ var testRegexQueueInfoTestData = []getQueueInfoTestData{
func TestGetQueueInfoWithRegex(t *testing.T) {
for _, testData := range testRegexQueueInfoTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24"
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24&page_size=100"
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand Down Expand Up @@ -378,6 +384,68 @@ func TestGetQueueInfoWithRegex(t *testing.T) {
}
}

type getRegexPageSizeTestData struct {
queueInfo getQueueInfoTestData
pageSize int
}

var testRegexPageSizeTestData = []getRegexPageSizeTestData{
{testRegexQueueInfoTestData[0], 100},
{testRegexQueueInfoTestData[0], 200},
{testRegexQueueInfoTestData[0], 500},
}

func TestGetPageSizeWithRegex(t *testing.T) {
for _, testData := range testRegexPageSizeTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := fmt.Sprintf("/api/queues?page=1&use_regex=true&pagination=false&name=%%5Eevaluate_trials%%24&page_size=%d", testData.pageSize)
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}

w.WriteHeader(testData.queueInfo.responseStatus)
_, err := w.Write([]byte(testData.queueInfo.response))
if err != nil {
t.Error("Expect request path to =", testData.queueInfo.response, "but it is", err)
}
}))

resolvedEnv := map[string]string{host: fmt.Sprintf("%s%s", apiStub.URL, testData.queueInfo.vhostPath), "plainHost": apiStub.URL}

metadata := map[string]string{
"queueName": "^evaluate_trials$",
"hostFromEnv": host,
"protocol": "http",
"useRegex": "true",
"pageSize": fmt.Sprint(testData.pageSize),
}

s, err := NewRabbitMQScaler(
&ScalerConfig{
ResolvedEnv: resolvedEnv,
TriggerMetadata: metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 1000 * time.Millisecond,
},
)

if err != nil {
t.Error("Expect success", err)
}

ctx := context.TODO()
active, err := s.IsActive(ctx)

if err != nil {
t.Error("Expect success", err)
}

if !active {
t.Error("Expect to be active")
}
}
}

func TestRabbitMQGetMetricSpecForScaling(t *testing.T) {
for _, testData := range rabbitMQMetricIdentifiers {
meta, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil})
Expand Down Expand Up @@ -452,7 +520,7 @@ var testRegexQueueInfoNavigationTestData = []getQueueInfoNavigationTestData{
func TestRegexQueueMissingError(t *testing.T) {
for _, testData := range testRegexQueueInfoNavigationTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=evaluate_trials"
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=evaluate_trials&page_size=100"
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand Down

0 comments on commit 3e75cd2

Please sign in to comment.