Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Add skeleton code for reconcile service #25

Merged
merged 10 commits into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions job_controller/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type ControllerInterface interface {
// Returns the Group Name(value) in the labels of the job
GetGroupNameLabelValue() string

// Returns the Replica Type(key) in the labels of the job
GetReplicaTypeLabelKey() string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this because of merge conflict? #29 removes these labels

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. @merlintang Could you rebase?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is fixed. thanks for your suggestion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These interfaces are supposed to be removed and use this instead


// Returns the Replica Index(value) in the labels of the job
GetReplicaIndexLabelKey() string

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same for this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two also need to be fixed.

// TODO(terrytangyuan): Uncomment this once service reconciliation logic is in place

// TODO(terrytangyuan): Uncomment this once service reconciliation logic is in place

// Returns the Job from Informer Cache
GetJobFromInformerCache(namespace, name string) (metav1.Object, error)

Expand Down Expand Up @@ -81,6 +87,9 @@ type ControllerInterface interface {
// Returns the default container name in pod
GetDefaultContainerName() string

// Get the default container port number
GetDefaultContainerPortNumber() string

// Returns if this replica type with index specified is a master role.
// MasterRole pod will have "job-role=master" set in its label
IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool
Expand Down
114 changes: 114 additions & 0 deletions job_controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ package job_controller
import (
"fmt"
"strconv"
"strings"

commonv1 "github.com/kubeflow/common/operator/v1"
"github.com/kubeflow/common/util"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/kubernetes/pkg/controller"
)

Expand Down Expand Up @@ -157,3 +162,112 @@ func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int,
}
return serviceSlices
}

// reconcileServices checks and updates services for each given ReplicaSpec.
// It will requeue the job in case of an error while creating/deleting services.
func (jc *JobController) ReconcileServices(
job metav1.Object,
services []*v1.Service,
rtype commonv1.ReplicaType,
spec *commonv1.ReplicaSpec) error {

// Convert ReplicaType to lower string.
rt := strings.ToLower(string(rtype))

replicas := int(*spec.Replicas)
// Get all services for the type rt.
services, err := jc.FilterServicesForReplicaType(services, rt)
if err != nil {
return err
}

serviceSlices := jc.GetServiceSlices(services, replicas, util.LoggerForReplica(job, rt))

for index, serviceSlice := range serviceSlices {
if len(serviceSlice) > 1 {
util.LoggerForReplica(job, rt).Warningf("We have too many services for %s %d", rt, index)
} else if len(serviceSlice) == 0 {
util.LoggerForReplica(job, rt).Infof("need to create new service: %s-%d", rt, index)
err = jc.CreateNewService(job, rtype, spec, strconv.Itoa(index))
if err != nil {
return err
}
}
}
return nil
}

// GetPortFromJob gets the port of job container.
func (jc *JobController) GetPortFromJob(spec *commonv1.ReplicaSpec) (int32, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetPortFromReplicaSpec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the input is the replicaSpec, then the underline logic is the replicaSpec related container. thus, I prefer the container name for further understand. if you think replicaSpec is better for understanding, I can change it.

containers := spec.Template.Spec.Containers
for _, container := range containers {
if container.Name == jc.Controller.GetDefaultContainerName() {
ports := container.Ports
for _, port := range ports {
if port.Name == jc.Controller.GetDefaultContainerPortNumber(){
return port.ContainerPort, nil
}
}
}
}
return -1, fmt.Errorf("failed to find the port")
}

// createNewService creates a new service for the given index and type.
func (jc *JobController) CreateNewService(job metav1.Object, rtype commonv1.ReplicaType,
spec *commonv1.ReplicaSpec, index string) error {
jobKey, err := KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err))
return err
}

// Convert ReplicaType to lower string.
rt := strings.ToLower(string(rtype))
expectationServicesKey := GenExpectationServicesKey(jobKey, rt)
err = jc.Expectations.ExpectCreations(expectationServicesKey, 1)
if err != nil {
return err
}

// Append ReplicaTypeLabel and ReplicaIndexLabel labels.
labels := jc.GenLabels(job.GetName())
labels[jc.Controller.GetReplicaTypeLabelKey()] = rt
labels[jc.Controller.GetReplicaIndexLabelKey()] = index

port, err := jc.GetPortFromJob(spec)
if err != nil {
return err
}

service := &v1.Service{
Spec: v1.ServiceSpec{
ClusterIP: "None",
Selector: labels,
Ports: []v1.ServicePort{
{
Name: jc.Controller.GetDefaultContainerPortNumber(),
Port: port,
},
},
},
}

service.Name = GenGeneralName(job.GetName(), rt, index)
service.Labels = labels

err = jc.Controller.CreateService(job, service)
if err != nil && errors.IsTimeout(err) {
// Service is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the service keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// service when the expectation expires.
return nil
} else if err != nil {
return err
}
return nil
}
1 change: 1 addition & 0 deletions job_controller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
LabelGroupName = "group-name"
LabelJobName = "job-name"
LabelJobRole = "job-role"
DefaultContainerPortNumber = "9999"
)

func isSucceeded(status common.JobStatus) bool {
Expand Down
16 changes: 16 additions & 0 deletions job_controller/test_job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ func (TestJobController) GetGroupNameLabelValue() string {
return testv1.GroupName
}

func (TestJobController) GetReplicaTypeLabelKey() string {
return ReplicaTypeLabel
}

func (TestJobController) GetReplicaIndexLabelKey() string {
return ReplicaIndexLabel
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same remove these two

func (TestJobController) GetJobRoleKey() string {
return LabelJobRole
}

func (TestJobController) GetDefaultContainerPortNumber() string {
return DefaultContainerPortNumber
}

func (t *TestJobController) GetJobFromInformerCache(namespace, name string) (v1.Object, error) {
return t.job, nil
}
Expand Down