scale_jobs.go
package handler

import (
	"context"
	"fmt"

	kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"
	version "github.com/kedacore/keda/version"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
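
// scaleJobs creates new Jobs for the ScaledObject when at least one scaler reports activity,
// capping the number of new Jobs at maxScale minus the Jobs that are still running.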
func (h *ScaleHandler) scaleJobs(scaledObject *kedav1alpha1.ScaledObject, isActive bool, scaleTo int64, maxScale int64) {
	runningJobCount := h.getRunningJobCount(scaledObject, maxScale)
	h.logger.Info("Scaling Jobs", "Number of running Jobs", runningJobCount)

	effectiveMaxScale := maxScale - runningJobCount
	if effectiveMaxScale < 0 {
		effectiveMaxScale = 0
	}

	if isActive {
		h.logger.V(1).Info("At least one scaler is active")
		now := metav1.Now()
		scaledObject.Status.LastActiveTime = &now
		h.updateScaledObjectStatus(scaledObject)
		h.createJobs(scaledObject, scaleTo, effectiveMaxScale)
	} else {
		h.logger.V(1).Info("No change in activity")
	}
}
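
// createJobs creates up to scaleTo Jobs (bounded by the effective maxScale) from the
// ScaledObject's JobTargetRef template, labels them for tracking, and sets the
// ScaledObject as their owner.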
func (h *ScaleHandler) createJobs(scaledObject *kedav1alpha1.ScaledObject, scaleTo int64, maxScale int64) {
	scaledObject.Spec.JobTargetRef.Template.GenerateName = scaledObject.GetName() + "-"
	if scaledObject.Spec.JobTargetRef.Template.Labels == nil {
		scaledObject.Spec.JobTargetRef.Template.Labels = map[string]string{}
	}
	scaledObject.Spec.JobTargetRef.Template.Labels["scaledobject"] = scaledObject.GetName()

	h.logger.Info("Creating jobs", "Effective number of max jobs", maxScale)
	if scaleTo > maxScale {
		scaleTo = maxScale
	}
	h.logger.Info("Creating jobs", "Number of jobs", scaleTo)

	for i := 0; i < int(scaleTo); i++ {
		job := &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: scaledObject.GetName() + "-",
				Namespace:    scaledObject.GetNamespace(),
				Labels: map[string]string{
					"app.kubernetes.io/name":       scaledObject.GetName(),
					"app.kubernetes.io/version":    version.Version,
					"app.kubernetes.io/part-of":    scaledObject.GetName(),
					"app.kubernetes.io/managed-by": "keda-operator",
					"scaledobject":                 scaledObject.GetName(),
				},
			},
			Spec: *scaledObject.Spec.JobTargetRef.DeepCopy(),
		}

		// Jobs don't allow RestartPolicyAlways; the client appears to set that as the default,
		// so set an allowed value when RestartPolicy is not specified.
		if job.Spec.Template.Spec.RestartPolicy == "" {
			h.logger.V(1).Info("Job RestartPolicy is not set, setting it to 'OnFailure', to avoid setting it to the client's default value 'Always'")
			job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
		}

		// Set the ScaledObject instance as the owner and controller of the Job
		if err := controllerutil.SetControllerReference(scaledObject, job, h.reconcilerScheme); err != nil {
			h.logger.Error(err, "Failed to set ScaledObject as the owner of the new Job")
		}

		if err := h.client.Create(context.TODO(), job); err != nil {
			h.logger.Error(err, "Failed to create a new Job")
		}
	}
	h.logger.Info("Created jobs", "Number of jobs", scaleTo)
}
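
// resolveJobEnv resolves the environment variables of the first container in the
// ScaledObject's Job template.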
func (h *ScaleHandler) resolveJobEnv(scaledObject *kedav1alpha1.ScaledObject) (map[string]string, error) {
	if len(scaledObject.Spec.JobTargetRef.Template.Spec.Containers) < 1 {
		return nil, fmt.Errorf("Scaled Object (%s) doesn't have containers", scaledObject.GetName())
	}

	container := scaledObject.Spec.JobTargetRef.Template.Spec.Containers[0]
	return h.resolveEnv(&container, scaledObject.GetNamespace())
}
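
// parseJobAuthRef resolves the TriggerAuthentication reference for a Job-based ScaledObject,
// looking up referenced environment variables in the Job template's first container.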
func (h *ScaleHandler) parseJobAuthRef(triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, scaledObject *kedav1alpha1.ScaledObject) (map[string]string, string) {
	return h.parseAuthRef(triggerAuthRef, scaledObject, func(name, containerName string) string {
		env, err := h.resolveJobEnv(scaledObject)
		if err != nil {
			return ""
		}
		return env[name]
	})
}
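
// isJobFinished reports whether the Job has a Complete or Failed condition with status True.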
func (h *ScaleHandler) isJobFinished(j *batchv1.Job) bool {
	for _, c := range j.Status.Conditions {
		if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}
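
// getRunningJobCount returns the number of Jobs created for this ScaledObject that
// have not yet finished.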
func (h *ScaleHandler) getRunningJobCount(scaledObject *kedav1alpha1.ScaledObject, maxScale int64) int64 {
	var runningJobs int64

	opts := []client.ListOption{
		client.InNamespace(scaledObject.GetNamespace()),
		client.MatchingLabels(map[string]string{"scaledobject": scaledObject.GetName()}),
	}

	jobs := &batchv1.JobList{}
	err := h.client.List(context.TODO(), jobs, opts...)
	if err != nil {
		return 0
	}

	for _, job := range jobs.Items {
		if !h.isJobFinished(&job) {
			runningJobs++
		}
	}

	return runningJobs
}