Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxuzhonghu committed Apr 9, 2020
1 parent f9ae7a2 commit 59a5d24
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions pkg/controller.v1/common/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
log "github.com/sirupsen/logrus"
"k8s.io/api/policy/v1beta1"
policyapi "k8s.io/api/policy/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -116,7 +116,7 @@ func NewJobController(
reconcilerSyncPeriod metav1.Duration,
enableGangScheduling bool,
kubeClientSet kubeclientset.Interface,
kubeBatchClientSet kubebatchclient.Interface,
volcanoClientSet volcanoclient.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
workQueueName string) JobController {

Expand All @@ -135,7 +135,7 @@ func NewJobController(
Controller: controllerImpl,
Config: jobControllerConfig,
KubeClientSet: kubeClientSet,
VolcanoClientSet: kubeBatchClientSet,
VolcanoClientSet: volcanoClientSet,
Expectations: controller.NewControllerExpectations(),
WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
Recorder: recorder,
Expand Down Expand Up @@ -170,9 +170,9 @@ func (jc *JobController) GenLabels(jobName string) map[string]string {

func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas int32) (*v1beta1.PodGroup, error) {

kubeBatchClientInterface := jc.VolcanoClientSet
volcanoClientSet := jc.VolcanoClientSet
// Check whether podGroup exists or not
podGroup, err := kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Get(job.GetName(), metav1.GetOptions{})
podGroup, err := volcanoClientSet.SchedulingV1beta1().PodGroups(job.GetNamespace()).Get(job.GetName(), metav1.GetOptions{})
if err == nil {
return podGroup, nil
}
Expand All @@ -190,11 +190,11 @@ func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas in
MinMember: minAvailable.IntVal,
},
}
return kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Create(createPodGroup)
return volcanoClientSet.SchedulingV1beta1().PodGroups(job.GetNamespace()).Create(createPodGroup)
}

// SyncPdb will create a PDB for gang scheduling by volcano.
func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*v1beta1.PodDisruptionBudget, error) {
func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*policyapi.PodDisruptionBudget, error) {
labelJobName := apiv1.JobNameLabel

// Check the pdb exist or not
Expand All @@ -208,14 +208,14 @@ func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32)

// Create pdb for gang scheduling by volcano
minAvailable := intstr.FromInt(int(minAvailableReplicas))
createPdb := &v1beta1.PodDisruptionBudget{
createPdb := &policyapi.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: job.GetName(),
OwnerReferences: []metav1.OwnerReference{
*jc.GenOwnerReference(job),
},
},
Spec: v1beta1.PodDisruptionBudgetSpec{
Spec: policyapi.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
Expand All @@ -228,18 +228,18 @@ func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32)
}

func (jc *JobController) DeletePodGroup(job metav1.Object) error {
kubeBatchClientInterface := jc.VolcanoClientSet
volcanoClientSet := jc.VolcanoClientSet

//check whether podGroup exists or not
_, err := kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Get(job.GetName(), metav1.GetOptions{})
_, err := volcanoClientSet.SchedulingV1beta1().PodGroups(job.GetNamespace()).Get(job.GetName(), metav1.GetOptions{})
if err != nil && k8serrors.IsNotFound(err) {
return nil
}

log.Infof("Deleting PodGroup %s", job.GetName())

//delete podGroup
err = kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Delete(job.GetName(), &metav1.DeleteOptions{})
err = volcanoClientSet.SchedulingV1beta1().PodGroups(job.GetNamespace()).Delete(job.GetName(), &metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("unable to delete PodGroup: %v", err)
}
Expand Down

0 comments on commit 59a5d24

Please sign in to comment.