diff --git a/test/e2e/job_scheduling.go b/test/e2e/job_scheduling.go index 81fabdcbba..afd5030f21 100644 --- a/test/e2e/job_scheduling.go +++ b/test/e2e/job_scheduling.go @@ -21,6 +21,10 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + + batchv1 "k8s.io/api/batch/v1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var _ = Describe("Job E2E Test", func() { @@ -286,4 +290,101 @@ var _ = Describe("Job E2E Test", func() { Expect(err).NotTo(HaveOccurred()) Expect(evicted).NotTo(BeTrue()) }) + + It("Schedule v1.Job type using Volcano scheduler", func() { + context := initTestContext() + defer cleanupTestContext(context) + namespace := "test" + parallel := int32(2) + + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: namespace, + }, + Spec: batchv1.JobSpec{ + Parallelism: ¶llel, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + SchedulerName: schedulerName, + Containers: []v1.Container{ + { + Name: "test-container", + Image: "nginx", + }, + }, + }, + }, + }, + } + + //create job + job, err := context.kubeclient.BatchV1().Jobs(namespace).Create(job) + Expect(err).NotTo(HaveOccurred()) + + err = waitJobPhaseReady(context, job) + Expect(err).NotTo(HaveOccurred()) + }) + + It("Schedule v1.Job type using Volcano scheduler with error case", func() { + context := initTestContext() + defer cleanupTestContext(context) + namespace := "test" + parallel := int32(2) + + errorJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: namespace, + }, + Spec: batchv1.JobSpec{ + Parallelism: ¶llel, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + SchedulerName: schedulerName, + Containers: []v1.Container{ + { + Name: "test-container", + Image: "nginx", + }, + }, + }, + }, + }, + } + + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job1", + Namespace: namespace, + }, + Spec: batchv1.JobSpec{ + Parallelism: ¶llel, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + SchedulerName: schedulerName, + Containers: []v1.Container{ + { + Name: "test-container", + Image: "nginx", + }, + }, + }, + }, + }, + } + + //create error job + errorJob, err := context.kubeclient.BatchV1().Jobs(namespace).Create(errorJob) + Expect(err).To(HaveOccurred()) + + //create job + job, err = context.kubeclient.BatchV1().Jobs(namespace).Create(job) + Expect(err).NotTo(HaveOccurred()) + + err = waitJobPhaseReady(context, job) + Expect(err).NotTo(HaveOccurred()) + }) }) diff --git a/test/e2e/util.go b/test/e2e/util.go index 6f3fa130f2..a486f8f161 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -27,6 +27,7 @@ import ( . "github.com/onsi/gomega" appv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" "k8s.io/api/core/v1" schedv1 "k8s.io/api/scheduling/v1beta1" "k8s.io/apimachinery/pkg/api/errors" @@ -63,6 +64,7 @@ const ( nodeFieldSelectorKeyNodeName = api.ObjectNameField defaultBusyBoxImage = "busybox:1.24" defaultMPIImage = "volcanosh/example-mpi:0.0.1" + schedulerName = "volcano" defaultNamespace = "test" defaultQueue1 = "q1" @@ -659,6 +661,26 @@ func waitJobPhaseExpect(ctx *context, job *vkv1.Job, state vkv1.JobPhase) error return err } +func waitJobPhaseReady(ctx *context, job *batchv1.Job) error { + var additionalError error + + err := wait.Poll(100*time.Millisecond, oneMinute, func() (bool, error) { + job, err := ctx.kubeclient.BatchV1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + expected := job.Status.Active > 0 + if !expected { + additionalError = fmt.Errorf("expected job '%s' active pod to be greater than 0, actual got %d", job.Name, job.Status.Active) + } + return expected, nil + }) + + if err != nil && strings.Contains(err.Error(), timeOutMessage) { + return fmt.Errorf("[Wait time out]: %s", additionalError) + } + + return err +} + func waitJobUnschedulable(ctx *context, job *vkv1.Job) error { now := time.Now() return jobUnschedulable(ctx, job, now)