Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Add skeleton code for reconcile service #25

Merged
merged 10 commits into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions job_controller/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,12 @@ func (jc *JobController) ReconcileJobs(
return err
}

// TODO(terrytangyuan): Uncomment this once service reconciliation logic is in place
// services, err := jc.GetServicesForJob(metaObject)
//
// if err != nil {
// log.Warnf("GetServicesForJob error %v", err)
// return err
// }
services, err := jc.GetServicesForJob(metaObject)

if err != nil {
log.Warnf("GetServicesForJob error %v", err)
return err
}

// retrieve the previous number of retry
previousRetry := jc.WorkQueue.NumRequeues(jobKey)
Expand Down Expand Up @@ -203,13 +202,12 @@ func (jc *JobController) ReconcileJobs(
return err
}

// TODO(terrytangyuan): Uncomment this once service reconciliation logic is in place
// err = jc.ReconcileServices(metaObject, services, rtype, spec)
//
// if err != nil {
// log.Warnf("ReconcileServices error %v", err)
// return err
// }
err = jc.ReconcileServices(metaObject, services, rtype, spec)

if err != nil {
log.Warnf("ReconcileServices error %v", err)
return err
}
}

// No need to update the job status if the status hasn't changed since last time.
Expand Down
3 changes: 3 additions & 0 deletions job_controller/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ type ControllerInterface interface {
// Returns the default container name in pod
GetDefaultContainerName() string

// Get the default container port number
GetDefaultContainerPortNumber() string

// Returns if this replica type with index specified is a master role.
// MasterRole pod will have "job-role=master" set in its label
IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool
Expand Down
113 changes: 113 additions & 0 deletions job_controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ package job_controller
import (
"fmt"
"strconv"
"strings"

commonv1 "github.com/kubeflow/common/operator/v1"
commonutil "github.com/kubeflow/common/util"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/kubernetes/pkg/controller"
)

Expand Down Expand Up @@ -158,3 +162,112 @@ func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int,
}
return serviceSlices
}

// reconcileServices checks and updates services for each given ReplicaSpec.
// It will requeue the job in case of an error while creating/deleting services.
func (jc *JobController) ReconcileServices(
job metav1.Object,
services []*v1.Service,
rtype commonv1.ReplicaType,
spec *commonv1.ReplicaSpec) error {

// Convert ReplicaType to lower string.
rt := strings.ToLower(string(rtype))

replicas := int(*spec.Replicas)
// Get all services for the type rt.
services, err := jc.FilterServicesForReplicaType(services, rt)
if err != nil {
return err
}

serviceSlices := jc.GetServiceSlices(services, replicas, commonutil.LoggerForReplica(job, rt))

for index, serviceSlice := range serviceSlices {
if len(serviceSlice) > 1 {
commonutil.LoggerForReplica(job, rt).Warningf("We have too many services for %s %d", rt, index)
} else if len(serviceSlice) == 0 {
commonutil.LoggerForReplica(job, rt).Infof("need to create new service: %s-%d", rt, index)
err = jc.CreateNewService(job, rtype, spec, strconv.Itoa(index))
if err != nil {
return err
}
}
}
return nil
}

// GetPortFromJob gets the port of job container.
func (jc *JobController) GetPortFromJob(spec *commonv1.ReplicaSpec) (int32, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetPortFromReplicaSpec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the input is the replicaSpec, then the underline logic is the replicaSpec related container. thus, I prefer the container name for further understand. if you think replicaSpec is better for understanding, I can change it.

containers := spec.Template.Spec.Containers
for _, container := range containers {
if container.Name == jc.Controller.GetDefaultContainerName() {
ports := container.Ports
for _, port := range ports {
if port.Name == jc.Controller.GetDefaultContainerPortNumber(){
return port.ContainerPort, nil
}
}
}
}
return -1, fmt.Errorf("failed to find the port")
}

// createNewService creates a new service for the given index and type.
func (jc *JobController) CreateNewService(job metav1.Object, rtype commonv1.ReplicaType,
spec *commonv1.ReplicaSpec, index string) error {
jobKey, err := KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err))
return err
}

// Convert ReplicaType to lower string.
rt := strings.ToLower(string(rtype))
expectationServicesKey := GenExpectationServicesKey(jobKey, rt)
err = jc.Expectations.ExpectCreations(expectationServicesKey, 1)
if err != nil {
return err
}

// Append ReplicaTypeLabel and ReplicaIndexLabel labels.
labels := jc.GenLabels(job.GetName())
labels[commonutil.ReplicaTypeLabel] = rt
labels[commonutil.ReplicaIndexLabel] = index

port, err := jc.GetPortFromJob(spec)
if err != nil {
return err
}

service := &v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "None",
Selector: labels,
Ports: []v1.ServicePort{
{
Name: jc.Controller.GetDefaultContainerPortNumber(),
Port: port,
},
},
},
}

service.Name = GenGeneralName(job.GetName(), rt, index)
service.Labels = labels

err = jc.Controller.CreateService(job, service)
if err != nil && errors.IsTimeout(err) {
// Service is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the service keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// service when the expectation expires.
return nil
} else if err != nil {
return err
}
return nil
}
4 changes: 4 additions & 0 deletions job_controller/test_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (TestJobController) GetJobRoleKey() string {
return util.LabelJobRole
}

func (TestJobController) GetDefaultContainerPortNumber() string {
return "9999"
}

func (t *TestJobController) GetJobFromInformerCache(namespace, name string) (v1.Object, error) {
return t.job, nil
}
Expand Down