diff --git a/test/e2e/reclaim.go b/test/e2e/reclaim.go
index 67b4c947db..671d8798de 100644
--- a/test/e2e/reclaim.go
+++ b/test/e2e/reclaim.go
@@ -6,18 +6,252 @@ import (
     . "github.com/onsi/ginkgo"
     . "github.com/onsi/gomega"
 
+    v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
+
+    batchv1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
     schedulingv1beta1 "volcano.sh/volcano/pkg/apis/scheduling/v1beta1"
 )
 
-var _ = Describe("Queue E2E Test", func() {
-    It("Reclaim: New queue with job created no reclaim when resource is enough", func() {
-        q1 := "default"
+var _ = Describe("Reclaim E2E Test", func() {
+
+    // CreateReclaimJob creates a single-task job in the given queue, optionally with a
+    // priority class, and waits until its task is ready.
+    CreateReclaimJob := func(ctx *testContext, req v1.ResourceList, name string, queue string, pri string) (*batchv1alpha1.Job, error) {
+        job := &jobSpec{
+            tasks: []taskSpec{
+                {
+                    img: defaultNginxImage,
+                    req: req,
+                    min: 1,
+                    rep: 1,
+                },
+            },
+            name:  name,
+            queue: queue,
+        }
+        if pri != "" {
+            job.pri = pri
+        }
+        batchJob, err := createJobInner(ctx, job)
+        if err != nil {
+            return nil, err
+        }
+        err = waitTasksReady(ctx, batchJob, 1)
+        return batchJob, err
+    }
+
+    // WaitQueueStatus waits until the queue reaches the Open state, or until the number
+    // of its Running or Pending podgroups equals num, depending on the requested status.
+    WaitQueueStatus := func(ctx *testContext, status string, num int32, queue string) error {
+        return waitQueueStatus(func() (bool, error) {
+            q, err := ctx.vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), queue, metav1.GetOptions{})
+            Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", queue)
+            switch status {
+            case "Running":
+                return q.Status.Running == num, nil
+            case "Open":
+                return q.Status.State == schedulingv1beta1.QueueStateOpen, nil
+            case "Pending":
+                return q.Status.Pending == num, nil
+            default:
+                return false, nil
+            }
+        })
+    }
+
+    It("Reclaim Case 1: New queue with job created, no reclaim when resource is enough", func() {
+        q1 := defaultQueue
         q2 := "reclaim-q2"
         ctx := initTestContext(options{
             queues:             []string{q2},
             nodesNumLimit:      4,
             nodesResourceLimit: CPU1Mem1,
+        })
+
+        defer cleanupTestContext(ctx)
+
+        By("Setup initial jobs")
+
+        _, err := CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j1", q1, "")
+        Expect(err).NotTo(HaveOccurred(), "Wait for job1 failed")
+
+        _, err = CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j2", q2, "")
+        Expect(err).NotTo(HaveOccurred(), "Wait for job2 failed")
+
+        By("Create a new queue and job")
+        q3 := "reclaim-q3"
+        ctx.queues = append(ctx.queues, q3)
+        createQueues(ctx)
+
+        err = WaitQueueStatus(ctx, "Open", 1, q1)
+        Expect(err).NotTo(HaveOccurred(), "Error waiting for queue open")
+
+        _, err = CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j3", q3, "")
+        Expect(err).NotTo(HaveOccurred(), "Wait for job3 failed")
+
+        By("Make sure all jobs are running")
+
+        err = WaitQueueStatus(ctx, "Running", 1, q1)
+        Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running")
+
+        err = WaitQueueStatus(ctx, "Running", 1, q2)
+        Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running")
+
+        err = WaitQueueStatus(ctx, "Running", 1, q3)
+        Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running")
+    })
+
+    It("Reclaim Case 3: New queue with job created, no reclaim when job.podGroup.Status.Phase is pending", func() {
+        q1 := defaultQueue
+        q2 := "reclaim-q2"
+        j1 := "reclaim-j1"
+        j2 := "reclaim-j2"
+        j3 := "reclaim-j3"
+
+        ctx := initTestContext(options{
+            queues:             []string{q2},
+            nodesNumLimit:      3,
+            nodesResourceLimit: CPU1Mem1,
+            priorityClasses: map[string]int32{
+                "low-priority":  10,
+                "high-priority": 10000,
+            },
+        })
+
+        defer cleanupTestContext(ctx)
+
+        By("Setup initial jobs")
+
+        _, err := CreateReclaimJob(ctx, CPU1Mem1, j1, q1, "")
+        Expect(err).NotTo(HaveOccurred(), "Wait for job1 failed")
+
+        _, err = CreateReclaimJob(ctx, CPU1Mem1, j2, q2, "")
+        Expect(err).NotTo(HaveOccurred(), "Wait for job2 failed")
+
+        By("Create a new queue and job")
+        q3 := "reclaim-q3"
+        ctx.queues = append(ctx.queues, q3)
+        createQueues(ctx)
+
+        err = WaitQueueStatus(ctx, "Open", 1, q1)
+        Expect(err).NotTo(HaveOccurred(), "Error waiting for queue open")
+
+        _, err = CreateReclaimJob(ctx, CPU1Mem1, j3, q3, "")
+        Expect(err).NotTo(HaveOccurred(), "Wait for job3 failed")
+
+        // List the pods of job3 so they can be deleted later, which drives the
+        // reclaim-j3 podgroup back to pending.
+        listOptions := metav1.ListOptions{
+            LabelSelector: labels.Set(map[string]string{batchv1alpha1.JobNameKey: j3}).String(),
+        }
+
+        job3pods, err := ctx.kubeclient.CoreV1().Pods(ctx.namespace).List(context.TODO(), listOptions)
+        Expect(err).NotTo(HaveOccurred(), "Get %s pods failed", j3)
+
+        By("Make sure q1 and q2 each have a running job")
+        err = WaitQueueStatus(ctx, "Running", 1, q1)
+        Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running")
+
+        err = WaitQueueStatus(ctx, "Running", 1, q2)
+        Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running")
+
+        for _, pod := range job3pods.Items {
+            err = ctx.kubeclient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
+            Expect(err).NotTo(HaveOccurred(), "Failed to delete pod %s", pod.Name)
+        }
+
+        By("Make sure q3 is pending after its pods are deleted")
+        err = WaitQueueStatus(ctx, "Pending", 1, q3)
+        Expect(err).NotTo(HaveOccurred(), "Error waiting for queue pending")
+    })
+
+    It("Reclaim Case 4: New queue with job created, no reclaim when new queue is not created", func() {
+        q1 := defaultQueue
+        q2 := "reclaim-q2"
+        ctx := initTestContext(options{
+            queues:             []string{q2},
+            nodesNumLimit:      3,
+            nodesResourceLimit: CPU1Mem1,
+            priorityClasses: map[string]int32{
+                "low-priority":  10,
+                "high-priority": 10000,
+            },
+        })
+
+        defer cleanupTestContext(ctx)
+
+        By("Setup initial jobs")
+
+        _, err := CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j1", q1, "")
+        Expect(err).NotTo(HaveOccurred(), "Wait for job1 failed")
+
+        _, err = CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j2", q2, "")
+        Expect(err).NotTo(HaveOccurred(), "Wait for job2 failed")
+
+        By("Create a new job in a queue that has not been created")
+        q3 := "reclaim-q3"
+
+        _, err = CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j3", q3, "")
+        Expect(err).Should(HaveOccurred(), "job3 creation should fail because queue3 is not created")
+
+        By("Make sure the other jobs are still running")
+
+        err = WaitQueueStatus(ctx, "Running", 1, q1)
+        Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running")
+
+        err = WaitQueueStatus(ctx, "Running", 1, q2)
+        Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running")
+    })
+
+    // As we agreed, this is not the intended behavior; it is actually a bug.
+ It("Reclaim Case 5: New queue with job created no reclaim when job or task is low-priority", func() { + q1 := defaultQueue + q2 := "reclaim-q2" + ctx := initTestContext(options{ + queues: []string{q2}, + nodesNumLimit: 3, + nodesResourceLimit: CPU1Mem1, + priorityClasses: map[string]int32{ + "low-priority": 10, + "high-priority": 10000, + }, + }) + + defer cleanupTestContext(ctx) + + By("Setup initial jobs") + + _, err := CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j1", q1, "high-priority") + Expect(err).NotTo(HaveOccurred(), "Wait for job1 failed") + + _, err = CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j2", q2, "high-priority") + Expect(err).NotTo(HaveOccurred(), "Wait for job2 failed") + + By("Create new comming queue and job") + q3 := "reclaim-q3" + + err = WaitQueueStatus(ctx, "Open", 1, q1) + Expect(err).NotTo(HaveOccurred(), "Error waiting for queue open") + + _, err = CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j3", q3, "low-priority") + Expect(err).Should(HaveOccurred(), "job3 create failed when queue3 is not created") + + By("Make sure all job running") + + err = WaitQueueStatus(ctx, "Running", 1, q1) + Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") + + err = WaitQueueStatus(ctx, "Running", 1, q2) + Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") + }) + + It("Reclaim Case 6: New queue with job created no reclaim when overused", func() { + q1 := defaultQueue + q2 := "reclaim-q2" + q3 := "reclaim-q3" + ctx := initTestContext(options{ + queues: []string{q2, q3}, + nodesNumLimit: 3, + nodesResourceLimit: CPU1Mem1, priorityClasses: map[string]int32{ "low-priority": 10, "high-priority": 10000, @@ -27,6 +261,17 @@ var _ = Describe("Queue E2E Test", func() { defer cleanupTestContext(ctx) By("Setup initial jobs") + + _, err := CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j1", q1, "") + Expect(err).NotTo(HaveOccurred(), "Wait for job1 failed") + + _, err = CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j2", q2, "") + Expect(err).NotTo(HaveOccurred(), "Wait for job2 failed") + + _, err = CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j3", q3, "") + Expect(err).NotTo(HaveOccurred(), "Wait for job3 failed") + + By("Create job4 to testing overused cases.") job := &jobSpec{ tasks: []taskSpec{ { @@ -36,22 +281,48 @@ var _ = Describe("Queue E2E Test", func() { rep: 1, }, }, + name: "reclaim-j4", + queue: q3, } - job.name = "reclaim-j1" - job.queue = q1 - job.pri = "low-priority" - job1 := createJob(ctx, job) + createJob(ctx, job) + + By("Make sure all job running") - job.name = "reclaim-j2" - job.queue = q2 - job.pri = "low-priority" - job2 := createJob(ctx, job) + err = WaitQueueStatus(ctx, "Running", 1, q1) + Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") + + err = WaitQueueStatus(ctx, "Running", 1, q2) + Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") + + err = WaitQueueStatus(ctx, "Running", 1, q3) + Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running") + + err = WaitQueueStatus(ctx, "Pending", 1, q3) + Expect(err).NotTo(HaveOccurred(), "Error waiting for queue pending") + }) + + It("Reclaim Case 8: New queue with job created no reclaim when task resources less than reclaimable resource", func() { + q1 := defaultQueue + q2 := "reclaim-q2" + ctx := initTestContext(options{ + queues: []string{q2}, + nodesNumLimit: 3, + nodesResourceLimit: CPU1Mem1, + priorityClasses: map[string]int32{ + "low-priority": 10, + "high-priority": 10000, + }, + }) + + defer cleanupTestContext(ctx) + + By("Setup initial jobs") - err := 
+        _, err := CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j1", q1, "")
         Expect(err).NotTo(HaveOccurred(), "Wait for job1 failed")
 
-        err = waitTasksReady(ctx, job2, 1)
+        _, err = CreateReclaimJob(ctx, CPU1Mem1, "reclaim-j2", q2, "")
         Expect(err).NotTo(HaveOccurred(), "Wait for job2 failed")
 
         By("Create a new queue and job")
@@ -59,40 +330,33 @@
         ctx.queues = append(ctx.queues, q3)
         createQueues(ctx)
 
-        err = waitQueueStatus(func() (bool, error) {
-            queue, err := ctx.vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{})
-            Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1)
-            return queue.Status.State == schedulingv1beta1.QueueStateOpen, nil
-        })
+        err = WaitQueueStatus(ctx, "Open", 1, q1)
         Expect(err).NotTo(HaveOccurred(), "Error waiting for queue open")
 
-        job.name = "reclaim-j3"
-        job.queue = q3
-        job.pri = "low-priority"
+        job := &jobSpec{
+            tasks: []taskSpec{
+                {
+                    img: defaultNginxImage,
+                    req: CPU4Mem4,
+                    min: 1,
+                    rep: 1,
+                },
+            },
+            name:  "reclaim-j4",
+            queue: q3,
+        }
         createJob(ctx, job)
 
         By("Make sure all jobs are running")
 
-        err = waitQueueStatus(func() (bool, error) {
-            queue, err := ctx.vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q1, metav1.GetOptions{})
-            Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q1)
-            return queue.Status.Running == 1, nil
-        })
-        Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running")
-        err = waitQueueStatus(func() (bool, error) {
-            queue, err := ctx.vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q2, metav1.GetOptions{})
-            Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q2)
-            return queue.Status.Running == 1, nil
-        })
+        err = WaitQueueStatus(ctx, "Running", 1, q1)
         Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running")
 
-        err = waitQueueStatus(func() (bool, error) {
-            queue, err := ctx.vcclient.SchedulingV1beta1().Queues().Get(context.TODO(), q3, metav1.GetOptions{})
-            Expect(err).NotTo(HaveOccurred(), "Get queue %s failed", q3)
-            return queue.Status.Running == 1, nil
-        })
+        err = WaitQueueStatus(ctx, "Running", 1, q2)
         Expect(err).NotTo(HaveOccurred(), "Error waiting for queue running")
 
+        err = WaitQueueStatus(ctx, "Pending", 1, q3)
+        Expect(err).NotTo(HaveOccurred(), "Error waiting for queue pending")
     })
 
     It("Reclaim", func() {
diff --git a/test/e2e/util.go b/test/e2e/util.go
index b70c89f58b..acf89ecfdb 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -58,6 +58,7 @@ var (
     halfCPU  = v1.ResourceList{"cpu": resource.MustParse("500m")}
     CPU1Mem1 = v1.ResourceList{"cpu": resource.MustParse("1000m"), "memory": resource.MustParse("1024Mi")}
     CPU2Mem2 = v1.ResourceList{"cpu": resource.MustParse("2000m"), "memory": resource.MustParse("2048Mi")}
+    CPU4Mem4 = v1.ResourceList{"cpu": resource.MustParse("4000m"), "memory": resource.MustParse("4096Mi")}
 )
 
 const (
@@ -73,6 +74,7 @@
     schedulerName  = "volcano"
     executeAction  = "ExecuteAction"
     defaultTFImage = "volcanosh/dist-mnist-tf-example:0.0.1"
+    defaultQueue   = "default"
 )
 
 func cpuResource(request string) v1.ResourceList {
@@ -930,7 +932,7 @@ func satisifyMinNodesRequirements(ctx *testContext, num int) bool {
 
 func setPlaceHolderForSchedulerTesting(ctx *testContext, req v1.ResourceList, reqNum int) (bool, error) {
     if !satisifyMinNodesRequirements(ctx, reqNum) {
-        return false, lagencyerror.New("Failed to setup environment, you need to have at least " + strconv.Itoa(len(req)) + " worker node.")
+        return false, lagencyerror.New("Failed to setup environment, you need to have at least " + strconv.Itoa(reqNum) + " worker nodes.")
     }
 
     nodes, err := ctx.kubeclient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
@@ -1037,13 +1039,17 @@
 }
 
 func deletePlaceHolder(ctx *testContext) {
-    podList, err := ctx.kubeclient.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
-    Expect(err).NotTo(HaveOccurred())
+    // Select only the placeholder pods instead of listing the whole namespace.
+    listOptions := metav1.ListOptions{
+        LabelSelector: labels.Set(map[string]string{"role": "placeholder"}).String(),
+    }
+    podList, err := ctx.kubeclient.CoreV1().Pods("default").List(context.TODO(), listOptions)
+    Expect(err).NotTo(HaveOccurred(), "Failed to get pod list")
+
     for _, pod := range podList.Items {
-        if pod.Labels["role"] == "placeholder" {
-            err := ctx.kubeclient.CoreV1().Pods("default").Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
-            Expect(err).NotTo(HaveOccurred())
-        }
+        err := ctx.kubeclient.CoreV1().Pods("default").Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
+        Expect(err).NotTo(HaveOccurred(), "Failed to delete pod %s", pod.Name)
     }
 }
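
Note: both CreateReclaimJob and WaitQueueStatus delegate to the pre-existing waitTasksReady and waitQueueStatus helpers in test/e2e/util.go, which this diff does not touch. For review context, here is a minimal sketch of the polling contract waitQueueStatus is assumed to provide; the 100ms interval, one-minute timeout, and use of wait.Poll are assumptions for illustration, not part of this change:

    package e2e

    import (
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    // waitQueueStatus repeatedly evaluates condition until it returns true,
    // returns an error, or the timeout expires (assumed shape, for illustration).
    func waitQueueStatus(condition func() (bool, error)) error {
        return wait.Poll(100*time.Millisecond, time.Minute, condition)
    }

Any closure with the signature func() (bool, error), such as the ones built by WaitQueueStatus above, can be passed as the condition.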