Skip to content

Commit

Permalink
feat: Provide support for explicitly pausing autoscaling of ScaledJobs
Browse files Browse the repository at this point in the history
--- Accreditation ---

Original PR: #3828

Borrows from the work originally implemented by: https://github.com/keegancwinchester

Note: I would have loved to have pulled the commit from the original branch, but I could not be able to.

Documentation MR by original implementor: kedacore/keda-docs#932

--- Fixes ---

Fixes # #3656

--- PR Notes ---

Introduce annotation to pause ScaledJobs.

Signed-off-by: BenJ <benjamin.jessop@fluxfederation.com>
  • Loading branch information
flux-benj committed Jun 21, 2023
1 parent bc92124 commit c1980b4
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Redis Scalers**: Allow scaling using redis stream length ([#4277](https://github.com/kedacore/keda/issues/4277))
- **Redis Scalers**: Allow scaling using consumer group lag ([#3127](https://github.com/kedacore/keda/issues/3127))
- **General:** Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234))
- **General**: Introduce annotation `autoscaling.keda.sh/paused: true` for ScaledJobs to pause autoscaling ([#3303](https://github.com/kedacore/keda/issues/3303))

### Improvements

Expand Down
11 changes: 11 additions & 0 deletions apis/keda/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ const (
ScaledObjectConditionPausedMessage = "ScaledObject is paused"
)

const (
// ScaledJobConditionPausedReason defines the default Reason for paused ScaledJob
ScaledJobConditionPausedReason = "ScaledJobPaused"
// ScaledJobConditionPausedReason defines the default Reason for paused ScaledJob
ScaledJobConditionUnpausedReason = "ScaledJobUnpaused"
// ScaledJobConditionPausedMessage defines the default Message for paused ScaledJob
ScaledJobConditionPausedMessage = "ScaledJob is paused"
// ScaledJobConditionPausedMessage defines the default Message for paused ScaledJob
ScaledJobConditionUnpausedMessage = "ScaledJob is unpaused"
)

// Condition to store the condition state
type Condition struct {
// Type of condition
Expand Down
3 changes: 3 additions & 0 deletions apis/keda/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
// +kubebuilder:printcolumn:name="Authentication",type="string",JSONPath=".spec.triggers[*].authenticationRef.name"
// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status"
// +kubebuilder:printcolumn:name="Active",type="string",JSONPath=".status.conditions[?(@.type==\"Active\")].status"
// +kubebuilder:printcolumn:name="Paused",type="string",JSONPath=".status.conditions[?(@.type==\"Paused\")].status"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// ScaledJob is the Schema for the scaledjobs API
Expand Down Expand Up @@ -78,6 +79,8 @@ type ScaledJobStatus struct {
LastActiveTime *metav1.Time `json:"lastActiveTime,omitempty"`
// +optional
Conditions Conditions `json:"conditions,omitempty"`
// +optional
Paused string `json:"Paused,omitempty"`
}

// ScaledJobList contains a list of ScaledJob
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ spec:
- jsonPath: .status.conditions[?(@.type=="Active")].status
name: Active
type: string
- jsonPath: .status.conditions[?(@.type=="Paused")].status
name: Paused
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
Expand Down Expand Up @@ -8264,6 +8267,8 @@ spec:
status:
description: ScaledJobStatus defines the observed state of ScaledJob
properties:
Paused:
type: string
conditions:
description: Conditions an array representation to store multiple
Conditions
Expand Down
44 changes: 41 additions & 3 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/scaling"
Expand Down Expand Up @@ -84,7 +85,11 @@ func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options control
WithOptions(options).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
// so reconcile loop is not started on Status updates
For(&kedav1alpha1.ScaledJob{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
For(&kedav1alpha1.ScaledJob{}, builder.WithPredicates(
predicate.Or(
kedacontrollerutil.PausedPredicate{},
predicate.GenerationChangedPredicate{},
))).
Complete(r)
}

Expand Down Expand Up @@ -136,8 +141,8 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
reqLogger.Error(err, "scaledJob.spec.jobTargetRef not found")
return ctrl.Result{}, err
}
msg, err := r.reconcileScaledJob(ctx, reqLogger, scaledJob)
conditions := scaledJob.Status.Conditions.DeepCopy()
msg, err := r.reconcileScaledJob(ctx, reqLogger, scaledJob, &conditions)
if err != nil {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
Expand All @@ -159,7 +164,15 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

// reconcileScaledJob implements reconciler logic for K8s Jobs based ScaledJob
func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, conditions *kedav1alpha1.Conditions) (string, error) {
isPaused, err := r.checkIfPaused(ctx, logger, scaledJob, conditions)
if err != nil {
return "Failed to check if ScaledJob was paused", err
}
if isPaused {
return "ScaledJob is paused, skipping reconcile loop", err
}

// nosemgrep: trailofbits.go.invalid-usage-of-modified-variable.invalid-usage-of-modified-variable
msg, err := r.deletePreviousVersionScaleJobs(ctx, logger, scaledJob)
if err != nil {
Expand Down Expand Up @@ -193,6 +206,31 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log
return "ScaledJob is defined correctly and is ready to scaling", nil
}

// checkIfPaused checks the presence of "autoscaling.keda.sh/paused" annotation on the scaledJob and stop the scale loop.
func (r *ScaledJobReconciler) checkIfPaused(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, conditions *kedav1alpha1.Conditions) (bool, error) {
_, pausedAnnotation := scaledJob.GetAnnotations()[kedacontrollerutil.PausedAnnotation]
pausedStatus := conditions.GetPausedCondition().Status == metav1.ConditionTrue
if pausedAnnotation {
if !pausedStatus {
logger.Info("ScaledJob is paused, stopping scaling loop.")
msg := kedav1alpha1.ScaledJobConditionPausedMessage
if err := r.stopScaleLoop(ctx, logger, scaledJob); err != nil {
msg = "failed to stop the scale loop for paused ScaledJob"
conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledJobStopScaleLoopFailed", msg)
return false, err
}
conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledJobConditionPausedReason, msg)
}
return true, nil
}
if pausedStatus {
logger.Info("Unpausing ScaledJob.")
msg := kedav1alpha1.ScaledJobConditionUnpausedMessage
conditions.SetPausedCondition(metav1.ConditionFalse, kedav1alpha1.ScaledJobConditionUnpausedReason, msg)
}
return false, nil
}

// Delete Jobs owned by the previous version of the scaledJob based on the rolloutStrategy given for this scaledJob, if any
func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
var rolloutStrategy string
Expand Down
28 changes: 28 additions & 0 deletions controllers/keda/util/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas"

const PausedAnnotation = "autoscaling.keda.sh/paused"

type PausedReplicasPredicate struct {
predicate.Funcs
}
Expand Down Expand Up @@ -61,3 +63,29 @@ func (ScaleObjectReadyConditionPredicate) Update(e event.UpdateEvent) bool {

return false
}

type PausedPredicate struct {
predicate.Funcs
}

func (PausedPredicate) Update(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
}

newAnnotations := e.ObjectNew.GetAnnotations()
oldAnnotations := e.ObjectOld.GetAnnotations()

newPausedValue := ""
oldPausedValue := ""

if newAnnotations != nil {
newPausedValue = newAnnotations[PausedAnnotation]
}

if oldAnnotations != nil {
oldPausedValue = oldAnnotations[PausedAnnotation]
}

return newPausedValue != oldPausedValue
}
213 changes: 213 additions & 0 deletions tests/internals/pause_scaledjob/pause_scaledjob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
//go:build e2e
// +build e2e

// go test -v -tags e2e ./internals/pause_scaledjob/pause_scaledjob_test.go

package pause_scaledjob_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

. "github.com/kedacore/keda/v2/tests/helper"
)

// Load environment variables from .env file

const (
testName = "pause-scaledjob-test"
)

var (
testNamespace = fmt.Sprintf("%s-ns", testName)
serviceName = fmt.Sprintf("%s-service", testName)
scalerName = fmt.Sprintf("%s-scaler", testName)
scaledJobName = fmt.Sprintf("%s-sj", testName)
minReplicaCount = 0
maxReplicaCount = 3
iterationCountInitial = 15
iterationCountLatter = 30
)

type templateData struct {
TestNamespace string
ServiceName string
ScalerName string
ScaledJobName string
MinReplicaCount, MaxReplicaCount int
MetricThreshold, MetricValue int
}

const (
serviceTemplate = `
apiVersion: v1
kind: Service
metadata:
name: {{.ServiceName}}
namespace: {{.TestNamespace}}
spec:
ports:
- port: 6000
targetPort: 6000
selector:
app: {{.ScalerName}}
`

scalerTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.ScalerName}}
namespace: {{.TestNamespace}}
labels:
app: {{.ScalerName}}
spec:
replicas: 1
selector:
matchLabels:
app: {{.ScalerName}}
template:
metadata:
labels:
app: {{.ScalerName}}
spec:
containers:
- name: scaler
image: ghcr.io/kedacore/tests-external-scaler-e2e:latest
imagePullPolicy: Always
ports:
- containerPort: 6000
`

scaledJobTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: {{.ScaledJobName}}
namespace: {{.TestNamespace}}
spec:
pollingInterval: 5
maxReplicaCount: {{.MaxReplicaCount}}
minReplicaCount: {{.MinReplicaCount}}
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 0
jobTargetRef:
template:
spec:
containers:
- name: external-executor
image: busybox
command:
- sleep
- "15"
imagePullPolicy: IfNotPresent
restartPolicy: Never
backoffLimit: 1
triggers:
- type: external
metadata:
scalerAddress: {{.ServiceName}}.{{.TestNamespace}}:6000
metricThreshold: "{{.MetricThreshold}}"
metricValue: "{{.MetricValue}}"
`
)

// Util function
func WaitForJobByFilterCountUntilIteration(t *testing.T, kc *kubernetes.Clientset, namespace string,
target, iterations, intervalSeconds int, listOptions metav1.ListOptions) bool {
var isTargetAchieved = false

for i := 0; i < iterations; i++ {
jobList, _ := kc.BatchV1().Jobs(namespace).List(context.Background(), listOptions)
count := len(jobList.Items)

t.Logf("Waiting for job count to hit target. Namespace - %s, Current - %d, Target - %d",
namespace, count, target)

if count == target {
isTargetAchieved = true
} else {
isTargetAchieved = false
}

time.Sleep(time.Duration(intervalSeconds) * time.Second)
}

return isTargetAchieved
}

func TestScaler(t *testing.T) {
// setup
t.Log("--- setting up ---")

// Create kubernetes resources
kc := GetKubernetesClient(t)
metricValue := 1

data, templates := getTemplateData(metricValue)

listOptions := metav1.ListOptions{
FieldSelector: "status.successful=0",
}

CreateKubernetesResources(t, kc, testNamespace, data, templates)

assert.True(t, WaitForJobByFilterCountUntilIteration(t, kc, testNamespace, data.MetricThreshold, iterationCountInitial, 1, listOptions),
"job count should be %d after %d iterations", data.MetricThreshold, iterationCountInitial)

// test scaling
testPause(t, kc, listOptions)
testUnpause(t, kc, data, listOptions)

// cleanup
DeleteKubernetesResources(t, testNamespace, data, templates)
}

func getTemplateData(metricValue int) (templateData, []Template) {
return templateData{
TestNamespace: testNamespace,
ScaledJobName: scaledJobName,
ScalerName: scalerName,
ServiceName: serviceName,
MinReplicaCount: minReplicaCount,
MaxReplicaCount: maxReplicaCount,
MetricThreshold: 1,
MetricValue: metricValue,
}, []Template{
{Name: "scalerTemplate", Config: scalerTemplate},
{Name: "serviceTemplate", Config: serviceTemplate},
{Name: "scaledJobTemplate", Config: scaledJobTemplate},
}
}

func testPause(t *testing.T, kc *kubernetes.Clientset, listOptions metav1.ListOptions) {
t.Log("--- testing Paused annotation ---")

_, err := ExecuteCommand(fmt.Sprintf("kubectl annotate scaledjob %s autoscaling.keda.sh/paused=true --namespace %s", scaledJobName, testNamespace))
assert.NoErrorf(t, err, "cannot execute command - %s", err)

t.Log("job count does not change as job is paused")

expectedTarget := 0
assert.True(t, WaitForJobByFilterCountUntilIteration(t, kc, testNamespace, expectedTarget, iterationCountLatter, 1, listOptions),
"job count should be %d after %d iterations", expectedTarget, iterationCountLatter)
}

func testUnpause(t *testing.T, kc *kubernetes.Clientset, data templateData, listOptions metav1.ListOptions) {
t.Log("--- testing removing Paused annotation ---")

_, err := ExecuteCommand(fmt.Sprintf("kubectl annotate scaledjob %s autoscaling.keda.sh/paused- --namespace %s", scaledJobName, testNamespace))
assert.NoErrorf(t, err, "cannot execute command - %s", err)

t.Log("job count increases from zero as job is no longer paused")

expectedTarget := data.MetricThreshold
assert.True(t, WaitForJobByFilterCountUntilIteration(t, kc, testNamespace, expectedTarget, iterationCountLatter, 1, listOptions),
"job count should be %d after %d iterations", expectedTarget, iterationCountLatter)
}
Loading

0 comments on commit c1980b4

Please sign in to comment.