Skip to content

Commit

Permalink
Can not sync job status correctly when upgrading from v1.5 volcano-sh…
Browse files Browse the repository at this point in the history
…#3640

v1.5 changed the naming logic of pod groups by adding the job UID to the
name (volcano-sh#2140), and a later fix handled pod groups that had already
been created without the UID in create-or-update (volcano-sh#2400).
However, a similar fix does not exist in the syncJob function.

Fixes volcano-sh#3640

Signed-off-by: cheerfun <qingyafan@outlook.com>
  • Loading branch information
QingyaFan committed Aug 8, 2024
1 parent ab23490 commit 6c3ef90
Showing 1 changed file with 72 additions and 53 deletions.
125 changes: 72 additions & 53 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,29 @@ import (

var calMutex sync.Mutex

// getPodGroupByJob looks up the PodGroup that belongs to the given vcjob.
// The lister is queried first with the name produced by
// generateRelatedPodGroupName. Only when that lookup reports NotFound is the
// lister queried a second time with the bare job name, which is how legacy
// PodGroups (before version 1.5) were named. Any other error from the first
// lookup, and any error from the legacy lookup (including NotFound), is
// returned together with a nil PodGroup.
func (cc *jobcontroller) getPodGroupByJob(job *batch.Job) (*scheduling.PodGroup, error) {
	current, lookupErr := cc.pgLister.PodGroups(job.Namespace).Get(cc.generateRelatedPodGroupName(job))
	switch {
	case lookupErr == nil:
		return current, nil
	case !apierrors.IsNotFound(lookupErr):
		// Not a simple miss, so do not try the legacy name.
		return nil, lookupErr
	}

	// The current-style name was not found; fall back to the legacy name.
	legacy, legacyErr := cc.pgLister.PodGroups(job.Namespace).Get(job.Name)
	if legacyErr != nil {
		return nil, legacyErr
	}
	return legacy, nil
}

// generateRelatedPodGroupName builds the PodGroup name for a job by joining
// the job's name and its UID with a single hyphen.
func (cc *jobcontroller) generateRelatedPodGroupName(job *batch.Job) string {
	jobName, jobUID := job.Name, string(job.UID)
	return fmt.Sprintf("%s-%s", jobName, jobUID)
}

func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error {
job := jobInfo.Job
klog.V(3).Infof("Killing Job <%s/%s>, current version %d", job.Namespace, job.Name, job.Status.Version)
Expand Down Expand Up @@ -152,12 +175,17 @@ func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.Pha
}

// Delete PodGroup
pgName := job.Name + "-" + string(job.UID)
if err := cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), pgName, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to delete PodGroup of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
pg, err := cc.getPodGroupByJob(job)
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to find PodGroup of Job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
return err
}
if pg != nil {
if err := cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), pg.Name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to delete PodGroup of Job %s/%s: %v", job.Namespace, job.Name, err)
return err
}
}
}

Expand Down Expand Up @@ -281,8 +309,12 @@ func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.Updat
}

var syncTask bool
pgName := job.Name + "-" + string(job.UID)
if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(pgName); pg != nil {
pg, err := cc.getPodGroupByJob(job)
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to find PodGroup of Job: %s/%s, error: %s", job.Namespace, job.Name, err.Error())
return err
}
if pg != nil {
if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending {
syncTask = true
}
Expand Down Expand Up @@ -662,63 +694,50 @@ func (cc *jobcontroller) createPVC(job *batch.Job, vcName string, volumeClaim *v

func (cc *jobcontroller) createOrUpdatePodGroup(job *batch.Job) error {
// If PodGroup does not exist, create one for Job.
pgName := job.Name + "-" + string(job.UID)
var pg *scheduling.PodGroup
var err error
pg, err = cc.pgLister.PodGroups(job.Namespace).Get(pgName)
pg, err := cc.getPodGroupByJob(job)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to get PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
// try to get old pg if new pg not exist
pg, err = cc.pgLister.PodGroups(job.Namespace).Get(job.Name)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("Failed to get PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}

minTaskMember := map[string]int32{}
for _, task := range job.Spec.Tasks {
if task.MinAvailable != nil {
minTaskMember[task.Name] = *task.MinAvailable
} else {
minTaskMember[task.Name] = task.Replicas
}
minTaskMember := map[string]int32{}
for _, task := range job.Spec.Tasks {
if task.MinAvailable != nil {
minTaskMember[task.Name] = *task.MinAvailable
} else {
minTaskMember[task.Name] = task.Replicas
}
}

pg := &scheduling.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
// add job.UID into its name when create new PodGroup
Name: pgName,
Annotations: job.Annotations,
Labels: job.Labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
pg := &scheduling.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
// add job.UID into its name when create new PodGroup
Name: cc.generateRelatedPodGroupName(job),
Annotations: job.Annotations,
Labels: job.Labels,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
Spec: scheduling.PodGroupSpec{
MinMember: job.Spec.MinAvailable,
MinTaskMember: minTaskMember,
Queue: job.Spec.Queue,
MinResources: cc.calcPGMinResources(job),
PriorityClassName: job.Spec.PriorityClassName,
},
}
},
Spec: scheduling.PodGroupSpec{
MinMember: job.Spec.MinAvailable,
MinTaskMember: minTaskMember,
Queue: job.Spec.Queue,
MinResources: cc.calcPGMinResources(job),
PriorityClassName: job.Spec.PriorityClassName,
},
}

if _, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Create(context.TODO(), pg, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.Errorf("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
if _, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Create(context.TODO(), pg, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.Errorf("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
return nil
}
return nil
}

pgShouldUpdate := false
Expand Down

0 comments on commit 6c3ef90

Please sign in to comment.