Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Support for PodGroup updates #207

Merged
merged 3 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/go-logr/logr v1.2.3
github.com/google/go-cmp v0.5.8
github.com/prometheus/client_golang v1.12.2
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.8.0
Expand Down Expand Up @@ -37,7 +38,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
Expand Down
43 changes: 27 additions & 16 deletions pkg/controller.v1/common/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,52 @@ import (

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"

"github.com/google/go-cmp/cmp"
log "github.com/sirupsen/logrus"
policyapi "k8s.io/api/policy/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type FillPodGroupSpecFunc func(object metav1.Object) error

func (jc *JobController) SyncPodGroup(job metav1.Object, specFunc FillPodGroupSpecFunc) (metav1.Object, error) {
pgctl := jc.PodGroupControl

// Check whether podGroup exists or not
podGroup, err := pgctl.GetPodGroup(job.GetNamespace(), job.GetName())
if err == nil {
// update podGroup for gang scheduling
oldPodGroup := &podGroup
if err = specFunc(podGroup); err != nil {
return nil, fmt.Errorf("unable to fill the spec of PodGroup, '%v': %v", klog.KObj(podGroup), err)
}
if diff := cmp.Diff(oldPodGroup, podGroup); len(diff) != 0 {
return podGroup, pgctl.UpdatePodGroup(podGroup.(client.Object))
}
return podGroup, nil
} else if client.IgnoreNotFound(err) != nil {
return nil, fmt.Errorf("unable to get PodGroup: %v", err)
}

// create podGroup for gang scheduling
toCreatePodGroup := pgctl.NewEmptyPodGroup()
toCreatePodGroup.SetName(job.GetName())
toCreatePodGroup.SetNamespace(job.GetNamespace())
toCreatePodGroup.SetAnnotations(job.GetAnnotations())
toCreatePodGroup.SetOwnerReferences([]metav1.OwnerReference{*jc.GenOwnerReference(job)})
if err = specFunc(toCreatePodGroup); err != nil {
return nil, fmt.Errorf("unable to fill the spec of PodGroup: %v", err)
}
return nil, fmt.Errorf("unable to get a PodGroup: %v", err)
} else {
// create podGroup for gang scheduling
newPodGroup := pgctl.NewEmptyPodGroup()
newPodGroup.SetName(job.GetName())
newPodGroup.SetNamespace(job.GetNamespace())
newPodGroup.SetAnnotations(job.GetAnnotations())
newPodGroup.SetOwnerReferences([]metav1.OwnerReference{*jc.GenOwnerReference(job)})
if err = specFunc(newPodGroup); err != nil {
return nil, fmt.Errorf("unable to fill the spec of PodGroup, '%v': %v", klog.KObj(newPodGroup), err)
}

err = pgctl.CreatePodGroup(toCreatePodGroup)
if err != nil {
return podGroup, fmt.Errorf("unable to create PodGroup: %v", err)
err = pgctl.CreatePodGroup(newPodGroup)
if err != nil {
return podGroup, fmt.Errorf("unable to create PodGroup: %v", err)
}
createdPodGroupsCount.Inc()
}
createdPodGroupsCount.Inc()

createdPodGroup, err := pgctl.GetPodGroup(job.GetNamespace(), job.GetName())
if err != nil {
Expand Down
37 changes: 31 additions & 6 deletions pkg/controller.v1/control/podgroup_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
volcanobatchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"
Expand All @@ -33,20 +34,22 @@ import (
// PodGroupControlInterface is an interface that knows how to add or delete PodGroups
// created as an interface to allow testing.
type PodGroupControlInterface interface {
// NewEmptyPodGroup returns an empty PodGroup
// NewEmptyPodGroup returns an empty PodGroup.
NewEmptyPodGroup() client.Object
// GetPodGroup gets the PodGroup identified by namespace and name
// GetPodGroup gets the PodGroup identified by namespace and name.
GetPodGroup(namespace string, name string) (metav1.Object, error)
// DeletePodGroup deletes the PodGroup identified by namespace and name.
DeletePodGroup(namespace string, name string) error
// UpdatePodGroup updates a PodGroup.
UpdatePodGroup(podGroup client.Object) error
// CreatePodGroup creates a new PodGroup with PodGroup spec fill function.
CreatePodGroup(podGroup client.Object) error
// DelayPodCreationDueToPodGroup determines whether it should delay Pod Creation.
DelayPodCreationDueToPodGroup(pg metav1.Object) bool
// DecoratePodTemplateSpec decorates PodTemplateSpec.
// If the PodTemplateSpec has SchedulerName set, this method will Not override
// If the PodTemplateSpec has SchedulerName set, this method will Not override.
DecoratePodTemplateSpec(pts *corev1.PodTemplateSpec, job metav1.Object, rtype string)
// GetSchedulerName returns the name of the gang scheduler
// GetSchedulerName returns the name of the gang scheduler.
GetSchedulerName() string
}

Expand Down Expand Up @@ -99,6 +102,15 @@ func (v *VolcanoControl) DeletePodGroup(namespace string, name string) error {
return v.Client.SchedulingV1beta1().PodGroups(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
}

func (v *VolcanoControl) UpdatePodGroup(podGroup client.Object) error {
pg := podGroup.(*volcanov1beta1.PodGroup)
_, err := v.Client.SchedulingV1beta1().PodGroups(pg.GetNamespace()).Update(context.TODO(), pg, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("unable to update a PodGroup, '%v': %v", klog.KObj(pg), err)
}
return nil
}

func (v *VolcanoControl) CreatePodGroup(podGroup client.Object) error {
pg := podGroup.(*volcanov1beta1.PodGroup)
createPodGroup, err := v.Client.SchedulingV1beta1().PodGroups(pg.GetNamespace()).Create(context.TODO(), pg, metav1.CreateOptions{})
Expand Down Expand Up @@ -162,9 +174,22 @@ func (s *SchedulerPluginsControl) DeletePodGroup(namespace, name string) error {
return s.Client.Delete(ctx, pg)
}

func (s *SchedulerPluginsControl) UpdatePodGroup(podGroup client.Object) error {
pg := podGroup.(*schedulerpluginsv1alpha1.PodGroup)
err := s.Client.Update(context.TODO(), pg, &client.UpdateOptions{})
if err != nil {
return fmt.Errorf("unable to update a PodGroup, '%v': %v", klog.KObj(pg), err)
}
return nil
}

func (s *SchedulerPluginsControl) CreatePodGroup(podGroup client.Object) error {
ctx := context.TODO()
return s.Client.Create(ctx, podGroup)
pg := podGroup.(*schedulerpluginsv1alpha1.PodGroup)
err := s.Client.Create(context.TODO(), pg, &client.CreateOptions{})
if err != nil {
return fmt.Errorf("unable to create a PodGroup, '%v': %v", klog.KObj(pg), err)
}
return nil
}

var _ PodGroupControlInterface = &SchedulerPluginsControl{}