Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared implementation of operator code #773

Merged
merged 8 commits into from
Aug 10, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 122 additions & 67 deletions pkg/controller.v2/jobcontroller/jobcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"k8s.io/api/policy/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
kubeinformers "k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
Expand All @@ -26,22 +28,38 @@ import (
// Common Interaface to be implemented by all operators
type ControllerInterface interface {

// AdoptFunc used byControlRefManager to get the latest object if UID matches
AdoptFunc(job metav1.Object) func() (metav1.Object, error)
// Returns the Controller name
GetControllerName() string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I prefer ControllerName(), GetXxx() is Java style :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the same. But I left it because many interfaces in kubernetes haven't followed it.
https://github.com/kubernetes/apimachinery/blob/master/pkg/apis/meta/v1/meta.go#L33

WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ScorpioCPH Added the changes


// Returns total replicas for a job. This is used for gang scheduling
GetTotalReplicas(obj metav1.Object) int32

// Returns the GrouoVersionKinf of the API
// Returns the GroupVersionKind of the API
GetAPIGroupVersionKind() schema.GroupVersionKind

// Returns the GroupVersion of the API
GetAPIGroupVersion() schema.GroupVersion

GetGroupNameLabel() string
// Returns the Group Namei(key) in the labels of the job
GetGroupNameLabelKey() string

// Returns the Job Name(key) in the labels of the job
GetJobNameLabelKey() string

GetJobNameLabel() string
// Returns the Group Name(value) in the labels of the job
GetGroupNameLabelValue() string

GetJobGroupName() string
// Returns the Replica Type(key) in the labels of the job
GetReplicaTypeLabel() string

// Returns the Replica Index(value) in the labels of the job
GetReplicaIndexLabel() string

// Returns total replicas for a job. This is used for gang scheduling
GetTotalReplicas(obj metav1.Object) int32

// Returns the Job from Infomer Cache
GetJobFromInformerCache(namespace, name string) (metav1.Object, error)

// Returns the Job from API server
GetJobFromAPIClient(namespace, name string) (metav1.Object, error)
}

// JobControllerConfiguration contains configuration of operator.
Expand Down Expand Up @@ -115,6 +133,75 @@ type JobController struct {
Recorder record.EventRecorder
}

func NewJobController(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name may conflicts users since Kubernetes has a job controller

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about TrainingJobController or BaseController

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is package scoped, is it a problem? I initially thought of putting just "Controller" as the package name. But I felt that it will be confusing as we already have a "Controller" package for v1alpha1.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new contributors will be confused, maybe. While it is not a big problem, just a nit.

controllerName string,
reconcilerSyncPeriod metav1.Duration,
enableGangScheduling bool,
kubeClientSet kubeclientset.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
workQueueName string) JobController {

log.Debug("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(log.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})

realPodControl := control.RealPodControl{
KubeClient: kubeClientSet,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}),
}

realServiceControl := control.RealServiceControl{
KubeClient: kubeClientSet,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}),
}

jobControllerConfig := JobControllerConfiguration{
ReconcilerSyncLoopPeriod: reconcilerSyncPeriod,
EnableGangScheduling: enableGangScheduling,
}

jc := JobController{
Config: jobControllerConfig,
PodControl: realPodControl,
ServiceControl: realServiceControl,
KubeClientSet: kubeClientSet,
Expectations: controller.NewControllerExpectations(),
WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
Recorder: recorder,
}

// Create pod informer.
podInformer := kubeInformerFactory.Core().V1().Pods()

// Set up an event handler for when pod resources change
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jc.AddPod,
UpdateFunc: jc.UpdatePod,
DeleteFunc: jc.DeletePod,
})

jc.PodLister = podInformer.Lister()
jc.PodInformerSynced = podInformer.Informer().HasSynced

// Create service informer.
serviceInformer := kubeInformerFactory.Core().V1().Services()

// Set up an event handler for when service resources change.
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jc.AddService,
UpdateFunc: jc.UpdateService,
DeleteFunc: jc.DeleteService,
})

jc.ServiceLister = serviceInformer.Lister()
jc.ServiceInformerSynced = serviceInformer.Informer().HasSynced

return jc

}

func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerReference {
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
Expand All @@ -130,71 +217,18 @@ func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerRefer
}

func (jc *JobController) GenLabels(jobName string) map[string]string {
labelGroupName := jc.Controller.GetGroupNameLabel()
labelJobName := jc.Controller.GetJobNameLabel()
groupName := jc.Controller.GetJobGroupName()
labelGroupName := jc.Controller.GetGroupNameLabelKey()
labelJobName := jc.Controller.GetJobNameLabelKey()
groupName := jc.Controller.GetGroupNameLabelValue()
return map[string]string{
labelGroupName: groupName,
labelJobName: strings.Replace(jobName, "/", "-", -1),
}
}

// getPodsForJob returns the set of pods that this job should manage.
// It also reconciles ControllerRef by adopting/orphaning.
// Note that the returned Pods are pointers into the cache.
func (jc *JobController) GetPodsForJob(job metav1.Object) ([]*v1.Pod, error) {
// Create selector.
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: jc.GenLabels(job.GetName()),
})

if err != nil {
return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
}
// List all pods to include those that don't match the selector anymore
// but have a ControllerRef pointing to this controller.
pods, err := jc.PodLister.Pods(job.GetNamespace()).List(labels.Everything())
if err != nil {
return nil, err
}

// If any adoptions are attempted, we should first recheck for deletion
// with an uncached quorum read sometime after listing Pods (see #42639).

canAdoptFunc := RecheckDeletionTimestamp(jc.Controller.AdoptFunc(job))
cm := controller.NewPodControllerRefManager(jc.PodControl, job, selector, jc.Controller.GetAPIGroupVersionKind(), canAdoptFunc)
return cm.ClaimPods(pods)
}

// getServicesForJob returns the set of services that this job should manage.
// It also reconciles ControllerRef by adopting/orphaning.
// Note that the returned services are pointers into the cache.
func (jc *JobController) GetServicesForJob(job metav1.Object) ([]*v1.Service, error) {
// Create selector
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: jc.GenLabels(job.GetName()),
})

if err != nil {
return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
}
// List all services to include those that don't match the selector anymore
// but have a ControllerRef pointing to this controller.
services, err := jc.ServiceLister.Services(job.GetNamespace()).List(labels.Everything())
if err != nil {
return nil, err
}

// If any adoptions are attempted, we should first recheck for deletion
// with an uncached quorum read sometime after listing services (see #42639).
canAdoptFunc := RecheckDeletionTimestamp(jc.Controller.AdoptFunc(job))
cm := control.NewServiceControllerRefManager(jc.ServiceControl, job, selector, jc.Controller.GetAPIGroupVersionKind(), canAdoptFunc)
return cm.ClaimServices(services)
}

// SyncPdb will create a PDB for gang scheduling by kube-arbitrator.
func (jc *JobController) SyncPdb(job metav1.Object) (*v1beta1.PodDisruptionBudget, error) {
labelJobName := jc.Controller.GetJobNameLabel()
labelJobName := jc.Controller.GetJobNameLabelKey()
totalJobReplicas := jc.Controller.GetTotalReplicas(job)
// Non-distributed training is not required gang scheduling
if totalJobReplicas < 2 {
Expand Down Expand Up @@ -247,3 +281,24 @@ func (jc *JobController) DeletePdb(job metav1.Object) error {
}
return nil
}

// resolveControllerRef returns the job referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching job
// of the correct Kind.
func (jc *JobController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) metav1.Object {
// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != jc.Controller.GetAPIGroupVersionKind().Kind {
return nil
}
job, err := jc.Controller.GetJobFromInformerCache(namespace, controllerRef.Name)
if err != nil {
return nil
}
if job.GetUID() != controllerRef.UID {
// The controller we found with this Name is not the same one that the
// ControllerRef points to.
return nil
}
return job
}
Loading