From f4c7043a442159bf006b4ef025a7792865f23919 Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Thu, 19 Jan 2023 22:17:42 +0900 Subject: [PATCH 1/3] Support for PodGroup updates Signed-off-by: Yuki Iwai --- pkg/controller.v1/common/scheduling.go | 40 +++++++++++-------- pkg/controller.v1/control/podgroup_control.go | 37 ++++++++++++++--- 2 files changed, 54 insertions(+), 23 deletions(-) diff --git a/pkg/controller.v1/common/scheduling.go b/pkg/controller.v1/common/scheduling.go index a88d7872..038ba73e 100644 --- a/pkg/controller.v1/common/scheduling.go +++ b/pkg/controller.v1/common/scheduling.go @@ -28,6 +28,7 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -35,29 +36,34 @@ type FillPodGroupSpecFunc func(object metav1.Object) error func (jc *JobController) SyncPodGroup(job metav1.Object, specFunc FillPodGroupSpecFunc) (metav1.Object, error) { pgctl := jc.PodGroupControl + // Check whether podGroup exists or not podGroup, err := pgctl.GetPodGroup(job.GetNamespace(), job.GetName()) if err == nil { - return podGroup, nil + // update podGroup for gang scheduling + if err = specFunc(podGroup); err != nil { + return nil, fmt.Errorf("unable to fill the spec of PodGroup, '%v': %v", klog.KObj(podGroup), err) + } + return nil, pgctl.UpdatePodGroup(podGroup.(client.Object)) } else if client.IgnoreNotFound(err) != nil { - return nil, fmt.Errorf("unable to get PodGroup: %v", err) - } - - // create podGroup for gang scheduling - toCreatePodGroup := pgctl.NewEmptyPodGroup() - toCreatePodGroup.SetName(job.GetName()) - toCreatePodGroup.SetNamespace(job.GetNamespace()) - toCreatePodGroup.SetAnnotations(job.GetAnnotations()) - toCreatePodGroup.SetOwnerReferences([]metav1.OwnerReference{*jc.GenOwnerReference(job)}) - if err = specFunc(toCreatePodGroup); err != nil { - return nil, fmt.Errorf("unable to fill the spec of PodGroup: %v", err) - } + return nil, fmt.Errorf("unable to get a PodGroup: %v", err) + } else { + // create podGroup for gang scheduling + newPodGroup := pgctl.NewEmptyPodGroup() + newPodGroup.SetName(job.GetName()) + newPodGroup.SetNamespace(job.GetNamespace()) + newPodGroup.SetAnnotations(job.GetAnnotations()) + newPodGroup.SetOwnerReferences([]metav1.OwnerReference{*jc.GenOwnerReference(job)}) + if err = specFunc(newPodGroup); err != nil { + return nil, fmt.Errorf("unable to fill the spec of PodGroup, '%v': %v", klog.KObj(newPodGroup), err) + } - err = pgctl.CreatePodGroup(toCreatePodGroup) - if err != nil { - return podGroup, fmt.Errorf("unable to create PodGroup: %v", err) + err = pgctl.CreatePodGroup(newPodGroup) + if err != nil { + return podGroup, fmt.Errorf("unable to create PodGroup: %v", err) + } + createdPodGroupsCount.Inc() } - createdPodGroupsCount.Inc() createdPodGroup, err := pgctl.GetPodGroup(job.GetNamespace(), job.GetName()) if err != nil { diff --git a/pkg/controller.v1/control/podgroup_control.go b/pkg/controller.v1/control/podgroup_control.go index 3d779590..5de9188e 100644 --- a/pkg/controller.v1/control/podgroup_control.go +++ b/pkg/controller.v1/control/podgroup_control.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" volcanobatchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1" @@ -33,20 +34,22 @@ import ( // PodGroupControlInterface is an interface that knows how to add or delete PodGroups // created as an interface to allow testing. type PodGroupControlInterface interface { - // NewEmptyPodGroup returns an empty PodGroup + // NewEmptyPodGroup returns an empty PodGroup. NewEmptyPodGroup() client.Object - // GetPodGroup gets the PodGroup identified by namespace and name + // GetPodGroup gets the PodGroup identified by namespace and name. GetPodGroup(namespace string, name string) (metav1.Object, error) // DeletePodGroup deletes the PodGroup identified by namespace and name. DeletePodGroup(namespace string, name string) error + // UpdatePodGroup updates a PodGroup. + UpdatePodGroup(podGroup client.Object) error // CreatePodGroup creates a new PodGroup with PodGroup spec fill function. CreatePodGroup(podGroup client.Object) error // DelayPodCreationDueToPodGroup determines whether it should delay Pod Creation. DelayPodCreationDueToPodGroup(pg metav1.Object) bool // DecoratePodTemplateSpec decorates PodTemplateSpec. - // If the PodTemplateSpec has SchedulerName set, this method will Not override + // If the PodTemplateSpec has SchedulerName set, this method will Not override. DecoratePodTemplateSpec(pts *corev1.PodTemplateSpec, job metav1.Object, rtype string) - // GetSchedulerName returns the name of the gang scheduler + // GetSchedulerName returns the name of the gang scheduler. GetSchedulerName() string } @@ -99,6 +102,15 @@ func (v *VolcanoControl) DeletePodGroup(namespace string, name string) error { return v.Client.SchedulingV1beta1().PodGroups(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) } +func (v *VolcanoControl) UpdatePodGroup(podGroup client.Object) error { + pg := podGroup.(*volcanov1beta1.PodGroup) + _, err := v.Client.SchedulingV1beta1().PodGroups(pg.GetNamespace()).Update(context.TODO(), pg, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("unable to update a PodGroup, '%v': %v", klog.KObj(pg), err) + } + return nil +} + func (v *VolcanoControl) CreatePodGroup(podGroup client.Object) error { pg := podGroup.(*volcanov1beta1.PodGroup) createPodGroup, err := v.Client.SchedulingV1beta1().PodGroups(pg.GetNamespace()).Create(context.TODO(), pg, metav1.CreateOptions{}) @@ -162,9 +174,22 @@ func (s *SchedulerPluginsControl) DeletePodGroup(namespace, name string) error { return s.Client.Delete(ctx, pg) } +func (s *SchedulerPluginsControl) UpdatePodGroup(podGroup client.Object) error { + pg := podGroup.(*schedulerpluginsv1alpha1.PodGroup) + err := s.Client.Update(context.TODO(), pg, &client.UpdateOptions{}) + if err != nil { + return fmt.Errorf("unable to update a PodGroup, '%v': %v", klog.KObj(pg), err) + } + return nil +} + func (s *SchedulerPluginsControl) CreatePodGroup(podGroup client.Object) error { - ctx := context.TODO() - return s.Client.Create(ctx, podGroup) + pg := podGroup.(*schedulerpluginsv1alpha1.PodGroup) + err := s.Client.Create(context.TODO(), pg, &client.CreateOptions{}) + if err != nil { + return fmt.Errorf("unable to create a PodGroup, '%v': %v", klog.KObj(pg), err) + } + return nil } var _ PodGroupControlInterface = &SchedulerPluginsControl{} From 74186bfe813233509454baf7a275084bbc43f17b Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Thu, 19 Jan 2023 22:59:40 +0900 Subject: [PATCH 2/3] Add the logic to check PodGroup changes Signed-off-by: Yuki Iwai --- go.mod | 2 +- pkg/controller.v1/common/scheduling.go | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index d599382b..79850a44 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( github.com/go-logr/logr v1.2.3 + github.com/google/go-cmp v0.5.8 github.com/prometheus/client_golang v1.12.2 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.8.0 @@ -37,7 +38,6 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect - github.com/google/go-cmp v0.5.8 // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/google/uuid v1.3.0 // indirect github.com/imdario/mergo v0.3.12 // indirect diff --git a/pkg/controller.v1/common/scheduling.go b/pkg/controller.v1/common/scheduling.go index 038ba73e..4048d0ae 100644 --- a/pkg/controller.v1/common/scheduling.go +++ b/pkg/controller.v1/common/scheduling.go @@ -23,6 +23,7 @@ import ( apiv1 "github.com/kubeflow/common/pkg/apis/common/v1" + "github.com/google/go-cmp/cmp" log "github.com/sirupsen/logrus" policyapi "k8s.io/api/policy/v1beta1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -41,10 +42,14 @@ func (jc *JobController) SyncPodGroup(job metav1.Object, specFunc FillPodGroupSp podGroup, err := pgctl.GetPodGroup(job.GetNamespace(), job.GetName()) if err == nil { // update podGroup for gang scheduling + oldPodGroup := podGroup if err = specFunc(podGroup); err != nil { return nil, fmt.Errorf("unable to fill the spec of PodGroup, '%v': %v", klog.KObj(podGroup), err) } - return nil, pgctl.UpdatePodGroup(podGroup.(client.Object)) + if diff := cmp.Diff(oldPodGroup, podGroup); len(diff) != 0 { + return podGroup, pgctl.UpdatePodGroup(podGroup.(client.Object)) + } + return podGroup, nil } else if client.IgnoreNotFound(err) != nil { return nil, fmt.Errorf("unable to get a PodGroup: %v", err) } else { From 6895dceb77b52f855df8abce8ef6ad25a7ad69de Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Thu, 19 Jan 2023 23:03:15 +0900 Subject: [PATCH 3/3] make podgroup pointer Signed-off-by: Yuki Iwai --- pkg/controller.v1/common/scheduling.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller.v1/common/scheduling.go b/pkg/controller.v1/common/scheduling.go index 4048d0ae..f96e4802 100644 --- a/pkg/controller.v1/common/scheduling.go +++ b/pkg/controller.v1/common/scheduling.go @@ -42,7 +42,7 @@ func (jc *JobController) SyncPodGroup(job metav1.Object, specFunc FillPodGroupSp podGroup, err := pgctl.GetPodGroup(job.GetNamespace(), job.GetName()) if err == nil { // update podGroup for gang scheduling - oldPodGroup := podGroup + oldPodGroup := &podGroup if err = specFunc(podGroup); err != nil { return nil, fmt.Errorf("unable to fill the spec of PodGroup, '%v': %v", klog.KObj(podGroup), err) }