From e9e10f334c8c2e08beb5991430899af736ef685a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B4=81=E6=87=BF?= Date: Wed, 24 Apr 2019 14:57:39 +0800 Subject: [PATCH 1/7] reconcile service --- job_controller/job_controller.go | 6 ++ job_controller/service.go | 115 ++++++++++++++++++++++++++ job_controller/status.go | 2 + job_controller/test_job_controller.go | 8 ++ 4 files changed, 131 insertions(+) diff --git a/job_controller/job_controller.go b/job_controller/job_controller.go index aa6d1e6d..9a24ccaf 100644 --- a/job_controller/job_controller.go +++ b/job_controller/job_controller.go @@ -89,6 +89,12 @@ type ControllerInterface interface { // SetClusterSpec sets the cluster spec for the pod SetClusterSpec(job interface{}, podTemplate *v1.PodTemplate, rtype, index string) error + + // Get the default container name + GetDefaultContainerName() string + + // Get the deafult container port number + GetDefaultContainerPortName() string } // JobControllerConfiguration contains configuration of operator. diff --git a/job_controller/service.go b/job_controller/service.go index 9fa4f8f6..0192348e 100644 --- a/job_controller/service.go +++ b/job_controller/service.go @@ -16,11 +16,16 @@ package job_controller import ( "fmt" "strconv" + "strings" + commonv1 "github.com/kubeflow/common/operator/v1" + "github.com/kubeflow/common/util" log "github.com/sirupsen/logrus" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/kubernetes/pkg/controller" ) @@ -157,3 +162,113 @@ func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, } return serviceSlices } + +// reconcileServices checks and updates services for each given ReplicaSpec. +// It will requeue the job in case of an error while creating/deleting services. +func (jc *JobController) reconcileServices( + job metav1.Object, + services []*v1.Service, + rtype commonv1.ReplicaType, + spec *commonv1.ReplicaSpec) error { + + // Convert ReplicaType to lower string. + rt := strings.ToLower(string(rtype)) + + replicas := int(*spec.Replicas) + // Get all services for the type rt. + services, err := jc.FilterServicesForReplicaType(services, rt) + if err != nil { + return err + } + + serviceSlices := jc.GetServiceSlices(services, replicas, util.LoggerForReplica(job, rt)) + + for index, serviceSlice := range serviceSlices { + if len(serviceSlice) > 1 { + util.LoggerForReplica(job, rt).Warningf("We have too many services for %s %d", rt, index) + // TODO(gaocegege): Kill some services. + } else if len(serviceSlice) == 0 { + util.LoggerForReplica(job, rt).Infof("need to create new service: %s-%d", rt, index) + err = jc.createNewService(job, rtype, spec, strconv.Itoa(index)) + if err != nil { + return err + } + } + } + return nil +} + +// GetPortFromJob gets the port of tensorflow container. +func (jc *JobController) GetPortFromJob(spec *commonv1.ReplicaSpec) (int32, error) { + containers := spec.Template.Spec.Containers + for _, container := range containers { + if container.Name == jc.Controller.GetDefaultContainerName() { + ports := container.Ports + for _, port := range ports { + if port.Name == jc.Controller.GetDefaultContainerPortName(){ + return port.ContainerPort, nil + } + } + } + } + return -1, fmt.Errorf("failed to found the port") +} + +// createNewService creates a new service for the given index and type. +func (jc *JobController) createNewService(job metav1.Object, rtype commonv1.ReplicaType, + spec *commonv1.ReplicaSpec, index string) error { + tfjobKey, err := KeyFunc(job) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err)) + return err + } + + // Convert ReplicaType to lower string. + rt := strings.ToLower(string(rtype)) + expectationServicesKey := GenExpectationServicesKey(tfjobKey, rt) + err = jc.Expectations.ExpectCreations(expectationServicesKey, 1) + if err != nil { + return err + } + + // Append ReplicaTypeLabel and ReplicaIndexLabel labels. + labels := jc.GenLabels(job.GetName()) + labels[jc.Controller.GetReplicaTypeLabelKey()] = rt + labels[jc.Controller.GetReplicaIndexLabelKey()] = index + + port, err := jc.GetPortFromJob(spec) + if err != nil { + return err + } + + service := &v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: "None", + Selector: labels, + Ports: []v1.ServicePort{ + { + Name: jc.Controller.GetDefaultContainerPortName(), + Port: port, + }, + }, + }, + } + + service.Name = GenGeneralName(job.GetName(), rt, index) + service.Labels = labels + + err = jc.Controller.CreateService(job, service) + if err != nil && errors.IsTimeout(err) { + // Service is created but its initialization has timed out. + // If the initialization is successful eventually, the + // controller will observe the creation via the informer. + // If the initialization fails, or if the service keeps + // uninitialized for a long time, the informer will not + // receive any update, and the controller will create a new + // service when the expectation expires. + return nil + } else if err != nil { + return err + } + return nil +} diff --git a/job_controller/status.go b/job_controller/status.go index 84b8e64e..dccfbfcf 100644 --- a/job_controller/status.go +++ b/job_controller/status.go @@ -24,6 +24,8 @@ const ( LabelGroupName = "group-name" LabelJobName = "job-name" LabelJobRole = "job-role" + DefaultContainerName = "job-container" + DefaultContainerPortName = "9999" ) func isSucceeded(status common.JobStatus) bool { diff --git a/job_controller/test_job_controller.go b/job_controller/test_job_controller.go index b87f9170..62ab5c8c 100644 --- a/job_controller/test_job_controller.go +++ b/job_controller/test_job_controller.go @@ -51,6 +51,14 @@ func (TestJobController) GetJobRoleKey() string { return LabelJobRole } +func (TestJobController) GetDefaultContainerName() string { + return DefaultContainerName +} + +func (TestJobController) GetDefaultContainerPortName() string { + return DefaultContainerPortName +} + func (t *TestJobController) GetJobFromInformerCache(namespace, name string) (v1.Object, error) { return t.job, nil } From f58e0df1835a595546d8548a15fd65f7c5d74bd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B4=81=E6=87=BF?= Date: Wed, 24 Apr 2019 15:37:52 +0800 Subject: [PATCH 2/7] remove the duplicate code --- job_controller/job_controller.go | 3 --- job_controller/status.go | 1 - job_controller/test_job_controller.go | 4 ---- 3 files changed, 8 deletions(-) diff --git a/job_controller/job_controller.go b/job_controller/job_controller.go index e575597d..b75a0f36 100644 --- a/job_controller/job_controller.go +++ b/job_controller/job_controller.go @@ -87,9 +87,6 @@ type ControllerInterface interface { // DeletePod deletes the pod DeletePod(job interface{}, pod *v1.Pod) error - // Get the default container name - GetDefaultContainerName() string - // Get the deafult container port number GetDefaultContainerPortName() string diff --git a/job_controller/status.go b/job_controller/status.go index dccfbfcf..d896fcac 100644 --- a/job_controller/status.go +++ b/job_controller/status.go @@ -24,7 +24,6 @@ const ( LabelGroupName = "group-name" LabelJobName = "job-name" LabelJobRole = "job-role" - DefaultContainerName = "job-container" DefaultContainerPortName = "9999" ) diff --git a/job_controller/test_job_controller.go b/job_controller/test_job_controller.go index 436c6dca..e1c3bf02 100644 --- a/job_controller/test_job_controller.go +++ b/job_controller/test_job_controller.go @@ -51,10 +51,6 @@ func (TestJobController) GetJobRoleKey() string { return LabelJobRole } -func (TestJobController) GetDefaultContainerName() string { - return DefaultContainerName -} - func (TestJobController) GetDefaultContainerPortName() string { return DefaultContainerPortName } From 6115438a404a336bca8f3093b80c422ef9e65b9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B4=81=E6=87=BF?= Date: Wed, 24 Apr 2019 15:57:01 +0800 Subject: [PATCH 3/7] update with comments --- job_controller/job_controller.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/job_controller/job_controller.go b/job_controller/job_controller.go index b75a0f36..67c69a5b 100644 --- a/job_controller/job_controller.go +++ b/job_controller/job_controller.go @@ -87,15 +87,15 @@ type ControllerInterface interface { // DeletePod deletes the pod DeletePod(job interface{}, pod *v1.Pod) error - // Get the deafult container port number - GetDefaultContainerPortName() string - - // SetClusterSpec sets the cluster spec for the pod + // SetClusterSpec sets the cluster spec for the pod SetClusterSpec(job interface{}, podTemplate *v1.PodTemplateSpec, rtype, index string) error // Returns the default container name in pod GetDefaultContainerName() string + // Get the deafult container port number + GetDefaultContainerPortName() string + // Returns if this replica type with index specified is a master role. // MasterRole pod will have "job-role=master" set in its label IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool From 1a3caa95c5f871c7a355665e98fe821aecbbc332 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B4=81=E6=87=BF?= Date: Thu, 25 Apr 2019 13:26:12 +0800 Subject: [PATCH 4/7] update with terry's comments --- job_controller/job_controller.go | 2 +- job_controller/service.go | 16 ++++++++-------- job_controller/status.go | 2 +- job_controller/test_job_controller.go | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/job_controller/job_controller.go b/job_controller/job_controller.go index 67c69a5b..578b711e 100644 --- a/job_controller/job_controller.go +++ b/job_controller/job_controller.go @@ -94,7 +94,7 @@ type ControllerInterface interface { GetDefaultContainerName() string // Get the deafult container port number - GetDefaultContainerPortName() string + GetDefaultContainerPortNumber() string // Returns if this replica type with index specified is a master role. // MasterRole pod will have "job-role=master" set in its label diff --git a/job_controller/service.go b/job_controller/service.go index 0192348e..71728f09 100644 --- a/job_controller/service.go +++ b/job_controller/service.go @@ -165,7 +165,7 @@ func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, // reconcileServices checks and updates services for each given ReplicaSpec. // It will requeue the job in case of an error while creating/deleting services. -func (jc *JobController) reconcileServices( +func (jc *JobController) ReconcileServices( job metav1.Object, services []*v1.Service, rtype commonv1.ReplicaType, @@ -189,7 +189,7 @@ func (jc *JobController) reconcileServices( // TODO(gaocegege): Kill some services. } else if len(serviceSlice) == 0 { util.LoggerForReplica(job, rt).Infof("need to create new service: %s-%d", rt, index) - err = jc.createNewService(job, rtype, spec, strconv.Itoa(index)) + err = jc.CreateNewService(job, rtype, spec, strconv.Itoa(index)) if err != nil { return err } @@ -198,14 +198,14 @@ func (jc *JobController) reconcileServices( return nil } -// GetPortFromJob gets the port of tensorflow container. +// GetPortFromJob gets the port of job container. func (jc *JobController) GetPortFromJob(spec *commonv1.ReplicaSpec) (int32, error) { containers := spec.Template.Spec.Containers for _, container := range containers { if container.Name == jc.Controller.GetDefaultContainerName() { ports := container.Ports for _, port := range ports { - if port.Name == jc.Controller.GetDefaultContainerPortName(){ + if port.Name == jc.Controller.GetDefaultContainerPortNumber(){ return port.ContainerPort, nil } } @@ -215,9 +215,9 @@ func (jc *JobController) GetPortFromJob(spec *commonv1.ReplicaSpec) (int32, erro } // createNewService creates a new service for the given index and type. -func (jc *JobController) createNewService(job metav1.Object, rtype commonv1.ReplicaType, +func (jc *JobController) CreateNewService(job metav1.Object, rtype commonv1.ReplicaType, spec *commonv1.ReplicaSpec, index string) error { - tfjobKey, err := KeyFunc(job) + jobKey, err := KeyFunc(job) if err != nil { utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err)) return err @@ -225,7 +225,7 @@ func (jc *JobController) createNewService(job metav1.Object, rtype commonv1.Repl // Convert ReplicaType to lower string. rt := strings.ToLower(string(rtype)) - expectationServicesKey := GenExpectationServicesKey(tfjobKey, rt) + expectationServicesKey := GenExpectationServicesKey(jobKey, rt) err = jc.Expectations.ExpectCreations(expectationServicesKey, 1) if err != nil { return err @@ -247,7 +247,7 @@ func (jc *JobController) createNewService(job metav1.Object, rtype commonv1.Repl Selector: labels, Ports: []v1.ServicePort{ { - Name: jc.Controller.GetDefaultContainerPortName(), + Name: jc.Controller.GetDefaultContainerPortNumber(), Port: port, }, }, diff --git a/job_controller/status.go b/job_controller/status.go index d896fcac..f122e698 100644 --- a/job_controller/status.go +++ b/job_controller/status.go @@ -24,7 +24,7 @@ const ( LabelGroupName = "group-name" LabelJobName = "job-name" LabelJobRole = "job-role" - DefaultContainerPortName = "9999" + DefaultContainerPortNumber = "9999" ) func isSucceeded(status common.JobStatus) bool { diff --git a/job_controller/test_job_controller.go b/job_controller/test_job_controller.go index e1c3bf02..671276af 100644 --- a/job_controller/test_job_controller.go +++ b/job_controller/test_job_controller.go @@ -51,8 +51,8 @@ func (TestJobController) GetJobRoleKey() string { return LabelJobRole } -func (TestJobController) GetDefaultContainerPortName() string { - return DefaultContainerPortName +func (TestJobController) GetDefaultContainerPortNumber() string { + return DefaultContainerPortNumber } func (t *TestJobController) GetJobFromInformerCache(namespace, name string) (v1.Object, error) { From 15d97d55ca9d488f23b94b988b0699330d40fcc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B4=81=E6=87=BF?= Date: Sat, 27 Apr 2019 09:58:43 +0800 Subject: [PATCH 5/7] update with Liu's comment --- job_controller/job_controller.go | 2 +- job_controller/service.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/job_controller/job_controller.go b/job_controller/job_controller.go index 578b711e..21825388 100644 --- a/job_controller/job_controller.go +++ b/job_controller/job_controller.go @@ -93,7 +93,7 @@ type ControllerInterface interface { // Returns the default container name in pod GetDefaultContainerName() string - // Get the deafult container port number + // Get the default container port number GetDefaultContainerPortNumber() string // Returns if this replica type with index specified is a master role. diff --git a/job_controller/service.go b/job_controller/service.go index 71728f09..70cd0cc8 100644 --- a/job_controller/service.go +++ b/job_controller/service.go @@ -186,7 +186,6 @@ func (jc *JobController) ReconcileServices( for index, serviceSlice := range serviceSlices { if len(serviceSlice) > 1 { util.LoggerForReplica(job, rt).Warningf("We have too many services for %s %d", rt, index) - // TODO(gaocegege): Kill some services. } else if len(serviceSlice) == 0 { util.LoggerForReplica(job, rt).Infof("need to create new service: %s-%d", rt, index) err = jc.CreateNewService(job, rtype, spec, strconv.Itoa(index)) @@ -211,7 +210,7 @@ func (jc *JobController) GetPortFromJob(spec *commonv1.ReplicaSpec) (int32, erro } } } - return -1, fmt.Errorf("failed to found the port") + return -1, fmt.Errorf("failed to find the port") } // createNewService creates a new service for the given index and type. From 62b6c04c74210ce45a43196352dfb8bcb6b80231 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B4=81=E6=87=BF?= Date: Mon, 6 May 2019 13:29:44 -0700 Subject: [PATCH 6/7] update with jian's comments --- job_controller/job.go | 25 ++++++++++++------------- job_controller/job_controller.go | 6 ------ job_controller/service.go | 4 ++-- job_controller/test_job_controller.go | 8 -------- 4 files changed, 14 insertions(+), 29 deletions(-) diff --git a/job_controller/job.go b/job_controller/job.go index 58e33b6e..eb8e761a 100644 --- a/job_controller/job.go +++ b/job_controller/job.go @@ -100,13 +100,12 @@ func (jc *JobController) ReconcileJobs( return err } - // TODO(terrytangyuan): Uncomment this once service reconciliation logic is in place - // services, err := jc.GetServicesForJob(metaObject) - // - // if err != nil { - // log.Warnf("GetServicesForJob error %v", err) - // return err - // } + services, err := jc.GetServicesForJob(metaObject) + + if err != nil { + log.Warnf("GetServicesForJob error %v", err) + return err + } // retrieve the previous number of retry previousRetry := jc.WorkQueue.NumRequeues(jobKey) @@ -204,12 +203,12 @@ func (jc *JobController) ReconcileJobs( } // TODO(terrytangyuan): Uncomment this once service reconciliation logic is in place - // err = jc.ReconcileServices(metaObject, services, rtype, spec) - // - // if err != nil { - // log.Warnf("ReconcileServices error %v", err) - // return err - // } + err = jc.ReconcileServices(metaObject, services, rtype, spec) + + if err != nil { + log.Warnf("ReconcileServices error %v", err) + return err + } } // No need to update the job status if the status hasn't changed since last time. diff --git a/job_controller/job_controller.go b/job_controller/job_controller.go index ec80623b..f63eeee3 100644 --- a/job_controller/job_controller.go +++ b/job_controller/job_controller.go @@ -49,12 +49,6 @@ type ControllerInterface interface { // Returns the Group Name(value) in the labels of the job GetGroupNameLabelValue() string - // Returns the Replica Type(key) in the labels of the job - GetReplicaTypeLabelKey() string - - // Returns the Replica Index(value) in the labels of the job - GetReplicaIndexLabelKey() string - // Returns the Job from Informer Cache GetJobFromInformerCache(namespace, name string) (metav1.Object, error) diff --git a/job_controller/service.go b/job_controller/service.go index b2fab240..01d009f8 100644 --- a/job_controller/service.go +++ b/job_controller/service.go @@ -232,8 +232,8 @@ func (jc *JobController) CreateNewService(job metav1.Object, rtype commonv1.Repl // Append ReplicaTypeLabel and ReplicaIndexLabel labels. labels := jc.GenLabels(job.GetName()) - labels[jc.Controller.GetReplicaTypeLabelKey()] = rt - labels[jc.Controller.GetReplicaIndexLabelKey()] = index + labels[commonutil.ReplicaTypeLabel] = rt + labels[commonutil.ReplicaIndexLabel] = index port, err := jc.GetPortFromJob(spec) if err != nil { diff --git a/job_controller/test_job_controller.go b/job_controller/test_job_controller.go index e1d0313f..6c3af21f 100644 --- a/job_controller/test_job_controller.go +++ b/job_controller/test_job_controller.go @@ -32,14 +32,6 @@ func (TestJobController) GetGroupNameLabelValue() string { return testv1.GroupName } -func (TestJobController) GetReplicaTypeLabelKey() string { - return util.ReplicaTypeLabel -} - -func (TestJobController) GetReplicaIndexLabelKey() string { - return util.ReplicaIndexLabel -} - func (TestJobController) GetJobRoleKey() string { return util.LabelJobRole } From 5cc88a3a28f8acde87c0721c06790848d2b0765e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B4=81=E6=87=BF?= Date: Mon, 6 May 2019 14:12:29 -0700 Subject: [PATCH 7/7] remove the todo comment --- job_controller/job.go | 1 - 1 file changed, 1 deletion(-) diff --git a/job_controller/job.go b/job_controller/job.go index eb8e761a..a0f4ef3b 100644 --- a/job_controller/job.go +++ b/job_controller/job.go @@ -202,7 +202,6 @@ func (jc *JobController) ReconcileJobs( return err } - // TODO(terrytangyuan): Uncomment this once service reconciliation logic is in place err = jc.ReconcileServices(metaObject, services, rtype, spec) if err != nil {