diff --git a/pkg/controller.v2/jobcontroller/jobcontroller.go b/pkg/controller.v2/jobcontroller/jobcontroller.go index 75c8659ec0..7455b763b7 100644 --- a/pkg/controller.v2/jobcontroller/jobcontroller.go +++ b/pkg/controller.v2/jobcontroller/jobcontroller.go @@ -10,10 +10,12 @@ import ( "k8s.io/api/policy/v1beta1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" + kubeinformers "k8s.io/client-go/informers" kubeclientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -26,22 +28,38 @@ import ( // Common Interaface to be implemented by all operators type ControllerInterface interface { - // AdoptFunc used byControlRefManager to get the latest object if UID matches - AdoptFunc(job metav1.Object) func() (metav1.Object, error) + // Returns the Controller name + ControllerName() string - // Returns total replicas for a job. 
This is used for gang scheduling - GetTotalReplicas(obj metav1.Object) int32 - - // Returns the GrouoVersionKinf of the API + // Returns the GroupVersionKind of the API GetAPIGroupVersionKind() schema.GroupVersionKind + // Returns the GroupVersion of the API GetAPIGroupVersion() schema.GroupVersion - GetGroupNameLabel() string + // Returns the Group Name(key) in the labels of the job + GetGroupNameLabelKey() string + + // Returns the Job Name(key) in the labels of the job + GetJobNameLabelKey() string + + // Returns the Group Name(value) in the labels of the job + GetGroupNameLabelValue() string + + // Returns the Replica Type(key) in the labels of the job + GetReplicaTypeLabelKey() string - GetJobNameLabel() string + // Returns the Replica Index(key) in the labels of the job + GetReplicaIndexLabelKey() string - GetJobGroupName() string + // Returns total replicas for a job. This is used for gang scheduling + GetTotalReplicas(obj metav1.Object) int32 + + // Returns the Job from Informer Cache + GetJobFromInformerCache(namespace, name string) (metav1.Object, error) + + // Returns the Job from API server + GetJobFromAPIClient(namespace, name string) (metav1.Object, error) } // JobControllerConfiguration contains configuration of operator. 
@@ -115,6 +133,49 @@ type JobController struct { Recorder record.EventRecorder } +func NewJobController( + controllerImpl ControllerInterface, + reconcilerSyncPeriod metav1.Duration, + enableGangScheduling bool, + kubeClientSet kubeclientset.Interface, + kubeInformerFactory kubeinformers.SharedInformerFactory, + workQueueName string) JobController { + + log.Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(log.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerImpl.ControllerName()}) + + realPodControl := control.RealPodControl{ + KubeClient: kubeClientSet, + Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerImpl.ControllerName()}), + } + + realServiceControl := control.RealServiceControl{ + KubeClient: kubeClientSet, + Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerImpl.ControllerName()}), + } + + jobControllerConfig := JobControllerConfiguration{ + ReconcilerSyncLoopPeriod: reconcilerSyncPeriod, + EnableGangScheduling: enableGangScheduling, + } + + jc := JobController{ + Controller: controllerImpl, + Config: jobControllerConfig, + PodControl: realPodControl, + ServiceControl: realServiceControl, + KubeClientSet: kubeClientSet, + Expectations: controller.NewControllerExpectations(), + WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName), + Recorder: recorder, + } + return jc + +} + func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerReference { boolPtr := func(b bool) *bool { return &b } controllerRef := &metav1.OwnerReference{ @@ -130,71 +191,18 @@ func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerRefer } func (jc *JobController) GenLabels(jobName string) 
map[string]string { - labelGroupName := jc.Controller.GetGroupNameLabel() - labelJobName := jc.Controller.GetJobNameLabel() - groupName := jc.Controller.GetJobGroupName() + labelGroupName := jc.Controller.GetGroupNameLabelKey() + labelJobName := jc.Controller.GetJobNameLabelKey() + groupName := jc.Controller.GetGroupNameLabelValue() return map[string]string{ labelGroupName: groupName, labelJobName: strings.Replace(jobName, "/", "-", -1), } } -// getPodsForJob returns the set of pods that this job should manage. -// It also reconciles ControllerRef by adopting/orphaning. -// Note that the returned Pods are pointers into the cache. -func (jc *JobController) GetPodsForJob(job metav1.Object) ([]*v1.Pod, error) { - // Create selector. - selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ - MatchLabels: jc.GenLabels(job.GetName()), - }) - - if err != nil { - return nil, fmt.Errorf("couldn't convert Job selector: %v", err) - } - // List all pods to include those that don't match the selector anymore - // but have a ControllerRef pointing to this controller. - pods, err := jc.PodLister.Pods(job.GetNamespace()).List(labels.Everything()) - if err != nil { - return nil, err - } - - // If any adoptions are attempted, we should first recheck for deletion - // with an uncached quorum read sometime after listing Pods (see #42639). - - canAdoptFunc := RecheckDeletionTimestamp(jc.Controller.AdoptFunc(job)) - cm := controller.NewPodControllerRefManager(jc.PodControl, job, selector, jc.Controller.GetAPIGroupVersionKind(), canAdoptFunc) - return cm.ClaimPods(pods) -} - -// getServicesForJob returns the set of services that this job should manage. -// It also reconciles ControllerRef by adopting/orphaning. -// Note that the returned services are pointers into the cache. 
-func (jc *JobController) GetServicesForJob(job metav1.Object) ([]*v1.Service, error) { - // Create selector - selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ - MatchLabels: jc.GenLabels(job.GetName()), - }) - - if err != nil { - return nil, fmt.Errorf("couldn't convert Job selector: %v", err) - } - // List all services to include those that don't match the selector anymore - // but have a ControllerRef pointing to this controller. - services, err := jc.ServiceLister.Services(job.GetNamespace()).List(labels.Everything()) - if err != nil { - return nil, err - } - - // If any adoptions are attempted, we should first recheck for deletion - // with an uncached quorum read sometime after listing services (see #42639). - canAdoptFunc := RecheckDeletionTimestamp(jc.Controller.AdoptFunc(job)) - cm := control.NewServiceControllerRefManager(jc.ServiceControl, job, selector, jc.Controller.GetAPIGroupVersionKind(), canAdoptFunc) - return cm.ClaimServices(services) -} - // SyncPdb will create a PDB for gang scheduling by kube-arbitrator. func (jc *JobController) SyncPdb(job metav1.Object) (*v1beta1.PodDisruptionBudget, error) { - labelJobName := jc.Controller.GetJobNameLabel() + labelJobName := jc.Controller.GetJobNameLabelKey() totalJobReplicas := jc.Controller.GetTotalReplicas(job) // Non-distributed training is not required gang scheduling if totalJobReplicas < 2 { @@ -247,3 +255,24 @@ func (jc *JobController) DeletePdb(job metav1.Object) error { } return nil } + +// resolveControllerRef returns the job referenced by a ControllerRef, +// or nil if the ControllerRef could not be resolved to a matching job +// of the correct Kind. +func (jc *JobController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) metav1.Object { + // We can't look up by UID, so look up by Name and then verify UID. + // Don't even try to look up by Name if it's the wrong Kind. 
+ if controllerRef.Kind != jc.Controller.GetAPIGroupVersionKind().Kind { + return nil + } + job, err := jc.Controller.GetJobFromInformerCache(namespace, controllerRef.Name) + if err != nil { + return nil + } + if job.GetUID() != controllerRef.UID { + // The controller we found with this Name is not the same one that the + // ControllerRef points to. + return nil + } + return job +} diff --git a/pkg/controller.v2/jobcontroller/pod.go b/pkg/controller.v2/jobcontroller/pod.go new file mode 100644 index 0000000000..e3cd56289d --- /dev/null +++ b/pkg/controller.v2/jobcontroller/pod.go @@ -0,0 +1,192 @@ +package jobcontroller + +import ( + "fmt" + "reflect" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/controller" + + jclogger "github.com/kubeflow/tf-operator/pkg/logger" +) + +// When a pod is created, enqueue the job that manages it and update its expectations. +func (jc *JobController) AddPod(obj interface{}) { + pod := obj.(*v1.Pod) + if pod.DeletionTimestamp != nil { + // on a restart of the controller controller, it's possible a new pod shows up in a state that + // is already pending deletion. Prevent the pod from being a creation observation. + // tc.deletePod(pod) + return + } + + // If it has a ControllerRef, that's all that matters. 
+ if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil { + job := jc.resolveControllerRef(pod.Namespace, controllerRef) + + logger := jclogger.LoggerForPod(pod, jc.Controller.GetAPIGroupVersionKind().Kind) + + if job == nil { + logger.Info("This pod's job does not exist") + return + } + + jobKey, err := controller.KeyFunc(job) + if err != nil { + logger.Infof("Failed to get the jobkey: %v", err) + return + } + + if _, ok := pod.Labels[jc.Controller.GetReplicaTypeLabelKey()]; !ok { + logger.Infof("This pod maybe not created by %v", jc.Controller.ControllerName()) + return + } + + rtype := pod.Labels[jc.Controller.GetReplicaTypeLabelKey()] + expectationPodsKey := GenExpectationPodsKey(jobKey, rtype) + + jc.Expectations.CreationObserved(expectationPodsKey) + // TODO: we may need add backoff here + jc.WorkQueue.Add(jobKey) + + return + } + +} + +// When a pod is updated, figure out what tfjob/s manage it and wake them up. +// If the labels of the pod have changed we need to awaken both the old +// and new replica set. old and cur must be *v1.Pod types. +func (jc *JobController) UpdatePod(old, cur interface{}) { + curPod := cur.(*v1.Pod) + oldPod := old.(*v1.Pod) + if curPod.ResourceVersion == oldPod.ResourceVersion { + // Periodic resync will send update events for all known pods. + // Two different versions of the same pod will always have different RVs. + return + } + + logger := jclogger.LoggerForPod(curPod, jc.Controller.GetAPIGroupVersionKind().Kind) + curControllerRef := metav1.GetControllerOf(curPod) + oldControllerRef := metav1.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && oldControllerRef != nil { + // The ControllerRef was changed. Sync the old controller, if any. 
+ if job := jc.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil { + logger.Infof("pod ControllerRef updated: %v, %v", curPod, oldPod) + jobKey, err := controller.KeyFunc(job) + if err != nil { + return + } + // TODO: we may need add backoff here + jc.WorkQueue.Add(jobKey) + } + } + + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + job := jc.resolveControllerRef(curPod.Namespace, curControllerRef) + if job == nil { + return + } + logger.Infof("pod has a ControllerRef: %v, %v", curPod, oldPod) + jobKey, err := controller.KeyFunc(job) + if err != nil { + return + } + // TODO: we may need add backoff here + jc.WorkQueue.Add(jobKey) + return + } +} + +// When a pod is deleted, enqueue the job that manages the pod and update its expectations. +// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item. +func (jc *JobController) DeletePod(obj interface{}) { + pod, ok := obj.(*v1.Pod) + + // When a delete is dropped, the relist will notice a pod in the store not + // in the list, leading to the insertion of a tombstone object which contains + // the deleted key/value. Note that this value might be stale. If the pod + // changed labels the new job will not be woken up till the periodic resync. + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj)) + return + } + pod, ok = tombstone.Obj.(*v1.Pod) + if !ok { + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj)) + return + } + } + + // Build the logger only now: for a tombstone, pod is nil until it has been unwrapped above. + logger := jclogger.LoggerForPod(pod, jc.Controller.GetAPIGroupVersionKind().Kind) + controllerRef := metav1.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. 
+ return + } + job := jc.resolveControllerRef(pod.Namespace, controllerRef) + if job == nil { + return + } + jobKey, err := controller.KeyFunc(job) + if err != nil { + return + } + + if _, ok := pod.Labels[jc.Controller.GetReplicaTypeLabelKey()]; !ok { + logger.Infof("This pod maybe not created by %v", jc.Controller.ControllerName()) + return + } + + rtype := pod.Labels[jc.Controller.GetReplicaTypeLabelKey()] + expectationPodsKey := GenExpectationPodsKey(jobKey, rtype) + + jc.Expectations.DeletionObserved(expectationPodsKey) + // TODO: we may need add backoff here + jc.WorkQueue.Add(jobKey) +} + +// getPodsForJob returns the set of pods that this job should manage. +// It also reconciles ControllerRef by adopting/orphaning. +// Note that the returned Pods are pointers into the cache. +func (jc *JobController) GetPodsForJob(job metav1.Object) ([]*v1.Pod, error) { + // Create selector. + selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: jc.GenLabels(job.GetName()), + }) + + if err != nil { + return nil, fmt.Errorf("couldn't convert Job selector: %v", err) + } + // List all pods to include those that don't match the selector anymore + // but have a ControllerRef pointing to this controller. + pods, err := jc.PodLister.Pods(job.GetNamespace()).List(labels.Everything()) + if err != nil { + return nil, err + } + + // If any adoptions are attempted, we should first recheck for deletion + // with an uncached quorum read sometime after listing Pods (see #42639). 
+ + canAdoptFunc := RecheckDeletionTimestamp(func() (metav1.Object, error) { + fresh, err := jc.Controller.GetJobFromAPIClient(job.GetNamespace(), job.GetName()) + if err != nil { + return nil, err + } + if fresh.GetUID() != job.GetUID() { + return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", job.GetNamespace(), job.GetName(), fresh.GetUID(), job.GetUID()) + } + return fresh, nil + }) + cm := controller.NewPodControllerRefManager(jc.PodControl, job, selector, jc.Controller.GetAPIGroupVersionKind(), canAdoptFunc) + return cm.ClaimPods(pods) +} diff --git a/pkg/controller.v2/jobcontroller/service.go b/pkg/controller.v2/jobcontroller/service.go new file mode 100644 index 0000000000..f194ce424c --- /dev/null +++ b/pkg/controller.v2/jobcontroller/service.go @@ -0,0 +1,100 @@ +package jobcontroller + +import ( + "fmt" + + log "github.com/sirupsen/logrus" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/kubernetes/pkg/controller" + + "github.com/kubeflow/tf-operator/pkg/control" +) + +// When a service is created, enqueue the controller that manages it and update its expectations. +func (jc *JobController) AddService(obj interface{}) { + service := obj.(*v1.Service) + if service.DeletionTimestamp != nil { + // on a restart of the controller controller, it's possible a new service shows up in a state that + // is already pending deletion. Prevent the service from being a creation observation. + // tc.deleteService(service) + return + } + + // If it has a ControllerRef, that's all that matters. 
+ if controllerRef := metav1.GetControllerOf(service); controllerRef != nil { + job := jc.resolveControllerRef(service.Namespace, controllerRef) + if job == nil { + return + } + + jobKey, err := controller.KeyFunc(job) + if err != nil { + return + } + + if _, ok := service.Labels[jc.Controller.GetReplicaTypeLabelKey()]; !ok { + log.Infof("This service maybe not created by %v", jc.Controller.ControllerName()) + return + } + + rtype := service.Labels[jc.Controller.GetReplicaTypeLabelKey()] + expectationServicesKey := GenExpectationServicesKey(jobKey, rtype) + + jc.Expectations.CreationObserved(expectationServicesKey) + // TODO: we may need add backoff here + jc.WorkQueue.Add(jobKey) + + return + } + +} + +// When a service is updated, figure out what job/s manage it and wake them up. +// If the labels of the service have changed we need to awaken both the old +// and new replica set. old and cur must be *v1.Service types. +func (jc *JobController) UpdateService(old, cur interface{}) { + // TODO(CPH): handle this gracefully. +} + +// When a service is deleted, enqueue the job that manages the service and update its expectations. +// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item. +func (jc *JobController) DeleteService(obj interface{}) { + // TODO(CPH): handle this gracefully. +} + +// getServicesForJob returns the set of services that this job should manage. +// It also reconciles ControllerRef by adopting/orphaning. +// Note that the returned services are pointers into the cache. +func (jc *JobController) GetServicesForJob(job metav1.Object) ([]*v1.Service, error) { + // Create selector + selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ + MatchLabels: jc.GenLabels(job.GetName()), + }) + + if err != nil { + return nil, fmt.Errorf("couldn't convert Job selector: %v", err) + } + // List all services to include those that don't match the selector anymore + // but have a ControllerRef pointing to this controller. 
+ services, err := jc.ServiceLister.Services(job.GetNamespace()).List(labels.Everything()) + if err != nil { + return nil, err + } + + // If any adoptions are attempted, we should first recheck for deletion + // with an uncached quorum read sometime after listing services (see #42639). + canAdoptFunc := RecheckDeletionTimestamp(func() (metav1.Object, error) { + fresh, err := jc.Controller.GetJobFromAPIClient(job.GetNamespace(), job.GetName()) + if err != nil { + return nil, err + } + if fresh.GetUID() != job.GetUID() { + return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", job.GetNamespace(), job.GetName(), fresh.GetUID(), job.GetUID()) + } + return fresh, nil + }) + cm := control.NewServiceControllerRefManager(jc.ServiceControl, job, selector, jc.Controller.GetAPIGroupVersionKind(), canAdoptFunc) + return cm.ClaimServices(services) +} diff --git a/pkg/controller.v2/jobcontroller/jobcontroller_util.go b/pkg/controller.v2/jobcontroller/util.go similarity index 84% rename from pkg/controller.v2/jobcontroller/jobcontroller_util.go rename to pkg/controller.v2/jobcontroller/util.go index ccebdd398a..511660d07c 100644 --- a/pkg/controller.v2/jobcontroller/jobcontroller_util.go +++ b/pkg/controller.v2/jobcontroller/util.go @@ -42,3 +42,11 @@ func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() er return nil } } + +func GenExpectationPodsKey(jobKey, replicaType string) string { + return jobKey + "/" + strings.ToLower(replicaType) + "/pods" +} + +func GenExpectationServicesKey(jobKey, replicaType string) string { + return jobKey + "/" + strings.ToLower(replicaType) + "/services" +} diff --git a/pkg/controller.v2/jobcontroller/jobcontroller_util_test.go b/pkg/controller.v2/jobcontroller/util_test.go similarity index 100% rename from pkg/controller.v2/jobcontroller/jobcontroller_util_test.go rename to pkg/controller.v2/jobcontroller/util_test.go diff --git a/pkg/controller.v2/tfcontroller/informer.go 
b/pkg/controller.v2/tfcontroller/informer.go index 3f9675d59c..41a5fd2af0 100644 --- a/pkg/controller.v2/tfcontroller/informer.go +++ b/pkg/controller.v2/tfcontroller/informer.go @@ -33,7 +33,7 @@ var ( func NewUnstructuredTFJobInformer(restConfig *restclientset.Config) tfjobinformersv1alpha2.TFJobInformer { dynClientPool := dynamic.NewDynamicClientPool(restConfig) - dclient, err := dynClientPool.ClientForGroupVersionKind(controllerKind) + dclient, err := dynClientPool.ClientForGroupVersionKind(tfv1alpha2.SchemeGroupVersionKind) if err != nil { panic(err) } diff --git a/pkg/controller.v2/tfcontroller/controller_pod.go b/pkg/controller.v2/tfcontroller/pod.go similarity index 62% rename from pkg/controller.v2/tfcontroller/controller_pod.go rename to pkg/controller.v2/tfcontroller/pod.go index 0ff720b170..d296d12bc0 100644 --- a/pkg/controller.v2/tfcontroller/controller_pod.go +++ b/pkg/controller.v2/tfcontroller/pod.go @@ -17,7 +17,6 @@ package tfcontroller import ( "fmt" - "reflect" "strconv" "strings" @@ -27,8 +26,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/controller" tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2" "github.com/kubeflow/tf-operator/pkg/controller.v2/jobcontroller" @@ -134,7 +131,7 @@ func (tc *TFJobController) createNewPod(tfjob *tfv1alpha2.TFJob, rt, index strin utilruntime.HandleError(fmt.Errorf("Couldn't get key for tfjob object %#v: %v", tfjob, err)) return err } - expectationPodsKey := genExpectationPodsKey(tfjobKey, rt) + expectationPodsKey := jobcontroller.GenExpectationPodsKey(tfjobKey, rt) err = tc.Expectations.ExpectCreations(expectationPodsKey, 1) if err != nil { return err @@ -243,143 +240,3 @@ func filterPodsForTFReplicaType(pods []*v1.Pod, tfReplicaType string) ([]*v1.Pod } return result, nil } - -func genExpectationPodsKey(tfjobKey, replicaType string) 
string { - return tfjobKey + "/" + strings.ToLower(replicaType) + "/pods" -} - -// When a pod is created, enqueue the tfjob that manages it and update its expectations. -func (tc *TFJobController) addPod(obj interface{}) { - pod := obj.(*v1.Pod) - if pod.DeletionTimestamp != nil { - // on a restart of the controller controller, it's possible a new pod shows up in a state that - // is already pending deletion. Prevent the pod from being a creation observation. - // tc.deletePod(pod) - return - } - - // If it has a ControllerRef, that's all that matters. - if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil { - tfjob := tc.resolveControllerRef(pod.Namespace, controllerRef) - - logger := tflogger.LoggerForPod(pod, tfv1alpha2.Kind) - - if tfjob == nil { - logger.Info("This pod's tfjob does not exists") - return - } - - tfjobKey, err := KeyFunc(tfjob) - if err != nil { - logger.Infof("Failed to get the key of the tfjob: %v", err) - return - } - - if _, ok := pod.Labels[tfReplicaTypeLabel]; !ok { - logger.Info("This pod maybe not created by tf-operator") - return - } - - rtype := pod.Labels[tfReplicaTypeLabel] - expectationPodsKey := genExpectationPodsKey(tfjobKey, rtype) - - tc.Expectations.CreationObserved(expectationPodsKey) - tc.enqueueTFJob(tfjob) - - return - } - - // Otherwise, it's an orphan. Get a list of all matching controllers and sync - // them to see if anyone wants to adopt it. - // DO NOT observe creation because no controller should be waiting for an - // orphan. - // for _, tfjob := range tc.getPodJobs(pod) { - // tc.enqueueTFJob(tfjob) - // } -} - -// When a pod is updated, figure out what tfjob/s manage it and wake them up. -// If the labels of the pod have changed we need to awaken both the old -// and new replica set. old and cur must be *v1.Pod types. 
-func (tc *TFJobController) updatePod(old, cur interface{}) { - curPod := cur.(*v1.Pod) - oldPod := old.(*v1.Pod) - if curPod.ResourceVersion == oldPod.ResourceVersion { - // Periodic resync will send update events for all known pods. - // Two different versions of the same pod will always have different RVs. - return - } - - logger := tflogger.LoggerForPod(curPod, tfv1alpha2.Kind) - curControllerRef := metav1.GetControllerOf(curPod) - oldControllerRef := metav1.GetControllerOf(oldPod) - controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) - if controllerRefChanged && oldControllerRef != nil { - // The ControllerRef was changed. Sync the old controller, if any. - if job := tc.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil { - logger.Infof("pod ControllerRef updated: %v, %v", curPod, oldPod) - tc.enqueueTFJob(job) - } - } - - // If it has a ControllerRef, that's all that matters. - if curControllerRef != nil { - job := tc.resolveControllerRef(curPod.Namespace, curControllerRef) - if job == nil { - return - } - logger.Infof("pod has a ControllerRef: %v, %v", curPod, oldPod) - tc.enqueueTFJob(job) - return - } -} - -// When a pod is deleted, enqueue the tfjob that manages the pod and update its expectations. -// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item. -func (tc *TFJobController) deletePod(obj interface{}) { - pod, ok := obj.(*v1.Pod) - - logger := tflogger.LoggerForPod(pod, tfv1alpha2.Kind) - - // When a delete is dropped, the relist will notice a pod in the store not - // in the list, leading to the insertion of a tombstone object which contains - // the deleted key/value. Note that this value might be stale. If the pod - // changed labels the new job will not be woken up till the periodic resync. 
- if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj)) - return - } - pod, ok = tombstone.Obj.(*v1.Pod) - if !ok { - utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj)) - return - } - } - - controllerRef := metav1.GetControllerOf(pod) - if controllerRef == nil { - // No controller should care about orphans being deleted. - return - } - tfJob := tc.resolveControllerRef(pod.Namespace, controllerRef) - if tfJob == nil { - return - } - tfJobKey, err := controller.KeyFunc(tfJob) - if err != nil { - return - } - - if _, ok := pod.Labels[tfReplicaTypeLabel]; !ok { - logger.Info("This pod maybe not created by tf-operator") - return - } - - rtype := pod.Labels[tfReplicaTypeLabel] - expectationPodsKey := genExpectationPodsKey(tfJobKey, rtype) - - tc.Expectations.DeletionObserved(expectationPodsKey) - tc.enqueueTFJob(tfJob) -} diff --git a/pkg/controller.v2/tfcontroller/controller_pod_test.go b/pkg/controller.v2/tfcontroller/pod_test.go similarity index 99% rename from pkg/controller.v2/tfcontroller/controller_pod_test.go rename to pkg/controller.v2/tfcontroller/pod_test.go index b2f2ad56c5..15d437eddd 100644 --- a/pkg/controller.v2/tfcontroller/controller_pod_test.go +++ b/pkg/controller.v2/tfcontroller/pod_test.go @@ -75,7 +75,7 @@ func TestAddPod(t *testing.T) { t.Errorf("Failed to add tfjob to tfJobIndexer: %v", err) } pod := testutil.NewPod(tfJob, testutil.LabelWorker, 0, t) - ctr.addPod(pod) + ctr.AddPod(pod) syncChan <- "sync" if key != testutil.GetKey(tfJob, t) { diff --git a/pkg/controller.v2/tfcontroller/controller_service.go b/pkg/controller.v2/tfcontroller/service.go similarity index 74% rename from pkg/controller.v2/tfcontroller/controller_service.go rename to pkg/controller.v2/tfcontroller/service.go index e83ac923d5..db92038185 100644 --- a/pkg/controller.v2/tfcontroller/controller_service.go +++ 
b/pkg/controller.v2/tfcontroller/service.go @@ -102,7 +102,7 @@ func (tc *TFJobController) createNewService(tfjob *tfv1alpha2.TFJob, rtype tfv1a // Convert TFReplicaType to lower string. rt := strings.ToLower(string(rtype)) - expectationServicesKey := genExpectationServicesKey(tfjobKey, rt) + expectationServicesKey := jobcontroller.GenExpectationServicesKey(tfjobKey, rt) err = tc.Expectations.ExpectCreations(expectationServicesKey, 1) if err != nil { return err @@ -175,58 +175,3 @@ func filterServicesForTFReplicaType(services []*v1.Service, tfReplicaType string } return result, nil } - -func genExpectationServicesKey(tfjobKey, replicaType string) string { - return tfjobKey + "/" + strings.ToLower(replicaType) + "/services" -} - -// When a service is created, enqueue the controller that manages it and update its expectations. -func (tc *TFJobController) addService(obj interface{}) { - service := obj.(*v1.Service) - if service.DeletionTimestamp != nil { - // on a restart of the controller controller, it's possible a new service shows up in a state that - // is already pending deletion. Prevent the service from being a creation observation. - // tc.deleteService(service) - return - } - - // If it has a ControllerRef, that's all that matters. - if controllerRef := metav1.GetControllerOf(service); controllerRef != nil { - tfjob := tc.resolveControllerRef(service.Namespace, controllerRef) - if tfjob == nil { - return - } - - tfjobKey, err := KeyFunc(tfjob) - if err != nil { - return - } - - if _, ok := service.Labels[tfReplicaTypeLabel]; !ok { - log.Infof("This service maybe not created by tf-operator") - return - } - - rtype := service.Labels[tfReplicaTypeLabel] - expectationServicesKey := genExpectationServicesKey(tfjobKey, rtype) - - tc.Expectations.CreationObserved(expectationServicesKey) - tc.enqueueTFJob(tfjob) - - return - } - -} - -// When a service is updated, figure out what tfjob/s manage it and wake them up. 
-// If the labels of the service have changed we need to awaken both the old -// and new replica set. old and cur must be *v1.Service types. -func (tc *TFJobController) updateService(old, cur interface{}) { - // TODO(CPH): handle this gracefully. -} - -// When a service is deleted, enqueue the tfjob that manages the service and update its expectations. -// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item. -func (tc *TFJobController) deleteService(obj interface{}) { - // TODO(CPH): handle this gracefully. -} diff --git a/pkg/controller.v2/tfcontroller/controller_service_test.go b/pkg/controller.v2/tfcontroller/service_test.go similarity index 99% rename from pkg/controller.v2/tfcontroller/controller_service_test.go rename to pkg/controller.v2/tfcontroller/service_test.go index 612179b808..a56b969e6c 100644 --- a/pkg/controller.v2/tfcontroller/controller_service_test.go +++ b/pkg/controller.v2/tfcontroller/service_test.go @@ -75,7 +75,7 @@ func TestAddService(t *testing.T) { t.Errorf("Failed to add tfjob to tfJobIndexer: %v", err) } service := testutil.NewService(tfJob, testutil.LabelWorker, 0, t) - ctr.addService(service) + ctr.AddService(service) syncChan <- "sync" if key != testutil.GetKey(tfJob, t) { diff --git a/pkg/controller.v2/tfcontroller/controller_status.go b/pkg/controller.v2/tfcontroller/status.go similarity index 100% rename from pkg/controller.v2/tfcontroller/controller_status.go rename to pkg/controller.v2/tfcontroller/status.go diff --git a/pkg/controller.v2/tfcontroller/controller_status_test.go b/pkg/controller.v2/tfcontroller/status_test.go similarity index 100% rename from pkg/controller.v2/tfcontroller/controller_status_test.go rename to pkg/controller.v2/tfcontroller/status_test.go diff --git a/pkg/controller.v2/tfcontroller/controller_tensorflow.go b/pkg/controller.v2/tfcontroller/tensorflow.go similarity index 100% rename from pkg/controller.v2/tfcontroller/controller_tensorflow.go rename to 
pkg/controller.v2/tfcontroller/tensorflow.go diff --git a/pkg/controller.v2/tfcontroller/tfcontroller.go b/pkg/controller.v2/tfcontroller/tfcontroller.go index c96faf54f8..26f162ba8f 100644 --- a/pkg/controller.v2/tfcontroller/tfcontroller.go +++ b/pkg/controller.v2/tfcontroller/tfcontroller.go @@ -27,11 +27,7 @@ import ( kubeinformers "k8s.io/client-go/informers" kubeclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" - "k8s.io/kubernetes/pkg/controller" "github.com/kubeflow/tf-operator/cmd/tf-operator.v2/app/options" tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2" @@ -40,7 +36,6 @@ import ( tfjobinformers "github.com/kubeflow/tf-operator/pkg/client/informers/externalversions" tfjobinformersv1alpha2 "github.com/kubeflow/tf-operator/pkg/client/informers/externalversions/kubeflow/v1alpha2" tfjoblisters "github.com/kubeflow/tf-operator/pkg/client/listers/kubeflow/v1alpha2" - "github.com/kubeflow/tf-operator/pkg/control" "github.com/kubeflow/tf-operator/pkg/controller.v2/jobcontroller" tflogger "github.com/kubeflow/tf-operator/pkg/logger" "k8s.io/apimachinery/pkg/runtime/schema" @@ -57,9 +52,6 @@ const ( ) var ( - // controllerKind is GroupVersionKind for this controller type. - controllerKind = tfv1alpha2.SchemeGroupVersionKind - // KeyFunc is the short name to DeletionHandlingMetaNamespaceKeyFunc. // IndexerInformer uses a delta queue, therefore for deletes we have to use this // key function but it should be just fine for non delete events. 
@@ -114,45 +106,22 @@ func NewTFJobController( tfjobscheme.AddToScheme(scheme.Scheme) - log.Debug("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(log.Infof) - eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")}) - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}) - - realPodControl := control.RealPodControl{ - KubeClient: kubeClientSet, - Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}), - } - - realServiceControl := control.RealServiceControl{ - KubeClient: kubeClientSet, - Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}), - } - + log.Info("Creating TFJob controller") // Create new TFJobController. tc := &TFJobController{ - JobController: jobcontroller.JobController{ - Config: jobcontroller.JobControllerConfiguration{ - ReconcilerSyncLoopPeriod: metav1.Duration{Duration: 15 * time.Second}, - EnableGangScheduling: option.EnableGangScheduling, - }, - PodControl: realPodControl, - ServiceControl: realServiceControl, - KubeClientSet: kubeClientSet, - Expectations: controller.NewControllerExpectations(), - WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), tfv1alpha2.Plural), - Recorder: recorder, - }, tfJobClientSet: tfJobClientSet, } - tc.Controller = tc + + // Create base controller + log.Info("Creating Job controller") + jc := jobcontroller.NewJobController(tc, metav1.Duration{Duration: 15 * time.Second}, + option.EnableGangScheduling, kubeClientSet, kubeInformerFactory, tfv1alpha2.Plural) + tc.JobController = jc // Set sync handler. tc.syncHandler = tc.syncTFJob tc.updateStatusHandler = tc.updateTFJobStatus // set delete handler. tc.deleteTFJobHandler = tc.deleteTFJob - // Set up an event handler for when tfjob resources change. 
tfJobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: tc.addTFJob, @@ -171,9 +140,9 @@ func NewTFJobController( // Set up an event handler for when pod resources change podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: tc.addPod, - UpdateFunc: tc.updatePod, - DeleteFunc: tc.deletePod, + AddFunc: jc.AddPod, + UpdateFunc: jc.UpdatePod, + DeleteFunc: jc.DeletePod, }) tc.PodLister = podInformer.Lister() @@ -184,9 +153,9 @@ func NewTFJobController( // Set up an event handler for when service resources change. serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: tc.addService, - UpdateFunc: tc.updateService, - DeleteFunc: tc.deleteService, + AddFunc: jc.AddService, + UpdateFunc: jc.UpdateService, + DeleteFunc: jc.DeleteService, }) tc.ServiceLister = serviceInformer.Lister() @@ -204,7 +173,7 @@ func (tc *TFJobController) Run(threadiness int, stopCh <-chan struct{}) error { defer tc.WorkQueue.ShutDown() // Start the informer factories to begin populating the informer caches. - log.Info("Starting TFJob controller: version 6") + log.Info("Starting TFJob controller") // Wait for the caches to be synced before starting workers. log.Info("Waiting for informer caches to sync") @@ -442,49 +411,23 @@ func (tc *TFJobController) satisfiedExpectations(tfjob *tfv1alpha2.TFJob) bool { for rtype := range tfjob.Spec.TFReplicaSpecs { // Check the expectations of the pods. - expectationPodsKey := genExpectationPodsKey(tfjobKey, string(rtype)) + expectationPodsKey := jobcontroller.GenExpectationPodsKey(tfjobKey, string(rtype)) satisfied = satisfied || tc.Expectations.SatisfiedExpectations(expectationPodsKey) // Check the expectations of the services. 
- expectationServicesKey := genExpectationServicesKey(tfjobKey, string(rtype)) + expectationServicesKey := jobcontroller.GenExpectationServicesKey(tfjobKey, string(rtype)) satisfied = satisfied || tc.Expectations.SatisfiedExpectations(expectationServicesKey) } return satisfied } -// resolveControllerRef returns the tfjob referenced by a ControllerRef, -// or nil if the ControllerRef could not be resolved to a matching tfjob -// of the correct Kind. -func (tc *TFJobController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *tfv1alpha2.TFJob { - // We can't look up by UID, so look up by Name and then verify UID. - // Don't even try to look up by Name if it's the wrong Kind. - if controllerRef.Kind != controllerKind.Kind { - return nil - } - tfjob, err := tc.getTFJobFromName(namespace, controllerRef.Name) - if err != nil { - return nil - } - if tfjob.UID != controllerRef.UID { - // The controller we found with this Name is not the same one that the - // ControllerRef points to. 
- return nil - } - return tfjob +func (tc *TFJobController) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) { + return tc.getTFJobFromName(namespace, name) } -func (tc *TFJobController) AdoptFunc(job metav1.Object) func() (metav1.Object, error) { - return func() (metav1.Object, error) { - fresh, err := tc.tfJobClientSet.KubeflowV1alpha2().TFJobs(job.GetNamespace()).Get(job.GetName(), metav1.GetOptions{}) - if err != nil { - return nil, err - } - if fresh.UID != job.GetUID() { - return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", job.GetNamespace(), job.GetName(), fresh.UID, job.GetUID()) - } - return fresh, nil - } +func (tc *TFJobController) GetJobFromAPIClient(namespace, name string) (metav1.Object, error) { + return tc.tfJobClientSet.KubeflowV1alpha2().TFJobs(namespace).Get(name, metav1.GetOptions{}) } func (tc *TFJobController) GetAPIGroupVersionKind() schema.GroupVersionKind { @@ -495,14 +438,26 @@ func (tc *TFJobController) GetAPIGroupVersion() schema.GroupVersion { return tfv1alpha2.SchemeGroupVersion } -func (tc *TFJobController) GetGroupNameLabel() string { +func (tc *TFJobController) GetGroupNameLabelKey() string { return labelGroupName } -func (tc *TFJobController) GetJobNameLabel() string { +func (tc *TFJobController) GetJobNameLabelKey() string { return labelTFJobName } -func (tc *TFJobController) GetJobGroupName() string { +func (tc *TFJobController) GetGroupNameLabelValue() string { return tfv1alpha2.GroupName } + +func (tc *TFJobController) GetReplicaTypeLabelKey() string { + return tfReplicaTypeLabel +} + +func (tc *TFJobController) GetReplicaIndexLabelKey() string { + return tfReplicaIndexLabel +} + +func (tc *TFJobController) ControllerName() string { + return controllerName +} diff --git a/pkg/controller.v2/tfcontroller/controller_tfjob.go b/pkg/controller.v2/tfcontroller/tfjob.go similarity index 100% rename from pkg/controller.v2/tfcontroller/controller_tfjob.go rename to 
pkg/controller.v2/tfcontroller/tfjob.go diff --git a/pkg/controller.v2/tfcontroller/controller_tfjob_test.go b/pkg/controller.v2/tfcontroller/tfjob_test.go similarity index 100% rename from pkg/controller.v2/tfcontroller/controller_tfjob_test.go rename to pkg/controller.v2/tfcontroller/tfjob_test.go diff --git a/pkg/controller.v2/tfcontroller/controller_util.go b/pkg/controller.v2/tfcontroller/util.go similarity index 100% rename from pkg/controller.v2/tfcontroller/controller_util.go rename to pkg/controller.v2/tfcontroller/util.go diff --git a/pkg/controller.v2/tfcontroller/controller_util_test.go b/pkg/controller.v2/tfcontroller/util_test.go similarity index 100% rename from pkg/controller.v2/tfcontroller/controller_util_test.go rename to pkg/controller.v2/tfcontroller/util_test.go