Skip to content

Commit

Permalink
Add condition logic code
Browse files Browse the repository at this point in the history
  • Loading branch information
ScorpioCPH committed Mar 26, 2018
1 parent 78395f5 commit b965914
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 6 deletions.
6 changes: 3 additions & 3 deletions pkg/apis/tensorflow/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ type TFJobCondition struct {
type TFJobConditionType string

const (
// TFJobCreated means all sub-resources (e.g. services/pods) of this TFJob
// have been successfully created.
// But they are waiting to be scheduled and launched.
// TFJobCreated means the tfjob has been accepted by the system,
// but one or more of the pods/services has not been started.
// This includes time before pods being scheduled and launched.
TFJobCreated TFJobConditionType = "Created"

// TFJobRunning means all sub-resources (e.g. services/pods) of this TFJob
Expand Down
89 changes: 88 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ const (
noHit = "no-hit"

defaultPortStr = "2222"

// tfJobCreatedReason is added in a tfjob when it is created.
tfJobCreatedReason = "TFJobCreated"
// tfJobSucceededReason is added in a tfjob when it is succeeded.
tfJobSucceededReason = "TFJobSucceeded"
// tfJobSucceededReason is added in a tfjob when it is running.
tfJobRunningReason = "TFJobRunning"
// tfJobSucceededReason is added in a tfjob when it is failed.
tfJobFailedReason = "TFJobFailed"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
Expand Down Expand Up @@ -431,8 +440,18 @@ func genLabels(tfjobKey string) map[string]string {
// When a pod is added, set the defaults and enqueue the current tfjob.
func (tc *TFJobController) addTFJob(obj interface{}) {
tfjob := obj.(*tfv1alpha2.TFJob)
log.Infof("Adding tfjob: %s", tfjob.Name)
msg := fmt.Sprintf("TFJob %s is created.", tfjob.Name)
log.Info(msg)
scheme.Scheme.Default(tfjob)

// Leave a created condition.
newTFJob := tfjob.DeepCopy()
err := tc.updateTFJobConditions(newTFJob, tfv1alpha2.TFJobCreated, tfJobCreatedReason, msg)
if err != nil {
log.Infof("Append tfjob condition error: %v", err)
return
}

tc.enqueueTFJob(obj)
}

Expand All @@ -448,6 +467,13 @@ func (tc *TFJobController) updateTFJobStatus(tfjob *tfv1alpha2.TFJob) error {
return err
}

func (tc *TFJobController) updateTFJobConditions(tfjob *tfv1alpha2.TFJob, conditionType tfv1alpha2.TFJobConditionType, reason, message string) error {
condition := newCondition(conditionType, reason, message)
setCondition(&tfjob.Status, condition)
err := tc.updateStatusHandler(tfjob)
return err
}

// resolveControllerRef returns the tfjob referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching tfjob
// of the correct Kind.
Expand Down Expand Up @@ -482,3 +508,64 @@ func genOwnerReference(tfjob *tfv1alpha2.TFJob) *metav1.OwnerReference {

return controllerRef
}

// newCondition creates a new tfjob condition.
func newCondition(conditionType tfv1alpha2.TFJobConditionType, reason, message string) tfv1alpha2.TFJobCondition {
return tfv1alpha2.TFJobCondition{
Type: conditionType,
Status: v1.ConditionTrue,
LastUpdateTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: message,
}
}

// getCondition returns the condition with the provided type.
func getCondition(status tfv1alpha2.TFJobStatus, condType tfv1alpha2.TFJobConditionType) *tfv1alpha2.TFJobCondition {
for i := range status.Conditions {
c := status.Conditions[i]
if c.Type == condType {
return &c
}
}
return nil
}

// setCondition updates the tfjob to include the provided condition.
// If the condition that we are about to add already exists
// and has the same status and reason then we are not going to update.
func setCondition(status *tfv1alpha2.TFJobStatus, condition tfv1alpha2.TFJobCondition) {
currentCond := getCondition(*status, condition.Type)

// Do nothing if condition doesn't change
if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
return
}

// Do not update lastTransitionTime if the status of the condition doesn't change.
if currentCond != nil && currentCond.Status == condition.Status {
condition.LastTransitionTime = currentCond.LastTransitionTime
}

// Append the updated condition to the
newConditions := filterOutCondition(status.Conditions, condition.Type)
status.Conditions = append(newConditions, condition)
}

// removeCondition removes the tfjob condition with the provided type.
func removementCondition(status *tfv1alpha2.TFJobStatus, condType tfv1alpha2.TFJobConditionType) {
status.Conditions = filterOutCondition(status.Conditions, condType)
}

// filterOutCondition returns a new slice of tfjob conditions without conditions with the provided type.
func filterOutCondition(conditions []tfv1alpha2.TFJobCondition, condType tfv1alpha2.TFJobConditionType) []tfv1alpha2.TFJobCondition {
var newConditions []tfv1alpha2.TFJobCondition
for _, c := range conditions {
if c.Type == condType {
continue
}
newConditions = append(newConditions, c)
}
return newConditions
}
42 changes: 42 additions & 0 deletions pkg/controller/controller_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,48 @@ func (tc *TFJobController) reconcilePods(

// Expect to have `replicas - succeeded` pods alive.
expected := *spec.Replicas - succeeded

// All workers are succeeded, leave a succeeded condition.
if expected == 0 && rtype == tfv1alpha2.TFReplicaTypeWorker {
msg := fmt.Sprintf("TFJob %s is successfully completed.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobSucceeded, tfJobSucceededReason, msg)
if err != nil {
log.Infof("Append tfjob condition error: %v", err)
return err
}
}

// Some workers are still active (running or pending), leave a running condition.
if len(activePods) > 0 && rtype == tfv1alpha2.TFReplicaTypeWorker {
msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobRunning, tfJobRunningReason, msg)
if err != nil {
log.Infof("Append tfjob condition error: %v", err)
return err
}
}

// All workers are running, set StartTime
if len(activePods) == int(*spec.Replicas) && rtype == tfv1alpha2.TFReplicaTypeWorker {
now := metav1.Now()
tfjob.Status.StartTime = &now
err := tc.updateStatusHandler(tfjob)
if err != nil {
log.Infof("Set tfjob start time error: %v", err)
return err
}
}

// Some workers or pss are failed , leave a failed condition.
if failed > 0 {
msg := fmt.Sprintf("TFJob %s is failed.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobRunningReason, msg)
if err != nil {
log.Infof("Append tfjob condition error: %v", err)
return err
}
}

diff := len(activePods) - int(expected)

if diff < 0 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func getKey(tfJob *tfv1alpha2.TFJob, t *testing.T) string {
return key
}

func getCondition(tfJob *tfv1alpha2.TFJob, condition tfv1alpha2.TFJobConditionType, reason string) bool {
func checkCondition(tfJob *tfv1alpha2.TFJob, condition tfv1alpha2.TFJobConditionType, reason string) bool {
for _, v := range tfJob.Status.Conditions {
if v.Type == condition && v.Status == v1.ConditionTrue && v.Reason == reason {
return true
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestNormalPath(t *testing.T) {
// t.Errorf("%s: .status.startTime was not set", name)
// }
// Validate conditions.
if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
if tc.expectedCondition != nil && !checkCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions)
}
}
Expand Down

0 comments on commit b965914

Please sign in to comment.