diff --git a/pkg/controller/workload/job/helpers.go b/pkg/controller/workload/job/helpers.go
new file mode 100644
index 0000000000..ee75b0ab43
--- /dev/null
+++ b/pkg/controller/workload/job/helpers.go
@@ -0,0 +1,266 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package job
+
+import (
+	"context"
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	apimeta "k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/client-go/tools/record"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha2"
+	utilpriority "sigs.k8s.io/kueue/pkg/util/priority"
+	"sigs.k8s.io/kueue/pkg/workload"
+)
+
+// EnsureOneWorkload queries for the single workload matching the job and returns it.
+// If more than one workload is found, the excess ones are deleted.
+// The returned workload may be nil.
+func EnsureOneWorkload(ctx context.Context, cli client.Client, req ctrl.Request, record record.EventRecorder, job GenericJob) (*kueue.Workload, error) {
+	log := ctrl.LoggerFrom(ctx)
+
+	// Find a matching workload first if there is one.
+	var toDelete []*kueue.Workload
+	var match *kueue.Workload
+
+	if pwName := job.ParentWorkloadName(); pwName != "" {
+		pw := kueue.Workload{}
+		namespacedName := types.NamespacedName{
+			Name:      pwName,
+			Namespace: job.Object().GetNamespace(),
+		}
+		if err := cli.Get(ctx, namespacedName, &pw); err != nil {
+			if !apierrors.IsNotFound(err) {
+				return nil, err
+			}
+			log.V(2).Info("job with no matching parent workload", "parent-workload", pwName)
+		} else {
+			match = &pw
+		}
+	}
+
+	var workloads kueue.WorkloadList
+	if err := cli.List(ctx, &workloads, client.InNamespace(job.Object().GetNamespace()),
+		client.MatchingFields{ownerKey: job.Object().GetName()}); err != nil {
+		log.Error(err, "Unable to list child workloads")
+		return nil, err
+	}
+
+	for i := range workloads.Items {
+		w := &workloads.Items[i]
+		owner := metav1.GetControllerOf(w)
+		// Indexes don't work in unit tests, so we explicitly check for the
+		// owner here.
+		if owner == nil || owner.Name != job.Object().GetName() {
+			continue
+		}
+		if match == nil && job.EquivalentToWorkload(*w) {
+			match = w
+		} else {
+			toDelete = append(toDelete, w)
+		}
+	}
+
+	// If there is no matching workload and the job is running, suspend it.
+	if match == nil && !job.IsSuspend() {
+		log.V(2).Info("job with no matching workload, suspending")
+		var w *kueue.Workload
+		if len(workloads.Items) == 1 {
+			// The job may have been modified and hence the existing workload
+			// doesn't match the job anymore. All bets are off if there are more
+			// than one workload...
+			w = &workloads.Items[0]
+		}
+		if err := StopJob(ctx, cli, record, job, w, "No matching Workload"); err != nil {
+			log.Error(err, "stopping job")
+		}
+	}
+
+	// Delete duplicate workload instances.
+	existedWls := 0
+	for i := range toDelete {
+		err := cli.Delete(ctx, toDelete[i])
+		if err == nil || !apierrors.IsNotFound(err) {
+			existedWls++
+		}
+		if err != nil && !apierrors.IsNotFound(err) {
+			log.Error(err, "Failed to delete workload")
+		}
+		if err == nil {
+			record.Eventf(job.Object(), corev1.EventTypeNormal, "DeletedWorkload",
+				"Deleted not matching Workload: %v", workload.Key(toDelete[i]))
+		}
+	}
+
+	if existedWls != 0 {
+		if match == nil {
+			return nil, fmt.Errorf("no matching workload was found, tried deleting %d existing workload(s)", existedWls)
+		}
+		return nil, fmt.Errorf("only one workload should exist, found %d", len(workloads.Items))
+	}
+
+	return match, nil
+}
+
+// StartJob unsuspends the job and injects the node affinity derived from the workload.
+func StartJob(ctx context.Context, client client.Client, record record.EventRecorder, job GenericJob, wl *kueue.Workload) error {
+	nodeSelectors, err := GetNodeSelectors(ctx, client, wl)
+	if err != nil {
+		return err
+	}
+
+	if err := job.InjectNodeAffinity(nodeSelectors); err != nil {
+		return err
+	}
+
+	if err := job.UnSuspend(); err != nil {
+		return err
+	}
+
+	if err := client.Update(ctx, job.Object()); err != nil {
+		return err
+	}
+
+	record.Eventf(job.Object(), corev1.EventTypeNormal, "Started",
+		"Admitted by clusterQueue %v", wl.Spec.Admission.ClusterQueue)
+
+	return nil
+}
+
+// StopJob suspends the job, restores the original node affinity, and resets the job status if needed.
+func StopJob(ctx context.Context, client client.Client, record record.EventRecorder, job GenericJob, wl *kueue.Workload, eventMsg string) error {
+	// Suspend the job first so that we can update the scheduling directives.
+	if err := job.Suspend(); err != nil {
+		return err
+	}
+
+	if err := client.Update(ctx, job.Object()); err != nil {
+		return err
+	}
+
+	record.Eventf(job.Object(), corev1.EventTypeNormal, "Stopped", eventMsg)
+
+	if job.ResetStatus() {
+		if err := client.Status().Update(ctx, job.Object()); err != nil {
+			return err
+		}
+	}
+
+	if wl != nil {
+		if err := job.RestoreNodeAffinity([]map[string]string{wl.Spec.PodSets[0].Spec.NodeSelector}); err != nil {
+			return err
+		}
+		return client.Update(ctx, job.Object())
+	}
+
+	return nil
+}
+
+// CreateWorkload creates a workload constructed from the corresponding job.
+func CreateWorkload(ctx context.Context, client client.Client, scheme *runtime.Scheme, job GenericJob) (*kueue.Workload, error) {
+	wl, err := ConstructWorkload(ctx, client, scheme, job)
+	if err != nil {
+		return nil, err
+	}
+
+	if err = client.Create(ctx, wl); err != nil {
+		return nil, err
+	}
+
+	return wl, nil
+}
+
+// ConstructWorkload derives a workload from the corresponding job.
+func ConstructWorkload(ctx context.Context, client client.Client, scheme *runtime.Scheme, job GenericJob) (*kueue.Workload, error) {
+	wl := &kueue.Workload{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      GetWorkloadNameForJob(job.Object().GetName()),
+			Namespace: job.Object().GetNamespace(),
+		},
+		Spec: kueue.WorkloadSpec{
+			PodSets:   job.PodSets(),
+			QueueName: job.QueueName(),
+		},
+	}
+
+	priorityClassName, p, err := utilpriority.GetPriorityFromPriorityClass(
+		ctx, client, job.PriorityClass())
+	if err != nil {
+		return nil, err
+	}
+
+	wl.Spec.PriorityClassName = priorityClassName
+	wl.Spec.Priority = &p
+
+	if err := ctrl.SetControllerReference(job.Object(), wl, scheme); err != nil {
+		return nil, err
+	}
+	return wl, nil
+}
+
+// GetNodeSelectors extracts the node selectors from an admitted workload.
+func GetNodeSelectors(ctx context.Context, client client.Client, w *kueue.Workload) ([]map[string]string, error) {
+	if len(w.Spec.Admission.PodSetFlavors) == 0 {
+		return nil, nil
+	}
+
+	nodeSelectors := make([]map[string]string, len(w.Spec.Admission.PodSetFlavors))
+
+	for i, podSetFlavor := range w.Spec.Admission.PodSetFlavors {
+		processedFlvs := sets.NewString()
+		nodeSelector := map[string]string{}
+		for _, flvName := range podSetFlavor.Flavors {
+			if processedFlvs.Has(flvName) {
+				continue
+			}
+			// Lookup the ResourceFlavors to fetch the node affinity labels to apply on the job.
+			flv := kueue.ResourceFlavor{}
+			if err := client.Get(ctx, types.NamespacedName{Name: flvName}, &flv); err != nil {
+				return nil, err
+			}
+			for k, v := range flv.NodeSelector {
+				nodeSelector[k] = v
+			}
+			processedFlvs.Insert(flvName)
+		}
+
+		nodeSelectors[i] = nodeSelector
+	}
+	return nodeSelectors, nil
+}
+
+// UpdateQueueNameIfChanged updates the workload queue name if it has changed.
+func UpdateQueueNameIfChanged(ctx context.Context, client client.Client, job GenericJob, wl *kueue.Workload) error {
+	queueName := job.QueueName()
+	if wl.Spec.QueueName != queueName {
+		wl.Spec.QueueName = queueName
+		return client.Update(ctx, wl)
+	}
+	return nil
+}
+
+// SetWorkloadCondition updates the workload status with the provided condition.
+func SetWorkloadCondition(ctx context.Context, client client.Client, wl *kueue.Workload, condition metav1.Condition) error {
+	apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
+	return client.Status().Update(ctx, wl)
+}
diff --git a/pkg/controller/workload/job/interface.go b/pkg/controller/workload/job/interface.go
new file mode 100644
index 0000000000..63ccc3c02c
--- /dev/null
+++ b/pkg/controller/workload/job/interface.go
@@ -0,0 +1,56 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package job
+
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha2"
+)
+
+type GenericJob interface {
+	// Object returns the job instance.
+	Object() client.Object
+	// IsSuspend returns whether the job is suspended.
+	IsSuspend() bool
+	// Suspend will suspend the job.
+	Suspend() error
+	// UnSuspend will unsuspend the job.
+	UnSuspend() error
+	// ResetStatus will reset the job status to its original state.
+	// It returns true if the status was modified, false otherwise.
+	ResetStatus() bool
+	// InjectNodeAffinity will inject the node affinity extracted from the workload into the job.
+	InjectNodeAffinity(nodeSelectors []map[string]string) error
+	// RestoreNodeAffinity will restore the original node affinity of the job.
+	RestoreNodeAffinity(nodeSelectors []map[string]string) error
+	// Finished returns whether the job is finished (completed or failed);
+	// condition represents the workload finished condition.
+	Finished() (condition metav1.Condition, finished bool)
+	// PodSets will build the workload podSets corresponding to the job.
+	PodSets() []kueue.PodSet
+	// EquivalentToWorkload validates whether the workload is semantically equal to the job.
+	EquivalentToWorkload(wl kueue.Workload) bool
+	// PriorityClass returns the job's priority class name.
+	PriorityClass() string
+	// QueueName returns the name of the queue the job is enqueued in.
+	QueueName() string
+	// ParentWorkloadName returns the parent workload name.
+	ParentWorkloadName() string
+	// IsActive returns true if there are any running pods.
+	IsActive() bool
+	// PodsReady returns whether all pods derived from the job are ready.
+	PodsReady() bool
+}
diff --git a/pkg/controller/workload/job/job_controller.go b/pkg/controller/workload/job/job_controller.go
index a274861749..2c63cf27f8 100644
--- a/pkg/controller/workload/job/job_controller.go
+++ b/pkg/controller/workload/job/job_controller.go
@@ -23,12 +23,10 @@ import (
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
@@ -42,7 +40,6 @@ import (
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha2"
 	"sigs.k8s.io/kueue/pkg/constants"
 	"sigs.k8s.io/kueue/pkg/controller/workload/jobframework"
-	utilpriority "sigs.k8s.io/kueue/pkg/util/priority"
 	"sigs.k8s.io/kueue/pkg/workload"
 )
 
@@ -154,6 +151,158 @@ func (h *parentWorkloadHandler) queueReconcileForChildJob(object client.Object,
 	}
 }
 
+var _ GenericJob = &BatchJob{}
+
+type BatchJob struct {
+	batchv1.Job
+}
+
+func (b *BatchJob) Object() client.Object {
+	return &b.Job
+}
+
+func (b *BatchJob) ParentWorkloadName() string {
+	return parentWorkloadName(&b.Job)
+}
+
+func (b *BatchJob) QueueName() string {
+	return queueName(&b.Job)
+}
+
+func (b *BatchJob) IsSuspend() bool {
+	return b.Spec.Suspend != nil && *b.Spec.Suspend
+}
+
+func (b *BatchJob) IsActive() bool {
+	return b.Status.Active != 0
+}
+
+func (b *BatchJob) Suspend() error {
+	b.Spec.Suspend = pointer.Bool(true)
+	return nil
+}
+
+func (b *BatchJob) UnSuspend() error {
+	b.Spec.Suspend = pointer.Bool(false)
+	return nil
+}
+
+func (b *BatchJob) ResetStatus() bool {
+	// Reset start time so we can update the scheduling directives later when unsuspending.
+	if b.Status.StartTime == nil {
+		return false
+	}
+	b.Status.StartTime = nil
+	return true
+}
+
+func (b *BatchJob) PodSets() []kueue.PodSet {
+	return []kueue.PodSet{
+		{
+			Spec:  *b.Spec.Template.Spec.DeepCopy(),
+			Count: b.podsCount(),
+		},
+	}
+}
+
+func (b *BatchJob) InjectNodeAffinity(nodeSelectors []map[string]string) error {
+	if len(nodeSelectors) == 0 {
+		return nil
+	}
+
+	if b.Spec.Template.Spec.NodeSelector == nil {
+		b.Spec.Template.Spec.NodeSelector = nodeSelectors[0]
+	} else {
+		for k, v := range nodeSelectors[0] {
+			b.Spec.Template.Spec.NodeSelector[k] = v
+		}
+	}
+
+	return nil
+}
+
+func (b *BatchJob) RestoreNodeAffinity(nodeSelectors []map[string]string) error {
+	if len(nodeSelectors) == 0 || equality.Semantic.DeepEqual(b.Spec.Template.Spec.NodeSelector, nodeSelectors[0]) {
+		return nil
+	}
+
+	b.Spec.Template.Spec.NodeSelector = map[string]string{}
+
+	for k, v := range nodeSelectors[0] {
+		b.Spec.Template.Spec.NodeSelector[k] = v
+	}
+	return nil
+}
+
+func (b *BatchJob) Finished() (metav1.Condition, bool) {
+	var conditionType batchv1.JobConditionType
+	var finished bool
+
+	for _, c := range b.Status.Conditions {
+		if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue {
+			conditionType = c.Type
+			finished = true
+			break
+		}
+	}
+
+	condition := metav1.Condition{
+		Type:    kueue.WorkloadFinished,
+		Status:  metav1.ConditionTrue,
+		Reason:  "JobFinished",
+		Message: "Job finished successfully",
+	}
+	if conditionType == batchv1.JobFailed {
+		condition.Message = "Job failed"
+	}
+
+	return condition, finished
+}
+
+func (b *BatchJob) EquivalentToWorkload(wl kueue.Workload) bool {
+	owner := metav1.GetControllerOf(&wl)
+	// Indexes don't work in unit tests, so we explicitly check for the
+	// owner here.
+	if owner == nil || owner.Name != b.Name {
+		return false
+	}
+
+	if len(wl.Spec.PodSets) != 1 {
+		return false
+	}
+
+	if *b.Spec.Parallelism != wl.Spec.PodSets[0].Count {
+		return false
+	}
+
+	// nodeSelector may change, hence we are not checking for
+	// equality of the whole job.Spec.Template.Spec.
+	if !equality.Semantic.DeepEqual(b.Spec.Template.Spec.InitContainers,
+		wl.Spec.PodSets[0].Spec.InitContainers) {
+		return false
+	}
+	return equality.Semantic.DeepEqual(b.Spec.Template.Spec.Containers,
+		wl.Spec.PodSets[0].Spec.Containers)
+}
+
+func (b *BatchJob) PriorityClass() string {
+	return b.Spec.Template.Spec.PriorityClassName
+}
+
+func (b *BatchJob) PodsReady() bool {
+	ready := pointer.Int32Deref(b.Job.Status.Ready, 0)
+	return b.Job.Status.Succeeded+ready >= b.podsCount()
+}
+
+func (b *BatchJob) podsCount() int32 {
+	// parallelism is always set as it is otherwise defaulted by k8s to 1
+	podsCount := *(b.Spec.Parallelism)
+	if b.Spec.Completions != nil && *b.Spec.Completions < podsCount {
+		podsCount = *b.Spec.Completions
+	}
+	return podsCount
+}
+
 // SetupWithManager sets up the controller with the Manager. It indexes workloads
 // based on the owning jobs.
 func (r *JobReconciler) SetupWithManager(mgr ctrl.Manager) error {
@@ -199,20 +348,21 @@ func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
 //+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch
 
 func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-	var job batchv1.Job
-	if err := r.client.Get(ctx, req.NamespacedName, &job); err != nil {
+	var batchJob BatchJob
+	if err := r.client.Get(ctx, req.NamespacedName, &batchJob.Job); err != nil {
 		// we'll ignore not-found errors, since there is nothing to do.
 		return ctrl.Result{}, client.IgnoreNotFound(err)
 	}
+	var genericJob GenericJob = &batchJob
 
-	log := ctrl.LoggerFrom(ctx).WithValues("job", klog.KObj(&job))
+	log := ctrl.LoggerFrom(ctx).WithValues("job", klog.KObj(&batchJob.Job))
 	ctx = ctrl.LoggerInto(ctx, log)
 
-	isStandaloneJob := parentWorkloadName(&job) == ""
+	isStandaloneJob := genericJob.ParentWorkloadName() == ""
 
 	// when manageJobsWithoutQueueName is disabled we only reconcile jobs that have either
 	// queue-name or the parent-workload annotation set.
-	if !r.manageJobsWithoutQueueName && queueName(&job) == "" && isStandaloneJob {
+	if !r.manageJobsWithoutQueueName && genericJob.QueueName() == "" && isStandaloneJob {
 		log.V(3).Info(fmt.Sprintf("Neither %s, nor %s annotation is set, ignoring the job", constants.QueueAnnotation, constants.ParentWorkloadAnnotation))
 		return ctrl.Result{}, nil
 	}
@@ -221,24 +371,22 @@
 	// 1. make sure there is only a single existing instance of the workload.
 	// If there's no workload exists and job is unsuspended, we'll stop it immediately.
-	wl, err := r.ensureAtMostOneWorkload(ctx, &job)
+	wl, err := EnsureOneWorkload(ctx, r.client, req, r.record, genericJob)
 	if err != nil {
 		log.Error(err, "Getting existing workloads")
 		return ctrl.Result{}, err
 	}
 
 	// 2. handle job is finished.
-	if jobFinishedCond, jobFinished := jobFinishedCondition(&job); jobFinished {
-		if !isStandaloneJob || wl == nil || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
+	if condition, finished := genericJob.Finished(); finished {
+		if wl == nil || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
 			return ctrl.Result{}, nil
 		}
-		condition := generateFinishedCondition(jobFinishedCond)
-		apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
-		err := r.client.Status().Update(ctx, wl)
-		if err != nil {
+		if err := SetWorkloadCondition(ctx, r.client, wl, condition); err != nil {
 			log.Error(err, "Updating workload status")
+			return ctrl.Result{}, err
 		}
-		return ctrl.Result{}, err
+		return ctrl.Result{}, nil
 	}
 
 	// 3. handle workload is nil.
@@ -246,7 +393,7 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
 		if !isStandaloneJob {
 			return ctrl.Result{}, nil
 		}
-		err := r.handleJobWithNoWorkload(ctx, &job)
+		err := r.handleJobWithNoWorkload(ctx, genericJob)
 		if err != nil {
 			log.Error(err, "Handling job with no workload")
 		}
@@ -258,7 +405,7 @@
 	// handle a job when waitForPodsReady is enabled, and it is the main job
 	if r.waitForPodsReady {
 		log.V(5).Info("Handling a job when waitForPodsReady is enabled")
-		condition := generatePodsReadyCondition(&job, wl)
+		condition := generatePodsReadyCondition(genericJob, wl)
 		// optimization to avoid sending the update request if the status didn't change
 		if !apimeta.IsStatusConditionPresentAndEqual(wl.Status.Conditions, condition.Type, condition.Status) {
 			log.V(3).Info(fmt.Sprintf("Updating the PodsReady condition with status: %v", condition.Status))
@@ -271,11 +418,11 @@
 	}
 
 	// 5. handle job is suspended.
-	if jobSuspended(&job) {
+	if genericJob.IsSuspend() {
 		// start the job if the workload has been admitted, and the job is still suspended
 		if wl.Spec.Admission != nil {
 			log.V(2).Info("Job admitted, unsuspending")
-			err := r.startJob(ctx, wl, &job)
+			err := StartJob(ctx, r.client, r.record, genericJob, wl)
 			if err != nil {
 				log.Error(err, "Unsuspending job")
 			}
@@ -283,7 +431,7 @@
 		}
 
 		// update queue name if changed.
-		q := queueName(&job)
+		q := genericJob.QueueName()
 		if wl.Spec.QueueName != q {
 			log.V(2).Info("Job changed queues, updating workload")
 			wl.Spec.QueueName = q
@@ -301,7 +449,7 @@
 	if wl.Spec.Admission == nil {
 		// the job must be suspended if the workload is not yet admitted.
 		log.V(2).Info("Running job is not admitted by a cluster queue, suspending")
-		err := r.stopJob(ctx, wl, &job, "Not admitted by cluster queue")
+		err := StopJob(ctx, r.client, r.record, genericJob, wl, "Not admitted by cluster queue")
 		if err != nil {
 			log.Error(err, "Suspending job with non admitted workload")
 		}
@@ -313,253 +461,29 @@
 	return ctrl.Result{}, nil
 }
 
-// podsReady checks if all pods are ready or succeeded
-func podsReady(job *batchv1.Job) bool {
-	ready := pointer.Int32Deref(job.Status.Ready, 0)
-	return job.Status.Succeeded+ready >= podsCount(&job.Spec)
-}
-
-// stopJob sends updates to suspend the job, reset the startTime so we can update the scheduling directives
-// later when unsuspending and resets the nodeSelector to its previous state based on what is available in
-// the workload (which should include the original affinities that the job had).
-func (r *JobReconciler) stopJob(ctx context.Context, w *kueue.Workload,
-	job *batchv1.Job, eventMsg string) error {
-	job.Spec.Suspend = pointer.Bool(true)
-	if err := r.client.Update(ctx, job); err != nil {
-		return err
-	}
-	r.record.Eventf(job, corev1.EventTypeNormal, "Stopped", eventMsg)
-
-	// Reset start time so we can update the scheduling directives later when unsuspending.
- if job.Status.StartTime != nil { - job.Status.StartTime = nil - if err := r.client.Status().Update(ctx, job); err != nil { - return err - } - } - - if w != nil && !equality.Semantic.DeepEqual(job.Spec.Template.Spec.NodeSelector, - w.Spec.PodSets[0].Spec.NodeSelector) { - job.Spec.Template.Spec.NodeSelector = map[string]string{} - for k, v := range w.Spec.PodSets[0].Spec.NodeSelector { - job.Spec.Template.Spec.NodeSelector[k] = v - } - return r.client.Update(ctx, job) - } - - return nil -} - -func (r *JobReconciler) startJob(ctx context.Context, w *kueue.Workload, job *batchv1.Job) error { - log := ctrl.LoggerFrom(ctx) - - if len(w.Spec.PodSets) != 1 { - return fmt.Errorf("one podset must exist, found %d", len(w.Spec.PodSets)) - } - nodeSelector, err := r.getNodeSelectors(ctx, w) - if err != nil { - return err - } - if len(nodeSelector) != 0 { - if job.Spec.Template.Spec.NodeSelector == nil { - job.Spec.Template.Spec.NodeSelector = nodeSelector - } else { - for k, v := range nodeSelector { - job.Spec.Template.Spec.NodeSelector[k] = v - } - } - } else { - log.V(3).Info("no nodeSelectors to inject") - } - - job.Spec.Suspend = pointer.Bool(false) - if err := r.client.Update(ctx, job); err != nil { - return err - } - - r.record.Eventf(job, corev1.EventTypeNormal, "Started", - "Admitted by clusterQueue %v", w.Spec.Admission.ClusterQueue) - return nil -} - -func (r *JobReconciler) getNodeSelectors(ctx context.Context, w *kueue.Workload) (map[string]string, error) { - if len(w.Spec.Admission.PodSetFlavors[0].Flavors) == 0 { - return nil, nil - } - - processedFlvs := sets.NewString() - nodeSelector := map[string]string{} - for _, flvName := range w.Spec.Admission.PodSetFlavors[0].Flavors { - if processedFlvs.Has(flvName) { - continue - } - // Lookup the ResourceFlavors to fetch the node affinity labels to apply on the job. - flv := kueue.ResourceFlavor{} - if err := r.client.Get(ctx, types.NamespacedName{Name: flvName}, &flv); err != nil { - return nil, err - } - for k, v := range flv.NodeSelector { - nodeSelector[k] = v - } - processedFlvs.Insert(flvName) - } - return nodeSelector, nil -} - -func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job *batchv1.Job) error { +func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, genericJob GenericJob) error { log := ctrl.LoggerFrom(ctx) // Wait until there are no active pods. - if job.Status.Active != 0 { + if genericJob.IsActive() { log.V(2).Info("Job is suspended but still has active pods, waiting") return nil } // Create the corresponding workload. - wl, err := ConstructWorkloadFor(ctx, r.client, job, r.scheme) + wl, err := ConstructWorkload(ctx, r.client, r.scheme, genericJob) if err != nil { return err } if err = r.client.Create(ctx, wl); err != nil { return err } - - r.record.Eventf(job, corev1.EventTypeNormal, "CreatedWorkload", + r.record.Eventf(genericJob.Object(), corev1.EventTypeNormal, "CreatedWorkload", "Created Workload: %v", workload.Key(wl)) return nil } -// ensureAtMostOneWorkload finds a matching workload and deletes redundant ones. -func (r *JobReconciler) ensureAtMostOneWorkload(ctx context.Context, job *batchv1.Job) (*kueue.Workload, error) { - log := ctrl.LoggerFrom(ctx) - - // Find a matching workload first if there is one. 
- var toDelete []*kueue.Workload - var match *kueue.Workload - - if pwName := parentWorkloadName(job); pwName != "" { - pw := kueue.Workload{} - NamespacedName := types.NamespacedName{ - Name: pwName, - Namespace: job.Namespace, - } - if err := r.client.Get(ctx, NamespacedName, &pw); err != nil { - if !apierrors.IsNotFound(err) { - return nil, err - } - log.V(2).Info("job with no matching parent workload", "parent-workload", pwName) - } else { - match = &pw - } - } - - var workloads kueue.WorkloadList - if err := r.client.List(ctx, &workloads, client.InNamespace(job.Namespace), - client.MatchingFields{ownerKey: job.Name}); err != nil { - log.Error(err, "Unable to list child workloads") - return nil, err - } - - for i := range workloads.Items { - w := &workloads.Items[i] - owner := metav1.GetControllerOf(w) - // Indexes don't work in unit tests, so we explicitly check for the - // owner here. - if owner.Name != job.Name { - continue - } - if match == nil && jobAndWorkloadEqual(job, w) { - match = w - } else { - toDelete = append(toDelete, w) - } - } - - // If there is no matching workload and the job is running, suspend it. - if match == nil && !jobSuspended(job) { - log.V(2).Info("job with no matching workload, suspending") - var w *kueue.Workload - if len(workloads.Items) == 1 { - // The job may have been modified and hence the existing workload - // doesn't match the job anymore. All bets are off if there are more - // than one workload... - w = &workloads.Items[0] - } - if err := r.stopJob(ctx, w, job, "No matching Workload"); err != nil { - log.Error(err, "stopping job") - } - } - - // Delete duplicate workload instances. - existedWls := 0 - for i := range toDelete { - err := r.client.Delete(ctx, toDelete[i]) - if err == nil || !apierrors.IsNotFound(err) { - existedWls++ - } - if err != nil && !apierrors.IsNotFound(err) { - log.Error(err, "Failed to delete workload") - } - if err == nil { - r.record.Eventf(job, corev1.EventTypeNormal, "DeletedWorkload", - "Deleted not matching Workload: %v", workload.Key(toDelete[i])) - } - } - - if existedWls != 0 { - if match == nil { - return nil, fmt.Errorf("no matching workload was found, tried deleting %d existing workload(s)", existedWls) - } - return nil, fmt.Errorf("only one workload should exist, found %d", len(workloads.Items)) - } - - return match, nil -} - -func ConstructWorkloadFor(ctx context.Context, client client.Client, - job *batchv1.Job, scheme *runtime.Scheme) (*kueue.Workload, error) { - w := &kueue.Workload{ - ObjectMeta: metav1.ObjectMeta{ - Name: GetWorkloadNameForJob(job.Name), - Namespace: job.Namespace, - }, - Spec: kueue.WorkloadSpec{ - PodSets: []kueue.PodSet{ - { - Spec: *job.Spec.Template.Spec.DeepCopy(), - Count: podsCount(&job.Spec), - }, - }, - QueueName: queueName(job), - }, - } - - // Populate priority from priority class. 
- priorityClassName, p, err := utilpriority.GetPriorityFromPriorityClass( - ctx, client, job.Spec.Template.Spec.PriorityClassName) - if err != nil { - return nil, err - } - w.Spec.Priority = &p - w.Spec.PriorityClassName = priorityClassName - - if err := ctrl.SetControllerReference(job, w, scheme); err != nil { - return nil, err - } - - return w, nil -} - -func podsCount(jobSpec *batchv1.JobSpec) int32 { - // parallelism is always set as it is otherwise defaulted by k8s to 1 - podsCount := *(jobSpec.Parallelism) - if jobSpec.Completions != nil && *jobSpec.Completions < podsCount { - podsCount = *jobSpec.Completions - } - return podsCount -} - -func generatePodsReadyCondition(job *batchv1.Job, wl *kueue.Workload) metav1.Condition { +func generatePodsReadyCondition(genericJob GenericJob, wl *kueue.Workload) metav1.Condition { conditionStatus := metav1.ConditionFalse message := "Not all pods are ready or succeeded" // Once PodsReady=True it stays as long as the workload remains admitted to @@ -567,7 +491,7 @@ func generatePodsReadyCondition(job *batchv1.Job, wl *kueue.Workload) metav1.Con // Ready to Completed. As pods finish, they transition first into the // uncountedTerminatedPods staging area, before passing to the // succeeded/failed counters. - if wl.Spec.Admission != nil && (podsReady(job) || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadPodsReady)) { + if wl.Spec.Admission != nil && (genericJob.PodsReady() || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadPodsReady)) { conditionStatus = metav1.ConditionTrue message = "All pods were ready or succeeded since the workload admission" } @@ -579,51 +503,6 @@ func generatePodsReadyCondition(job *batchv1.Job, wl *kueue.Workload) metav1.Con } } -func generateFinishedCondition(jobStatus batchv1.JobConditionType) metav1.Condition { - message := "Job finished successfully" - if jobStatus == batchv1.JobFailed { - message = "Job failed" - } - return metav1.Condition{ - Type: kueue.WorkloadFinished, - Status: metav1.ConditionTrue, - Reason: "JobFinished", - Message: message, - } -} - -// From https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/job/utils.go -func jobFinishedCondition(j *batchv1.Job) (batchv1.JobConditionType, bool) { - for _, c := range j.Status.Conditions { - if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue { - return c.Type, true - } - } - return "", false -} - -func jobSuspended(j *batchv1.Job) bool { - return j.Spec.Suspend != nil && *j.Spec.Suspend -} - -func jobAndWorkloadEqual(job *batchv1.Job, wl *kueue.Workload) bool { - if len(wl.Spec.PodSets) != 1 { - return false - } - if *job.Spec.Parallelism != wl.Spec.PodSets[0].Count { - return false - } - - // nodeSelector may change, hence we are not checking for - // equality of the whole job.Spec.Template.Spec. 
- if !equality.Semantic.DeepEqual(job.Spec.Template.Spec.InitContainers, - wl.Spec.PodSets[0].Spec.InitContainers) { - return false - } - return equality.Semantic.DeepEqual(job.Spec.Template.Spec.Containers, - wl.Spec.PodSets[0].Spec.Containers) -} - func queueName(job *batchv1.Job) string { return job.Annotations[constants.QueueAnnotation] } diff --git a/pkg/controller/workload/job/job_controller_test.go b/pkg/controller/workload/job/job_controller_test.go index 6db7205155..bac1bbed87 100644 --- a/pkg/controller/workload/job/job_controller_test.go +++ b/pkg/controller/workload/job/job_controller_test.go @@ -26,11 +26,11 @@ import ( func TestPodsReady(t *testing.T) { testcases := map[string]struct { - job *batchv1.Job + job batchv1.Job want bool }{ "parallelism = completions; no progress": { - job: &batchv1.Job{ + job: batchv1.Job{ Spec: batchv1.JobSpec{ Parallelism: pointer.Int32(3), Completions: pointer.Int32(3), @@ -40,7 +40,7 @@ func TestPodsReady(t *testing.T) { want: false, }, "parallelism = completions; not enough progress": { - job: &batchv1.Job{ + job: batchv1.Job{ Spec: batchv1.JobSpec{ Parallelism: pointer.Int32(3), Completions: pointer.Int32(3), @@ -53,7 +53,7 @@ func TestPodsReady(t *testing.T) { want: false, }, "parallelism = completions; all ready": { - job: &batchv1.Job{ + job: batchv1.Job{ Spec: batchv1.JobSpec{ Parallelism: pointer.Int32(3), Completions: pointer.Int32(3), @@ -66,7 +66,7 @@ func TestPodsReady(t *testing.T) { want: true, }, "parallelism = completions; some ready, some succeeded": { - job: &batchv1.Job{ + job: batchv1.Job{ Spec: batchv1.JobSpec{ Parallelism: pointer.Int32(3), Completions: pointer.Int32(3), @@ -79,7 +79,7 @@ func TestPodsReady(t *testing.T) { want: true, }, "parallelism = completions; all succeeded": { - job: &batchv1.Job{ + job: batchv1.Job{ Spec: batchv1.JobSpec{ Parallelism: pointer.Int32(3), Completions: pointer.Int32(3), @@ -91,7 +91,7 @@ func TestPodsReady(t *testing.T) { want: true, }, "parallelism < completions; reaching parallelism is enough": { - job: &batchv1.Job{ + job: batchv1.Job{ Spec: batchv1.JobSpec{ Parallelism: pointer.Int32(2), Completions: pointer.Int32(3), @@ -103,7 +103,7 @@ func TestPodsReady(t *testing.T) { want: true, }, "parallelism > completions; reaching completions is enough": { - job: &batchv1.Job{ + job: batchv1.Job{ Spec: batchv1.JobSpec{ Parallelism: pointer.Int32(3), Completions: pointer.Int32(2), @@ -115,7 +115,7 @@ func TestPodsReady(t *testing.T) { want: true, }, "parallelism specified only; not enough progress": { - job: &batchv1.Job{ + job: batchv1.Job{ Spec: batchv1.JobSpec{ Parallelism: pointer.Int32(3), }, @@ -126,7 +126,7 @@ func TestPodsReady(t *testing.T) { want: false, }, "parallelism specified only; all ready": { - job: &batchv1.Job{ + job: batchv1.Job{ Spec: batchv1.JobSpec{ Parallelism: pointer.Int32(3), }, @@ -140,7 +140,8 @@ func TestPodsReady(t *testing.T) { for name, tc := range testcases { t.Run(name, func(t *testing.T) { - got := podsReady(tc.job) + batchJob := &BatchJob{tc.job} + got := batchJob.PodsReady() if tc.want != got { t.Errorf("Unexpected response (want: %v, got: %v)", tc.want, got) } diff --git a/test/integration/controller/job/job_controller_test.go b/test/integration/controller/job/job_controller_test.go index 7d99c13fae..8278041344 100644 --- a/test/integration/controller/job/job_controller_test.go +++ b/test/integration/controller/job/job_controller_test.go @@ -76,7 +76,8 @@ var _ = ginkgo.Describe("Job controller", func() { job := testing.MakeJob(jobName, 
jobNamespace).PriorityClass(priorityClassName).Obj() gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) lookupKey := types.NamespacedName{Name: jobName, Namespace: jobNamespace} - createdJob := &batchv1.Job{} + batchJob := workloadjob.BatchJob{} + createdJob := &batchJob.Job gomega.Eventually(func() bool { if err := k8sClient.Get(ctx, lookupKey, createdJob); err != nil { return false @@ -109,7 +110,7 @@ var _ = ginkgo.Describe("Job controller", func() { }, util.Timeout, util.Interval).Should(gomega.BeTrue()) ginkgo.By("checking a second non-matching workload is deleted") - secondWl, _ := workloadjob.ConstructWorkloadFor(ctx, k8sClient, createdJob, scheme.Scheme) + secondWl, _ := workloadjob.ConstructWorkload(ctx, k8sClient, scheme.Scheme, &batchJob) secondWl.Name = workloadjob.GetWorkloadNameForJob("second-workload") secondWl.Spec.PodSets[0].Count = parallelism + 1 gomega.Expect(k8sClient.Create(ctx, secondWl)).Should(gomega.Succeed())
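
To illustrate the extension point this change introduces: the reconciler now talks to workloads only through the `GenericJob` interface, so supporting another workload type amounts to wrapping its API object the way `BatchJob` wraps `batchv1.Job`. A minimal sketch of such an adapter follows; `HypotheticalJob`, its fields, and the trivial `DeepCopyObject` are illustrative assumptions only, not part of this change (real CRDs would use generated deepcopy code):

```go
package job

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/utils/pointer"
	"sigs.k8s.io/controller-runtime/pkg/client"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha2"
	"sigs.k8s.io/kueue/pkg/constants"
)

// HypotheticalJob stands in for a custom workload CRD (an MPI or Spark
// operator job, for example).
type HypotheticalJob struct {
	metav1.TypeMeta
	metav1.ObjectMeta
	Spec struct {
		Suspend  *bool
		Replicas int32
		Template corev1.PodTemplateSpec
	}
}

// DeepCopyObject is a shallow placeholder; codegen would normally provide it.
func (j *HypotheticalJob) DeepCopyObject() runtime.Object {
	out := *j
	return &out
}

func (j *HypotheticalJob) Object() client.Object { return j }

func (j *HypotheticalJob) IsSuspend() bool {
	return j.Spec.Suspend != nil && *j.Spec.Suspend
}

func (j *HypotheticalJob) Suspend() error {
	j.Spec.Suspend = pointer.Bool(true)
	return nil
}

func (j *HypotheticalJob) UnSuspend() error {
	j.Spec.Suspend = pointer.Bool(false)
	return nil
}

// PodSets maps the CRD's pod template to the workload podSets, mirroring
// what BatchJob.PodSets does for batchv1.Job.
func (j *HypotheticalJob) PodSets() []kueue.PodSet {
	return []kueue.PodSet{
		{
			Spec:  *j.Spec.Template.Spec.DeepCopy(),
			Count: j.Spec.Replicas,
		},
	}
}

func (j *HypotheticalJob) QueueName() string {
	return j.Annotations[constants.QueueAnnotation]
}

// The remaining methods (ResetStatus, InjectNodeAffinity, Finished, ...)
// would follow the same pattern as the BatchJob implementation above.
```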
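The helpers in `helpers.go` likewise encode the suspend/start lifecycle independently of `batchv1`. A condensed sketch of how a reconciler for any `GenericJob` could compose them, following the numbered steps in `Reconcile` above; error handling is trimmed, and the unused `ctrl.Request` argument of `EnsureOneWorkload` is passed as a zero value:

```go
package job

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// reconcileGenericJob condenses the flow of JobReconciler.Reconcile for any
// GenericJob implementation.
func reconcileGenericJob(ctx context.Context, c client.Client, rec record.EventRecorder,
	scheme *runtime.Scheme, job GenericJob) error {
	// 1. Make sure at most one matching workload exists.
	wl, err := EnsureOneWorkload(ctx, c, ctrl.Request{}, rec, job)
	if err != nil {
		return err
	}

	// 2./3. No workload yet: create one once the job has no active pods.
	if wl == nil {
		if job.IsActive() {
			return nil // wait until all pods have terminated
		}
		_, err := CreateWorkload(ctx, c, scheme, job)
		return err
	}

	// 5. Suspended job: start it once the workload is admitted.
	if job.IsSuspend() {
		if wl.Spec.Admission != nil {
			return StartJob(ctx, c, rec, job, wl)
		}
		return UpdateQueueNameIfChanged(ctx, c, job, wl)
	}

	// Running job whose workload is not admitted: suspend it again.
	if wl.Spec.Admission == nil {
		return StopJob(ctx, c, rec, job, wl, "Not admitted by cluster queue")
	}
	return nil
}
```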