This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

feat(common) add reconciler.v1 #141

Merged
merged 1 commit on Aug 19, 2021
129 changes: 6 additions & 123 deletions pkg/controller.v1/common/job.go
@@ -3,11 +3,11 @@ package common
import (
"fmt"
"reflect"
"sort"
"time"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
"github.com/kubeflow/common/pkg/core"
commonutil "github.com/kubeflow/common/pkg/util"
"github.com/kubeflow/common/pkg/util/k8sutil"

@@ -49,49 +49,7 @@ func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job i

// recordAbnormalPods records active pods whose latest condition is not in True status.
func (jc *JobController) recordAbnormalPods(activePods []*v1.Pod, object runtime.Object) {
for _, pod := range activePods {
// If the pod has started running, check the container statuses rather than the conditions.
recordContainerStatus := func(status *v1.ContainerStatus) {
if status.State.Terminated != nil && status.State.Terminated.ExitCode != 0 {
terminated := status.State.Terminated
jc.Recorder.Eventf(object, v1.EventTypeWarning, terminated.Reason,
"Error pod %s container %s exitCode: %d terminated message: %s",
pod.Name, status.Name, terminated.ExitCode, terminated.Message)
}
// The terminated state and waiting state cannot exist simultaneously, so both are checked here.
if status.State.Waiting != nil && status.State.Waiting.Message != "" {
wait := status.State.Waiting
jc.Recorder.Eventf(object, v1.EventTypeWarning, wait.Reason,
"Error pod %s container %s waiting message: %s", pod.Name, status.Name, wait.Message)
}
}
if len(pod.Status.ContainerStatuses) != 0 {
for _, status := range pod.Status.ContainerStatuses {
recordContainerStatus(&status)
}
// If the pod has container status info, that means the init container statuses are normal.
continue
}
if len(pod.Status.InitContainerStatuses) != 0 {
for _, status := range pod.Status.InitContainerStatuses {
recordContainerStatus(&status)
}
continue
}
if len(pod.Status.Conditions) == 0 {
continue
}
// Should not modify the original pod which is stored in the informer cache.
status := pod.Status.DeepCopy()
sort.Slice(status.Conditions, func(i, j int) bool {
return status.Conditions[i].LastTransitionTime.After(status.Conditions[j].LastTransitionTime.Time)
})
condition := status.Conditions[0]
if condition.Status == v1.ConditionTrue {
continue
}
jc.Recorder.Eventf(object, v1.EventTypeWarning, condition.Reason, "Error pod %s condition message: %s", pod.Name, condition.Message)
}
core.RecordAbnormalPods(activePods, object, jc.Recorder)
}

// ReconcileJobs checks and updates replicas for each given ReplicaSpec.
@@ -340,7 +298,7 @@ func (jc *JobController) ReconcileJobs(
}

// ResetExpectations resets the expectations for pod/service creates and deletes to zero.
func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) {
for rtype := range replicas {
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype)
jc.Expectations.SetExpectations(expectationPodsKey, 0, 0)
@@ -351,54 +309,14 @@ func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.Rep

// PastActiveDeadline checks if the job has ActiveDeadlineSeconds set and whether it has been exceeded.
func (jc *JobController) PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) bool {
if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil {
return false
}
now := metav1.Now()
start := jobStatus.StartTime.Time
duration := now.Time.Sub(start)
allowedDuration := time.Duration(*runPolicy.ActiveDeadlineSeconds) * time.Second
return duration >= allowedDuration
return core.PastActiveDeadline(runPolicy, jobStatus)
}
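
For reference, the relocated helper that this method now delegates to plausibly mirrors the body removed above; a minimal sketch in pkg/core (the exact file layout and import set in core are assumptions):

package core

import (
	"time"

	apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// PastActiveDeadline reports whether the job has ActiveDeadlineSeconds set and
// the elapsed time since StartTime has exceeded it.
func PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) bool {
	if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil {
		return false
	}
	// Compare the elapsed time since StartTime with the allowed duration.
	duration := metav1.Now().Time.Sub(jobStatus.StartTime.Time)
	allowedDuration := time.Duration(*runPolicy.ActiveDeadlineSeconds) * time.Second
	return duration >= allowedDuration
}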

// PastBackoffLimit checks if the sum of container restart counts exceeds BackoffLimit.
// This method applies only to pods with restartPolicy == OnFailure or Always.
func (jc *JobController) PastBackoffLimit(jobName string, runPolicy *apiv1.RunPolicy,
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, pods []*v1.Pod) (bool, error) {
if runPolicy.BackoffLimit == nil {
return false, nil
}
result := int32(0)
for rtype, spec := range replicas {
if spec.RestartPolicy != apiv1.RestartPolicyOnFailure && spec.RestartPolicy != apiv1.RestartPolicyAlways {
log.Warnf("The restart policy of replica %v of the job %v is not OnFailure or Always. Not counted in backoff limit.", rtype, jobName)
continue
}
// Filter the pods that belong to this replica type.
pods, err := jc.FilterPodsForReplicaType(pods, rtype)
if err != nil {
return false, err
}
for i := range pods {
po := pods[i]
if po.Status.Phase != v1.PodRunning {
continue
}
for j := range po.Status.InitContainerStatuses {
stat := po.Status.InitContainerStatuses[j]
result += stat.RestartCount
}
for j := range po.Status.ContainerStatuses {
stat := po.Status.ContainerStatuses[j]
result += stat.RestartCount
}
}
}

if *runPolicy.BackoffLimit == 0 {
return result > 0, nil
}
return result >= *runPolicy.BackoffLimit, nil
return core.PastBackoffLimit(jobName, runPolicy, replicas, pods, jc.FilterPodsForReplicaType)
}
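
The delegation above passes jc.FilterPodsForReplicaType as a function value, so the relocated helper presumably takes the pod filter as a parameter. A sketch reconstructed from the removed body (the filter parameter's shape and the logging import are assumptions):

package core

import (
	apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
	log "github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
)

// PastBackoffLimit checks whether the sum of container restart counts across the
// job's running pods exceeds BackoffLimit. Only replicas whose restart policy is
// OnFailure or Always are counted.
func PastBackoffLimit(jobName string, runPolicy *apiv1.RunPolicy,
	replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, pods []*v1.Pod,
	filterPods func([]*v1.Pod, apiv1.ReplicaType) ([]*v1.Pod, error)) (bool, error) {
	if runPolicy.BackoffLimit == nil {
		return false, nil
	}
	result := int32(0)
	for rtype, spec := range replicas {
		if spec.RestartPolicy != apiv1.RestartPolicyOnFailure && spec.RestartPolicy != apiv1.RestartPolicyAlways {
			log.Warnf("The restart policy of replica %v of the job %v is not OnFailure or Always. Not counted in backoff limit.", rtype, jobName)
			continue
		}
		// Only count restarts of running pods that belong to this replica type.
		filtered, err := filterPods(pods, rtype)
		if err != nil {
			return false, err
		}
		for i := range filtered {
			po := filtered[i]
			if po.Status.Phase != v1.PodRunning {
				continue
			}
			for j := range po.Status.InitContainerStatuses {
				result += po.Status.InitContainerStatuses[j].RestartCount
			}
			for j := range po.Status.ContainerStatuses {
				result += po.Status.ContainerStatuses[j].RestartCount
			}
		}
	}

	if *runPolicy.BackoffLimit == 0 {
		return result > 0, nil
	}
	return result >= *runPolicy.BackoffLimit, nil
}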

func (jc *JobController) CleanupJob(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus, job interface{}) error {
@@ -435,40 +353,5 @@ func (jc *JobController) CleanupJob(runPolicy *apiv1.RunPolicy, jobStatus apiv1.
}

func (jc *JobController) calcPGMinResources(minMember int32, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) *v1.ResourceList {
var replicasPriority ReplicasPriority
for t, replica := range replicas {
rp := ReplicaPriority{0, *replica}
pc := replica.Template.Spec.PriorityClassName

priorityClass, err := jc.PriorityClassLister.Get(pc)
if err != nil || priorityClass == nil {
log.Warnf("Ignore task %s priority class %s: %v", t, pc, err)
} else {
rp.priority = priorityClass.Value
}

replicasPriority = append(replicasPriority, rp)
}

sort.Sort(replicasPriority)

minAvailableTasksRes := v1.ResourceList{}
podCnt := int32(0)
for _, task := range replicasPriority {
if task.Replicas == nil {
continue
}

for i := int32(0); i < *task.Replicas; i++ {
if podCnt >= minMember {
break
}
podCnt++
for _, c := range task.Template.Spec.Containers {
AddResourceList(minAvailableTasksRes, c.Resources.Requests, c.Resources.Limits)
}
}
}

return &minAvailableTasksRes
return CalcPGMinResources(minMember, replicas, jc.PriorityClassLister.Get)
}
62 changes: 4 additions & 58 deletions pkg/controller.v1/common/pod.go
@@ -22,6 +22,7 @@ import (
apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/control"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
"github.com/kubeflow/common/pkg/core"
commonutil "github.com/kubeflow/common/pkg/util"
utillabels "github.com/kubeflow/common/pkg/util/labels"
trainutil "github.com/kubeflow/common/pkg/util/train"
@@ -255,59 +256,13 @@ func (jc *JobController) GetPodsForJob(jobObject interface{}) ([]*v1.Pod, error)

// FilterPodsForReplicaType returns the pods that belong to a replicaType.
func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType apiv1.ReplicaType) ([]*v1.Pod, error) {
var result []*v1.Pod

selector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabel: string(replicaType),
})

// TODO(#149): Remove deprecated selector.
deprecatedSelector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabelDeprecated: string(replicaType),
})

for _, pod := range pods {
set := labels.Set(pod.Labels)
if !selector.Matches(set) && !deprecatedSelector.Matches(set) {
continue
}
result = append(result, pod)
}
return result, nil
return core.FilterPodsForReplicaType(pods, replicaType)
}

// GetPodSlices returns a slice whose elements are slices of pods grouped by replica index.
// It gives the caller enough information to decide whether to scale resources up or down.
func (jc *JobController) GetPodSlices(pods []*v1.Pod, replicas int, logger *log.Entry) [][]*v1.Pod {
podSlices := make([][]*v1.Pod, calculatePodSliceSize(pods, replicas))
for _, pod := range pods {
index, err := utillabels.ReplicaIndex(pod.Labels)
if err != nil {
logger.Warningf("Error obtaining replica index from Pod %s/%s: %v", pod.Namespace, pod.Name, err)
continue
}
if index < 0 || index >= replicas {
logger.Warningf("The label index is not expected: %d, pod: %s/%s", index, pod.Namespace, pod.Name)
}

podSlices[index] = append(podSlices[index], pod)
}
return podSlices
}

// calculatePodSliceSize compares the max pod index with the desired replicas and returns the larger size.
func calculatePodSliceSize(pods []*v1.Pod, replicas int) int {
size := 0
for _, pod := range pods {
index, err := utillabels.ReplicaIndex(pod.Labels)
if err != nil {
continue
}
size = MaxInt(size, index)
}

// size comes from an index, so add 1 to get the real size.
return MaxInt(size+1, replicas)
return core.GetPodSlices(pods, replicas, logger)
}

// ReconcilePods checks and updates pods for each given ReplicaSpec.
@@ -462,7 +417,7 @@ func (jc *JobController) createNewPod(job interface{}, rt apiv1.ReplicaType, ind
logger.Warning(errMsg)
jc.Recorder.Event(runtimeObject, v1.EventTypeWarning, podTemplateRestartPolicyReason, errMsg)
}
setRestartPolicy(podTemplate, spec)
core.SetRestartPolicy(podTemplate, spec)

// if gang-scheduling is enabled:
// 1. if user has specified other scheduler, we report a warning without overriding any fields.
Expand Down Expand Up @@ -512,15 +467,6 @@ func (jc *JobController) createNewPod(job interface{}, rt apiv1.ReplicaType, ind
return nil
}

func setRestartPolicy(podTemplateSpec *v1.PodTemplateSpec, spec *apiv1.ReplicaSpec) {
// This is necessary since restartPolicyExitCode is not supported in v1.PodTemplateSpec
if spec.RestartPolicy == apiv1.RestartPolicyExitCode {
podTemplateSpec.Spec.RestartPolicy = v1.RestartPolicyNever
} else {
podTemplateSpec.Spec.RestartPolicy = v1.RestartPolicy(spec.RestartPolicy)
}
}

func isNonGangSchedulerSet(replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) bool {
for _, spec := range replicas {
if spec.Template.Spec.SchedulerName != "" && spec.Template.Spec.SchedulerName != gangSchedulerName {
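
The setRestartPolicy helper deleted above is now called as core.SetRestartPolicy from createNewPod earlier in this file; the exported version plausibly looks like the following sketch (package layout assumed):

package core

import (
	apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
	v1 "k8s.io/api/core/v1"
)

// SetRestartPolicy copies the ReplicaSpec restart policy onto the pod template.
// RestartPolicyExitCode is not supported by v1.PodTemplateSpec, so it is mapped to Never.
func SetRestartPolicy(podTemplateSpec *v1.PodTemplateSpec, spec *apiv1.ReplicaSpec) {
	if spec.RestartPolicy == apiv1.RestartPolicyExitCode {
		podTemplateSpec.Spec.RestartPolicy = v1.RestartPolicyNever
	} else {
		podTemplateSpec.Spec.RestartPolicy = v1.RestartPolicy(spec.RestartPolicy)
	}
}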
13 changes: 7 additions & 6 deletions pkg/controller.v1/common/pod_test.go
@@ -3,13 +3,14 @@ package common
import (
"testing"

v12 "github.com/kubeflow/common/test_job/test_util/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/core"
testjobv1 "github.com/kubeflow/common/test_job/apis/test_job/v1"
v12 "github.com/kubeflow/common/test_job/test_util/v1"

"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestSetRestartPolicy(t *testing.T) {
@@ -59,7 +60,7 @@ func TestSetRestartPolicy(t *testing.T) {
for _, c := range testCase {
spec := c.testJob.Spec.TestReplicaSpecs[c.expectedType]
podTemplate := spec.Template
setRestartPolicy(&podTemplate, spec)
core.SetRestartPolicy(&podTemplate, spec)
if podTemplate.Spec.RestartPolicy != c.expectedRestartPolicy {
t.Errorf("Expected %s, got %s", c.expectedRestartPolicy, podTemplate.Spec.RestartPolicy)
}
@@ -142,7 +143,7 @@ func TestCalculatePodSliceSize(t *testing.T) {
}

for _, tc := range testCases {
result := calculatePodSliceSize(tc.pods, tc.replicas)
result := core.CalculatePodSliceSize(tc.pods, tc.replicas)
assert.Equal(t, tc.expectedSize, result)
}
}
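
The test above now calls core.CalculatePodSliceSize directly. Based on the calculatePodSliceSize body removed from pod.go, the exported function plausibly looks like the sketch below (the original used a MaxInt helper, replaced here with explicit comparisons to keep the sketch self-contained):

package core

import (
	utillabels "github.com/kubeflow/common/pkg/util/labels"
	v1 "k8s.io/api/core/v1"
)

// CalculatePodSliceSize compares the maximum pod index with the desired replica
// count and returns the larger size.
func CalculatePodSliceSize(pods []*v1.Pod, replicas int) int {
	size := 0
	for _, pod := range pods {
		index, err := utillabels.ReplicaIndex(pod.Labels)
		if err != nil {
			continue
		}
		if index > size {
			size = index
		}
	}
	// size comes from an index, so add 1 to get the real size.
	if size+1 > replicas {
		return size + 1
	}
	return replicas
}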
69 changes: 4 additions & 65 deletions pkg/controller.v1/common/service.go
@@ -20,6 +20,7 @@ import (
apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/control"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
"github.com/kubeflow/common/pkg/core"
commonutil "github.com/kubeflow/common/pkg/util"
utillabels "github.com/kubeflow/common/pkg/util/labels"

@@ -139,60 +140,14 @@ func (jc *JobController) GetServicesForJob(jobObject interface{}) ([]*v1.Service

// FilterServicesForReplicaType returns the services that belong to a replicaType.
func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType apiv1.ReplicaType) ([]*v1.Service, error) {
var result []*v1.Service

selector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabel: string(replicaType),
})

// TODO(#149): Remove deprecated selector.
deprecatedSelector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabelDeprecated: string(replicaType),
})

for _, service := range services {
set := labels.Set(service.Labels)
if !selector.Matches(set) && !deprecatedSelector.Matches(set) {
continue
}
result = append(result, service)
}
return result, nil
return core.FilterServicesForReplicaType(services, replicaType)
}

// GetServiceSlices returns a slice whose elements are slices of services grouped by replica index.
// If the returned object is serviceSlices, then serviceSlices[i] is the slice of
// services that correspond to replica i.
func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, logger *log.Entry) [][]*v1.Service {
serviceSlices := make([][]*v1.Service, calculateServiceSliceSize(services, replicas))
for _, service := range services {
index, err := utillabels.ReplicaIndex(service.Labels)
if err != nil {
logger.Warningf("Error obtaining index for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}
if index < 0 || index >= replicas {
logger.Warningf("The label index is not expected: %d, service: %s/%s", index, service.Namespace, service.Name)
}

serviceSlices[index] = append(serviceSlices[index], service)
}
return serviceSlices
}

// calculateServiceSliceSize compares the max service index with the desired replicas and returns the larger size.
func calculateServiceSliceSize(services []*v1.Service, replicas int) int {
size := 0
for _, svc := range services {
index, err := utillabels.ReplicaIndex(svc.Labels)
if err != nil {
continue
}
size = MaxInt(size, index)
}

// size comes from an index, so add 1 to get the real size.
return MaxInt(size+1, replicas)
return core.GetServiceSlices(services, replicas, logger)
}

// ReconcileServices checks and updates services for each given ReplicaSpec.
@@ -245,23 +200,7 @@ func (jc *JobController) ReconcileServices(

// GetPortsFromJob gets the ports of the job container. The result can be nil if the distributed communication strategy needs no ports and no other ports need to be exposed.
func (jc *JobController) GetPortsFromJob(spec *apiv1.ReplicaSpec) (map[string]int32, error) {
ports := make(map[string]int32)

containers := spec.Template.Spec.Containers
for _, container := range containers {
if container.Name == jc.Controller.GetDefaultContainerName() {
containerPorts := container.Ports
if len(containerPorts) == 0 {
return nil, nil
}
for _, port := range containerPorts {
ports[port.Name] = port.ContainerPort
}
return ports, nil
}
}

return nil, fmt.Errorf("failed to find the port")
return core.GetPortsFromJob(spec, jc.Controller.GetDefaultContainerName())
}

// createNewService creates a new service for the given index and type.
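
Finally, the ports lookup that GetPortsFromJob now delegates to presumably takes the default container name as a parameter, matching the call site core.GetPortsFromJob(spec, jc.Controller.GetDefaultContainerName()) above; a sketch reconstructed from the removed body:

package core

import (
	"fmt"

	apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
)

// GetPortsFromJob collects the ports declared by the job's default container.
// It returns (nil, nil) when the default container exposes no ports.
func GetPortsFromJob(spec *apiv1.ReplicaSpec, defaultContainerName string) (map[string]int32, error) {
	ports := make(map[string]int32)
	for _, container := range spec.Template.Spec.Containers {
		if container.Name != defaultContainerName {
			continue
		}
		if len(container.Ports) == 0 {
			return nil, nil
		}
		for _, port := range container.Ports {
			ports[port.Name] = port.ContainerPort
		}
		return ports, nil
	}
	return nil, fmt.Errorf("failed to find the port")
}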