Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark trial as failed when job fails #791

Merged
merged 1 commit into from
Sep 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions pkg/controller.v1alpha3/trial/trial_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,18 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha3.Trial) error {
logger.Error(err, "Update trial status observation error")
return err
}
// Update Trial job status only if observation field is available.

jobConditionType, err := r.GetDeployedJobStatus(deployedJob)
if err != nil {
logger.Error(err, "Get deployed status error")
return err
}
// Update Trial job status only
// if job has succeded and if observation field is available.
// if job has failed
// This will ensure that trial is set to be complete only if metric is collected at least once
if isTrialObservationAvailable(instance) {
if err = r.UpdateTrialStatusCondition(instance, deployedJob); err != nil {
logger.Error(err, "Update trial status condition error")
return err
}
if isTrialComplete(instance, *jobConditionType) {
r.UpdateTrialStatusCondition(instance, *jobConditionType)
}
}
return nil
Expand Down
57 changes: 35 additions & 22 deletions pkg/controller.v1alpha3/trial/trial_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ import (
commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1"
)

func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1alpha3.Trial, deployedJob *unstructured.Unstructured) error {

func (r *ReconcileTrial) GetDeployedJobStatus(deployedJob *unstructured.Unstructured) (*commonv1.JobConditionType, error) {
jobConditionType := commonv1.JobRunning
kind := deployedJob.GetKind()
status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status")
now := metav1.Now()

if ok {
statusMap := status.(map[string]interface{})
Expand All @@ -44,43 +43,46 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1alpha3.Tri
err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus)
if err != nil {
log.Error(err, "Convert unstructured to status error")
return err
return nil, err
}
if jobStatus.Active == 0 && jobStatus.Succeeded > 0 {
msg := "Trial has succeeded"
instance.MarkTrialStatusSucceeded(TrialSucceededReason, msg)
instance.Status.CompletionTime = &now
jobConditionType = commonv1.JobSucceeded
} else if jobStatus.Failed > 0 {
msg := "Trial has failed"
instance.MarkTrialStatusFailed(TrialFailedReason, msg)
instance.Status.CompletionTime = &now
jobConditionType = commonv1.JobFailed
}
default:
jobStatus := commonv1.JobStatus{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus)

if err != nil {
log.Error(err, "Convert unstructured to status error")
return err
return nil, err
}
if len(jobStatus.Conditions) > 0 {
lc := jobStatus.Conditions[len(jobStatus.Conditions)-1]
if lc.Type == commonv1.JobSucceeded {
msg := "Trial has succeeded"
instance.MarkTrialStatusSucceeded(TrialSucceededReason, msg)
instance.Status.CompletionTime = &now
} else if lc.Type == commonv1.JobFailed {
msg := "Trial has failed"
instance.MarkTrialStatusFailed(TrialFailedReason, msg)
instance.Status.CompletionTime = &now
}
jobConditionType = lc.Type
}
}
} else if unerr != nil {
log.Error(unerr, "NestedFieldCopy unstructured to status error")
return unerr
return nil, unerr
}
return nil
return &jobConditionType, nil
}

func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1alpha3.Trial, jobCondition commonv1.JobConditionType) {
now := metav1.Now()
if jobCondition == commonv1.JobSucceeded {
msg := "Trial has succeeded"
instance.MarkTrialStatusSucceeded(TrialSucceededReason, msg)
instance.Status.CompletionTime = &now
} else if jobCondition == commonv1.JobFailed {
msg := "Trial has failed"
instance.MarkTrialStatusFailed(TrialFailedReason, msg)
instance.Status.CompletionTime = &now
}
//else nothing to do
return
}

func (r *ReconcileTrial) UpdateTrialStatusObservation(instance *trialsv1alpha3.Trial, deployedJob *unstructured.Unstructured) error {
Expand Down Expand Up @@ -121,6 +123,17 @@ func isTrialObservationAvailable(instance *trialsv1alpha3.Trial) bool {
return false
}

func isTrialComplete(instance *trialsv1alpha3.Trial, jobConditionType commonv1.JobConditionType) bool {
if jobConditionType == commonv1.JobSucceeded && isTrialObservationAvailable(instance) {
return true
}
if jobConditionType == commonv1.JobFailed {
return true
}

return false
}

func getBestObjectiveMetricValue(metricLogs []*api_pb.MetricLog, objectiveType commonv1alpha3.ObjectiveType) *float64 {
metricLogSize := len(metricLogs)
if metricLogSize == 0 {
Expand Down