[test/int] Deferred preemption
trasc committed Sep 15, 2023
1 parent a4d2292 commit f7b8cfb
Showing 2 changed files with 198 additions and 2 deletions.
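In short, the new tests exercise two AdmissionCheck preemption policies: an Anytime check lets preemption be issued as soon as the preempting Workload obtains quota reservation, while an AfterCheckPassedOrOnDemand check defers preemption until the check passes or explicitly demands it. A minimal sketch of the two checks, reusing the testing builder calls that appear in the diff below (the behavioral comments summarize what the tests assert, not documented API guarantees):

checkAnytime := testing.MakeAdmissionCheck("anytime").Policy(kueue.Anytime).Obj()
// Anytime: the low-priority Workload may be evicted as soon as the
// high-priority Workload reserves quota.

checkOnDemand := testing.MakeAdmissionCheck("on-demand").Policy(kueue.AfterCheckPassedOrOnDemand).Obj()
// AfterCheckPassedOrOnDemand: eviction is deferred until the check is set to
// Ready, or until it requests preemption via kueue.CheckStatePreemptionRequired.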
157 changes: 155 additions & 2 deletions test/integration/scheduler/preemption_test.go
@@ -22,8 +22,10 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/controller-runtime/pkg/client"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/util/testing"
"sigs.k8s.io/kueue/test/util"
@@ -59,11 +61,19 @@ var _ = ginkgo.Describe("Preemption", func() {

ginkgo.Context("In a single ClusterQueue", func() {
var (
cq *kueue.ClusterQueue
q *kueue.LocalQueue
cq *kueue.ClusterQueue
q *kueue.LocalQueue
checkAnytime *kueue.AdmissionCheck
checkOnDemand *kueue.AdmissionCheck
)

ginkgo.BeforeEach(func() {
checkAnytime = testing.MakeAdmissionCheck("anytime").Policy(kueue.Anytime).Obj()
gomega.Expect(k8sClient.Create(ctx, checkAnytime)).To(gomega.Succeed())

checkOnDemand = testing.MakeAdmissionCheck("on-demand").Policy(kueue.AfterCheckPassedOrOnDemand).Obj()
gomega.Expect(k8sClient.Create(ctx, checkOnDemand)).To(gomega.Succeed())

cq = testing.MakeClusterQueue("cq").
ResourceGroup(*testing.MakeFlavorQuotas("alpha").Resource(corev1.ResourceCPU, "4").Obj()).
Preemption(kueue.ClusterQueuePreemption{
@@ -79,6 +89,8 @@ var _ = ginkgo.Describe("Preemption", func() {
ginkgo.AfterEach(func() {
gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, cq, true)
util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, checkOnDemand, true)
util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, checkAnytime, true)
})

ginkgo.It("Should preempt Workloads with lower priority when there is not enough quota", func() {
@@ -181,6 +193,147 @@ var _ = ginkgo.Describe("Preemption", func() {
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, wl3)
util.ExpectWorkloadsToBePending(ctx, k8sClient, wl4)
})

ginkgo.It("Should preempt immediately when anytime check is used", func() {
ginkgo.By("Adding the check to the ClusterQueue", func() {
cqKey := client.ObjectKeyFromObject(cq)
gomega.Eventually(func() error {
updatedQueue := &kueue.ClusterQueue{}
err := k8sClient.Get(ctx, cqKey, updatedQueue)
if err != nil {
return err
}
updatedQueue.Spec.AdmissionChecks = []string{checkAnytime.Name}
return k8sClient.Update(ctx, updatedQueue)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

lowWl := testing.MakeWorkload("low-wl", ns.Name).
Queue(q.Name).
Priority(lowPriority).
Request(corev1.ResourceCPU, "4").
Obj()

ginkgo.By("Creating the first workload wait its admission", func() {
gomega.Expect(k8sClient.Create(ctx, lowWl)).To(gomega.Succeed())
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, lowWl)
util.SetWorkloadsAdmissionCkeck(ctx, k8sClient, lowWl, checkAnytime.Name, metav1.ConditionTrue, kueue.CheckStateReady)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, cq.Name, lowWl)
})

highWl := testing.MakeWorkload("high-wl", ns.Name).
Queue(q.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "4").
Obj()
ginkgo.By("Creating a high priority Workload", func() {
gomega.Expect(k8sClient.Create(ctx, highWl)).To(gomega.Succeed())
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, highWl)
util.FinishEvictionForWorkloads(ctx, k8sClient, lowWl)
util.SetWorkloadsAdmissionCkeck(ctx, k8sClient, highWl, checkAnytime.Name, metav1.ConditionTrue, kueue.CheckStateReady)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, cq.Name, highWl)
})
})

ginkgo.It("Should not preempt immediately when on-demand check is used, after succeed", func() {
ginkgo.By("Adding the check to the ClusterQueue", func() {
cqKey := client.ObjectKeyFromObject(cq)
gomega.Eventually(func() error {
updatedQueue := &kueue.ClusterQueue{}
err := k8sClient.Get(ctx, cqKey, updatedQueue)
if err != nil {
return err
}
updatedQueue.Spec.AdmissionChecks = []string{checkOnDemand.Name}
return k8sClient.Update(ctx, updatedQueue)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

lowWl := testing.MakeWorkload("low-wl", ns.Name).
Queue(q.Name).
Priority(lowPriority).
Request(corev1.ResourceCPU, "4").
Obj()

ginkgo.By("Creating the first workload wait its admission", func() {
gomega.Expect(k8sClient.Create(ctx, lowWl)).To(gomega.Succeed())
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, lowWl)
util.SetWorkloadsAdmissionCkeck(ctx, k8sClient, lowWl, checkOnDemand.Name, metav1.ConditionTrue, kueue.CheckStateReady)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, cq.Name, lowWl)
})

highWl := testing.MakeWorkload("high-wl", ns.Name).
Queue(q.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "4").
Obj()
ginkgo.By("Creating a high priority Workload", func() {
gomega.Expect(k8sClient.Create(ctx, highWl)).To(gomega.Succeed())
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, highWl)
gomega.Consistently(func() bool {
readWl := &kueue.Workload{}
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(lowWl), readWl)
if err != nil {
return false
}
return apimeta.IsStatusConditionTrue(readWl.Status.Conditions, kueue.WorkloadAdmitted)

}, util.ConsistentDuration, util.Interval).Should(gomega.BeTrue())
util.SetWorkloadsAdmissionCkeck(ctx, k8sClient, highWl, checkOnDemand.Name, metav1.ConditionTrue, kueue.CheckStateReady)
util.FinishEvictionForWorkloads(ctx, k8sClient, lowWl)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, cq.Name, highWl)
})
})

ginkgo.It("Should not preempt immediately when on-demand check is used, after demand", func() {
ginkgo.By("Adding the check to the ClusterQueue", func() {
cqKey := client.ObjectKeyFromObject(cq)
gomega.Eventually(func() error {
updatedQueue := &kueue.ClusterQueue{}
err := k8sClient.Get(ctx, cqKey, updatedQueue)
if err != nil {
return err
}
updatedQueue.Spec.AdmissionChecks = []string{checkOnDemand.Name}
return k8sClient.Update(ctx, updatedQueue)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

lowWl := testing.MakeWorkload("low-wl", ns.Name).
Queue(q.Name).
Priority(lowPriority).
Request(corev1.ResourceCPU, "4").
Obj()

ginkgo.By("Creating the first workload wait its admission", func() {
gomega.Expect(k8sClient.Create(ctx, lowWl)).To(gomega.Succeed())
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, lowWl)
util.SetWorkloadsAdmissionCkeck(ctx, k8sClient, lowWl, checkOnDemand.Name, metav1.ConditionTrue, kueue.CheckStateReady)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, cq.Name, lowWl)
})

highWl := testing.MakeWorkload("high-wl", ns.Name).
Queue(q.Name).
Priority(highPriority).
Request(corev1.ResourceCPU, "4").
Obj()
ginkgo.By("Creating a high priority Workload", func() {
gomega.Expect(k8sClient.Create(ctx, highWl)).To(gomega.Succeed())
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, cq.Name, highWl)
gomega.Consistently(func() bool {
readWl := &kueue.Workload{}
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(lowWl), readWl)
if err != nil {
return false
}
return apimeta.IsStatusConditionTrue(readWl.Status.Conditions, kueue.WorkloadAdmitted)

}, util.ConsistentDuration, util.Interval).Should(gomega.BeTrue())
util.SetWorkloadsAdmissionCkeck(ctx, k8sClient, highWl, checkOnDemand.Name, metav1.ConditionUnknown, kueue.CheckStatePreemptionRequired)
util.FinishEvictionForWorkloads(ctx, k8sClient, lowWl)
// the admission will happen later on
})
})
})

ginkgo.Context("In a ClusterQueue that is part of a cohort", func() {
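The last test above stops at FinishEvictionForWorkloads and only notes that admission happens later. A hedged sketch of how that flow could be asserted end to end with the helpers added in test/util/util.go (below), assuming the same ctx, k8sClient, cq, lowWl, highWl, and checkOnDemand as in the tests above; this It block is illustrative and not part of the commit:

ginkgo.It("Sketch: demanded preemption evicts the low-priority Workload", func() {
	// lowWl is admitted and highWl holds a quota reservation, as set up above.
	// The on-demand check explicitly demands preemption.
	util.SetWorkloadsAdmissionCkeck(ctx, k8sClient, highWl, checkOnDemand.Name, metav1.ConditionUnknown, kueue.CheckStatePreemptionRequired)
	// Only now should the low-priority Workload be evicted.
	util.ExpectWorkloadsToBeEvicted(ctx, k8sClient, lowWl)
	util.FinishEvictionForWorkloads(ctx, k8sClient, lowWl)
	// Once the check also passes, the high-priority Workload should be admitted.
	util.SetWorkloadsAdmissionCkeck(ctx, k8sClient, highWl, checkOnDemand.Name, metav1.ConditionTrue, kueue.CheckStateReady)
	util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, cq.Name, highWl)
})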
43 changes: 43 additions & 0 deletions test/util/util.go
@@ -164,6 +164,34 @@ func ExpectWorkloadsToHaveQuotaReservation(ctx context.Context, k8sClient client
}, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads were admitted")
}

func ExpectWorkloadsToBeEvicted(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) {
gomega.EventuallyWithOffset(1, func() int {
found := 0
var updatedWorkload kueue.Workload
for _, wl := range wls {
gomega.ExpectWithOffset(1, k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)).To(gomega.Succeed())
if apimeta.IsStatusConditionTrue(updatedWorkload.Status.Conditions, kueue.WorkloadEvicted) {
found++
}
}
return found
}, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads were evicted")
}

func ExpectWorkloadsToBeAdmitted(ctx context.Context, k8sClient client.Client, cqName string, wls ...*kueue.Workload) {
gomega.EventuallyWithOffset(1, func() int {
admitted := 0
var updatedWorkload kueue.Workload
for _, wl := range wls {
gomega.ExpectWithOffset(1, k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)).To(gomega.Succeed())
if workload.IsAdmitted(&updatedWorkload) && string(updatedWorkload.Status.Admission.ClusterQueue) == cqName {
admitted++
}
}
return admitted
}, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads were admitted")
}

func ExpectWorkloadsToBePending(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) {
gomega.EventuallyWithOffset(1, func() int {
pending := 0
@@ -388,5 +416,20 @@ func FinishEvictionForWorkloads(ctx context.Context, k8sClient client.Client, wl
return nil
}, Timeout, Interval).Should(gomega.Succeed(), fmt.Sprintf("Unable to unset quota reservation for %q", key))
}
}

func SetWorkloadsAdmissionCkeck(ctx context.Context, k8sClient client.Client, wl *kueue.Workload, check string, status metav1.ConditionStatus, reason string) {
var updatedWorkload kueue.Workload
gomega.EventuallyWithOffset(1, func() error {
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)
if err != nil {
return err
}
currentCheck := apimeta.FindStatusCondition(updatedWorkload.Status.AdmissionChecks, check)
gomega.ExpectWithOffset(1, currentCheck).NotTo(gomega.BeNil(), "the check was not found")
currentCheck.Status = status
currentCheck.Reason = reason
return k8sClient.Status().Update(ctx, &updatedWorkload)
}, Timeout, Interval).Should(gomega.Succeed())

}
