This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

feat(common) add reconciler.v1 (#141)
zw0610 authored Aug 19, 2021
1 parent 7d3b7d9 commit 2f3f636
Showing 28 changed files with 2,692 additions and 266 deletions.
129 changes: 6 additions & 123 deletions pkg/controller.v1/common/job.go
@@ -3,11 +3,11 @@ package common
import (
"fmt"
"reflect"
"sort"
"time"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
"github.com/kubeflow/common/pkg/core"
commonutil "github.com/kubeflow/common/pkg/util"
"github.com/kubeflow/common/pkg/util/k8sutil"

@@ -49,49 +49,7 @@ func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job i

// recordAbnormalPods records events for active pods whose latest condition is not in True status.
func (jc *JobController) recordAbnormalPods(activePods []*v1.Pod, object runtime.Object) {
for _, pod := range activePods {
// If the pod has started running, check the container statuses rather than the conditions.
recordContainerStatus := func(status *v1.ContainerStatus) {
if status.State.Terminated != nil && status.State.Terminated.ExitCode != 0 {
terminated := status.State.Terminated
jc.Recorder.Eventf(object, v1.EventTypeWarning, terminated.Reason,
"Error pod %s container %s exitCode: %d terminated message: %s",
pod.Name, status.Name, terminated.ExitCode, terminated.Message)
}
// The terminated state and the waiting state cannot exist simultaneously, so it is safe to check both here.
if status.State.Waiting != nil && status.State.Waiting.Message != "" {
wait := status.State.Waiting
jc.Recorder.Eventf(object, v1.EventTypeWarning, wait.Reason,
"Error pod %s container %s waiting message: %s", pod.Name, status.Name, wait.Message)
}
}
if len(pod.Status.ContainerStatuses) != 0 {
for _, status := range pod.Status.ContainerStatuses {
recordContainerStatus(&status)
}
// If the pod has container status info, that means the init container statuses are normal.
continue
}
if len(pod.Status.InitContainerStatuses) != 0 {
for _, status := range pod.Status.InitContainerStatuses {
recordContainerStatus(&status)
}
continue
}
if len(pod.Status.Conditions) == 0 {
continue
}
// Should not modify the original pod which is stored in the informer cache.
status := pod.Status.DeepCopy()
sort.Slice(status.Conditions, func(i, j int) bool {
return status.Conditions[i].LastTransitionTime.After(status.Conditions[j].LastTransitionTime.Time)
})
condition := status.Conditions[0]
if condition.Status == v1.ConditionTrue {
continue
}
jc.Recorder.Eventf(object, v1.EventTypeWarning, condition.Reason, "Error pod %s condition message: %s", pod.Name, condition.Message)
}
core.RecordAbnormalPods(activePods, object, jc.Recorder)
}
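
The hunk above replaces the inlined event-recording logic with a call into the new pkg/core package. Below is a minimal, self-contained sketch of how the extracted helper might be driven on its own, assuming core.RecordAbnormalPods keeps the (pods, object, recorder) shape implied by the call site; the pod contents and the FakeRecorder are illustrative only.

package main

import (
    "fmt"

    "github.com/kubeflow/common/pkg/core"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/record"
)

func main() {
    // An active pod whose container is stuck waiting with an error message.
    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "trainer-0", Namespace: "default"},
        Status: v1.PodStatus{
            ContainerStatuses: []v1.ContainerStatus{{
                Name: "trainer",
                State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{
                    Reason:  "ImagePullBackOff",
                    Message: "back-off pulling image",
                }},
            }},
        },
    }

    // A FakeRecorder captures events on a channel instead of sending them to the API server.
    recorder := record.NewFakeRecorder(10)

    // Any runtime.Object can own the event; the pod itself is used here for brevity.
    core.RecordAbnormalPods([]*v1.Pod{pod}, pod, recorder)

    fmt.Println(<-recorder.Events) // e.g. Warning ImagePullBackOff Error pod trainer-0 ...
}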

// ReconcileJobs checks and updates replicas for each given ReplicaSpec.
@@ -340,7 +298,7 @@ func (jc *JobController) ReconcileJobs(
}

// ResetExpectations resets the expectations for pod/service creates and deletes to zero.
func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) {
for rtype := range replicas {
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, rtype)
jc.Expectations.SetExpectations(expectationPodsKey, 0, 0)
@@ -351,54 +309,14 @@ func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.Rep

// PastActiveDeadline checks if the job has the ActiveDeadlineSeconds field set and whether it is exceeded.
func (jc *JobController) PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) bool {
if runPolicy.ActiveDeadlineSeconds == nil || jobStatus.StartTime == nil {
return false
}
now := metav1.Now()
start := jobStatus.StartTime.Time
duration := now.Time.Sub(start)
allowedDuration := time.Duration(*runPolicy.ActiveDeadlineSeconds) * time.Second
return duration >= allowedDuration
return core.PastActiveDeadline(runPolicy, jobStatus)
}
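
The deadline check now lives in pkg/core as well. A small sketch of the extracted function, assuming it keeps the (runPolicy, jobStatus) shape implied by the call site; the deadline and start time are illustrative values.

package main

import (
    "fmt"
    "time"

    apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
    "github.com/kubeflow/common/pkg/core"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    deadline := int64(60) // the job may run for at most 60 seconds
    runPolicy := &apiv1.RunPolicy{ActiveDeadlineSeconds: &deadline}

    started := metav1.NewTime(time.Now().Add(-2 * time.Minute)) // started two minutes ago
    jobStatus := apiv1.JobStatus{StartTime: &started}

    // Two minutes have elapsed against a 60s deadline, so this prints true.
    fmt.Println(core.PastActiveDeadline(runPolicy, jobStatus))

    // With no deadline configured, the check is always false.
    fmt.Println(core.PastActiveDeadline(&apiv1.RunPolicy{}, jobStatus))
}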

// PastBackoffLimit checks if the sum of container restart counts exceeds BackoffLimit.
// This method applies only to pods with restartPolicy == OnFailure or Always.
func (jc *JobController) PastBackoffLimit(jobName string, runPolicy *apiv1.RunPolicy,
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, pods []*v1.Pod) (bool, error) {
if runPolicy.BackoffLimit == nil {
return false, nil
}
result := int32(0)
for rtype, spec := range replicas {
if spec.RestartPolicy != apiv1.RestartPolicyOnFailure && spec.RestartPolicy != apiv1.RestartPolicyAlways {
log.Warnf("The restart policy of replica %v of the job %v is not OnFailure or Always. Not counted in backoff limit.", rtype, jobName)
continue
}
// Convert ReplicaType to lower string.
pods, err := jc.FilterPodsForReplicaType(pods, rtype)
if err != nil {
return false, err
}
for i := range pods {
po := pods[i]
if po.Status.Phase != v1.PodRunning {
continue
}
for j := range po.Status.InitContainerStatuses {
stat := po.Status.InitContainerStatuses[j]
result += stat.RestartCount
}
for j := range po.Status.ContainerStatuses {
stat := po.Status.ContainerStatuses[j]
result += stat.RestartCount
}
}
}

if *runPolicy.BackoffLimit == 0 {
return result > 0, nil
}
return result >= *runPolicy.BackoffLimit, nil
return core.PastBackoffLimit(jobName, runPolicy, replicas, pods, jc.FilterPodsForReplicaType)
}
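
The backoff check is delegated the same way, with the pod filter injected as a function argument. A sketch under the signature implied by the call site; the replica type, label value, and restart counts are made up for illustration, and core.FilterPodsForReplicaType stands in for jc.FilterPodsForReplicaType since both are assumed to share the same shape.

package main

import (
    "fmt"

    apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
    "github.com/kubeflow/common/pkg/core"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    limit := int32(3)
    runPolicy := &apiv1.RunPolicy{BackoffLimit: &limit}

    replicas := map[apiv1.ReplicaType]*apiv1.ReplicaSpec{
        "Worker": {RestartPolicy: apiv1.RestartPolicyOnFailure},
    }

    // A running worker pod whose container has already restarted four times.
    pods := []*v1.Pod{{
        ObjectMeta: metav1.ObjectMeta{
            Name:   "job-worker-0",
            Labels: map[string]string{apiv1.ReplicaTypeLabel: "Worker"},
        },
        Status: v1.PodStatus{
            Phase:             v1.PodRunning,
            ContainerStatuses: []v1.ContainerStatus{{RestartCount: 4}},
        },
    }}

    // The filter argument plays the role of jc.FilterPodsForReplicaType above.
    exceeded, err := core.PastBackoffLimit("my-job", runPolicy, replicas, pods,
        core.FilterPodsForReplicaType)
    fmt.Println(exceeded, err) // true <nil>: 4 restarts >= the limit of 3
}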

func (jc *JobController) CleanupJob(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus, job interface{}) error {
@@ -435,40 +353,5 @@ func (jc *JobController) CleanupJob(runPolicy *apiv1.RunPolicy, jobStatus apiv1.
}

func (jc *JobController) calcPGMinResources(minMember int32, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) *v1.ResourceList {
var replicasPriority ReplicasPriority
for t, replica := range replicas {
rp := ReplicaPriority{0, *replica}
pc := replica.Template.Spec.PriorityClassName

priorityClass, err := jc.PriorityClassLister.Get(pc)
if err != nil || priorityClass == nil {
log.Warnf("Ignore task %s priority class %s: %v", t, pc, err)
} else {
rp.priority = priorityClass.Value
}

replicasPriority = append(replicasPriority, rp)
}

sort.Sort(replicasPriority)

minAvailableTasksRes := v1.ResourceList{}
podCnt := int32(0)
for _, task := range replicasPriority {
if task.Replicas == nil {
continue
}

for i := int32(0); i < *task.Replicas; i++ {
if podCnt >= minMember {
break
}
podCnt++
for _, c := range task.Template.Spec.Containers {
AddResourceList(minAvailableTasksRes, c.Resources.Requests, c.Resources.Limits)
}
}
}

return &minAvailableTasksRes
return CalcPGMinResources(minMember, replicas, jc.PriorityClassLister.Get)
}
62 changes: 4 additions & 58 deletions pkg/controller.v1/common/pod.go
@@ -22,6 +22,7 @@ import (
apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/control"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
"github.com/kubeflow/common/pkg/core"
commonutil "github.com/kubeflow/common/pkg/util"
utillabels "github.com/kubeflow/common/pkg/util/labels"
trainutil "github.com/kubeflow/common/pkg/util/train"
@@ -255,59 +256,13 @@ func (jc *JobController) GetPodsForJob(jobObject interface{}) ([]*v1.Pod, error)

// FilterPodsForReplicaType returns the pods belonging to the given replicaType.
func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType apiv1.ReplicaType) ([]*v1.Pod, error) {
var result []*v1.Pod

selector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabel: string(replicaType),
})

// TODO(#149): Remove deprecated selector.
deprecatedSelector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabelDeprecated: string(replicaType),
})

for _, pod := range pods {
set := labels.Set(pod.Labels)
if !selector.Matches(set) && !deprecatedSelector.Matches(set) {
continue
}
result = append(result, pod)
}
return result, nil
return core.FilterPodsForReplicaType(pods, replicaType)
}
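
A short usage sketch of the extracted filter, assuming it keeps matching on apiv1.ReplicaTypeLabel (and its deprecated counterpart) exactly as the removed body above did; the pod names and replica types are illustrative.

package main

import (
    "fmt"

    apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
    "github.com/kubeflow/common/pkg/core"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// labeledPod builds a pod carrying the replica-type label the filter matches on.
func labeledPod(name, rtype string) *v1.Pod {
    return &v1.Pod{ObjectMeta: metav1.ObjectMeta{
        Name:   name,
        Labels: map[string]string{apiv1.ReplicaTypeLabel: rtype},
    }}
}

func main() {
    pods := []*v1.Pod{
        labeledPod("job-master-0", "Master"),
        labeledPod("job-worker-0", "Worker"),
        labeledPod("job-worker-1", "Worker"),
    }

    workers, err := core.FilterPodsForReplicaType(pods, "Worker")
    fmt.Println(len(workers), err) // 2 <nil>
}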

// GetPodSlices returns a slice whose elements are slices of pods grouped by replica index.
// It gives the caller enough information to decide whether to scale resources up or down.
func (jc *JobController) GetPodSlices(pods []*v1.Pod, replicas int, logger *log.Entry) [][]*v1.Pod {
podSlices := make([][]*v1.Pod, calculatePodSliceSize(pods, replicas))
for _, pod := range pods {
index, err := utillabels.ReplicaIndex(pod.Labels)
if err != nil {
logger.Warningf("Error obtaining replica index from Pod %s/%s: %v", pod.Namespace, pod.Name, err)
continue
}
if index < 0 || index >= replicas {
logger.Warningf("The label index is not expected: %d, pod: %s/%s", index, pod.Namespace, pod.Name)
}

podSlices[index] = append(podSlices[index], pod)
}
return podSlices
}

// calculatePodSliceSize compares the max pod index with the desired replicas and returns the larger size.
func calculatePodSliceSize(pods []*v1.Pod, replicas int) int {
size := 0
for _, pod := range pods {
index, err := utillabels.ReplicaIndex(pod.Labels)
if err != nil {
continue
}
size = MaxInt(size, index)
}

// size comes from index, need to +1 to indicate real size
return MaxInt(size+1, replicas)
return core.GetPodSlices(pods, replicas, logger)
}
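
GetPodSlices keeps its signature but now forwards to the core package. A sketch of grouping pods by replica index, assuming the index is still carried in the apiv1.ReplicaIndexLabel label (or its deprecated form) read by utillabels.ReplicaIndex; the pod names are illustrative.

package main

import (
    "fmt"

    apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
    "github.com/kubeflow/common/pkg/core"
    log "github.com/sirupsen/logrus"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// indexedPod builds a pod carrying the replica-index label.
func indexedPod(name, index string) *v1.Pod {
    return &v1.Pod{ObjectMeta: metav1.ObjectMeta{
        Name:   name,
        Labels: map[string]string{apiv1.ReplicaIndexLabel: index},
    }}
}

func main() {
    pods := []*v1.Pod{
        indexedPod("job-worker-0", "0"),
        indexedPod("job-worker-1", "1"),
        indexedPod("job-worker-1-retry", "1"), // two pods share index 1
    }

    // Three desired replicas; index 2 has no pod yet, so its slice stays empty.
    slices := core.GetPodSlices(pods, 3, log.NewEntry(log.New()))
    for i, s := range slices {
        fmt.Printf("replica %d: %d pod(s)\n", i, len(s))
    }
    // replica 0: 1 pod(s), replica 1: 2 pod(s), replica 2: 0 pod(s)
}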

// ReconcilePods checks and updates pods for each given ReplicaSpec.
@@ -462,7 +417,7 @@ func (jc *JobController) createNewPod(job interface{}, rt apiv1.ReplicaType, ind
logger.Warning(errMsg)
jc.Recorder.Event(runtimeObject, v1.EventTypeWarning, podTemplateRestartPolicyReason, errMsg)
}
setRestartPolicy(podTemplate, spec)
core.SetRestartPolicy(podTemplate, spec)

// if gang-scheduling is enabled:
// 1. if user has specified other scheduler, we report a warning without overriding any fields.
@@ -512,15 +467,6 @@ func (jc *JobController) createNewPod(job interface{}, rt apiv1.ReplicaType, ind
return nil
}

func setRestartPolicy(podTemplateSpec *v1.PodTemplateSpec, spec *apiv1.ReplicaSpec) {
// This is necessary since restartPolicyExitCode is not supported in v1.PodTemplateSpec
if spec.RestartPolicy == apiv1.RestartPolicyExitCode {
podTemplateSpec.Spec.RestartPolicy = v1.RestartPolicyNever
} else {
podTemplateSpec.Spec.RestartPolicy = v1.RestartPolicy(spec.RestartPolicy)
}
}

func isNonGangSchedulerSet(replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) bool {
for _, spec := range replicas {
if spec.Template.Spec.SchedulerName != "" && spec.Template.Spec.SchedulerName != gangSchedulerName {
13 changes: 7 additions & 6 deletions pkg/controller.v1/common/pod_test.go
@@ -3,13 +3,14 @@ package common
import (
"testing"

v12 "github.com/kubeflow/common/test_job/test_util/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/core"
testjobv1 "github.com/kubeflow/common/test_job/apis/test_job/v1"
v12 "github.com/kubeflow/common/test_job/test_util/v1"

"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestSetRestartPolicy(t *testing.T) {
@@ -59,7 +60,7 @@ func TestSetRestartPolicy(t *testing.T) {
for _, c := range testCase {
spec := c.testJob.Spec.TestReplicaSpecs[c.expectedType]
podTemplate := spec.Template
setRestartPolicy(&podTemplate, spec)
core.SetRestartPolicy(&podTemplate, spec)
if podTemplate.Spec.RestartPolicy != c.expectedRestartPolicy {
t.Errorf("Expected %s, got %s", c.expectedRestartPolicy, podTemplate.Spec.RestartPolicy)
}
@@ -142,7 +143,7 @@ func TestCalculatePodSliceSize(t *testing.T) {
}

for _, tc := range testCases {
result := calculatePodSliceSize(tc.pods, tc.replicas)
result := core.CalculatePodSliceSize(tc.pods, tc.replicas)
assert.Equal(t, tc.expectedSize, result)
}
}
69 changes: 4 additions & 65 deletions pkg/controller.v1/common/service.go
@@ -20,6 +20,7 @@ import (
apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/control"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
"github.com/kubeflow/common/pkg/core"
commonutil "github.com/kubeflow/common/pkg/util"
utillabels "github.com/kubeflow/common/pkg/util/labels"

@@ -139,60 +140,14 @@ func (jc *JobController) GetServicesForJob(jobObject interface{}) ([]*v1.Service

// FilterServicesForReplicaType returns the services belonging to the given replicaType.
func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType apiv1.ReplicaType) ([]*v1.Service, error) {
var result []*v1.Service

selector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabel: string(replicaType),
})

// TODO(#149): Remove deprecated selector.
deprecatedSelector := labels.SelectorFromValidatedSet(labels.Set{
apiv1.ReplicaTypeLabelDeprecated: string(replicaType),
})

for _, service := range services {
set := labels.Set(service.Labels)
if !selector.Matches(set) && !deprecatedSelector.Matches(set) {
continue
}
result = append(result, service)
}
return result, nil
return core.FilterServicesForReplicaType(services, replicaType)
}

// GetServiceSlices returns a slice whose elements are slices of services grouped by replica index.
// If the returned object is serviceSlices, then serviceSlices[i] is an
// array of pointers to the Services for replica i.
func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, logger *log.Entry) [][]*v1.Service {
serviceSlices := make([][]*v1.Service, calculateServiceSliceSize(services, replicas))
for _, service := range services {
index, err := utillabels.ReplicaIndex(service.Labels)
if err != nil {
logger.Warningf("Error obtaining index for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}
if index < 0 || index >= replicas {
logger.Warningf("The label index is not expected: %d, service: %s/%s", index, service.Namespace, service.Name)
}

serviceSlices[index] = append(serviceSlices[index], service)
}
return serviceSlices
}

// calculateServiceSliceSize compares the max service index with the desired replicas and returns the larger size.
func calculateServiceSliceSize(services []*v1.Service, replicas int) int {
size := 0
for _, svc := range services {
index, err := utillabels.ReplicaIndex(svc.Labels)
if err != nil {
continue
}
size = MaxInt(size, index)
}

// size comes from index, need to +1 to indicate real size
return MaxInt(size+1, replicas)
return core.GetServiceSlices(services, replicas, logger)
}

// reconcileServices checks and updates services for each given ReplicaSpec.
@@ -245,23 +200,7 @@ func (jc *JobController) ReconcileServices(

// GetPortsFromJob gets the ports of the job container. The result can be nil if the distributed communication strategy needs no ports and there are no other ports to expose.
func (jc *JobController) GetPortsFromJob(spec *apiv1.ReplicaSpec) (map[string]int32, error) {
ports := make(map[string]int32)

containers := spec.Template.Spec.Containers
for _, container := range containers {
if container.Name == jc.Controller.GetDefaultContainerName() {
containerPorts := container.Ports
if len(containerPorts) == 0 {
return nil, nil
}
for _, port := range containerPorts {
ports[port.Name] = port.ContainerPort
}
return ports, nil
}
}

return nil, fmt.Errorf("failed to find the port")
return core.GetPortsFromJob(spec, jc.Controller.GetDefaultContainerName())
}
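
GetPortsFromJob now delegates as well, passing the controller's default container name explicitly. A sketch under the (spec, containerName) shape implied by the call site; the container and port names are illustrative.

package main

import (
    "fmt"

    apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"
    "github.com/kubeflow/common/pkg/core"
    v1 "k8s.io/api/core/v1"
)

func main() {
    spec := &apiv1.ReplicaSpec{
        Template: v1.PodTemplateSpec{Spec: v1.PodSpec{
            Containers: []v1.Container{{
                // Only the container matching the default container name is inspected.
                Name: "tensorflow",
                Ports: []v1.ContainerPort{
                    {Name: "tfjob-port", ContainerPort: 2222},
                    {Name: "metrics", ContainerPort: 8080},
                },
            }},
        }},
    }

    ports, err := core.GetPortsFromJob(spec, "tensorflow")
    fmt.Println(ports, err) // map[metrics:8080 tfjob-port:2222] <nil>

    // A name that matches no container yields an error such as "failed to find the port".
    _, err = core.GetPortsFromJob(spec, "worker")
    fmt.Println(err)
}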

// createNewService creates a new service for the given index and type.
(Diffs for the remaining changed files are not shown.)