controller: Refactor controller_pod #548

Merged: 9 commits, Apr 25, 2018
Changes from 7 commits
250 changes: 73 additions & 177 deletions pkg/controller.v2/controller_pod.go
@@ -37,227 +37,123 @@ func (tc *TFJobController) reconcilePods(
pods []*v1.Pod,
rtype tfv1alpha2.TFReplicaType,
spec *tfv1alpha2.TFReplicaSpec) error {
tfjobKey, err := KeyFunc(tfjob)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for tfjob object %#v: %v", tfjob, err))
return err
}

// Convert TFReplicaType to lower string.
rt := strings.ToLower(string(rtype))
// Get all pods for the type rt.
pods = filterPodsForTFReplicaType(pods, rt)
activePods := FilterActivePods(pods)
succeeded, failed := getPodStatus(pods)
runningPods := filterRunningPods(pods)
running := len(runningPods)
replicas := int(*spec.Replicas)

// Expect to have `replicas - succeeded` pods alive.
expected := *spec.Replicas - succeeded

// All workers have succeeded, leave a succeeded condition.
if expected == 0 && rtype == tfv1alpha2.TFReplicaTypeWorker {
msg := fmt.Sprintf("TFJob %s is successfully completed.", tfjob.Name)
now := metav1.Now()
tfjob.Status.CompletionTime = &now
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobSucceeded, tfJobSucceededReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
podSlices := getPodSlices(pods, replicas, loggerForTFJob(tfjob))
for index, podSlice := range podSlices {
if len(podSlice) > 1 {
loggerForTFJob(tfjob).Warning("We have to many pods for the worker %d", index)
// TODO(gaocegege): Kill some pods.
}
}

// Some workers are still running, leave a running condition.
if len(runningPods) > 0 && rtype == tfv1alpha2.TFReplicaTypeWorker {
msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobRunning, tfJobRunningReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
if len(podSlice) == 0 {
loggerForTFJob(tfjob).Infof("need to create new pod: %s-%d", rt, index)
err := tc.createNewPod(tfjob, rt, strconv.Itoa(index), spec)
if err != nil {
return err
}
}
// We already have one pod; check whether it succeeded or is in some other state.
}

// All workers are running, set StartTime
if len(runningPods) == int(*spec.Replicas) && rtype == tfv1alpha2.TFReplicaTypeWorker {
now := metav1.Now()
tfjob.Status.StartTime = &now
}
return tc.UpdateStatus(tfjob, rtype, replicas, running, succeeded, failed)
}

// Some workers or PSs have failed, leave a failed condition.
if failed > 0 {
msg := fmt.Sprintf("TFJob %s is failed.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
func (tc *TFJobController) createNewPod(tfjob *tfv1alpha2.TFJob, rt, index string, spec *tfv1alpha2.TFReplicaSpec) error {
tfjobKey, err := KeyFunc(tfjob)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for tfjob object %#v: %v", tfjob, err))
return err
}
expectationPodsKey := genExpectationPodsKey(tfjobKey, rt)
err = tc.expectations.ExpectCreations(expectationPodsKey, 1)
if err != nil {
return err
}
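The ExpectCreations call above is the standard Kubernetes controller "expectations" pattern: before issuing a create, the controller records how many creations it expects to observe through its informer, so a later sync does not act on a stale local cache. Below is a minimal, self-contained sketch of the idea; it is illustrative only and is not the ControllerExpectations implementation the operator actually uses.

package main

import (
	"fmt"
	"sync"
)

// expectations tracks how many pod creations a controller still expects
// to observe via its informer before the local cache is trustworthy.
type expectations struct {
	mu      sync.Mutex
	pending map[string]int // expectation key -> outstanding creations
}

func newExpectations() *expectations {
	return &expectations{pending: make(map[string]int)}
}

// ExpectCreations records that adds creations are in flight for key.
func (e *expectations) ExpectCreations(key string, adds int) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.pending[key] += adds
}

// CreationObserved is called from the pod informer's add handler.
func (e *expectations) CreationObserved(key string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.pending[key] > 0 {
		e.pending[key]--
	}
}

// Satisfied reports whether it is safe to sync from the local cache.
func (e *expectations) Satisfied(key string) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.pending[key] == 0
}

func main() {
	e := newExpectations()
	key := "default/mnist/worker"  // hypothetical expectation key
	e.ExpectCreations(key, 1)      // before podControl creates the pod
	fmt.Println(e.Satisfied(key))  // false: a creation is still in flight
	e.CreationObserved(key)        // the informer saw the new pod
	fmt.Println(e.Satisfied(key))  // true: cache is consistent again
}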

// TODO(gaocegege): Use syncPods to sync all replicas to ensure that all replicas only has one pod running/succeeded.
diff := len(activePods) - int(expected)

if diff < 0 {
// Need to create new pods.
diffIndexes := getDiffPodIndexes(activePods, expected, loggerForTFJob(tfjob))
if diff+len(diffIndexes) != 0 {
// This should never happen.
return fmt.Errorf("pods diff(%d) is not equal to length(%d) of diffIndexes", diff, len(diffIndexes))
}

expectationPodsKey := genExpectationPodsKey(tfjobKey, rt)
err := tc.expectations.ExpectCreations(expectationPodsKey, diff)
if err != nil {
return err
}

for _, index := range diffIndexes {
loggerForTFJob(tfjob).Infof("need to create new pod: %s-%s", rt, index)

// Create OwnerReference.
controllerRef := genOwnerReference(tfjob)

// Append tfReplicaTypeLabel and tfReplicaIndexLabel labels.
pTemplate := spec.Template.DeepCopy()

labels := genLabels(tfjobKey)
// Set type and index for the worker.
labels[tfReplicaTypeLabel] = rt
labels[tfReplicaIndexLabel] = index
// Create OwnerReference.
controllerRef := genOwnerReference(tfjob)

if pTemplate.Labels == nil {
pTemplate.Labels = make(map[string]string)
}
// Set type and index for the worker.
labels := genLabels(tfjobKey)
labels[tfReplicaTypeLabel] = rt
labels[tfReplicaIndexLabel] = index

for key, value := range labels {
pTemplate.Labels[key] = value
}
podTemplate := spec.Template.DeepCopy()

// Generate TF_CONFIG JSON string.
tfConfigStr := genTFConfigJSONStr(tfjob, rt, index)
if podTemplate.Labels == nil {
podTemplate.Labels = make(map[string]string)
}

if tfConfigStr == "" {
return nil
}
// Add TF_CONFIG environment variable.
for i := range pTemplate.Spec.Containers {
if len(pTemplate.Spec.Containers[i].Env) == 0 {
pTemplate.Spec.Containers[i].Env = make([]v1.EnvVar, 0)
}
pTemplate.Spec.Containers[i].Env = append(pTemplate.Spec.Containers[i].Env, v1.EnvVar{
Name: "TF_CONFIG",
Value: tfConfigStr,
})
}
for key, value := range labels {
podTemplate.Labels[key] = value
}

// Set restart policy
if spec.RestartPolicy != tfv1alpha2.RestartPolicyExitCode {
pTemplate.Spec.RestartPolicy = v1.RestartPolicy(spec.RestartPolicy)
}
// Generate TF_CONFIG JSON string.
tfConfigStr := genTFConfigJSONStr(tfjob, rt, index)

err := tc.podControl.CreatePodsWithControllerRef(tfjob.Namespace, pTemplate, tfjob, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return nil
} else if err != nil {
return err
}
if tfConfigStr == "" {
return nil
}
// Add TF_CONFIG environment variable.
for i := range podTemplate.Spec.Containers {
if len(podTemplate.Spec.Containers[i].Env) == 0 {
podTemplate.Spec.Containers[i].Env = make([]v1.EnvVar, 0)
}
} else if diff > 0 {
// TODO(CPH): Need to delete pods.
loggerForTFJob(tfjob).Infof("need to delete pod but it is not implemented yet")
podTemplate.Spec.Containers[i].Env = append(podTemplate.Spec.Containers[i].Env, v1.EnvVar{
Name: "TF_CONFIG",
Value: tfConfigStr,
})
}
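For context, TF_CONFIG is the JSON document TensorFlow's distributed runtime reads to discover the cluster layout and the pod's own role. The output of genTFConfigJSONStr is not shown in this diff; the sketch below only illustrates the well-known TF_CONFIG shape, with service names and ports invented for the example.

package main

import (
	"encoding/json"
	"fmt"
)

type taskSpec struct {
	Type  string `json:"type"`
	Index int    `json:"index"`
}

type tfConfig struct {
	Cluster map[string][]string `json:"cluster"`
	Task    taskSpec            `json:"task"`
}

func main() {
	// Hypothetical cluster spec: two workers and one PS.
	cfg := tfConfig{
		Cluster: map[string][]string{
			"worker": {"mnist-worker-0:2222", "mnist-worker-1:2222"},
			"ps":     {"mnist-ps-0:2222"},
		},
		Task: taskSpec{Type: "worker", Index: 1},
	}
	b, _ := json.Marshal(cfg)
	fmt.Println(string(b))
	// {"cluster":{"ps":["mnist-ps-0:2222"],"worker":["mnist-worker-0:2222","mnist-worker-1:2222"]},"task":{"type":"worker","index":1}}
}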

if tfjob.Status.TFReplicaStatuses == nil {
tfjob.Status.TFReplicaStatuses = make(map[tfv1alpha2.TFReplicaType]*tfv1alpha2.TFReplicaStatus)
// TODO(gaocegege): Deal with RestartPolicyExitCode.
// Set restart policy
if spec.RestartPolicy != tfv1alpha2.RestartPolicyExitCode {
podTemplate.Spec.RestartPolicy = v1.RestartPolicy(spec.RestartPolicy)
}

if _, ok := tfjob.Status.TFReplicaStatuses[rtype]; !ok {
tfjob.Status.TFReplicaStatuses[rtype] = &tfv1alpha2.TFReplicaStatus{}
err = tc.podControl.CreatePodsWithControllerRef(tfjob.Namespace, podTemplate, tfjob, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return nil
} else if err != nil {
return err
}

// Update the active status since we have created -diff pods during the loop.
tfjob.Status.TFReplicaStatuses[rtype].Active = expected
tfjob.Status.TFReplicaStatuses[rtype].Succeeded = succeeded
tfjob.Status.TFReplicaStatuses[rtype].Failed = failed
return nil
}

func (tc *TFJobController) syncPods(pods []*v1.Pod, replicas int, logger *log.Entry) {
podSlices := getPodSlices(pods, replicas, logger)
for index, podSlice := range podSlices {
if len(podSlice) > 1 {
logger.Warning("We have to many pods for the worker %d", index)
// Kill some
}
if len(podSlice) == 0 {
// Create one
}
// We already have one; check whether it succeeded or is in some other state.
// pod := podSlice[0]
}
}

func getPodSlices(pods []*v1.Pod, replicas int, logger *log.Entry) [][]*v1.Pod {
podSlices := make([][]*v1.Pod, 0)
podSlices := make([][]*v1.Pod, replicas)
Member: How about using a map for podSlices, like this:

podSlices := make(map[int64][]*v1.Pod, replicas)
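A sketch of what the suggested map-based grouping might look like; the PR as merged keeps the slice-of-slices form, so this variant is only illustrative. It assumes the surrounding package's tfReplicaIndexLabel constant and its v1, strconv, and logrus imports.

// Illustrative alternative, not the merged code: group pods by index in a map.
func getPodSlicesAsMap(pods []*v1.Pod, logger *log.Entry) map[int64][]*v1.Pod {
	podSlices := make(map[int64][]*v1.Pod)
	for _, pod := range pods {
		indexStr, ok := pod.Labels[tfReplicaIndexLabel]
		if !ok {
			logger.Warning("The pod does not have the index label.")
			continue
		}
		index, err := strconv.ParseInt(indexStr, 10, 64)
		if err != nil {
			logger.Warningf("Error when parsing the index label: %v", err)
			continue
		}
		// A map tolerates sparse or out-of-range indices without pre-sizing,
		// at the cost of unordered iteration over the result.
		podSlices[index] = append(podSlices[index], pod)
	}
	return podSlices
}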

for _, pod := range pods {
if _, ok := pod.Labels[tfReplicaIndexLabel]; !ok {
logger.Warning("The pod do not have the index label.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continue here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

continue
}
index, err := strconv.Atoi(pod.Labels[tfReplicaIndexLabel])
if err != nil {
logger.Warning("Error when strconv.Atoi: %v", err)
continue
}
Member: Should we break here?
Author: Sorry, I rebased onto master and can't see it anymore. Could you take another look and point it out?
Member: It has already been addressed.

if index < 0 || index >= replicas {
logger.Warningf("The label index is not expected: %d", index)
// Skip this pod: appending at an out-of-range index would panic.
continue
}

podSlices[index] = append(podSlices[index], pod)
}
return podSlices
}

// getDiffPodIndexes checks and gets diff indexes from desired and current.
func getDiffPodIndexes(activePods []*v1.Pod, replicas int32, logger *log.Entry) []string {
desiredIndexes := make(map[string]string)

for i := int32(0); i < replicas; i++ {
desiredIndexes[fmt.Sprintf("%d", i)] = noHit
}

for _, pod := range activePods {
if _, ok := pod.Labels[tfReplicaIndexLabel]; !ok {
continue
}

index := pod.Labels[tfReplicaIndexLabel]
indexNum, err := strconv.Atoi(index)
if err != nil {
logger.Warningf("The label index should be integer: %s", index)
} else {
// The situation should not happen.
if indexNum < 0 || indexNum >= int(replicas) {
logger.Warningf("The label index is not expected: %d", indexNum)
}
}

if _, ok := desiredIndexes[index]; ok {
desiredIndexes[index] = hit
podSlices[index] = append(podSlices[index], pod)
}
}

diffIndexes := []string{}
for index, hit := range desiredIndexes {
if hit == noHit {
diffIndexes = append(diffIndexes, index)
}
}

return diffIndexes
return podSlices
}
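To make the grouping behavior concrete, here is a hedged, test-style usage sketch of the new getPodSlices. It is not part of the PR: the makePod helper, the label value, and the pod names are invented, and it assumes the controller package's fmt, logrus, v1, and metav1 imports.

// Hypothetical example (not from the PR) showing how getPodSlices buckets
// pods by their index label.
func ExampleGetPodSlices() {
	makePod := func(name, index string) *v1.Pod {
		return &v1.Pod{ObjectMeta: metav1.ObjectMeta{
			Name:   name,
			Labels: map[string]string{tfReplicaIndexLabel: index},
		}}
	}
	pods := []*v1.Pod{
		makePod("worker-0", "0"),
		makePod("worker-1", "1"),
		makePod("worker-1-retry", "1"), // a duplicate index lands in the same slice
	}
	podSlices := getPodSlices(pods, 3, log.NewEntry(log.New()))
	for i, slice := range podSlices {
		fmt.Printf("index %d: %d pod(s)\n", i, len(slice))
	}
	// Output:
	// index 0: 1 pod(s)
	// index 1: 2 pod(s)
	// index 2: 0 pod(s)
}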

// getPodsForTFJob returns the set of pods that this tfjob should manage.
66 changes: 66 additions & 0 deletions pkg/controller.v2/controller_status.go
@@ -0,0 +1,66 @@
package controller
Member: Please add copyright here :)
Author: Sorry for that. I will add it soon.


import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
)

func (tc *TFJobController) UpdateStatus(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha2.TFReplicaType, replicas, running int, succeeded, failed int32) error {
// Expect to have `replicas - succeeded` pods alive.
expected := int32(replicas) - succeeded

// All workers have succeeded, leave a succeeded condition.
if expected == 0 && rtype == tfv1alpha2.TFReplicaTypeWorker {
msg := fmt.Sprintf("TFJob %s is successfully completed.", tfjob.Name)
now := metav1.Now()
tfjob.Status.CompletionTime = &now
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobSucceeded, tfJobSucceededReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
}

// Some workers are still running, leave a running condition.
if running > 0 && rtype == tfv1alpha2.TFReplicaTypeWorker {
msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobRunning, tfJobRunningReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
}

// All workers are running, set StartTime.

Member: nit: add .
Author: OK

if running == replicas && rtype == tfv1alpha2.TFReplicaTypeWorker {
now := metav1.Now()
tfjob.Status.StartTime = &now
}

// Some workers or PSs have failed, leave a failed condition.
if failed > 0 {
msg := fmt.Sprintf("TFJob %s is failed.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
}

if tfjob.Status.TFReplicaStatuses == nil {
tfjob.Status.TFReplicaStatuses = make(map[tfv1alpha2.TFReplicaType]*tfv1alpha2.TFReplicaStatus)
}

if _, ok := tfjob.Status.TFReplicaStatuses[rtype]; !ok {
tfjob.Status.TFReplicaStatuses[rtype] = &tfv1alpha2.TFReplicaStatus{}
}

// Update the status counts for this replica type; `expected` pods are reported as active.
tfjob.Status.TFReplicaStatuses[rtype].Active = expected
tfjob.Status.TFReplicaStatuses[rtype].Succeeded = succeeded
tfjob.Status.TFReplicaStatuses[rtype].Failed = failed
return nil
}
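A worked example of the arithmetic UpdateStatus performs, with invented numbers: four workers, one already succeeded, three running, none failed. The prints mirror the branch conditions in the function above.

package main

import "fmt"

func main() {
	replicas := 4
	var succeeded, failed int32 = 1, 0
	running := 3

	// Expect to have `replicas - succeeded` pods alive.
	expected := int32(replicas) - succeeded

	fmt.Println(expected)            // 3: reported as Active for this replica type
	fmt.Println(expected == 0)       // false: no Succeeded condition yet
	fmt.Println(running > 0)         // true: a Running condition is appended
	fmt.Println(running == replicas) // false: StartTime is not set on this pass
	fmt.Println(failed > 0)          // false: no Failed condition
}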
24 changes: 12 additions & 12 deletions pkg/controller.v2/controller_test.go
@@ -233,18 +233,18 @@ func TestNormalPath(t *testing.T) {
&tfJobRunning, tfJobRunningReason,
false,
},
// "Distributed TFJob (4 workers, 2 PS) is created, 2 workers, 1 PS are pending, 1 worker is succeeded": {
// 4, 2,
// nil, true,
// 2, 0, 1, 0,
// 1, 0, 0, 0,
// 3, 1,
// 1, 0, 1,
// 3, 0, 0,
// 2, 0, 0,
// &tfJobRunning, tfJobRunningReason,
// false,
// },
"Distributed TFJob (4 workers, 2 PS) is created, 2 workers, 1 PS are pending, 1 worker is succeeded": {
4, 2,
nil, true,
2, 0, 1, 0,
1, 0, 0, 0,
3, 1,
2, 0, 2,
3, 1, 0,
2, 0, 0,
nil, "",
false,
},
"Distributed TFJob (4 workers, 2 PS) is succeeded": {
4, 2,
nil, true,