Skip to content

Commit

Permalink
Reconcile jobs waiting for prebuilt workload
Browse files Browse the repository at this point in the history
  • Loading branch information
IrvingMg committed Sep 26, 2024
1 parent 4042844 commit 5045026
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/controller/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ const (

// ProvReqAnnotationPrefix is the prefix for annotations that should be pass to ProvisioningRequest as Parameters.
ProvReqAnnotationPrefix = "provreq.kueue.x-k8s.io/"

PrebuiltWorkloadIndexName = "metadata.prebuiltworkload"
)
10 changes: 10 additions & 0 deletions pkg/controller/jobframework/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
)

func SetupWorkloadOwnerIndex(ctx context.Context, indexer client.FieldIndexer, gvk schema.GroupVersionKind) error {
Expand All @@ -42,3 +43,12 @@ func SetupWorkloadOwnerIndex(ctx context.Context, indexer client.FieldIndexer, g
return owners
})
}

func SetupPrebuiltWorkloadIndex(ctx context.Context, indexer client.FieldIndexer, objectType client.Object) error {
return indexer.IndexField(ctx, objectType, constants.PrebuiltWorkloadIndexName, func(o client.Object) []string {
if pwl, found := o.GetLabels()[constants.PrebuiltWorkloadLabel]; found {
return []string{pwl}
}
return nil
})
}
32 changes: 32 additions & 0 deletions pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/podset"
Expand Down Expand Up @@ -97,6 +98,7 @@ type parentWorkloadHandler struct {

func (h *parentWorkloadHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.queueReconcileForChildJob(ctx, e.Object, q)
h.queueReconcileJobsWaitingForPrebuiltWorkload(ctx, e.Object, q)
}

func (h *parentWorkloadHandler) Update(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
Expand Down Expand Up @@ -139,6 +141,31 @@ func (h *parentWorkloadHandler) queueReconcileForChildJob(ctx context.Context, o
}
}

func (h *parentWorkloadHandler) queueReconcileJobsWaitingForPrebuiltWorkload(ctx context.Context, object client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
w, ok := object.(*kueue.Workload)
if !ok || len(w.OwnerReferences) > 0 {
return
}

log := ctrl.LoggerFrom(ctx).WithValues("workload", klog.KObj(w))
ctx = ctrl.LoggerInto(ctx, log)
log.V(5).Info("Queueing reconcile for prebuilt workload waiting jobs")
var waitingJobs batchv1.JobList
if err := h.client.List(ctx, &waitingJobs, client.InNamespace(w.Namespace), client.MatchingFields{constants.PrebuiltWorkloadIndexName: w.Name}); err != nil {
log.Error(err, "Unable to list waiting jobs")
return
}
for _, waitingJob := range waitingJobs.Items {
log.V(5).Info("Queueing reconcile for waiting job", "job", klog.KObj(&waitingJob))
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Name: waitingJob.Name,
Namespace: w.Namespace,
},
})
}
}

type Job batchv1.Job

var _ jobframework.GenericJob = (*Job)(nil)
Expand Down Expand Up @@ -344,6 +371,11 @@ func SetupIndexes(ctx context.Context, fieldIndexer client.FieldIndexer) error {
if err := fieldIndexer.IndexField(ctx, &batchv1.Job{}, indexer.OwnerReferenceUID, indexer.IndexOwnerUID); err != nil {
return err
}

if err := jobframework.SetupPrebuiltWorkloadIndex(ctx, fieldIndexer, &batchv1.Job{}); err != nil {
return err
}

return jobframework.SetupWorkloadOwnerIndex(ctx, fieldIndexer, gvk)
}

Expand Down
46 changes: 46 additions & 0 deletions test/integration/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,52 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.It("Should reconcile job when the workload is created later", func() {
container := corev1.Container{
Name: "c",
Image: "pause",
}
testingjob.SetContainerDefaults(&container)

job := testingjob.MakeJob("job", ns.Name).
Queue("main").
Label(constants.PrebuiltWorkloadLabel, "wl").
Containers(*container.DeepCopy()).
Obj()
gomega.Expect(k8sClient.Create(ctx, job)).To(gomega.Succeed())
ginkgo.By("Checking the job gets suspended", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdJob := batchv1.Job{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed())
g.Expect(ptr.Deref(createdJob.Spec.Suspend, false)).To(gomega.BeTrue())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

wl := testing.MakeWorkload("wl", ns.Name).
PodSets(*testing.MakePodSet("main", 1).
Containers(*container.DeepCopy()).
Obj()).
Obj()
gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
createdWl := kueue.Workload{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &createdWl)).To(gomega.Succeed())
g.Expect(createdWl.OwnerReferences).To(gomega.BeEmpty())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("Check the job gets the ownership of the workload", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdWl := kueue.Workload{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &createdWl)).To(gomega.Succeed())
g.Expect(createdWl.OwnerReferences).To(gomega.ContainElement(
gomega.BeComparableTo(metav1.OwnerReference{
Name: job.Name,
UID: job.UID,
}, cmpopts.IgnoreFields(metav1.OwnerReference{}, "APIVersion", "Kind", "Controller", "BlockOwnerDeletion"))))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})

ginkgo.It("Should take the ownership of the workload and continue the usual execution", func() {
container := corev1.Container{
Name: "c",
Expand Down

0 comments on commit 5045026

Please sign in to comment.