Skip to content

Commit

Permalink
controller: Update status in time (#476)
Browse files Browse the repository at this point in the history
* controller: Fix the outdated status problem

Signed-off-by: Ce Gao <gaoce@caicloud.io>

* test: Add check for status update

Signed-off-by: Ce Gao <gaoce@caicloud.io>

* test: Remove call to KeyFunc

Signed-off-by: Ce Gao <gaoce@caicloud.io>

* pod: Add comment and remove debug statements

Signed-off-by: Ce Gao <gaoce@caicloud.io>
  • Loading branch information
gaocegege authored Mar 20, 2018
1 parent 71a5a26 commit dd6f106
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 15 deletions.
3 changes: 3 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type TFJobController struct {
// To allow injection of syncTFJob for testing.
syncHandler func(tfJobKey string) (bool, error)

updateStatusHandler func(tfjob *tfv1alpha2.TFJob) error

// Listers for TFJob, Pod and Service
// tfJobLister can list/get tfjobs from the shared informer's store.
tfJobLister tfjoblisters.TFJobLister
Expand Down Expand Up @@ -191,6 +193,7 @@ func NewTFJobController(

// Set sync handler.
tc.syncHandler = tc.syncTFJob
tc.updateStatusHandler = tc.updateTFJobStatus

// Create tfjob informer.
tfJobInformer := tfJobInformerFactory.Kubeflow().V1alpha2().TFJobs()
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/controller_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (tc *TFJobController) reconcilePods(
if tfConfigStr == "" {
return nil
}

// Add TF_CONFIG environment variable.
for _, c := range pTemplate.Spec.Containers {
if len(c.Env) == 0 {
Expand All @@ -114,8 +113,9 @@ func (tc *TFJobController) reconcilePods(
// receive any update, and the controller will create a new
// pod when the expectation expires.
return nil
} else if err != nil {
return err
}
return err
}
} else if diff > 0 {
// TODO(CPH): Need to delete pods.
Expand All @@ -129,12 +129,13 @@ func (tc *TFJobController) reconcilePods(
tfjob.Status.TFReplicaStatuses[rtype] = &tfv1alpha2.TFReplicaStatus{}
}

tfjob.Status.TFReplicaStatuses[rtype].Active = int32(len(activePods))
// Update the active status since we have created -diff pods during the loop.
tfjob.Status.TFReplicaStatuses[rtype].Active -= int32(diff)
tfjob.Status.TFReplicaStatuses[rtype].Succeeded = succeeded
tfjob.Status.TFReplicaStatuses[rtype].Failed = failed

// TODO(CPH): Add check here, no need to update the tfjob if the status hasn't changed since last time.
if err := tc.updateTFJobStatus(tfjob); err != nil {
if err := tc.updateStatusHandler(tfjob); err != nil {
return err
}

Expand Down
106 changes: 95 additions & 11 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ func getKey(tfJob *tfv1alpha2.TFJob, t *testing.T) string {
}

func newPod(name string, tfJob *tfv1alpha2.TFJob) *v1.Pod {
tfjobKey, err := KeyFunc(tfJob)
if err != nil {
fmt.Errorf("Couldn't get key for tfjob object %#v: %v", tfJob, err)
var tfjobKey string
if len(tfJob.Namespace) > 0 {
tfjobKey = fmt.Sprintf("%s/%s", tfJob.Namespace, tfJob.Name)
} else {
tfjobKey = tfJob.Name
}

return &v1.Pod{
Expand Down Expand Up @@ -144,6 +146,15 @@ func setPodsStatuses(podIndexer cache.Indexer, tfJob *tfv1alpha2.TFJob, pendingP
}
}

// getCondition reports whether tfJob.Status.Conditions contains an entry
// whose Type equals condition, whose Status is v1.ConditionTrue, and whose
// Reason equals reason. It returns false when no entry matches all three.
func getCondition(tfJob *tfv1alpha2.TFJob, condition tfv1alpha2.TFJobConditionType, reason string) bool {
	for _, candidate := range tfJob.Status.Conditions {
		// Reject the candidate as soon as any one of the three criteria fails.
		if candidate.Type != condition {
			continue
		}
		if candidate.Status != v1.ConditionTrue {
			continue
		}
		if candidate.Reason != reason {
			continue
		}
		return true
	}
	return false
}

func TestNormalPath(t *testing.T) {
testCases := map[string]struct {
worker int
Expand All @@ -162,17 +173,25 @@ func TestNormalPath(t *testing.T) {
// expectations
expectedCreations int32
expectedDeletions int32
expectedActive int32
expectedSucceeded int32
expectedFailed int32

expectedActiveWorkerPods int32
expectedSucceededWorkerPods int32
expectedFailedWorkerPods int32

expectedActivePSPods int32
expectedSucceededPSPods int32
expectedFailedPSPods int32
// TODO(gaocegege): Add condition check.
// expectedCondition *tfv1alpha2.TFJobConditionType
// expectedConditionReason string
expectedCondition *tfv1alpha2.TFJobConditionType
expectedConditionReason string
}{
"Local TFJob created": {
1, 0,
nil, true, 0, 0, 0, 0,
1, 0, 1, 0, 0,
1, 0,
1, 0, 0,
0, 0, 0,
nil, "",
},
}

Expand All @@ -196,6 +215,11 @@ func TestNormalPath(t *testing.T) {
controller.tfJobListerSynced = alwaysReady
controller.podListerSynced = alwaysReady
controller.serviceListerSynced = alwaysReady
var actual *tfv1alpha2.TFJob
controller.updateStatusHandler = func(tfJob *tfv1alpha2.TFJob) error {
actual = tfJob
return nil
}

// Run the test logic.
tfJob := newTFJob(tc.worker, tc.ps)
Expand All @@ -217,8 +241,68 @@ func TestNormalPath(t *testing.T) {
if forget != tc.jobKeyForget {
t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.jobKeyForget, forget)
}
if int32(len(controller.podControl.(*FakePodControl).Templates)) != tc.expectedCreations {
t.Errorf("%s: unexpected number of creates. Expected %d, saw %d\n", name, tc.expectedCreations, len(controller.podControl.(*FakePodControl).Templates))

fakePodControl := controller.podControl.(*FakePodControl)
if int32(len(fakePodControl.Templates)) != tc.expectedCreations {
t.Errorf("%s: unexpected number of creates. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.Templates))
}
if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.DeletePodName))
}
// Each create should have an accompanying ControllerRef.
if len(fakePodControl.ControllerRefs) != int(tc.expectedCreations) {
t.Errorf("%s: unexpected number of ControllerRefs. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.ControllerRefs))
}
// Make sure the ControllerRefs are correct.
for _, controllerRef := range fakePodControl.ControllerRefs {
if got, want := controllerRef.APIVersion, tfv1alpha2.SchemeGroupVersion.String(); got != want {
t.Errorf("controllerRef.APIVersion = %q, want %q", got, want)
}
if got, want := controllerRef.Kind, tfv1alpha2.TFJobResourceKind; got != want {
t.Errorf("controllerRef.Kind = %q, want %q", got, want)
}
if got, want := controllerRef.Name, tfJob.Name; got != want {
t.Errorf("controllerRef.Name = %q, want %q", got, want)
}
if got, want := controllerRef.UID, tfJob.UID; got != want {
t.Errorf("controllerRef.UID = %q, want %q", got, want)
}
if controllerRef.Controller == nil || *controllerRef.Controller != true {
t.Errorf("controllerRef.Controller is not set to true")
}
}
// Validate worker status.
if actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypeWorker] != nil {
if actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypeWorker].Active != tc.expectedActiveWorkerPods {
t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActiveWorkerPods, actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypeWorker].Active)
}
if actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypeWorker].Succeeded != tc.expectedSucceededWorkerPods {
t.Errorf("%s: unexpected number of succeeded pods. Expected %d, saw %d\n", name, tc.expectedSucceededWorkerPods, actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypeWorker].Succeeded)
}
if actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypeWorker].Failed != tc.expectedFailedWorkerPods {
t.Errorf("%s: unexpected number of failed pods. Expected %d, saw %d\n", name, tc.expectedFailedWorkerPods, actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypeWorker].Failed)
}
}
// Validate PS status.
if actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypePS] != nil {
if actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypePS].Active != tc.expectedActivePSPods {
t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActivePSPods, actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypePS].Active)
}
if actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypePS].Succeeded != tc.expectedSucceededPSPods {
t.Errorf("%s: unexpected number of succeeded pods. Expected %d, saw %d\n", name, tc.expectedSucceededPSPods, actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypePS].Succeeded)
}
if actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypePS].Failed != tc.expectedFailedPSPods {
t.Errorf("%s: unexpected number of failed pods. Expected %d, saw %d\n", name, tc.expectedFailedPSPods, actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypePS].Failed)
}
}
// TODO(gaocegege): Set StartTime for the status.
// Validate StartTime.
// if actual.Status.StartTime == nil {
// t.Errorf("%s: .status.startTime was not set", name)
// }
// Validate conditions.
if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions)
}
}
}

0 comments on commit dd6f106

Please sign in to comment.