Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
yaronya authored Aug 2, 2021
2 parents e33dcc5 + 7e7d42b commit 690bc39
Show file tree
Hide file tree
Showing 19 changed files with 7,283 additions and 2,310 deletions.
1 change: 1 addition & 0 deletions .github/workflows/main-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ on:
push:
branches:
- main
concurrency: e2e-tests
jobs:
validate:
name: Validate
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/nightly-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ name: nightly-e2e-test
on:
schedule:
- cron: "0 0 * * *"
concurrency: e2e-tests
jobs:
test:
name: Test
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/pr-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ name: CI
on:
- push
- pull_request

jobs:
validate:
name: Validate PR
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
- Add fallback functionality ([#1872](https://github.com/kedacore/keda/issues/1872))
- Introduce Idle Replica Mode ([#1958](https://github.com/kedacore/keda/pull/1958))
- Support pod conditions for pending job count calculation ([#1970](https://github.com/kedacore/keda/pull/1970))
- Add new scaler for Selenium Grid ([#1971](https://github.com/kedacore/keda/pull/1971))
- Support using regex to select the queues in RabbitMQ Scaler ([#1957](https://github.com/kedacore/keda/pull/1957))

### Improvements

Expand All @@ -37,6 +39,7 @@
- IBM MQ scaler password handling fix ([#1939](https://github.com/kedacore/keda/pull/1939))
- Metrics APIServer: Add ratelimiting parameters to override client ([#1944](https://github.com/kedacore/keda/pull/1944))
- Optimize KafkaScaler by fetching all topic offsets using a single HTTP request ([#1956](https://github.com/kedacore/keda/pull/1956))
- Adjusts InfluxDB scaler to support queries that return integers in addition to those that return floats ([#1977](https://github.com/kedacore/keda/pull/1977))

### Breaking Changes

Expand Down
8 changes: 4 additions & 4 deletions controllers/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ var _ = Describe("ScaledObjectController", func() {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Ω(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 5*time.Second).Should(Equal(metav1.ConditionTrue))
}, 20*time.Second).Should(Equal(metav1.ConditionTrue))
})

It("doesn't allow MinReplicaCount > MaxReplicaCount", func() {
Expand Down Expand Up @@ -345,7 +345,7 @@ var _ = Describe("ScaledObjectController", func() {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Ω(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 5*time.Second).Should(Equal(metav1.ConditionFalse))
}, 20*time.Second).Should(Equal(metav1.ConditionFalse))
})

It("doesn't allow IdleReplicaCount > MinReplicaCount", func() {
Expand Down Expand Up @@ -388,7 +388,7 @@ var _ = Describe("ScaledObjectController", func() {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Ω(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 5*time.Second).Should(Equal(metav1.ConditionFalse))
}, 20*time.Second).Should(Equal(metav1.ConditionFalse))
})

It("doesn't allow IdleReplicaCount > MaxReplicaCount, when MinReplicaCount is not explicitly defined", func() {
Expand Down Expand Up @@ -431,7 +431,7 @@ var _ = Describe("ScaledObjectController", func() {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Ω(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 5*time.Second).Should(Equal(metav1.ConditionFalse))
}, 20*time.Second).Should(Equal(metav1.ConditionFalse))
})
})
})
Expand Down
12 changes: 7 additions & 5 deletions pkg/scalers/influxdb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ func queryInfluxDB(queryAPI api.QueryAPI, query string) (float64, error) {
return 0, fmt.Errorf("no results found from query")
}

val, ok := result.Record().Value().(float64)
if !ok {
return 0, fmt.Errorf("value could not be parsed into a float")
switch valRaw := result.Record().Value().(type) {
case float64:
return valRaw, nil
case int64:
return float64(valRaw), nil
default:
return 0, fmt.Errorf("value of type %T could not be converted into a float", valRaw)
}

return val, nil
}

// GetMetrics connects to influxdb via the client and returns a value based on the query
Expand Down
121 changes: 112 additions & 9 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ const (
defaultProtocol = autoProtocol
)

const (
sumOperation = "sum"
avgOperation = "avg"
maxOperation = "max"
defaultOperation = sumOperation
)

type rabbitMQScaler struct {
metadata *rabbitMQMetadata
connection *amqp.Connection
Expand All @@ -51,6 +58,8 @@ type rabbitMQMetadata struct {
host string // connection string for either HTTP or AMQP protocol
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
useRegex bool // specify if the queueName contains a rexeg
operation string // specify the operation to apply in case of multiples queues
}

type queueInfo struct {
Expand Down Expand Up @@ -164,6 +173,25 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {
meta.vhostName = &val
}

// Resolve useRegex
if val, ok := config.TriggerMetadata["useRegex"]; ok {
useRegex, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("useRegex has invalid value")
}
meta.useRegex = useRegex
}

// Resolve operation
meta.operation = defaultOperation
if val, ok := config.TriggerMetadata["operation"]; ok {
meta.operation = val
}

if meta.useRegex && meta.protocol == amqpProtocol {
return nil, fmt.Errorf("configure only useRegex with http protocol")
}

_, err := parseTrigger(&meta, config)
if err != nil {
return nil, fmt.Errorf("unable to parse trigger: %s", err)
Expand Down Expand Up @@ -290,19 +318,31 @@ func (s *rabbitMQScaler) getQueueStatus() (int, float64, error) {
return items.Messages, 0, nil
}

func getJSON(httpClient *http.Client, url string, target interface{}) error {
r, err := httpClient.Get(url)
func getJSON(s *rabbitMQScaler, url string) (queueInfo, error) {
var result queueInfo
r, err := s.httpClient.Get(url)
if err != nil {
return err
return result, err
}
defer r.Body.Close()

if r.StatusCode == 200 {
return json.NewDecoder(r.Body).Decode(target)
if s.metadata.useRegex {
var results []queueInfo
err = json.NewDecoder(r.Body).Decode(&results)
if err != nil {
return result, err
}
result, err := getComposedQueue(s, results)
return result, err
}

err = json.NewDecoder(r.Body).Decode(&result)
return result, err
}

body, _ := ioutil.ReadAll(r.Body)
return fmt.Errorf("error requesting rabbitMQ API status: %s, response: %s, from: %s", r.Status, body, url)
return result, fmt.Errorf("error requesting rabbitMQ API status: %s, response: %s, from: %s", r.Status, body, url)
}

func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
Expand All @@ -324,11 +364,15 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
}

parsedURL.Path = ""
var getQueueInfoManagementURI string
if s.metadata.useRegex {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?use_regex=true&pagination=false&name=", s.metadata.queueName)
} else {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, s.metadata.queueName)
}

getQueueInfoManagementURI := fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, s.metadata.queueName)

info := queueInfo{}
err = getJSON(s.httpClient, getQueueInfoManagementURI, &info)
var info queueInfo
info, err = getJSON(s, getQueueInfoManagementURI)

if err != nil {
return nil, err
Expand Down Expand Up @@ -386,3 +430,62 @@ func (s *rabbitMQScaler) GetMetrics(ctx context.Context, metricName string, metr

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func getComposedQueue(s *rabbitMQScaler, q []queueInfo) (queueInfo, error) {
var queue = queueInfo{}
queue.Name = "composed-queue"
queue.MessagesUnacknowledged = 0
if len(q) > 0 {
switch s.metadata.operation {
case sumOperation:
sumMessages, sumRate := getSum(q)
queue.Messages = sumMessages
queue.MessageStat.PublishDetail.Rate = sumRate
case avgOperation:
avgMessages, avgRate := getAverage(q)
queue.Messages = avgMessages
queue.MessageStat.PublishDetail.Rate = avgRate
case maxOperation:
maxMessages, maxRate := getMaximum(q)
queue.Messages = maxMessages
queue.MessageStat.PublishDetail.Rate = maxRate
default:
return queue, fmt.Errorf("operation mode %s must be one of %s, %s, %s", s.metadata.operation, sumOperation, avgOperation, maxOperation)
}
} else {
queue.Messages = 0
queue.MessageStat.PublishDetail.Rate = 0
}

return queue, nil
}

func getSum(q []queueInfo) (int, float64) {
var sumMessages int
var sumRate float64
for _, value := range q {
sumMessages += value.Messages
sumRate += value.MessageStat.PublishDetail.Rate
}
return sumMessages, sumRate
}

func getAverage(q []queueInfo) (int, float64) {
sumMessages, sumRate := getSum(q)
len := len(q)
return sumMessages / len, sumRate / float64(len)
}

func getMaximum(q []queueInfo) (int, float64) {
var maxMessages int
var maxRate float64
for _, value := range q {
if value.Messages > maxMessages {
maxMessages = value.Messages
}
if value.MessageStat.PublishDetail.Rate > maxRate {
maxRate = value.MessageStat.PublishDetail.Rate
}
}
return maxMessages, maxRate
}
Loading

0 comments on commit 690bc39

Please sign in to comment.