Skip to content

Commit

Permalink
Wait for the prebuild workload.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbobrovskyi committed Oct 17, 2024
1 parent 587a6fc commit a59b9af
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 8 deletions.
17 changes: 11 additions & 6 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -63,10 +64,11 @@ const (
)

var (
ErrUnknownWorkloadOwner = errors.New("workload owner is unknown")
ErrWorkloadOwnerNotFound = errors.New("workload owner not found")
ErrNoMatchingWorkloads = errors.New("no matching workloads")
ErrExtraWorkloads = errors.New("extra workloads")
ErrUnknownWorkloadOwner = errors.New("workload owner is unknown")
ErrWorkloadOwnerNotFound = errors.New("workload owner not found")
ErrNoMatchingWorkloads = errors.New("no matching workloads")
ErrExtraWorkloads = errors.New("extra workloads")
ErrPrebuildWorkloadNotFound = errors.New("prebuild workload not found")
)

// JobReconciler reconciles a GenericJob object
Expand Down Expand Up @@ -386,6 +388,10 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
log.V(3).Info("Handling job with no workload found an existing workload")
return ctrl.Result{Requeue: true}, nil
}
if errors.Is(err, ErrPrebuildWorkloadNotFound) {
log.V(2).Info("Prebuilt workload not found. Requeue in 1 second.")
return ctrl.Result{RequeueAfter: time.Second}, nil
}
if IsUnretryableError(err) {
log.V(3).Info("Handling job with no workload", "unretryableError", err)
} else {
Expand Down Expand Up @@ -1011,8 +1017,7 @@ func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job Generic
}

if usePrebuiltWorkload {
log.V(2).Info("Skip workload creation for job with prebuilt workload")
return nil
return ErrPrebuildWorkloadNotFound
}

// Create the corresponding workload.
Expand Down
45 changes: 43 additions & 2 deletions test/integration/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package job

import (
"fmt"
"maps"

"github.com/google/go-cmp/cmp/cmpopts"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
Expand All @@ -31,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"maps"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -420,6 +419,48 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.It("Should reconcile job when the workload is created later", func() {
container := corev1.Container{
Name: "c",
Image: "pause",
}
testingjob.SetContainerDefaults(&container)

job := testingjob.MakeJob("job", ns.Name).
Queue("main").
Label(constants.PrebuiltWorkloadLabel, "wl").
Containers(*container.DeepCopy()).
Obj()

gomega.Expect(k8sClient.Create(ctx, job)).To(gomega.Succeed())
ginkgo.By("Checking the job gets suspended", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdJob := batchv1.Job{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed())
g.Expect(ptr.Deref(createdJob.Spec.Suspend, false)).To(gomega.BeTrue())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

wl := testing.MakeWorkload("wl", ns.Name).
PodSets(*testing.MakePodSet("main", 1).
Containers(*container.DeepCopy()).
Obj()).
Obj()
gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())

ginkgo.By("Check the job gets the ownership of the workload", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdWl := kueue.Workload{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &createdWl)).To(gomega.Succeed())
g.Expect(createdWl.OwnerReferences).To(gomega.ContainElement(
gomega.BeComparableTo(metav1.OwnerReference{
Name: job.Name,
UID: job.UID,
}, cmpopts.IgnoreFields(metav1.OwnerReference{}, "APIVersion", "Kind", "Controller", "BlockOwnerDeletion"))))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})
})

ginkgo.It("Should take the ownership of the workload and continue the usual execution", func() {
container := corev1.Container{
Name: "c",
Expand Down

0 comments on commit a59b9af

Please sign in to comment.