Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce activationThreshold/minMetricValue for ActiveMQ Scaler #3339

Merged
merged 11 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md

- **General:** Add support to customize HPA name ([3057](https://github.com/kedacore/keda/issues/3057))
- **General:** Basic setup for migrating e2e tests to Go. ([#2737](https://github.com/kedacore/keda/issues/2737))
- **General:** Introduce activationThreshold/minMetricValue for all scalers ([#2800](https://github.com/kedacore/keda/issues/2800))
tomkerkhove marked this conversation as resolved.
Show resolved Hide resolved
- **General:** Introduce new AWS DynamoDB Streams Scaler ([#3124](https://github.com/kedacore/keda/issues/3124))
- **General:** Support for Azure AD Workload Identity as a pod identity provider. ([#2487](https://github.com/kedacore/keda/issues/2487)|[#2656](https://github.com/kedacore/keda/issues/2656))
- **General:** Support for permission segregation when using Azure AD Pod / Workload Identity. ([#2656](https://github.com/kedacore/keda/issues/2656))
Expand Down
38 changes: 25 additions & 13 deletions pkg/scalers/activemq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@ type activeMQScaler struct {
}

type activeMQMetadata struct {
managementEndpoint string
destinationName string
brokerName string
username string
password string
restAPITemplate string
targetQueueSize int64
corsHeader string
metricName string
scalerIndex int
managementEndpoint string
destinationName string
brokerName string
username string
password string
restAPITemplate string
targetQueueSize int64
activationTargetQueueSize int64
corsHeader string
metricName string
scalerIndex int
}

type activeMQMonitoring struct {
Expand All @@ -46,8 +47,9 @@ type activeMQMonitoring struct {
}

const (
defaultTargetQueueSize = 10
defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize"
defaultTargetQueueSize = 10
defaultactivationTargetQueueSize = 0
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize"
)

var activeMQLog = logf.Log.WithName("activeMQ_scaler")
Expand Down Expand Up @@ -110,6 +112,16 @@ func parseActiveMQMetadata(config *ScalerConfig) (*activeMQMetadata, error) {
meta.targetQueueSize = defaultTargetQueueSize
}

if val, ok := config.TriggerMetadata["activationTargetQueueSize"]; ok {
activationTargetQueueSize, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid activationTargetQueueSize - must be an integer")
}
meta.activationTargetQueueSize = activationTargetQueueSize
} else {
meta.activationTargetQueueSize = defaultactivationTargetQueueSize
}

if val, ok := config.AuthParams["username"]; ok && val != "" {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok && val != "" {
Expand Down Expand Up @@ -162,7 +174,7 @@ func (s *activeMQScaler) IsActive(ctx context.Context) (bool, error) {
return false, err
}

return queueSize > 0, nil
return queueSize > s.metadata.activationTargetQueueSize, nil
}

// getRestAPIParameters parse restAPITemplate to provide managementEndpoint, brokerName, destinationName
Expand Down
27 changes: 22 additions & 5 deletions pkg/scalers/activemq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type activeMQMetricIdentifier struct {
// Setting metric identifier mock name
var activeMQMetricIdentifiers = []activeMQMetricIdentifier{
{&testActiveMQMetadata[1], 0, "s0-activemq-testQueue"},
{&testActiveMQMetadata[9], 1, "s1-activemq-testQueue"},
{&testActiveMQMetadata[10], 1, "s1-activemq-testQueue"},
}

var testActiveMQMetadata = []parseActiveMQMetadataTestData{
Expand All @@ -40,10 +40,11 @@ var testActiveMQMetadata = []parseActiveMQMetadataTestData{
{
name: "properly formed metadata",
metadata: map[string]string{
"managementEndpoint": "localhost:8161",
"destinationName": "testQueue",
"brokerName": "localhost",
"targetQueueSize": "10",
"managementEndpoint": "localhost:8161",
"destinationName": "testQueue",
"brokerName": "localhost",
"targetQueueSize": "10",
"activationTargetQueueSize": "0",
},
authParams: map[string]string{
"username": "testUsername",
Expand Down Expand Up @@ -80,6 +81,22 @@ var testActiveMQMetadata = []parseActiveMQMetadataTestData{
},
isError: true,
},
{
name: "Invalid activatingTargetQueueSize using a string",
metadata: map[string]string{
"managementEndpoint": "localhost:8161",
"destinationName": "testQueue",
"brokerName": "localhost",
"targetQueueSize": "10",
"activationTargetQueueSize": "AA",
"metricName": "testMetricName",
},
authParams: map[string]string{
"username": "testUsername",
"password": "pass123",
},
isError: true,
},
{
name: "missing management endpoint should fail",
metadata: map[string]string{
Expand Down
18 changes: 18 additions & 0 deletions tests/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,24 @@ func WaitForDeploymentReplicaCountChange(t *testing.T, kc *kubernetes.Clientset,
return int(replicas)
}

// Waits some time to ensure that the replica count doesn't change.
func AssertReplicaCountNotChangeDuringTime(t *testing.T, kc *kubernetes.Clientset, name, namespace string, target, intervalSeconds int) {
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
t.Log("Waiting for some time to ensure deployment replica count doesn't change")
var replicas int32

for i := 0; i < intervalSeconds; i++ {
deployment, _ := kc.AppsV1().Deployments(namespace).Get(context.Background(), name, metav1.GetOptions{})
replicas = deployment.Status.Replicas

if replicas != int32(target) {
assert.Fail(t, fmt.Sprintf("%s replica count has changed from %d to %d", name, target, replicas))
return
}

time.Sleep(time.Second)
}
}

func WaitForHpaCreation(t *testing.T, kc *kubernetes.Clientset, name, namespace string,
iterations, intervalSeconds int) (*autoscalingv2beta2.HorizontalPodAutoscaler, error) {
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
Expand Down
4 changes: 2 additions & 2 deletions tests/run-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
set -u

# TODO - Remove TypeScript regex after all tests have been migrated to Go.
E2E_REGEX_GO="./${E2E_TEST_REGEX:-*_test.go}"
E2E_REGEX_TS="./${E2E_TEST_REGEX:-*.test.ts}"
E2E_REGEX_GO="./*${E2E_TEST_REGEX:-*_test.go}"
E2E_REGEX_TS="./*${E2E_TEST_REGEX:-*.test.ts}"

DIR=$(dirname "$0")
cd $DIR
Expand Down
2 changes: 1 addition & 1 deletion tests/run-arm-smoke-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ cd $DIR
# TODO - Replace with Go tests.
test_files=(
"scalers_go/kubernetes_workload/kubernetes_workload_test.go"
"scalers/activemq.test.ts"
"scalers_go/activemq/activemq_test.go"
"scalers_go/cron/cron_test.go"
)

Expand Down
Loading