diff --git a/pkg/admission/admission_controller.go b/pkg/admission/admission_controller.go index 22a370767d..8002345952 100644 --- a/pkg/admission/admission_controller.go +++ b/pkg/admission/admission_controller.go @@ -232,25 +232,30 @@ func getValidActions() []batchv1alpha1.Action { return actions } -// ValidateIO validate IO configuration -func ValidateIO(volumes []batchv1alpha1.VolumeSpec) (string, bool) { +// validateIO validates IO configuration +func validateIO(volumes []batchv1alpha1.VolumeSpec) error { volumeMap := map[string]bool{} for _, volume := range volumes { if len(volume.MountPath) == 0 { - return " mountPath is required;", true + return fmt.Errorf(" mountPath is required;") } if _, found := volumeMap[volume.MountPath]; found { - return fmt.Sprintf(" duplicated mountPath: %s;", volume.MountPath), true + return fmt.Errorf(" duplicated mountPath: %s;", volume.MountPath) + } + if volume.VolumeClaim == nil && volume.VolumeClaimName == "" { + return fmt.Errorf(" either VolumeClaim or VolumeClaimName must be specified;") } if len(volume.VolumeClaimName) != 0 { if volume.VolumeClaim != nil { - return fmt.Sprintf("Confilct: If you want to use an existing PVC, just specify VolumeClaimName. If you want to create a new PVC, you do not need to specify VolumeClaimName."), true + return fmt.Errorf("confilct: If you want to use an existing PVC, just specify VolumeClaimName." + + "If you want to create a new PVC, you do not need to specify VolumeClaimName") } if errMsgs := validation.ValidatePersistentVolumeName(volume.VolumeClaimName, false); len(errMsgs) > 0 { - return fmt.Sprintf("Illegal VolumeClaimName %s : %v", volume.VolumeClaimName, errMsgs), true + return fmt.Errorf("invalid VolumeClaimName %s : %v", volume.VolumeClaimName, errMsgs) } } + volumeMap[volume.MountPath] = true } - return "", false + return nil } diff --git a/pkg/admission/admit_job.go b/pkg/admission/admit_job.go index 1bfc9784ab..7136a42235 100644 --- a/pkg/admission/admit_job.go +++ b/pkg/admission/admit_job.go @@ -147,8 +147,8 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st } } - if validateInfo, ok := ValidateIO(job.Spec.Volumes); ok { - msg = msg + validateInfo + if err := validateIO(job.Spec.Volumes); err != nil { + msg = msg + err.Error() } // Check whether Queue already present or not diff --git a/pkg/admission/admit_job_test.go b/pkg/admission/admit_job_test.go index 88896d8ff3..f1e8243925 100644 --- a/pkg/admission/admit_job_test.go +++ b/pkg/admission/admit_job_test.go @@ -820,6 +820,57 @@ func TestValidateExecution(t *testing.T) { Name: "duplicate-mount-volume", Namespace: namespace, }, + Spec: v1alpha1.JobSpec{ + MinAvailable: 1, + Queue: "default", + Tasks: []v1alpha1.TaskSpec{ + { + Name: "task-1", + Replicas: 1, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"name": "test"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fake-name", + Image: "busybox:1.24", + }, + }, + }, + }, + }, + }, + Policies: []v1alpha1.LifecyclePolicy{ + { + Event: v1alpha1.AnyEvent, + Action: v1alpha1.AbortJobAction, + }, + }, + Volumes: []v1alpha1.VolumeSpec{ + { + MountPath: "/var", + VolumeClaimName: "pvc1", + }, + { + MountPath: "/var", + VolumeClaimName: "pvc2", + }, + }, + }, + }, + reviewResponse: v1beta1.AdmissionResponse{Allowed: true}, + ret: " duplicated mountPath: /var;", + ExpectErr: true, + }, + { + Name: "volume without VolumeClaimName and VolumeClaim", + Job: v1alpha1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "invalid-volume", + Namespace: namespace, + }, Spec: v1alpha1.JobSpec{ MinAvailable: 1, Queue: "default", @@ -859,7 +910,7 @@ func TestValidateExecution(t *testing.T) { }, }, reviewResponse: v1beta1.AdmissionResponse{Allowed: true}, - ret: " duplicated mountPath: /var;", + ret: " either VolumeClaim or VolumeClaimName must be specified;", ExpectErr: true, }, // task Policy with any event and other events diff --git a/pkg/controllers/job/helpers/helpers.go b/pkg/controllers/job/helpers/helpers.go index 86583214e4..207c164c45 100644 --- a/pkg/controllers/job/helpers/helpers.go +++ b/pkg/controllers/job/helpers/helpers.go @@ -30,8 +30,8 @@ import ( const ( // PodNameFmt pod name format PodNameFmt = "%s-%s-%d" - // VolumeClaimFmt volume claim name format - VolumeClaimFmt = "%s-volume-%s" + // persistentVolumeClaimFmt represents persistent volume claim name format + persistentVolumeClaimFmt = "%s-pvc-%s" ) // GetTaskIndex returns task Index @@ -61,9 +61,9 @@ func GenRandomStr(l int) string { return string(result) } -// MakeVolumeClaimName creates volume claim name -func MakeVolumeClaimName(jobName string) string { - return fmt.Sprintf(VolumeClaimFmt, jobName, GenRandomStr(12)) +// GenPVCName generates pvc name with job name +func GenPVCName(jobName string) string { + return fmt.Sprintf(persistentVolumeClaimFmt, jobName, GenRandomStr(12)) } // GetJobKeyByReq gets the key for the job request diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 0b3f7abe87..13e389b506 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -17,7 +17,6 @@ limitations under the License. package job import ( - "errors" "fmt" "sort" "sync" @@ -338,16 +337,15 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt func (cc *Controller) createJobIOIfNotExist(job *batch.Job) (*batch.Job, error) { // If PVC does not exist, create them for Job. var needUpdate bool - volumes := job.Spec.Volumes if job.Status.ControlledResources == nil { job.Status.ControlledResources = make(map[string]string) } - for index, volume := range volumes { + for index, volume := range job.Spec.Volumes { vcName := volume.VolumeClaimName if len(vcName) == 0 { - //NOTE(k82cn): Ensure never have duplicated generated names. + // NOTE(k82cn): Ensure never have duplicated generated names. for { - vcName = jobhelpers.MakeVolumeClaimName(job.Name) + vcName = jobhelpers.GenPVCName(job.Name) exist, err := cc.checkPVCExist(job, vcName) if err != nil { return job, err @@ -363,26 +361,17 @@ func (cc *Controller) createJobIOIfNotExist(job *batch.Job) (*batch.Job, error) if err := cc.createPVC(job, vcName, volume.VolumeClaim); err != nil { return job, err } - job.Status.ControlledResources["volume-pvc-"+vcName] = vcName - } else { - job.Status.ControlledResources["volume-emptyDir-"+vcName] = vcName } } else { - if job.Status.ControlledResources["volume-emptyDir-"+vcName] == vcName || job.Status.ControlledResources["volume-pvc-"+vcName] == vcName { - continue - } exist, err := cc.checkPVCExist(job, vcName) if err != nil { return job, err } - if exist { - job.Status.ControlledResources["volume-pvc-"+vcName] = vcName - } else { - msg := fmt.Sprintf("pvc %s is not found, the job will be in the Pending state until the PVC is created", vcName) - glog.Error(msg) - return job, errors.New(msg) + if !exist { + return job, fmt.Errorf("pvc %s is not found, the job will be in the Pending state until the PVC is created", vcName) } } + job.Status.ControlledResources["volume-pvc-"+vcName] = vcName } if needUpdate { newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(job) @@ -398,13 +387,13 @@ func (cc *Controller) createJobIOIfNotExist(job *batch.Job) (*batch.Job, error) return job, nil } -func (cc *Controller) checkPVCExist(job *batch.Job, vcName string) (bool, error) { - if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(vcName); err != nil { +func (cc *Controller) checkPVCExist(job *batch.Job, pvc string) (bool, error) { + if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(pvc); err != nil { if apierrors.IsNotFound(err) { return false, nil } - glog.V(3).Infof("Failed to get PVC for job <%s/%s>: %v", - job.Namespace, job.Name, err) + glog.V(3).Infof("Failed to get PVC %s for job <%s/%s>: %v", + pvc, job.Namespace, job.Name, err) return false, err } return true, nil diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index df37c349d4..0b5d8deca2 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -63,24 +63,19 @@ func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod vcName := volume.VolumeClaimName name := fmt.Sprintf("%s-%s", job.Name, jobhelpers.GenRandomStr(12)) if _, ok := volumeMap[vcName]; !ok { - if _, ok := job.Status.ControlledResources["volume-emptyDir-"+vcName]; ok { - volume := v1.Volume{ - Name: name, - } - volume.EmptyDir = &v1.EmptyDirVolumeSource{} - pod.Spec.Volumes = append(pod.Spec.Volumes, volume) - } else { - volume := v1.Volume{ - Name: name, - } - volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: vcName, - } - pod.Spec.Volumes = append(pod.Spec.Volumes, volume) + volume := v1.Volume{ + Name: name, + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: vcName, + }, + }, } + pod.Spec.Volumes = append(pod.Spec.Volumes, volume) volumeMap[vcName] = name } else { - name = volumeMap[vcName] + // duplicate volumes, should be prevented + continue } for i, c := range pod.Spec.Containers { diff --git a/pkg/controllers/job/job_controller_util_test.go b/pkg/controllers/job/job_controller_util_test.go index 624e081fff..da5cf11576 100644 --- a/pkg/controllers/job/job_controller_util_test.go +++ b/pkg/controllers/job/job_controller_util_test.go @@ -213,11 +213,6 @@ func TestCreateJobPod(t *testing.T) { }, }, }, - Status: v1alpha1.JobStatus{ - ControlledResources: map[string]string{ - "volume-emptyDir-vc1": "vc1", - }, - }, }, PodTemplate: &v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ @@ -292,11 +287,6 @@ func TestApplyPolicies(t *testing.T) { }, }, }, - Status: v1alpha1.JobStatus{ - ControlledResources: map[string]string{ - "volume-emptyDir-vc1": "vc1", - }, - }, }, Request: &apis.Request{ Action: v1alpha1.EnqueueAction, @@ -332,11 +322,6 @@ func TestApplyPolicies(t *testing.T) { }, }, }, - Status: v1alpha1.JobStatus{ - ControlledResources: map[string]string{ - "volume-emptyDir-vc1": "vc1", - }, - }, }, Request: &apis.Request{ Event: v1alpha1.OutOfSyncEvent, @@ -372,12 +357,6 @@ func TestApplyPolicies(t *testing.T) { }, }, }, - Status: v1alpha1.JobStatus{ - Version: 2, - ControlledResources: map[string]string{ - "volume-emptyDir-vc1": "vc1", - }, - }, }, Request: &apis.Request{ JobVersion: 1, @@ -420,11 +399,6 @@ func TestApplyPolicies(t *testing.T) { }, }, }, - Status: v1alpha1.JobStatus{ - ControlledResources: map[string]string{ - "volume-emptyDir-vc1": "vc1", - }, - }, }, Request: &apis.Request{ TaskName: "task1", @@ -466,11 +440,6 @@ func TestApplyPolicies(t *testing.T) { }, }, }, - Status: v1alpha1.JobStatus{ - ControlledResources: map[string]string{ - "volume-emptyDir-vc1": "vc1", - }, - }, }, Request: &apis.Request{ TaskName: "task1", @@ -507,11 +476,6 @@ func TestApplyPolicies(t *testing.T) { }, }, }, - Status: v1alpha1.JobStatus{ - ControlledResources: map[string]string{ - "volume-emptyDir-vc1": "vc1", - }, - }, }, Request: &apis.Request{ TaskName: "task1", @@ -554,11 +518,6 @@ func TestApplyPolicies(t *testing.T) { }, }, }, - Status: v1alpha1.JobStatus{ - ControlledResources: map[string]string{ - "volume-emptyDir-vc1": "vc1", - }, - }, }, Request: &apis.Request{ Event: v1alpha1.CommandIssuedEvent, @@ -601,11 +560,6 @@ func TestApplyPolicies(t *testing.T) { }, }, }, - Status: v1alpha1.JobStatus{ - ControlledResources: map[string]string{ - "volume-emptyDir-vc1": "vc1", - }, - }, }, Request: &apis.Request{}, ReturnVal: v1alpha1.SyncJobAction, diff --git a/test/e2e/job_controlled_resource.go b/test/e2e/job_controlled_resource.go index 4f173719a3..c19e01755c 100644 --- a/test/e2e/job_controlled_resource.go +++ b/test/e2e/job_controlled_resource.go @@ -26,47 +26,6 @@ import ( ) var _ = Describe("Job E2E Test: Test Job PVCs", func() { - It("Generate PVC name if not specified", func() { - jobName := "job-pvc-name-empty" - namespace := "test" - taskName := "task" - pvcName := "specifiedpvcname" - context := initTestContext() - defer cleanupTestContext(context) - - job := createJob(context, &jobSpec{ - namespace: namespace, - name: jobName, - tasks: []taskSpec{ - { - img: defaultNginxImage, - req: oneCPU, - min: 1, - rep: 1, - name: taskName, - }, - }, - volumes: []v1alpha1.VolumeSpec{ - { - MountPath: "/mounttwo", - }, - }, - }) - - err := waitJobReady(context, job) - Expect(err).NotTo(HaveOccurred()) - - job, err = context.vcclient.BatchV1alpha1().Jobs(namespace).Get(jobName, metav1.GetOptions{}) - Expect(err).NotTo(HaveOccurred()) - - Expect(len(job.Spec.Volumes)).To(Equal(1), - "Two volumes should be created") - for _, volume := range job.Spec.Volumes { - Expect(volume.VolumeClaimName).Should(Or(ContainSubstring(jobName), Equal(pvcName)), - "PVC name should be generated for manually specified.") - } - }) - It("use exisisting PVC in job", func() { jobName := "job-pvc-name-exist" namespace := "test"